Ruby製の自作RDBMSにて、PostgreSQLプロトコルに対応して psql でメッセージを交換できるようにしてみた

前回の記事にて、 psql と自作のTCPサーバの間でメッセージを交換できるようにしてみました。
Rubyを使って、psqlとメッセージを交換できるPostgreSQL風TCPサーバを作ってみた - メモ的な思考的な

次は、 psql と以前作った自作RDBMSの間でメッセージ交換をしたくなりました。
書籍「Database Design and Implementation」の SimpleDB をベースに、必要最低限の機能を持つ RDBMS を Ruby で実装してみた - メモ的な思考的な

そこで、メッセージ交換できるようにしてみたことから、メモを残します。

 
目次

 

環境

 

実装方針について

psqlとメッセージ交換するために、今回は

  1. 自作RDBMSTCPサーバとして起動できるようにする
  2. 前回作ったPostgreSQLTCPサーバの実装を自作RDBMSへ移植する
  3. psqlから受け取ったSQLをもとに自作RDBMSで処理を行い、結果を返す

の3点を実装していきます

1と2については前回の記事を参考にすればよさそうなので、今回は3をメインでメモしていきます。

 

自作RDBMSTCPサーバとして起動できるようにする

今回は

という実装にしました。

これにより、自作RDBMS

  • gemとして組み込みで使う
  • TCPサーバとして起動する

の両方ができそうと考えたためです。

 
そこで、server/server.rb に以下のファイルを用意し、TCPサーバとして起動できるようにしました。

# frozen_string_literal: true

$LOAD_PATH.unshift File.expand_path("../lib", __dir__)

require 'socket'
require 'tempfile'
require 'simple_ruby_db'

class Server
  private attr_reader :port, :db, :planner

  def initialize(port=25432)
    @port = port
  end

  def start
    puts "Start server on port #{port}"

    Socket.tcp_server_loop(port) do |sock, addr_info|
      startup_phase(sock)

      loop do
        break unless simple_query_phase(sock)
      end

    ensure
      sock.close

      puts 'End'
    end
end

 

PostgreSQLTCPサーバの実装を自作RDBMSへ移植する

移植するだけなので、PostgreSQLTCPサーバの実装をそのままServerクラスのメソッドにしました。

 

psqlから受け取ったSQLをもとに自作RDBMSで処理を行い、結果を返す

今回のメインですので、細かくメモしておきます。

 

initialzeで自作RDBMSインスタンスを生成

クラスの中で自作RDBMSを参照できるよう実装します。なお、ファイル置き場やファイル名は適当です。

def initialize(port=25432)
  @port = port

  # 以下を追加
  data_file = "#{Dir.pwd}/datafile"
  meta_file = "#{Dir.pwd}/metafile"

  File.open(data_file, 'wb') { |f| } unless File.exist?(data_file)
  File.open(meta_file, 'wb') { |f| } unless File.exist?(meta_file)
  @db = SimpleRubyDb::SimpleDb.new(data_file, meta_file)
  @planner = @db.planner
end

 

CREATE TABLEに対応する

CREATE TABLE への対応は容易で、自作RDBMSSQLをそのまま流せばOKです。

なお、簡易的な実装とするため、エラーハンドリングはしません。

def send_command_complete_of_insert(sock, sql)
  planner.execute_update(sql, db.buffer_pool_manager)  # この行だけ追加

  # 以下は同じなので省略
end

 

INSERTに対応する

INSERT への対応も、CREATE TABLE同様となります。

なお、自作RDBMSは1行ずつのINSERTしか想定していないため、psqlに送信するメッセージは固定となります。

def send_command_complete_of_insert(sock, sql)
  planner.execute_update(sql, db.buffer_pool_manager)

  # 今回のRDBMSは1行ずつしかINSERTできないので、行数は1で固定
  value = "INSERT #{OID} 1"

  # 以下は同じなので省略
end

 

SELECTに対応する

一番手間なのがSELECTへの対応です。

対応としては

  • RowDescriptionメッセージで、SELECT句で指定された列の構造を送信する
  • DataRowメッセージで、実際に取得できた値を送信する

という2つの追加実装が必要です。

それぞれ見ていきます。

 

RowDescriptionメッセージの修正

RowDescriptionメッセージでは「自作RDBMSの MetadataManagerを使って、SELECT句で指定された列ごとに、テーブルのSchemaから構造を取得する」を実装する必要があります。

とはいえ、RowDescriptionメッセージで利用する機能は自作RDBMS実装済です。

  • SQLのFROM句で指定したテーブル名を取得する
    • SimpleRubyDb::Parse::Parser.new(sql).query.table_list.first
  • SQLのSELECT句で指定した列を取得する
    • SimpleRubyDb::Parse::Parser.new(sql).query.field_list
  • テーブルのスキーマを取得する
    • db.metadata_manager.layout(table_name(sql)).schema

 
そのため、今回はRDBMSの機能を必要な箇所で利用するだけです。

def send_row_description(sock, sql)
  schema = db.metadata_manager.layout(table_name(sql)).schema
  request_field_list = field_list(sql)
  defs = request_field_list.each_with_index.map { |col_name, col_index| row_definition(schema, col_name, col_index) }

  contents = defs.reduce { |result, item| result + item }

  # length_of_message_contentsはInt32なので4バイト、number_of_columnはInt16なので2バイト
  # 上記2つに各列の内容を合わせたものが、メッセージ全体の長さ
  bytesize_for_length_of_message_contents = 4
  bytesize_for_number_of_column = 2

  message_bytesize = bytesize_for_length_of_message_contents + bytesize_for_number_of_column + contents.length

  # SELECT句で指定された列数を設定
  number_of_field_list = request_field_list.length

  msg = sb('T') + i32b(message_bytesize) + i16b(number_of_field_list) + contents
  sock.write(msg)
end

def table_name(sql)
  # 今回、テーブル名は1つしか指定できない仕様なので、 first で取得して問題ない
  SimpleRubyDb::Parse::Parser.new(sql).query.table_list.first
end

def row_definition(schema, col_name, col_index)
  field_name = sbn(col_name)  # field name には NULL終端文字列 が必要
  object_id_of_table = 16385 # 適当な値
  column_id = col_index + 1

  # schema.field_type と schema.field_length を使って、列の型と列の長さを取得できる
  col_type = schema.field_type(col_name)
  is_integer = col_type == 'integer'

  # https://www.postgresql.jp/document/16/html/catalog-pg-type.html
  pg_type_oid = is_integer ? i32b(23) : i32b(1043)

  pg_type_typlen = is_integer ? i16b(4) : i16b(-1)

  # https://www.postgresql.jp/document/16/html/catalog-pg-attribute.html
  col_length = schema.field_length(col_name)
  pg_attribute_attypmod = is_integer ? i32b(-1) : i32b(col_length + 4)
  format_code = i16b(0)

  field_name + i32b(object_id_of_table) + i16b(column_id) + pg_type_oid + pg_type_typlen + pg_attribute_attypmod + format_code
end

 

DataRowメッセージの修正

RowDescriptionメッセージ同様、自作RDBMS実装済のため利用するだけです。

def send_data_row(sock, sql)
  scan = db.planner.create_query_plan(sql, db.buffer_pool_manager).open

  schema = db.metadata_manager.layout(table_name(sql)).schema
  request_field_list = field_list(sql)
  col_defs = request_field_list.map { |field_name| {name: field_name, type: schema.field_type(field_name)} }

  # SELECT句で指定された列数を設定
  number_of_field_list = request_field_list.length

  row_values = [].tap do |result|
    while scan.next
      result.push(data_row(col_values(col_defs, scan), number_of_field_list))
    end
  end

  selected_rows = row_values.length

  message = row_values.reduce { |result, row| result + row }
  sock.write message

  selected_rows
end

def data_row(col_values, number_of_field_list)
  hex_values = col_values.map do |col_value|
    data_column(col_value)
  end

  contents = hex_values.reduce { |result, value| result + value }

  # length_of_message_contentsはInt32なので4バイト、number_of_columnはInt16なので2バイト
  # 上記2つに各列の内容を合わせたものが、メッセージ全体の長さ
  bytesize_for_length_of_message_contents = 4
  bytesize_for_number_of_column = 2
  message_bytesize = bytesize_for_length_of_message_contents + bytesize_for_number_of_column + contents.length

  sb('D') + i32b(message_bytesize) + i16b(number_of_field_list) + contents
end

def data_column(column_value)
  i32b(column_value.to_s.length) + sb(column_value.to_s)
end

def col_values(col_defs, scan)
  col_defs.map do |col_def|
    col_def[:type] == 'integer' ? scan.get_int(col_def[:name]) : scan.get_string(col_def[:name]).delete_prefix("'").delete_suffix("'")
  end
end

 

CommandCompleteメッセージの修正

SELECTの場合、CommandCompleteメッセージでは取得した件数を設定する必要があります。

そのため、DataRowメッセージを作る際に「取得した件数」を保持しておき、CommandCompleteメッセージで設定すればOKです。

def send_command_complete_of_select(sock, selected_rows)
  value = "SELECT #{selected_rows}"

  # 以下略
end

 

UPDATEに対応する

前回の記事ではUPDATEへの対応は行いませんでした。

一方、自作RDBMSにはUPDATEの機能もあることから、実装を追加します。

## UPDATE向け
def send_command_complete_of_update(sock, sql)
  number_of_updated_rows = planner.execute_update(sql, db.buffer_pool_manager)

  # 今回のRDBMSは1行ずつしかINSERTできないので、行数は1で固定
  value = "UPDATE #{OID} #{number_of_updated_rows}"

  value_length = value.bytesize + 1 # NULL終端文字列の分も長さとして計算する

  # CREATE TABLE同様
  msg = sb('C') + i32b(value_length + LENGTH_OF_MESSAGE_CONTENTS) + sbn(value)
  sock.write msg
end

 

動作確認

まずは、自作RDBMSTCPサーバとして起動します。

$ ruby server.rb
Start server on port 25432

 
続いて、 psql で接続します。

接続後のメッセージが変ですが、ParameterStatusメッセージを返していないので、気にしないことにします。

$ psql postgres postgres -h localhost -p 25432
psql (14.15 (Ubuntu 14.15-0ubuntu0.22.04.1), server 0.0.0)
Type "help" for help.

 
psqlでCREATE TABLEします。

postgres=> create table apples (id int, name varchar(255));
CREATE TABLE

 
psqlで2件、データをINSERTします。

postgres=> insert into apples(id, name) values (1, 'shinano_gold');
INSERT 0 1
postgres=> insert into apples(id, name) values (2, 'fuji');
INSERT 0 1

 
続いてSELECTです。今回はいくつかのパターンを試してみます。

まずは全件のSELECTです。自作RDBMS* には対応していないため、全列指定します。

postgres=> select id, name from apples;
 id |     name     
----+--------------
  1 | shinano_gold
  2 | fuji
(2 rows)

 
次に、WHERE句として where id = 1 を追加します。

postgres=> select id, name from apples where id = 1;
 id |     name     
----+--------------
  1 | shinano_gold
(1 row)

 
続いて、SELECT句の列を name だけにしてみます。

postgres=> select name from apples where id = 1;
     name     
--------------
 shinano_gold
(1 row)

 
では UPDATE を実行します。

postgres=> update apples set name = 'akibae' where id = 2;
UPDATE 1

 
もう一度、全件を取得してみます。

postgres=> select id, name from apples;
 id |     name     
----+--------------
  1 | shinano_gold
  2 | akibae
(2 rows)

 
いずれも想定通りに動作しました。

 

ソースコード

GitHubに上げました。
https://github.com/thinkAmi-sandbox/simple_ruby_db

今回のプルリクはこちら。
https://github.com/thinkAmi-sandbox/simple_ruby_db/pull/2