Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Niels Nes, Martin Kersten
15 : *
16 : * Parallel bulk load for SQL
17 : * The COPY INTO command for SQL is heavily CPU bound, which means
18 : * that ideally we would like to exploit the multi-cores to do that
19 : * work in parallel.
20 : * Complicating factors are the initial record offset, the
21 : * possible variable length of the input, and the original sort order
22 : * that should preferably be maintained.
23 : *
24 : * The code below consists of a file reader, which breaks up the
25 : * file into chunks of distinct rows. Then multiple parallel threads
26 : * grab them, and break them on the field boundaries.
27 : * After all fields are identified this way, the columns are converted
28 : * and stored in the BATs.
29 : *
30 : * The threads get a reference to a private copy of the READERtask.
31 : * It includes a list of columns they should handle. This is a basis
32 : * to distribute cheap and expensive columns over threads.
33 : *
34 : * The file reader overlaps IO with updates of the BAT.
35 : * Also the buffer size of the block stream might be a little small for
36 : * this task (1MB). It has been increased to 8MB, which indeed improved performance.
37 : *
38 : * The work divider allocates subtasks to threads based on the
39 : * observed time spent so far.
40 : */
41 :
42 : #include "monetdb_config.h"
43 : #include "tablet.h"
44 : #include "mapi_prompt.h"
45 : #include "mal_internal.h"
46 :
47 : #include <string.h>
48 : #include <ctype.h>
49 :
#define MAXWORKERS 64
#define MAXBUFFERS 2
/* Row buffer size: at least 32MB, or X when X is larger.
 * Note: X is evaluated twice, so do not pass expressions with side effects. */
#define MAXROWSIZE(X) ((X) > 32*1024*1024 ? (X) : 32*1024*1024)
54 :
55 : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
56 :
57 : static BAT *
58 10333 : void_bat_create(int adt, BUN nr)
59 : {
60 10333 : BAT *b = COLnew(0, adt, nr, TRANSIENT);
61 :
62 : /* check for correct structures */
63 10333 : if (b == NULL)
64 : return NULL;
65 10333 : if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
66 : return NULL;
67 : }
68 :
69 : /* disable all properties here */
70 10333 : b->tsorted = false;
71 10333 : b->trevsorted = false;
72 10333 : b->tnosorted = 0;
73 10333 : b->tnorevsorted = 0;
74 10333 : b->tseqbase = oid_nil;
75 10333 : b->tkey = false;
76 10333 : b->tnokey[0] = 0;
77 10333 : b->tnokey[1] = 0;
78 10333 : return b;
79 : }
80 :
81 : void
82 124394 : TABLETdestroy_format(Tablet *as)
83 : {
84 124394 : BUN p;
85 124394 : Column *fmt = as->format;
86 :
87 594062 : for (p = 0; p < as->nr_attrs; p++) {
88 469670 : BBPreclaim(fmt[p].c);
89 469667 : if (fmt[p].data)
90 10338 : GDKfree(fmt[p].data);
91 : }
92 124392 : GDKfree(fmt);
93 124394 : }
94 :
95 : static oid
96 123275 : check_BATs(Tablet *as)
97 : {
98 123275 : Column *fmt = as->format;
99 123275 : BUN i = 0;
100 123275 : BUN cnt;
101 123275 : oid base;
102 :
103 123275 : if (fmt[i].c == NULL)
104 123275 : i++;
105 123275 : cnt = BATcount(fmt[i].c);
106 123275 : base = fmt[i].c->hseqbase;
107 :
108 123275 : if (as->nr != cnt) {
109 6653 : for (i = 0; i < as->nr_attrs; i++)
110 5959 : if (fmt[i].c)
111 5265 : fmt[i].p = as->offset;
112 694 : return oid_nil;
113 : }
114 :
115 575949 : for (i = 0; i < as->nr_attrs; i++) {
116 453368 : BAT *b = fmt[i].c;
117 :
118 453368 : if (b == NULL)
119 122578 : continue;
120 :
121 330790 : if (BATcount(b) != cnt || b->hseqbase != base)
122 0 : return oid_nil;
123 :
124 330790 : fmt[i].p = as->offset;
125 : }
126 : return base;
127 : }
128 :
129 : str
130 1119 : TABLETcreate_bats(Tablet *as, BUN est)
131 : {
132 1119 : Column *fmt = as->format;
133 1119 : BUN i, nr = 0;
134 :
135 11458 : for (i = 0; i < as->nr_attrs; i++) {
136 10339 : if (fmt[i].skip)
137 6 : continue;
138 10333 : fmt[i].c = void_bat_create(fmt[i].adt, est);
139 10333 : if (!fmt[i].c) {
140 0 : while (i > 0) {
141 0 : if (!fmt[--i].skip) {
142 0 : BBPreclaim(fmt[i].c);
143 0 : fmt[i].c = NULL;
144 : }
145 : }
146 0 : throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
147 : est);
148 : }
149 10333 : fmt[i].ci = bat_iterator_nolock(fmt[i].c);
150 10333 : nr++;
151 : }
152 1119 : if (!nr)
153 0 : throw(SQL, "copy",
154 : "At least one column should be read from the input\n");
155 : return MAL_SUCCEED;
156 : }
157 :
158 : str
159 1094 : TABLETcollect(BAT **bats, Tablet *as)
160 : {
161 1094 : Column *fmt = as->format;
162 1094 : BUN i, j;
163 1094 : BUN cnt = 0;
164 :
165 1094 : if (bats == NULL)
166 0 : throw(SQL, "copy", "Missing container");
167 2592 : for (i = 0; i < as->nr_attrs && !cnt; i++)
168 1498 : if (!fmt[i].skip)
169 1495 : cnt = BATcount(fmt[i].c);
170 11340 : for (i = 0, j = 0; i < as->nr_attrs; i++) {
171 10246 : if (fmt[i].skip)
172 6 : continue;
173 10240 : bats[j] = fmt[i].c;
174 10240 : BBPfix(bats[j]->batCacheid);
175 10240 : if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
176 0 : throw(SQL, "copy",
177 : "Failed to set access at tablet part " BUNFMT "\n", cnt);
178 10240 : fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
179 10240 : fmt[i].c->tkey = false;
180 10240 : BATsettrivprop(fmt[i].c);
181 :
182 10240 : if (cnt != BATcount(fmt[i].c))
183 0 : throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
184 : BATcount(fmt[i].c), cnt);
185 10240 : j++;
186 : }
187 : return MAL_SUCCEED;
188 : }
189 :
/*
 * Scan a quoted string in place.  s points just past the opening quote
 * (the caller has already skipped it).
 *
 * While scanning, the string is compacted in place: a doubled quote
 * becomes a single quote character, and, when escape is set, a
 * backslash and the character following it are copied verbatim (so an
 * escaped quote does not end the string).
 *
 * Returns a pointer to the position of the closing quote, after
 * NUL-terminating the compacted contents (the terminator may overwrite
 * that closing quote when nothing was compacted).  Returns NULL when the
 * input ends before a closing quote; the buffer may then be partially
 * compacted and is not re-terminated.
 */
static char *
tablet_skip_string(char *s, char quote, bool escape)
{
	char *src = s;		/* read position */
	char *dst = s;		/* write position, never ahead of src */

	while (*src != '\0') {
		if (escape && *src == '\\' && src[1] != '\0') {
			*dst++ = *src++;	/* copy the backslash; its target follows */
		} else if (*src == quote) {
			if (src[1] != quote)
				break;			/* closing quote */
			src++;				/* doubled quote: keep only one */
		}
		*dst++ = *src++;
	}
	assert(*src == quote || *src == '\0');
	if (*src == '\0')
		return NULL;
	*dst = '\0';
	return src;
}
212 :
213 : static int
214 0 : TABLET_error(stream *s)
215 : {
216 0 : const char *err = mnstr_peek_error(s);
217 0 : if (err)
218 0 : TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
219 0 : return -1;
220 : }
221 :
222 : /* The output line is first built before being sent. It solves a problem
223 : with UDP, where you may loose most of the information using short writes
224 : */
225 : static inline int
226 0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
227 : Column *fmt, stream *fd, BUN nr_attrs, oid id)
228 : {
229 0 : BUN i;
230 0 : ssize_t fill = 0;
231 :
232 0 : for (i = 0; i < nr_attrs; i++) {
233 0 : if (fmt[i].c == NULL)
234 0 : continue;
235 0 : if (id < fmt[i].c->hseqbase
236 0 : || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
237 : break;
238 0 : fmt[i].p = id - fmt[i].c->hseqbase;
239 : }
240 0 : if (i == nr_attrs) {
241 0 : for (i = 0; i < nr_attrs; i++) {
242 0 : Column *f = fmt + i;
243 0 : const char *p;
244 0 : ssize_t l;
245 :
246 0 : if (f->c) {
247 0 : p = BUNtail(f->ci, f->p);
248 :
249 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
250 0 : p = f->nullstr;
251 0 : l = (ssize_t) strlen(f->nullstr);
252 : } else {
253 0 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
254 0 : if (l < 0)
255 : return -1;
256 0 : p = *localbuf;
257 : }
258 0 : if (fill + l + f->seplen >= (ssize_t) * len) {
259 : /* extend the buffer */
260 0 : char *nbuf;
261 0 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
262 0 : if (nbuf == NULL)
263 : return -1; /* *buf freed by caller */
264 0 : *buf = nbuf;
265 0 : *len = fill + l + f->seplen + BUFSIZ;
266 : }
267 0 : strncpy(*buf + fill, p, l);
268 0 : fill += l;
269 : }
270 0 : strncpy(*buf + fill, f->sep, f->seplen);
271 0 : fill += f->seplen;
272 : }
273 : }
274 0 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
275 0 : return TABLET_error(fd);
276 : return 0;
277 : }
278 :
279 : static inline int
280 6208203 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
281 : Column *fmt, stream *fd, BUN nr_attrs)
282 : {
283 6208203 : BUN i;
284 6208203 : ssize_t fill = 0;
285 :
286 27913522 : for (i = 0; i < nr_attrs; i++) {
287 21705318 : Column *f = fmt + i;
288 21705318 : const char *p;
289 21705318 : ssize_t l;
290 :
291 21705318 : if (f->c) {
292 15497115 : p = BUNtail(f->ci, f->p);
293 :
294 15497115 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
295 847612 : p = f->nullstr;
296 847612 : l = (ssize_t) strlen(p);
297 : } else {
298 14649499 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
299 14649504 : if (l < 0)
300 : return -1;
301 14649504 : p = *localbuf;
302 : }
303 15497116 : if (fill + l + f->seplen >= (ssize_t) * len) {
304 : /* extend the buffer */
305 78 : char *nbuf;
306 78 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
307 78 : if (nbuf == NULL)
308 : return -1; /* *buf freed by caller */
309 78 : *buf = nbuf;
310 78 : *len = fill + l + f->seplen + BUFSIZ;
311 : }
312 15497116 : strncpy(*buf + fill, p, l);
313 15497116 : fill += l;
314 15497116 : f->p++;
315 : }
316 21705319 : strncpy(*buf + fill, f->sep, f->seplen);
317 21705319 : fill += f->seplen;
318 : }
319 6208204 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
320 0 : return TABLET_error(fd);
321 : return 0;
322 : }
323 :
324 : static inline int
325 0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
326 : BUN nr_attrs, oid id)
327 : {
328 0 : BUN i;
329 :
330 0 : for (i = 0; i < nr_attrs; i++) {
331 0 : Column *f = fmt + i;
332 :
333 0 : if (f->c) {
334 0 : const void *p = BUNtail(f->ci, id - f->c->hseqbase);
335 :
336 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
337 0 : size_t l = strlen(f->nullstr);
338 0 : if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
339 0 : return TABLET_error(fd);
340 : } else {
341 0 : ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
342 :
343 0 : if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
344 0 : return TABLET_error(fd);
345 : }
346 : }
347 0 : if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
348 0 : return TABLET_error(fd);
349 : }
350 : return 0;
351 : }
352 :
353 : /*
354 : * Fast Load
355 : * To speedup the CPU intensive loading of files we have to break
356 : * the file into pieces and perform parallel analysis. Experimentation
357 : * against lineitem SF1 showed that half of the time goes into very
358 : * basic atom analysis (41 out of 102 B instructions).
359 : * Furthermore, the actual insertion into the BATs takes only
360 : * about 10% of the total. With multi-core processors around
361 : * it seems we can gain here significantly.
362 : *
363 : * The approach taken is to fork a parallel scan over the text file.
364 : * We assume that the blocked stream is already
365 : * positioned correctly at the reading position. The start and limit
366 : * indicates the byte range to search for tuples.
367 : * If start> 0 then we first skip to the next record separator.
368 : * If necessary we read more than 'limit' bytes to ensure parsing a complete
369 : * record and stop at the record boundary.
370 : * Beware, we should allocate Tablet descriptors for each file segment,
371 : * otherwise we end up with a gross concurrency control problem.
372 : * The resulting BATs should be glued at the final phase.
373 : *
374 : * Raw Load
375 : * Front-ends can bypass most of the overhead in loading the BATs
376 : * by preparing the corresponding files directly and replace those
377 : * created by e.g. the SQL frontend.
378 : * This strategy is only advisable for cases where we have very
379 : * large files (>200GB) and/or files produced by well-debugged code.
380 : *
381 : * To experiment with this approach, the code base responds
382 : * to a negative number of cores by dumping the data directly in BAT
383 : * storage format into a collection of files on disk.
384 : * It reports on the actions to be taken to replace BATs.
385 : * This technique is initially only supported for fixed-sized columns.
386 : * The rawmode() indicator acts as the internal switch.
387 : */
388 :
389 : /*
390 : * To speed up loading ascii files we have to determine the number of blocks.
391 : * This depends on the number of cores available.
392 : * For the time being we hardwire this decision based on our own
393 : * platforms.
394 : * Furthermore, we only consider parallel load for file-based requests.
395 : *
396 : * To simplify our world, we assume a single producer process.
397 : */
398 :
399 : static int
400 0 : output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
401 : {
402 0 : size_t len = BUFSIZ, locallen = BUFSIZ;
403 0 : int res = 0;
404 0 : char *buf = GDKmalloc(len);
405 0 : char *localbuf = GDKmalloc(len);
406 0 : BUN p, q;
407 0 : oid id;
408 0 : BUN offset = as->offset;
409 :
410 0 : if (buf == NULL || localbuf == NULL) {
411 0 : GDKfree(buf);
412 0 : GDKfree(localbuf);
413 0 : return -1;
414 : }
415 0 : for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
416 0 : p++, id++) {
417 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
418 : res = -5;
419 : break;
420 : }
421 0 : if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
422 : break;
423 : }
424 : }
425 0 : GDKfree(localbuf);
426 0 : GDKfree(buf);
427 0 : return res;
428 : }
429 :
430 : static int
431 123275 : output_file_dense(Tablet *as, stream *fd, bstream *in)
432 : {
433 123275 : size_t len = BUFSIZ, locallen = BUFSIZ;
434 123275 : int res = 0;
435 123275 : char *buf = GDKmalloc(len);
436 123272 : char *localbuf = GDKmalloc(len);
437 123274 : BUN i = 0;
438 :
439 123274 : if (buf == NULL || localbuf == NULL) {
440 0 : GDKfree(buf);
441 0 : GDKfree(localbuf);
442 0 : return -1;
443 : }
444 6331478 : for (i = 0; i < as->nr; i++) {
445 6208203 : if ((i & 8191) == 8191 && bstream_getoob(in)) {
446 : res = -5; /* "Query aborted" */
447 : break;
448 : }
449 6208203 : if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
450 : break;
451 : }
452 : }
453 123275 : GDKfree(localbuf);
454 123275 : GDKfree(buf);
455 123275 : return res;
456 : }
457 :
458 : static int
459 0 : output_file_ordered(Tablet *as, BAT *order, stream *fd, bstream *in)
460 : {
461 0 : size_t len = BUFSIZ;
462 0 : int res = 0;
463 0 : char *buf = GDKmalloc(len);
464 0 : BUN p, q;
465 0 : BUN i = 0;
466 0 : BUN offset = as->offset;
467 :
468 0 : if (buf == NULL)
469 : return -1;
470 0 : for (q = offset + as->nr, p = offset; p < q; p++, i++) {
471 0 : oid h = order->hseqbase + p;
472 :
473 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
474 : res = -5;
475 : break;
476 : }
477 0 : if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
478 0 : GDKfree(buf);
479 0 : return res;
480 : }
481 : }
482 0 : GDKfree(buf);
483 0 : return res;
484 : }
485 :
486 : int
487 123275 : TABLEToutput_file(Tablet *as, BAT *order, stream *s, bstream *in)
488 : {
489 123275 : oid base = oid_nil;
490 123275 : int ret = 0;
491 :
492 : /* only set nr if it is zero or lower (bogus) to the maximum value
493 : * possible (BATcount), if already set within BATcount range,
494 : * preserve value such that for instance SQL's reply_size still
495 : * works
496 : */
497 123275 : if (order) {
498 0 : BUN maxnr = BATcount(order);
499 0 : if (as->nr == BUN_NONE || as->nr > maxnr)
500 0 : as->nr = maxnr;
501 : }
502 123275 : assert(as->nr != BUN_NONE);
503 :
504 123275 : base = check_BATs(as);
505 123275 : if (!order || !is_oid_nil(base)) {
506 123275 : if (!order || order->hseqbase == base)
507 123275 : ret = output_file_dense(as, s, in);
508 : else
509 0 : ret = output_file_ordered(as, order, s, in);
510 : } else {
511 0 : ret = output_file_default(as, order, s, in);
512 : }
513 123275 : return ret;
514 : }
515 :
516 : /*
517 : * Niels Nes, Martin Kersten
518 : *
519 : * Parallel bulk load for SQL
520 : * The COPY INTO command for SQL is heavily CPU bound, which means
521 : * that ideally we would like to exploit the multi-cores to do that
522 : * work in parallel.
523 : * Complicating factors are the initial record offset, the
524 : * possible variable length of the input, and the original sort order
525 : * that should preferably be maintained.
526 : *
527 : * The code below consists of a file reader, which breaks up the
528 : * file into chunks of distinct rows. Then multiple parallel threads
529 : * grab them, and break them on the field boundaries.
530 : * After all fields are identified this way, the columns are converted
531 : * and stored in the BATs.
532 : *
533 : * The threads get a reference to a private copy of the READERtask.
534 : * It includes a list of columns they should handle. This is a basis
535 : * to distribute cheap and expensive columns over threads.
536 : *
537 : * The file reader overlaps IO with updates of the BAT.
538 : * Also the buffer size of the block stream might be a little small for
539 : * this task (1MB). It has been increased to 8MB, which indeed improved performance.
540 : *
541 : * The work divider allocates subtasks to threads based on the
542 : * observed time spent so far.
543 : */
544 :
545 : #define BREAKROW 1
546 : #define UPDATEBAT 2
547 : #define ENDOFCOPY 3
548 :
549 : typedef struct {
550 : Client cntxt;
551 : int id; /* for self reference */
552 : int state; /* row break=1 , 2 = update bat */
553 : int workers; /* how many concurrent ones */
554 : int error; /* error during row break */
555 : int next;
556 : int limit;
557 : BUN cnt, maxrow; /* first row in file chunk. */
558 : lng skip; /* number of lines to be skipped */
559 : lng *time, wtime; /* time per col + time per thread */
560 : int rounds; /* how often did we divide the work */
561 : bool ateof; /* io control */
562 : bool from_stdin;
563 : bool escape; /* whether to handle \ escapes */
564 : bool besteffort;
565 : char quote;
566 : bstream *b;
567 : stream *out;
568 : MT_Id tid;
569 : MT_Sema producer; /* reader waits for call */
570 : MT_Sema consumer; /* reader waits for call */
571 : MT_Sema sema; /* threads wait for work , negative next implies exit */
572 : MT_Sema reply; /* let reader continue */
573 : Tablet *as;
574 : char *errbuf;
575 : const char *csep, *rsep;
576 : size_t seplen, rseplen;
577 :
578 : char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for row splitter and tokenizer */
579 : size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
580 : char **rows[MAXBUFFERS];
581 : lng *startlineno[MAXBUFFERS];
582 : int top[MAXBUFFERS]; /* number of rows in this buffer */
583 : int cur; /* current buffer used by splitter and update threads */
584 :
585 : int *cols; /* columns to handle */
586 : char ***fields;
587 : bte *rowerror;
588 : int errorcnt;
589 : bool aborted;
590 : bool set_qry_ctx;
591 : } READERtask;
592 :
593 : /* returns TRUE if there is/might be more */
594 : static bool
595 102786 : tablet_read_more(READERtask *task)
596 : {
597 102786 : bstream *in = task->b;
598 102786 : stream *out = task->out;
599 102786 : size_t n = task->b->size;
600 102786 : if (out) {
601 101175 : do {
602 : /* query is not finished ask for more */
603 : /* we need more query text */
604 101175 : if (bstream_next(in) < 0) {
605 0 : if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
606 0 : task->aborted = true;
607 0 : mnstr_clearerr(in->s);
608 : }
609 0 : return false;
610 : }
611 101175 : if (in->eof) {
612 101173 : if (bstream_getoob(in)) {
613 0 : task->aborted = true;
614 0 : return false;
615 : }
616 101173 : if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
617 101173 : mnstr_flush(out, MNSTR_FLUSH_DATA);
618 101173 : in->eof = false;
619 : /* we need more query text */
620 101173 : if (bstream_next(in) < 0) {
621 0 : if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
622 0 : task->aborted = true;
623 0 : mnstr_clearerr(in->s);
624 : }
625 0 : return false;
626 : }
627 101173 : if (in->eof)
628 : return false;
629 : }
630 101173 : } while (in->len <= in->pos);
631 1611 : } else if (bstream_read(in, n) <= 0) {
632 : return false;
633 : }
634 : return true;
635 : }
636 :
637 : /* note, the column value that is passed here is the 0 based value; the
638 : * lineno value on the other hand is 1 based */
639 : static void
640 34 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
641 : const char *fcn)
642 : {
643 34 : assert(is_int_nil(col) || col >= 0);
644 34 : assert(is_lng_nil(lineno) || lineno >= 1);
645 34 : MT_lock_set(&errorlock);
646 34 : if (task->cntxt->error_row != NULL
647 34 : && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
648 34 : || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
649 : false) != GDK_SUCCEED
650 34 : || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
651 34 : || BUNappend(task->cntxt->error_input, fcn,
652 : false) != GDK_SUCCEED)) {
653 0 : task->besteffort = false;
654 : }
655 34 : if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
656 34 : task->rowerror[idx]++;
657 34 : if (task->as->error == NULL) {
658 60 : const char *colnam = is_int_nil(col) || col < 0
659 30 : || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
660 30 : if (msg == NULL) {
661 0 : task->besteffort = false;
662 30 : } else if (!is_lng_nil(lineno)) {
663 30 : if (!is_int_nil(col)) {
664 28 : if (colnam)
665 28 : task->as->error = createException(MAL, "sql.copy_from",
666 : "line " LLFMT ": column %d %s: %s",
667 : lineno, col + 1, colnam, msg);
668 : else
669 0 : task->as->error = createException(MAL, "sql.copy_from",
670 : "line " LLFMT ": column %d: %s",
671 : lineno, col + 1, msg);
672 : } else {
673 2 : task->as->error = createException(MAL, "sql.copy_from",
674 : "line " LLFMT ": %s", lineno, msg);
675 : }
676 : } else {
677 0 : if (!is_int_nil(col)) {
678 0 : if (colnam)
679 0 : task->as->error = createException(MAL, "sql.copy_from",
680 : "column %d %s: %s", col + 1, colnam,
681 : msg);
682 : else
683 0 : task->as->error = createException(MAL, "sql.copy_from",
684 : "column %d: %s", col + 1, msg);
685 : } else {
686 0 : task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
687 : }
688 : }
689 : }
690 34 : task->errorcnt++;
691 34 : MT_lock_unset(&errorlock);
692 34 : }
693 :
694 : /*
695 : * The row is broken into pieces directly on their field separators. It assumes that we have
696 : * the record in the cache already, so we can do most work quickly.
697 : * Furthermore, it assumes a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
698 : */
699 :
/*
 * Return the number of bytes (excluding the terminating NUL) that
 * mycpstr() writes for s.  Well-formed UTF-8 sequences of 1-4 bytes are
 * copied as is.  Every other byte (stray continuation byte, invalid
 * start byte, or truncated sequence start) becomes a 4-byte "<XX>"
 * escape.
 *
 * The previous version advanced past a valid multi-byte sequence and
 * then one byte further.  That skipped a byte (under-counting, which let
 * mycpstr() overflow its buffer) and could step over the terminating NUL.
 */
static size_t
mystrlen(const char *s)
{
	size_t len = 0;

	while (*s) {
		const unsigned char c = (unsigned char) *s;
		size_t n;				/* sequence length, 0 if invalid */

		if ((c & 0x80) == 0)
			n = 1;
		else if ((c & 0xE0) == 0xC0)
			n = 2;
		else if ((c & 0xF0) == 0xE0)
			n = 3;
		else if ((c & 0xF8) == 0xF0)
			n = 4;
		else
			n = 0;				/* continuation or invalid start byte */
		/* all following bytes must be continuation bytes; the NUL
		 * terminator fails this test, so we never read past it */
		for (size_t k = 1; k < n; k++) {
			if (((unsigned char) s[k] & 0xC0) != 0x80) {
				n = 0;
				break;
			}
		}
		if (n == 0) {
			len += 4;			/* "<XX>" */
			s++;
		} else {
			len += n;
			s += n;
		}
	}
	return len;
}
742 :
/*
 * Copy the NUL-terminated string s to t and return a pointer to the NUL
 * written at the end of t.  Well-formed UTF-8 sequences of 1-4 bytes are
 * copied unchanged.  Any other byte is written as "<XX>", where XX is
 * its upper-case hexadecimal value.  t must have room for mystrlen(s) + 1
 * bytes.
 */
static char *
mycpstr(char *t, const char *s)
{
	static const char hexdigits[] = "0123456789ABCDEF";

	while (*s) {
		const unsigned char c = (unsigned char) *s;
		int n;					/* sequence length, 0 if invalid */

		if ((c & 0x80) == 0)
			n = 1;
		else if ((c & 0xE0) == 0xC0)
			n = 2;
		else if ((c & 0xF0) == 0xE0)
			n = 3;
		else if ((c & 0xF8) == 0xF0)
			n = 4;
		else
			n = 0;
		for (int k = 1; k < n; k++) {
			if (((unsigned char) s[k] & 0xC0) != 0x80) {
				n = 0;
				break;
			}
		}
		if (n == 0) {
			*t++ = '<';
			*t++ = hexdigits[c >> 4];
			*t++ = hexdigits[c & 0x0F];
			*t++ = '>';
			s++;
		} else {
			memcpy(t, s, (size_t) n);
			t += n;
			s += n;
		}
	}
	*t = '\0';
	return t;
}
794 :
795 : static str
796 32 : SQLload_error(READERtask *task, lng idx, BUN attrs)
797 : {
798 32 : str line;
799 32 : char *s;
800 32 : size_t sz = 0;
801 32 : BUN i;
802 :
803 130 : for (i = 0; i < attrs; i++) {
804 98 : if (task->fields[i][idx])
805 88 : sz += mystrlen(task->fields[i][idx]);
806 98 : sz += task->seplen;
807 : }
808 :
809 32 : s = line = GDKmalloc(sz + task->rseplen + 1);
810 32 : if (line == NULL) {
811 0 : tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
812 : "SQLload_error");
813 0 : return NULL;
814 : }
815 130 : for (i = 0; i < attrs; i++) {
816 98 : if (task->fields[i][idx])
817 88 : s = mycpstr(s, task->fields[i][idx]);
818 98 : if (i < attrs - 1)
819 66 : s = mycpstr(s, task->csep);
820 : }
821 32 : strcpy(s, task->rsep);
822 32 : return line;
823 : }
824 :
825 : /*
826 : * The parsing of the individual values is straightforward. If the value represents
827 : * the null-replacement string then we grab the underlying nil.
828 : * If the string starts with the quote identified from SQL, we locate the tail
829 : * and interpret the body.
830 : *
831 : * If inserting fails, we return -1; if the value cannot be parsed, we
832 : * return -1 if besteffort is not set, otherwise we return 0, but in
833 : * either case an entry is added to the error table.
834 : */
835 : static inline int
836 323388421 : SQLinsert_val(READERtask *task, int col, int idx)
837 : {
838 323388421 : Column *fmt = task->as->format + col;
839 323388421 : const void *adt;
840 323388421 : char buf[BUFSIZ];
841 323388421 : char *s = task->fields[col][idx];
842 323388421 : char *err = NULL;
843 323388421 : int ret = 0;
844 :
845 : /* include testing on the terminating null byte !! */
846 323388421 : if (s == NULL) {
847 6530414 : adt = fmt->nildata;
848 6530414 : fmt->c->tnonil = false;
849 : } else {
850 316858007 : if (task->escape) {
851 316657867 : size_t slen = strlen(s) + 1;
852 316657867 : char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
853 18 : if (data == NULL
854 336414675 : || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
855 316657867 : strlen(s), '\0') < 0)
856 : adt = NULL;
857 : else
858 336414673 : adt = fmt->frstr(fmt, fmt->adt, data);
859 339198981 : if (data != buf)
860 18 : GDKfree(data);
861 : } else
862 200140 : adt = fmt->frstr(fmt, fmt->adt, s);
863 : }
864 :
865 345929534 : lng row = BATcount(fmt->c) + 1;
866 345929534 : if (adt == NULL) {
867 27 : if (task->rowerror) {
868 27 : err = SQLload_error(task, idx, task->as->nr_attrs);
869 27 : if (s) {
870 27 : size_t slen = mystrlen(s);
871 27 : char *scpy = GDKmalloc(slen + 1);
872 27 : if (scpy == NULL) {
873 0 : tablet_error(task, idx, row, col,
874 : SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
875 0 : task->besteffort = false; /* no longer best effort */
876 0 : GDKfree(err);
877 0 : return -1;
878 : }
879 27 : mycpstr(scpy, s);
880 27 : s = scpy;
881 : }
882 27 : snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
883 : s ? " in '" : "", s ? s : "", s ? "'" : "");
884 27 : GDKfree(s);
885 27 : tablet_error(task, idx, row, col, buf, err);
886 27 : GDKfree(err);
887 27 : if (!task->besteffort)
888 : return -1;
889 : }
890 6 : ret = -!task->besteffort; /* yep, two unary operators ;-) */
891 : /* replace it with a nil */
892 6 : adt = fmt->nildata;
893 6 : fmt->c->tnonil = false;
894 : }
895 345929513 : if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
896 : return ret;
897 :
898 : /* failure */
899 0 : if (task->rowerror) {
900 0 : char *msg = GDKerrbuf;
901 0 : err = SQLload_error(task, idx, task->as->nr_attrs);
902 0 : tablet_error(task, idx, row, col, msg
903 0 : && *msg ? msg : "insert failed", err);
904 0 : GDKfree(err);
905 : }
906 0 : task->besteffort = false; /* no longer best effort */
907 0 : return -1;
908 : }
909 :
910 : static int
911 327583 : SQLworker_column(READERtask *task, int col)
912 : {
913 327583 : int i;
914 327583 : Column *fmt = task->as->format;
915 :
916 327583 : if (fmt[col].c == NULL)
917 : return 0;
918 :
919 : /* watch out for concurrent threads */
920 327577 : MT_lock_set(&mal_copyLock);
921 327581 : if (!fmt[col].skip
922 327581 : && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
923 286 : if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
924 0 : tablet_error(task, lng_nil, lng_nil, col,
925 : "Failed to extend the BAT\n", "SQLworker_column");
926 0 : MT_lock_unset(&mal_copyLock);
927 0 : return -1;
928 : }
929 : }
930 327581 : MT_lock_unset(&mal_copyLock);
931 :
932 327170912 : for (i = 0; i < task->top[task->cur]; i++) {
933 326515932 : if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
934 21 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
935 21 : return -1;
936 : }
937 : }
938 327399 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
939 :
940 327399 : return 0;
941 : }
942 :
/*
 * Split one record, task->rows[task->cur][idx], into its fields.
 *
 * The record text is modified in place.  Every column separator, and the
 * closing quote of a quoted field, is overwritten with a NUL byte.
 * task->fields[i][idx] is pointed at the start of field i; for a quoted
 * field that is just past the opening quote.  When task->escape is set, a
 * backslash makes the following character part of the field.
 *
 * A field that compares equal (GDKstrcasecmp) to the user-defined NULL
 * string is replaced by a NULL pointer.
 *
 * Errors are reported with tablet_error(), together with the offending
 * input obtained from SQLload_error():
 *  - missing closing quote;
 *  - too few fields;
 *  - leftover data after the last field.
 * For the first two, the affected field and all following fields of the
 * row are set to NULL, so that loading can continue and skip as little as
 * possible.  (Presumably tablet_error() records the error in the client's
 * rejects table -- see create_rejects_table.)
 *
 * Returns 0 when the row was split cleanly, -1 if any error was reported.
 */
static int
SQLload_parse_row(READERtask *task, int idx)
{
	BUN i;
	char errmsg[BUFSIZ];
	/* first byte of the column separator; a cheap pre-check before the
	 * full strncmp() for multi-byte separators */
	char ch = *task->csep;
	char *row = task->rows[task->cur][idx];
	lng startlineno = task->startlineno[task->cur][idx];
	Tablet *as = task->as;
	/* NOTE(review): fmt points at column 0, and its nullstr/null_length
	 * are used for every column below -- presumably all columns share one
	 * NULL string; confirm against the caller that fills as->format */
	Column *fmt = as->format;
	bool error = false;
	str errline = NULL;

	assert(idx < task->top[task->cur]);
	assert(row);
	errmsg[0] = 0;

	if (task->quote || task->seplen != 1) {
		/* generic path: quoting is enabled or the column separator is
		 * longer than one byte */
		for (i = 0; i < as->nr_attrs; i++) {
			bool quote = false;
			task->fields[i][idx] = row;
			/* recognize fields starting with a quote, keep them */
			if (*row && *row == task->quote) {
				quote = true;
				task->fields[i][idx] = row + 1;
				row = tablet_skip_string(row + 1, task->quote, task->escape);

				if (!row) {
					/* tablet_skip_string() found no closing quote */
					errline = SQLload_error(task, idx, i + 1);
					snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
					tablet_error(task, idx, startlineno, (int) i, errmsg,
								 errline);
					GDKfree(errline);
					error = true;
					/* jump into the error block below to NULL the rest
					 * of the row */
					goto errors1;
				} else
					/* terminate the field at the closing quote and
					 * continue scanning after it */
					*row++ = 0;
			}

			/* eat away the column separator
			 * NOTE(review): for a quoted field, any text between the
			 * closing quote and the separator is skipped silently */
			for (; *row; row++)
				if (*row == '\\' && task->escape) {
					/* step over the escaped character so that it cannot
					 * terminate the field */
					if (row[1])
						row++;
				} else if (*row == ch
						   && (task->seplen == 1
							   || strncmp(row, task->csep,
										  task->seplen) == 0)) {
					*row = 0;
					row += task->seplen;
					goto endoffieldcheck;
				}

			/* not enough fields */
			if (i < as->nr_attrs - 1) {
				errline = SQLload_error(task, idx, i + 1);
				/* it's the next value that is missing */
				tablet_error(task, idx, startlineno, (int) i + 1,
							 "Column value missing", errline);
				GDKfree(errline);
				error = true;
			  errors1:
				/* we save all errors detected as NULL values */
				for (; i < as->nr_attrs; i++)
					task->fields[i][idx] = NULL;
				/* undo one step so that the outer loop's i++ leaves i
				 * at nr_attrs and the loop terminates */
				i--;
			}
		  endoffieldcheck:
			;
			/* check for user defined NULL string; a quoted field is only
			 * compared when fmt->null_length is 0 */
			if ((!quote || !fmt->null_length) && fmt->nullstr
				&& task->fields[i][idx]
				&& GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0)
				task->fields[i][idx] = 0;
		}
	} else {
		/* fast path: no quoting and a single-byte column separator */
		assert(!task->quote);
		assert(task->seplen == 1);
		for (i = 0; i < as->nr_attrs; i++) {
			task->fields[i][idx] = row;

			/* eat away the column separator */
			for (; *row; row++)
				if (*row == '\\' && task->escape) {
					/* step over the escaped character */
					if (row[1])
						row++;
				} else if (*row == ch) {
					*row = 0;
					row++;
					goto endoffield2;
				}

			/* not enough fields */
			if (i < as->nr_attrs - 1) {
				errline = SQLload_error(task, idx, i + 1);
				/* it's the next value that is missing */
				tablet_error(task, idx, startlineno, (int) i + 1,
							 "Column value missing", errline);
				GDKfree(errline);
				error = true;
				/* we save all errors detected */
				for (; i < as->nr_attrs; i++)
					task->fields[i][idx] = NULL;
				/* let the loop's i++ bring i back to nr_attrs */
				i--;
			}
		  endoffield2:
			;
			/* check for user defined NULL string */
			if (fmt->nullstr && task->fields[i][idx]
				&& GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0) {
				task->fields[i][idx] = 0;
			}
		}
	}
	/* check for too many values as well: all fields were consumed but
	 * input remains (row is NULL after a missing-quote error) */
	if (row && *row && i == as->nr_attrs) {
		errline = SQLload_error(task, idx, task->as->nr_attrs);
		snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
		tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
		GDKfree(errline);
		error = true;
	}
	return error ? -1 : 0;
}
1074 :
1075 : static void
1076 1290 : SQLworker(void *arg)
1077 : {
1078 1290 : READERtask *task = (READERtask *) arg;
1079 1290 : unsigned int i;
1080 1290 : int j, piece;
1081 1290 : lng t0;
1082 :
1083 1290 : GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
1084 1290 : GDKclrerr();
1085 1290 : task->errbuf = GDKerrbuf;
1086 1290 : MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
1087 :
1088 1290 : MT_sema_down(&task->sema);
1089 611152 : while (task->top[task->cur] >= 0) {
1090 : /* stage one, break the rows spread the work over the workers */
1091 611152 : switch (task->state) {
1092 304924 : case BREAKROW:
1093 304924 : t0 = GDKusec();
1094 304924 : piece = (task->top[task->cur] + task->workers) / task->workers;
1095 :
1096 304924 : for (j = piece * task->id;
1097 138839019 : j < task->top[task->cur] && j < piece * (task->id + 1); j++)
1098 138534491 : if (task->rows[task->cur][j]) {
1099 138534491 : if (SQLload_parse_row(task, j) < 0) {
1100 5 : task->errorcnt++;
1101 : // early break unless best effort
1102 5 : if (!task->besteffort) {
1103 2 : for (j++;
1104 4 : j < task->top[task->cur]
1105 4 : && j < piece * (task->id + 1); j++)
1106 8 : for (i = 0; i < task->as->nr_attrs; i++)
1107 6 : task->fields[i][j] = NULL;
1108 : break;
1109 : }
1110 : }
1111 : }
1112 304530 : task->wtime = GDKusec() - t0;
1113 304570 : break;
1114 304938 : case UPDATEBAT:
1115 304938 : if (!task->besteffort && task->errorcnt)
1116 : break;
1117 : /* stage two, updating the BATs */
1118 1260959 : for (i = 0; i < task->as->nr_attrs; i++)
1119 956195 : if (task->cols[i]) {
1120 327616 : t0 = GDKusec();
1121 327581 : if (SQLworker_column(task, task->cols[i] - 1) < 0)
1122 : break;
1123 327452 : t0 = GDKusec() - t0;
1124 327445 : task->time[i] += t0;
1125 327445 : task->wtime += t0;
1126 : }
1127 : break;
1128 1290 : case ENDOFCOPY:
1129 1290 : MT_sema_up(&task->reply);
1130 1290 : goto do_return;
1131 : }
1132 609358 : MT_sema_up(&task->reply);
1133 1214251 : MT_sema_down(&task->sema);
1134 : }
1135 0 : MT_sema_up(&task->reply);
1136 :
1137 1290 : do_return:
1138 1290 : GDKfree(GDKerrbuf);
1139 1290 : GDKsetbuf(NULL);
1140 1290 : MT_thread_set_qry_ctx(NULL);
1141 1290 : }
1142 :
1143 : static void
1144 103380 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
1145 : {
1146 103380 : int i, j, mi;
1147 103380 : lng loc[MAXWORKERS];
1148 :
1149 : /* after a few rounds we stick to the work assignment */
1150 103380 : if (task->rounds > 8)
1151 103248 : return;
1152 : /* simple round robin the first time */
1153 2773 : if (threads == 1 || task->rounds++ == 0) {
1154 16720 : for (i = j = 0; i < nr_attrs; i++, j++)
1155 14079 : ptask[j % threads].cols[i] = task->cols[i];
1156 : return;
1157 : }
1158 132 : memset(loc, 0, sizeof(loc));
1159 : /* use of load directives */
1160 1930 : for (i = 0; i < nr_attrs; i++)
1161 7132 : for (j = 0; j < threads; j++)
1162 5334 : ptask[j].cols[i] = 0;
1163 :
1164 : /* now allocate the work to the threads */
1165 1930 : for (i = 0; i < nr_attrs; i++, j++) {
1166 : mi = 0;
1167 5334 : for (j = 1; j < threads; j++)
1168 3536 : if (loc[j] < loc[mi])
1169 1502 : mi = j;
1170 :
1171 1798 : ptask[mi].cols[i] = task->cols[i];
1172 1798 : loc[mi] += task->time[i];
1173 : }
1174 : /* reset the timer */
1175 1930 : for (i = 0; i < nr_attrs; i++, j++)
1176 1798 : task->time[i] = 0;
1177 : }
1178 :
/*
 * Reading is handled by a separate task as a preparation for more parallelism.
 * A buffer is filled with proper rows.
 * If we are reading from a file then a double buffering scheme is activated.
 * Reading from the console (stdin) remains single buffered only.
 * If we end up with unfinished records, the rowlimit check terminates the load.
 */
1186 :
typedef unsigned char (*dfa_t)[256];

/*
 * Build the transition table of a DFA that finds the byte string `sep`
 * (of length `seplen`) in a byte stream.
 *
 * The state is the number of separator bytes matched so far, and
 * table[state][byte] is the next state.  Reaching state `seplen` means
 * the separator has been recognized.  Only rows 0..seplen-1 are
 * allocated, so the caller must stop before using state `seplen` as a
 * row index.
 *
 * On a mismatch the table falls back to the longest prefix of `sep` that
 * is also a suffix of the bytes matched so far (KMP-style).  This means a
 * separator that starts inside a failed partial match is still found.
 *
 * Returns a table allocated with GDKzalloc (release with GDKfree), or
 * NULL when the allocation fails.
 * NOTE(review): states are stored as unsigned char, so this presumably
 * requires seplen < 256 -- confirm callers never pass longer separators.
 */
static dfa_t
mkdfa(const unsigned char *sep, size_t seplen)
{
	dfa_t table = GDKzalloc(seplen * sizeof(*table));

	if (table == NULL)
		return NULL;
	for (size_t state = 0; state < seplen; state++) {
		unsigned char *row = table[state];

		/* from any state, the first separator byte starts a new match */
		row[sep[0]] = 1;
		/* the expected byte advances the match */
		row[sep[state]] = (unsigned char) (state + 1);
		/* fall back on every border of sep[0..state-1], longest first:
		 * if sep[skip..state-1] equals the prefix of the same length,
		 * the byte following that prefix continues the match */
		for (size_t skip = 0; skip < state; skip++) {
			size_t len = state - skip;

			if (memcmp(sep + skip, sep, len) == 0 && row[sep[len]] <= len)
				row[sep[len]] = (unsigned char) (len + 1);
		}
	}
	return table;
}
1219 :
1220 : #ifdef __has_builtin
1221 : #if __has_builtin(__builtin_expect)
1222 : /* __builtin_expect returns its first argument; it is expected to be
1223 : * equal to the second argument */
1224 : #define unlikely(expr) __builtin_expect((expr) != 0, 0)
1225 : #define likely(expr) __builtin_expect((expr) != 0, 1)
1226 : #endif
1227 : #endif
1228 : #ifndef unlikely
1229 : #ifdef _MSC_VER
1230 : #define unlikely(expr) (__assume(!(expr)), (expr))
1231 : #define likely(expr) (__assume((expr)), (expr))
1232 : #else
1233 : #define unlikely(expr) (expr)
1234 : #define likely(expr) (expr)
1235 : #endif
1236 : #endif
1237 :
/*
 * Producer thread: reads the input and cuts it into records for the
 * workers.
 *
 * Start-up and cancellation:
 *  - Waits on task->producer before doing anything.
 *  - Returns immediately when task->id is negative; SQLload_file sets
 *    that to cancel the producer when setting up the workers failed.
 *
 * Record splitting:
 *  - Unconsumed data of the block stream task->b is copied into
 *    task->input[cur] and split at the record separator task->rsep, using
 *    the DFA built by mkdfa().
 *  - During the scan, quoted sections (task->quote) and, when
 *    task->escape is set, backslash-escaped bytes are not searched for
 *    the separator.
 *  - The input is checked to be UTF-8, and newlines are counted for
 *    error reporting.
 *  - For every record, the first byte of its separator is overwritten
 *    with NUL.  The record start goes into task->rows[cur][] and its
 *    first line number into task->startlineno[cur][].
 *  - The first task->skip records are dropped.
 *
 * A buffer is handed to the consumer (task->consumer) when:
 *  - it holds task->limit records, or
 *  - task->maxrow records have been produced in total, or
 *  - only an incomplete record is left.
 * An incomplete record stays unconsumed in the stream (b->pos is not
 * advanced past it) and is copied again in the next round.
 *
 * Buffering:
 *  - File input (task->as->filename != NULL) alternates between the
 *    MAXBUFFERS buffers, so that reading overlaps with the workers.
 *    blocked[] tracks which buffer the consumer still owns.
 *  - Console input is single buffered: after each buffer the producer
 *    waits until the consumer posts task->producer.
 *
 * Problems are reported with tablet_error() and end the input by setting
 * ateof for the current buffer: allocation failure, client abort,
 * over-long record, NUL byte in the data, invalid UTF-8, and an
 * incomplete last record.
 */
static void
SQLproducer(void *p)
{
	READERtask *task = (READERtask *) p;
	bool consoleinput = false;
	int cur = 0;		// buffer being filled
	bool blocked[MAXBUFFERS] = { false };	/* buffer handed to the consumer and not yet released */
	bool ateof[MAXBUFFERS] = { false };
	BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };	/* records produced in total / at the start of each buffer */
	char *end = NULL, *e = NULL, *s = NULL, *base;
	const char *rsep = task->rsep;
	size_t rseplen = strlen(rsep), partial = 0;
	char quote = task->quote;
	dfa_t rdfa;
	lng rowno = 0;
	lng lineno = 1;
	lng startlineno = 1;
	int more = 0;

	MT_sema_down(&task->producer);
	if (task->id < 0) {
		/* the loader cancelled us before any work was handed out */
		return;
	}

	MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
	rdfa = mkdfa((const unsigned char *) rsep, rseplen);
	if (rdfa == NULL) {
		tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
					 "");
		ateof[cur] = true;
		goto reportlackofinput;
	}

/*	TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/

	base = end = s = task->input[cur];
	*s = 0;
	task->cur = cur;
	if (task->as->filename == NULL) {
		/* console input: first parse what is already in the stream */
		consoleinput = true;
		goto parseSTDIN;
	}
	for (;;) {
		startlineno = lineno;
		ateof[cur] = !tablet_read_more(task);

		// we may be reading from standard input and may be out of input
		// warn the consumers
		/* NOTE(review): the out-of-band check only runs when lineno
		 * happens to be a multiple of 8192 at this point */
		if (task->aborted || ((lineno & 8191) == 0 && bstream_getoob(task->cntxt->fdin))) {
			tablet_error(task, rowno, lineno, int_nil,
						 "problem reported by client", s);
			ateof[cur] = true;
			goto reportlackofinput;
		}

		/* end of input while the last record is still incomplete */
		if (ateof[cur] && partial) {
			/* NOTE(review): partial is already known to be nonzero here */
			if (unlikely(partial)) {
				tablet_error(task, rowno, lineno, int_nil,
							 "incomplete record at end of file", s);
				task->b->pos += partial;
			}
			goto reportlackofinput;
		}

		if (task->errbuf && task->errbuf[0]) {
			if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
				tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
							 "SQLload_file");
/*				TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
				ateof[cur] = true;
				/* the only exit from this loop that runs the
				 * end-of-input check after it */
				break;
			}
		}

	  parseSTDIN:

		/* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
		partial = 0;
		task->top[cur] = 0;
		s = task->input[cur];
		base = end;
		/* avoid too long records */
		if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
			/* the input buffer should be extended, but 'base' is not shared
			   between the threads, which we can not now update.
			   Mimic an ateof instead; */
			tablet_error(task, rowno, lineno, int_nil, "record too long", "");
			ateof[cur] = true;
/*			TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
			goto reportlackofinput;
		}
		memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
		end = end + task->b->len - task->b->pos;
		*end = '\0';	/* this is safe, as the stream ensures an extra byte */
		/* Note that we rescan from the start of a record (the last
		 * partial buffer from the previous iteration), even if in the
		 * previous iteration we have already established that there
		 * is no record separator in the first, perhaps significant,
		 * part of the buffer. This is because if the record separator
		 * is longer than one byte, it is too complex (i.e. would
		 * require more state) to be sure what the state of the quote
		 * status is when we back off a few bytes from where the last
		 * scan ended (we need to back off some since we could be in
		 * the middle of the record separator). If this is too
		 * costly, we have to rethink the matter. */
		/* an empty line on stdin ends input that has no row limit
		 * (presumably the client's end-of-data marker) */
		if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
			ateof[cur] = true;
			goto reportlackofinput;
		}
		for (e = s; *e && e < end && cnt < task->maxrow;) {
			/* tokenize the record completely
			 *
			 * The format of the input should comply to the following
			 * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
			 * where quote is a single user-defined character.
			 * Within the quoted fields a character may be escaped
			 * with a backslash. The correct number of fields should
			 * be supplied. In the first phase we simply break the
			 * rows at the record boundary. */
			int nutf = 0;	/* UTF-8 continuation bytes still expected */
			int m = 0;		/* bits required in the next continuation byte (overlong check) */
			bool bs = false;	/* previous byte was an escaping backslash */
			char q = 0;		/* quote character we are inside of, 0 if none */
			size_t i = 0;	/* DFA state: separator bytes matched so far */
			while (*e) {
				if (task->skip > 0) {
					/* no interpretation of data we're skipping, just
					 * look for newline
					 * NOTE(review): skipped records always end at '\n',
					 * regardless of rsep */
					if (*e == '\n') {
						lineno++;
						break;
					}
				} else {
					/* check for correctly encoded UTF-8
					 * NOTE(review): surrogates (ED A0..BF) and lead bytes
					 * F5..F7 / F4 90+ (beyond U+10FFFF) are not rejected */
					if (nutf > 0) {
						if (unlikely((*e & 0xC0) != 0x80))
							goto badutf8;
						if (unlikely(m != 0 && (*e & m) == 0))
							goto badutf8;
						m = 0;
						nutf--;
					} else if ((*e & 0x80) != 0) {
						if ((*e & 0xE0) == 0xC0) {
							nutf = 1;
							/* C0/C1 lead bytes only encode overlong forms */
							if (unlikely((e[0] & 0x1E) == 0))
								goto badutf8;
						} else if ((*e & 0xF0) == 0xE0) {
							nutf = 2;
							/* after E0 the next byte must be >= A0 */
							if ((e[0] & 0x0F) == 0)
								m = 0x20;
						} else if (likely((*e & 0xF8) == 0xF0)) {
							nutf = 3;
							/* after F0 the next byte must be >= 90 */
							if ((e[0] & 0x07) == 0)
								m = 0x30;
						} else {
							goto badutf8;
						}
					} else if (*e == '\n')
						lineno++;
					/* check for quoting and the row separator */
					if (bs) {
						bs = false;
					} else if (task->escape && *e == '\\') {
						bs = true;
						i = 0;
					} else if (*e == q) {
						q = 0;
					} else if (*e == quote) {
						q = quote;
						i = 0;
					} else if (q == 0) {
						/* outside quotes: advance the separator matcher;
						 * a complete separator ends the record */
						i = rdfa[i][(unsigned char) *e];
						if (i == rseplen)
							break;
					}
				}
				e++;
			}
			if (*e == 0) {
				partial = e - s;
				/* found an incomplete record, saved for next round */
				if (unlikely(s + partial < end)) {
					/* found a EOS in the input */
					tablet_error(task, rowno, startlineno, int_nil,
								 "record too long (EOS found)", "");
					ateof[cur] = true;
					goto reportlackofinput;
				}
				break;
			} else {
				rowno++;
				if (task->skip > 0) {
					task->skip--;
				} else {
					if (cnt < task->maxrow) {
						task->startlineno[cur][task->top[cur]] = startlineno;
						task->rows[cur][task->top[cur]++] = s;
						startlineno = lineno;
						cnt++;
					}
					/* e points at the last separator byte: terminate the
					 * record at the first one */
					*(e + 1 - rseplen) = 0;
				}
				s = ++e;
				/* the record is complete: consume it from the stream */
				task->b->pos += (size_t) (e - base);
				base = e;
				if (task->top[cur] == task->limit)
					break;
			}
		}

	  reportlackofinput:
/*		TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/

		if (consoleinput) {
			task->cur = cur;
			task->ateof = ateof[cur];
			task->cnt = bufcnt[cur];
			/* tell consumer to go ahead */
			MT_sema_up(&task->consumer);
			/* then wait until it is done */
			MT_sema_down(&task->producer);
			if (cnt == task->maxrow) {
				GDKfree(rdfa);
				MT_thread_set_qry_ctx(NULL);
				return;
			}
		} else {
			assert(!blocked[cur]);
			if (blocked[(cur + 1) % MAXBUFFERS]) {
				/* first wait until other buffer is done */
/*				TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/

				MT_sema_down(&task->producer);
				blocked[(cur + 1) % MAXBUFFERS] = false;
				if (task->state == ENDOFCOPY) {
					GDKfree(rdfa);
					MT_thread_set_qry_ctx(NULL);
					return;
				}
			}
			/* other buffer is done, proceed with current buffer */
			assert(!blocked[(cur + 1) % MAXBUFFERS]);
			blocked[cur] = true;
			task->cur = cur;
			task->ateof = ateof[cur];
			task->cnt = bufcnt[cur];
			/* keep going unless at end of input; also keep going when
			 * the buffer filled up before all copied data was scanned */
			more = !ateof[cur] || (e && e < end
								   && task->top[cur] == task->limit);
/*			TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/

			MT_sema_up(&task->consumer);

			cur = (cur + 1) % MAXBUFFERS;
/*			TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/

			if (cnt == task->maxrow) {
				MT_sema_down(&task->producer);
/*				TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
				GDKfree(rdfa);
				MT_thread_set_qry_ctx(NULL);
				return;
			}
		}
/*		TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/

		/* we ran out of input? */
		if (task->ateof && !more) {
/*			TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
			GDKfree(rdfa);
			MT_thread_set_qry_ctx(NULL);
			return;
		}
		/* consumers ask us to stop? */
		if (task->state == ENDOFCOPY) {
			GDKfree(rdfa);
			MT_thread_set_qry_ctx(NULL);
			return;
		}
		bufcnt[cur] = cnt;
		/* move the non-parsed correct row data to the head of the next buffer */
		end = s = task->input[cur];
	}
	/* reached only through the GDK error break above */
	if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
		char msg[256];
		snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
		task->as->error = GDKstrdup(msg);
		tablet_error(task, rowno, startlineno, int_nil,
					 "incomplete record at end of file", s);
		task->b->pos += partial;
	}
	GDKfree(rdfa);
	MT_thread_set_qry_ctx(NULL);

	return;

  badutf8:
	tablet_error(task, rowno, startlineno, int_nil,
				 "input not properly encoded UTF-8", "");
	ateof[cur] = true;
	goto reportlackofinput;
}
1539 :
1540 : static void
1541 1147 : create_rejects_table(Client cntxt)
1542 : {
1543 1147 : MT_lock_set(&mal_contextLock);
1544 1147 : if (cntxt->error_row == NULL) {
1545 483 : cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
1546 483 : cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
1547 483 : cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
1548 483 : cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
1549 483 : if (cntxt->error_row == NULL || cntxt->error_fld == NULL
1550 483 : || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
1551 0 : BBPreclaim(cntxt->error_row);
1552 0 : BBPreclaim(cntxt->error_fld);
1553 0 : BBPreclaim(cntxt->error_msg);
1554 0 : BBPreclaim(cntxt->error_input);
1555 0 : cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
1556 : }
1557 : }
1558 1147 : MT_lock_unset(&mal_contextLock);
1559 1147 : }
1560 :
1561 : BUN
1562 1119 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
1563 : const char *csep, const char *rsep, char quote, lng skip,
1564 : lng maxrow, int best, bool from_stdin, const char *tabnam,
1565 : bool escape)
1566 : {
1567 1119 : BUN cnt = 0, cntstart = 0, leftover = 0;
1568 1119 : int res = 0; /* < 0: error, > 0: success, == 0: continue processing */
1569 1119 : int j;
1570 1119 : BUN firstcol;
1571 1119 : BUN i, attr;
1572 1119 : READERtask task;
1573 1119 : READERtask ptask[MAXWORKERS];
1574 1119 : int threads = 1;
1575 1119 : lng tio, t1 = 0;
1576 1119 : char name[MT_NAME_LEN];
1577 :
1578 1119 : if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
1579 112 : threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
1580 112 : if (threads > 1)
1581 112 : threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
1582 : else
1583 : threads = 1;
1584 : }
1585 :
1586 : /* TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
1587 :
1588 1119 : memset(ptask, 0, sizeof(ptask));
1589 2238 : task = (READERtask) {
1590 : .cntxt = cntxt,
1591 : .from_stdin = from_stdin,
1592 : .as = as,
1593 : .escape = escape, /* TODO: implement feature!!! */
1594 1119 : .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
1595 : };
1596 :
1597 : /* create the reject tables */
1598 1119 : create_rejects_table(task.cntxt);
1599 1119 : if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
1600 1119 : || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
1601 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1602 : "SQLload initialization failed", "");
1603 : /* nothing allocated yet, so nothing to free */
1604 0 : return BUN_NONE;
1605 : }
1606 :
1607 1119 : assert(rsep);
1608 1119 : assert(csep);
1609 1119 : assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
1610 1119 : task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
1611 1119 : task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1612 1119 : task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
1613 1119 : if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
1614 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1615 : "memory allocation failed", "SQLload_file");
1616 0 : goto bailout;
1617 : }
1618 1119 : task.cur = 0;
1619 3357 : for (i = 0; i < MAXBUFFERS; i++) {
1620 2238 : task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
1621 2238 : task.rowlimit[i] = MAXROWSIZE(2 * b->size);
1622 2238 : if (task.base[i] == NULL) {
1623 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1624 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1625 0 : goto bailout;
1626 : }
1627 2238 : task.base[i][0] = task.base[i][b->size + 1] = 0;
1628 2238 : task.input[i] = task.base[i] + 1; /* wrap the buffer with null bytes */
1629 : }
1630 1119 : task.besteffort = best;
1631 :
1632 1119 : if (maxrow < 0)
1633 78 : task.maxrow = BUN_MAX;
1634 : else
1635 1041 : task.maxrow = (BUN) maxrow;
1636 :
1637 1119 : task.skip = skip;
1638 1119 : task.quote = quote;
1639 1119 : task.csep = csep;
1640 1119 : task.seplen = strlen(csep);
1641 1119 : task.rsep = rsep;
1642 1119 : task.rseplen = strlen(rsep);
1643 1119 : task.errbuf = cntxt->errbuf;
1644 :
1645 1119 : MT_sema_init(&task.producer, 0, "task.producer");
1646 1119 : MT_sema_init(&task.consumer, 0, "task.consumer");
1647 1119 : task.ateof = false;
1648 1119 : task.b = b;
1649 1119 : task.out = out;
1650 :
1651 1119 : as->error = NULL;
1652 :
1653 : /* there is no point in creating more threads than we have columns */
1654 1119 : if (as->nr_attrs < (BUN) threads)
1655 44 : threads = (int) as->nr_attrs;
1656 :
1657 : /* allocate enough space for pointers into the buffer pool. */
1658 : /* the record separator is considered a column */
1659 1119 : task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
1660 11458 : for (i = 0; i < as->nr_attrs; i++) {
1661 10339 : task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
1662 10339 : if (task.fields[i] == NULL) {
1663 0 : if (task.as->error == NULL)
1664 0 : as->error = createException(MAL, "sql.copy_from",
1665 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1666 0 : goto bailout;
1667 : }
1668 10339 : task.cols[i] = (int) (i + 1); /* to distinguish non initialized later with zero */
1669 : }
1670 3357 : for (i = 0; i < MAXBUFFERS; i++) {
1671 2238 : task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
1672 2238 : task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
1673 2238 : if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
1674 0 : GDKfree(task.rows[i]);
1675 0 : GDKfree(task.startlineno[i]);
1676 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1677 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1678 : "SQLload_file:failed to alloc buffers");
1679 0 : goto bailout;
1680 : }
1681 : }
1682 1119 : task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
1683 1119 : if (task.rowerror == NULL) {
1684 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1685 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1686 : "SQLload_file:failed to alloc rowerror buffer");
1687 0 : goto bailout;
1688 : }
1689 :
1690 1119 : task.id = 0;
1691 1119 : snprintf(name, sizeof(name), "prod-%s", tabnam);
1692 1119 : if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
1693 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1694 : SQLSTATE(42000) "failed to start producer thread",
1695 : "SQLload_file");
1696 0 : goto bailout;
1697 : }
1698 : /* TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
1699 :
1700 1119 : task.workers = threads;
1701 2409 : for (j = 0; j < threads; j++) {
1702 1290 : ptask[j] = task;
1703 1290 : ptask[j].id = j;
1704 1290 : ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1705 1290 : if (ptask[j].cols == NULL) {
1706 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1707 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1708 0 : task.id = -1;
1709 0 : MT_sema_up(&task.producer);
1710 0 : goto bailout;
1711 : }
1712 1290 : snprintf(name, sizeof(name), "ptask%d.sema", j);
1713 1290 : MT_sema_init(&ptask[j].sema, 0, name);
1714 1290 : snprintf(name, sizeof(name), "ptask%d.repl", j);
1715 1290 : MT_sema_init(&ptask[j].reply, 0, name);
1716 1290 : snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
1717 1290 : if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
1718 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1719 : SQLSTATE(42000) "failed to start worker thread",
1720 : "SQLload_file");
1721 0 : threads = j;
1722 0 : for (j = 0; j < threads; j++)
1723 0 : ptask[j].workers = threads;
1724 : }
1725 : }
1726 1119 : if (threads == 0) {
1727 : /* no threads started */
1728 0 : task.id = -1;
1729 0 : MT_sema_up(&task.producer);
1730 0 : goto bailout;
1731 : }
1732 1119 : MT_sema_up(&task.producer);
1733 :
1734 1119 : tio = GDKusec();
1735 1119 : tio = GDKusec() - tio;
1736 1119 : t1 = GDKusec();
1737 2241 : for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
1738 1122 : if (task.as->format[firstcol].c != NULL)
1739 : break;
1740 104506 : while (res == 0 && cnt < task.maxrow) {
1741 :
1742 : // track how many elements are in the aggregated BATs
1743 103568 : cntstart = BATcount(task.as->format[firstcol].c);
1744 : /* block until the producer has data available */
1745 103568 : MT_sema_down(&task.consumer);
1746 103568 : cnt += task.top[task.cur];
1747 103568 : if (task.ateof && !task.top[task.cur])
1748 : break;
1749 103410 : t1 = GDKusec() - t1;
1750 : /* TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
1751 :
1752 103410 : t1 = GDKusec();
1753 103410 : if (task.top[task.cur]) {
1754 : /* activate the workers to break rows */
1755 408329 : for (j = 0; j < threads; j++) {
1756 : /* stage one, break the rows in parallel */
1757 304949 : ptask[j].error = 0;
1758 304949 : ptask[j].state = BREAKROW;
1759 304949 : ptask[j].next = task.top[task.cur];
1760 304949 : ptask[j].fields = task.fields;
1761 304949 : ptask[j].limit = task.limit;
1762 304949 : ptask[j].cnt = task.cnt;
1763 304949 : ptask[j].cur = task.cur;
1764 304949 : ptask[j].top[task.cur] = task.top[task.cur];
1765 304949 : MT_sema_up(&ptask[j].sema);
1766 : }
1767 : }
1768 103410 : if (task.top[task.cur]) {
1769 : /* await completion of row break phase */
1770 408329 : for (j = 0; j < threads; j++) {
1771 304949 : MT_sema_down(&ptask[j].reply);
1772 304949 : if (ptask[j].error) {
1773 0 : res = -1;
1774 : /* TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
1775 : }
1776 : }
1777 : }
1778 :
1779 : /* TRC_DEBUG(MAL_SERVER,
1780 : "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
1781 : task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
1782 :
1783 103410 : if (task.top[task.cur]) {
1784 103380 : if (res == 0) {
1785 103380 : SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
1786 :
1787 : /* activate the workers to update the BATs */
1788 511709 : for (j = 0; j < threads; j++) {
1789 : /* stage two, update the BATs */
1790 304949 : ptask[j].state = UPDATEBAT;
1791 304949 : MT_sema_up(&ptask[j].sema);
1792 : }
1793 : }
1794 : }
1795 103410 : tio = GDKusec();
1796 103410 : tio = t1 - tio;
1797 :
1798 : /* await completion of the BAT updates */
1799 103410 : if (res == 0 && task.top[task.cur]) {
1800 408329 : for (j = 0; j < threads; j++) {
1801 304949 : MT_sema_down(&ptask[j].reply);
1802 304949 : if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
1803 304949 : res = -1;
1804 304949 : best = 0;
1805 : }
1806 : }
1807 : }
1808 :
1809 : /* trim the BATs discarding error tuples */
1810 : #define trimerrors(TYPE) \
1811 : do { \
1812 : TYPE *src, *dst; \
1813 : leftover= BATcount(task.as->format[attr].c); \
1814 : limit = leftover - cntstart; \
1815 : dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
1816 : for(j = 0; j < (int) limit; j++, src++){ \
1817 : if ( task.rowerror[j]){ \
1818 : leftover--; \
1819 : continue; \
1820 : } \
1821 : *dst++ = *src; \
1822 : } \
1823 : BATsetcount(task.as->format[attr].c, leftover ); \
1824 : } while (0)
1825 :
1826 : /* TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
1827 : best, BATcount(as->format[firstcol].c), task.cnt); */
1828 :
1829 103410 : if (best && BATcount(as->format[firstcol].c)) {
1830 : BUN limit;
1831 : int width;
1832 :
1833 45 : for (attr = 0; attr < as->nr_attrs; attr++) {
1834 31 : if (as->format[attr].skip)
1835 5 : continue;
1836 26 : width = as->format[attr].c->twidth;
1837 26 : as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
1838 26 : switch (width) {
1839 5 : case 1:
1840 16 : trimerrors(bte);
1841 5 : break;
1842 0 : case 2:
1843 0 : trimerrors(sht);
1844 0 : break;
1845 21 : case 4:
1846 55 : trimerrors(int);
1847 21 : break;
1848 0 : case 8:
1849 0 : trimerrors(lng);
1850 0 : break;
1851 : #ifdef HAVE_HGE
1852 0 : case 16:
1853 0 : trimerrors(hge);
1854 0 : break;
1855 : #endif
1856 0 : default:
1857 : {
1858 0 : char *src, *dst;
1859 0 : leftover = BATcount(task.as->format[attr].c);
1860 0 : limit = leftover - cntstart;
1861 0 : dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
1862 0 : for (j = 0; j < (int) limit; j++, src += width) {
1863 0 : if (task.rowerror[j]) {
1864 0 : leftover--;
1865 0 : continue;
1866 : }
1867 0 : if (dst != src)
1868 0 : memcpy(dst, src, width);
1869 0 : dst += width;
1870 : }
1871 0 : BATsetcount(task.as->format[attr].c, leftover);
1872 : }
1873 0 : break;
1874 : }
1875 : }
1876 : // re-initialize the error vector;
1877 14 : memset(task.rowerror, 0, task.limit);
1878 14 : task.errorcnt = 0;
1879 : }
1880 :
1881 103410 : if (res < 0) {
1882 : /* producer should stop */
1883 23 : task.maxrow = cnt;
1884 23 : task.state = ENDOFCOPY;
1885 23 : task.ateof = true;
1886 : }
1887 103410 : if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
1888 : break;
1889 103410 : task.top[task.cur] = 0;
1890 103410 : if (cnt == task.maxrow)
1891 961 : task.ateof = true;
1892 104529 : MT_sema_up(&task.producer);
1893 : }
1894 :
1895 : /* TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
1896 :
1897 1119 : cnt = BATcount(task.as->format[firstcol].c);
1898 :
1899 1119 : task.state = ENDOFCOPY;
1900 : /* TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
1901 :
1902 1119 : if (!task.ateof || cnt < task.maxrow) {
1903 : /* TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
1904 175 : MT_sema_up(&task.producer);
1905 : }
1906 1119 : MT_join_thread(task.tid);
1907 :
1908 : /* TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
1909 :
1910 3528 : for (j = 0; j < threads; j++) {
1911 1290 : ptask[j].state = ENDOFCOPY;
1912 1290 : MT_sema_up(&ptask[j].sema);
1913 : }
1914 : /* wait for their death */
1915 2409 : for (j = 0; j < threads; j++)
1916 1290 : MT_sema_down(&ptask[j].reply);
1917 :
1918 : /* TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
1919 :
1920 2409 : for (j = 0; j < threads; j++) {
1921 1290 : MT_join_thread(ptask[j].tid);
1922 1290 : GDKfree(ptask[j].cols);
1923 1290 : MT_sema_destroy(&ptask[j].sema);
1924 1290 : MT_sema_destroy(&ptask[j].reply);
1925 : }
1926 :
1927 : /* TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
1928 : /* TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
1929 :
1930 11458 : for (i = 0; i < as->nr_attrs; i++) {
1931 10339 : BAT *b = task.as->format[i].c;
1932 10339 : if (b)
1933 10333 : BATsettrivprop(b);
1934 10339 : GDKfree(task.fields[i]);
1935 : }
1936 1119 : GDKfree(task.fields);
1937 1119 : GDKfree(task.cols);
1938 1119 : GDKfree(task.time);
1939 4476 : for (i = 0; i < MAXBUFFERS; i++) {
1940 2238 : GDKfree(task.base[i]);
1941 2238 : GDKfree(task.rows[i]);
1942 2238 : GDKfree(task.startlineno[i]);
1943 : }
1944 1119 : if (task.rowerror)
1945 1119 : GDKfree(task.rowerror);
1946 1119 : MT_sema_destroy(&task.producer);
1947 1119 : MT_sema_destroy(&task.consumer);
1948 :
1949 1119 : return res < 0 ? BUN_NONE : cnt;
1950 :
1951 0 : bailout:
1952 0 : if (task.fields) {
1953 0 : for (i = 0; i < as->nr_attrs; i++)
1954 0 : GDKfree(task.fields[i]);
1955 0 : GDKfree(task.fields);
1956 : }
1957 0 : GDKfree(task.time);
1958 0 : GDKfree(task.cols);
1959 0 : GDKfree(task.base[task.cur]);
1960 0 : GDKfree(task.rowerror);
1961 0 : for (i = 0; i < MAXWORKERS; i++)
1962 0 : GDKfree(ptask[i].cols);
1963 : return BUN_NONE;
1964 : }
1965 :
1966 : /* return the latest reject table */
1967 : str
1968 28 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1969 : {
1970 28 : bat *row = getArgReference_bat(stk, pci, 0);
1971 28 : bat *fld = getArgReference_bat(stk, pci, 1);
1972 28 : bat *msg = getArgReference_bat(stk, pci, 2);
1973 28 : bat *inp = getArgReference_bat(stk, pci, 3);
1974 :
1975 28 : create_rejects_table(cntxt);
1976 28 : if (cntxt->error_row == NULL)
1977 0 : throw(MAL, "sql.rejects", "No reject table available");
1978 28 : MT_lock_set(&errorlock);
1979 28 : BAT *bn1 = COLcopy(cntxt->error_row, cntxt->error_row->ttype, true, TRANSIENT);
1980 28 : BAT *bn2 = COLcopy(cntxt->error_fld, cntxt->error_fld->ttype, true, TRANSIENT);
1981 28 : BAT *bn3 = COLcopy(cntxt->error_msg, cntxt->error_msg->ttype, true, TRANSIENT);
1982 28 : BAT *bn4 = COLcopy(cntxt->error_input, cntxt->error_input->ttype, true, TRANSIENT);
1983 28 : MT_lock_unset(&errorlock);
1984 28 : if (bn1 == NULL || bn2 == NULL || bn3 == NULL || bn4 == NULL) {
1985 0 : BBPreclaim(bn1);
1986 0 : BBPreclaim(bn2);
1987 0 : BBPreclaim(bn3);
1988 0 : BBPreclaim(bn4);
1989 0 : throw(MAL, "sql.rejects", GDK_EXCEPTION);
1990 : }
1991 28 : *row = bn1->batCacheid;
1992 28 : *fld = bn2->batCacheid;
1993 28 : *msg = bn3->batCacheid;
1994 28 : *inp = bn4->batCacheid;
1995 28 : BBPkeepref(bn1);
1996 28 : BBPkeepref(bn2);
1997 28 : BBPkeepref(bn3);
1998 28 : BBPkeepref(bn4);
1999 28 : (void) mb;
2000 28 : return MAL_SUCCEED;
2001 : }
2002 :
2003 : str
2004 13 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2005 : {
2006 13 : if (cntxt->error_row) {
2007 13 : MT_lock_set(&errorlock);
2008 13 : BATclear(cntxt->error_row, true);
2009 13 : if (cntxt->error_fld)
2010 13 : BATclear(cntxt->error_fld, true);
2011 13 : if (cntxt->error_msg)
2012 13 : BATclear(cntxt->error_msg, true);
2013 13 : if (cntxt->error_input)
2014 13 : BATclear(cntxt->error_input, true);
2015 13 : MT_lock_unset(&errorlock);
2016 : }
2017 13 : (void) mb;
2018 13 : (void) stk;
2019 13 : (void) pci;
2020 13 : return MAL_SUCCEED;
2021 : }
|