Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Niels Nes, Martin Kersten
15 : *
16 : * Parallel bulk load for SQL
17 : * The COPY INTO command for SQL is heavily CPU bound, which means
18 : * that ideally we would like to exploit the multi-cores to do that
19 : * work in parallel.
20 : * Complicating factors are the initial record offset, the
21 : * possible variable length of the input, and the original sort order
22 : * that should preferably be maintained.
23 : *
24 : * The code below consists of a file reader, which breaks up the
25 : * file into chunks of distinct rows. Then multiple parallel threads
26 : * grab them, and break them on the field boundaries.
27 : * After all fields are identified this way, the columns are converted
28 : * and stored in the BATs.
29 : *
30 : * The threads get a reference to a private copy of the READERtask.
31 : * It includes a list of columns they should handle. This is a basis
32 : * to distribute cheap and expensive columns over threads.
33 : *
34 : * The file reader overlaps IO with updates of the BAT.
35 : * Also the buffer size of the block stream might be a little small for
36 : * this task (1MB). It has been increased to 8MB, which indeed improved.
37 : *
38 : * The work divider allocates subtasks to threads based on the
39 : * observed time spending so far.
40 : */
41 :
42 : #include "monetdb_config.h"
43 : #include "tablet.h"
44 : #include "mapi_prompt.h"
45 : #include "mal_internal.h"
46 :
47 : #include <string.h>
48 : #include <ctype.h>
49 :
#define MAXWORKERS 64
#define MAXBUFFERS 2
/* Row buffers are at least 32MB: MAXROWSIZE(X) evaluates to the larger
 * of X and 32MB.  The argument is parenthesized so that an arbitrary
 * expression can be passed; note that X is evaluated twice. */
#define MAXROWSIZE(X) ((X) > 32*1024*1024 ? (X) : 32*1024*1024)
54 :
55 : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock); /* held by tablet_error() while it records an error */
56 :
57 : static BAT *
58 10113 : void_bat_create(int adt, BUN nr)
59 : {
60 10113 : BAT *b = COLnew(0, adt, nr, TRANSIENT);
61 :
62 : /* check for correct structures */
63 10113 : if (b == NULL)
64 : return NULL;
65 10113 : if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
66 : return NULL;
67 : }
68 :
69 : /* disable all properties here */
70 10113 : b->tsorted = false;
71 10113 : b->trevsorted = false;
72 10113 : b->tnosorted = 0;
73 10113 : b->tnorevsorted = 0;
74 10113 : b->tseqbase = oid_nil;
75 10113 : b->tkey = false;
76 10113 : b->tnokey[0] = 0;
77 10113 : b->tnokey[1] = 0;
78 10113 : return b;
79 : }
80 :
81 : void
82 121060 : TABLETdestroy_format(Tablet *as)
83 : {
84 121060 : BUN p;
85 121060 : Column *fmt = as->format;
86 :
87 566770 : for (p = 0; p < as->nr_attrs; p++) {
88 445709 : BBPreclaim(fmt[p].c);
89 445707 : if (fmt[p].data)
90 10118 : GDKfree(fmt[p].data);
91 : }
92 121061 : GDKfree(fmt);
93 121061 : }
94 :
95 : static oid
96 119987 : check_BATs(Tablet *as)
97 : {
98 119987 : Column *fmt = as->format;
99 119987 : BUN i = 0;
100 119987 : BUN cnt;
101 119987 : oid base;
102 :
103 119987 : if (fmt[i].c == NULL)
104 119990 : i++;
105 119987 : cnt = BATcount(fmt[i].c);
106 119987 : base = fmt[i].c->hseqbase;
107 :
108 119987 : if (as->nr != cnt) {
109 5786 : for (i = 0; i < as->nr_attrs; i++)
110 5107 : if (fmt[i].c)
111 4428 : fmt[i].p = as->offset;
112 679 : return oid_nil;
113 : }
114 :
115 549781 : for (i = 0; i < as->nr_attrs; i++) {
116 430473 : BAT *b = fmt[i].c;
117 :
118 430473 : if (b == NULL)
119 119307 : continue;
120 :
121 311166 : if (BATcount(b) != cnt || b->hseqbase != base)
122 0 : return oid_nil;
123 :
124 311166 : fmt[i].p = as->offset;
125 : }
126 : return base;
127 : }
128 :
129 : str
130 1071 : TABLETcreate_bats(Tablet *as, BUN est)
131 : {
132 1071 : Column *fmt = as->format;
133 1071 : BUN i, nr = 0;
134 :
135 11190 : for (i = 0; i < as->nr_attrs; i++) {
136 10119 : if (fmt[i].skip)
137 6 : continue;
138 10113 : fmt[i].c = void_bat_create(fmt[i].adt, est);
139 10113 : if (!fmt[i].c) {
140 0 : while (i > 0) {
141 0 : if (!fmt[--i].skip) {
142 0 : BBPreclaim(fmt[i].c);
143 0 : fmt[i].c = NULL;
144 : }
145 : }
146 0 : throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
147 : est);
148 : }
149 10113 : fmt[i].ci = bat_iterator_nolock(fmt[i].c);
150 10113 : nr++;
151 : }
152 1071 : if (!nr)
153 0 : throw(SQL, "copy",
154 : "At least one column should be read from the input\n");
155 : return MAL_SUCCEED;
156 : }
157 :
158 : str
159 1046 : TABLETcollect(BAT **bats, Tablet *as)
160 : {
161 1046 : Column *fmt = as->format;
162 1046 : BUN i, j;
163 1046 : BUN cnt = 0;
164 :
165 1046 : if (bats == NULL)
166 0 : throw(SQL, "copy", "Missing container");
167 2496 : for (i = 0; i < as->nr_attrs && !cnt; i++)
168 1450 : if (!fmt[i].skip)
169 1447 : cnt = BATcount(fmt[i].c);
170 11072 : for (i = 0, j = 0; i < as->nr_attrs; i++) {
171 10026 : if (fmt[i].skip)
172 6 : continue;
173 10020 : bats[j] = fmt[i].c;
174 10020 : BBPfix(bats[j]->batCacheid);
175 10020 : if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
176 0 : throw(SQL, "copy",
177 : "Failed to set access at tablet part " BUNFMT "\n", cnt);
178 10020 : fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
179 10020 : fmt[i].c->tkey = false;
180 10020 : BATsettrivprop(fmt[i].c);
181 :
182 10020 : if (cnt != BATcount(fmt[i].c))
183 0 : throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
184 : BATcount(fmt[i].c), cnt);
185 10020 : j++;
186 : }
187 : return MAL_SUCCEED;
188 : }
189 :
/*
 * Scan a quoted string in place.  s points just past the opening quote
 * (the caller has already skipped it).
 *
 * While scanning, the string is compacted towards s:
 *  - a doubled quote character is reduced to a single one;
 *  - when escape is set, a backslash followed by another character is
 *    copied together with that character, so an escaped quote does not
 *    end the string.
 *
 * On success the compacted string is NUL-terminated and a pointer to the
 * closing quote (in the original, uncompacted positions) is returned.
 * If the end of the input is reached without a closing quote, NULL is
 * returned.
 */
static char *
tablet_skip_string(char *s, char quote, bool escape)
{
	char *src = s;				/* read position */
	char *dst = s;				/* write position, never ahead of src */

	while (*src != '\0') {
		if (escape && *src == '\\' && src[1] != '\0') {
			/* copy the backslash; the escaped character follows below */
			*dst++ = *src++;
		} else if (*src == quote) {
			if (src[1] != quote)
				break;			/* this is the closing quote */
			src++;				/* doubled quote: drop the first one */
		}
		*dst++ = *src++;
	}
	assert(*src == quote || *src == '\0');
	if (*src == '\0')
		return NULL;
	*dst = '\0';
	return src;
}
212 :
213 : static int
214 0 : TABLET_error(stream *s)
215 : {
216 0 : const char *err = mnstr_peek_error(s);
217 0 : if (err)
218 0 : TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
219 0 : return -1;
220 : }
221 :
222 : /* The output line is first built before being sent. It solves a problem
223 : with UDP, where you may lose most of the information using short writes
224 : */
225 : static inline int
226 0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
227 : Column *fmt, stream *fd, BUN nr_attrs, oid id)
228 : {
229 0 : BUN i;
230 0 : ssize_t fill = 0;
231 :
232 0 : for (i = 0; i < nr_attrs; i++) {
233 0 : if (fmt[i].c == NULL)
234 0 : continue;
235 0 : if (id < fmt[i].c->hseqbase
236 0 : || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
237 : break;
238 0 : fmt[i].p = id - fmt[i].c->hseqbase;
239 : }
240 0 : if (i == nr_attrs) {
241 0 : for (i = 0; i < nr_attrs; i++) {
242 0 : Column *f = fmt + i;
243 0 : const char *p;
244 0 : ssize_t l;
245 :
246 0 : if (f->c) {
247 0 : p = BUNtail(f->ci, f->p);
248 :
249 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
250 0 : p = f->nullstr;
251 0 : l = (ssize_t) strlen(f->nullstr);
252 : } else {
253 0 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
254 0 : if (l < 0)
255 : return -1;
256 0 : p = *localbuf;
257 : }
258 0 : if (fill + l + f->seplen >= (ssize_t) * len) {
259 : /* extend the buffer */
260 0 : char *nbuf;
261 0 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
262 0 : if (nbuf == NULL)
263 : return -1; /* *buf freed by caller */
264 0 : *buf = nbuf;
265 0 : *len = fill + l + f->seplen + BUFSIZ;
266 : }
267 0 : strncpy(*buf + fill, p, l);
268 0 : fill += l;
269 : }
270 0 : strncpy(*buf + fill, f->sep, f->seplen);
271 0 : fill += f->seplen;
272 : }
273 : }
274 0 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
275 0 : return TABLET_error(fd);
276 : return 0;
277 : }
278 :
279 : static inline int
280 5983923 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
281 : Column *fmt, stream *fd, BUN nr_attrs)
282 : {
283 5983923 : BUN i;
284 5983923 : ssize_t fill = 0;
285 :
286 26309089 : for (i = 0; i < nr_attrs; i++) {
287 20325164 : Column *f = fmt + i;
288 20325164 : const char *p;
289 20325164 : ssize_t l;
290 :
291 20325164 : if (f->c) {
292 14341239 : p = BUNtail(f->ci, f->p);
293 :
294 14341239 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
295 528798 : p = f->nullstr;
296 528798 : l = (ssize_t) strlen(p);
297 : } else {
298 13812440 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
299 13812443 : if (l < 0)
300 : return -1;
301 13812443 : p = *localbuf;
302 : }
303 14341241 : if (fill + l + f->seplen >= (ssize_t) * len) {
304 : /* extend the buffer */
305 74 : char *nbuf;
306 74 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
307 74 : if (nbuf == NULL)
308 : return -1; /* *buf freed by caller */
309 74 : *buf = nbuf;
310 74 : *len = fill + l + f->seplen + BUFSIZ;
311 : }
312 14341241 : strncpy(*buf + fill, p, l);
313 14341241 : fill += l;
314 14341241 : f->p++;
315 : }
316 20325166 : strncpy(*buf + fill, f->sep, f->seplen);
317 20325166 : fill += f->seplen;
318 : }
319 5983925 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
320 0 : return TABLET_error(fd);
321 : return 0;
322 : }
323 :
324 : static inline int
325 0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
326 : BUN nr_attrs, oid id)
327 : {
328 0 : BUN i;
329 :
330 0 : for (i = 0; i < nr_attrs; i++) {
331 0 : Column *f = fmt + i;
332 :
333 0 : if (f->c) {
334 0 : const void *p = BUNtail(f->ci, id - f->c->hseqbase);
335 :
336 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
337 0 : size_t l = strlen(f->nullstr);
338 0 : if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
339 0 : return TABLET_error(fd);
340 : } else {
341 0 : ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
342 :
343 0 : if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
344 0 : return TABLET_error(fd);
345 : }
346 : }
347 0 : if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
348 0 : return TABLET_error(fd);
349 : }
350 : return 0;
351 : }
352 :
353 : /* returns TRUE if there is/might be more */
354 : static bool
355 102735 : tablet_read_more(bstream *in, stream *out, size_t n)
356 : {
357 102735 : if (out) {
358 101174 : do {
359 : /* query is not finished ask for more */
360 : /* we need more query text */
361 101174 : if (bstream_next(in) < 0)
362 : return false;
363 101174 : if (in->eof) {
364 101172 : if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
365 101172 : mnstr_flush(out, MNSTR_FLUSH_DATA);
366 101172 : in->eof = false;
367 : /* we need more query text */
368 101172 : if (bstream_next(in) <= 0)
369 : return false;
370 : }
371 101172 : } while (in->len <= in->pos);
372 1561 : } else if (bstream_read(in, n) <= 0) {
373 : return false;
374 : }
375 : return true;
376 : }
377 :
378 : /*
379 : * Fast Load
380 : * To speedup the CPU intensive loading of files we have to break
381 : * the file into pieces and perform parallel analysis. Experimentation
382 : * against lineitem SF1 showed that half of the time goes into very
383 : * basic atom analysis (41 out of 102 B instructions).
384 : * Furthermore, the actual insertion into the BATs takes only
385 : * about 10% of the total. With multi-core processors around
386 : * it seems we can gain here significantly.
387 : *
388 : * The approach taken is to fork a parallel scan over the text file.
389 : * We assume that the blocked stream is already
390 : * positioned correctly at the reading position. The start and limit
391 : * indicates the byte range to search for tuples.
392 : * If start> 0 then we first skip to the next record separator.
393 : * If necessary we read more than 'limit' bytes to ensure parsing a complete
394 : * record and stop at the record boundary.
395 : * Beware, we should allocate Tablet descriptors for each file segment,
396 : * otherwise we end up with a gross concurrency control problem.
397 : * The resulting BATs should be glued at the final phase.
398 : *
399 : * Raw Load
400 : * Front-ends can bypass most of the overhead in loading the BATs
401 : * by preparing the corresponding files directly and replace those
402 : * created by e.g. the SQL frontend.
403 : * This strategy is only advisable for cases where we have very
404 : * large files >200GB and/or are created by a well debugged code.
405 : *
406 : * To experiment with this approach, the code base responds
407 : * on negative number of cores by dumping the data directly in BAT
408 : * storage format into a collections of files on disk.
409 : * It reports on the actions to be taken to replace BATs.
410 : * This technique is initially only supported for fixed-sized columns.
411 : * The rawmode() indicator acts as the internal switch.
412 : */
413 :
414 : /*
415 : * To speed up loading ascii files we have to determine the number of blocks.
416 : * This depends on the number of cores available.
417 : * For the time being we hardwire this decision based on our own
418 : * platforms.
419 : * Furthermore, we only consider parallel load for file-based requests.
420 : *
421 : * To simplify our world, we assume a single producer process.
422 : */
423 :
424 : static int
425 0 : output_file_default(Tablet *as, BAT *order, stream *fd)
426 : {
427 0 : size_t len = BUFSIZ, locallen = BUFSIZ;
428 0 : int res = 0;
429 0 : char *buf = GDKmalloc(len);
430 0 : char *localbuf = GDKmalloc(len);
431 0 : BUN p, q;
432 0 : oid id;
433 0 : BUN offset = as->offset;
434 :
435 0 : if (buf == NULL || localbuf == NULL) {
436 0 : GDKfree(buf);
437 0 : GDKfree(localbuf);
438 0 : return -1;
439 : }
440 0 : for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
441 0 : p++, id++) {
442 0 : if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
443 0 : GDKfree(buf);
444 0 : GDKfree(localbuf);
445 0 : return res;
446 : }
447 : }
448 0 : GDKfree(localbuf);
449 0 : GDKfree(buf);
450 0 : return res;
451 : }
452 :
453 : static int
454 119989 : output_file_dense(Tablet *as, stream *fd)
455 : {
456 119989 : size_t len = BUFSIZ, locallen = BUFSIZ;
457 119989 : int res = 0;
458 119989 : char *buf = GDKmalloc(len);
459 119988 : char *localbuf = GDKmalloc(len);
460 119986 : BUN i = 0;
461 :
462 119986 : if (buf == NULL || localbuf == NULL) {
463 0 : GDKfree(buf);
464 0 : GDKfree(localbuf);
465 0 : return -1;
466 : }
467 6103909 : for (i = 0; i < as->nr; i++) {
468 5983922 : if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
469 0 : GDKfree(buf);
470 0 : GDKfree(localbuf);
471 0 : return res;
472 : }
473 : }
474 119987 : GDKfree(localbuf);
475 119990 : GDKfree(buf);
476 119990 : return res;
477 : }
478 :
479 : static int
480 0 : output_file_ordered(Tablet *as, BAT *order, stream *fd)
481 : {
482 0 : size_t len = BUFSIZ;
483 0 : int res = 0;
484 0 : char *buf = GDKmalloc(len);
485 0 : BUN p, q;
486 0 : BUN i = 0;
487 0 : BUN offset = as->offset;
488 :
489 0 : if (buf == NULL)
490 : return -1;
491 0 : for (q = offset + as->nr, p = offset; p < q; p++, i++) {
492 0 : oid h = order->hseqbase + p;
493 :
494 0 : if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
495 0 : GDKfree(buf);
496 0 : return res;
497 : }
498 : }
499 0 : GDKfree(buf);
500 0 : return res;
501 : }
502 :
503 : int
504 119990 : TABLEToutput_file(Tablet *as, BAT *order, stream *s)
505 : {
506 119990 : oid base = oid_nil;
507 119990 : int ret = 0;
508 :
509 : /* only set nr if it is zero or lower (bogus) to the maximum value
510 : * possible (BATcount), if already set within BATcount range,
511 : * preserve value such that for instance SQL's reply_size still
512 : * works
513 : */
514 119990 : if (order) {
515 0 : BUN maxnr = BATcount(order);
516 0 : if (as->nr == BUN_NONE || as->nr > maxnr)
517 0 : as->nr = maxnr;
518 : }
519 119990 : assert(as->nr != BUN_NONE);
520 :
521 119990 : base = check_BATs(as);
522 119990 : if (!order || !is_oid_nil(base)) {
523 119990 : if (!order || order->hseqbase == base)
524 119990 : ret = output_file_dense(as, s);
525 : else
526 0 : ret = output_file_ordered(as, order, s);
527 : } else {
528 0 : ret = output_file_default(as, order, s);
529 : }
530 119990 : return ret;
531 : }
532 :
533 : /*
534 : * Niels Nes, Martin Kersten
535 : *
536 : * Parallel bulk load for SQL
537 : * The COPY INTO command for SQL is heavily CPU bound, which means
538 : * that ideally we would like to exploit the multi-cores to do that
539 : * work in parallel.
540 : * Complicating factors are the initial record offset, the
541 : * possible variable length of the input, and the original sort order
542 : * that should preferably be maintained.
543 : *
544 : * The code below consists of a file reader, which breaks up the
545 : * file into chunks of distinct rows. Then multiple parallel threads
546 : * grab them, and break them on the field boundaries.
547 : * After all fields are identified this way, the columns are converted
548 : * and stored in the BATs.
549 : *
550 : * The threads get a reference to a private copy of the READERtask.
551 : * It includes a list of columns they should handle. This is a basis
552 : * to distribute cheap and expensive columns over threads.
553 : *
554 : * The file reader overlaps IO with updates of the BAT.
555 : * Also the buffer size of the block stream might be a little small for
556 : * this task (1MB). It has been increased to 8MB, which indeed improved.
557 : *
558 : * The work divider allocates subtasks to threads based on the
559 : * observed time spending so far.
560 : */
561 :
562 : #define BREAKROW 1 /* value for READERtask.state: row break */
563 : #define UPDATEBAT 2 /* value for READERtask.state: update bat */
564 : #define ENDOFCOPY 3 /* NOTE(review): presumably the state that ends the copy; its use is not visible here - confirm */
565 :
566 : typedef struct {
567 : Client cntxt; /* client context; tablet_error() appends to its error_row/error_fld/error_msg/error_input BATs */
568 : int id; /* for self reference */
569 : int state; /* row break=1 , 2 = update bat (see the macros above) */
570 : int workers; /* how many concurrent ones */
571 : int error; /* error during row break */
572 : int next; /* added to BATcount when SQLworker_column() checks the BAT capacity */
573 : int limit; /* upper bound for indices into rowerror; also extra room requested when a BAT is extended */
574 : BUN cnt, maxrow; /* first row in file chunk. */
575 : lng skip; /* number of lines to be skipped */
576 : lng *time, wtime; /* time per col + time per thread */
577 : int rounds; /* how often did we divide the work */
578 : bool ateof; /* io control */
579 : bool from_stdin;
580 : bool escape; /* whether to handle \ escapes */
581 : bool besteffort; /* when false, a value that cannot be converted makes SQLinsert_val() fail */
582 : char quote; /* quote character; 0 when fields are not quoted */
583 : bstream *b;
584 : stream *out;
585 : MT_Id tid;
586 : MT_Sema producer; /* reader waits for call */
587 : MT_Sema consumer; /* reader waits for call -- NOTE(review): same text as for producer; confirm which side waits on which semaphore */
588 : MT_Sema sema; /* threads wait for work , negative next implies exit */
589 : MT_Sema reply; /* let reader continue */
590 : Tablet *as; /* the table description (columns, error message) being loaded */
591 : char *errbuf;
592 : const char *csep, *rsep; /* column separator and record separator */
593 : size_t seplen, rseplen; /* lengths used for csep and rsep respectively */
594 :
595 : char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for row splitter and tokenizer */
596 : size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
597 : char **rows[MAXBUFFERS]; /* rows[buffer][idx]: start of row idx */
598 : lng *startlineno[MAXBUFFERS]; /* startlineno[buffer][idx]: line number at which row idx starts */
599 : int top[MAXBUFFERS]; /* number of rows in this buffer */
600 : int cur; /* current buffer used by splitter and update threads */
601 :
602 : int *cols; /* columns to handle */
603 : char ***fields; /* fields[col][idx]: text of column col in row idx, NULL for a nil/missing value */
604 : bte *rowerror; /* per-row error counter, incremented by tablet_error(); may be NULL */
605 : int errorcnt; /* total number of errors reported through tablet_error() */
606 : bool set_qry_ctx;
607 : } READERtask;
608 :
609 : /* note, the column value that is passed here is the 0 based value; the
610 : * lineno value on the other hand is 1 based */
611 : static void
612 34 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
613 : const char *fcn)
614 : {
615 34 : assert(is_int_nil(col) || col >= 0);
616 34 : assert(is_lng_nil(lineno) || lineno >= 1);
617 34 : MT_lock_set(&errorlock);
618 34 : if (task->cntxt->error_row != NULL
619 34 : && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
620 34 : || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
621 : false) != GDK_SUCCEED
622 34 : || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
623 34 : || BUNappend(task->cntxt->error_input, fcn,
624 : false) != GDK_SUCCEED)) {
625 0 : task->besteffort = false;
626 : }
627 34 : if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
628 34 : task->rowerror[idx]++;
629 34 : if (task->as->error == NULL) {
630 60 : const char *colnam = is_int_nil(col) || col < 0
631 30 : || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
632 30 : if (msg == NULL) {
633 0 : task->besteffort = false;
634 30 : } else if (!is_lng_nil(lineno)) {
635 30 : if (!is_int_nil(col)) {
636 28 : if (colnam)
637 28 : task->as->error = createException(MAL, "sql.copy_from",
638 : "line " LLFMT ": column %d %s: %s",
639 : lineno, col + 1, colnam, msg);
640 : else
641 0 : task->as->error = createException(MAL, "sql.copy_from",
642 : "line " LLFMT ": column %d: %s",
643 : lineno, col + 1, msg);
644 : } else {
645 2 : task->as->error = createException(MAL, "sql.copy_from",
646 : "line " LLFMT ": %s", lineno, msg);
647 : }
648 : } else {
649 0 : if (!is_int_nil(col)) {
650 0 : if (colnam)
651 0 : task->as->error = createException(MAL, "sql.copy_from",
652 : "column %d %s: %s", col + 1, colnam,
653 : msg);
654 : else
655 0 : task->as->error = createException(MAL, "sql.copy_from",
656 : "column %d: %s", col + 1, msg);
657 : } else {
658 0 : task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
659 : }
660 : }
661 : }
662 34 : task->errorcnt++;
663 34 : MT_lock_unset(&errorlock);
664 34 : }
665 :
666 : /*
667 : * The row is broken into pieces directly on their field separators. It assumes that we have
668 : * the record in the cache already, so we can do most work quickly.
669 : * Furthermore, it assumes a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
670 : */
671 :
/*
 * Calculate and return the space (excluding the terminating NUL byte)
 * that the function mycpstr below needs to copy the string s.
 *
 * Every byte of a well-formed UTF-8 sequence counts for one byte; every
 * byte that is not part of a well-formed sequence (a stray continuation
 * byte, a start byte whose continuation bytes are missing, or a byte
 * that cannot start a sequence) counts for four bytes, since mycpstr
 * writes it as <XX>.
 *
 * For a well-formed multi-byte sequence only the continuation bytes are
 * skipped inside the branch; the start byte is accounted for by the s++
 * at the end of the loop.  (Skipping the full sequence length there, as
 * was done before, steps one byte too far: it can move past the
 * terminating NUL byte and it hides a following invalid byte, so that
 * the result is too small for mycpstr.)
 */
static size_t
mystrlen(const char *s)
{
	size_t len = 0;				/* extra bytes needed for <XX> escapes */
	const char *s0 = s;

	while (*s) {
		if ((*s & 0x80) == 0) {
			/* plain ASCII */
			;
		} else if ((*s & 0xC0) == 0x80) {
			/* stray continuation byte */
			len += 3;
		} else if ((*s & 0xE0) == 0xC0) {
			/* two-byte sequence */
			if ((s[1] & 0xC0) != 0x80)
				len += 3;
			else
				s += 1;
		} else if ((*s & 0xF0) == 0xE0) {
			/* three-byte sequence */
			if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
				len += 3;
			else
				s += 2;
		} else if ((*s & 0xF8) == 0xF0) {
			/* four-byte sequence */
			if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
				|| (s[3] & 0xC0) != 0x80)
				len += 3;
			else
				s += 3;
		} else {
			/* not a valid start byte */
			len += 3;
		}
		s++;
	}
	len += s - s0;
	return len;
}
714 :
/*
 * Copy the string pointed to by s into the buffer pointed to by t and
 * return a pointer to the terminating NUL byte that was written.
 *
 * Well-formed UTF-8 sequences (one to four bytes) are copied verbatim.
 * A byte that does not belong to a well-formed sequence (a stray
 * continuation byte, a start byte whose continuation bytes are missing,
 * or a byte that cannot start a sequence) is written as <XX>, where XX
 * is its value in upper-case hexadecimal; copying then resumes at the
 * next byte.
 *
 * The buffer t must be large enough to hold the result; the required
 * size is what the function mystrlen above is meant to calculate.
 */
static char *
mycpstr(char *t, const char *s)
{
	while (*s != '\0') {
		const unsigned char lead = (unsigned char) *s;
		int seqlen;				/* sequence length implied by lead; 0 = invalid */
		bool valid;

		if ((lead & 0x80) == 0)
			seqlen = 1;
		else if ((lead & 0xE0) == 0xC0)
			seqlen = 2;
		else if ((lead & 0xF0) == 0xE0)
			seqlen = 3;
		else if ((lead & 0xF8) == 0xF0)
			seqlen = 4;
		else
			seqlen = 0;			/* continuation byte or invalid start byte */

		/* all following bytes of the sequence must be continuation
		 * bytes; stop at the first one that is not (possibly the NUL) */
		valid = seqlen > 0;
		for (int k = 1; valid && k < seqlen; k++)
			valid = (s[k] & 0xC0) == 0x80;

		if (valid) {
			while (seqlen-- > 0)
				*t++ = *s++;
		} else {
			t += sprintf(t, "<%02X>", (uint8_t) *s++);
		}
	}
	*t = 0;
	return t;
}
766 :
767 : static str
768 32 : SQLload_error(READERtask *task, lng idx, BUN attrs)
769 : {
770 32 : str line;
771 32 : char *s;
772 32 : size_t sz = 0;
773 32 : BUN i;
774 :
775 130 : for (i = 0; i < attrs; i++) {
776 98 : if (task->fields[i][idx])
777 88 : sz += mystrlen(task->fields[i][idx]);
778 98 : sz += task->seplen;
779 : }
780 :
781 32 : s = line = GDKmalloc(sz + task->rseplen + 1);
782 32 : if (line == NULL) {
783 0 : tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
784 : "SQLload_error");
785 0 : return NULL;
786 : }
787 130 : for (i = 0; i < attrs; i++) {
788 98 : if (task->fields[i][idx])
789 88 : s = mycpstr(s, task->fields[i][idx]);
790 98 : if (i < attrs - 1)
791 66 : s = mycpstr(s, task->csep);
792 : }
793 32 : strcpy(s, task->rsep);
794 32 : return line;
795 : }
796 :
797 : /*
798 : * The parsing of the individual values is straightforward. If the value represents
799 : * the null-replacement string then we grab the underlying nil.
800 : * If the string starts with the quote identified from SQL, we locate the tail
801 : * and interpret the body.
802 : *
803 : * If inserting fails, we return -1; if the value cannot be parsed, we
804 : * return -1 if besteffort is not set, otherwise we return 0, but in
805 : * either case an entry is added to the error table.
806 : */
807 : static inline int
808 328034488 : SQLinsert_val(READERtask *task, int col, int idx)
809 : {
810 328034488 : Column *fmt = task->as->format + col;
811 328034488 : const void *adt;
812 328034488 : char buf[BUFSIZ];
813 328034488 : char *s = task->fields[col][idx];
814 328034488 : char *err = NULL;
815 328034488 : int ret = 0;
816 :
817 : /* include testing on the terminating null byte !! */
818 328034488 : if (s == NULL) {
819 6540601 : adt = fmt->nildata;
820 6540601 : fmt->c->tnonil = false;
821 : } else {
822 321493887 : if (task->escape) {
823 321293784 : size_t slen = strlen(s) + 1;
824 321293784 : char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
825 15 : if (data == NULL
826 338772899 : || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
827 321293784 : strlen(s), '\0') < 0)
828 : adt = NULL;
829 : else
830 338772897 : adt = fmt->frstr(fmt, fmt->adt, data);
831 344283667 : if (data != buf)
832 15 : GDKfree(data);
833 : } else
834 200103 : adt = fmt->frstr(fmt, fmt->adt, s);
835 : }
836 :
837 351024371 : lng row = BATcount(fmt->c) + 1;
838 351024371 : if (adt == NULL) {
839 27 : if (task->rowerror) {
840 27 : err = SQLload_error(task, idx, task->as->nr_attrs);
841 27 : if (s) {
842 27 : size_t slen = mystrlen(s);
843 27 : char *scpy = GDKmalloc(slen + 1);
844 27 : if (scpy == NULL) {
845 0 : tablet_error(task, idx, row, col,
846 : SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
847 0 : task->besteffort = false; /* no longer best effort */
848 0 : GDKfree(err);
849 0 : return -1;
850 : }
851 27 : mycpstr(scpy, s);
852 27 : s = scpy;
853 : }
854 27 : snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
855 : s ? " in '" : "", s ? s : "", s ? "'" : "");
856 27 : GDKfree(s);
857 27 : tablet_error(task, idx, row, col, buf, err);
858 27 : GDKfree(err);
859 27 : if (!task->besteffort)
860 : return -1;
861 : }
862 6 : ret = -!task->besteffort; /* yep, two unary operators ;-) */
863 : /* replace it with a nil */
864 6 : adt = fmt->nildata;
865 6 : fmt->c->tnonil = false;
866 : }
867 351024350 : if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
868 : return ret;
869 :
870 : /* failure */
871 0 : if (task->rowerror) {
872 0 : char *msg = GDKerrbuf;
873 0 : err = SQLload_error(task, idx, task->as->nr_attrs);
874 0 : tablet_error(task, idx, row, col, msg
875 0 : && *msg ? msg : "insert failed", err);
876 0 : GDKfree(err);
877 : }
878 0 : task->besteffort = false; /* no longer best effort */
879 0 : return -1;
880 : }
881 :
882 : static int
883 327302 : SQLworker_column(READERtask *task, int col)
884 : {
885 327302 : int i;
886 327302 : Column *fmt = task->as->format;
887 :
888 327302 : if (fmt[col].c == NULL)
889 : return 0;
890 :
891 : /* watch out for concurrent threads */
892 327296 : MT_lock_set(&mal_copyLock);
893 327320 : if (!fmt[col].skip
894 327320 : && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
895 263 : if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
896 0 : tablet_error(task, lng_nil, lng_nil, col,
897 : "Failed to extend the BAT\n", "SQLworker_column");
898 0 : MT_lock_unset(&mal_copyLock);
899 0 : return -1;
900 : }
901 : }
902 327320 : MT_lock_unset(&mal_copyLock);
903 :
904 331366706 : for (i = 0; i < task->top[task->cur]; i++) {
905 330712165 : if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
906 21 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
907 21 : return -1;
908 : }
909 : }
910 327221 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
911 :
912 327221 : return 0;
913 : }
914 :
915 : /*
916 : * The rows are broken on the column separator. Any error is shown and reflected with
917 : * setting the reference of the offending row fields to NULL.
918 : * This allows the loading to continue, skipping the minimal number of rows.
919 : * The details about the locations can be inspected from the error table.
920 : * We also trim the quotes around strings.
921 : */
922 : static int
923 137988887 : SQLload_parse_row(READERtask *task, int idx)
924 : {
925 137988887 : BUN i;
926 137988887 : char errmsg[BUFSIZ];
927 137988887 : char ch = *task->csep;
928 137988887 : char *row = task->rows[task->cur][idx];
929 137988887 : lng startlineno = task->startlineno[task->cur][idx];
930 137988887 : Tablet *as = task->as;
931 137988887 : Column *fmt = as->format;
932 137988887 : bool error = false;
933 137988887 : str errline = NULL;
934 :
935 137988887 : assert(idx < task->top[task->cur]);
936 137988887 : assert(row);
937 137988887 : errmsg[0] = 0;
938 :
939 137988887 : if (task->quote || task->seplen != 1) {
940 12729702 : for (i = 0; i < as->nr_attrs; i++) {
941 10650907 : bool quote = false;
942 10650907 : task->fields[i][idx] = row;
943 : /* recognize fields starting with a quote, keep them */
944 10650907 : if (*row && *row == task->quote) {
945 4738985 : quote = true;
946 4738985 : task->fields[i][idx] = row + 1;
947 4738985 : row = tablet_skip_string(row + 1, task->quote, task->escape);
948 :
949 4198149 : if (!row) {
950 0 : errline = SQLload_error(task, idx, i + 1);
951 0 : snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
952 0 : tablet_error(task, idx, startlineno, (int) i, errmsg,
953 : errline);
954 0 : GDKfree(errline);
955 0 : error = true;
956 0 : goto errors1;
957 : } else
958 4198149 : *row++ = 0;
959 : }
960 :
961 : /* eat away the column separator */
962 54916912 : for (; *row; row++)
963 53908415 : if (*row == '\\' && task->escape) {
964 2 : if (row[1])
965 2 : row++;
966 53908413 : } else if (*row == ch
967 9101574 : && (task->seplen == 1
968 4 : || strncmp(row, task->csep,
969 : task->seplen) == 0)) {
970 9101574 : *row = 0;
971 9101574 : row += task->seplen;
972 9101574 : goto endoffieldcheck;
973 : }
974 :
975 : /* not enough fields */
976 1008497 : if (i < as->nr_attrs - 1) {
977 0 : errline = SQLload_error(task, idx, i + 1);
978 : /* it's the next value that is missing */
979 0 : tablet_error(task, idx, startlineno, (int) i + 1,
980 : "Column value missing", errline);
981 0 : GDKfree(errline);
982 0 : error = true;
983 0 : errors1:
984 : /* we save all errors detected as NULL values */
985 0 : for (; i < as->nr_attrs; i++)
986 0 : task->fields[i][idx] = NULL;
987 0 : i--;
988 : }
989 1008497 : endoffieldcheck:
990 10110071 : ;
991 : /* check for user defined NULL string */
992 10110071 : if ((!quote || !fmt->null_length) && fmt->nullstr
993 8989621 : && task->fields[i][idx]
994 8989621 : && strncasecmp(task->fields[i][idx], fmt->nullstr,
995 8989621 : fmt->null_length + 1) == 0)
996 2210299 : task->fields[i][idx] = 0;
997 : }
998 : } else {
999 : assert(!task->quote);
1000 : assert(task->seplen == 1);
1001 364448948 : for (i = 0; i < as->nr_attrs; i++) {
1002 229079692 : task->fields[i][idx] = row;
1003 :
1004 : /* eat away the column separator */
1005 2060746172 : for (; *row; row++)
1006 1925815705 : if (*row == '\\' && task->escape) {
1007 382 : if (row[1])
1008 382 : row++;
1009 1925815323 : } else if (*row == ch) {
1010 94149225 : *row = 0;
1011 94149225 : row++;
1012 94149225 : goto endoffield2;
1013 : }
1014 :
1015 : /* not enough fields */
1016 134930467 : if (i < as->nr_attrs - 1) {
1017 4 : errline = SQLload_error(task, idx, i + 1);
1018 : /* it's the next value that is missing */
1019 4 : tablet_error(task, idx, startlineno, (int) i + 1,
1020 : "Column value missing", errline);
1021 4 : GDKfree(errline);
1022 4 : error = true;
1023 : /* we save all errors detected */
1024 16 : for (; i < as->nr_attrs; i++)
1025 8 : task->fields[i][idx] = NULL;
1026 4 : i--;
1027 : }
1028 134930463 : endoffield2:
1029 229079692 : ;
1030 : /* check for user defined NULL string */
1031 229079692 : if (fmt->nullstr && task->fields[i][idx]
1032 214406445 : && strncasecmp(task->fields[i][idx], fmt->nullstr,
1033 214406445 : fmt->null_length + 1) == 0) {
1034 3837665 : task->fields[i][idx] = 0;
1035 : }
1036 : }
1037 : }
1038 : /* check for too many values as well */
1039 137448051 : if (row && *row && i == as->nr_attrs) {
1040 1 : errline = SQLload_error(task, idx, task->as->nr_attrs);
1041 1 : snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
1042 1 : tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
1043 1 : GDKfree(errline);
1044 1 : error = true;
1045 : }
1046 137448051 : return error ? -1 : 0;
1047 : }
1048 :
1049 : static void
1050 1230 : SQLworker(void *arg)
1051 : {
1052 1230 : READERtask *task = (READERtask *) arg;
1053 1230 : unsigned int i;
1054 1230 : int j, piece;
1055 1230 : lng t0;
1056 :
1057 1230 : GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
1058 1230 : GDKclrerr();
1059 1230 : task->errbuf = GDKerrbuf;
1060 1230 : MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
1061 :
1062 1230 : MT_sema_down(&task->sema);
1063 610585 : while (task->top[task->cur] >= 0) {
1064 : /* stage one, break the rows spread the work over the workers */
1065 610585 : switch (task->state) {
1066 304631 : case BREAKROW:
1067 304631 : t0 = GDKusec();
1068 304811 : piece = (task->top[task->cur] + task->workers) / task->workers;
1069 :
1070 304811 : for (j = piece * task->id;
1071 137708461 : j < task->top[task->cur] && j < piece * (task->id + 1); j++)
1072 137404970 : if (task->rows[task->cur][j]) {
1073 137404970 : if (SQLload_parse_row(task, j) < 0) {
1074 5 : task->errorcnt++;
1075 : // early break unless best effort
1076 5 : if (!task->besteffort) {
1077 2 : for (j++;
1078 4 : j < task->top[task->cur]
1079 4 : && j < piece * (task->id + 1); j++)
1080 8 : for (i = 0; i < task->as->nr_attrs; i++)
1081 6 : task->fields[i][j] = NULL;
1082 : break;
1083 : }
1084 : }
1085 : }
1086 303493 : task->wtime = GDKusec() - t0;
1087 304456 : break;
1088 304724 : case UPDATEBAT:
1089 304724 : if (!task->besteffort && task->errorcnt)
1090 : break;
1091 : /* stage two, updating the BATs */
1092 1260315 : for (i = 0; i < task->as->nr_attrs; i++)
1093 955571 : if (task->cols[i]) {
1094 327215 : t0 = GDKusec();
1095 327305 : if (SQLworker_column(task, task->cols[i] - 1) < 0)
1096 : break;
1097 327236 : t0 = GDKusec() - t0;
1098 327232 : task->time[i] += t0;
1099 327232 : task->wtime += t0;
1100 : }
1101 : break;
1102 1230 : case ENDOFCOPY:
1103 1230 : MT_sema_up(&task->reply);
1104 1230 : goto do_return;
1105 : }
1106 609218 : MT_sema_up(&task->reply);
1107 1218302 : MT_sema_down(&task->sema);
1108 : }
1109 0 : MT_sema_up(&task->reply);
1110 :
1111 1230 : do_return:
1112 1230 : GDKfree(GDKerrbuf);
1113 1230 : GDKsetbuf(NULL);
1114 1230 : MT_thread_set_qry_ctx(NULL);
1115 1230 : }
1116 :
1117 : static void
1118 103317 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
1119 : {
1120 103317 : int i, j, mi;
1121 103317 : lng loc[MAXWORKERS];
1122 :
1123 : /* after a few rounds we stick to the work assignment */
1124 103317 : if (task->rounds > 8)
1125 103193 : return;
1126 : /* simple round robin the first time */
1127 2715 : if (threads == 1 || task->rounds++ == 0) {
1128 16448 : for (i = j = 0; i < nr_attrs; i++, j++)
1129 13857 : ptask[j % threads].cols[i] = task->cols[i];
1130 : return;
1131 : }
1132 124 : memset(loc, 0, sizeof(loc));
1133 : /* use of load directives */
1134 1898 : for (i = 0; i < nr_attrs; i++)
1135 7036 : for (j = 0; j < threads; j++)
1136 5262 : ptask[j].cols[i] = 0;
1137 :
1138 : /* now allocate the work to the threads */
1139 1898 : for (i = 0; i < nr_attrs; i++, j++) {
1140 : mi = 0;
1141 5262 : for (j = 1; j < threads; j++)
1142 3488 : if (loc[j] < loc[mi])
1143 1433 : mi = j;
1144 :
1145 1774 : ptask[mi].cols[i] = task->cols[i];
1146 1774 : loc[mi] += task->time[i];
1147 : }
1148 : /* reset the timer */
1149 1898 : for (i = 0; i < nr_attrs; i++, j++)
1150 1774 : task->time[i] = 0;
1151 : }
1152 :
/*
 * Reading is handled by a separate task as a preparation for more parallelism.
 * A buffer is filled with proper rows.
 * If we are reading from a file then a double buffering scheme is activated.
 * Reading from the console (stdin) remains single buffered only.
 * If we end up with unfinished records, then the rowlimit will terminate the process.
 */
1160 :
/* transition table: one row of 256 next-state entries per state */
typedef unsigned char (*dfa_t)[256];

/*
 * Build the transition table that recognizes the separator `sep` of
 * length `seplen`.  State n means "the last n bytes read are the first
 * n bytes of the separator"; reaching state seplen means the separator
 * was seen.  Entries not set explicitly stay 0 (start over).
 *
 * Returns a table allocated with GDKzalloc (to be released with
 * GDKfree), or NULL when the allocation fails.
 */
static dfa_t
mkdfa(const unsigned char *sep, size_t seplen)
{
	dfa_t table = GDKzalloc(seplen * sizeof(*table));

	if (table == NULL)
		return NULL;

	for (size_t state = 0; state < seplen; state++) {
		/* the first separator byte always (re)starts a match ... */
		table[state][sep[0]] = 1;
		/* ... and the expected byte advances by one */
		table[state][sep[state]] = (unsigned char) (state + 1);

		/* on a mismatch, fall back to the longest prefix of the
		 * separator that is also a suffix of what was matched */
		for (size_t drop = 0; drop < state; drop++) {
			size_t keep = state - drop;

			if (memcmp(sep + drop, sep, keep) == 0
				&& table[state][sep[keep]] <= keep)
				table[state][sep[keep]] = (unsigned char) (keep + 1);
		}
	}
	return table;
}
1193 :
/* Branch hints.  With GCC-compatible compilers the condition is
 * normalized to 0/1 and passed to __builtin_expect, which returns its
 * first argument and tells the compiler the value it usually has.
 * Elsewhere the macros just yield the expression. */
#if defined(__GNUC__)
#define likely(expr)	__builtin_expect((expr) != 0, 1)
#define unlikely(expr)	__builtin_expect((expr) != 0, 0)
#else
#define likely(expr)	(expr)
#define unlikely(expr)	(expr)
#endif
1203 :
/*
 * Body of the producer thread.  `p` is the shared READERtask.
 *
 * The producer repeatedly
 *  - obtains input (tablet_read_more on task->b when loading from a
 *    file; for console input the first round uses what is already in
 *    task->b),
 *  - appends the unread part of task->b to task->input[cur],
 *  - walks that text, validating UTF-8 and locating record separators
 *    with the DFA built by mkdfa, while honouring quotes and backslash
 *    escapes, and records the start of each row in task->rows[cur] and
 *    its first line number in task->startlineno[cur],
 *  - publishes the buffer (task->cur, task->ateof, task->cnt) and
 *    signals task->consumer.
 *
 * For file input the MAXBUFFERS buffers are used alternately so that
 * reading overlaps with the consumer's work; console input uses one
 * buffer and waits for the consumer after every round.
 *
 * The thread ends when task->maxrow rows were delivered, when the input
 * is exhausted, when task->state becomes ENDOFCOPY, or immediately when
 * it is woken with task->id < 0.
 */
static void
SQLproducer(void *p)
{
	READERtask *task = (READERtask *) p;
	bool consoleinput = false;
	int cur = 0;				// buffer being filled
	bool blocked[MAXBUFFERS] = { false };	// buffer handed over, not yet released
	bool ateof[MAXBUFFERS] = { false };
	BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };	// rows delivered; rows delivered before each buffer
	char *end = NULL, *e = NULL, *s = NULL, *base;
	const char *rsep = task->rsep;
	size_t rseplen = strlen(rsep), partial = 0;	// partial: bytes of an unfinished record
	char quote = task->quote;
	dfa_t rdfa;
	lng rowno = 0;
	lng lineno = 1;
	lng startlineno = 1;
	int more = 0;

	/* wait for the go-ahead; a negative id means start-up was aborted */
	MT_sema_down(&task->producer);
	if (task->id < 0) {
		return;
	}

	MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
	rdfa = mkdfa((const unsigned char *) rsep, rseplen);
	if (rdfa == NULL) {
		tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
					 "");
		ateof[cur] = true;
		goto reportlackofinput;
	}

/*	TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/

	base = end = s = task->input[cur];
	*s = 0;
	task->cur = cur;
	if (task->as->filename == NULL) {
		/* no file name: console input, skip the initial read */
		consoleinput = true;
		goto parseSTDIN;
	}
	for (;;) {
		startlineno = lineno;
		ateof[cur] = !tablet_read_more(task->b, task->out, task->b->size);

		// we may be reading from standard input and may be out of input
		// warn the consumers
		if (ateof[cur] && partial) {
			if (unlikely(partial)) {
				tablet_error(task, rowno, lineno, int_nil,
							 "incomplete record at end of file", s);
				task->b->pos += partial;
			}
			goto reportlackofinput;
		}

		/* an error was left in the error buffer: report it and stop */
		if (task->errbuf && task->errbuf[0]) {
			if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
				tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
							 "SQLload_file");
/*				TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
				ateof[cur] = true;
				break;
			}
		}

	  parseSTDIN:

		/* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
		partial = 0;
		task->top[cur] = 0;
		s = task->input[cur];
		base = end;
		/* avoid too long records */
		if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
			/* the input buffer should be extended, but 'base' is not shared
			   between the threads, which we can not now update.
			   Mimic an ateof instead; */
			tablet_error(task, rowno, lineno, int_nil, "record too long", "");
			ateof[cur] = true;
/*			TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
			goto reportlackofinput;
		}
		memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
		end = end + task->b->len - task->b->pos;
		*end = '\0';			/* this is safe, as the stream ensures an extra byte */
		/* Note that we rescan from the start of a record (the last
		 * partial buffer from the previous iteration), even if in the
		 * previous iteration we have already established that there
		 * is no record separator in the first, perhaps significant,
		 * part of the buffer. This is because if the record separator
		 * is longer than one byte, it is too complex (i.e. would
		 * require more state) to be sure what the state of the quote
		 * status is when we back off a few bytes from where the last
		 * scan ended (we need to back off some since we could be in
		 * the middle of the record separator).  If this is too
		 * costly, we have to rethink the matter. */
		/* reading from stdin without a row limit: input that starts
		 * with a newline is treated as end of input */
		if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
			ateof[cur] = true;
			goto reportlackofinput;
		}
		for (e = s; *e && e < end && cnt < task->maxrow;) {
			/* tokenize the record completely
			 *
			 * The format of the input should comply to the following
			 * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
			 * where quote is a single user-defined character.
			 * Within the quoted fields a character may be escaped
			 * with a backslash.  The correct number of fields should
			 * be supplied.  In the first phase we simply break the
			 * rows at the record boundary. */
			int nutf = 0;		/* continuation bytes still expected */
			int m = 0;			/* bits of which the next continuation byte must have one set */
			bool bs = false;	/* previous byte was an escaping backslash */
			char q = 0;			/* quote character we are inside of, or 0 */
			size_t i = 0;		/* state of the record separator DFA */
			while (*e) {
				if (task->skip > 0) {
					/* no interpretation of data we're skipping, just
					 * look for newline */
					if (*e == '\n') {
						lineno++;
						break;
					}
				} else {
					/* check for correctly encoded UTF-8 */
					if (nutf > 0) {
						/* must be a continuation byte (10xxxxxx) */
						if (unlikely((*e & 0xC0) != 0x80))
							goto badutf8;
						/* the lead byte carried no payload bits, so
						 * this byte has to supply some (otherwise the
						 * sequence is an overlong encoding) */
						if (unlikely(m != 0 && (*e & m) == 0))
							goto badutf8;
						m = 0;
						nutf--;
					} else if ((*e & 0x80) != 0) {
						if ((*e & 0xE0) == 0xC0) {
							/* lead byte of a two-byte sequence */
							nutf = 1;
							if (unlikely((e[0] & 0x1E) == 0))
								goto badutf8;
						} else if ((*e & 0xF0) == 0xE0) {
							/* lead byte of a three-byte sequence */
							nutf = 2;
							if ((e[0] & 0x0F) == 0)
								m = 0x20;
						} else if (likely((*e & 0xF8) == 0xF0)) {
							/* lead byte of a four-byte sequence */
							nutf = 3;
							if ((e[0] & 0x07) == 0)
								m = 0x30;
						} else {
							goto badutf8;
						}
					} else if (*e == '\n')
						lineno++;
					/* check for quoting and the row separator */
					if (bs) {
						bs = false;
					} else if (task->escape && *e == '\\') {
						bs = true;
						i = 0;
					} else if (*e == q) {
						q = 0;
					} else if (*e == quote) {
						q = quote;
						i = 0;
					} else if (q == 0) {
						/* outside quotes: advance the separator DFA */
						i = rdfa[i][(unsigned char) *e];
						if (i == rseplen)
							break;
					}
				}
				e++;
			}
			if (*e == 0) {
				partial = e - s;
				/* found an incomplete record, saved for next round */
				if (unlikely(s + partial < end)) {
					/* found a EOS in the input */
					tablet_error(task, rowno, startlineno, int_nil,
								 "record too long (EOS found)", "");
					ateof[cur] = true;
					goto reportlackofinput;
				}
				break;
			} else {
				/* e points at the last byte of the record separator
				 * (or at the newline when skipping) */
				rowno++;
				if (task->skip > 0) {
					task->skip--;
				} else {
					if (cnt < task->maxrow) {
						task->startlineno[cur][task->top[cur]] = startlineno;
						task->rows[cur][task->top[cur]++] = s;
						startlineno = lineno;
						cnt++;
					}
					/* terminate the row at the start of its separator */
					*(e + 1 - rseplen) = 0;
				}
				s = ++e;
				/* mark the bytes of this record as consumed */
				task->b->pos += (size_t) (e - base);
				base = e;
				if (task->top[cur] == task->limit)
					break;
			}
		}

	  reportlackofinput:
/*		TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/

		if (consoleinput) {
			task->cur = cur;
			task->ateof = ateof[cur];
			task->cnt = bufcnt[cur];
			/* tell consumer to go ahead */
			MT_sema_up(&task->consumer);
			/* then wait until it is done */
			MT_sema_down(&task->producer);
			if (cnt == task->maxrow) {
				GDKfree(rdfa);
				MT_thread_set_qry_ctx(NULL);
				return;
			}
		} else {
			assert(!blocked[cur]);
			if (blocked[(cur + 1) % MAXBUFFERS]) {
				/* first wait until other buffer is done */
/*				TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/

				MT_sema_down(&task->producer);
				blocked[(cur + 1) % MAXBUFFERS] = false;
				if (task->state == ENDOFCOPY) {
					GDKfree(rdfa);
					MT_thread_set_qry_ctx(NULL);
					return;
				}
			}
			/* other buffer is done, proceed with current buffer */
			assert(!blocked[(cur + 1) % MAXBUFFERS]);
			blocked[cur] = true;
			task->cur = cur;
			task->ateof = ateof[cur];
			task->cnt = bufcnt[cur];
			/* more work remains if input is left, or if we only
			 * stopped because the row pointer array was full */
			more = !ateof[cur] || (e && e < end
								   && task->top[cur] == task->limit);
/*			TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/

			MT_sema_up(&task->consumer);

			cur = (cur + 1) % MAXBUFFERS;
/*			TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/

			if (cnt == task->maxrow) {
				/* everything delivered: wait for the consumer's
				 * acknowledgement before leaving */
				MT_sema_down(&task->producer);
/*				TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
				GDKfree(rdfa);
				MT_thread_set_qry_ctx(NULL);
				return;
			}
		}
/*		TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/

		/* we ran out of input? */
		if (task->ateof && !more) {
/*			TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
			GDKfree(rdfa);
			MT_thread_set_qry_ctx(NULL);
			return;
		}
		/* consumers ask us to stop? */
		if (task->state == ENDOFCOPY) {
			GDKfree(rdfa);
			MT_thread_set_qry_ctx(NULL);
			return;
		}
		bufcnt[cur] = cnt;
		/* move the non-parsed correct row data to the head of the next buffer */
		end = s = task->input[cur];
	}
	/* only reached through the `break` after an error was found in the
	 * error buffer */
	if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
		char msg[256];
		snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
		task->as->error = GDKstrdup(msg);
		tablet_error(task, rowno, startlineno, int_nil,
					 "incomplete record at end of file", s);
		task->b->pos += partial;
	}
	GDKfree(rdfa);
	MT_thread_set_qry_ctx(NULL);

	return;

  badutf8:
	/* report the encoding error, then hand over what we have as if
	 * the input had ended */
	tablet_error(task, rowno, startlineno, int_nil,
				 "input not properly encoded UTF-8", "");
	ateof[cur] = true;
	goto reportlackofinput;
}
1498 :
1499 : static void
1500 1095 : create_rejects_table(Client cntxt)
1501 : {
1502 1095 : MT_lock_set(&mal_contextLock);
1503 1095 : if (cntxt->error_row == NULL) {
1504 472 : cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
1505 472 : cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
1506 472 : cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
1507 472 : cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
1508 472 : if (cntxt->error_row == NULL || cntxt->error_fld == NULL
1509 472 : || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
1510 0 : BBPreclaim(cntxt->error_row);
1511 0 : BBPreclaim(cntxt->error_fld);
1512 0 : BBPreclaim(cntxt->error_msg);
1513 0 : BBPreclaim(cntxt->error_input);
1514 0 : cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
1515 : }
1516 : }
1517 1095 : MT_lock_unset(&mal_contextLock);
1518 1095 : }
1519 :
1520 : BUN
1521 1071 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
1522 : const char *csep, const char *rsep, char quote, lng skip,
1523 : lng maxrow, int best, bool from_stdin, const char *tabnam,
1524 : bool escape)
1525 : {
1526 1071 : BUN cnt = 0, cntstart = 0, leftover = 0;
1527 1071 : int res = 0; /* < 0: error, > 0: success, == 0: continue processing */
1528 1071 : int j;
1529 1071 : BUN firstcol;
1530 1071 : BUN i, attr;
1531 1071 : READERtask task;
1532 1071 : READERtask ptask[MAXWORKERS];
1533 1071 : int threads = 1;
1534 1071 : lng tio, t1 = 0;
1535 1071 : char name[MT_NAME_LEN];
1536 :
1537 1071 : if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
1538 103 : threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
1539 103 : if (threads > 1)
1540 103 : threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
1541 : else
1542 : threads = 1;
1543 : }
1544 :
1545 : /* TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
1546 :
1547 1071 : memset(ptask, 0, sizeof(ptask));
1548 2142 : task = (READERtask) {
1549 : .cntxt = cntxt,
1550 : .from_stdin = from_stdin,
1551 : .as = as,
1552 : .escape = escape, /* TODO: implement feature!!! */
1553 1071 : .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
1554 : };
1555 :
1556 : /* create the reject tables */
1557 1071 : create_rejects_table(task.cntxt);
1558 1071 : if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
1559 1071 : || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
1560 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1561 : "SQLload initialization failed", "");
1562 : /* nothing allocated yet, so nothing to free */
1563 0 : return BUN_NONE;
1564 : }
1565 :
1566 1071 : assert(rsep);
1567 1071 : assert(csep);
1568 1071 : assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
1569 1071 : task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
1570 1071 : task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1571 1071 : task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
1572 1071 : if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
1573 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1574 : "memory allocation failed", "SQLload_file");
1575 0 : goto bailout;
1576 : }
1577 1071 : task.cur = 0;
1578 3213 : for (i = 0; i < MAXBUFFERS; i++) {
1579 2142 : task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
1580 2142 : task.rowlimit[i] = MAXROWSIZE(2 * b->size);
1581 2142 : if (task.base[i] == NULL) {
1582 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1583 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1584 0 : goto bailout;
1585 : }
1586 2142 : task.base[i][0] = task.base[i][b->size + 1] = 0;
1587 2142 : task.input[i] = task.base[i] + 1; /* wrap the buffer with null bytes */
1588 : }
1589 1071 : task.besteffort = best;
1590 :
1591 1071 : if (maxrow < 0)
1592 71 : task.maxrow = BUN_MAX;
1593 : else
1594 1000 : task.maxrow = (BUN) maxrow;
1595 :
1596 1071 : task.skip = skip;
1597 1071 : task.quote = quote;
1598 1071 : task.csep = csep;
1599 1071 : task.seplen = strlen(csep);
1600 1071 : task.rsep = rsep;
1601 1071 : task.rseplen = strlen(rsep);
1602 1071 : task.errbuf = cntxt->errbuf;
1603 :
1604 1071 : MT_sema_init(&task.producer, 0, "task.producer");
1605 1071 : MT_sema_init(&task.consumer, 0, "task.consumer");
1606 1071 : task.ateof = false;
1607 1071 : task.b = b;
1608 1071 : task.out = out;
1609 :
1610 1071 : as->error = NULL;
1611 :
1612 : /* there is no point in creating more threads than we have columns */
1613 1071 : if (as->nr_attrs < (BUN) threads)
1614 38 : threads = (int) as->nr_attrs;
1615 :
1616 : /* allocate enough space for pointers into the buffer pool. */
1617 : /* the record separator is considered a column */
1618 1071 : task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
1619 11190 : for (i = 0; i < as->nr_attrs; i++) {
1620 10119 : task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
1621 10119 : if (task.fields[i] == NULL) {
1622 0 : if (task.as->error == NULL)
1623 0 : as->error = createException(MAL, "sql.copy_from",
1624 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1625 0 : goto bailout;
1626 : }
1627 10119 : task.cols[i] = (int) (i + 1); /* to distinguish non initialized later with zero */
1628 : }
1629 3213 : for (i = 0; i < MAXBUFFERS; i++) {
1630 2142 : task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
1631 2142 : task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
1632 2142 : if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
1633 0 : GDKfree(task.rows[i]);
1634 0 : GDKfree(task.startlineno[i]);
1635 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1636 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1637 : "SQLload_file:failed to alloc buffers");
1638 0 : goto bailout;
1639 : }
1640 : }
1641 1071 : task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
1642 1071 : if (task.rowerror == NULL) {
1643 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1644 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1645 : "SQLload_file:failed to alloc rowerror buffer");
1646 0 : goto bailout;
1647 : }
1648 :
1649 1071 : task.id = 0;
1650 1071 : snprintf(name, sizeof(name), "prod-%s", tabnam);
1651 1071 : if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
1652 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1653 : SQLSTATE(42000) "failed to start producer thread",
1654 : "SQLload_file");
1655 0 : goto bailout;
1656 : }
1657 : /* TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
1658 :
1659 1071 : task.workers = threads;
1660 2301 : for (j = 0; j < threads; j++) {
1661 1230 : ptask[j] = task;
1662 1230 : ptask[j].id = j;
1663 1230 : ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1664 1230 : if (ptask[j].cols == NULL) {
1665 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1666 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1667 0 : task.id = -1;
1668 0 : MT_sema_up(&task.producer);
1669 0 : goto bailout;
1670 : }
1671 1230 : snprintf(name, sizeof(name), "ptask%d.sema", j);
1672 1230 : MT_sema_init(&ptask[j].sema, 0, name);
1673 1230 : snprintf(name, sizeof(name), "ptask%d.repl", j);
1674 1230 : MT_sema_init(&ptask[j].reply, 0, name);
1675 1230 : snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
1676 1230 : if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
1677 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1678 : SQLSTATE(42000) "failed to start worker thread",
1679 : "SQLload_file");
1680 0 : threads = j;
1681 0 : for (j = 0; j < threads; j++)
1682 0 : ptask[j].workers = threads;
1683 : }
1684 : }
1685 1071 : if (threads == 0) {
1686 : /* no threads started */
1687 0 : task.id = -1;
1688 0 : MT_sema_up(&task.producer);
1689 0 : goto bailout;
1690 : }
1691 1071 : MT_sema_up(&task.producer);
1692 :
1693 1071 : tio = GDKusec();
1694 1071 : tio = GDKusec() - tio;
1695 1071 : t1 = GDKusec();
1696 2145 : for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
1697 1074 : if (task.as->format[firstcol].c != NULL)
1698 : break;
1699 104392 : while (res == 0 && cnt < task.maxrow) {
1700 :
1701 : // track how many elements are in the aggregated BATs
1702 103495 : cntstart = BATcount(task.as->format[firstcol].c);
1703 : /* block until the producer has data available */
1704 103495 : MT_sema_down(&task.consumer);
1705 103495 : cnt += task.top[task.cur];
1706 103495 : if (task.ateof && !task.top[task.cur])
1707 : break;
1708 103344 : t1 = GDKusec() - t1;
1709 : /* TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
1710 :
1711 103344 : t1 = GDKusec();
1712 103344 : if (task.top[task.cur]) {
1713 : /* activate the workers to break rows */
1714 408165 : for (j = 0; j < threads; j++) {
1715 : /* stage one, break the rows in parallel */
1716 304848 : ptask[j].error = 0;
1717 304848 : ptask[j].state = BREAKROW;
1718 304848 : ptask[j].next = task.top[task.cur];
1719 304848 : ptask[j].fields = task.fields;
1720 304848 : ptask[j].limit = task.limit;
1721 304848 : ptask[j].cnt = task.cnt;
1722 304848 : ptask[j].cur = task.cur;
1723 304848 : ptask[j].top[task.cur] = task.top[task.cur];
1724 304848 : MT_sema_up(&ptask[j].sema);
1725 : }
1726 : }
1727 103344 : if (task.top[task.cur]) {
1728 : /* await completion of row break phase */
1729 408165 : for (j = 0; j < threads; j++) {
1730 304848 : MT_sema_down(&ptask[j].reply);
1731 304848 : if (ptask[j].error) {
1732 0 : res = -1;
1733 : /* TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
1734 : }
1735 : }
1736 : }
1737 :
1738 : /* TRC_DEBUG(MAL_SERVER,
1739 : "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
1740 : task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
1741 :
1742 103344 : if (task.top[task.cur]) {
1743 103317 : if (res == 0) {
1744 103317 : SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
1745 :
1746 : /* activate the workers to update the BATs */
1747 511482 : for (j = 0; j < threads; j++) {
1748 : /* stage two, update the BATs */
1749 304848 : ptask[j].state = UPDATEBAT;
1750 304848 : MT_sema_up(&ptask[j].sema);
1751 : }
1752 : }
1753 : }
1754 103344 : tio = GDKusec();
1755 103344 : tio = t1 - tio;
1756 :
1757 : /* await completion of the BAT updates */
1758 103344 : if (res == 0 && task.top[task.cur]) {
1759 408165 : for (j = 0; j < threads; j++) {
1760 304848 : MT_sema_down(&ptask[j].reply);
1761 304848 : if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
1762 304848 : res = -1;
1763 304848 : best = 0;
1764 : }
1765 : }
1766 : }
1767 :
1768 : /* trim the BATs discarding error tuples */
1769 : #define trimerrors(TYPE) \
1770 : do { \
1771 : TYPE *src, *dst; \
1772 : leftover= BATcount(task.as->format[attr].c); \
1773 : limit = leftover - cntstart; \
1774 : dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
1775 : for(j = 0; j < (int) limit; j++, src++){ \
1776 : if ( task.rowerror[j]){ \
1777 : leftover--; \
1778 : continue; \
1779 : } \
1780 : *dst++ = *src; \
1781 : } \
1782 : BATsetcount(task.as->format[attr].c, leftover ); \
1783 : } while (0)
1784 :
1785 : /* TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
1786 : best, BATcount(as->format[firstcol].c), task.cnt); */
1787 :
1788 103344 : if (best && BATcount(as->format[firstcol].c)) {
1789 : BUN limit;
1790 : int width;
1791 :
1792 45 : for (attr = 0; attr < as->nr_attrs; attr++) {
1793 31 : if (as->format[attr].skip)
1794 5 : continue;
1795 26 : width = as->format[attr].c->twidth;
1796 26 : as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
1797 26 : switch (width) {
1798 5 : case 1:
1799 16 : trimerrors(bte);
1800 5 : break;
1801 0 : case 2:
1802 0 : trimerrors(sht);
1803 0 : break;
1804 21 : case 4:
1805 55 : trimerrors(int);
1806 21 : break;
1807 0 : case 8:
1808 0 : trimerrors(lng);
1809 0 : break;
1810 : #ifdef HAVE_HGE
1811 0 : case 16:
1812 0 : trimerrors(hge);
1813 0 : break;
1814 : #endif
1815 0 : default:
1816 : {
1817 0 : char *src, *dst;
1818 0 : leftover = BATcount(task.as->format[attr].c);
1819 0 : limit = leftover - cntstart;
1820 0 : dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
1821 0 : for (j = 0; j < (int) limit; j++, src += width) {
1822 0 : if (task.rowerror[j]) {
1823 0 : leftover--;
1824 0 : continue;
1825 : }
1826 0 : if (dst != src)
1827 0 : memcpy(dst, src, width);
1828 0 : dst += width;
1829 : }
1830 0 : BATsetcount(task.as->format[attr].c, leftover);
1831 : }
1832 0 : break;
1833 : }
1834 : }
1835 : // re-initialize the error vector;
1836 14 : memset(task.rowerror, 0, task.limit);
1837 14 : task.errorcnt = 0;
1838 : }
1839 :
1840 103344 : if (res < 0) {
1841 : /* producer should stop */
1842 23 : task.maxrow = cnt;
1843 23 : task.state = ENDOFCOPY;
1844 23 : task.ateof = true;
1845 : }
1846 103344 : if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
1847 : break;
1848 103344 : task.top[task.cur] = 0;
1849 103344 : if (cnt == task.maxrow)
1850 920 : task.ateof = true;
1851 104415 : MT_sema_up(&task.producer);
1852 : }
1853 :
1854 : /* TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
1855 :
1856 1071 : cnt = BATcount(task.as->format[firstcol].c);
1857 :
1858 1071 : task.state = ENDOFCOPY;
1859 : /* TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
1860 :
1861 1071 : if (!task.ateof || cnt < task.maxrow) {
1862 : /* TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
1863 168 : MT_sema_up(&task.producer);
1864 : }
1865 1071 : MT_join_thread(task.tid);
1866 :
1867 : /* TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
1868 :
1869 3372 : for (j = 0; j < threads; j++) {
1870 1230 : ptask[j].state = ENDOFCOPY;
1871 1230 : MT_sema_up(&ptask[j].sema);
1872 : }
1873 : /* wait for their death */
1874 2301 : for (j = 0; j < threads; j++)
1875 1230 : MT_sema_down(&ptask[j].reply);
1876 :
1877 : /* TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
1878 :
1879 2301 : for (j = 0; j < threads; j++) {
1880 1230 : MT_join_thread(ptask[j].tid);
1881 1230 : GDKfree(ptask[j].cols);
1882 1230 : MT_sema_destroy(&ptask[j].sema);
1883 1230 : MT_sema_destroy(&ptask[j].reply);
1884 : }
1885 :
1886 : /* TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
1887 : /* TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
1888 :
1889 11190 : for (i = 0; i < as->nr_attrs; i++) {
1890 10119 : BAT *b = task.as->format[i].c;
1891 10119 : if (b)
1892 10113 : BATsettrivprop(b);
1893 10119 : GDKfree(task.fields[i]);
1894 : }
1895 1071 : GDKfree(task.fields);
1896 1071 : GDKfree(task.cols);
1897 1071 : GDKfree(task.time);
1898 4284 : for (i = 0; i < MAXBUFFERS; i++) {
1899 2142 : GDKfree(task.base[i]);
1900 2142 : GDKfree(task.rows[i]);
1901 2142 : GDKfree(task.startlineno[i]);
1902 : }
1903 1071 : if (task.rowerror)
1904 1071 : GDKfree(task.rowerror);
1905 1071 : MT_sema_destroy(&task.producer);
1906 1071 : MT_sema_destroy(&task.consumer);
1907 :
1908 1071 : return res < 0 ? BUN_NONE : cnt;
1909 :
1910 0 : bailout:
1911 0 : if (task.fields) {
1912 0 : for (i = 0; i < as->nr_attrs; i++)
1913 0 : GDKfree(task.fields[i]);
1914 0 : GDKfree(task.fields);
1915 : }
1916 0 : GDKfree(task.time);
1917 0 : GDKfree(task.cols);
1918 0 : GDKfree(task.base[task.cur]);
1919 0 : GDKfree(task.rowerror);
1920 0 : for (i = 0; i < MAXWORKERS; i++)
1921 0 : GDKfree(ptask[i].cols);
1922 : return BUN_NONE;
1923 : }
1924 :
1925 : /* COPYrejects: hand the client's current reject table back to the caller.
 *
 * Arguments 0..3 of the instruction (fetched with getArgReference_bat) are
 * the outputs: they receive the batCacheid of cntxt->error_row,
 * cntxt->error_fld, cntxt->error_msg and cntxt->error_input, in that order.
 * BBPretain is called on each id as it is stored.
 *
 * create_rejects_table(cntxt) is called first; if cntxt->error_row is still
 * NULL afterwards the function bails out through
 * throw(MAL, "sql.rejects", ...).  Otherwise it returns MAL_SUCCEED.
 *
 * The ids that are returned refer to the client's own BATs, not to copies,
 * and errorlock is not taken here.  To be on the safe side we should
 * actually create copies within a critical section.
1926 : * Ignored for now. */
1927 : str
1928 24 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1929 : {
1930 24 : bat *row = getArgReference_bat(stk, pci, 0);
1931 24 : bat *fld = getArgReference_bat(stk, pci, 1);
1932 24 : bat *msg = getArgReference_bat(stk, pci, 2);
1933 24 : bat *inp = getArgReference_bat(stk, pci, 3);
1934 :
1935 24 : create_rejects_table(cntxt);
/* Only error_row is tested; error_fld, error_msg and error_input are
 * dereferenced below without a check.
 * NOTE(review): presumably create_rejects_table sets all four together or
 * none at all -- confirm against its definition. */
1936 24 : if (cntxt->error_row == NULL)
1937 0 : throw(MAL, "sql.rejects", "No reject table available");
/* Store each BAT id in its output argument and retain it in one step. */
1938 24 : BBPretain(*row = cntxt->error_row->batCacheid);
1939 24 : BBPretain(*fld = cntxt->error_fld->batCacheid);
1940 24 : BBPretain(*msg = cntxt->error_msg->batCacheid);
1941 24 : BBPretain(*inp = cntxt->error_input->batCacheid);
1942 24 : (void) mb; /* unused parameter */
1943 24 : return MAL_SUCCEED;
1944 : }
1945 :
/* COPYrejects_clear: empty the client's reject table.
 *
 * When cntxt->error_row is set, errorlock is taken, BATclear(..., true) is
 * applied to error_row and to each of error_fld, error_msg and error_input
 * that is non-NULL, and the lock is released again.  When error_row is NULL
 * nothing is done.
 *
 * The results of the BATclear calls are not inspected, so the function
 * always returns MAL_SUCCEED.  mb, stk and pci are unused.
 *
 * NOTE(review): the meaning of the `true` argument to BATclear is not
 * visible here -- presumably a "force" flag; confirm against the GDK API. */
1946 : str
1947 13 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1948 : {
/* error_row doubles as the "reject table exists" indicator: the other
 * three BATs are only looked at when it is non-NULL. */
1949 13 : if (cntxt->error_row) {
1950 13 : MT_lock_set(&errorlock);
1951 13 : BATclear(cntxt->error_row, true);
1952 13 : if (cntxt->error_fld)
1953 13 : BATclear(cntxt->error_fld, true);
1954 13 : if (cntxt->error_msg)
1955 13 : BATclear(cntxt->error_msg, true);
1956 13 : if (cntxt->error_input)
1957 13 : BATclear(cntxt->error_input, true);
1958 13 : MT_lock_unset(&errorlock);
1959 : }
/* Silence unused-parameter warnings. */
1960 13 : (void) mb;
1961 13 : (void) stk;
1962 13 : (void) pci;
1963 13 : return MAL_SUCCEED;
1964 : }
|