Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Niels Nes, Martin Kersten
15 : *
16 : * Parallel bulk load for SQL
17 : * The COPY INTO command for SQL is heavily CPU bound, which means
18 : * that ideally we would like to exploit the multi-cores to do that
19 : * work in parallel.
20 : * Complicating factors are the initial record offset, the
21 : * possible variable length of the input, and the original sort order
22 : * that should preferable be maintained.
23 : *
24 : * The code below consists of a file reader, which breaks up the
25 : * file into chunks of distinct rows. Then multiple parallel threads
26 : * grab them, and break them on the field boundaries.
27 : * After all fields are identified this way, the columns are converted
28 : * and stored in the BATs.
29 : *
30 : * The threads get a reference to a private copy of the READERtask.
31 : * It includes a list of columns they should handle. This is a basis
32 : * to distributed cheap and expensive columns over threads.
33 : *
34 : * The file reader overlaps IO with updates of the BAT.
35 : * Also the buffer size of the block stream might be a little small for
36 : * this task (1MB). It has been increased to 8MB, which indeed improved.
37 : *
38 : * The work divider allocates subtasks to threads based on the
39 : * observed time spending so far.
40 : */
41 :
42 : #include "monetdb_config.h"
43 : #include "tablet.h"
44 : #include "mapi_prompt.h"
45 : #include "mal_internal.h"
46 :
47 : #include <string.h>
48 : #include <ctype.h>
49 :
50 : #define MAXWORKERS 64
51 : #define MAXBUFFERS 2
52 : /* We restrict the row length to be 32MB for the time being */
53 : #define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024)
54 :
55 : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
56 :
57 : static BAT *
58 10336 : void_bat_create(int adt, BUN nr)
59 : {
60 10336 : BAT *b = COLnew(0, adt, nr, TRANSIENT);
61 :
62 : /* check for correct structures */
63 10336 : if (b == NULL)
64 : return NULL;
65 10336 : if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
66 : return NULL;
67 : }
68 :
69 : /* disable all properties here */
70 10336 : b->tsorted = false;
71 10336 : b->trevsorted = false;
72 10336 : b->tnosorted = 0;
73 10336 : b->tnorevsorted = 0;
74 10336 : b->tseqbase = oid_nil;
75 10336 : b->tkey = false;
76 10336 : b->tnokey[0] = 0;
77 10336 : b->tnokey[1] = 0;
78 10336 : return b;
79 : }
80 :
81 : void
82 124749 : TABLETdestroy_format(Tablet *as)
83 : {
84 124749 : BUN p;
85 124749 : Column *fmt = as->format;
86 :
87 596327 : for (p = 0; p < as->nr_attrs; p++) {
88 471578 : BBPreclaim(fmt[p].c);
89 471575 : if (fmt[p].data)
90 10341 : GDKfree(fmt[p].data);
91 : }
92 124749 : GDKfree(fmt);
93 124749 : }
94 :
95 : static oid
96 123628 : check_BATs(Tablet *as)
97 : {
98 123628 : Column *fmt = as->format;
99 123628 : BUN i = 0;
100 123628 : BUN cnt;
101 123628 : oid base;
102 :
103 123628 : if (fmt[i].c == NULL)
104 123629 : i++;
105 123628 : cnt = BATcount(fmt[i].c);
106 123628 : base = fmt[i].c->hseqbase;
107 :
108 123628 : if (as->nr != cnt) {
109 6653 : for (i = 0; i < as->nr_attrs; i++)
110 5959 : if (fmt[i].c)
111 5265 : fmt[i].p = as->offset;
112 694 : return oid_nil;
113 : }
114 :
115 578196 : for (i = 0; i < as->nr_attrs; i++) {
116 455262 : BAT *b = fmt[i].c;
117 :
118 455262 : if (b == NULL)
119 122931 : continue;
120 :
121 332331 : if (BATcount(b) != cnt || b->hseqbase != base)
122 0 : return oid_nil;
123 :
124 332331 : fmt[i].p = as->offset;
125 : }
126 : return base;
127 : }
128 :
129 : str
130 1120 : TABLETcreate_bats(Tablet *as, BUN est)
131 : {
132 1120 : Column *fmt = as->format;
133 1120 : BUN i, nr = 0;
134 :
135 11462 : for (i = 0; i < as->nr_attrs; i++) {
136 10342 : if (fmt[i].skip)
137 6 : continue;
138 10336 : fmt[i].c = void_bat_create(fmt[i].adt, est);
139 10336 : if (!fmt[i].c) {
140 0 : while (i > 0) {
141 0 : if (!fmt[--i].skip) {
142 0 : BBPreclaim(fmt[i].c);
143 0 : fmt[i].c = NULL;
144 : }
145 : }
146 0 : throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
147 : est);
148 : }
149 10336 : fmt[i].ci = bat_iterator_nolock(fmt[i].c);
150 10336 : nr++;
151 : }
152 1120 : if (!nr)
153 0 : throw(SQL, "copy",
154 : "At least one column should be read from the input\n");
155 : return MAL_SUCCEED;
156 : }
157 :
158 : str
159 1095 : TABLETcollect(BAT **bats, Tablet *as)
160 : {
161 1095 : Column *fmt = as->format;
162 1095 : BUN i, j;
163 1095 : BUN cnt = 0;
164 :
165 1095 : if (bats == NULL)
166 0 : throw(SQL, "copy", "Missing container");
167 2594 : for (i = 0; i < as->nr_attrs && !cnt; i++)
168 1499 : if (!fmt[i].skip)
169 1496 : cnt = BATcount(fmt[i].c);
170 11344 : for (i = 0, j = 0; i < as->nr_attrs; i++) {
171 10249 : if (fmt[i].skip)
172 6 : continue;
173 10243 : bats[j] = fmt[i].c;
174 10243 : BBPfix(bats[j]->batCacheid);
175 10243 : if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
176 0 : throw(SQL, "copy",
177 : "Failed to set access at tablet part " BUNFMT "\n", cnt);
178 10243 : fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
179 10243 : fmt[i].c->tkey = false;
180 10243 : BATsettrivprop(fmt[i].c);
181 :
182 10243 : if (cnt != BATcount(fmt[i].c))
183 0 : throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
184 : BATcount(fmt[i].c), cnt);
185 10243 : j++;
186 : }
187 : return MAL_SUCCEED;
188 : }
189 :
190 : // the starting quote character has already been skipped
191 :
192 : static char *
193 4880613 : tablet_skip_string(char *s, char quote, bool escape)
194 : {
195 4880613 : size_t i = 0, j = 0;
196 111296852 : while (s[i]) {
197 111252864 : if (escape && s[i] == '\\' && s[i + 1] != '\0')
198 504234 : s[j++] = s[i++];
199 110748630 : else if (s[i] == quote) {
200 3540469 : if (s[i + 1] != quote)
201 : break;
202 : i++; /* skip the first quote */
203 : }
204 106416239 : s[j++] = s[i++];
205 : }
206 4880613 : assert(s[i] == quote || s[i] == '\0');
207 4880613 : if (s[i] == 0)
208 : return NULL;
209 4880613 : s[j] = 0;
210 4880613 : return s + i;
211 : }
212 :
213 : static int
214 0 : TABLET_error(stream *s)
215 : {
216 0 : const char *err = mnstr_peek_error(s);
217 0 : if (err)
218 0 : TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
219 0 : return -1;
220 : }
221 :
222 : /* The output line is first built before being sent. It solves a problem
223 : with UDP, where you may loose most of the information using short writes
224 : */
225 : static inline int
226 0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
227 : Column *fmt, stream *fd, BUN nr_attrs, oid id)
228 : {
229 0 : BUN i;
230 0 : ssize_t fill = 0;
231 :
232 0 : for (i = 0; i < nr_attrs; i++) {
233 0 : if (fmt[i].c == NULL)
234 0 : continue;
235 0 : if (id < fmt[i].c->hseqbase
236 0 : || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
237 : break;
238 0 : fmt[i].p = id - fmt[i].c->hseqbase;
239 : }
240 0 : if (i == nr_attrs) {
241 0 : for (i = 0; i < nr_attrs; i++) {
242 0 : Column *f = fmt + i;
243 0 : const char *p;
244 0 : ssize_t l;
245 :
246 0 : if (f->c) {
247 0 : p = BUNtail(f->ci, f->p);
248 :
249 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
250 0 : p = f->nullstr;
251 0 : l = (ssize_t) strlen(f->nullstr);
252 : } else {
253 0 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
254 0 : if (l < 0)
255 : return -1;
256 0 : p = *localbuf;
257 : }
258 0 : if (fill + l + f->seplen >= (ssize_t) * len) {
259 : /* extend the buffer */
260 0 : char *nbuf;
261 0 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
262 0 : if (nbuf == NULL)
263 : return -1; /* *buf freed by caller */
264 0 : *buf = nbuf;
265 0 : *len = fill + l + f->seplen + BUFSIZ;
266 : }
267 0 : strncpy(*buf + fill, p, l);
268 0 : fill += l;
269 : }
270 0 : strncpy(*buf + fill, f->sep, f->seplen);
271 0 : fill += f->seplen;
272 : }
273 : }
274 0 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
275 0 : return TABLET_error(fd);
276 : return 0;
277 : }
278 :
279 : static inline int
280 6208551 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
281 : Column *fmt, stream *fd, BUN nr_attrs)
282 : {
283 6208551 : BUN i;
284 6208551 : ssize_t fill = 0;
285 :
286 27920683 : for (i = 0; i < nr_attrs; i++) {
287 21712131 : Column *f = fmt + i;
288 21712131 : const char *p;
289 21712131 : ssize_t l;
290 :
291 21712131 : if (f->c) {
292 15503580 : p = BUNtail(f->ci, f->p);
293 :
294 15503580 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
295 851369 : p = f->nullstr;
296 851369 : l = (ssize_t) strlen(p);
297 : } else {
298 14652211 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
299 14652212 : if (l < 0)
300 : return -1;
301 14652212 : p = *localbuf;
302 : }
303 15503581 : if (fill + l + f->seplen >= (ssize_t) * len) {
304 : /* extend the buffer */
305 78 : char *nbuf;
306 78 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
307 78 : if (nbuf == NULL)
308 : return -1; /* *buf freed by caller */
309 78 : *buf = nbuf;
310 78 : *len = fill + l + f->seplen + BUFSIZ;
311 : }
312 15503581 : strncpy(*buf + fill, p, l);
313 15503581 : fill += l;
314 15503581 : f->p++;
315 : }
316 21712132 : strncpy(*buf + fill, f->sep, f->seplen);
317 21712132 : fill += f->seplen;
318 : }
319 6208552 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
320 0 : return TABLET_error(fd);
321 : return 0;
322 : }
323 :
324 : static inline int
325 0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
326 : BUN nr_attrs, oid id)
327 : {
328 0 : BUN i;
329 :
330 0 : for (i = 0; i < nr_attrs; i++) {
331 0 : Column *f = fmt + i;
332 :
333 0 : if (f->c) {
334 0 : const void *p = BUNtail(f->ci, id - f->c->hseqbase);
335 :
336 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
337 0 : size_t l = strlen(f->nullstr);
338 0 : if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
339 0 : return TABLET_error(fd);
340 : } else {
341 0 : ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
342 :
343 0 : if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
344 0 : return TABLET_error(fd);
345 : }
346 : }
347 0 : if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
348 0 : return TABLET_error(fd);
349 : }
350 : return 0;
351 : }
352 :
353 : /*
354 : * Fast Load
355 : * To speedup the CPU intensive loading of files we have to break
356 : * the file into pieces and perform parallel analysis. Experimentation
357 : * against lineitem SF1 showed that half of the time goes into very
358 : * basis atom analysis (41 out of 102 B instructions).
359 : * Furthermore, the actual insertion into the BATs takes only
360 : * about 10% of the total. With multi-core processors around
361 : * it seems we can gain here significantly.
362 : *
363 : * The approach taken is to fork a parallel scan over the text file.
364 : * We assume that the blocked stream is already
365 : * positioned correctly at the reading position. The start and limit
366 : * indicates the byte range to search for tuples.
367 : * If start> 0 then we first skip to the next record separator.
368 : * If necessary we read more than 'limit' bytes to ensure parsing a complete
369 : * record and stop at the record boundary.
370 : * Beware, we should allocate Tablet descriptors for each file segment,
371 : * otherwise we end up with a gross concurrency control problem.
372 : * The resulting BATs should be glued at the final phase.
373 : *
374 : * Raw Load
375 : * Front-ends can bypass most of the overhead in loading the BATs
376 : * by preparing the corresponding files directly and replace those
377 : * created by e.g. the SQL frontend.
378 : * This strategy is only advisable for cases where we have very
379 : * large files >200GB and/or are created by a well debugged code.
380 : *
381 : * To experiment with this approach, the code base responds
382 : * on negative number of cores by dumping the data directly in BAT
383 : * storage format into a collections of files on disk.
384 : * It reports on the actions to be taken to replace BATs.
385 : * This technique is initially only supported for fixed-sized columns.
386 : * The rawmode() indicator acts as the internal switch.
387 : */
388 :
389 : /*
390 : * To speed up loading ascii files we have to determine the number of blocks.
391 : * This depends on the number of cores available.
392 : * For the time being we hardwire this decision based on our own
393 : * platforms.
394 : * Furthermore, we only consider parallel load for file-based requests.
395 : *
396 : * To simplify our world, we assume a single producer process.
397 : */
398 :
399 : static int
400 0 : output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
401 : {
402 0 : size_t len = BUFSIZ, locallen = BUFSIZ;
403 0 : int res = 0;
404 0 : char *buf = GDKmalloc(len);
405 0 : char *localbuf = GDKmalloc(len);
406 0 : BUN p, q;
407 0 : oid id;
408 0 : BUN offset = as->offset;
409 :
410 0 : if (buf == NULL || localbuf == NULL) {
411 0 : GDKfree(buf);
412 0 : GDKfree(localbuf);
413 0 : return -1;
414 : }
415 0 : for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
416 0 : p++, id++) {
417 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
418 : res = -5;
419 : break;
420 : }
421 0 : if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
422 : break;
423 : }
424 : }
425 0 : GDKfree(localbuf);
426 0 : GDKfree(buf);
427 0 : return res;
428 : }
429 :
430 : static int
431 123628 : output_file_dense(Tablet *as, stream *fd, bstream *in)
432 : {
433 123628 : size_t len = BUFSIZ, locallen = BUFSIZ;
434 123628 : int res = 0;
435 123628 : char *buf = GDKmalloc(len);
436 123629 : char *localbuf = GDKmalloc(len);
437 123628 : BUN i = 0;
438 :
439 123628 : if (buf == NULL || localbuf == NULL) {
440 0 : GDKfree(buf);
441 0 : GDKfree(localbuf);
442 0 : return -1;
443 : }
444 6332180 : for (i = 0; i < as->nr; i++) {
445 6208551 : if ((i & 8191) == 8191 && bstream_getoob(in)) {
446 : res = -5; /* "Query aborted" */
447 : break;
448 : }
449 6208551 : if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
450 : break;
451 : }
452 : }
453 123629 : GDKfree(localbuf);
454 123629 : GDKfree(buf);
455 123629 : return res;
456 : }
457 :
458 : static int
459 0 : output_file_ordered(Tablet *as, BAT *order, stream *fd, bstream *in)
460 : {
461 0 : size_t len = BUFSIZ;
462 0 : int res = 0;
463 0 : char *buf = GDKmalloc(len);
464 0 : BUN p, q;
465 0 : BUN i = 0;
466 0 : BUN offset = as->offset;
467 :
468 0 : if (buf == NULL)
469 : return -1;
470 0 : for (q = offset + as->nr, p = offset; p < q; p++, i++) {
471 0 : oid h = order->hseqbase + p;
472 :
473 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
474 : res = -5;
475 : break;
476 : }
477 0 : if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
478 0 : GDKfree(buf);
479 0 : return res;
480 : }
481 : }
482 0 : GDKfree(buf);
483 0 : return res;
484 : }
485 :
486 : int
487 123629 : TABLEToutput_file(Tablet *as, BAT *order, stream *s, bstream *in)
488 : {
489 123629 : oid base = oid_nil;
490 123629 : int ret = 0;
491 :
492 : /* only set nr if it is zero or lower (bogus) to the maximum value
493 : * possible (BATcount), if already set within BATcount range,
494 : * preserve value such that for instance SQL's reply_size still
495 : * works
496 : */
497 123629 : if (order) {
498 0 : BUN maxnr = BATcount(order);
499 0 : if (as->nr == BUN_NONE || as->nr > maxnr)
500 0 : as->nr = maxnr;
501 : }
502 123629 : assert(as->nr != BUN_NONE);
503 :
504 123629 : base = check_BATs(as);
505 123628 : if (!order || !is_oid_nil(base)) {
506 123628 : if (!order || order->hseqbase == base)
507 123628 : ret = output_file_dense(as, s, in);
508 : else
509 0 : ret = output_file_ordered(as, order, s, in);
510 : } else {
511 0 : ret = output_file_default(as, order, s, in);
512 : }
513 123629 : return ret;
514 : }
515 :
516 : /*
517 : * Niels Nes, Martin Kersten
518 : *
519 : * Parallel bulk load for SQL
520 : * The COPY INTO command for SQL is heavily CPU bound, which means
521 : * that ideally we would like to exploit the multi-cores to do that
522 : * work in parallel.
523 : * Complicating factors are the initial record offset, the
524 : * possible variable length of the input, and the original sort order
525 : * that should preferable be maintained.
526 : *
527 : * The code below consists of a file reader, which breaks up the
528 : * file into chunks of distinct rows. Then multiple parallel threads
529 : * grab them, and break them on the field boundaries.
530 : * After all fields are identified this way, the columns are converted
531 : * and stored in the BATs.
532 : *
533 : * The threads get a reference to a private copy of the READERtask.
534 : * It includes a list of columns they should handle. This is a basis
535 : * to distributed cheap and expensive columns over threads.
536 : *
537 : * The file reader overlaps IO with updates of the BAT.
538 : * Also the buffer size of the block stream might be a little small for
539 : * this task (1MB). It has been increased to 8MB, which indeed improved.
540 : *
541 : * The work divider allocates subtasks to threads based on the
542 : * observed time spending so far.
543 : */
544 :
545 : #define BREAKROW 1
546 : #define UPDATEBAT 2
547 : #define ENDOFCOPY 3
548 :
549 : typedef struct {
550 : Client cntxt;
551 : int id; /* for self reference */
552 : int state; /* row break=1 , 2 = update bat */
553 : int workers; /* how many concurrent ones */
554 : int error; /* error during row break */
555 : int next;
556 : int limit;
557 : BUN cnt, maxrow; /* first row in file chunk. */
558 : lng skip; /* number of lines to be skipped */
559 : lng *time, wtime; /* time per col + time per thread */
560 : int rounds; /* how often did we divide the work */
561 : bool ateof; /* io control */
562 : bool from_stdin;
563 : bool escape; /* whether to handle \ escapes */
564 : bool besteffort;
565 : char quote;
566 : bstream *b;
567 : stream *out;
568 : MT_Id tid;
569 : MT_Sema producer; /* reader waits for call */
570 : MT_Sema consumer; /* reader waits for call */
571 : MT_Sema sema; /* threads wait for work , negative next implies exit */
572 : MT_Sema reply; /* let reader continue */
573 : Tablet *as;
574 : char *errbuf;
575 : const char *csep, *rsep;
576 : size_t seplen, rseplen;
577 :
578 : char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for row splitter and tokenizer */
579 : size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
580 : char **rows[MAXBUFFERS];
581 : lng *startlineno[MAXBUFFERS];
582 : int top[MAXBUFFERS]; /* number of rows in this buffer */
583 : int cur; /* current buffer used by splitter and update threads */
584 :
585 : int *cols; /* columns to handle */
586 : char ***fields;
587 : bte *rowerror;
588 : int errorcnt;
589 : bool aborted;
590 : bool set_qry_ctx;
591 : } READERtask;
592 :
593 : /* returns TRUE if there is/might be more */
594 : static bool
595 102801 : tablet_read_more(READERtask *task)
596 : {
597 102801 : bstream *in = task->b;
598 102801 : stream *out = task->out;
599 102801 : size_t n = task->b->size;
600 102801 : if (out) {
601 101175 : do {
602 : /* query is not finished ask for more */
603 : /* we need more query text */
604 101175 : if (bstream_next(in) < 0) {
605 0 : if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
606 0 : task->aborted = true;
607 0 : mnstr_clearerr(in->s);
608 : }
609 0 : return false;
610 : }
611 101175 : if (in->eof) {
612 101173 : if (bstream_getoob(in)) {
613 0 : task->aborted = true;
614 0 : return false;
615 : }
616 101173 : if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
617 101173 : mnstr_flush(out, MNSTR_FLUSH_DATA);
618 101173 : in->eof = false;
619 : /* we need more query text */
620 101173 : if (bstream_next(in) < 0) {
621 0 : if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
622 0 : task->aborted = true;
623 0 : mnstr_clearerr(in->s);
624 : }
625 0 : return false;
626 : }
627 101173 : if (in->eof)
628 : return false;
629 : }
630 101173 : } while (in->len <= in->pos);
631 1626 : } else if (bstream_read(in, n) <= 0) {
632 : return false;
633 : }
634 : return true;
635 : }
636 :
637 : /* note, the column value that is passed here is the 0 based value; the
638 : * lineno value on the other hand is 1 based */
639 : static void
640 145 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
641 : const char *fcn)
642 : {
643 145 : assert(is_int_nil(col) || col >= 0);
644 145 : assert(is_lng_nil(lineno) || lineno >= 1);
645 145 : MT_lock_set(&errorlock);
646 145 : if (task->cntxt->error_row != NULL
647 145 : && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
648 145 : || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
649 : false) != GDK_SUCCEED
650 145 : || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
651 145 : || BUNappend(task->cntxt->error_input, fcn,
652 : false) != GDK_SUCCEED)) {
653 0 : task->besteffort = false;
654 : }
655 145 : if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
656 145 : task->rowerror[idx]++;
657 145 : if (task->as->error == NULL) {
658 62 : const char *colnam = is_int_nil(col) || col < 0
659 31 : || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
660 31 : if (msg == NULL) {
661 0 : task->besteffort = false;
662 31 : } else if (!is_lng_nil(lineno)) {
663 31 : if (!is_int_nil(col)) {
664 29 : if (colnam)
665 29 : task->as->error = createException(MAL, "sql.copy_from",
666 : "line " LLFMT ": column %d %s: %s",
667 : lineno, col + 1, colnam, msg);
668 : else
669 0 : task->as->error = createException(MAL, "sql.copy_from",
670 : "line " LLFMT ": column %d: %s",
671 : lineno, col + 1, msg);
672 : } else {
673 2 : task->as->error = createException(MAL, "sql.copy_from",
674 : "line " LLFMT ": %s", lineno, msg);
675 : }
676 : } else {
677 0 : if (!is_int_nil(col)) {
678 0 : if (colnam)
679 0 : task->as->error = createException(MAL, "sql.copy_from",
680 : "column %d %s: %s", col + 1, colnam,
681 : msg);
682 : else
683 0 : task->as->error = createException(MAL, "sql.copy_from",
684 : "column %d: %s", col + 1, msg);
685 : } else {
686 0 : task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
687 : }
688 : }
689 : }
690 145 : task->errorcnt++;
691 145 : MT_lock_unset(&errorlock);
692 145 : }
693 :
694 : /*
695 : * The row is broken into pieces directly on their field separators. It assumes that we have
696 : * the record in the cache already, so we can do most work quickly.
697 : * Furthermore, it assume a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
698 : */
699 :
700 : static size_t
701 559 : mystrlen(const char *s)
702 : {
703 : /* Calculate and return the space that is needed for the function
704 : * mycpstr below to do its work. */
705 559 : size_t len = 0;
706 559 : const char *s0 = s;
707 :
708 44372 : while (*s) {
709 43813 : if ((*s & 0x80) == 0) {
710 : ;
711 6 : } else if ((*s & 0xC0) == 0x80) {
712 : /* continuation byte */
713 0 : len += 3;
714 6 : } else if ((*s & 0xE0) == 0xC0) {
715 : /* two-byte sequence */
716 6 : if ((s[1] & 0xC0) != 0x80)
717 0 : len += 3;
718 : else
719 6 : s += 2;
720 0 : } else if ((*s & 0xF0) == 0xE0) {
721 : /* three-byte sequence */
722 0 : if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
723 0 : len += 3;
724 : else
725 0 : s += 3;
726 0 : } else if ((*s & 0xF8) == 0xF0) {
727 : /* four-byte sequence */
728 0 : if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
729 0 : || (s[3] & 0xC0) != 0x80)
730 0 : len += 3;
731 : else
732 0 : s += 4;
733 : } else {
734 : /* not a valid start byte */
735 0 : len += 3;
736 : }
737 43813 : s++;
738 : }
739 559 : len += s - s0;
740 559 : return len;
741 : }
742 :
743 : static char *
744 847 : mycpstr(char *t, const char *s)
745 : {
746 : /* Copy the string pointed to by s into the buffer pointed to by
747 : * t, and return a pointer to the NULL byte at the end. During
748 : * the copy we translate incorrect UTF-8 sequences to escapes
749 : * looking like <XX> where XX is the hexadecimal representation of
750 : * the incorrect byte. The buffer t needs to be large enough to
751 : * hold the result, but the correct length can be calculated by
752 : * the function mystrlen above.*/
753 44954 : while (*s) {
754 44107 : if ((*s & 0x80) == 0) {
755 44101 : *t++ = *s++;
756 6 : } else if ((*s & 0xC0) == 0x80) {
757 0 : t += sprintf(t, "<%02X>", (uint8_t) * s++);
758 6 : } else if ((*s & 0xE0) == 0xC0) {
759 : /* two-byte sequence */
760 6 : if ((s[1] & 0xC0) != 0x80)
761 0 : t += sprintf(t, "<%02X>", (uint8_t) * s++);
762 : else {
763 6 : *t++ = *s++;
764 6 : *t++ = *s++;
765 : }
766 0 : } else if ((*s & 0xF0) == 0xE0) {
767 : /* three-byte sequence */
768 0 : if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
769 0 : t += sprintf(t, "<%02X>", (uint8_t) * s++);
770 : else {
771 0 : *t++ = *s++;
772 0 : *t++ = *s++;
773 0 : *t++ = *s++;
774 : }
775 0 : } else if ((*s & 0xF8) == 0xF0) {
776 : /* four-byte sequence */
777 0 : if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
778 0 : || (s[3] & 0xC0) != 0x80)
779 0 : t += sprintf(t, "<%02X>", (uint8_t) * s++);
780 : else {
781 0 : *t++ = *s++;
782 0 : *t++ = *s++;
783 0 : *t++ = *s++;
784 0 : *t++ = *s++;
785 : }
786 : } else {
787 : /* not a valid start byte */
788 0 : t += sprintf(t, "<%02X>", (uint8_t) * s++);
789 : }
790 : }
791 847 : *t = 0;
792 847 : return t;
793 : }
794 :
795 : static str
796 143 : SQLload_error(READERtask *task, lng idx, BUN attrs)
797 : {
798 143 : str line;
799 143 : char *s;
800 143 : size_t sz = 0;
801 143 : BUN i;
802 :
803 574 : for (i = 0; i < attrs; i++) {
804 431 : if (task->fields[i][idx])
805 421 : sz += mystrlen(task->fields[i][idx]);
806 431 : sz += task->seplen;
807 : }
808 :
809 143 : s = line = GDKmalloc(sz + task->rseplen + 1);
810 143 : if (line == NULL) {
811 0 : tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
812 : "SQLload_error");
813 0 : return NULL;
814 : }
815 574 : for (i = 0; i < attrs; i++) {
816 431 : if (task->fields[i][idx])
817 421 : s = mycpstr(s, task->fields[i][idx]);
818 431 : if (i < attrs - 1)
819 288 : s = mycpstr(s, task->csep);
820 : }
821 143 : strcpy(s, task->rsep);
822 143 : return line;
823 : }
824 :
825 : /*
826 : * The parsing of the individual values is straightforward. If the value represents
827 : * the null-replacement string then we grab the underlying nil.
828 : * If the string starts with the quote identified from SQL, we locate the tail
829 : * and interpret the body.
830 : *
831 : * If inserting fails, we return -1; if the value cannot be parsed, we
832 : * return -1 if besteffort is not set, otherwise we return 0, but in
833 : * either case an entry is added to the error table.
834 : */
835 : static inline int
836 331753489 : SQLinsert_val(READERtask *task, int col, int idx)
837 : {
838 331753489 : Column *fmt = task->as->format + col;
839 331753489 : const void *adt;
840 331753489 : char buf[BUFSIZ];
841 331753489 : char *s = task->fields[col][idx];
842 331753489 : char *err = NULL;
843 331753489 : int ret = 0;
844 :
845 : /* include testing on the terminating null byte !! */
846 331753489 : if (s == NULL) {
847 6520380 : adt = fmt->nildata;
848 6520380 : fmt->c->tnonil = false;
849 : } else {
850 325233109 : if (task->escape) {
851 325003122 : size_t slen = strlen(s) + 1;
852 325003122 : char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
853 18 : if (data == NULL
854 345043660 : || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
855 325003122 : strlen(s), '\0') < 0)
856 : adt = NULL;
857 : else
858 345043658 : adt = fmt->frstr(fmt, fmt->adt, data);
859 347641620 : if (data != buf)
860 18 : GDKfree(data);
861 : } else
862 229987 : adt = fmt->frstr(fmt, fmt->adt, s);
863 : }
864 :
865 354392051 : lng row = task->cnt + idx + 1;
866 354392051 : if (adt == NULL) {
867 138 : if (task->rowerror) {
868 138 : err = SQLload_error(task, idx, task->as->nr_attrs);
869 138 : if (s) {
870 138 : size_t slen = mystrlen(s);
871 138 : char *scpy = GDKmalloc(slen + 1);
872 138 : if (scpy == NULL) {
873 0 : tablet_error(task, idx, row, col,
874 : SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
875 0 : task->besteffort = false; /* no longer best effort */
876 0 : GDKfree(err);
877 0 : return -1;
878 : }
879 138 : mycpstr(scpy, s);
880 138 : s = scpy;
881 : }
882 138 : snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
883 : s ? " in '" : "", s ? s : "", s ? "'" : "");
884 138 : GDKfree(s);
885 138 : tablet_error(task, idx, row, col, buf, err);
886 138 : GDKfree(err);
887 138 : if (!task->besteffort)
888 : return -1;
889 : }
890 117 : ret = -!task->besteffort; /* yep, two unary operators ;-) */
891 : /* replace it with a nil */
892 117 : adt = fmt->nildata;
893 117 : fmt->c->tnonil = false;
894 : }
895 354392030 : if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
896 : return ret;
897 :
898 : /* failure */
899 0 : if (task->rowerror) {
900 0 : char *msg = GDKerrbuf;
901 0 : err = SQLload_error(task, idx, task->as->nr_attrs);
902 0 : tablet_error(task, idx, row, col, msg
903 0 : && *msg ? msg : "insert failed", err);
904 0 : GDKfree(err);
905 : }
906 0 : task->besteffort = false; /* no longer best effort */
907 0 : return -1;
908 : }
909 :
910 : static int
911 327766 : SQLworker_column(READERtask *task, int col)
912 : {
913 327766 : int i;
914 327766 : Column *fmt = task->as->format;
915 :
916 327766 : if (fmt[col].c == NULL)
917 : return 0;
918 :
919 : /* watch out for concurrent threads */
920 327760 : MT_lock_set(&mal_copyLock);
921 327768 : if (!fmt[col].skip
922 327768 : && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
923 289 : if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
924 0 : tablet_error(task, lng_nil, lng_nil, col,
925 : "Failed to extend the BAT\n", "SQLworker_column");
926 0 : MT_lock_unset(&mal_copyLock);
927 0 : return -1;
928 : }
929 : }
930 327768 : MT_lock_unset(&mal_copyLock);
931 :
932 332960000 : for (i = 0; i < task->top[task->cur]; i++) {
933 332304559 : if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
934 21 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
935 21 : return -1;
936 : }
937 : }
938 327673 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
939 :
940 327673 : return 0;
941 : }
942 :
943 : /*
944 : * The rows are broken on the column separator. Any error is shown and reflected with
945 : * setting the reference of the offending row fields to NULL.
946 : * This allows the loading to continue, skipping the minimal number of rows.
947 : * The details about the locations can be inspected from the error table.
948 : * We also trim the quotes around strings.
949 : */
950 : static int
951 140314488 : SQLload_parse_row(READERtask *task, int idx)
952 : {
953 140314488 : BUN i;
954 140314488 : char errmsg[BUFSIZ];
955 140314488 : char ch = *task->csep;
956 140314488 : char *row = task->rows[task->cur][idx];
957 140314488 : lng startlineno = task->startlineno[task->cur][idx];
958 140314488 : Tablet *as = task->as;
959 140314488 : Column *fmt = as->format;
960 140314488 : bool error = false;
961 140314488 : str errline = NULL;
962 :
963 140314488 : assert(idx < task->top[task->cur]);
964 140314488 : assert(row);
965 140314488 : errmsg[0] = 0;
966 :
967 140314488 : if (task->quote || task->seplen != 1) {
968 13374024 : for (i = 0; i < as->nr_attrs; i++) {
969 10530899 : bool quote = false;
970 10530899 : task->fields[i][idx] = row;
971 : /* recognize fields starting with a quote, keep them */
972 10530899 : if (*row && *row == task->quote) {
973 4480754 : quote = true;
974 4480754 : task->fields[i][idx] = row + 1;
975 4480754 : row = tablet_skip_string(row + 1, task->quote, task->escape);
976 :
977 4939980 : if (!row) {
978 0 : errline = SQLload_error(task, idx, i + 1);
979 0 : snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
980 0 : tablet_error(task, idx, startlineno, (int) i, errmsg,
981 : errline);
982 0 : GDKfree(errline);
983 0 : error = true;
984 0 : goto errors1;
985 : } else
986 4939980 : *row++ = 0;
987 : }
988 :
989 : /* eat away the column separator */
990 56538856 : for (; *row; row++)
991 55235944 : if (*row == '\\' && task->escape) {
992 2 : if (row[1])
993 2 : row++;
994 55235942 : } else if (*row == ch
995 9687213 : && (task->seplen == 1
996 4 : || strncmp(row, task->csep,
997 : task->seplen) == 0)) {
998 9687213 : *row = 0;
999 9687213 : row += task->seplen;
1000 9687213 : goto endoffieldcheck;
1001 : }
1002 :
1003 : /* not enough fields */
1004 1302912 : if (i < as->nr_attrs - 1) {
1005 0 : errline = SQLload_error(task, idx, i + 1);
1006 : /* it's the next value that is missing */
1007 0 : tablet_error(task, idx, startlineno, (int) i + 1,
1008 : "Column value missing", errline);
1009 0 : GDKfree(errline);
1010 0 : error = true;
1011 0 : errors1:
1012 : /* we save all errors detected as NULL values */
1013 0 : for (; i < as->nr_attrs; i++)
1014 0 : task->fields[i][idx] = NULL;
1015 0 : i--;
1016 : }
1017 1302912 : endoffieldcheck:
1018 10990125 : ;
1019 : /* check for user defined NULL string */
1020 10990125 : if ((!quote || !fmt->null_length) && fmt->nullstr
1021 9444675 : && task->fields[i][idx]
1022 9444675 : && GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0)
1023 2224781 : task->fields[i][idx] = 0;
1024 : }
1025 : } else {
1026 : assert(!task->quote);
1027 : assert(task->seplen == 1);
1028 372982794 : for (i = 0; i < as->nr_attrs; i++) {
1029 236824887 : task->fields[i][idx] = row;
1030 :
1031 : /* eat away the column separator */
1032 2095038465 : for (; *row; row++)
1033 1958412928 : if (*row == '\\' && task->escape) {
1034 382 : if (row[1])
1035 382 : row++;
1036 1958412546 : } else if (*row == ch) {
1037 100199350 : *row = 0;
1038 100199350 : row++;
1039 100199350 : goto endoffield2;
1040 : }
1041 :
1042 : /* not enough fields */
1043 136625537 : if (i < as->nr_attrs - 1) {
1044 4 : errline = SQLload_error(task, idx, i + 1);
1045 : /* it's the next value that is missing */
1046 4 : tablet_error(task, idx, startlineno, (int) i + 1,
1047 : "Column value missing", errline);
1048 4 : GDKfree(errline);
1049 4 : error = true;
1050 : /* we save all errors detected */
1051 16 : for (; i < as->nr_attrs; i++)
1052 8 : task->fields[i][idx] = NULL;
1053 4 : i--;
1054 : }
1055 136625533 : endoffield2:
1056 236824887 : ;
1057 : /* check for user defined NULL string */
1058 236824887 : if (fmt->nullstr && task->fields[i][idx]
1059 221895250 : && GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0) {
1060 3891128 : task->fields[i][idx] = 0;
1061 : }
1062 : }
1063 : }
1064 : /* check for too many values as well */
1065 139001032 : if (row && *row && i == as->nr_attrs) {
1066 1 : errline = SQLload_error(task, idx, task->as->nr_attrs);
1067 1 : snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
1068 1 : tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
1069 1 : GDKfree(errline);
1070 1 : error = true;
1071 : }
1072 139001032 : return error ? -1 : 0;
1073 : }
1074 :
1075 : static void
1076 1293 : SQLworker(void *arg)
1077 : {
1078 1293 : READERtask *task = (READERtask *) arg;
1079 1293 : unsigned int i;
1080 1293 : int j, piece;
1081 1293 : lng t0;
1082 :
1083 1293 : GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
1084 1293 : GDKclrerr();
1085 1293 : task->errbuf = GDKerrbuf;
1086 1292 : MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
1087 :
1088 1290 : MT_sema_down(&task->sema);
1089 611147 : while (task->top[task->cur] >= 0) {
1090 : /* stage one, break the rows spread the work over the workers */
1091 611147 : switch (task->state) {
1092 304943 : case BREAKROW:
1093 304943 : t0 = GDKusec();
1094 304977 : piece = (task->top[task->cur] + task->workers) / task->workers;
1095 :
1096 304977 : for (j = piece * task->id;
1097 139373667 : j < task->top[task->cur] && j < piece * (task->id + 1); j++)
1098 139068878 : if (task->rows[task->cur][j]) {
1099 139068878 : if (SQLload_parse_row(task, j) < 0) {
1100 5 : task->errorcnt++;
1101 : // early break unless best effort
1102 5 : if (!task->besteffort) {
1103 2 : for (j++;
1104 4 : j < task->top[task->cur]
1105 4 : && j < piece * (task->id + 1); j++)
1106 8 : for (i = 0; i < task->as->nr_attrs; i++)
1107 6 : task->fields[i][j] = NULL;
1108 : break;
1109 : }
1110 : }
1111 : }
1112 304791 : task->wtime = GDKusec() - t0;
1113 304461 : break;
1114 304911 : case UPDATEBAT:
1115 304911 : if (!task->besteffort && task->errorcnt)
1116 : break;
1117 : /* stage two, updating the BATs */
1118 1261412 : for (i = 0; i < task->as->nr_attrs; i++)
1119 956583 : if (task->cols[i]) {
1120 327773 : t0 = GDKusec();
1121 327770 : if (SQLworker_column(task, task->cols[i] - 1) < 0)
1122 : break;
1123 327696 : t0 = GDKusec() - t0;
1124 327693 : task->time[i] += t0;
1125 327693 : task->wtime += t0;
1126 : }
1127 : break;
1128 1293 : case ENDOFCOPY:
1129 1293 : MT_sema_up(&task->reply);
1130 1293 : goto do_return;
1131 : }
1132 609313 : MT_sema_up(&task->reply);
1133 1216685 : MT_sema_down(&task->sema);
1134 : }
1135 0 : MT_sema_up(&task->reply);
1136 :
1137 1293 : do_return:
1138 1293 : GDKfree(GDKerrbuf);
1139 1293 : GDKsetbuf(NULL);
1140 1293 : MT_thread_set_qry_ctx(NULL);
1141 1293 : }
1142 :
1143 : static void
1144 103394 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
1145 : {
1146 103394 : int i, j, mi;
1147 103394 : lng loc[MAXWORKERS];
1148 :
1149 : /* after a few rounds we stick to the work assignment */
1150 103394 : if (task->rounds > 8)
1151 103255 : return;
1152 : /* simple round robin the first time */
1153 2787 : if (threads == 1 || task->rounds++ == 0) {
1154 16797 : for (i = j = 0; i < nr_attrs; i++, j++)
1155 14149 : ptask[j % threads].cols[i] = task->cols[i];
1156 : return;
1157 : }
1158 139 : memset(loc, 0, sizeof(loc));
1159 : /* use of load directives */
1160 2054 : for (i = 0; i < nr_attrs; i++)
1161 7600 : for (j = 0; j < threads; j++)
1162 5685 : ptask[j].cols[i] = 0;
1163 :
1164 : /* now allocate the work to the threads */
1165 2054 : for (i = 0; i < nr_attrs; i++, j++) {
1166 : mi = 0;
1167 5685 : for (j = 1; j < threads; j++)
1168 3770 : if (loc[j] < loc[mi])
1169 1604 : mi = j;
1170 :
1171 1915 : ptask[mi].cols[i] = task->cols[i];
1172 1915 : loc[mi] += task->time[i];
1173 : }
1174 : /* reset the timer */
1175 2054 : for (i = 0; i < nr_attrs; i++, j++)
1176 1915 : task->time[i] = 0;
1177 : }
1178 :
1179 : /*
1180 : * Reading is handled by a separate task as a preparation for more parallelism.
1181 : * A buffer is filled with proper rows.
1182 : * If we are reading from a file then a double buffering scheme ia activated.
1183 : * Reading from the console (stdin) remains single buffered only.
1184 : * If we end up with unfinished records, then the rowlimit will terminate the process.
1185 : */
1186 :
1187 : typedef unsigned char (*dfa_t)[256];
1188 :
1189 : static dfa_t
1190 1120 : mkdfa(const unsigned char *sep, size_t seplen)
1191 : {
1192 1120 : dfa_t dfa;
1193 1120 : size_t i, j, k;
1194 :
1195 1120 : dfa = GDKzalloc(seplen * sizeof(*dfa));
1196 1120 : if (dfa == NULL)
1197 : return NULL;
1198 : /* Each character in the separator string advances the state by
1199 : * one. If state reaches seplen, the separator was recognized.
1200 : *
1201 : * The first loop and the nested loop make sure that if in any
1202 : * state we encounter an invalid character, but part of what we've
1203 : * matched so far is a prefix of the separator, we go to the
1204 : * appropriate state. */
1205 2262 : for (i = 0; i < seplen; i++)
1206 1142 : dfa[i][sep[0]] = 1;
1207 2262 : for (j = 0; j < seplen; j++) {
1208 1142 : dfa[j][sep[j]] = (unsigned char) (j + 1);
1209 1164 : for (k = 0; k < j; k++) {
1210 44 : for (i = 0; i < j - k; i++)
1211 22 : if (sep[k + i] != sep[i])
1212 : break;
1213 22 : if (i == j - k && dfa[j][sep[i]] <= i)
1214 0 : dfa[j][sep[i]] = (unsigned char) (i + 1);
1215 : }
1216 : }
1217 : return dfa;
1218 : }
1219 :
1220 : #ifdef __has_builtin
1221 : #if __has_builtin(__builtin_expect)
1222 : /* __builtin_expect returns its first argument; it is expected to be
1223 : * equal to the second argument */
1224 : #define unlikely(expr) __builtin_expect((expr) != 0, 0)
1225 : #define likely(expr) __builtin_expect((expr) != 0, 1)
1226 : #endif
1227 : #endif
1228 : #ifndef unlikely
1229 : #ifdef _MSC_VER
1230 : #define unlikely(expr) (__assume(!(expr)), (expr))
1231 : #define likely(expr) (__assume((expr)), (expr))
1232 : #else
1233 : #define unlikely(expr) (expr)
1234 : #define likely(expr) (expr)
1235 : #endif
1236 : #endif
1237 :
1238 : static void
1239 1120 : SQLproducer(void *p)
1240 : {
1241 1120 : READERtask *task = (READERtask *) p;
1242 1120 : bool consoleinput = false;
1243 1120 : int cur = 0; // buffer being filled
1244 1120 : bool blocked[MAXBUFFERS] = { false };
1245 1120 : bool ateof[MAXBUFFERS] = { false };
1246 1120 : BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };
1247 1120 : char *end = NULL, *e = NULL, *s = NULL, *base;
1248 1120 : const char *rsep = task->rsep;
1249 1120 : size_t rseplen = strlen(rsep), partial = 0;
1250 1120 : char quote = task->quote;
1251 1120 : dfa_t rdfa;
1252 1120 : lng rowno = 0;
1253 1120 : lng lineno = 1;
1254 1120 : lng startlineno = 1;
1255 1120 : int more = 0;
1256 :
1257 1120 : MT_sema_down(&task->producer);
1258 1120 : if (task->id < 0) {
1259 : return;
1260 : }
1261 :
1262 1120 : MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
1263 1120 : rdfa = mkdfa((const unsigned char *) rsep, rseplen);
1264 1120 : if (rdfa == NULL) {
1265 0 : tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
1266 : "");
1267 0 : ateof[cur] = true;
1268 0 : goto reportlackofinput;
1269 : }
1270 :
1271 : /* TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/
1272 :
1273 1120 : base = end = s = task->input[cur];
1274 1120 : *s = 0;
1275 1120 : task->cur = cur;
1276 1120 : if (task->as->filename == NULL) {
1277 782 : consoleinput = true;
1278 782 : goto parseSTDIN;
1279 : }
1280 205264 : for (;;) {
1281 102801 : startlineno = lineno;
1282 102801 : ateof[cur] = !tablet_read_more(task);
1283 :
1284 : // we may be reading from standard input and may be out of input
1285 : // warn the consumers
1286 102801 : if (task->aborted || ((lineno & 8191) == 0 && bstream_getoob(task->cntxt->fdin))) {
1287 0 : tablet_error(task, rowno, lineno, int_nil,
1288 : "problem reported by client", s);
1289 0 : ateof[cur] = true;
1290 0 : goto reportlackofinput;
1291 : }
1292 :
1293 102801 : if (ateof[cur] && partial) {
1294 1 : if (unlikely(partial)) {
1295 1 : tablet_error(task, rowno, lineno, int_nil,
1296 : "incomplete record at end of file", s);
1297 1 : task->b->pos += partial;
1298 : }
1299 1 : goto reportlackofinput;
1300 : }
1301 :
1302 102800 : if (task->errbuf && task->errbuf[0]) {
1303 0 : if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
1304 0 : tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
1305 : "SQLload_file");
1306 : /* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
1307 0 : ateof[cur] = true;
1308 0 : break;
1309 : }
1310 : }
1311 :
1312 102800 : parseSTDIN:
1313 :
1314 : /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
1315 103582 : partial = 0;
1316 103582 : task->top[cur] = 0;
1317 103582 : s = task->input[cur];
1318 103582 : base = end;
1319 : /* avoid too long records */
1320 103582 : if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
1321 : /* the input buffer should be extended, but 'base' is not shared
1322 : between the threads, which we can not now update.
1323 : Mimic an ateof instead; */
1324 0 : tablet_error(task, rowno, lineno, int_nil, "record too long", "");
1325 0 : ateof[cur] = true;
1326 : /* TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
1327 0 : goto reportlackofinput;
1328 : }
1329 103582 : memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
1330 103582 : end = end + task->b->len - task->b->pos;
1331 103582 : *end = '\0'; /* this is safe, as the stream ensures an extra byte */
1332 : /* Note that we rescan from the start of a record (the last
1333 : * partial buffer from the previous iteration), even if in the
1334 : * previous iteration we have already established that there
1335 : * is no record separator in the first, perhaps significant,
1336 : * part of the buffer. This is because if the record separator
1337 : * is longer than one byte, it is too complex (i.e. would
1338 : * require more state) to be sure what the state of the quote
1339 : * status is when we back off a few bytes from where the last
1340 : * scan ended (we need to back off some since we could be in
1341 : * the middle of the record separator). If this is too
1342 : * costly, we have to rethink the matter. */
1343 103582 : if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
1344 0 : ateof[cur] = true;
1345 0 : goto reportlackofinput;
1346 : }
1347 145200429 : for (e = s; *e && e < end && cnt < task->maxrow;) {
1348 : /* tokenize the record completely
1349 : *
1350 : * The format of the input should comply to the following
1351 : * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
1352 : * where quote is a single user-defined character.
1353 : * Within the quoted fields a character may be escaped
1354 : * with a backslash. The correct number of fields should
1355 : * be supplied. In the first phase we simply break the
1356 : * rows at the record boundary. */
1357 : int nutf = 0;
1358 : int m = 0;
1359 : bool bs = false;
1360 : char q = 0;
1361 : size_t i = 0;
1362 3137643699 : while (*e) {
1363 3137642657 : if (task->skip > 0) {
1364 : /* no interpretation of data we're skipping, just
1365 : * look for newline */
1366 1827 : if (*e == '\n') {
1367 27 : lineno++;
1368 27 : break;
1369 : }
1370 : } else {
1371 : /* check for correctly encoded UTF-8 */
1372 3137640830 : if (nutf > 0) {
1373 1841 : if (unlikely((*e & 0xC0) != 0x80))
1374 1 : goto badutf8;
1375 3680 : if (unlikely(m != 0 && (*e & m) == 0))
1376 0 : goto badutf8;
1377 1840 : m = 0;
1378 1840 : nutf--;
1379 3137638989 : } else if ((*e & 0x80) != 0) {
1380 1709 : if ((*e & 0xE0) == 0xC0) {
1381 1582 : nutf = 1;
1382 1582 : if (unlikely((e[0] & 0x1E) == 0))
1383 0 : goto badutf8;
1384 127 : } else if ((*e & 0xF0) == 0xE0) {
1385 122 : nutf = 2;
1386 122 : if ((e[0] & 0x0F) == 0)
1387 3 : m = 0x20;
1388 5 : } else if (likely((*e & 0xF8) == 0xF0)) {
1389 5 : nutf = 3;
1390 5 : if ((e[0] & 0x07) == 0)
1391 5 : m = 0x30;
1392 : } else {
1393 0 : goto badutf8;
1394 : }
1395 3137637280 : } else if (*e == '\n')
1396 145097277 : lineno++;
1397 : /* check for quoting and the row separator */
1398 3137640829 : if (bs) {
1399 : bs = false;
1400 3137097613 : } else if (task->escape && *e == '\\') {
1401 : bs = true;
1402 : i = 0;
1403 3136554397 : } else if (*e == q) {
1404 : q = 0;
1405 3130985723 : } else if (*e == quote) {
1406 : q = quote;
1407 : i = 0;
1408 3125416966 : } else if (q == 0) {
1409 2943475243 : i = rdfa[i][(unsigned char) *e];
1410 2943475243 : if (i == rseplen)
1411 : break;
1412 : }
1413 : }
1414 2992545809 : e++;
1415 : }
1416 145097889 : if (*e == 0) {
1417 1042 : partial = e - s;
1418 : /* found an incomplete record, saved for next round */
1419 1042 : if (unlikely(s + partial < end)) {
1420 : /* found a EOS in the input */
1421 0 : tablet_error(task, rowno, startlineno, int_nil,
1422 : "record too long (EOS found)", "");
1423 0 : ateof[cur] = true;
1424 0 : goto reportlackofinput;
1425 : }
1426 : break;
1427 : } else {
1428 145096847 : rowno++;
1429 145096847 : if (task->skip > 0) {
1430 27 : task->skip--;
1431 : } else {
1432 145096820 : if (cnt < task->maxrow) {
1433 145096820 : task->startlineno[cur][task->top[cur]] = startlineno;
1434 145096820 : task->rows[cur][task->top[cur]++] = s;
1435 145096820 : startlineno = lineno;
1436 145096820 : cnt++;
1437 : }
1438 145096820 : *(e + 1 - rseplen) = 0;
1439 : }
1440 145096847 : s = ++e;
1441 145096847 : task->b->pos += (size_t) (e - base);
1442 145096847 : base = e;
1443 145096847 : if (task->top[cur] == task->limit)
1444 : break;
1445 : }
1446 : }
1447 :
1448 102539 : reportlackofinput:
1449 : /* TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
1450 :
1451 103583 : if (consoleinput) {
1452 101957 : task->cur = cur;
1453 101957 : task->ateof = ateof[cur];
1454 101957 : task->cnt = bufcnt[cur];
1455 : /* tell consumer to go ahead */
1456 101957 : MT_sema_up(&task->consumer);
1457 : /* then wait until it is done */
1458 101957 : MT_sema_down(&task->producer);
1459 101957 : if (cnt == task->maxrow) {
1460 779 : GDKfree(rdfa);
1461 779 : MT_thread_set_qry_ctx(NULL);
1462 779 : return;
1463 : }
1464 : } else {
1465 1626 : assert(!blocked[cur]);
1466 1626 : if (blocked[(cur + 1) % MAXBUFFERS]) {
1467 : /* first wait until other buffer is done */
1468 : /* TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/
1469 :
1470 1288 : MT_sema_down(&task->producer);
1471 1288 : blocked[(cur + 1) % MAXBUFFERS] = false;
1472 1288 : if (task->state == ENDOFCOPY) {
1473 0 : GDKfree(rdfa);
1474 0 : MT_thread_set_qry_ctx(NULL);
1475 0 : return;
1476 : }
1477 : }
1478 : /* other buffer is done, proceed with current buffer */
1479 1626 : assert(!blocked[(cur + 1) % MAXBUFFERS]);
1480 1626 : blocked[cur] = true;
1481 1626 : task->cur = cur;
1482 1626 : task->ateof = ateof[cur];
1483 1626 : task->cnt = bufcnt[cur];
1484 1626 : more = !ateof[cur] || (e && e < end
1485 0 : && task->top[cur] == task->limit);
1486 : /* TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
1487 :
1488 1626 : MT_sema_up(&task->consumer);
1489 :
1490 1626 : cur = (cur + 1) % MAXBUFFERS;
1491 : /* TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/
1492 :
1493 1626 : if (cnt == task->maxrow) {
1494 182 : MT_sema_down(&task->producer);
1495 : /* TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
1496 182 : GDKfree(rdfa);
1497 182 : MT_thread_set_qry_ctx(NULL);
1498 182 : return;
1499 : }
1500 : }
1501 : /* TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/
1502 :
1503 : /* we ran out of input? */
1504 102622 : if (task->ateof && !more) {
1505 : /* TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
1506 159 : GDKfree(rdfa);
1507 159 : MT_thread_set_qry_ctx(NULL);
1508 159 : return;
1509 : }
1510 : /* consumers ask us to stop? */
1511 102463 : if (task->state == ENDOFCOPY) {
1512 0 : GDKfree(rdfa);
1513 0 : MT_thread_set_qry_ctx(NULL);
1514 0 : return;
1515 : }
1516 102463 : bufcnt[cur] = cnt;
1517 : /* move the non-parsed correct row data to the head of the next buffer */
1518 102463 : end = s = task->input[cur];
1519 : }
1520 0 : if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
1521 0 : char msg[256];
1522 0 : snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
1523 0 : task->as->error = GDKstrdup(msg);
1524 0 : tablet_error(task, rowno, startlineno, int_nil,
1525 : "incomplete record at end of file", s);
1526 0 : task->b->pos += partial;
1527 : }
1528 0 : GDKfree(rdfa);
1529 0 : MT_thread_set_qry_ctx(NULL);
1530 :
1531 0 : return;
1532 :
1533 1 : badutf8:
1534 1 : tablet_error(task, rowno, startlineno, int_nil,
1535 : "input not properly encoded UTF-8", "");
1536 1 : ateof[cur] = true;
1537 1 : goto reportlackofinput;
1538 : }
1539 :
1540 : static void
1541 1150 : create_rejects_table(Client cntxt)
1542 : {
1543 1150 : MT_lock_set(&mal_contextLock);
1544 1150 : if (cntxt->error_row == NULL) {
1545 484 : cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
1546 484 : cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
1547 484 : cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
1548 484 : cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
1549 484 : if (cntxt->error_row == NULL || cntxt->error_fld == NULL
1550 484 : || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
1551 0 : BBPreclaim(cntxt->error_row);
1552 0 : BBPreclaim(cntxt->error_fld);
1553 0 : BBPreclaim(cntxt->error_msg);
1554 0 : BBPreclaim(cntxt->error_input);
1555 0 : cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
1556 : }
1557 : }
1558 1150 : MT_lock_unset(&mal_contextLock);
1559 1150 : }
1560 :
1561 : BUN
1562 1120 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
1563 : const char *csep, const char *rsep, char quote, lng skip,
1564 : lng maxrow, int best, bool from_stdin, const char *tabnam,
1565 : bool escape)
1566 : {
1567 1120 : BUN cnt = 0, cntstart = 0, leftover = 0;
1568 1120 : int res = 0; /* < 0: error, > 0: success, == 0: continue processing */
1569 1120 : int j;
1570 1120 : BUN firstcol;
1571 1120 : BUN i, attr;
1572 1120 : READERtask task;
1573 1120 : READERtask ptask[MAXWORKERS];
1574 1120 : int threads = 1;
1575 1120 : lng tio, t1 = 0;
1576 1120 : char name[MT_NAME_LEN];
1577 :
1578 1120 : if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
1579 113 : threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
1580 113 : if (threads > 1)
1581 113 : threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
1582 : else
1583 : threads = 1;
1584 : }
1585 :
1586 : /* TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
1587 :
1588 1120 : memset(ptask, 0, sizeof(ptask));
1589 2240 : task = (READERtask) {
1590 : .cntxt = cntxt,
1591 : .from_stdin = from_stdin,
1592 : .as = as,
1593 : .escape = escape, /* TODO: implement feature!!! */
1594 1120 : .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
1595 : };
1596 :
1597 : /* create the reject tables */
1598 1120 : create_rejects_table(task.cntxt);
1599 1120 : if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
1600 1120 : || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
1601 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1602 : "SQLload initialization failed", "");
1603 : /* nothing allocated yet, so nothing to free */
1604 0 : return BUN_NONE;
1605 : }
1606 :
1607 1120 : assert(rsep);
1608 1120 : assert(csep);
1609 1120 : assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
1610 1120 : task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
1611 1120 : task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1612 1120 : task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
1613 1120 : if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
1614 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1615 : "memory allocation failed", "SQLload_file");
1616 0 : goto bailout;
1617 : }
1618 1120 : task.cur = 0;
1619 3360 : for (i = 0; i < MAXBUFFERS; i++) {
1620 2240 : task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
1621 2240 : task.rowlimit[i] = MAXROWSIZE(2 * b->size);
1622 2240 : if (task.base[i] == NULL) {
1623 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1624 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1625 0 : goto bailout;
1626 : }
1627 2240 : task.base[i][0] = task.base[i][b->size + 1] = 0;
1628 2240 : task.input[i] = task.base[i] + 1; /* wrap the buffer with null bytes */
1629 : }
1630 1120 : task.besteffort = best;
1631 :
1632 1120 : if (maxrow < 0)
1633 79 : task.maxrow = BUN_MAX;
1634 : else
1635 1041 : task.maxrow = (BUN) maxrow;
1636 :
1637 1120 : task.skip = skip;
1638 1120 : task.quote = quote;
1639 1120 : task.csep = csep;
1640 1120 : task.seplen = strlen(csep);
1641 1120 : task.rsep = rsep;
1642 1120 : task.rseplen = strlen(rsep);
1643 1120 : task.errbuf = cntxt->errbuf;
1644 :
1645 1120 : MT_sema_init(&task.producer, 0, "task.producer");
1646 1120 : MT_sema_init(&task.consumer, 0, "task.consumer");
1647 1120 : task.ateof = false;
1648 1120 : task.b = b;
1649 1120 : task.out = out;
1650 :
1651 1120 : as->error = NULL;
1652 :
1653 : /* there is no point in creating more threads than we have columns */
1654 1120 : if (as->nr_attrs < (BUN) threads)
1655 44 : threads = (int) as->nr_attrs;
1656 :
1657 : /* allocate enough space for pointers into the buffer pool. */
1658 : /* the record separator is considered a column */
1659 1120 : task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
1660 11462 : for (i = 0; i < as->nr_attrs; i++) {
1661 10342 : task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
1662 10342 : if (task.fields[i] == NULL) {
1663 0 : if (task.as->error == NULL)
1664 0 : as->error = createException(MAL, "sql.copy_from",
1665 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1666 0 : goto bailout;
1667 : }
1668 10342 : task.cols[i] = (int) (i + 1); /* to distinguish non initialized later with zero */
1669 : }
1670 3360 : for (i = 0; i < MAXBUFFERS; i++) {
1671 2240 : task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
1672 2240 : task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
1673 2240 : if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
1674 0 : GDKfree(task.rows[i]);
1675 0 : GDKfree(task.startlineno[i]);
1676 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1677 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1678 : "SQLload_file:failed to alloc buffers");
1679 0 : goto bailout;
1680 : }
1681 : }
1682 1120 : task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
1683 1120 : if (task.rowerror == NULL) {
1684 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1685 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1686 : "SQLload_file:failed to alloc rowerror buffer");
1687 0 : goto bailout;
1688 : }
1689 :
1690 1120 : task.id = 0;
1691 1120 : snprintf(name, sizeof(name), "prod-%s", tabnam);
1692 1120 : if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
1693 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1694 : SQLSTATE(42000) "failed to start producer thread",
1695 : "SQLload_file");
1696 0 : goto bailout;
1697 : }
1698 : /* TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
1699 :
1700 1120 : task.workers = threads;
1701 2413 : for (j = 0; j < threads; j++) {
1702 1293 : ptask[j] = task;
1703 1293 : ptask[j].id = j;
1704 1293 : ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1705 1293 : if (ptask[j].cols == NULL) {
1706 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1707 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1708 0 : task.id = -1;
1709 0 : MT_sema_up(&task.producer);
1710 0 : goto bailout;
1711 : }
1712 1293 : snprintf(name, sizeof(name), "ptask%d.sema", j);
1713 1293 : MT_sema_init(&ptask[j].sema, 0, name);
1714 1293 : snprintf(name, sizeof(name), "ptask%d.repl", j);
1715 1293 : MT_sema_init(&ptask[j].reply, 0, name);
1716 1293 : snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
1717 1293 : if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
1718 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1719 : SQLSTATE(42000) "failed to start worker thread",
1720 : "SQLload_file");
1721 0 : threads = j;
1722 0 : for (j = 0; j < threads; j++)
1723 0 : ptask[j].workers = threads;
1724 : }
1725 : }
1726 1120 : if (threads == 0) {
1727 : /* no threads started */
1728 0 : task.id = -1;
1729 0 : MT_sema_up(&task.producer);
1730 0 : goto bailout;
1731 : }
1732 1120 : MT_sema_up(&task.producer);
1733 :
1734 1120 : tio = GDKusec();
1735 1120 : tio = GDKusec() - tio;
1736 1120 : t1 = GDKusec();
1737 2243 : for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
1738 1123 : if (task.as->format[firstcol].c != NULL)
1739 : break;
1740 104521 : while (res == 0 && cnt < task.maxrow) {
1741 :
1742 : // track how many elements are in the aggregated BATs
1743 103583 : cntstart = BATcount(task.as->format[firstcol].c);
1744 : /* block until the producer has data available */
1745 103583 : MT_sema_down(&task.consumer);
1746 103583 : cnt += task.top[task.cur];
1747 103583 : if (task.ateof && !task.top[task.cur])
1748 : break;
1749 103424 : t1 = GDKusec() - t1;
1750 : /* TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
1751 :
1752 103424 : t1 = GDKusec();
1753 103424 : if (task.top[task.cur]) {
1754 : /* activate the workers to break rows */
1755 408373 : for (j = 0; j < threads; j++) {
1756 : /* stage one, break the rows in parallel */
1757 304979 : ptask[j].error = 0;
1758 304979 : ptask[j].state = BREAKROW;
1759 304979 : ptask[j].next = task.top[task.cur];
1760 304979 : ptask[j].fields = task.fields;
1761 304979 : ptask[j].limit = task.limit;
1762 304979 : ptask[j].cnt = task.cnt;
1763 304979 : ptask[j].cur = task.cur;
1764 304979 : ptask[j].top[task.cur] = task.top[task.cur];
1765 304979 : MT_sema_up(&ptask[j].sema);
1766 : }
1767 : }
1768 103424 : if (task.top[task.cur]) {
1769 : /* await completion of row break phase */
1770 408373 : for (j = 0; j < threads; j++) {
1771 304979 : MT_sema_down(&ptask[j].reply);
1772 304979 : if (ptask[j].error) {
1773 0 : res = -1;
1774 : /* TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
1775 : }
1776 : }
1777 : }
1778 :
1779 : /* TRC_DEBUG(MAL_SERVER,
1780 : "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
1781 : task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
1782 :
1783 103424 : if (task.top[task.cur]) {
1784 103394 : if (res == 0) {
1785 103394 : SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
1786 :
1787 : /* activate the workers to update the BATs */
1788 511767 : for (j = 0; j < threads; j++) {
1789 : /* stage two, update the BATs */
1790 304979 : ptask[j].state = UPDATEBAT;
1791 304979 : MT_sema_up(&ptask[j].sema);
1792 : }
1793 : }
1794 : }
1795 103424 : tio = GDKusec();
1796 103424 : tio = t1 - tio;
1797 :
1798 : /* await completion of the BAT updates */
1799 103424 : if (res == 0 && task.top[task.cur]) {
1800 408373 : for (j = 0; j < threads; j++) {
1801 304979 : MT_sema_down(&ptask[j].reply);
1802 304979 : if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
1803 304979 : res = -1;
1804 304979 : best = 0;
1805 : }
1806 : }
1807 : }
1808 :
1809 : /* trim the BATs discarding error tuples */
1810 : #define trimerrors(TYPE) \
1811 : do { \
1812 : TYPE *src, *dst; \
1813 : leftover= BATcount(task.as->format[attr].c); \
1814 : limit = leftover - cntstart; \
1815 : dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
1816 : for(j = 0; j < (int) limit; j++, src++){ \
1817 : if ( task.rowerror[j]){ \
1818 : leftover--; \
1819 : continue; \
1820 : } \
1821 : *dst++ = *src; \
1822 : } \
1823 : BATsetcount(task.as->format[attr].c, leftover ); \
1824 : } while (0)
1825 :
1826 : /* TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
1827 : best, BATcount(as->format[firstcol].c), task.cnt); */
1828 :
1829 103424 : if (best && BATcount(as->format[firstcol].c)) {
1830 : BUN limit;
1831 : int width;
1832 :
1833 53 : for (attr = 0; attr < as->nr_attrs; attr++) {
1834 37 : if (as->format[attr].skip)
1835 5 : continue;
1836 32 : width = as->format[attr].c->twidth;
1837 32 : as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
1838 32 : switch (width) {
1839 5 : case 1:
1840 16 : trimerrors(bte);
1841 5 : break;
1842 0 : case 2:
1843 0 : trimerrors(sht);
1844 0 : break;
1845 27 : case 4:
1846 30061 : trimerrors(int);
1847 27 : break;
1848 0 : case 8:
1849 0 : trimerrors(lng);
1850 0 : break;
1851 : #ifdef HAVE_HGE
1852 0 : case 16:
1853 0 : trimerrors(hge);
1854 0 : break;
1855 : #endif
1856 0 : default:
1857 : {
1858 0 : char *src, *dst;
1859 0 : leftover = BATcount(task.as->format[attr].c);
1860 0 : limit = leftover - cntstart;
1861 0 : dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
1862 0 : for (j = 0; j < (int) limit; j++, src += width) {
1863 0 : if (task.rowerror[j]) {
1864 0 : leftover--;
1865 0 : continue;
1866 : }
1867 0 : if (dst != src)
1868 0 : memcpy(dst, src, width);
1869 0 : dst += width;
1870 : }
1871 0 : BATsetcount(task.as->format[attr].c, leftover);
1872 : }
1873 0 : break;
1874 : }
1875 : }
1876 : // re-initialize the error vector;
1877 16 : memset(task.rowerror, 0, task.limit);
1878 16 : task.errorcnt = 0;
1879 : }
1880 :
1881 103424 : if (res < 0) {
1882 : /* producer should stop */
1883 23 : task.maxrow = cnt;
1884 23 : task.state = ENDOFCOPY;
1885 23 : task.ateof = true;
1886 : }
1887 103424 : if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
1888 : break;
1889 103424 : task.top[task.cur] = 0;
1890 103424 : if (cnt == task.maxrow)
1891 961 : task.ateof = true;
1892 104544 : MT_sema_up(&task.producer);
1893 : }
1894 :
1895 : /* TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
1896 :
1897 1120 : cnt = BATcount(task.as->format[firstcol].c);
1898 :
1899 1120 : task.state = ENDOFCOPY;
1900 : /* TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
1901 :
1902 1120 : if (!task.ateof || cnt < task.maxrow) {
1903 : /* TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
1904 176 : MT_sema_up(&task.producer);
1905 : }
1906 1120 : MT_join_thread(task.tid);
1907 :
1908 : /* TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
1909 :
1910 3533 : for (j = 0; j < threads; j++) {
1911 1293 : ptask[j].state = ENDOFCOPY;
1912 1293 : MT_sema_up(&ptask[j].sema);
1913 : }
1914 : /* wait for their death */
1915 2413 : for (j = 0; j < threads; j++)
1916 1293 : MT_sema_down(&ptask[j].reply);
1917 :
1918 : /* TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
1919 :
1920 2413 : for (j = 0; j < threads; j++) {
1921 1293 : MT_join_thread(ptask[j].tid);
1922 1293 : GDKfree(ptask[j].cols);
1923 1293 : MT_sema_destroy(&ptask[j].sema);
1924 1293 : MT_sema_destroy(&ptask[j].reply);
1925 : }
1926 :
1927 : /* TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
1928 : /* TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
1929 :
1930 11462 : for (i = 0; i < as->nr_attrs; i++) {
1931 10342 : BAT *b = task.as->format[i].c;
1932 10342 : if (b)
1933 10336 : BATsettrivprop(b);
1934 10342 : GDKfree(task.fields[i]);
1935 : }
1936 1120 : GDKfree(task.fields);
1937 1120 : GDKfree(task.cols);
1938 1120 : GDKfree(task.time);
1939 4480 : for (i = 0; i < MAXBUFFERS; i++) {
1940 2240 : GDKfree(task.base[i]);
1941 2240 : GDKfree(task.rows[i]);
1942 2240 : GDKfree(task.startlineno[i]);
1943 : }
1944 1120 : if (task.rowerror)
1945 1120 : GDKfree(task.rowerror);
1946 1120 : MT_sema_destroy(&task.producer);
1947 1120 : MT_sema_destroy(&task.consumer);
1948 :
1949 1120 : return res < 0 ? BUN_NONE : cnt;
1950 :
1951 0 : bailout:
1952 0 : if (task.fields) {
1953 0 : for (i = 0; i < as->nr_attrs; i++)
1954 0 : GDKfree(task.fields[i]);
1955 0 : GDKfree(task.fields);
1956 : }
1957 0 : GDKfree(task.time);
1958 0 : GDKfree(task.cols);
1959 0 : GDKfree(task.base[task.cur]);
1960 0 : GDKfree(task.rowerror);
1961 0 : for (i = 0; i < MAXWORKERS; i++)
1962 0 : GDKfree(ptask[i].cols);
1963 : return BUN_NONE;
1964 : }
1965 :
1966 : /* return the latest reject table */
1967 : str
1968 30 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1969 : {
1970 30 : bat *row = getArgReference_bat(stk, pci, 0);
1971 30 : bat *fld = getArgReference_bat(stk, pci, 1);
1972 30 : bat *msg = getArgReference_bat(stk, pci, 2);
1973 30 : bat *inp = getArgReference_bat(stk, pci, 3);
1974 :
1975 30 : create_rejects_table(cntxt);
1976 30 : if (cntxt->error_row == NULL)
1977 0 : throw(MAL, "sql.rejects", "No reject table available");
1978 30 : MT_lock_set(&errorlock);
1979 30 : BAT *bn1 = COLcopy(cntxt->error_row, cntxt->error_row->ttype, true, TRANSIENT);
1980 30 : BAT *bn2 = COLcopy(cntxt->error_fld, cntxt->error_fld->ttype, true, TRANSIENT);
1981 30 : BAT *bn3 = COLcopy(cntxt->error_msg, cntxt->error_msg->ttype, true, TRANSIENT);
1982 30 : BAT *bn4 = COLcopy(cntxt->error_input, cntxt->error_input->ttype, true, TRANSIENT);
1983 30 : MT_lock_unset(&errorlock);
1984 30 : if (bn1 == NULL || bn2 == NULL || bn3 == NULL || bn4 == NULL) {
1985 0 : BBPreclaim(bn1);
1986 0 : BBPreclaim(bn2);
1987 0 : BBPreclaim(bn3);
1988 0 : BBPreclaim(bn4);
1989 0 : throw(MAL, "sql.rejects", GDK_EXCEPTION);
1990 : }
1991 30 : *row = bn1->batCacheid;
1992 30 : *fld = bn2->batCacheid;
1993 30 : *msg = bn3->batCacheid;
1994 30 : *inp = bn4->batCacheid;
1995 30 : BBPkeepref(bn1);
1996 30 : BBPkeepref(bn2);
1997 30 : BBPkeepref(bn3);
1998 30 : BBPkeepref(bn4);
1999 30 : (void) mb;
2000 30 : return MAL_SUCCEED;
2001 : }
2002 :
2003 : str
2004 14 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2005 : {
2006 14 : if (cntxt->error_row) {
2007 13 : MT_lock_set(&errorlock);
2008 13 : BATclear(cntxt->error_row, true);
2009 13 : if (cntxt->error_fld)
2010 13 : BATclear(cntxt->error_fld, true);
2011 13 : if (cntxt->error_msg)
2012 13 : BATclear(cntxt->error_msg, true);
2013 13 : if (cntxt->error_input)
2014 13 : BATclear(cntxt->error_input, true);
2015 13 : MT_lock_unset(&errorlock);
2016 : }
2017 14 : (void) mb;
2018 14 : (void) stk;
2019 14 : (void) pci;
2020 14 : return MAL_SUCCEED;
2021 : }
|