Mercurial > hg > monetdb-ruby
view lib/MonetDBData.rb @ 10:2ebc526bc7dd
Updated copyright.
author | Sjoerd Mullender <sjoerd@acm.org> |
---|---|
date | Fri, 06 Jan 2017 13:16:10 +0100 (2017-01-06) |
parents | aab36be83762 |
children | 83db5acbd01a |
line wrap: on
line source
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.

# Models a MonetDB RecordSet

require 'time'
require 'date'
require 'ostruct'
require "bigdecimal"

require_relative 'MonetDBConnection'

require 'logger'

# Holds the result of one query fired through a MonetDBConnection: the query
# header, the parsed schema header (for Q_TABLE replies) and the raw tuples.
# Tuples are kept as unparsed strings and converted lazily by the fetch/each
# methods.
class MonetDBData
  @@DEBUG = false

  # connection - the object used to talk to the server. This class calls
  #              #lang, #send, #receive and #set_export on it.
  def initialize(connection)
    @connection = connection
    @lang = @connection.lang

    reset_result_state
  end

  # Fires the query +q+ and stores the server response.
  #
  # Returns nil when the connection hands back no data, otherwise the frozen
  # array of unparsed tuples.
  # Raises MonetDBQueryError when the server replies with an info/error line
  # or when the number of tuples received differs from the announced count.
  # Raises LanguageNotSupported (via format_query) for non-SQL connections.
  def execute(q)
    request = format_query(q)

    # Start from a clean slate so the same instance can run several queries:
    # after a Q_TABLE reply @header is a frozen Hash and could not collect the
    # header lines of the next reply.
    reset_result_state

    # fire a query and get ready to receive the data
    @connection.send(request)
    data = @connection.receive

    return if data.nil?

    # temporarily stores the retrieved rows as one string
    record_set = receive_record_set(data)

    if @lang == MonetDBConnection::LANG_SQL
      # the fired query is a SELECT; store and return the whole record set
      if @action == MonetDBConnection::Q_TABLE
        @header = parse_header_table(@header)
        @header.freeze

        # The first reply holds at most REPLY_SIZE rows; pull the remainder
        # block by block.
        if @row_index.to_i < @row_count.to_i
          record_set << receive_record_set(@connection.receive) while next_block
        end
      end

      # ruby string management seems to not properly understand the MSG_PROMPT escape character.
      # In order to avoid data loss the @record_set array is built once that all tuples have been retrieved
      @record_set = record_set.split("\t]\n")

      if @record_set.length != @query['rows'].to_i
        raise MonetDBQueryError, "Warning: Query #{@query['id']} declared to result in #{@query['rows']} but #{@record_set.length} returned instead"
      end
    end

    @record_set.freeze
  end

  # Returns the record set entries hashed by column name, ordered by column
  # position: { "col" => [value_row_0, value_row_1, ...], ... }
  def fetch_all_as_column_hash
    # Parse every tuple once instead of once per column.
    parsed_rows = @record_set.map { |row| parse_tuple(row) }

    @header["columns_name"].each_with_object({}) do |col_name, columns|
      position = @header["columns_order"].fetch(col_name)
      columns[col_name] = parsed_rows.map { |row| row[position] }
    end
  end

  # Returns the next record as a hash keyed by column name
  # (i.e: { "id" => 1, "name" => "John Doe", "age" => 42 }) and advances the
  # iterator index. Returns false once all records have been returned.
  def fetch_hash
    return false if @index >= @query['rows'].to_i

    hashed = record_hash(parse_tuple(@record_set[@index]))
    @index += 1
    hashed
  end

  # Loops through all the records and yields each one, as a hash keyed by
  # column name, to the given block.
  def each_record_as_hash
    @record_set.each do |record|
      yield(record_hash(parse_tuple(record)))
    end
  end

  # Returns the values of the column named +column_name+, one per record.
  # Raises KeyError when the column is not part of the result.
  def fetch_by_column_name(column_name="")
    position = @header["columns_order"].fetch(column_name)

    @record_set.map { |row| parse_tuple(row)[position] }
  end

  # Fetches a single record as a frozen array of converted values and advances
  # the iterator index. Returns false once all records have been returned.
  def fetch
    return false if @index >= @query['rows'].to_i

    result = parse_tuple(@record_set[@index])
    @index += 1
    result
  end

  # Resets the internal iterator index used by fetch and fetch_hash.
  def reset_index
    @index = 0
  end

  # Loops through all records and yields each one, as an array of converted
  # values, to the given block.
  # Raises MonetDBDataError when the last query did not produce a table.
  def each_record
    raise MonetDBDataError, "There is no record set currently available" unless @query['type'] == MonetDBConnection::Q_TABLE
    @record_set.each { |record| yield(parse_tuple(record)) }
  end

  # Cursor method that returns all the records as an array of arrays.
  def fetch_all
    result = []
    each_record { |record| result.push(record) }
    result
  end

  # Returns the number of rows in the record set
  def num_rows
    @query['rows'].to_i
  end

  # Returns the number of fields in the record set
  def num_fields
    @query['columns'].to_i
  end

  # Returns the (ordered) names of the columns in the record set
  def name_fields
    @header['columns_name']
  end

  # Returns the column types of the record set as a hash mapping each column
  # name to its type, in column order
  def type_fields
    @header['columns_type']
  end

  # ===================
          private
  # ===================

  # (Re)initialises everything that describes the result of a single query.
  def reset_result_state
    # Structure containing the header+results set for a fired Q_TABLE query.
    # @header collects the raw schema header lines and is replaced by the
    # parsed header hash once a Q_TABLE reply has been read.
    @header = []
    @query  = {}
    @action = nil
    @block  = nil

    @record_set = []
    @index = 0 # Position of the last returned record

    @row_count = 0
    @row_offset = 10
    @row_index = Integer(MonetDBConnection::REPLY_SIZE)
  end

  # Reads one server response line by line: query headers are parsed, schema
  # header lines are collected in @header and tuple lines are accumulated.
  # Returns the tuple lines concatenated into one (unparsed) string; reading
  # stops at the first prompt line.
  # Raises MonetDBQueryError when the response contains an info/error line.
  def receive_record_set(response)
    rows = "".dup # mutable buffer, appended to in place

    response.each_line do |row|
      case row[0]
      when MonetDBConnection::MSG_QUERY then parse_query(row)
      when MonetDBConnection::MSG_INFO then raise MonetDBQueryError, row
      when MonetDBConnection::MSG_SCHEMA_HEADER then @header << row
      when MonetDBConnection::MSG_TUPLE then rows << row
      when MonetDBConnection::MSG_PROMPT then return rows
      end
    end

    rows
  end

  # Records which kind of reply a query header line announces and, for table
  # and block replies, parses the header fields.
  def parse_query(row)
    case row[1]
    when MonetDBConnection::Q_TABLE
      @action = MonetDBConnection::Q_TABLE
      @query = parse_header_query(row)
      @query.freeze
      @row_count = @query['rows'].to_i # total number of rows in table
    when MonetDBConnection::Q_BLOCK
      @action = MonetDBConnection::Q_BLOCK # strip the block header from data
      @block = parse_header_query(row)
    when MonetDBConnection::Q_TRANSACTION
      @action = MonetDBConnection::Q_TRANSACTION
    when MonetDBConnection::Q_CREATE
      @action = MonetDBConnection::Q_CREATE
    end
  end

  # Turns a parsed tuple (array of values) into a hash keyed by column name.
  def record_hash(record)
    @header["columns_name"].each_with_object({}) do |column_name, result|
      position = @header["columns_order"].fetch(column_name)
      result[column_name] = record[position]
    end
  end

  # Asks the server to export the next block of rows.
  # Returns false when every row has already been requested, true otherwise.
  def next_block
    return false if @row_index >= @row_count

    # The increment step is small to better deal with ruby socket's performance.
    # For larger values of the step performance drop;
    # never ask for more rows than are left, so @row_index ends exactly at
    # @row_count.
    @row_offset = [@row_offset, (@row_count - @row_index)].min

    # export offset amount
    @connection.set_export(@query['id'], @row_index.to_s, @row_offset.to_s)
    @row_index += @row_offset
    @row_offset += 1

    true
  end

  # Formats a query <i>string</i> so that it can be parsed by the server.
  # Raises LanguageNotSupported when the connection language is not SQL.
  def format_query(q)
    raise LanguageNotSupported, @lang unless @lang == MonetDBConnection::LANG_SQL

    "s" + q + "\n;"
  end

  # Parses one tuple as returned from the server into a frozen array of
  # values converted according to the column types.
  def parse_tuple(tuple)
    # remove the leading "["
    raw_fields = tuple.gsub(/^\[\s+/, '').split(/,\t/)

    raw_fields.each_with_index.map { |field, index| convert_type(field, index) }.freeze
  end

  # Converts the textual +value+ of the column at position +index+ to the Ruby
  # type matching the column's declared type. NULL becomes nil; values of
  # unlisted types are returned as strings.
  def convert_type(value, index)
    return nil if "NULL" == value.upcase

    case type_fields.values[index]
    when "int", "tinyint", "smallint", "bigint", "hugeint" then value.to_i
    when "double", "real", "decimal" then value.to_f
    # Compare against the string "true"; comparing a String with the literal
    # true is always false.
    when "boolean" then value.downcase == "true"
    when "date" then Date.parse(value)
    # A TIME value has no date part; anchor it on a fixed reference date.
    # The date is built from integers because a lone date-only string passed
    # to Time.new is not handled the same way by every Ruby version.
    when "time" then Time.parse(value, Time.new(2000, 1, 1))
    when "timestamp", "timestamptz" then DateTime.parse(value)
    # Strips every backslash and every double quote from the value (this also
    # drops escaped characters embedded in the string).
    else value.gsub(/[\\"]/, '')
    end
  end

  # Parses a query header line and returns a frozen hash describing the query.
  def parse_header_query(row)
    type = row[1].chr
    fields = row.split(' ')

    header =
      if type == MonetDBConnection::Q_TABLE
        # Performing a SELECT: store informations about the table size, query id, total number of records and returned.
        { "id" => fields[1], "type" => type, "rows" => fields[2], "columns" => fields[3], "returned" => fields[4] }
      elsif type == MonetDBConnection::Q_BLOCK
        # processing block header
        { "id" => fields[1], "type" => type, "remains" => fields[3], "columns" => fields[2], "offset" => fields[4] }
      else
        { "type" => type }
      end

    header.freeze
  end

  # Parses the schema header lines of a Q_TABLE reply (table name, column
  # names, column types, column lengths - in that order) and returns a frozen
  # hash describing the schema. Returns nil when the last query header was not
  # a Q_TABLE one or when +header_t+ is nil.
  def parse_header_table(header_t)
    return unless @query["type"] == MonetDBConnection::Q_TABLE
    return if header_t.nil?

    name_t = header_t[0].split(' ')[1].gsub(/,$/, '')
    name_cols = header_line_values(header_t[1])

    type_cols = {}
    header_line_values(header_t[2]).each_with_index do |col_type, i|
      type_cols[name_cols[i]] = col_type
    end

    length_cols = {}
    header_line_values(header_t[3]).each_with_index do |col_length, i|
      length_cols[name_cols[i]] = col_length
    end

    columns_order = {}
    name_cols.each_with_index { |col, i| columns_order[col] = i }

    {
      "table_name" => name_t,
      "columns_name" => name_cols,
      "columns_type" => type_cols,
      "columns_length" => length_cols,
      "columns_order" => columns_order
    }.freeze
  end

  # Extracts the values of one schema header line: the text between the
  # leading "%" and the "#" marker, split on whitespace, with the trailing
  # comma removed from each value.
  def header_line_values(line)
    line.split('%')[1].split('#')[0].split(' ').map { |value| value.gsub(/,$/, '') }
  end
end