Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Implementation of COPY BINARY INTO
15 : */
16 :
17 : #include "monetdb_config.h"
18 : #include "mapi_prompt.h"
19 : #include "gdk.h"
20 : #include "sql.h"
21 : #include "mal_backend.h"
22 : #include "mal_interpreter.h"
23 : #include "sql_bincopyconvert.h"
24 : #include "copybinary.h"
25 : #include "copybinary_support.h"
26 :
27 :
/*
 * Record an error and leave the current function through its cleanup path.
 *
 * Assigns a new exception (SQLSTATE 42000, attributed to `mal_operator`) to
 * `msg` and jumps to the label `end`.  The function using this macro must
 * therefore have `msg`, `mal_operator` and an `end:` label in scope.  The
 * arguments are a string literal format followed by its values; the format
 * must be a literal because it is concatenated with the SQLSTATE prefix.
 */
#define bailout(...) \
	do { \
		msg = createException(MAL, mal_operator, SQLSTATE(42000) __VA_ARGS__); \
		goto end; \
	} while (0)
32 :
33 :
34 : static str
35 152 : load_trivial(BAT *bat, stream *s, const char *filename, bincopy_validate_t validate, int width, BUN rows_estimate, int *eof_seen)
36 : {
37 152 : const char *mal_operator = "sql.importColumn";
38 152 : str msg = MAL_SUCCEED;
39 152 : int tt = BATttype(bat);
40 152 : const size_t asz = (size_t) ATOMsize(tt);
41 152 : const size_t chunk_size = 1<<20;
42 :
43 152 : bool eof = false;
44 470 : while (!eof) {
45 319 : assert(chunk_size % asz == 0);
46 319 : size_t n;
47 319 : if (rows_estimate > 0) {
48 : // Set n to estimate+1 so it will read once, get n - 1 and know it's at EOF.
49 : // Otherwise, it will read n, get n, then enlarge the heap, read again,
50 : // and only then know it's at eof.
51 84 : n = rows_estimate + 1;
52 84 : rows_estimate = 0;
53 : } else {
54 235 : n = chunk_size / asz;
55 : }
56 :
57 : // First make some room
58 319 : BUN validCount = bat->batCount;
59 319 : BUN newCount = validCount + n;
60 319 : if (BATextend(bat, newCount) != GDK_SUCCEED)
61 0 : bailout("load_trivial: %s", GDK_EXCEPTION);
62 :
63 : // Read into the newly allocated space
64 319 : char *start = Tloc(bat, validCount);
65 319 : char *cur = start;
66 319 : char *end = Tloc(bat, newCount);
67 319 : char *validated = start;
68 945 : while (cur < end) {
69 627 : ssize_t nread = mnstr_read(s, cur, 1, end - cur);
70 627 : if (nread < 0)
71 0 : bailout("load_trivial: %s", mnstr_peek_error(s));
72 627 : if (nread == 0) {
73 151 : eof = true;
74 151 : size_t tail = (cur - start) % asz;
75 151 : if (tail != 0) {
76 0 : bailout("load_trivial: final item incomplete: %d bytes instead of %d", (int) tail, (int) asz);
77 : }
78 : end = cur;
79 : }
80 627 : cur += (size_t) nread;
81 627 : if (validate) {
82 479 : size_t to_validate = (cur - validated) / asz;
83 479 : msg = validate(validated, to_validate, width, filename);
84 479 : if (msg != MAL_SUCCEED)
85 : break;
86 478 : validated += to_validate * asz;
87 : }
88 : }
89 319 : if (msg != NULL)
90 1 : goto end;
91 318 : BUN actualCount = validCount + (end - start) / asz;
92 318 : BATsetcount(bat, actualCount);
93 : }
94 :
95 151 : BATsetcount(bat, bat->batCount);
96 151 : bat->tseqbase = oid_nil;
97 151 : bat->tnonil = bat->batCount == 0;
98 151 : bat->tnil = false;
99 151 : if (bat->batCount <= 1) {
100 0 : bat->tsorted = true;
101 0 : bat->trevsorted = true;
102 0 : bat->tkey = true;
103 : } else {
104 151 : bat->tsorted = false;
105 151 : bat->trevsorted = false;
106 151 : bat->tkey = false;
107 : }
108 :
109 152 : end:
110 152 : *eof_seen = (int)eof;
111 152 : return msg;
112 : }
113 :
114 : static str
115 36 : load_fixed_width(BAT *bat, stream *s, const char *filename, int width, bool byteswap, bincopy_decoder_t convert, bincopy_validate_t validate, size_t record_size, int *eof_reached)
116 : {
117 36 : const char *mal_operator = "sql.importColumn";
118 36 : str msg = MAL_SUCCEED;
119 36 : bstream *bs = NULL;
120 :
121 36 : if (record_size == 0) {
122 26 : int tt = BATttype(bat);
123 26 : record_size = (size_t) ATOMsize(tt);
124 : }
125 :
126 : // Read whole number of records
127 62 : size_t chunk_size = 1<<20;
128 26 : assert(record_size > 0);
129 36 : chunk_size -= chunk_size % record_size;
130 :
131 36 : bs = bstream_create(s, chunk_size);
132 36 : if (bs == NULL) {
133 0 : msg = createException(SQL, "sql", SQLSTATE(HY013) MAL_MALLOC_FAIL);
134 0 : goto end;
135 : }
136 :
137 308 : while (1) {
138 172 : ssize_t nread = bstream_next(bs);
139 172 : if (nread < 0)
140 0 : bailout("%s", mnstr_peek_error(s));
141 172 : if (nread == 0)
142 : break;
143 :
144 137 : size_t n = (bs->len - bs->pos) / record_size;
145 137 : size_t extent = n * record_size;
146 137 : BUN count = BATcount(bat);
147 137 : BUN newCount = count + n;
148 137 : if (BATextend(bat, newCount) != GDK_SUCCEED)
149 0 : bailout("%s", GDK_EXCEPTION);
150 :
151 137 : msg = convert(Tloc(bat, count), &bs->buf[bs->pos], n, byteswap);
152 137 : if (validate != NULL && msg == MAL_SUCCEED)
153 48 : msg = validate(Tloc(bat, count), n, width, filename);
154 137 : if (msg != MAL_SUCCEED)
155 1 : goto end;
156 136 : BATsetcount(bat, newCount);
157 136 : bs->pos += extent;
158 : }
159 :
160 35 : bat->tseqbase = oid_nil;
161 35 : bat->tnonil = bat->batCount == 0;
162 35 : bat->tnil = false;
163 35 : if (bat->batCount <= 1) {
164 0 : bat->tsorted = true;
165 0 : bat->trevsorted = true;
166 0 : bat->tkey = true;
167 : } else {
168 35 : bat->tsorted = false;
169 35 : bat->trevsorted = false;
170 35 : bat->tkey = false;
171 : }
172 :
173 36 : end:
174 36 : *eof_reached = 0;
175 36 : if (bs != NULL) {
176 36 : *eof_reached = (int)bs->eof;
177 36 : bs->s = NULL;
178 36 : bstream_destroy(bs);
179 : }
180 36 : return msg;
181 : }
182 :
183 :
184 : static str
185 217 : load_column(type_record_t *rec, const char *name, BAT *bat, stream *s, int width, bool byteswap, BUN rows_estimate, int *eof_reached)
186 : {
187 217 : const char *mal_operator = "sql.importColumn";
188 217 : BUN orig_count, new_count;
189 217 : str msg = MAL_SUCCEED;
190 217 : BUN rows_added;
191 :
192 217 : bincopy_loader_t loader = rec->loader;
193 217 : bincopy_decoder_t decoder = rec->decoder;
194 217 : bool trivial = rec->decoder_trivial;
195 :
196 : // sanity check
197 217 : assert( (loader != NULL) + (decoder != NULL) + trivial == 1); (void)trivial;
198 :
199 217 : if (rec->trivial_if_no_byteswap && !byteswap)
200 217 : decoder = NULL;
201 :
202 217 : orig_count = BATcount(bat);
203 :
204 217 : if (loader) {
205 29 : msg = loader(bat, s, eof_reached, width, byteswap);
206 188 : } else if (decoder) {
207 36 : msg = load_fixed_width(bat, s, name, width, byteswap, rec->decoder, rec->validate, rec->record_size, eof_reached);
208 : } else {
209 : // load the bytes directly into the bat, as-is
210 152 : msg = load_trivial(bat, s, name, rec->validate, width, rows_estimate, eof_reached);
211 : }
212 :
213 217 : new_count = BATcount(bat);
214 217 : rows_added = new_count - orig_count;
215 :
216 217 : if (msg == MAL_SUCCEED && rows_estimate != 0 && rows_estimate != rows_added)
217 3 : bailout(
218 : "inconsistent row count in %s: expected "BUNFMT", got "BUNFMT,
219 : name,
220 : rows_estimate, rows_added);
221 :
222 214 : end:
223 217 : return msg;
224 : }
225 :
226 : /* Import a single file into a new BAT.
227 : */
228 : static str
229 217 : import_column(backend *be, bat *ret, BUN *retcnt, str method, int width, bool byteswap, str path, int onclient, BUN nrows)
230 : {
231 : // In this function we create the BAT and open the file, and tidy
232 : // up when things go wrong. The actual work happens in load_column().
233 :
234 217 : const str mal_operator = "sql.importColumn";
235 :
236 : // These are managed by the end: block.
237 217 : str msg = MAL_SUCCEED;
238 217 : int gdk_type;
239 217 : BAT *bat = NULL;
240 217 : int eof_reached = -1; // 1 = read to the end; 0 = stopped reading early; -1 = unset, a bug.
241 217 : stream *s = NULL;
242 :
243 : // Set safe values
244 217 : *ret = 0;
245 217 : *retcnt = 0;
246 :
247 : // Figure out what kind of data we have
248 217 : type_record_t *rec = find_type_rec(method);
249 217 : if (rec == NULL)
250 0 : bailout("COPY BINARY FROM not implemented for '%s'", method);
251 :
252 : // Create the BAT
253 217 : gdk_type = ATOMindex(rec->gdk_type);
254 217 : if (gdk_type < 0)
255 0 : bailout("cannot load %s as %s: unknown atom type %s", path, method, rec->gdk_type);
256 217 : bat = COLnew(0, gdk_type, nrows, PERSISTENT);
257 217 : if (bat == NULL)
258 0 : bailout("%s", GDK_EXCEPTION);
259 :
260 : // Open the input stream
261 217 : if (onclient) {
262 109 : s = mapi_request_upload(path, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
263 : } else {
264 108 : s = open_rstream(path);
265 : }
266 217 : if (!s) {
267 0 : bailout("%s", mnstr_peek_error(NULL));
268 : }
269 :
270 : // Do the work
271 217 : msg = load_column(rec, path, bat, s, width, byteswap, nrows, &eof_reached);
272 217 : if (eof_reached != 0 && eof_reached != 1) {
273 0 : if (msg)
274 0 : bailout("internal error in sql.importColumn: eof_reached not set (%s). Earlier error: %s", method, msg);
275 : else
276 0 : bailout("internal error in sql.importColumn: eof_reached not set (%s)", method);
277 : }
278 :
279 : // Fall through into the end block which will clean things up
280 217 : end:
281 217 : if (s)
282 217 : close_stream(s);
283 :
284 : // Manage the return values and `bat`.
285 217 : if (msg == MAL_SUCCEED) {
286 207 : BBPkeepref(bat);
287 207 : *ret = bat->batCacheid;
288 207 : *retcnt = BATcount(bat);
289 : } else {
290 10 : if (bat != NULL) {
291 10 : BBPunfix(bat->batCacheid);
292 10 : bat = NULL;
293 : }
294 10 : *ret = 0;
295 10 : *retcnt = 0;
296 : }
297 :
298 217 : return msg;
299 : }
300 :
301 :
302 : str
303 217 : mvc_bin_import_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
304 : {
305 : // Entry point for sql.importColumn.
306 : // Does the argument/return handling, the work is done by importColumn.
307 217 : (void)mb;
308 :
309 217 : assert(pci->retc == 2);
310 217 : bat *ret = getArgReference_bat(stk, pci, 0);
311 217 : BUN *retcnt = getArgReference_oid(stk, pci, 1);
312 :
313 217 : assert(pci->argc == 8);
314 217 : str method = *getArgReference_str(stk, pci, 2);
315 217 : int width = *getArgReference_int(stk, pci, 3);
316 217 : bit byteswap = *getArgReference_bit(stk, pci, 4);
317 217 : str path = *getArgReference_str(stk, pci, 5);
318 217 : int onclient = *getArgReference_int(stk, pci, 6);
319 217 : BUN nrows = *getArgReference_oid(stk, pci, 7);
320 :
321 217 : backend *be = cntxt->sqlcontext;
322 :
323 217 : return import_column(be, ret, retcnt, method, width, byteswap, path, onclient, nrows);
324 : }
325 :
326 :
327 :
328 : static str
329 2867 : write_out(const char *start, const char *end, stream *s)
330 : {
331 2867 : const char *mal_operator = "sql.export_bin_column";
332 2867 : str msg = MAL_SUCCEED;
333 :
334 2867 : const char *p = start;
335 5734 : while (p < end) {
336 2867 : ssize_t nwritten = mnstr_write(s, p, 1, end - p);
337 2867 : if (nwritten < 0)
338 0 : bailout("%s", mnstr_peek_error(s));
339 2867 : if (nwritten == 0)
340 0 : bailout("Unexpected EOF on %s", mnstr_name(s));
341 2867 : p += nwritten;
342 : }
343 2867 : end:
344 2867 : return msg;
345 : }
346 :
347 : static str
348 2757 : dump_trivial(BAT *b, stream *s, BUN start, BUN length)
349 : {
350 2757 : assert(!ATOMvarsized(BATttype(b)));
351 2757 : BUN end = start + length;
352 2757 : assert(end <= BATcount(b));
353 2757 : return write_out(Tloc(b, start), Tloc(b, end), s);
354 : }
355 :
356 : static str
357 20 : dump_fixed_width(BAT *b, stream *s, BUN start, BUN length, bool byteswap, bincopy_encoder_t encoder, size_t record_size)
358 : {
359 20 : const char *mal_operator = "sql.export_bin_column";
360 20 : str msg = MAL_SUCCEED;
361 20 : char *buffer = NULL;
362 :
363 20 : BUN end = start + length;
364 20 : assert(end <= BATcount(b));
365 :
366 20 : if (record_size == 0) {
367 12 : int tt = BATttype(b);
368 12 : record_size = (size_t) ATOMsize(tt);
369 : }
370 20 : size_t buffer_size = 1024 * 1024;
371 20 : BUN batch_size = buffer_size / record_size;
372 20 : if (batch_size > length)
373 : batch_size = length;
374 20 : buffer_size = batch_size * record_size;
375 20 : buffer = GDKmalloc(buffer_size);
376 20 : if (buffer == NULL)
377 0 : bailout(MAL_MALLOC_FAIL);
378 :
379 : BUN n;
380 130 : for (BUN pos = start; pos < end; pos += n) {
381 110 : n = end - pos;
382 110 : if (n > batch_size)
383 : n = batch_size;
384 110 : msg = encoder(buffer, Tloc(b, pos), n, byteswap);
385 110 : if (msg != MAL_SUCCEED)
386 0 : goto end;
387 110 : msg = write_out(buffer, buffer + n * record_size, s);
388 110 : if (msg != MAL_SUCCEED)
389 0 : goto end;
390 : }
391 :
392 20 : end:
393 20 : GDKfree(buffer);
394 20 : return msg;
395 : }
396 :
397 : str
398 3382 : dump_binary_column(const struct type_record_t *rec, BAT *b, BUN start, BUN length, bool byteswap, stream *s)
399 : {
400 3382 : str msg = MAL_SUCCEED;
401 :
402 3382 : bincopy_dumper_t dumper = rec->dumper;
403 3382 : bincopy_encoder_t encoder = rec->encoder;
404 3382 : bool trivial = rec->encoder_trivial;
405 :
406 : // sanity check
407 3382 : assert( (dumper != NULL) + (encoder != NULL) + trivial == 1); (void)trivial;
408 :
409 3382 : if (rec->trivial_if_no_byteswap && !byteswap)
410 3382 : encoder = NULL;
411 :
412 3382 : if (dumper) {
413 605 : msg = rec->dumper(b, s, start, length, byteswap);
414 2777 : } else if (encoder) {
415 20 : msg = dump_fixed_width(b, s, start, length, byteswap, rec->encoder, rec->record_size);
416 : } else {
417 2757 : msg = dump_trivial(b, s, start, length);
418 : }
419 :
420 3382 : return msg;
421 : }
422 :
423 :
424 : static str
425 150 : export_column(backend *be, BAT *b, bool byteswap, str filename, bool onclient)
426 : {
427 150 : const char *mal_operator = "sql.export_bin_column";
428 150 : str msg = MAL_SUCCEED;
429 150 : stream *s = NULL;
430 :
431 : // Figure out what kind of data we have
432 150 : int tpe = BATttype(b);
433 150 : const char *gdk_name = ATOMname(tpe);
434 150 : type_record_t *rec = find_type_rec(gdk_name);
435 150 : if (rec == NULL)
436 0 : bailout("COPY INTO BINARY not implemented for '%s'", gdk_name);
437 :
438 150 : if (onclient) {
439 75 : (void)be;
440 75 : s = mapi_request_download(filename, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
441 : } else {
442 75 : s = open_wstream(filename);
443 : }
444 150 : if (!s) {
445 0 : bailout("%s", mnstr_peek_error(NULL));
446 : }
447 :
448 150 : msg = dump_binary_column(rec, b, 0, BATcount(b), byteswap, s);
449 :
450 150 : if (s && msg == MAL_SUCCEED) {
451 150 : if (mnstr_flush(s, MNSTR_FLUSH_DATA) != 0) {
452 0 : bailout("%s", mnstr_peek_error(s));
453 : }
454 : }
455 :
456 150 : end:
457 150 : close_stream(s);
458 150 : return msg;
459 : }
460 :
461 : str
462 150 : mvc_bin_export_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
463 : {
464 150 : const char *mal_operator = "sql.export_bin_column";
465 150 : str msg = MAL_SUCCEED;
466 150 : BAT *b = NULL;
467 150 : backend *be = cntxt->sqlcontext;
468 150 : assert(pci->retc == 1);
469 150 : assert(pci->argc == 5);
470 :
471 150 : lng *ret = getArgReference_lng(stk, pci, 0);
472 : // arg 1 handled below
473 150 : bool byteswap = *getArgReference_bit(stk, pci, 2);
474 150 : str filename = *getArgReference_str(stk, pci, 3);
475 150 : bool onclient = (bool) *getArgReference_int(stk, pci, 4);
476 :
477 : // Usually we are called with a BAT argument but if the user types
478 : // something like
479 : //
480 : // COPY (SELECT 42 AS num, 'banana' AS word) INTO BINARY ...
481 : //
482 : // it will be a single value instead.
483 : // To avoid having to handle separate cases everywhere we simply
484 : // stuff the value into a temporary BAT
485 150 : int arg_type = getArgType(mb, pci, 1);
486 300 : if (isaBatType(arg_type)) {
487 150 : bat id = *getArgReference_bat(stk, pci, 1);
488 150 : b = BATdescriptor(id);
489 : } else {
490 0 : void *value = getArgReference(stk, pci, 1);
491 0 : b = COLnew(0, arg_type, 1, TRANSIENT);
492 0 : if (!b)
493 0 : bailout("%s", GDK_EXCEPTION);
494 0 : if (BUNappend(b, value, true) != GDK_SUCCEED)
495 0 : bailout("%s", GDK_EXCEPTION);
496 : }
497 :
498 150 : msg = export_column(be, b, byteswap, filename, onclient);
499 150 : if (msg == MAL_SUCCEED)
500 150 : *ret = BATcount(b);
501 :
502 0 : end:
503 150 : BBPreclaim(b);
504 150 : return msg;
505 : }
|