LCOV - code coverage report
Current view: top level - sql/backends/monet5 - sql_bincopy.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 256 291 88.0 %
Date: 2024-04-25 20:03:45 Functions: 11 11 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * Implementation of COPY BINARY INTO
      15             :  */
      16             : 
      17             : #include "monetdb_config.h"
      18             : #include "mapi_prompt.h"
      19             : #include "gdk.h"
      20             : #include "sql.h"
      21             : #include "mal_backend.h"
      22             : #include "mal_interpreter.h"
      23             : #include "sql_bincopyconvert.h"
      24             : #include "copybinary.h"
      25             : #include "copybinary_support.h"
      26             : 
      27             : 
      28             : #define bailout(...) do { \
      29             :                 msg = createException(MAL, mal_operator, SQLSTATE(42000) __VA_ARGS__); \
      30             :                 goto end; \
      31             :         } while (0)
      32             : 
      33             : 
      34             : static str
      35         152 : load_trivial(BAT *bat, stream *s, const char *filename, bincopy_validate_t validate, int width, BUN rows_estimate, int *eof_seen)
      36             : {
      37         152 :         const char *mal_operator = "sql.importColumn";
      38         152 :         str msg = MAL_SUCCEED;
      39         152 :         int tt = BATttype(bat);
      40         152 :         const size_t asz = (size_t) ATOMsize(tt);
      41         152 :         const size_t chunk_size = 1<<20;
      42             : 
      43         152 :         bool eof = false;
      44         470 :         while (!eof) {
      45         319 :                 assert(chunk_size % asz == 0);
      46         319 :                 size_t n;
      47         319 :                 if (rows_estimate > 0) {
      48             :                         // Set n to estimate+1 so it will read once, get n - 1 and know it's at EOF.
      49             :                         // Otherwise, it will read n, get n, then enlarge the heap, read again,
      50             :                         // and only then know it's at eof.
      51          84 :                         n = rows_estimate + 1;
      52          84 :                         rows_estimate = 0;
      53             :                 } else {
      54         235 :                         n = chunk_size / asz;
      55             :                 }
      56             : 
      57             :                 // First make some room
      58         319 :                 BUN validCount = bat->batCount;
      59         319 :                 BUN newCount = validCount + n;
      60         319 :                 if (BATextend(bat, newCount) != GDK_SUCCEED)
      61           0 :                         bailout("load_trivial: %s", GDK_EXCEPTION);
      62             : 
      63             :                 // Read into the newly allocated space
      64         319 :                 char *start = Tloc(bat, validCount);
      65         319 :                 char *cur = start;
      66         319 :                 char *end = Tloc(bat, newCount);
      67         319 :                 char *validated = start;
      68         945 :                 while (cur < end) {
      69         627 :                         ssize_t nread = mnstr_read(s, cur, 1, end - cur);
      70         627 :                         if (nread < 0)
      71           0 :                                 bailout("load_trivial: %s", mnstr_peek_error(s));
      72         627 :                         if (nread == 0) {
      73         151 :                                 eof = true;
      74         151 :                                 size_t tail = (cur - start) % asz;
      75         151 :                                 if (tail != 0) {
      76           0 :                                         bailout("load_trivial: final item incomplete: %d bytes instead of %d", (int) tail, (int) asz);
      77             :                                 }
      78             :                                 end = cur;
      79             :                         }
      80         627 :                         cur += (size_t) nread;
      81         627 :                         if (validate) {
      82         479 :                                 size_t to_validate = (cur - validated) / asz;
      83         479 :                                 msg = validate(validated, to_validate, width, filename);
      84         479 :                                 if (msg != MAL_SUCCEED)
      85             :                                         break;
      86         478 :                                 validated += to_validate * asz;
      87             :                         }
      88             :                 }
      89         319 :                 if (msg != NULL)
      90           1 :                         goto end;
      91         318 :                 BUN actualCount = validCount + (end - start) / asz;
      92         318 :                 BATsetcount(bat, actualCount);
      93             :         }
      94             : 
      95         151 :         BATsetcount(bat, bat->batCount);
      96         151 :         bat->tseqbase = oid_nil;
      97         151 :         bat->tnonil = bat->batCount == 0;
      98         151 :         bat->tnil = false;
      99         151 :         if (bat->batCount <= 1) {
     100           0 :                 bat->tsorted = true;
     101           0 :                 bat->trevsorted = true;
     102           0 :                 bat->tkey = true;
     103             :         } else {
     104         151 :                 bat->tsorted = false;
     105         151 :                 bat->trevsorted = false;
     106         151 :                 bat->tkey = false;
     107             :         }
     108             : 
     109         152 : end:
     110         152 :         *eof_seen = (int)eof;
     111         152 :         return msg;
     112             : }
     113             : 
     114             : static str
     115          36 : load_fixed_width(BAT *bat, stream *s, const char *filename, int width, bool byteswap, bincopy_decoder_t convert, bincopy_validate_t validate, size_t record_size, int *eof_reached)
     116             : {
     117          36 :         const char *mal_operator = "sql.importColumn";
     118          36 :         str msg = MAL_SUCCEED;
     119          36 :         bstream *bs = NULL;
     120             : 
     121          36 :         if (record_size == 0) {
     122          26 :                 int tt = BATttype(bat);
     123          26 :                 record_size = (size_t) ATOMsize(tt);
     124             :         }
     125             : 
     126             :         // Read whole number of records
     127          62 :         size_t chunk_size = 1<<20;
     128          26 :         assert(record_size > 0);
     129          36 :         chunk_size -= chunk_size % record_size;
     130             : 
     131          36 :         bs = bstream_create(s, chunk_size);
     132          36 :         if (bs == NULL) {
     133           0 :                 msg = createException(SQL, "sql", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     134           0 :                 goto end;
     135             :         }
     136             : 
     137         308 :         while (1) {
     138         172 :                 ssize_t nread = bstream_next(bs);
     139         172 :                 if (nread < 0)
     140           0 :                         bailout("%s", mnstr_peek_error(s));
     141         172 :                 if (nread == 0)
     142             :                         break;
     143             : 
     144         137 :                 size_t n = (bs->len - bs->pos) / record_size;
     145         137 :                 size_t extent = n * record_size;
     146         137 :                 BUN count = BATcount(bat);
     147         137 :                 BUN newCount = count + n;
     148         137 :                 if (BATextend(bat, newCount) != GDK_SUCCEED)
     149           0 :                         bailout("%s", GDK_EXCEPTION);
     150             : 
     151         137 :                 msg = convert(Tloc(bat, count), &bs->buf[bs->pos], n, byteswap);
     152         137 :                 if (validate != NULL && msg == MAL_SUCCEED)
     153          48 :                         msg = validate(Tloc(bat, count), n, width, filename);
     154         137 :                 if (msg != MAL_SUCCEED)
     155           1 :                         goto end;
     156         136 :                 BATsetcount(bat, newCount);
     157         136 :                 bs->pos += extent;
     158             :         }
     159             : 
     160          35 :         bat->tseqbase = oid_nil;
     161          35 :         bat->tnonil = bat->batCount == 0;
     162          35 :         bat->tnil = false;
     163          35 :         if (bat->batCount <= 1) {
     164           0 :                 bat->tsorted = true;
     165           0 :                 bat->trevsorted = true;
     166           0 :                 bat->tkey = true;
     167             :         } else {
     168          35 :                 bat->tsorted = false;
     169          35 :                 bat->trevsorted = false;
     170          35 :                 bat->tkey = false;
     171             :         }
     172             : 
     173          36 : end:
     174          36 :         *eof_reached = 0;
     175          36 :         if (bs != NULL) {
     176          36 :                 *eof_reached = (int)bs->eof;
     177          36 :                 bs->s = NULL;
     178          36 :                 bstream_destroy(bs);
     179             :         }
     180          36 :         return msg;
     181             : }
     182             : 
     183             : 
     184             : static str
     185         217 : load_column(type_record_t *rec, const char *name, BAT *bat, stream *s, int width, bool byteswap, BUN rows_estimate, int *eof_reached)
     186             : {
     187         217 :         const char *mal_operator = "sql.importColumn";
     188         217 :         BUN orig_count, new_count;
     189         217 :         str msg = MAL_SUCCEED;
     190         217 :         BUN rows_added;
     191             : 
     192         217 :         bincopy_loader_t loader = rec->loader;
     193         217 :         bincopy_decoder_t decoder = rec->decoder;
     194         217 :         bool trivial = rec->decoder_trivial;
     195             : 
     196             :         // sanity check
     197         217 :         assert( (loader != NULL) + (decoder != NULL) + trivial == 1); (void)trivial;
     198             : 
     199         217 :         if (rec->trivial_if_no_byteswap && !byteswap)
     200         217 :                 decoder = NULL;
     201             : 
     202         217 :         orig_count = BATcount(bat);
     203             : 
     204         217 :         if (loader) {
     205          29 :                 msg = loader(bat, s, eof_reached, width, byteswap);
     206         188 :         } else if (decoder) {
     207          36 :                 msg = load_fixed_width(bat, s, name, width, byteswap, rec->decoder, rec->validate, rec->record_size, eof_reached);
     208             :         } else {
     209             :                 // load the bytes directly into the bat, as-is
     210         152 :                 msg = load_trivial(bat, s, name, rec->validate, width, rows_estimate, eof_reached);
     211             :         }
     212             : 
     213         217 :         new_count = BATcount(bat);
     214         217 :         rows_added = new_count - orig_count;
     215             : 
     216         217 :         if (msg == MAL_SUCCEED && rows_estimate != 0 && rows_estimate != rows_added)
     217           3 :                 bailout(
     218             :                         "inconsistent row count in %s: expected "BUNFMT", got "BUNFMT,
     219             :                         name,
     220             :                         rows_estimate, rows_added);
     221             : 
     222         214 :         end:
     223         217 :                 return msg;
     224             : }
     225             : 
     226             : /* Import a single file into a new BAT.
     227             :  */
     228             : static str
     229         217 : import_column(backend *be, bat *ret, BUN *retcnt, str method, int width, bool byteswap, str path, int onclient,  BUN nrows)
     230             : {
     231             :         // In this function we create the BAT and open the file, and tidy
     232             :         // up when things go wrong. The actual work happens in load_column().
     233             : 
     234         217 :         const str mal_operator = "sql.importColumn";
     235             : 
     236             :         // These are managed by the end: block.
     237         217 :         str msg = MAL_SUCCEED;
     238         217 :         int gdk_type;
     239         217 :         BAT *bat = NULL;
     240         217 :         int eof_reached = -1; // 1 = read to the end; 0 = stopped reading early; -1 = unset, a bug.
     241         217 :         stream *s = NULL;
     242             : 
     243             :         // Set safe values
     244         217 :         *ret = 0;
     245         217 :         *retcnt = 0;
     246             : 
     247             :         // Figure out what kind of data we have
     248         217 :         type_record_t *rec = find_type_rec(method);
     249         217 :         if (rec == NULL)
     250           0 :                 bailout("COPY BINARY FROM not implemented for '%s'", method);
     251             : 
     252             :         // Create the BAT
     253         217 :         gdk_type = ATOMindex(rec->gdk_type);
     254         217 :         if (gdk_type < 0)
     255           0 :                 bailout("cannot load %s as %s: unknown atom type %s", path, method, rec->gdk_type);
     256         217 :         bat = COLnew(0, gdk_type, nrows, PERSISTENT);
     257         217 :         if (bat == NULL)
     258           0 :                 bailout("%s", GDK_EXCEPTION);
     259             : 
     260             :         // Open the input stream
     261         217 :         if (onclient) {
     262         109 :                 s = mapi_request_upload(path, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
     263             :         } else {
     264         108 :                 s = open_rstream(path);
     265             :         }
     266         217 :         if (!s) {
     267           0 :                 bailout("%s", mnstr_peek_error(NULL));
     268             :         }
     269             : 
     270             :         // Do the work
     271         217 :         msg = load_column(rec, path, bat, s, width, byteswap, nrows, &eof_reached);
     272         217 :         if (eof_reached != 0 && eof_reached != 1) {
     273           0 :                 if (msg)
     274           0 :                         bailout("internal error in sql.importColumn: eof_reached not set (%s). Earlier error: %s", method, msg);
     275             :                 else
     276           0 :                         bailout("internal error in sql.importColumn: eof_reached not set (%s)", method);
     277             :         }
     278             : 
     279             :         // Fall through into the end block which will clean things up
     280         217 : end:
     281         217 :         if (s)
     282         217 :                 close_stream(s);
     283             : 
     284             :         // Manage the return values and `bat`.
     285         217 :         if (msg == MAL_SUCCEED) {
     286         207 :                 BBPkeepref(bat);
     287         207 :                 *ret = bat->batCacheid;
     288         207 :                 *retcnt = BATcount(bat);
     289             :         } else {
     290          10 :                 if (bat != NULL) {
     291          10 :                         BBPunfix(bat->batCacheid);
     292          10 :                         bat = NULL;
     293             :                 }
     294          10 :                 *ret = 0;
     295          10 :                 *retcnt = 0;
     296             :         }
     297             : 
     298         217 :         return msg;
     299             : }
     300             : 
     301             : 
     302             : str
     303         217 : mvc_bin_import_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     304             : {
     305             :         // Entry point for sql.importColumn.
     306             :         // Does the argument/return handling, the work is done by importColumn.
     307         217 :         (void)mb;
     308             : 
     309         217 :         assert(pci->retc == 2);
     310         217 :         bat *ret = getArgReference_bat(stk, pci, 0);
     311         217 :         BUN *retcnt = getArgReference_oid(stk, pci, 1);
     312             : 
     313         217 :         assert(pci->argc == 8);
     314         217 :         str method = *getArgReference_str(stk, pci, 2);
     315         217 :         int width = *getArgReference_int(stk, pci, 3);
     316         217 :         bit byteswap = *getArgReference_bit(stk, pci, 4);
     317         217 :         str path = *getArgReference_str(stk, pci, 5);
     318         217 :         int onclient = *getArgReference_int(stk, pci, 6);
     319         217 :         BUN nrows = *getArgReference_oid(stk, pci, 7);
     320             : 
     321         217 :         backend *be = cntxt->sqlcontext;
     322             : 
     323         217 :         return import_column(be, ret, retcnt, method, width, byteswap, path, onclient, nrows);
     324             : }
     325             : 
     326             : 
     327             : 
     328             : static str
     329        2867 : write_out(const char *start, const char *end, stream *s)
     330             : {
     331        2867 :         const char *mal_operator = "sql.export_bin_column";
     332        2867 :         str msg = MAL_SUCCEED;
     333             : 
     334        2867 :         const char *p = start;
     335        5734 :         while (p < end) {
     336        2867 :                 ssize_t nwritten = mnstr_write(s, p, 1, end - p);
     337        2867 :                 if (nwritten < 0)
     338           0 :                         bailout("%s", mnstr_peek_error(s));
     339        2867 :                 if (nwritten == 0)
     340           0 :                         bailout("Unexpected EOF on %s", mnstr_name(s));
     341        2867 :                 p += nwritten;
     342             :         }
     343        2867 : end:
     344        2867 :         return msg;
     345             : }
     346             : 
     347             : static str
     348        2757 : dump_trivial(BAT *b, stream *s, BUN start, BUN length)
     349             : {
     350        2757 :         assert(!ATOMvarsized(BATttype(b)));
     351        2757 :         BUN end = start + length;
     352        2757 :         assert(end <= BATcount(b));
     353        2757 :         return write_out(Tloc(b, start), Tloc(b, end), s);
     354             : }
     355             : 
     356             : static str
     357          20 : dump_fixed_width(BAT *b, stream *s, BUN start, BUN length, bool byteswap, bincopy_encoder_t encoder, size_t record_size)
     358             : {
     359          20 :         const char *mal_operator = "sql.export_bin_column";
     360          20 :         str msg = MAL_SUCCEED;
     361          20 :         char *buffer = NULL;
     362             : 
     363          20 :         BUN end = start + length;
     364          20 :         assert(end <= BATcount(b));
     365             : 
     366          20 :         if (record_size == 0) {
     367          12 :                 int tt = BATttype(b);
     368          12 :                 record_size = (size_t) ATOMsize(tt);
     369             :         }
     370          20 :         size_t buffer_size = 1024 * 1024;
     371          20 :         BUN batch_size = buffer_size / record_size;
     372          20 :         if (batch_size > length)
     373             :                 batch_size = length;
     374          20 :         buffer_size = batch_size * record_size;
     375          20 :         buffer = GDKmalloc(buffer_size);
     376          20 :         if (buffer == NULL)
     377           0 :                 bailout(MAL_MALLOC_FAIL);
     378             : 
     379             :         BUN n;
     380         130 :         for (BUN pos = start; pos < end; pos += n) {
     381         110 :                 n = end - pos;
     382         110 :                 if (n > batch_size)
     383             :                         n = batch_size;
     384         110 :                 msg = encoder(buffer, Tloc(b, pos), n, byteswap);
     385         110 :                 if (msg != MAL_SUCCEED)
     386           0 :                         goto end;
     387         110 :                 msg = write_out(buffer, buffer + n * record_size, s);
     388         110 :                 if (msg != MAL_SUCCEED)
     389           0 :                         goto end;
     390             :         }
     391             : 
     392          20 : end:
     393          20 :         GDKfree(buffer);
     394          20 :         return msg;
     395             : }
     396             : 
     397             : str
     398        3382 : dump_binary_column(const struct type_record_t *rec, BAT *b, BUN start, BUN length, bool byteswap, stream *s)
     399             : {
     400        3382 :         str msg = MAL_SUCCEED;
     401             : 
     402        3382 :         bincopy_dumper_t dumper = rec->dumper;
     403        3382 :         bincopy_encoder_t encoder = rec->encoder;
     404        3382 :         bool trivial = rec->encoder_trivial;
     405             : 
     406             :         // sanity check
     407        3382 :         assert( (dumper != NULL) + (encoder != NULL) + trivial == 1); (void)trivial;
     408             : 
     409        3382 :         if (rec->trivial_if_no_byteswap && !byteswap)
     410        3382 :                 encoder = NULL;
     411             : 
     412        3382 :         if (dumper) {
     413         605 :                 msg = rec->dumper(b, s, start, length, byteswap);
     414        2777 :         } else if (encoder) {
     415          20 :                 msg = dump_fixed_width(b, s, start, length, byteswap, rec->encoder, rec->record_size);
     416             :         } else {
     417        2757 :                 msg = dump_trivial(b, s, start, length);
     418             :         }
     419             : 
     420        3382 :         return msg;
     421             : }
     422             : 
     423             : 
     424             : static str
     425         150 : export_column(backend *be, BAT *b, bool byteswap, str filename, bool onclient)
     426             : {
     427         150 :         const char *mal_operator = "sql.export_bin_column";
     428         150 :         str msg = MAL_SUCCEED;
     429         150 :         stream *s = NULL;
     430             : 
     431             :         // Figure out what kind of data we have
     432         150 :         int tpe = BATttype(b);
     433         150 :         const char *gdk_name = ATOMname(tpe);
     434         150 :         type_record_t *rec = find_type_rec(gdk_name);
     435         150 :         if (rec == NULL)
     436           0 :                 bailout("COPY INTO BINARY not implemented for '%s'", gdk_name);
     437             : 
     438         150 :         if (onclient) {
     439          75 :                 (void)be;
     440          75 :                 s = mapi_request_download(filename, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
     441             :         } else {
     442          75 :                 s = open_wstream(filename);
     443             :         }
     444         150 :         if (!s) {
     445           0 :                 bailout("%s", mnstr_peek_error(NULL));
     446             :         }
     447             : 
     448         150 :         msg = dump_binary_column(rec, b, 0, BATcount(b), byteswap, s);
     449             : 
     450         150 :         if (s && msg == MAL_SUCCEED) {
     451         150 :                 if (mnstr_flush(s, MNSTR_FLUSH_DATA) != 0) {
     452           0 :                         bailout("%s", mnstr_peek_error(s));
     453             :                 }
     454             :         }
     455             : 
     456         150 : end:
     457         150 :         close_stream(s);
     458         150 :         return msg;
     459             : }
     460             : 
     461             : str
     462         150 : mvc_bin_export_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     463             : {
     464         150 :         const char *mal_operator = "sql.export_bin_column";
     465         150 :         str msg = MAL_SUCCEED;
     466         150 :         BAT *b = NULL;
     467         150 :         backend *be = cntxt->sqlcontext;
     468         150 :         assert(pci->retc == 1);
     469         150 :         assert(pci->argc == 5);
     470             : 
     471         150 :         lng *ret = getArgReference_lng(stk, pci, 0);
     472             :         // arg 1 handled below
     473         150 :         bool byteswap = *getArgReference_bit(stk, pci, 2);
     474         150 :         str filename = *getArgReference_str(stk, pci, 3);
     475         150 :         bool onclient = (bool) *getArgReference_int(stk, pci, 4);
     476             : 
     477             :         // Usually we are called with a BAT argument but if the user types
     478             :         // something like
     479             :         //
     480             :         //    COPY (SELECT 42 AS num, 'banana' AS word) INTO BINARY ...
     481             :         //
     482             :         // it will be a single value instead.
     483             :         // To avoid having to handle separate cases everywhere we simply
     484             :         // stuff the value into a temporary BAT
     485         150 :         int arg_type = getArgType(mb, pci, 1);
     486         300 :         if (isaBatType(arg_type)) {
     487         150 :                 bat id = *getArgReference_bat(stk, pci, 1);
     488         150 :                 b = BATdescriptor(id);
     489             :         } else {
     490           0 :                 void *value = getArgReference(stk, pci, 1);
     491           0 :                 b = COLnew(0, arg_type, 1, TRANSIENT);
     492           0 :                 if (!b)
     493           0 :                         bailout("%s", GDK_EXCEPTION);
     494           0 :                 if (BUNappend(b, value, true) != GDK_SUCCEED)
     495           0 :                         bailout("%s", GDK_EXCEPTION);
     496             :         }
     497             : 
     498         150 :         msg = export_column(be, b, byteswap, filename, onclient);
     499         150 :         if (msg == MAL_SUCCEED)
     500         150 :                 *ret = BATcount(b);
     501             : 
     502           0 : end:
     503         150 :         BBPreclaim(b);
     504         150 :         return msg;
     505             : }

Generated by: LCOV version 1.14