Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Niels Nes, Martin Kersten
15 : *
16 : * Parallel bulk load for SQL
17 : * The COPY INTO command for SQL is heavily CPU bound, which means
18 : * that ideally we would like to exploit the multi-cores to do that
19 : * work in parallel.
20 : * Complicating factors are the initial record offset, the
21 : * possible variable length of the input, and the original sort order
22 : * that should preferably be maintained.
23 : *
24 : * The code below consists of a file reader, which breaks up the
25 : * file into chunks of distinct rows. Then multiple parallel threads
26 : * grab them, and break them on the field boundaries.
27 : * After all fields are identified this way, the columns are converted
28 : * and stored in the BATs.
29 : *
30 : * The threads get a reference to a private copy of the READERtask.
31 : * It includes a list of columns they should handle. This is a basis
32 : * to distribute cheap and expensive columns over threads.
33 : *
34 : * The file reader overlaps IO with updates of the BAT.
35 : * Also the buffer size of the block stream might be a little small for
36 : * this task (1MB). It has been increased to 8MB, which indeed improved.
37 : *
38 : * The work divider allocates subtasks to threads based on the
39 : * observed time spending so far.
40 : */
41 :
42 : #include "monetdb_config.h"
43 : #include "tablet.h"
44 : #include "mapi_prompt.h"
45 : #include "mal_internal.h"
46 :
47 : #include <string.h>
48 : #include <ctype.h>
49 :
/* NOTE(review): presumably the upper bound on parallel loader threads;
 * its use is not visible in this part of the file. */
#define MAXWORKERS 64
/* Number of input buffers kept per reader task (dimension of the
 * per-buffer arrays in READERtask). */
#define MAXBUFFERS 2
/* Row buffer size: at least 32MB, or X when X is larger.  The argument
 * and the expansion are fully parenthesized so that arbitrary
 * expressions can be passed; note that X is evaluated twice. */
#define MAXROWSIZE(X) ((X) > 32*1024*1024 ? (X) : 32*1024*1024)
54 :
55 : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
56 :
57 : static BAT *
58 10044 : void_bat_create(int adt, BUN nr)
59 : {
60 10044 : BAT *b = COLnew(0, adt, nr, TRANSIENT);
61 :
62 : /* check for correct structures */
63 10044 : if (b == NULL)
64 : return NULL;
65 10044 : if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
66 : return NULL;
67 : }
68 :
69 : /* disable all properties here */
70 10044 : b->tsorted = false;
71 10044 : b->trevsorted = false;
72 10044 : b->tnosorted = 0;
73 10044 : b->tnorevsorted = 0;
74 10044 : b->tseqbase = oid_nil;
75 10044 : b->tkey = false;
76 10044 : b->tnokey[0] = 0;
77 10044 : b->tnokey[1] = 0;
78 10044 : return b;
79 : }
80 :
81 : void
82 120010 : TABLETdestroy_format(Tablet *as)
83 : {
84 120010 : BUN p;
85 120010 : Column *fmt = as->format;
86 :
87 562927 : for (p = 0; p < as->nr_attrs; p++) {
88 442858 : BBPreclaim(fmt[p].c);
89 442846 : if (fmt[p].data)
90 10049 : GDKfree(fmt[p].data);
91 : }
92 120069 : GDKfree(fmt);
93 120079 : }
94 :
95 : static oid
96 118968 : check_BATs(Tablet *as)
97 : {
98 118968 : Column *fmt = as->format;
99 118968 : BUN i = 0;
100 118968 : BUN cnt;
101 118968 : oid base;
102 :
103 118968 : if (fmt[i].c == NULL)
104 118945 : i++;
105 118968 : cnt = BATcount(fmt[i].c);
106 118968 : base = fmt[i].c->hseqbase;
107 :
108 118968 : if (as->nr != cnt) {
109 5782 : for (i = 0; i < as->nr_attrs; i++)
110 5104 : if (fmt[i].c)
111 4426 : fmt[i].p = as->offset;
112 678 : return oid_nil;
113 : }
114 :
115 545812 : for (i = 0; i < as->nr_attrs; i++) {
116 427522 : BAT *b = fmt[i].c;
117 :
118 427522 : if (b == NULL)
119 118260 : continue;
120 :
121 309262 : if (BATcount(b) != cnt || b->hseqbase != base)
122 0 : return oid_nil;
123 :
124 309262 : fmt[i].p = as->offset;
125 : }
126 : return base;
127 : }
128 :
129 : str
130 1070 : TABLETcreate_bats(Tablet *as, BUN est)
131 : {
132 1070 : Column *fmt = as->format;
133 1070 : BUN i, nr = 0;
134 :
135 11120 : for (i = 0; i < as->nr_attrs; i++) {
136 10050 : if (fmt[i].skip)
137 6 : continue;
138 10044 : fmt[i].c = void_bat_create(fmt[i].adt, est);
139 10044 : if (!fmt[i].c) {
140 0 : while (i > 0) {
141 0 : if (!fmt[--i].skip) {
142 0 : BBPreclaim(fmt[i].c);
143 0 : fmt[i].c = NULL;
144 : }
145 : }
146 0 : throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
147 : est);
148 : }
149 10044 : fmt[i].ci = bat_iterator_nolock(fmt[i].c);
150 10044 : nr++;
151 : }
152 1070 : if (!nr)
153 0 : throw(SQL, "copy",
154 : "At least one column should be read from the input\n");
155 : return MAL_SUCCEED;
156 : }
157 :
158 : str
159 1046 : TABLETcollect(BAT **bats, Tablet *as)
160 : {
161 1046 : Column *fmt = as->format;
162 1046 : BUN i, j;
163 1046 : BUN cnt = 0;
164 :
165 1046 : if (bats == NULL)
166 0 : throw(SQL, "copy", "Missing container");
167 2496 : for (i = 0; i < as->nr_attrs && !cnt; i++)
168 1450 : if (!fmt[i].skip)
169 1447 : cnt = BATcount(fmt[i].c);
170 11005 : for (i = 0, j = 0; i < as->nr_attrs; i++) {
171 9959 : if (fmt[i].skip)
172 6 : continue;
173 9953 : bats[j] = fmt[i].c;
174 9953 : BBPfix(bats[j]->batCacheid);
175 9953 : if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
176 0 : throw(SQL, "copy",
177 : "Failed to set access at tablet part " BUNFMT "\n", cnt);
178 9953 : fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
179 9953 : fmt[i].c->tkey = false;
180 9953 : BATsettrivprop(fmt[i].c);
181 :
182 9953 : if (cnt != BATcount(fmt[i].c))
183 0 : throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
184 : BATcount(fmt[i].c), cnt);
185 9953 : j++;
186 : }
187 : return MAL_SUCCEED;
188 : }
189 :
/*
 * Scan a quoted string whose opening quote has already been skipped.
 * The string is compacted in place: a doubled quote is reduced to a
 * single one, and (when escape is set) a backslash together with the
 * character following it is kept verbatim so that an escaped quote
 * does not end the string.
 *
 * Returns a pointer to the closing quote in the original text, with
 * the compacted string NUL-terminated at s; returns NULL when the end
 * of the input is reached without finding a closing quote.
 */
static char *
tablet_skip_string(char *s, char quote, bool escape)
{
	char *src = s;				/* read position */
	char *dst = s;				/* write position, never ahead of src */

	while (*src != '\0') {
		if (escape && *src == '\\' && src[1] != '\0') {
			/* copy the backslash here; the escaped character is
			 * copied by the common copy below */
			*dst++ = *src++;
		} else if (*src == quote) {
			if (src[1] != quote)
				break;			/* this is the closing quote */
			src++;				/* doubled quote: drop the first one */
		}
		*dst++ = *src++;
	}
	assert(*src == quote || *src == '\0');
	if (*src == '\0')
		return NULL;
	*dst = '\0';
	return src;
}
212 :
213 : static int
214 0 : TABLET_error(stream *s)
215 : {
216 0 : const char *err = mnstr_peek_error(s);
217 0 : if (err)
218 0 : TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
219 0 : return -1;
220 : }
221 :
222 : /* The output line is first built before being sent. It solves a problem
223 : with UDP, where you may loose most of the information using short writes
224 : */
225 : static inline int
226 0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
227 : Column *fmt, stream *fd, BUN nr_attrs, oid id)
228 : {
229 0 : BUN i;
230 0 : ssize_t fill = 0;
231 :
232 0 : for (i = 0; i < nr_attrs; i++) {
233 0 : if (fmt[i].c == NULL)
234 0 : continue;
235 0 : if (id < fmt[i].c->hseqbase
236 0 : || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
237 : break;
238 0 : fmt[i].p = id - fmt[i].c->hseqbase;
239 : }
240 0 : if (i == nr_attrs) {
241 0 : for (i = 0; i < nr_attrs; i++) {
242 0 : Column *f = fmt + i;
243 0 : const char *p;
244 0 : ssize_t l;
245 :
246 0 : if (f->c) {
247 0 : p = BUNtail(f->ci, f->p);
248 :
249 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
250 0 : p = f->nullstr;
251 0 : l = (ssize_t) strlen(f->nullstr);
252 : } else {
253 0 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
254 0 : if (l < 0)
255 : return -1;
256 0 : p = *localbuf;
257 : }
258 0 : if (fill + l + f->seplen >= (ssize_t) * len) {
259 : /* extend the buffer */
260 0 : char *nbuf;
261 0 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
262 0 : if (nbuf == NULL)
263 : return -1; /* *buf freed by caller */
264 0 : *buf = nbuf;
265 0 : *len = fill + l + f->seplen + BUFSIZ;
266 : }
267 0 : strncpy(*buf + fill, p, l);
268 0 : fill += l;
269 : }
270 0 : strncpy(*buf + fill, f->sep, f->seplen);
271 0 : fill += f->seplen;
272 : }
273 : }
274 0 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
275 0 : return TABLET_error(fd);
276 : return 0;
277 : }
278 :
279 : static inline int
280 6185348 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
281 : Column *fmt, stream *fd, BUN nr_attrs)
282 : {
283 6185348 : BUN i;
284 6185348 : ssize_t fill = 0;
285 :
286 27271679 : for (i = 0; i < nr_attrs; i++) {
287 21086359 : Column *f = fmt + i;
288 21086359 : const char *p;
289 21086359 : ssize_t l;
290 :
291 21086359 : if (f->c) {
292 14901007 : p = BUNtail(f->ci, f->p);
293 :
294 14901007 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
295 495873 : p = f->nullstr;
296 495873 : l = (ssize_t) strlen(p);
297 : } else {
298 14405082 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
299 14405106 : if (l < 0)
300 : return -1;
301 14405106 : p = *localbuf;
302 : }
303 14900979 : if (fill + l + f->seplen >= (ssize_t) * len) {
304 : /* extend the buffer */
305 78 : char *nbuf;
306 78 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
307 78 : if (nbuf == NULL)
308 : return -1; /* *buf freed by caller */
309 78 : *buf = nbuf;
310 78 : *len = fill + l + f->seplen + BUFSIZ;
311 : }
312 14900979 : strncpy(*buf + fill, p, l);
313 14900979 : fill += l;
314 14900979 : f->p++;
315 : }
316 21086331 : strncpy(*buf + fill, f->sep, f->seplen);
317 21086331 : fill += f->seplen;
318 : }
319 6185320 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
320 0 : return TABLET_error(fd);
321 : return 0;
322 : }
323 :
324 : static inline int
325 0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
326 : BUN nr_attrs, oid id)
327 : {
328 0 : BUN i;
329 :
330 0 : for (i = 0; i < nr_attrs; i++) {
331 0 : Column *f = fmt + i;
332 :
333 0 : if (f->c) {
334 0 : const void *p = BUNtail(f->ci, id - f->c->hseqbase);
335 :
336 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
337 0 : size_t l = strlen(f->nullstr);
338 0 : if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
339 0 : return TABLET_error(fd);
340 : } else {
341 0 : ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
342 :
343 0 : if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
344 0 : return TABLET_error(fd);
345 : }
346 : }
347 0 : if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
348 0 : return TABLET_error(fd);
349 : }
350 : return 0;
351 : }
352 :
353 : /*
354 : * Fast Load
355 : * To speedup the CPU intensive loading of files we have to break
356 : * the file into pieces and perform parallel analysis. Experimentation
357 : * against lineitem SF1 showed that half of the time goes into very
358 : * basic atom analysis (41 out of 102 B instructions).
359 : * Furthermore, the actual insertion into the BATs takes only
360 : * about 10% of the total. With multi-core processors around
361 : * it seems we can gain here significantly.
362 : *
363 : * The approach taken is to fork a parallel scan over the text file.
364 : * We assume that the blocked stream is already
365 : * positioned correctly at the reading position. The start and limit
366 : * indicates the byte range to search for tuples.
367 : * If start> 0 then we first skip to the next record separator.
368 : * If necessary we read more than 'limit' bytes to ensure parsing a complete
369 : * record and stop at the record boundary.
370 : * Beware, we should allocate Tablet descriptors for each file segment,
371 : * otherwise we end up with a gross concurrency control problem.
372 : * The resulting BATs should be glued at the final phase.
373 : *
374 : * Raw Load
375 : * Front-ends can bypass most of the overhead in loading the BATs
376 : * by preparing the corresponding files directly and replace those
377 : * created by e.g. the SQL frontend.
378 : * This strategy is only advisable for cases where we have very
379 : * large files >200GB and/or are created by a well debugged code.
380 : *
381 : * To experiment with this approach, the code base responds
382 : * on negative number of cores by dumping the data directly in BAT
383 : * storage format into a collections of files on disk.
384 : * It reports on the actions to be taken to replace BATs.
385 : * This technique is initially only supported for fixed-sized columns.
386 : * The rawmode() indicator acts as the internal switch.
387 : */
388 :
389 : /*
390 : * To speed up loading ascii files we have to determine the number of blocks.
391 : * This depends on the number of cores available.
392 : * For the time being we hardwire this decision based on our own
393 : * platforms.
394 : * Furthermore, we only consider parallel load for file-based requests.
395 : *
396 : * To simplify our world, we assume a single producer process.
397 : */
398 :
399 : static int
400 0 : output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
401 : {
402 0 : size_t len = BUFSIZ, locallen = BUFSIZ;
403 0 : int res = 0;
404 0 : char *buf = GDKmalloc(len);
405 0 : char *localbuf = GDKmalloc(len);
406 0 : BUN p, q;
407 0 : oid id;
408 0 : BUN offset = as->offset;
409 :
410 0 : if (buf == NULL || localbuf == NULL) {
411 0 : GDKfree(buf);
412 0 : GDKfree(localbuf);
413 0 : return -1;
414 : }
415 0 : for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
416 0 : p++, id++) {
417 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
418 : res = -5;
419 : break;
420 : }
421 0 : if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
422 : break;
423 : }
424 : }
425 0 : GDKfree(localbuf);
426 0 : GDKfree(buf);
427 0 : return res;
428 : }
429 :
430 : static int
431 118949 : output_file_dense(Tablet *as, stream *fd, bstream *in)
432 : {
433 118949 : size_t len = BUFSIZ, locallen = BUFSIZ;
434 118949 : int res = 0;
435 118949 : char *buf = GDKmalloc(len);
436 118988 : char *localbuf = GDKmalloc(len);
437 118983 : BUN i = 0;
438 :
439 118983 : if (buf == NULL || localbuf == NULL) {
440 0 : GDKfree(buf);
441 0 : GDKfree(localbuf);
442 0 : return -1;
443 : }
444 6304312 : for (i = 0; i < as->nr; i++) {
445 6185326 : if ((i & 8191) == 8191 && bstream_getoob(in)) {
446 : res = -5; /* "Query aborted" */
447 : break;
448 : }
449 6185326 : if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
450 : break;
451 : }
452 : }
453 118986 : GDKfree(localbuf);
454 119009 : GDKfree(buf);
455 119009 : return res;
456 : }
457 :
458 : static int
459 0 : output_file_ordered(Tablet *as, BAT *order, stream *fd, bstream *in)
460 : {
461 0 : size_t len = BUFSIZ;
462 0 : int res = 0;
463 0 : char *buf = GDKmalloc(len);
464 0 : BUN p, q;
465 0 : BUN i = 0;
466 0 : BUN offset = as->offset;
467 :
468 0 : if (buf == NULL)
469 : return -1;
470 0 : for (q = offset + as->nr, p = offset; p < q; p++, i++) {
471 0 : oid h = order->hseqbase + p;
472 :
473 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
474 : res = -5;
475 : break;
476 : }
477 0 : if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
478 0 : GDKfree(buf);
479 0 : return res;
480 : }
481 : }
482 0 : GDKfree(buf);
483 0 : return res;
484 : }
485 :
486 : int
487 118943 : TABLEToutput_file(Tablet *as, BAT *order, stream *s, bstream *in)
488 : {
489 118943 : oid base = oid_nil;
490 118943 : int ret = 0;
491 :
492 : /* only set nr if it is zero or lower (bogus) to the maximum value
493 : * possible (BATcount), if already set within BATcount range,
494 : * preserve value such that for instance SQL's reply_size still
495 : * works
496 : */
497 118943 : if (order) {
498 0 : BUN maxnr = BATcount(order);
499 0 : if (as->nr == BUN_NONE || as->nr > maxnr)
500 0 : as->nr = maxnr;
501 : }
502 118943 : assert(as->nr != BUN_NONE);
503 :
504 118943 : base = check_BATs(as);
505 118949 : if (!order || !is_oid_nil(base)) {
506 118949 : if (!order || order->hseqbase == base)
507 118949 : ret = output_file_dense(as, s, in);
508 : else
509 0 : ret = output_file_ordered(as, order, s, in);
510 : } else {
511 0 : ret = output_file_default(as, order, s, in);
512 : }
513 119009 : return ret;
514 : }
515 :
516 : /*
517 : * Niels Nes, Martin Kersten
518 : *
519 : * Parallel bulk load for SQL
520 : * The COPY INTO command for SQL is heavily CPU bound, which means
521 : * that ideally we would like to exploit the multi-cores to do that
522 : * work in parallel.
523 : * Complicating factors are the initial record offset, the
524 : * possible variable length of the input, and the original sort order
525 : * that should preferably be maintained.
526 : *
527 : * The code below consists of a file reader, which breaks up the
528 : * file into chunks of distinct rows. Then multiple parallel threads
529 : * grab them, and break them on the field boundaries.
530 : * After all fields are identified this way, the columns are converted
531 : * and stored in the BATs.
532 : *
533 : * The threads get a reference to a private copy of the READERtask.
534 : * It includes a list of columns they should handle. This is a basis
535 : * to distribute cheap and expensive columns over threads.
536 : *
537 : * The file reader overlaps IO with updates of the BAT.
538 : * Also the buffer size of the block stream might be a little small for
539 : * this task (1MB). It has been increased to 8MB, which indeed improved.
540 : *
541 : * The work divider allocates subtasks to threads based on the
542 : * observed time spending so far.
543 : */
544 :
/* Values for READERtask.state (see the comment on that field; the code
 * that assigns them is not part of this section -- TODO confirm). */
545 : #define BREAKROW 1
546 : #define UPDATEBAT 2
547 : #define ENDOFCOPY 3
548 :
/* Shared state of one bulk-load task: the input stream and its
 * buffers, the separators, the rows and fields identified so far and
 * the error bookkeeping. */
549 : typedef struct {
550 : Client cntxt;
551 : int id; /* for self reference */
552 : int state; /* row break=1 , 2 = update bat (presumably BREAKROW, UPDATEBAT, ENDOFCOPY) */
553 : int workers; /* how many concurrent ones */
554 : int error; /* error during row break */
555 : int next;
556 : int limit; /* rowerror[] is only updated for row indexes below this value */
557 : BUN cnt, maxrow; /* first row in file chunk. */
558 : lng skip; /* number of lines to be skipped */
559 : lng *time, wtime; /* time per col + time per thread */
560 : int rounds; /* how often did we divide the work */
561 : bool ateof; /* io control */
562 : bool from_stdin;
563 : bool escape; /* whether to handle \ escapes */
564 : bool besteffort; /* cleared when an error cannot be recorded or an insert fails */
565 : char quote; /* quote character for fields; 0 means no quote handling */
566 : bstream *b; /* input stream */
567 : stream *out; /* if set, PROMPT2 is written here to ask for more input */
568 : MT_Id tid;
569 : MT_Sema producer; /* reader waits for call */
570 : MT_Sema consumer; /* NOTE(review): was a copy of the producer comment; presumably the counterpart semaphore -- confirm */
571 : MT_Sema sema; /* threads wait for work , negative next implies exit */
572 : MT_Sema reply; /* let reader continue */
573 : Tablet *as;
574 : char *errbuf;
575 : const char *csep, *rsep; /* column and record separator */
576 : size_t seplen, rseplen; /* NOTE(review): presumably the lengths of csep and rsep -- confirm */
577 :
578 : char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for row splitter and tokenizer */
579 : size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
580 : char **rows[MAXBUFFERS];
581 : lng *startlineno[MAXBUFFERS];
582 : int top[MAXBUFFERS]; /* number of rows in this buffer */
583 : int cur; /* current buffer used by splitter and update threads */
584 :
585 : int *cols; /* columns to handle */
586 : char ***fields; /* fields[col][row]: start of the field text, NULL for a nil value */
587 : bte *rowerror; /* per row: number of errors reported for it */
588 : int errorcnt; /* total number of errors reported through tablet_error */
589 : bool aborted; /* set when an out-of-band signal is seen while reading more input */
590 : bool set_qry_ctx;
591 : } READERtask;
592 :
593 : /* returns TRUE if there is/might be more */
594 : static bool
595 102765 : tablet_read_more(READERtask *task)
596 : {
597 102765 : bstream *in = task->b;
598 102765 : stream *out = task->out;
599 102765 : size_t n = task->b->size;
600 102765 : if (out) {
601 101174 : do {
602 : /* query is not finished ask for more */
603 : /* we need more query text */
604 101174 : if (bstream_next(in) < 0)
605 : return false;
606 101174 : if (in->eof) {
607 101172 : if (bstream_getoob(in)) {
608 0 : task->aborted = true;
609 0 : return false;
610 : }
611 101172 : if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
612 101172 : mnstr_flush(out, MNSTR_FLUSH_DATA);
613 101172 : in->eof = false;
614 : /* we need more query text */
615 101172 : if (bstream_next(in) <= 0)
616 : return false;
617 : }
618 101172 : } while (in->len <= in->pos);
619 1591 : } else if (bstream_read(in, n) <= 0) {
620 : return false;
621 : }
622 : return true;
623 : }
624 :
625 : /* note, the column value that is passed here is the 0 based value; the
626 : * lineno value on the other hand is 1 based */
627 : static void
628 33 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
629 : const char *fcn)
630 : {
631 33 : assert(is_int_nil(col) || col >= 0);
632 33 : assert(is_lng_nil(lineno) || lineno >= 1);
633 33 : MT_lock_set(&errorlock);
634 33 : if (task->cntxt->error_row != NULL
635 33 : && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
636 33 : || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
637 : false) != GDK_SUCCEED
638 33 : || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
639 33 : || BUNappend(task->cntxt->error_input, fcn,
640 : false) != GDK_SUCCEED)) {
641 0 : task->besteffort = false;
642 : }
643 33 : if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
644 33 : task->rowerror[idx]++;
645 33 : if (task->as->error == NULL) {
646 58 : const char *colnam = is_int_nil(col) || col < 0
647 29 : || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
648 29 : if (msg == NULL) {
649 0 : task->besteffort = false;
650 29 : } else if (!is_lng_nil(lineno)) {
651 29 : if (!is_int_nil(col)) {
652 27 : if (colnam)
653 27 : task->as->error = createException(MAL, "sql.copy_from",
654 : "line " LLFMT ": column %d %s: %s",
655 : lineno, col + 1, colnam, msg);
656 : else
657 0 : task->as->error = createException(MAL, "sql.copy_from",
658 : "line " LLFMT ": column %d: %s",
659 : lineno, col + 1, msg);
660 : } else {
661 2 : task->as->error = createException(MAL, "sql.copy_from",
662 : "line " LLFMT ": %s", lineno, msg);
663 : }
664 : } else {
665 0 : if (!is_int_nil(col)) {
666 0 : if (colnam)
667 0 : task->as->error = createException(MAL, "sql.copy_from",
668 : "column %d %s: %s", col + 1, colnam,
669 : msg);
670 : else
671 0 : task->as->error = createException(MAL, "sql.copy_from",
672 : "column %d: %s", col + 1, msg);
673 : } else {
674 0 : task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
675 : }
676 : }
677 : }
678 33 : task->errorcnt++;
679 33 : MT_lock_unset(&errorlock);
680 33 : }
681 :
682 : /*
683 : * The row is broken into pieces directly on their field separators. It assumes that we have
684 : * the record in the cache already, so we can do most work quickly.
685 : * Furthermore, it assumes a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
686 : */
687 :
/*
 * Calculate and return the space (excluding the terminating NUL byte)
 * that the function mycpstr below needs to do its work on s.
 *
 * A well-formed UTF-8 sequence (a start byte followed by the right
 * number of continuation bytes) is counted with its own length; every
 * other non-ASCII byte is counted as 4, the length of the "<XX>"
 * escape that mycpstr writes for it.
 *
 * The scan advances by exactly the number of bytes consumed, so the
 * byte following a multi-byte sequence is never skipped and the scan
 * cannot step over the terminating NUL byte.
 */
static size_t
mystrlen(const char *s)
{
	size_t len = 0;

	while (*s) {
		size_t n;		/* length of the sequence at s, 0 if not valid */

		if ((*s & 0x80) == 0)
			n = 1;		/* ASCII */
		else if ((*s & 0xE0) == 0xC0)
			n = 2;		/* two-byte sequence */
		else if ((*s & 0xF0) == 0xE0)
			n = 3;		/* three-byte sequence */
		else if ((*s & 0xF8) == 0xF0)
			n = 4;		/* four-byte sequence */
		else
			n = 0;		/* continuation byte or invalid start byte */

		/* stop at the first byte that is not a continuation byte; this
		 * also prevents looking beyond the terminating NUL byte */
		for (size_t k = 1; k < n; k++) {
			if ((s[k] & 0xC0) != 0x80) {
				n = 0;
				break;
			}
		}

		if (n == 0) {
			/* only this byte is escaped, the next ones are
			 * examined on their own */
			len += 4;
			s++;
		} else {
			len += n;
			s += n;
		}
	}
	return len;
}
730 :
/*
 * Copy the string s into the buffer t and return a pointer to the NUL
 * byte that terminates the copy.  Well-formed UTF-8 sequences are
 * copied as they are; any byte that is not part of one is replaced by
 * an escape of the form <XX>, where XX is the hexadecimal value of
 * the byte.  The buffer must be large enough for the result; the
 * function mystrlen above calculates the space that is needed.
 */
static char *
mycpstr(char *t, const char *s)
{
	while (*s) {
		int n;			/* length of the sequence at s, 0 if not valid */

		if ((*s & 0x80) == 0)
			n = 1;		/* ASCII */
		else if ((*s & 0xE0) == 0xC0)
			n = 2;		/* two-byte sequence */
		else if ((*s & 0xF0) == 0xE0)
			n = 3;		/* three-byte sequence */
		else if ((*s & 0xF8) == 0xF0)
			n = 4;		/* four-byte sequence */
		else
			n = 0;		/* continuation byte or invalid start byte */

		/* all bytes after the first must be continuation bytes */
		for (int k = 1; k < n; k++) {
			if ((s[k] & 0xC0) != 0x80) {
				n = 0;
				break;
			}
		}

		if (n == 0) {
			t += sprintf(t, "<%02X>", (uint8_t) * s++);
		} else {
			while (n-- > 0)
				*t++ = *s++;
		}
	}
	*t = 0;
	return t;
}
782 :
783 : static str
784 31 : SQLload_error(READERtask *task, lng idx, BUN attrs)
785 : {
786 31 : str line;
787 31 : char *s;
788 31 : size_t sz = 0;
789 31 : BUN i;
790 :
791 128 : for (i = 0; i < attrs; i++) {
792 97 : if (task->fields[i][idx])
793 87 : sz += mystrlen(task->fields[i][idx]);
794 97 : sz += task->seplen;
795 : }
796 :
797 31 : s = line = GDKmalloc(sz + task->rseplen + 1);
798 31 : if (line == NULL) {
799 0 : tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
800 : "SQLload_error");
801 0 : return NULL;
802 : }
803 128 : for (i = 0; i < attrs; i++) {
804 97 : if (task->fields[i][idx])
805 87 : s = mycpstr(s, task->fields[i][idx]);
806 97 : if (i < attrs - 1)
807 66 : s = mycpstr(s, task->csep);
808 : }
809 31 : strcpy(s, task->rsep);
810 31 : return line;
811 : }
812 :
813 : /*
814 : * The parsing of the individual values is straightforward. If the value represents
815 : * the null-replacement string then we grab the underlying nil.
816 : * If the string starts with the quote identified from SQL, we locate the tail
817 : * and interpret the body.
818 : *
819 : * If inserting fails, we return -1; if the value cannot be parsed, we
820 : * return -1 if besteffort is not set, otherwise we return 0, but in
821 : * either case an entry is added to the error table.
822 : */
823 : static inline int
824 294410907 : SQLinsert_val(READERtask *task, int col, int idx)
825 : {
826 294410907 : Column *fmt = task->as->format + col;
827 294410907 : const void *adt;
828 294410907 : char buf[BUFSIZ];
829 294410907 : char *s = task->fields[col][idx];
830 294410907 : char *err = NULL;
831 294410907 : int ret = 0;
832 :
833 : /* include testing on the terminating null byte !! */
834 294410907 : if (s == NULL) {
835 6294091 : adt = fmt->nildata;
836 6294091 : fmt->c->tnonil = false;
837 : } else {
838 288116816 : if (task->escape) {
839 287916713 : size_t slen = strlen(s) + 1;
840 287916713 : char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
841 18 : if (data == NULL
842 312931752 : || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
843 287916713 : strlen(s), '\0') < 0)
844 : adt = NULL;
845 : else
846 312931750 : adt = fmt->frstr(fmt, fmt->adt, data);
847 306952033 : if (data != buf)
848 18 : GDKfree(data);
849 : } else
850 200103 : adt = fmt->frstr(fmt, fmt->adt, s);
851 : }
852 :
853 313446226 : lng row = BATcount(fmt->c) + 1;
854 313446226 : if (adt == NULL) {
855 27 : if (task->rowerror) {
856 27 : err = SQLload_error(task, idx, task->as->nr_attrs);
857 27 : if (s) {
858 27 : size_t slen = mystrlen(s);
859 27 : char *scpy = GDKmalloc(slen + 1);
860 27 : if (scpy == NULL) {
861 0 : tablet_error(task, idx, row, col,
862 : SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
863 0 : task->besteffort = false; /* no longer best effort */
864 0 : GDKfree(err);
865 0 : return -1;
866 : }
867 27 : mycpstr(scpy, s);
868 27 : s = scpy;
869 : }
870 27 : snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
871 : s ? " in '" : "", s ? s : "", s ? "'" : "");
872 27 : GDKfree(s);
873 27 : tablet_error(task, idx, row, col, buf, err);
874 27 : GDKfree(err);
875 27 : if (!task->besteffort)
876 : return -1;
877 : }
878 6 : ret = -!task->besteffort; /* yep, two unary operators ;-) */
879 : /* replace it with a nil */
880 6 : adt = fmt->nildata;
881 6 : fmt->c->tnonil = false;
882 : }
883 313446205 : if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
884 : return ret;
885 :
886 : /* failure */
887 0 : if (task->rowerror) {
888 0 : char *msg = GDKerrbuf;
889 0 : err = SQLload_error(task, idx, task->as->nr_attrs);
890 0 : tablet_error(task, idx, row, col, msg
891 0 : && *msg ? msg : "insert failed", err);
892 0 : GDKfree(err);
893 : }
894 0 : task->besteffort = false; /* no longer best effort */
895 0 : return -1;
896 : }
897 :
898 : static int
899 322802 : SQLworker_column(READERtask *task, int col)
900 : {
901 322802 : int i;
902 322802 : Column *fmt = task->as->format;
903 :
904 322802 : if (fmt[col].c == NULL)
905 : return 0;
906 :
907 : /* watch out for concurrent threads */
908 322796 : MT_lock_set(&mal_copyLock);
909 327228 : if (!fmt[col].skip
910 327228 : && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
911 219 : if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
912 0 : tablet_error(task, lng_nil, lng_nil, col,
913 : "Failed to extend the BAT\n", "SQLworker_column");
914 0 : MT_lock_unset(&mal_copyLock);
915 0 : return -1;
916 : }
917 : }
918 327228 : MT_lock_unset(&mal_copyLock);
919 :
920 293120967 : for (i = 0; i < task->top[task->cur]; i++) {
921 292469511 : if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
922 21 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
923 21 : return -1;
924 : }
925 : }
926 324228 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
927 :
928 324228 : return 0;
929 : }
930 :
931 : /*
932 : * The rows are broken on the column separator. Any error is shown and reflected with
933 : * setting the reference of the offending row fields to NULL.
934 : * This allows the loading to continue, skipping the minimal number of rows.
935 : * The details about the locations can be inspected from the error table.
936 : * We also trim the quotes around strings.
 *
 * task: reader task; the row text is taken from task->rows[task->cur][idx]
 *       and the field start pointers are stored in task->fields[col][idx].
 * idx:  index of the row within the current buffer (must be below
 *       task->top[task->cur]).
 *
 * The row buffer is modified in place: separators (and a closing quote)
 * are overwritten with NUL bytes so every field becomes a C string.
 * A field equal to the user defined NULL string is stored as a NULL pointer.
 *
 * Returns 0 when the row was split without problems, -1 when at least one
 * error was reported through tablet_error().
937 : */
938 : static int
939 135984401 : SQLload_parse_row(READERtask *task, int idx)
940 : {
941 135984401 : BUN i;
942 135984401 : char errmsg[BUFSIZ];
943 135984401 : char ch = *task->csep;
944 135984401 : char *row = task->rows[task->cur][idx];
945 135984401 : lng startlineno = task->startlineno[task->cur][idx];
946 135984401 : Tablet *as = task->as;
947 135984401 : Column *fmt = as->format;
948 135984401 : bool error = false;
949 135984401 : str errline = NULL;
950 :
951 135984401 : assert(idx < task->top[task->cur]);
952 135984401 : assert(row);
953 135984401 : errmsg[0] = 0;
954 :
/* general path: a quote character is configured or the column
 * separator is longer than one byte */
955 135984401 : if (task->quote || task->seplen != 1) {
956 10834242 : for (i = 0; i < as->nr_attrs; i++) {
957 9089811 : bool quote = false;
958 9089811 : task->fields[i][idx] = row;
959 : /* recognize fields starting with a quote, keep them */
960 9089811 : if (*row && *row == task->quote) {
961 4666104 : quote = true;
962 4666104 : task->fields[i][idx] = row + 1;
963 4666104 : row = tablet_skip_string(row + 1, task->quote, task->escape);
/* NOTE(review): tablet_skip_string presumably returns the position
 * of the closing quote (it is overwritten with NUL below) or NULL
 * when the quote is missing -- confirm against its definition */
964 :
965 3906089 : if (!row) {
966 0 : errline = SQLload_error(task, idx, i + 1);
967 0 : snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
968 0 : tablet_error(task, idx, startlineno, (int) i, errmsg,
969 : errline);
970 0 : GDKfree(errline);
971 0 : error = true;
/* jump into the "not enough fields" handling below to mark
 * this and all following fields of the row as NULL */
972 0 : goto errors1;
973 : } else
974 3906089 : *row++ = 0;
975 : }
976 :
977 : /* eat away the column separator */
978 48499340 : for (; *row; row++)
/* with escapes enabled a backslash protects the next byte from
 * being interpreted as (the start of) a separator */
979 47451156 : if (*row == '\\' && task->escape) {
980 2 : if (row[1])
981 2 : row++;
982 47451154 : } else if (*row == ch
983 7281612 : && (task->seplen == 1
984 4 : || strncmp(row, task->csep,
985 : task->seplen) == 0)) {
986 7281612 : *row = 0;
987 7281612 : row += task->seplen;
988 7281612 : goto endoffieldcheck;
989 : }
990 :
991 : /* not enough fields */
992 1048184 : if (i < as->nr_attrs - 1) {
993 0 : errline = SQLload_error(task, idx, i + 1);
994 : /* it's the next value that is missing */
995 0 : tablet_error(task, idx, startlineno, (int) i + 1,
996 : "Column value missing", errline);
997 0 : GDKfree(errline);
998 0 : error = true;
999 0 : errors1:
1000 : /* we save all errors detected as NULL values */
1001 0 : for (; i < as->nr_attrs; i++)
1002 0 : task->fields[i][idx] = NULL;
/* i equals nr_attrs here; step back so that the NULL-string
 * check below indexes a valid column and the enclosing loop
 * terminates after its own increment */
1003 0 : i--;
1004 : }
1005 1048184 : endoffieldcheck:
1006 8329796 : ;
1007 : /* check for user defined NULL string */
/* a quoted field is only compared when the NULL string is empty;
 * the comparison covers null_length + 1 bytes, so the terminating
 * NUL has to match as well (no prefix matches).
 * NOTE(review): fmt is not indexed by i, so the NULL string of the
 * first column is applied to every column -- presumably all columns
 * share the same NULL string; confirm */
1008 8329796 : if ((!quote || !fmt->null_length) && fmt->nullstr
1009 6815167 : && task->fields[i][idx]
1010 6815167 : && strncasecmp(task->fields[i][idx], fmt->nullstr,
1011 6815167 : fmt->null_length + 1) == 0)
1012 1878688 : task->fields[i][idx] = 0;
1013 : }
1014 : } else {
/* fast path: no quoting and a one-byte column separator */
1015 : assert(!task->quote);
1016 : assert(task->seplen == 1);
1017 331604194 : for (i = 0; i < as->nr_attrs; i++) {
1018 198124239 : task->fields[i][idx] = row;
1019 :
1020 : /* eat away the column separator */
1021 1850921688 : for (; *row; row++)
1022 1717815355 : if (*row == '\\' && task->escape) {
1023 381 : if (row[1])
1024 381 : row++;
1025 1717814974 : } else if (*row == ch) {
1026 65017906 : *row = 0;
1027 65017906 : row++;
1028 65017906 : goto endoffield2;
1029 : }
1030 :
1031 : /* not enough fields */
1032 133106333 : if (i < as->nr_attrs - 1) {
1033 3 : errline = SQLload_error(task, idx, i + 1);
1034 : /* it's the next value that is missing */
1035 3 : tablet_error(task, idx, startlineno, (int) i + 1,
1036 : "Column value missing", errline);
1037 3 : GDKfree(errline);
1038 3 : error = true;
1039 : /* we save all errors detected */
1040 12 : for (; i < as->nr_attrs; i++)
1041 6 : task->fields[i][idx] = NULL;
/* same index correction as in the general path above */
1042 3 : i--;
1043 : }
1044 133106330 : endoffield2:
1045 198124239 : ;
1046 : /* check for user defined NULL string */
1047 198124239 : if (fmt->nullstr && task->fields[i][idx]
1048 183360338 : && strncasecmp(task->fields[i][idx], fmt->nullstr,
1049 183360338 : fmt->null_length + 1) == 0) {
1050 3603038 : task->fields[i][idx] = 0;
1051 : }
1052 : }
1053 : }
1054 : /* check for too many values as well */
/* row is NULL after a missing-quote error; otherwise any byte left
 * after the last expected field is reported as leftover data */
1055 135224386 : if (row && *row && i == as->nr_attrs) {
1056 1 : errline = SQLload_error(task, idx, task->as->nr_attrs);
1057 1 : snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
1058 1 : tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
1059 1 : GDKfree(errline);
1060 1 : error = true;
1061 : }
1062 135224386 : return error ? -1 : 0;
1063 : }
1064 :
/*
 * Thread entry point of a load worker; arg is the worker's own READERtask.
 *
 * The worker installs a private GDK error buffer and then serves requests:
 * it blocks on task->sema, performs the stage named by task->state and
 * signals completion on task->reply.
 *   BREAKROW  - split this worker's share of the rows of the current
 *               buffer into fields (SQLload_parse_row)
 *   UPDATEBAT - convert and append the values of the columns listed in
 *               task->cols (SQLworker_column)
 *   ENDOFCOPY - acknowledge and terminate
 * The time spent is recorded in task->wtime and, per column, task->time[].
 */
1065 : static void
1066 1411 : SQLworker(void *arg)
1067 : {
1068 1411 : READERtask *task = (READERtask *) arg;
1069 1411 : unsigned int i;
1070 1411 : int j, piece;
1071 1411 : lng t0;
1072 :
1073 1411 : GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
1074 1411 : GDKclrerr();
1075 1411 : task->errbuf = GDKerrbuf;
1076 1411 : MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
1077 :
/* wait for the first request */
1078 1410 : MT_sema_down(&task->sema);
1079 610025 : while (task->top[task->cur] >= 0) {
1080 : /* stage one, break the rows spread the work over the workers */
1081 610025 : switch (task->state) {
1082 304422 : case BREAKROW:
1083 304422 : t0 = GDKusec();
/* rows per worker, rounded up far enough that
 * workers * piece always exceeds the number of rows */
1084 304179 : piece = (task->top[task->cur] + task->workers) / task->workers;
1085 :
/* this worker handles rows [piece * id, piece * (id + 1)) */
1086 304179 : for (j = piece * task->id;
1087 135652308 : j < task->top[task->cur] && j < piece * (task->id + 1); j++)
1088 135349211 : if (task->rows[task->cur][j]) {
1089 135349211 : if (SQLload_parse_row(task, j) < 0) {
1090 4 : task->errorcnt++;
1091 : // early break unless best effort
1092 4 : if (!task->besteffort) {
/* mark all fields of the remaining rows of this
 * share as NULL before giving up */
1093 1 : for (j++;
1094 3 : j < task->top[task->cur]
1095 3 : && j < piece * (task->id + 1); j++)
1096 8 : for (i = 0; i < task->as->nr_attrs; i++)
1097 6 : task->fields[i][j] = NULL;
1098 : break;
1099 : }
1100 : }
1101 : }
1102 303098 : task->wtime = GDKusec() - t0;
1103 305073 : break;
1104 304210 : case UPDATEBAT:
/* without best effort nothing is stored once this worker has
 * seen a parse error */
1105 304210 : if (!task->besteffort && task->errorcnt)
1106 : break;
1107 : /* stage two, updating the BATs */
/* cols[i] holds the column number plus one; zero means the
 * column is not assigned to this worker */
1108 1282153 : for (i = 0; i < task->as->nr_attrs; i++)
1109 977960 : if (task->cols[i]) {
1110 324906 : t0 = GDKusec();
1111 322524 : if (SQLworker_column(task, task->cols[i] - 1) < 0)
1112 : break;
1113 324707 : t0 = GDKusec() - t0;
1114 324876 : task->time[i] += t0;
1115 324876 : task->wtime += t0;
1116 : }
1117 : break;
1118 1393 : case ENDOFCOPY:
1119 1393 : MT_sema_up(&task->reply);
1120 1394 : goto do_return;
1121 : }
/* report completion of the stage and wait for the next request */
1122 609274 : MT_sema_up(&task->reply);
1123 1219850 : MT_sema_down(&task->sema);
1124 : }
1125 0 : MT_sema_up(&task->reply);
1126 :
1127 1394 : do_return:
/* release the private error buffer installed at start-up */
1128 1394 : GDKfree(GDKerrbuf);
1129 1408 : GDKsetbuf(NULL);
1130 1399 : MT_thread_set_qry_ctx(NULL);
1131 1399 : }
1132 :
1133 : static void
1134 103328 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
1135 : {
1136 103328 : int i, j, mi;
1137 103328 : lng loc[MAXWORKERS];
1138 :
1139 : /* after a few rounds we stick to the work assignment */
1140 103328 : if (task->rounds > 8)
1141 103199 : return;
1142 : /* simple round robin the first time */
1143 2721 : if (threads == 1 || task->rounds++ == 0) {
1144 16382 : for (i = j = 0; i < nr_attrs; i++, j++)
1145 13790 : ptask[j % threads].cols[i] = task->cols[i];
1146 : return;
1147 : }
1148 129 : memset(loc, 0, sizeof(loc));
1149 : /* use of load directives */
1150 1861 : for (i = 0; i < nr_attrs; i++)
1151 13172 : for (j = 0; j < threads; j++)
1152 11440 : ptask[j].cols[i] = 0;
1153 :
1154 : /* now allocate the work to the threads */
1155 1861 : for (i = 0; i < nr_attrs; i++, j++) {
1156 : mi = 0;
1157 11440 : for (j = 1; j < threads; j++)
1158 9708 : if (loc[j] < loc[mi])
1159 2652 : mi = j;
1160 :
1161 1732 : ptask[mi].cols[i] = task->cols[i];
1162 1732 : loc[mi] += task->time[i];
1163 : }
1164 : /* reset the timer */
1165 1861 : for (i = 0; i < nr_attrs; i++, j++)
1166 1732 : task->time[i] = 0;
1167 : }
1168 :
1169 : /*
1170 : * Reading is handled by a separate task as a preparation for more parallelism.
1171 : * A buffer is filled with proper rows.
1172 : * If we are reading from a file then a double buffering scheme is activated.
1173 : * Reading from the console (stdin) remains single buffered only.
1174 : * If we end up with unfinished records, then the rowlimit will terminate the process.
1175 : */
1176 :
/* transition table: one row of 256 next-state entries per state */
typedef unsigned char (*dfa_t)[256];

/*
 * Build the transition table of an automaton that recognizes the
 * separator string sep of length seplen.
 *
 * There is one state per separator byte; table[state][byte] is the state
 * reached after reading byte, and reaching state seplen means the
 * separator has been seen completely.  Entries that are not set
 * explicitly stay 0 (start over).
 *
 * Returns the table, to be released by the caller, or NULL when the
 * allocation fails.
 */
static dfa_t
mkdfa(const unsigned char *sep, size_t seplen)
{
	dfa_t table = GDKzalloc(seplen * sizeof(*table));

	if (table == NULL)
		return NULL;

	for (size_t state = 0; state < seplen; state++) {
		/* in any state the first separator byte can start a new match */
		table[state][sep[0]] = 1;
		/* the expected byte advances the match by one */
		table[state][sep[state]] = (unsigned char) (state + 1);

		/* If the tail of what was matched so far (starting at offset
		 * shift) is itself a prefix of the separator, the byte that
		 * continues this prefix must lead to the state just after it
		 * instead of all the way back. */
		for (size_t shift = 0; shift < state; shift++) {
			size_t overlap = state - shift;
			size_t matched = 0;

			while (matched < overlap
			       && sep[shift + matched] == sep[matched])
				matched++;
			if (matched == overlap
			    && table[state][sep[matched]] <= matched)
				table[state][sep[matched]] = (unsigned char) (matched + 1);
		}
	}
	return table;
}
1209 :
/* Branch prediction hints: unlikely(expr) marks a condition that is
 * expected to be false, likely(expr) one that is expected to be true. */
1210 : #ifdef __GNUC__
1211 : /* __builtin_expect returns its first argument; it is expected to be
1212 : * equal to the second argument; the "!= 0" reduces expr to 0 or 1 first */
1213 : #define unlikely(expr) __builtin_expect((expr) != 0, 0)
1214 : #define likely(expr) __builtin_expect((expr) != 0, 1)
1215 : #else
/* no hint available: the macros just yield the expression itself */
1216 : #define unlikely(expr) (expr)
1217 : #define likely(expr) (expr)
1218 : #endif
1219 :
/*
 * Thread entry point of the producer; p is the coordinator's READERtask.
 *
 * The producer obtains input through tablet_read_more(), copies it from
 * the stream buffer task->b into task->input[cur] and cuts it into records:
 * the record separator is recognized with the automaton built by mkdfa(),
 * ignoring separators inside quotes or behind a backslash (when escapes
 * are enabled).  The input is checked for well-formed UTF-8 on the way.
 * For every record the start pointer is stored in task->rows[cur][] and
 * the line number it started on in task->startlineno[cur][]; the separator
 * is cut off by writing a NUL byte.  The first task->skip lines are dropped.
 *
 * A filled buffer is published through task->cur, task->ateof and task->cnt
 * and handed over with the task->consumer semaphore; task->producer is used
 * by the consumer to hand control back.  When reading from a file
 * (task->as->filename is set) the producer alternates between the
 * MAXBUFFERS buffers so it can fill one while the other is being consumed;
 * otherwise a single buffer is used and the producer waits for the
 * consumer after every hand-over.
 *
 * The thread ends when task->maxrow records were delivered, when the input
 * is exhausted, when task->state becomes ENDOFCOPY, or immediately when
 * task->id is negative after the start signal.
 */
1220 : static void
1221 1070 : SQLproducer(void *p)
1222 : {
1223 1070 : READERtask *task = (READERtask *) p;
1224 1070 : bool consoleinput = false;
1225 1070 : int cur = 0; // buffer being filled
1226 1070 : bool blocked[MAXBUFFERS] = { false };
1227 1070 : bool ateof[MAXBUFFERS] = { false };
1228 1070 : BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };
1229 1070 : char *end = NULL, *e = NULL, *s = NULL, *base;
1230 1070 : const char *rsep = task->rsep;
1231 1070 : size_t rseplen = strlen(rsep), partial = 0;
1232 1070 : char quote = task->quote;
1233 1070 : dfa_t rdfa;
1234 1070 : lng rowno = 0;
1235 1070 : lng lineno = 1;
1236 1070 : lng startlineno = 1;
1237 1070 : int more = 0;
1238 :
/* wait for the start signal; a negative id tells us that the set-up
 * failed and that we must not touch the input */
1239 1070 : MT_sema_down(&task->producer);
1240 1070 : if (task->id < 0) {
1241 : return;
1242 : }
1243 :
1244 1070 : MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
1245 1070 : rdfa = mkdfa((const unsigned char *) rsep, rseplen);
1246 1070 : if (rdfa == NULL) {
1247 0 : tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
1248 : "");
1249 0 : ateof[cur] = true;
1250 0 : goto reportlackofinput;
1251 : }
1252 :
1253 : /* TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/
1254 :
1255 1070 : base = end = s = task->input[cur];
1256 1070 : *s = 0;
1257 1070 : task->cur = cur;
/* without a file name we use a single buffer and start with whatever
 * is already in the stream buffer (the first read below is skipped) */
1258 1070 : if (task->as->filename == NULL) {
1259 742 : consoleinput = true;
1260 742 : goto parseSTDIN;
1261 : }
1262 205202 : for (;;) {
1263 102765 : startlineno = lineno;
1264 102765 : ateof[cur] = !tablet_read_more(task);
1265 :
1266 : // we may be reading from standard input and may be out of input
1267 : // warn the consumers
1268 102765 : if (task->aborted || ((lineno & 8191) == 0 && bstream_getoob(task->cntxt->fdin))) {
1269 0 : tablet_error(task, rowno, lineno, int_nil,
1270 : "problem reported by client", s);
1271 0 : ateof[cur] = true;
1272 0 : goto reportlackofinput;
1273 : }
1274 :
/* NOTE(review): the inner unlikely(partial) test is always true
 * here, the enclosing condition already requires partial != 0 */
1275 102765 : if (ateof[cur] && partial) {
1276 1 : if (unlikely(partial)) {
1277 1 : tablet_error(task, rowno, lineno, int_nil,
1278 : "incomplete record at end of file", s);
1279 1 : task->b->pos += partial;
1280 : }
1281 1 : goto reportlackofinput;
1282 : }
1283 :
/* NOTE(review): the outer test looks at task->errbuf while the inner
 * test and the report use GDKerrbuf; whether both name the same
 * buffer in this thread cannot be told from here -- confirm */
1284 102764 : if (task->errbuf && task->errbuf[0]) {
1285 0 : if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
1286 0 : tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
1287 : "SQLload_file");
1288 : /* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
1289 0 : ateof[cur] = true;
1290 0 : break;
1291 : }
1292 : }
1293 :
1294 102764 : parseSTDIN:
1295 :
1296 : /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
1297 103506 : partial = 0;
1298 103506 : task->top[cur] = 0;
1299 103506 : s = task->input[cur];
1300 103506 : base = end;
1301 : /* avoid too long records */
1302 103506 : if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
1303 : /* the input buffer should be extended, but 'base' is not shared
1304 : between the threads, which we can not now update.
1305 : Mimick an ateof instead; */
1306 0 : tablet_error(task, rowno, lineno, int_nil, "record too long", "");
1307 0 : ateof[cur] = true;
1308 : /* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
1309 0 : goto reportlackofinput;
1310 : }
/* append the unread part of the stream buffer behind what is
 * already in the input buffer */
1311 103506 : memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
1312 103506 : end = end + task->b->len - task->b->pos;
1313 103506 : *end = '\0'; /* this is safe, as the stream ensures an extra byte */
1314 : /* Note that we rescan from the start of a record (the last
1315 : * partial buffer from the previous iteration), even if in the
1316 : * previous iteration we have already established that there
1317 : * is no record separator in the first, perhaps significant,
1318 : * part of the buffer. This is because if the record separator
1319 : * is longer than one byte, it is too complex (i.e. would
1320 : * require more state) to be sure what the state of the quote
1321 : * status is when we back off a few bytes from where the last
1322 : * scan ended (we need to back off some since we could be in
1323 : * the middle of the record separator). If this is too
1324 : * costly, we have to rethink the matter. */
/* on standard input without a row limit, input that starts with a
 * newline is treated as the end of the data */
1325 103506 : if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
1326 0 : ateof[cur] = true;
1327 0 : goto reportlackofinput;
1328 : }
1329 145043254 : for (e = s; *e && e < end && cnt < task->maxrow;) {
1330 : /* tokenize the record completely
1331 : *
1332 : * The format of the input should comply to the following
1333 : * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
1334 : * where quote is a single user-defined character.
1335 : * Within the quoted fields a character may be escaped
1336 : * with a backslash. The correct number of fields should
1337 : * be supplied. In the first phase we simply break the
1338 : * rows at the record boundary. */
/* nutf: continuation bytes still expected; m: bits of which at least
 * one must be set in the next continuation byte (rejects overlong
 * encodings); bs: previous byte was an escaping backslash; q: the
 * quote we are inside of, if any; i: state of the separator automaton */
1339 : int nutf = 0;
1340 : int m = 0;
1341 : bool bs = false;
1342 : char q = 0;
1343 : size_t i = 0;
1344 3115574395 : while (*e) {
1345 3115573369 : if (task->skip > 0) {
1346 : /* no interpretation of data we're skipping, just
1347 : * look for newline */
1348 1827 : if (*e == '\n') {
1349 27 : lineno++;
1350 27 : break;
1351 : }
1352 : } else {
1353 : /* check for correctly encoded UTF-8 */
1354 3115571542 : if (nutf > 0) {
1355 1725 : if (unlikely((*e & 0xC0) != 0x80))
1356 1 : goto badutf8;
1357 3448 : if (unlikely(m != 0 && (*e & m) == 0))
1358 0 : goto badutf8;
1359 1724 : m = 0;
1360 1724 : nutf--;
1361 3115569817 : } else if ((*e & 0x80) != 0) {
1362 1598 : if ((*e & 0xE0) == 0xC0) {
1363 1476 : nutf = 1;
1364 1476 : if (unlikely((e[0] & 0x1E) == 0))
1365 0 : goto badutf8;
1366 122 : } else if ((*e & 0xF0) == 0xE0) {
1367 117 : nutf = 2;
1368 117 : if ((e[0] & 0x0F) == 0)
1369 3 : m = 0x20;
1370 5 : } else if (likely((*e & 0xF8) == 0xF0)) {
1371 5 : nutf = 3;
1372 5 : if ((e[0] & 0x07) == 0)
1373 5 : m = 0x30;
1374 : } else {
1375 0 : goto badutf8;
1376 : }
1377 3115568219 : } else if (*e == '\n')
1378 144939578 : lineno++;
1379 : /* check for quoting and the row separator */
1380 3115571541 : if (bs) {
1381 : bs = false;
1382 3115028300 : } else if (task->escape && *e == '\\') {
1383 : bs = true;
1384 : i = 0;
1385 3114485059 : } else if (*e == q) {
1386 : q = 0;
1387 3109231494 : } else if (*e == quote) {
1388 : q = quote;
1389 : i = 0;
1390 3103977847 : } else if (q == 0) {
/* outside quotes: feed the byte to the separator automaton;
 * state rseplen means e is on the last separator byte */
1391 2924500598 : i = rdfa[i][(unsigned char) *e];
1392 2924500598 : if (i == rseplen)
1393 : break;
1394 : }
1395 : }
1396 2970633620 : e++;
1397 : }
1398 144940774 : if (*e == 0) {
1399 1026 : partial = e - s;
1400 : /* found an incomplete record, saved for next round */
1401 1026 : if (unlikely(s + partial < end)) {
1402 : /* found a EOS in the input */
1403 0 : tablet_error(task, rowno, startlineno, int_nil,
1404 : "record too long (EOS found)", "");
1405 0 : ateof[cur] = true;
1406 0 : goto reportlackofinput;
1407 : }
1408 : break;
1409 : } else {
1410 144939748 : rowno++;
1411 144939748 : if (task->skip > 0) {
1412 27 : task->skip--;
1413 : } else {
1414 144939721 : if (cnt < task->maxrow) {
1415 144939721 : task->startlineno[cur][task->top[cur]] = startlineno;
1416 144939721 : task->rows[cur][task->top[cur]++] = s;
1417 144939721 : startlineno = lineno;
1418 144939721 : cnt++;
1419 : }
/* e is on the last byte of the record separator: terminate
 * the row at the first separator byte */
1420 144939721 : *(e + 1 - rseplen) = 0;
1421 : }
/* continue behind the separator and account for the bytes
 * consumed from the stream buffer */
1422 144939748 : s = ++e;
1423 144939748 : task->b->pos += (size_t) (e - base);
1424 144939748 : base = e;
1425 144939748 : if (task->top[cur] == task->limit)
1426 : break;
1427 : }
1428 : }
1429 :
1430 102479 : reportlackofinput:
1431 : /* TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
1432 :
1433 103507 : if (consoleinput) {
/* single buffer: publish it and wait until it has been consumed */
1434 101916 : task->cur = cur;
1435 101916 : task->ateof = ateof[cur];
1436 101916 : task->cnt = bufcnt[cur];
1437 : /* tell consumer to go ahead */
1438 101916 : MT_sema_up(&task->consumer);
1439 : /* then wait until it is done */
1440 101916 : MT_sema_down(&task->producer);
1441 101916 : if (cnt == task->maxrow) {
1442 739 : GDKfree(rdfa);
1443 739 : MT_thread_set_qry_ctx(NULL);
1444 739 : return;
1445 : }
1446 : } else {
1447 1591 : assert(!blocked[cur]);
1448 1591 : if (blocked[(cur + 1) % MAXBUFFERS]) {
1449 : /* first wait until other buffer is done */
1450 : /* TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/
1451 :
1452 1263 : MT_sema_down(&task->producer);
1453 1263 : blocked[(cur + 1) % MAXBUFFERS] = false;
1454 1263 : if (task->state == ENDOFCOPY) {
1455 0 : GDKfree(rdfa);
1456 0 : MT_thread_set_qry_ctx(NULL);
1457 0 : return;
1458 : }
1459 : }
1460 : /* other buffer is done, proceed with current buffer */
1461 1591 : assert(!blocked[(cur + 1) % MAXBUFFERS]);
1462 1591 : blocked[cur] = true;
1463 1591 : task->cur = cur;
1464 1591 : task->ateof = ateof[cur];
1465 1591 : task->cnt = bufcnt[cur];
/* at end of input there is still work left when the scan stopped
 * early because the row pointer array was full */
1466 1591 : more = !ateof[cur] || (e && e < end
1467 0 : && task->top[cur] == task->limit);
1468 : /* TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
1469 :
1470 1591 : MT_sema_up(&task->consumer);
1471 :
1472 1591 : cur = (cur + 1) % MAXBUFFERS;
1473 : /* TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/
1474 :
1475 1591 : if (cnt == task->maxrow) {
1476 181 : MT_sema_down(&task->producer);
1477 : /* TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
1478 181 : GDKfree(rdfa);
1479 181 : MT_thread_set_qry_ctx(NULL);
1480 181 : return;
1481 : }
1482 : }
1483 : /* TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/
1484 :
1485 : /* we ran out of input? */
1486 102587 : if (task->ateof && !more) {
1487 : /* TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
1488 150 : GDKfree(rdfa);
1489 150 : MT_thread_set_qry_ctx(NULL);
1490 150 : return;
1491 : }
1492 : /* consumers ask us to stop? */
1493 102437 : if (task->state == ENDOFCOPY) {
1494 0 : GDKfree(rdfa);
1495 0 : MT_thread_set_qry_ctx(NULL);
1496 0 : return;
1497 : }
/* remember how many rows were delivered before the buffer we are
 * about to fill; it is published as task->cnt with that buffer */
1498 102437 : bufcnt[cur] = cnt;
1499 : /* move the non-parsed correct row data to the head of the next buffer */
1500 102437 : end = s = task->input[cur];
1501 : }
/* only reached when the endless loop above is left with break */
1502 0 : if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
1503 0 : char msg[256];
1504 0 : snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
1505 0 : task->as->error = GDKstrdup(msg);
1506 0 : tablet_error(task, rowno, startlineno, int_nil,
1507 : "incomplete record at end of file", s);
1508 0 : task->b->pos += partial;
1509 : }
1510 0 : GDKfree(rdfa);
1511 0 : MT_thread_set_qry_ctx(NULL);
1512 :
1513 0 : return;
1514 :
1515 1 : badutf8:
/* treat badly encoded input like end of input for this buffer */
1516 1 : tablet_error(task, rowno, startlineno, int_nil,
1517 : "input not properly encoded UTF-8", "");
1518 1 : ateof[cur] = true;
1519 1 : goto reportlackofinput;
1520 : }
1521 :
1522 : static void
1523 1093 : create_rejects_table(Client cntxt)
1524 : {
1525 1093 : MT_lock_set(&mal_contextLock);
1526 1093 : if (cntxt->error_row == NULL) {
1527 451 : cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
1528 451 : cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
1529 451 : cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
1530 451 : cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
1531 451 : if (cntxt->error_row == NULL || cntxt->error_fld == NULL
1532 451 : || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
1533 0 : BBPreclaim(cntxt->error_row);
1534 0 : BBPreclaim(cntxt->error_fld);
1535 0 : BBPreclaim(cntxt->error_msg);
1536 0 : BBPreclaim(cntxt->error_input);
1537 0 : cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
1538 : }
1539 : }
1540 1093 : MT_lock_unset(&mal_contextLock);
1541 1093 : }
1542 :
1543 : BUN
1544 1070 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
1545 : const char *csep, const char *rsep, char quote, lng skip,
1546 : lng maxrow, int best, bool from_stdin, const char *tabnam,
1547 : bool escape)
1548 : {
1549 1070 : BUN cnt = 0, cntstart = 0, leftover = 0;
1550 1070 : int res = 0; /* < 0: error, > 0: success, == 0: continue processing */
1551 1070 : int j;
1552 1070 : BUN firstcol;
1553 1070 : BUN i, attr;
1554 1070 : READERtask task;
1555 1070 : READERtask ptask[MAXWORKERS];
1556 1070 : int threads = 1;
1557 1070 : lng tio, t1 = 0;
1558 1070 : char name[MT_NAME_LEN];
1559 :
1560 1070 : if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
1561 104 : threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
1562 104 : if (threads > 1)
1563 104 : threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
1564 : else
1565 : threads = 1;
1566 : }
1567 :
1568 : /* TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
1569 :
1570 1070 : memset(ptask, 0, sizeof(ptask));
1571 2140 : task = (READERtask) {
1572 : .cntxt = cntxt,
1573 : .from_stdin = from_stdin,
1574 : .as = as,
1575 : .escape = escape, /* TODO: implement feature!!! */
1576 1070 : .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
1577 : };
1578 :
1579 : /* create the reject tables */
1580 1070 : create_rejects_table(task.cntxt);
1581 1070 : if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
1582 1070 : || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
1583 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1584 : "SQLload initialization failed", "");
1585 : /* nothing allocated yet, so nothing to free */
1586 0 : return BUN_NONE;
1587 : }
1588 :
1589 1070 : assert(rsep);
1590 1070 : assert(csep);
1591 1070 : assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
1592 1070 : task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
1593 1070 : task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1594 1070 : task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
1595 1070 : if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
1596 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1597 : "memory allocation failed", "SQLload_file");
1598 0 : goto bailout;
1599 : }
1600 1070 : task.cur = 0;
1601 3210 : for (i = 0; i < MAXBUFFERS; i++) {
1602 2140 : task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
1603 2140 : task.rowlimit[i] = MAXROWSIZE(2 * b->size);
1604 2140 : if (task.base[i] == NULL) {
1605 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1606 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1607 0 : goto bailout;
1608 : }
1609 2140 : task.base[i][0] = task.base[i][b->size + 1] = 0;
1610 2140 : task.input[i] = task.base[i] + 1; /* wrap the buffer with null bytes */
1611 : }
1612 1070 : task.besteffort = best;
1613 :
1614 1070 : if (maxrow < 0)
1615 70 : task.maxrow = BUN_MAX;
1616 : else
1617 1000 : task.maxrow = (BUN) maxrow;
1618 :
1619 1070 : task.skip = skip;
1620 1070 : task.quote = quote;
1621 1070 : task.csep = csep;
1622 1070 : task.seplen = strlen(csep);
1623 1070 : task.rsep = rsep;
1624 1070 : task.rseplen = strlen(rsep);
1625 1070 : task.errbuf = cntxt->errbuf;
1626 :
1627 1070 : MT_sema_init(&task.producer, 0, "task.producer");
1628 1070 : MT_sema_init(&task.consumer, 0, "task.consumer");
1629 1070 : task.ateof = false;
1630 1070 : task.b = b;
1631 1070 : task.out = out;
1632 :
1633 1070 : as->error = NULL;
1634 :
1635 : /* there is no point in creating more threads than we have columns */
1636 1070 : if (as->nr_attrs < (BUN) threads)
1637 64 : threads = (int) as->nr_attrs;
1638 :
1639 : /* allocate enough space for pointers into the buffer pool. */
1640 : /* the record separator is considered a column */
1641 1070 : task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
1642 11120 : for (i = 0; i < as->nr_attrs; i++) {
1643 10050 : task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
1644 10050 : if (task.fields[i] == NULL) {
1645 0 : if (task.as->error == NULL)
1646 0 : as->error = createException(MAL, "sql.copy_from",
1647 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1648 0 : goto bailout;
1649 : }
1650 10050 : task.cols[i] = (int) (i + 1); /* to distinguish non initialized later with zero */
1651 : }
1652 3210 : for (i = 0; i < MAXBUFFERS; i++) {
1653 2140 : task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
1654 2140 : task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
1655 2140 : if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
1656 0 : GDKfree(task.rows[i]);
1657 0 : GDKfree(task.startlineno[i]);
1658 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1659 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1660 : "SQLload_file:failed to alloc buffers");
1661 0 : goto bailout;
1662 : }
1663 : }
1664 1070 : task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
1665 1070 : if (task.rowerror == NULL) {
1666 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1667 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1668 : "SQLload_file:failed to alloc rowerror buffer");
1669 0 : goto bailout;
1670 : }
1671 :
1672 1070 : task.id = 0;
1673 1070 : snprintf(name, sizeof(name), "prod-%s", tabnam);
1674 1070 : if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
1675 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1676 : SQLSTATE(42000) "failed to start producer thread",
1677 : "SQLload_file");
1678 0 : goto bailout;
1679 : }
1680 : /* TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
1681 :
1682 1070 : task.workers = threads;
1683 2481 : for (j = 0; j < threads; j++) {
1684 1411 : ptask[j] = task;
1685 1411 : ptask[j].id = j;
1686 1411 : ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1687 1411 : if (ptask[j].cols == NULL) {
1688 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1689 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1690 0 : task.id = -1;
1691 0 : MT_sema_up(&task.producer);
1692 0 : goto bailout;
1693 : }
1694 1411 : snprintf(name, sizeof(name), "ptask%d.sema", j);
1695 1411 : MT_sema_init(&ptask[j].sema, 0, name);
1696 1411 : snprintf(name, sizeof(name), "ptask%d.repl", j);
1697 1411 : MT_sema_init(&ptask[j].reply, 0, name);
1698 1411 : snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
1699 1411 : if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
1700 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1701 : SQLSTATE(42000) "failed to start worker thread",
1702 : "SQLload_file");
1703 0 : threads = j;
1704 0 : for (j = 0; j < threads; j++)
1705 0 : ptask[j].workers = threads;
1706 : }
1707 : }
1708 1070 : if (threads == 0) {
1709 : /* no threads started */
1710 0 : task.id = -1;
1711 0 : MT_sema_up(&task.producer);
1712 0 : goto bailout;
1713 : }
1714 1070 : MT_sema_up(&task.producer);
1715 :
1716 1070 : tio = GDKusec();
1717 1070 : tio = GDKusec() - tio;
1718 1070 : t1 = GDKusec();
1719 2143 : for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
1720 1073 : if (task.as->format[firstcol].c != NULL)
1721 : break;
1722 104405 : while (res == 0 && cnt < task.maxrow) {
1723 :
1724 : // track how many elements are in the aggregated BATs
1725 103507 : cntstart = BATcount(task.as->format[firstcol].c);
1726 : /* block until the producer has data available */
1727 103507 : MT_sema_down(&task.consumer);
1728 103507 : cnt += task.top[task.cur];
1729 103507 : if (task.ateof && !task.top[task.cur])
1730 : break;
1731 103357 : t1 = GDKusec() - t1;
1732 : /* TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
1733 :
1734 103357 : t1 = GDKusec();
1735 103357 : if (task.top[task.cur]) {
1736 : /* activate the workers to break rows */
1737 410451 : for (j = 0; j < threads; j++) {
1738 : /* stage one, break the rows in parallel */
1739 307123 : ptask[j].error = 0;
1740 307123 : ptask[j].state = BREAKROW;
1741 307123 : ptask[j].next = task.top[task.cur];
1742 307123 : ptask[j].fields = task.fields;
1743 307123 : ptask[j].limit = task.limit;
1744 307123 : ptask[j].cnt = task.cnt;
1745 307123 : ptask[j].cur = task.cur;
1746 307123 : ptask[j].top[task.cur] = task.top[task.cur];
1747 307123 : MT_sema_up(&ptask[j].sema);
1748 : }
1749 : }
1750 103357 : if (task.top[task.cur]) {
1751 : /* await completion of row break phase */
1752 410451 : for (j = 0; j < threads; j++) {
1753 307123 : MT_sema_down(&ptask[j].reply);
1754 307123 : if (ptask[j].error) {
1755 0 : res = -1;
1756 : /* TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
1757 : }
1758 : }
1759 : }
1760 :
1761 : /* TRC_DEBUG(MAL_SERVER,
1762 : "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
1763 : task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
1764 :
1765 103357 : if (task.top[task.cur]) {
1766 103328 : if (res == 0) {
1767 103328 : SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
1768 :
1769 : /* activate the workers to update the BATs */
1770 513779 : for (j = 0; j < threads; j++) {
1771 : /* stage two, update the BATs */
1772 307123 : ptask[j].state = UPDATEBAT;
1773 307123 : MT_sema_up(&ptask[j].sema);
1774 : }
1775 : }
1776 : }
1777 103357 : tio = GDKusec();
1778 103357 : tio = t1 - tio;
1779 :
1780 : /* await completion of the BAT updates */
1781 103357 : if (res == 0 && task.top[task.cur]) {
1782 410451 : for (j = 0; j < threads; j++) {
1783 307123 : MT_sema_down(&ptask[j].reply);
1784 307123 : if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
1785 307123 : res = -1;
1786 307123 : best = 0;
1787 : }
1788 : }
1789 : }
1790 :
1791 : /* trim the BATs discarding error tuples */
1792 : #define trimerrors(TYPE) \
1793 : do { \
1794 : TYPE *src, *dst; \
1795 : leftover= BATcount(task.as->format[attr].c); \
1796 : limit = leftover - cntstart; \
1797 : dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
1798 : for(j = 0; j < (int) limit; j++, src++){ \
1799 : if ( task.rowerror[j]){ \
1800 : leftover--; \
1801 : continue; \
1802 : } \
1803 : *dst++ = *src; \
1804 : } \
1805 : BATsetcount(task.as->format[attr].c, leftover ); \
1806 : } while (0)
1807 :
1808 : /* TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
1809 : best, BATcount(as->format[firstcol].c), task.cnt); */
1810 :
1811 103357 : if (best && BATcount(as->format[firstcol].c)) {
1812 : BUN limit;
1813 : int width;
1814 :
1815 45 : for (attr = 0; attr < as->nr_attrs; attr++) {
1816 31 : if (as->format[attr].skip)
1817 5 : continue;
1818 26 : width = as->format[attr].c->twidth;
1819 26 : as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
1820 26 : switch (width) {
1821 5 : case 1:
1822 16 : trimerrors(bte);
1823 5 : break;
1824 0 : case 2:
1825 0 : trimerrors(sht);
1826 0 : break;
1827 21 : case 4:
1828 55 : trimerrors(int);
1829 21 : break;
1830 0 : case 8:
1831 0 : trimerrors(lng);
1832 0 : break;
1833 : #ifdef HAVE_HGE
1834 0 : case 16:
1835 0 : trimerrors(hge);
1836 0 : break;
1837 : #endif
1838 0 : default:
1839 : {
1840 0 : char *src, *dst;
1841 0 : leftover = BATcount(task.as->format[attr].c);
1842 0 : limit = leftover - cntstart;
1843 0 : dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
1844 0 : for (j = 0; j < (int) limit; j++, src += width) {
1845 0 : if (task.rowerror[j]) {
1846 0 : leftover--;
1847 0 : continue;
1848 : }
1849 0 : if (dst != src)
1850 0 : memcpy(dst, src, width);
1851 0 : dst += width;
1852 : }
1853 0 : BATsetcount(task.as->format[attr].c, leftover);
1854 : }
1855 0 : break;
1856 : }
1857 : }
1858 : // re-initialize the error vector;
1859 14 : memset(task.rowerror, 0, task.limit);
1860 14 : task.errorcnt = 0;
1861 : }
1862 :
1863 103357 : if (res < 0) {
1864 : /* producer should stop */
1865 22 : task.maxrow = cnt;
1866 22 : task.state = ENDOFCOPY;
1867 22 : task.ateof = true;
1868 : }
1869 103357 : if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
1870 : break;
1871 103357 : task.top[task.cur] = 0;
1872 103357 : if (cnt == task.maxrow)
1873 920 : task.ateof = true;
1874 104427 : MT_sema_up(&task.producer);
1875 : }
1876 :
1877 : /* TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
1878 :
1879 1070 : cnt = BATcount(task.as->format[firstcol].c);
1880 :
1881 1070 : task.state = ENDOFCOPY;
1882 : /* TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
1883 :
1884 1070 : if (!task.ateof || cnt < task.maxrow) {
1885 : /* TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
1886 166 : MT_sema_up(&task.producer);
1887 : }
1888 1070 : MT_join_thread(task.tid);
1889 :
1890 : /* TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
1891 :
1892 3551 : for (j = 0; j < threads; j++) {
1893 1411 : ptask[j].state = ENDOFCOPY;
1894 1411 : MT_sema_up(&ptask[j].sema);
1895 : }
1896 : /* wait for their death */
1897 2481 : for (j = 0; j < threads; j++)
1898 1411 : MT_sema_down(&ptask[j].reply);
1899 :
1900 : /* TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
1901 :
1902 2481 : for (j = 0; j < threads; j++) {
1903 1411 : MT_join_thread(ptask[j].tid);
1904 1411 : GDKfree(ptask[j].cols);
1905 1411 : MT_sema_destroy(&ptask[j].sema);
1906 1411 : MT_sema_destroy(&ptask[j].reply);
1907 : }
1908 :
1909 : /* TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
1910 : /* TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
1911 :
1912 11120 : for (i = 0; i < as->nr_attrs; i++) {
1913 10050 : BAT *b = task.as->format[i].c;
1914 10050 : if (b)
1915 10044 : BATsettrivprop(b);
1916 10050 : GDKfree(task.fields[i]);
1917 : }
1918 1070 : GDKfree(task.fields);
1919 1070 : GDKfree(task.cols);
1920 1070 : GDKfree(task.time);
1921 4280 : for (i = 0; i < MAXBUFFERS; i++) {
1922 2140 : GDKfree(task.base[i]);
1923 2140 : GDKfree(task.rows[i]);
1924 2140 : GDKfree(task.startlineno[i]);
1925 : }
1926 1070 : if (task.rowerror)
1927 1070 : GDKfree(task.rowerror);
1928 1070 : MT_sema_destroy(&task.producer);
1929 1070 : MT_sema_destroy(&task.consumer);
1930 :
1931 1070 : return res < 0 ? BUN_NONE : cnt;
1932 :
1933 0 : bailout:
1934 0 : if (task.fields) {
1935 0 : for (i = 0; i < as->nr_attrs; i++)
1936 0 : GDKfree(task.fields[i]);
1937 0 : GDKfree(task.fields);
1938 : }
1939 0 : GDKfree(task.time);
1940 0 : GDKfree(task.cols);
1941 0 : GDKfree(task.base[task.cur]);
1942 0 : GDKfree(task.rowerror);
1943 0 : for (i = 0; i < MAXWORKERS; i++)
1944 0 : GDKfree(ptask[i].cols);
1945 : return BUN_NONE;
1946 : }
1947 :
1948 : /* return the latest reject table, to be on the safe side we should
1949 : * actually create copies within a critical section. Ignored for now. */
1950 : str
1951 23 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1952 : {
1953 23 : bat *row = getArgReference_bat(stk, pci, 0);
1954 23 : bat *fld = getArgReference_bat(stk, pci, 1);
1955 23 : bat *msg = getArgReference_bat(stk, pci, 2);
1956 23 : bat *inp = getArgReference_bat(stk, pci, 3);
1957 :
1958 23 : create_rejects_table(cntxt);
1959 23 : if (cntxt->error_row == NULL)
1960 0 : throw(MAL, "sql.rejects", "No reject table available");
1961 23 : BBPretain(*row = cntxt->error_row->batCacheid);
1962 23 : BBPretain(*fld = cntxt->error_fld->batCacheid);
1963 23 : BBPretain(*msg = cntxt->error_msg->batCacheid);
1964 23 : BBPretain(*inp = cntxt->error_input->batCacheid);
1965 23 : (void) mb;
1966 23 : return MAL_SUCCEED;
1967 : }
1968 :
1969 : str
1970 13 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1971 : {
1972 13 : if (cntxt->error_row) {
1973 13 : MT_lock_set(&errorlock);
1974 13 : BATclear(cntxt->error_row, true);
1975 13 : if (cntxt->error_fld)
1976 13 : BATclear(cntxt->error_fld, true);
1977 13 : if (cntxt->error_msg)
1978 13 : BATclear(cntxt->error_msg, true);
1979 13 : if (cntxt->error_input)
1980 13 : BATclear(cntxt->error_input, true);
1981 13 : MT_lock_unset(&errorlock);
1982 : }
1983 13 : (void) mb;
1984 13 : (void) stk;
1985 13 : (void) pci;
1986 13 : return MAL_SUCCEED;
1987 : }
|