Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Niels Nes, Martin Kersten
15 : *
16 : * Parallel bulk load for SQL
17 : * The COPY INTO command for SQL is heavily CPU bound, which means
18 : * that ideally we would like to exploit the multi-cores to do that
19 : * work in parallel.
20 : * Complicating factors are the initial record offset, the
21 : * possible variable length of the input, and the original sort order
22 : * that should preferably be maintained.
23 : *
24 : * The code below consists of a file reader, which breaks up the
25 : * file into chunks of distinct rows. Then multiple parallel threads
26 : * grab them, and break them on the field boundaries.
27 : * After all fields are identified this way, the columns are converted
28 : * and stored in the BATs.
29 : *
30 : * The threads get a reference to a private copy of the READERtask.
31 : * It includes a list of columns they should handle. This is a basis
32 : * to distribute cheap and expensive columns over threads.
33 : *
34 : * The file reader overlaps IO with updates of the BAT.
35 : * Also the buffer size of the block stream might be a little small for
36 : * this task (1MB). It has been increased to 8MB, which indeed improved.
37 : *
38 : * The work divider allocates subtasks to threads based on the
39 : * observed time spending so far.
40 : */
41 :
42 : #include "monetdb_config.h"
43 : #include "tablet.h"
44 : #include "mapi_prompt.h"
45 : #include "mal_internal.h"
46 :
47 : #include <string.h>
48 : #include <ctype.h>
49 :
#define MAXWORKERS 64
#define MAXBUFFERS 2
/* The row buffer is at least 32MB for the time being: MAXROWSIZE(X) is the
 * larger of X and 32MB.  The argument is parenthesized so that expressions
 * built with operators of lower precedence than '>' (e.g. '|', '&', '?:')
 * expand correctly.  Note that X is still evaluated twice. */
#define MAXROWSIZE(X) ((X) > 32*1024*1024 ? (X) : 32*1024*1024)
54 :
/* Serializes error reporting: tablet_error() holds this lock while it
 * appends to the client's error BATs and updates the task's error state. */
static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
56 :
57 : static BAT *
58 10372 : void_bat_create(int adt, BUN nr)
59 : {
60 10372 : BAT *b = COLnew(0, adt, nr, TRANSIENT);
61 :
62 : /* check for correct structures */
63 10372 : if (b == NULL)
64 : return NULL;
65 10372 : if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
66 : return NULL;
67 : }
68 :
69 : /* disable all properties here */
70 10372 : b->tsorted = false;
71 10372 : b->trevsorted = false;
72 10372 : b->tnosorted = 0;
73 10372 : b->tnorevsorted = 0;
74 10372 : b->tseqbase = oid_nil;
75 10372 : b->tkey = false;
76 10372 : b->tnokey[0] = 0;
77 10372 : b->tnokey[1] = 0;
78 10372 : return b;
79 : }
80 :
81 : void
82 127857 : TABLETdestroy_format(Tablet *as)
83 : {
84 127857 : BUN p;
85 127857 : Column *fmt = as->format;
86 :
87 609748 : for (p = 0; p < as->nr_attrs; p++) {
88 481853 : BBPreclaim(fmt[p].c);
89 481849 : if (fmt[p].data)
90 10377 : GDKfree(fmt[p].data);
91 : }
92 127895 : GDKfree(fmt);
93 127903 : }
94 :
95 : static oid
96 126740 : check_BATs(Tablet *as)
97 : {
98 126740 : Column *fmt = as->format;
99 126740 : BUN i = 0;
100 126740 : BUN cnt;
101 126740 : oid base;
102 :
103 126740 : if (fmt[i].c == NULL)
104 126700 : i++;
105 126740 : cnt = BATcount(fmt[i].c);
106 126740 : base = fmt[i].c->hseqbase;
107 :
108 126740 : if (as->nr != cnt) {
109 6697 : for (i = 0; i < as->nr_attrs; i++)
110 5997 : if (fmt[i].c)
111 5297 : fmt[i].p = as->offset;
112 700 : return oid_nil;
113 : }
114 :
115 591164 : for (i = 0; i < as->nr_attrs; i++) {
116 465124 : BAT *b = fmt[i].c;
117 :
118 465124 : if (b == NULL)
119 125991 : continue;
120 :
121 339133 : if (BATcount(b) != cnt || b->hseqbase != base)
122 0 : return oid_nil;
123 :
124 339133 : fmt[i].p = as->offset;
125 : }
126 : return base;
127 : }
128 :
129 : str
130 1128 : TABLETcreate_bats(Tablet *as, BUN est)
131 : {
132 1128 : Column *fmt = as->format;
133 1128 : BUN i, nr = 0;
134 :
135 11506 : for (i = 0; i < as->nr_attrs; i++) {
136 10378 : if (fmt[i].skip)
137 6 : continue;
138 10372 : fmt[i].c = void_bat_create(fmt[i].adt, est);
139 10372 : if (!fmt[i].c) {
140 0 : while (i > 0) {
141 0 : if (!fmt[--i].skip) {
142 0 : BBPreclaim(fmt[i].c);
143 0 : fmt[i].c = NULL;
144 : }
145 : }
146 0 : throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
147 : est);
148 : }
149 10372 : fmt[i].ci = bat_iterator_nolock(fmt[i].c);
150 10372 : nr++;
151 : }
152 1128 : if (!nr)
153 0 : throw(SQL, "copy",
154 : "At least one column should be read from the input\n");
155 : return MAL_SUCCEED;
156 : }
157 :
158 : str
159 1100 : TABLETcollect(BAT **bats, Tablet *as)
160 : {
161 1100 : Column *fmt = as->format;
162 1100 : BUN i, j;
163 1100 : BUN cnt = 0;
164 :
165 1100 : if (bats == NULL)
166 0 : throw(SQL, "copy", "Missing container");
167 2604 : for (i = 0; i < as->nr_attrs && !cnt; i++)
168 1504 : if (!fmt[i].skip)
169 1501 : cnt = BATcount(fmt[i].c);
170 11374 : for (i = 0, j = 0; i < as->nr_attrs; i++) {
171 10274 : if (fmt[i].skip)
172 6 : continue;
173 10268 : bats[j] = fmt[i].c;
174 10268 : BBPfix(bats[j]->batCacheid);
175 10268 : if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
176 0 : throw(SQL, "copy",
177 : "Failed to set access at tablet part " BUNFMT "\n", cnt);
178 10268 : fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
179 10268 : fmt[i].c->tkey = false;
180 10268 : BATsettrivprop(fmt[i].c);
181 :
182 10268 : if (cnt != BATcount(fmt[i].c))
183 0 : throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
184 : BATcount(fmt[i].c), cnt);
185 10268 : j++;
186 : }
187 : return MAL_SUCCEED;
188 : }
189 :
/* Scan a quoted field whose opening quote has already been skipped, and
 * unquote it in place:
 *  - a doubled quote character collapses into a single one;
 *  - when escape is set, a backslash and the character following it are
 *    copied through unchanged.
 * The unquoted text is NUL-terminated at its new end.  Returns a pointer
 * to where the closing quote was in the input (that byte may have been
 * overwritten by the terminating NUL), or NULL if the string ends without
 * a closing quote. */
static char *
tablet_skip_string(char *s, char quote, bool escape)
{
	char *rd = s;		/* read cursor */
	char *wr = s;		/* write cursor, never ahead of rd */

	while (*rd != '\0') {
		if (escape && *rd == '\\' && rd[1] != '\0') {
			/* keep the backslash; the protected char is copied below */
			*wr++ = *rd++;
		} else if (*rd == quote) {
			if (rd[1] != quote)
				break;		/* the closing quote */
			rd++;			/* doubled quote: drop the first one */
		}
		*wr++ = *rd++;
	}
	assert(*rd == quote || *rd == '\0');
	if (*rd == '\0')
		return NULL;
	*wr = '\0';
	return rd;
}
212 :
213 : static int
214 0 : TABLET_error(stream *s)
215 : {
216 0 : const char *err = mnstr_peek_error(s);
217 0 : if (err)
218 0 : TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
219 0 : return -1;
220 : }
221 :
222 : /* The output line is first built before being sent. It solves a problem
223 : with UDP, where you may loose most of the information using short writes
224 : */
225 :
226 : static inline int
227 0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
228 : Column *fmt, stream *fd, BUN nr_attrs, oid id)
229 : {
230 0 : BUN i;
231 0 : ssize_t fill = 0;
232 :
233 0 : for (i = 0; i < nr_attrs; i++) {
234 0 : if (fmt[i].c == NULL || fmt[i].virt)
235 0 : continue;
236 0 : if (id < fmt[i].c->hseqbase
237 0 : || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
238 : break;
239 0 : fmt[i].p = id - fmt[i].c->hseqbase;
240 : }
241 0 : if (i == nr_attrs) {
242 0 : for (i = 0; i < nr_attrs; i++) {
243 0 : Column *f = fmt + i;
244 0 : const char *p;
245 0 : ssize_t l;
246 :
247 0 : if (f->c) {
248 0 : p = BUNtail(f->ci, f->p);
249 :
250 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
251 0 : p = f->nullstr;
252 0 : l = (ssize_t) strlen(f->nullstr);
253 : } else {
254 0 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
255 0 : if (l < 0)
256 : return -1;
257 0 : p = *localbuf;
258 : }
259 0 : if (fill + l + f->seplen >= (ssize_t) * len) {
260 : /* extend the buffer */
261 0 : char *nbuf;
262 0 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
263 0 : if (nbuf == NULL)
264 : return -1; /* *buf freed by caller */
265 0 : *buf = nbuf;
266 0 : *len = fill + l + f->seplen + BUFSIZ;
267 : }
268 0 : strncpy(*buf + fill, p, l);
269 0 : fill += l;
270 : }
271 0 : strncpy(*buf + fill, f->sep, f->seplen);
272 0 : fill += f->seplen;
273 : }
274 : }
275 0 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
276 0 : return TABLET_error(fd);
277 : return 0;
278 : }
279 :
/* Forward declarations: output_composite and the two multiset printers
 * call each other recursively. */
static ssize_t output_multiset_dense(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id);
static ssize_t output_multiset_sorted(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id);
282 :
283 : static inline ssize_t
284 329 : output_value(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *f)
285 : {
286 329 : assert (!f->virt && !f->composite && !f->multiset);
287 :
288 329 : const char *p = BUNtail(f->ci, f->p);
289 329 : ssize_t l = 0;
290 :
291 329 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
292 0 : p = f->nullstr;
293 0 : l = (ssize_t) strlen(p);
294 : } else {
295 329 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
296 329 : if (l < 0)
297 : return -1;
298 329 : p = *localbuf;
299 : }
300 329 : if (fill + l + f->seplen >= (ssize_t) * len) {
301 : /* extend the buffer */
302 0 : char *nbuf;
303 0 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
304 0 : if (nbuf == NULL)
305 : return -1; /* *buf freed by caller */
306 0 : *buf = nbuf;
307 0 : *len = fill + l + f->seplen + BUFSIZ;
308 : }
309 329 : if (l) {
310 329 : strncpy(*buf + fill, p, l);
311 329 : fill += l;
312 329 : f->p++;
313 : }
314 : return fill;
315 : }
316 :
317 : static ssize_t
318 111 : output_composite(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *fmt, BUN nr_attrs, int composite, bool quoted)
319 : {
320 111 : if (!quoted)
321 25 : (*buf)[fill++] = '\'';
322 111 : (*buf)[fill++] = '(';
323 111 : (void)nr_attrs;
324 : /*
325 : if (fmt->virt) {
326 : assert(0);
327 : fmt++;
328 : nr_attrs--;
329 : }
330 : */
331 111 : int first = 1, j = 0;
332 309 : for( ; composite; composite--) {
333 198 : Column *f = fmt + j;
334 :
335 198 : if (!first) {
336 87 : (*buf)[fill++] = ',';
337 87 : (*buf)[fill++] = ' ';
338 87 : (*buf)[fill] = 0;
339 : }
340 198 : if (f->multiset) {
341 10 : int nr_attrs = f->nrfields - 1;
342 10 : Column *rowid = fmt+j+nr_attrs;
343 10 : const char *p = BUNtail(rowid->ci, rowid->p);
344 :
345 : /* various cases:
346 : * rowid column dense (but for ints !!
347 : * rowid column sorted
348 : * else
349 : */
350 10 : assert(rowid->c->tsorted);
351 10 : if (rowid->c->tkey)
352 10 : fill = output_multiset_dense(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, true, *(int*)p);
353 : else
354 0 : fill = output_multiset_sorted(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, true, *(int*)p);
355 10 : fmt[j+nr_attrs].p++;
356 10 : f = fmt + j + nr_attrs; /* closing bracket */
357 10 : j += nr_attrs + 1;
358 188 : } else if (f->composite) {
359 44 : int nr_attrs = f->nrfields - 1;
360 44 : fill = output_composite(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-j-1, f->composite, true);
361 44 : f = fmt + j + nr_attrs; /* closing bracket */
362 44 : j += nr_attrs + 1;
363 144 : } else if (f->c) {
364 144 : fill = output_value(buf, len, fill, localbuf, locallen, f);
365 144 : j++;
366 : }
367 198 : first = 0;
368 : }
369 111 : (*buf)[fill++] = ')';
370 111 : if (!quoted)
371 25 : (*buf)[fill++] = '\'';
372 111 : (*buf)[fill] = 0;
373 111 : return fill;
374 : }
375 :
376 : #define MS_ARRAY 2
377 : /* id is prev id + 1 */
378 : static ssize_t
379 59 : output_multiset_dense(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen,
380 : Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id)
381 : {
382 59 : nr_attrs -= (multiset == MS_ARRAY)?2:1;
383 59 : Column *msid = fmt + nr_attrs;
384 59 : int *idp = (int*)Tloc(msid->c, msid->p);
385 59 : int first = 1;
386 :
387 59 : if (!quoted)
388 49 : (*buf)[fill++] = '\'';
389 59 : (*buf)[fill++] = '{';
390 59 : (*buf)[fill] = 0;
391 190 : for (; *idp == id && fill > 0; idp++, msid->p++) {
392 131 : if (!first)
393 72 : (*buf)[fill++] = ',';
394 131 : if (composite) {
395 42 : fill = output_composite(buf, len, fill, localbuf, locallen, fmt, nr_attrs, composite, true);
396 : } else {
397 89 : fill = output_value(buf, len, fill, localbuf, locallen, fmt);
398 : }
399 131 : first = 0;
400 : }
401 59 : if (fill < 0)
402 : return fill;
403 59 : (*buf)[fill++] = '}';
404 59 : if (!quoted)
405 49 : (*buf)[fill++] = '\'';
406 59 : (*buf)[fill] = 0;
407 59 : return fill;
408 : }
409 :
410 : /* id >= prev id */
411 : static ssize_t
412 6 : output_multiset_sorted(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen,
413 : Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id)
414 : {
415 6 : nr_attrs -= (multiset == MS_ARRAY)?2:1;
416 6 : Column *msid = fmt + nr_attrs;
417 : /* how to also keep prev id */
418 6 : BUN pos = msid->p;
419 6 : int *idp = (int*)Tloc(msid->c, pos);
420 6 : int first = 1;
421 :
422 6 : if (msid->p) {
423 17 : while (pos > 0 && idp[-1] >= id) {
424 12 : idp--;
425 12 : pos--;
426 : }
427 5 : msid->p = pos;
428 : }
429 :
430 6 : if (!quoted)
431 6 : (*buf)[fill++] = '\'';
432 6 : (*buf)[fill++] = '{';
433 6 : (*buf)[fill] = 0;
434 24 : for (; *idp == id && fill > 0; idp++, msid->p++, pos++) {
435 18 : if (!first)
436 12 : (*buf)[fill++] = ',';
437 18 : if (composite) {
438 0 : fill = output_composite(buf, len, fill, localbuf, locallen, fmt, nr_attrs, composite, true);
439 : } else {
440 18 : fmt->p = pos;
441 18 : fill = output_value(buf, len, fill, localbuf, locallen, fmt);
442 : }
443 18 : first = 0;
444 : }
445 6 : if (fill < 0)
446 : return fill;
447 6 : (*buf)[fill++] = '}';
448 6 : if (!quoted)
449 6 : (*buf)[fill++] = '\'';
450 6 : (*buf)[fill] = 0;
451 6 : return fill;
452 : }
453 :
454 : static ssize_t
455 80 : output_line_complex(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen,
456 : Column *fmt, BUN nr_attrs)
457 : {
458 80 : BUN j;
459 :
460 318 : for (j = 0; j < nr_attrs; ) {
461 238 : Column *f = fmt + j;
462 238 : const char *p;
463 :
464 238 : if (f->multiset) {
465 55 : int nr_attrs = f->nrfields - 1;
466 55 : Column *rowid = fmt+j+nr_attrs;
467 55 : p = BUNtail(rowid->ci, rowid->p);
468 :
469 55 : assert(rowid->c->tsorted);
470 55 : if (rowid->c->tkey)
471 49 : fill = output_multiset_dense(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, false, *(int*)p);
472 : else
473 6 : fill = output_multiset_sorted(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, false, *(int*)p);
474 55 : fmt[j+nr_attrs].p++;
475 55 : f = fmt + j + nr_attrs; /* closing bracket */
476 55 : j += nr_attrs + 1;
477 183 : } else if (f->composite) {
478 25 : int nr_attrs = f->nrfields - 1;
479 25 : fill = output_composite(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-j, f->composite, false);
480 25 : f = fmt + j + nr_attrs; /* closing bracket */
481 25 : j += nr_attrs + 1;
482 158 : } else if (f->c) {
483 78 : fill = output_value(buf, len, fill, localbuf, locallen, f);
484 78 : j++;
485 : } else {
486 80 : j++;
487 : }
488 238 : strncpy(*buf + fill, f->sep, f->seplen);
489 238 : fill += f->seplen;
490 : }
491 80 : return fill;
492 : }
493 :
494 : static inline int
495 6210605 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
496 : Column *fmt, stream *fd, BUN nr_attrs)
497 : {
498 6210605 : BUN i;
499 6210605 : ssize_t fill = 0;
500 :
501 27939363 : for (i = 0; i < nr_attrs; i++) {
502 21728784 : Column *f = fmt + i;
503 21728784 : const char *p;
504 21728784 : ssize_t l;
505 :
506 21728784 : if (f->virt)
507 0 : continue;
508 21728784 : if (f->c) {
509 15518174 : p = BUNtail(f->ci, f->p);
510 :
511 15518174 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
512 854566 : p = f->nullstr;
513 854566 : l = (ssize_t) strlen(p);
514 : } else {
515 14663533 : l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
516 14663582 : if (l < 0)
517 : return -1;
518 14663582 : p = *localbuf;
519 : }
520 15518148 : if (fill + l + f->seplen >= (ssize_t) * len) {
521 : /* extend the buffer */
522 78 : char *nbuf;
523 78 : nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
524 78 : if (nbuf == NULL)
525 : return -1; /* *buf freed by caller */
526 78 : *buf = nbuf;
527 78 : *len = fill + l + f->seplen + BUFSIZ;
528 : }
529 15518148 : strncpy(*buf + fill, p, l);
530 15518148 : fill += l;
531 15518148 : f->p++;
532 : }
533 21728758 : strncpy(*buf + fill, f->sep, f->seplen);
534 21728758 : fill += f->seplen;
535 : }
536 6210579 : if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
537 0 : return TABLET_error(fd);
538 : return 0;
539 : }
540 :
541 : static inline int
542 0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
543 : BUN nr_attrs, oid id)
544 : {
545 0 : BUN i;
546 :
547 0 : for (i = 0; i < nr_attrs; i++) {
548 0 : Column *f = fmt + i;
549 :
550 0 : if (f->virt)
551 0 : continue;
552 0 : if (f->c) {
553 0 : const void *p = BUNtail(f->ci, id - f->c->hseqbase);
554 :
555 0 : if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
556 0 : size_t l = strlen(f->nullstr);
557 0 : if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
558 0 : return TABLET_error(fd);
559 : } else {
560 0 : ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
561 :
562 0 : if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
563 0 : return TABLET_error(fd);
564 : }
565 : }
566 0 : if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
567 0 : return TABLET_error(fd);
568 : }
569 : return 0;
570 : }
571 :
572 : /*
573 : * Fast Load
574 : * To speedup the CPU intensive loading of files we have to break
575 : * the file into pieces and perform parallel analysis. Experimentation
576 : * against lineitem SF1 showed that half of the time goes into very
577 : * basic atom analysis (41 out of 102 B instructions).
578 : * Furthermore, the actual insertion into the BATs takes only
579 : * about 10% of the total. With multi-core processors around
580 : * it seems we can gain here significantly.
581 : *
582 : * The approach taken is to fork a parallel scan over the text file.
583 : * We assume that the blocked stream is already
584 : * positioned correctly at the reading position. The start and limit
585 : * indicates the byte range to search for tuples.
586 : * If start> 0 then we first skip to the next record separator.
587 : * If necessary we read more than 'limit' bytes to ensure parsing a complete
588 : * record and stop at the record boundary.
589 : * Beware, we should allocate Tablet descriptors for each file segment,
590 : * otherwise we end up with a gross concurrency control problem.
591 : * The resulting BATs should be glued at the final phase.
592 : *
593 : * Raw Load
594 : * Front-ends can bypass most of the overhead in loading the BATs
595 : * by preparing the corresponding files directly and replace those
596 : * created by e.g. the SQL frontend.
597 : * This strategy is only advisable for cases where we have very
598 : * large files >200GB and/or are created by a well debugged code.
599 : *
600 : * To experiment with this approach, the code base responds
601 : * to a negative number of cores by dumping the data directly in BAT
602 : * storage format into a collections of files on disk.
603 : * It reports on the actions to be taken to replace BATs.
604 : * This technique is initially only supported for fixed-sized columns.
605 : * The rawmode() indicator acts as the internal switch.
606 : */
607 :
608 : /*
609 : * To speed up loading ascii files we have to determine the number of blocks.
610 : * This depends on the number of cores available.
611 : * For the time being we hardwire this decision based on our own
612 : * platforms.
613 : * Furthermore, we only consider parallel load for file-based requests.
614 : *
615 : * To simplify our world, we assume a single producer process.
616 : */
617 :
618 : static int
619 0 : output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
620 : {
621 0 : size_t len = BUFSIZ, locallen = BUFSIZ;
622 0 : int res = 0;
623 0 : char *buf = GDKmalloc(len);
624 0 : char *localbuf = GDKmalloc(len);
625 0 : BUN p, q;
626 0 : oid id;
627 0 : BUN offset = as->offset;
628 :
629 0 : if (buf == NULL || localbuf == NULL) {
630 0 : GDKfree(buf);
631 0 : GDKfree(localbuf);
632 0 : return -1;
633 : }
634 0 : for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
635 0 : p++, id++) {
636 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
637 : res = -5;
638 : break;
639 : }
640 0 : if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
641 : break;
642 : }
643 : }
644 0 : GDKfree(localbuf);
645 0 : GDKfree(buf);
646 0 : return res;
647 : }
648 :
649 : static int
650 24 : output_complex(Tablet *as, stream *fd, bstream *in)
651 : {
652 24 : size_t len = BUFSIZ, locallen = BUFSIZ;
653 24 : ssize_t res = 0;
654 24 : char *buf = GDKmalloc(len);
655 24 : char *localbuf = GDKmalloc(len);
656 24 : BUN i = 0;
657 :
658 24 : if (buf == NULL || localbuf == NULL) {
659 0 : GDKfree(buf);
660 0 : GDKfree(localbuf);
661 0 : return -1;
662 : }
663 104 : for (i = 0; i < as->nr; i++) {
664 80 : if ((i & 8191) == 8191 && bstream_getoob(in)) {
665 : res = -5; /* "Query aborted" */
666 : break;
667 : }
668 80 : if ((res = output_line_complex(&buf, &len, 0, &localbuf, &locallen, as->format, as->nr_attrs)) < 0) {
669 : break;
670 : }
671 80 : if (fd && mnstr_write(fd, buf, 1, res) != res)
672 0 : return TABLET_error(fd);
673 : }
674 24 : GDKfree(localbuf);
675 24 : GDKfree(buf);
676 24 : if (res < 0)
677 0 : return (int)res;
678 : return 0;
679 : }
680 :
681 : static int
682 126692 : output_file_dense(Tablet *as, stream *fd, bstream *in)
683 : {
684 126692 : size_t len = BUFSIZ, locallen = BUFSIZ;
685 126692 : int res = 0;
686 126692 : char *buf = GDKmalloc(len);
687 126735 : char *localbuf = GDKmalloc(len);
688 126733 : BUN i = 0;
689 :
690 126733 : if (buf == NULL || localbuf == NULL) {
691 0 : GDKfree(buf);
692 0 : GDKfree(localbuf);
693 0 : return -1;
694 : }
695 6337326 : for (i = 0; i < as->nr; i++) {
696 6210593 : if ((i & 8191) == 8191 && bstream_getoob(in)) {
697 : res = -5; /* "Query aborted" */
698 : break;
699 : }
700 6210593 : if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
701 : break;
702 : }
703 : }
704 126733 : GDKfree(localbuf);
705 126751 : GDKfree(buf);
706 126751 : return res;
707 : }
708 :
709 : static int
710 0 : output_file_ordered(Tablet *as, BAT *order, stream *fd, bstream *in)
711 : {
712 0 : size_t len = BUFSIZ;
713 0 : int res = 0;
714 0 : char *buf = GDKmalloc(len);
715 0 : BUN p, q;
716 0 : BUN i = 0;
717 0 : BUN offset = as->offset;
718 :
719 0 : if (buf == NULL)
720 : return -1;
721 0 : for (q = offset + as->nr, p = offset; p < q; p++, i++) {
722 0 : oid h = order->hseqbase + p;
723 :
724 0 : if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
725 : res = -5;
726 : break;
727 : }
728 0 : if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
729 0 : GDKfree(buf);
730 0 : return res;
731 : }
732 : }
733 0 : GDKfree(buf);
734 0 : return res;
735 : }
736 :
737 : int
738 126722 : TABLEToutput_file(Tablet *as, BAT *order, stream *s, bstream *in)
739 : {
740 126722 : oid base = oid_nil;
741 126722 : int ret = 0;
742 :
743 : /* only set nr if it is zero or lower (bogus) to the maximum value
744 : * possible (BATcount), if already set within BATcount range,
745 : * preserve value such that for instance SQL's reply_size still
746 : * works
747 : */
748 126722 : if (order) {
749 0 : BUN maxnr = BATcount(order);
750 0 : if (as->nr == BUN_NONE || as->nr > maxnr)
751 0 : as->nr = maxnr;
752 : }
753 126722 : assert(as->nr != BUN_NONE);
754 : int complex = 0;
755 471241 : for (unsigned int i = 1; i < as->nr_attrs && !complex; i++)
756 344519 : complex |= (as->format[i].multiset || as->format[i].composite);
757 :
758 126722 : if (complex) {
759 0 : assert(!order);
760 0 : return output_complex(as, s, in);
761 : }
762 126740 : base = check_BATs(as);
763 126696 : if (!order || !is_oid_nil(base)) {
764 126696 : if (!order || order->hseqbase == base)
765 126696 : ret = output_file_dense(as, s, in);
766 : else
767 0 : ret = output_file_ordered(as, order, s, in);
768 : } else {
769 0 : ret = output_file_default(as, order, s, in);
770 : }
771 : return ret;
772 : }
773 :
774 : /*
775 : * Niels Nes, Martin Kersten
776 : *
777 : * Parallel bulk load for SQL
778 : * The COPY INTO command for SQL is heavily CPU bound, which means
779 : * that ideally we would like to exploit the multi-cores to do that
780 : * work in parallel.
781 : * Complicating factors are the initial record offset, the
782 : * possible variable length of the input, and the original sort order
783 : * that should preferably be maintained.
784 : *
785 : * The code below consists of a file reader, which breaks up the
786 : * file into chunks of distinct rows. Then multiple parallel threads
787 : * grab them, and break them on the field boundaries.
788 : * After all fields are identified this way, the columns are converted
789 : * and stored in the BATs.
790 : *
791 : * The threads get a reference to a private copy of the READERtask.
792 : * It includes a list of columns they should handle. This is a basis
793 : * to distribute cheap and expensive columns over threads.
794 : *
795 : * The file reader overlaps IO with updates of the BAT.
796 : * Also the buffer size of the block stream might be a little small for
797 : * this task (1MB). It has been increased to 8MB, which indeed improved.
798 : *
799 : * The work divider allocates subtasks to threads based on the
800 : * observed time spending so far.
801 : */
802 :
/* Phase codes for READERtask.state.  The field comment documents 1 as
 * "row break" and 2 as "update bat".  ENDOFCOPY presumably signals the end
 * of the load (TODO confirm against the worker loop). */
#define BREAKROW 1
#define UPDATEBAT 2
#define ENDOFCOPY 3

/* Task descriptor for the parallel bulk loader (COPY INTO).  Per the
 * design notes, worker threads get a reference to a private copy of it. */
typedef struct {
	Client cntxt;		/* client; tablet_error appends to its error_row,
						 * error_fld, error_msg and error_input BATs */
	int id; /* for self reference */
	int state; /* row break=1 , 2 = update bat */
	int workers; /* how many concurrent ones */
	int error; /* error during row break */
	int next;
	int limit;			/* tablet_error only bumps rowerror[idx] for idx < limit */
	BUN cnt, maxrow; /* first row in file chunk. */
	lng skip; /* number of lines to be skipped */
	lng *time, wtime; /* time per col + time per thread */
	int rounds; /* how often did we divide the work */
	bool ateof; /* io control */
	bool from_stdin;
	bool escape; /* whether to handle \ escapes */
	bool besteffort;	/* cleared by tablet_error when an error cannot be recorded */
	char quote;
	bstream *b;			/* input stream, refilled by tablet_read_more */
	stream *out;		/* if set, tablet_read_more prompts here (PROMPT2) for more input */
	MT_Id tid;
	MT_Sema producer; /* reader waits for call */
	MT_Sema consumer; /* reader waits for call */
						/* NOTE(review): producer and consumer carry the same
						 * comment; their exact roles are not visible here */
	MT_Sema sema; /* threads wait for work , negative next implies exit */
	MT_Sema reply; /* let reader continue */
	Tablet *as;
	char *errbuf;
	const char *csep, *rsep;
	size_t seplen, rseplen;

	char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for row splitter and tokenizer */
	size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
	char **rows[MAXBUFFERS];
	lng *startlineno[MAXBUFFERS];
	int top[MAXBUFFERS]; /* number of rows in this buffer */
	int cur; /* current buffer used by splitter and update threads */

	int *cols; /* columns to handle */
	char ***fields;
	bte *rowerror;		/* per-row error counters, incremented by tablet_error */
	int errorcnt;		/* number of errors reported through tablet_error */
	bool aborted;		/* set by tablet_read_more on interrupt or out-of-band abort */
	bool set_qry_ctx;
} READERtask;
850 :
851 : /* returns TRUE if there is/might be more */
852 : static bool
853 102768 : tablet_read_more(READERtask *task)
854 : {
855 102768 : bstream *in = task->b;
856 102768 : stream *out = task->out;
857 102768 : size_t n = task->b->size;
858 102768 : if (out) {
859 101136 : do {
860 : /* query is not finished ask for more */
861 : /* we need more query text */
862 101136 : if (bstream_next(in) < 0) {
863 0 : if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
864 0 : task->aborted = true;
865 0 : mnstr_clearerr(in->s);
866 : }
867 0 : return false;
868 : }
869 101136 : if (in->eof) {
870 101134 : if (bstream_getoob(in)) {
871 0 : task->aborted = true;
872 0 : return false;
873 : }
874 101134 : if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
875 101134 : mnstr_flush(out, MNSTR_FLUSH_DATA);
876 101134 : in->eof = false;
877 : /* we need more query text */
878 101134 : if (bstream_next(in) < 0) {
879 0 : if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
880 0 : task->aborted = true;
881 0 : mnstr_clearerr(in->s);
882 : }
883 0 : return false;
884 : }
885 101134 : if (in->eof)
886 : return false;
887 : }
888 101134 : } while (in->len <= in->pos);
889 1632 : } else if (bstream_read(in, n) <= 0) {
890 : return false;
891 : }
892 : return true;
893 : }
894 :
895 : /* note, the column value that is passed here is the 0 based value; the
896 : * lineno value on the other hand is 1 based */
897 : static void
898 151 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
899 : const char *fcn)
900 : {
901 151 : assert(is_int_nil(col) || col >= 0);
902 151 : assert(is_lng_nil(lineno) || lineno >= 1);
903 151 : MT_lock_set(&errorlock);
904 151 : if (task->cntxt->error_row != NULL
905 151 : && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
906 151 : || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
907 : false) != GDK_SUCCEED
908 151 : || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
909 151 : || BUNappend(task->cntxt->error_input, fcn,
910 : false) != GDK_SUCCEED)) {
911 0 : task->besteffort = false;
912 : }
913 151 : if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
914 151 : task->rowerror[idx]++;
915 151 : if (task->as->error == NULL) {
916 68 : const char *colnam = is_int_nil(col) || col < 0
917 34 : || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
918 34 : if (msg == NULL) {
919 0 : task->besteffort = false;
920 34 : } else if (!is_lng_nil(lineno)) {
921 34 : if (!is_int_nil(col)) {
922 32 : if (colnam)
923 32 : task->as->error = createException(MAL, "sql.copy_from",
924 : "line " LLFMT ": column %d %s: %s",
925 : lineno, col + 1, colnam, msg);
926 : else
927 0 : task->as->error = createException(MAL, "sql.copy_from",
928 : "line " LLFMT ": column %d: %s",
929 : lineno, col + 1, msg);
930 : } else {
931 2 : task->as->error = createException(MAL, "sql.copy_from",
932 : "line " LLFMT ": %s", lineno, msg);
933 : }
934 : } else {
935 0 : if (!is_int_nil(col)) {
936 0 : if (colnam)
937 0 : task->as->error = createException(MAL, "sql.copy_from",
938 : "column %d %s: %s", col + 1, colnam,
939 : msg);
940 : else
941 0 : task->as->error = createException(MAL, "sql.copy_from",
942 : "column %d: %s", col + 1, msg);
943 : } else {
944 0 : task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
945 : }
946 : }
947 : }
948 151 : task->errorcnt++;
949 151 : MT_lock_unset(&errorlock);
950 151 : }
951 :
952 : /*
953 : * The row is broken into pieces directly on their field separators. It assumes that we have
954 : * the record in the cache already, so we can do most work quickly.
955 : * Furthermore, it assumes a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
956 : */
957 :
/* Return the number of bytes, excluding the terminating NUL, that mycpstr
 * produces when copying s.  Valid UTF-8 sequences are copied verbatim.
 * Every byte that is not part of a valid sequence becomes a four-character
 * "<XX>" escape, i.e. three extra bytes.
 *
 * Each valid multi-byte sequence must be consumed exactly once, as mycpstr
 * does.  The branches below skip all but the last byte of the sequence,
 * and the common s++ at the end of the loop steps over that last byte.
 * Skipping the full sequence length there as well would step one byte too
 * far: the byte after the sequence would not be examined (undercounting
 * when it is invalid), and the scan could run past the terminating NUL. */
static size_t
mystrlen(const char *s)
{
	size_t len = 0;
	const char *s0 = s;

	while (*s) {
		if ((*s & 0x80) == 0) {
			/* ASCII: copied as is */
			;
		} else if ((*s & 0xC0) == 0x80) {
			/* stray continuation byte */
			len += 3;
		} else if ((*s & 0xE0) == 0xC0) {
			/* two-byte sequence */
			if ((s[1] & 0xC0) != 0x80)
				len += 3;
			else
				s += 1;
		} else if ((*s & 0xF0) == 0xE0) {
			/* three-byte sequence */
			if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
				len += 3;
			else
				s += 2;
		} else if ((*s & 0xF8) == 0xF0) {
			/* four-byte sequence */
			if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
				|| (s[3] & 0xC0) != 0x80)
				len += 3;
			else
				s += 3;
		} else {
			/* not a valid start byte */
			len += 3;
		}
		s++;
	}
	len += s - s0;
	return len;
}
999 : }
1000 :
/*
 * Copy the NUL-terminated string s into t.  Bytes that are part of a
 * well-formed UTF-8 sequence are copied unchanged; any other byte is
 * written as "<XX>", XX being its value in upper-case hexadecimal.
 * t must have room for mystrlen(s) + 1 bytes.
 * Returns a pointer to the NUL written at the end of t, so that several
 * copies can be chained.
 */
static char *
mycpstr(char *t, const char *s)
{
	for (;;) {
		unsigned char lead = (unsigned char) *s;
		int extra;	/* continuation bytes a valid sequence needs */
		int valid;

		if (lead == 0)
			break;

		if ((lead & 0x80) == 0)
			extra = 0;
		else if ((lead & 0xE0) == 0xC0)
			extra = 1;
		else if ((lead & 0xF0) == 0xE0)
			extra = 2;
		else if ((lead & 0xF8) == 0xF0)
			extra = 3;
		else
			extra = -1;	/* lone continuation byte or invalid lead byte */

		/* stop checking at the first non-continuation byte; the NUL
		 * terminator is one, so we never look past the string */
		valid = extra >= 0;
		for (int k = 1; valid && k <= extra; k++)
			valid = ((unsigned char) s[k] & 0xC0) == 0x80;

		if (valid) {
			for (int k = 0; k <= extra; k++)
				*t++ = *s++;
		} else {
			t += sprintf(t, "<%02X>", lead);
			s++;
		}
	}
	*t = 0;
	return t;
}
1052 :
1053 : static str
1054 149 : SQLload_error(READERtask *task, lng idx, BUN attrs)
1055 : {
1056 149 : str line;
1057 149 : char *s;
1058 149 : size_t sz = 0;
1059 149 : BUN i;
1060 :
1061 587 : for (i = 0; i < attrs; i++) {
1062 438 : if (task->fields[i][idx])
1063 428 : sz += mystrlen(task->fields[i][idx]);
1064 438 : sz += task->seplen;
1065 : }
1066 :
1067 149 : s = line = GDKmalloc(sz + task->rseplen + 1);
1068 149 : if (line == NULL) {
1069 0 : tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
1070 : "SQLload_error");
1071 0 : return NULL;
1072 : }
1073 587 : for (i = 0; i < attrs; i++) {
1074 438 : if (task->fields[i][idx])
1075 428 : s = mycpstr(s, task->fields[i][idx]);
1076 438 : if (i < attrs - 1)
1077 289 : s = mycpstr(s, task->csep);
1078 : }
1079 149 : strcpy(s, task->rsep);
1080 149 : return line;
1081 : }
1082 :
1083 : /*
1084 : * The parsing of the individual values is straightforward. If the value represents
1085 : * the null-replacement string then we grab the underlying nil.
1086 : * If the string starts with the quote identified from SQL, we locate the tail
1087 : * and interpret the body.
1088 : *
1089 : * If inserting fails, we return -1; if the value cannot be parsed, we
1090 : * return -1 if besteffort is not set, otherwise we return 0, but in
1091 : * either case an entry is added to the error table.
1092 : */
1093 : static inline int
1094 275770996 : SQLinsert_val(READERtask *task, int col, int idx)
1095 : {
1096 275770996 : Column *fmt = task->as->format + col;
1097 275770996 : const void *adt;
1098 275770996 : char buf[BUFSIZ];
1099 275770996 : char *s = task->fields[col][idx];
1100 275770996 : char *err = NULL;
1101 275770996 : int ret = 0;
1102 :
1103 : /* include testing on the terminating null byte !! */
1104 275770996 : if (s == NULL) {
1105 6364758 : adt = fmt->nildata;
1106 6364758 : fmt->c->tnonil = false;
1107 : } else {
1108 269406238 : if (task->escape) {
1109 269176238 : size_t slen = strlen(s) + 1;
1110 269176238 : char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
1111 18 : if (data == NULL
1112 302502308 : || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
1113 269176238 : strlen(s), '\0') < 0)
1114 : adt = NULL;
1115 : else
1116 302502306 : adt = fmt->frstr(fmt, fmt->adt, data);
1117 309109493 : if (data != buf)
1118 18 : GDKfree(data);
1119 : } else
1120 230000 : adt = fmt->frstr(fmt, fmt->adt, s);
1121 : }
1122 :
1123 315704290 : lng row = task->cnt + idx + 1;
1124 315704290 : if (adt == NULL) {
1125 138 : if (task->rowerror) {
1126 138 : err = SQLload_error(task, idx, task->as->nr_attrs);
1127 138 : if (s) {
1128 138 : size_t slen = mystrlen(s);
1129 138 : char *scpy = GDKmalloc(slen + 1);
1130 138 : if (scpy == NULL) {
1131 0 : tablet_error(task, idx, row, col,
1132 : SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
1133 0 : task->besteffort = false; /* no longer best effort */
1134 0 : GDKfree(err);
1135 0 : return -1;
1136 : }
1137 138 : mycpstr(scpy, s);
1138 138 : s = scpy;
1139 : }
1140 138 : snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
1141 : s ? " in '" : "", s ? s : "", s ? "'" : "");
1142 138 : GDKfree(s);
1143 138 : tablet_error(task, idx, row, col, buf, err);
1144 138 : GDKfree(err);
1145 138 : if (!task->besteffort)
1146 : return -1;
1147 : }
1148 117 : ret = -!task->besteffort; /* yep, two unary operators ;-) */
1149 : /* replace it with a nil */
1150 117 : adt = fmt->nildata;
1151 117 : fmt->c->tnonil = false;
1152 : }
1153 315704269 : if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
1154 : return ret;
1155 :
1156 : /* failure */
1157 0 : if (task->rowerror) {
1158 0 : char *msg = GDKerrbuf;
1159 0 : err = SQLload_error(task, idx, task->as->nr_attrs);
1160 0 : tablet_error(task, idx, row, col, msg
1161 0 : && *msg ? msg : "insert failed", err);
1162 0 : GDKfree(err);
1163 : }
1164 0 : task->besteffort = false; /* no longer best effort */
1165 0 : return -1;
1166 : }
1167 :
1168 : static int
1169 325739 : SQLworker_column(READERtask *task, int col)
1170 : {
1171 325739 : int i;
1172 325739 : Column *fmt = task->as->format;
1173 :
1174 325739 : if (fmt[col].c == NULL)
1175 : return 0;
1176 :
1177 : /* watch out for concurrent threads */
1178 325733 : MT_lock_set(&mal_copyLock);
1179 327456 : if (!fmt[col].skip
1180 327456 : && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
1181 289 : if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
1182 0 : tablet_error(task, lng_nil, lng_nil, col,
1183 : "Failed to extend the BAT\n", "SQLworker_column");
1184 0 : MT_lock_unset(&mal_copyLock);
1185 0 : return -1;
1186 : }
1187 : }
1188 327456 : MT_lock_unset(&mal_copyLock);
1189 :
1190 281345446 : for (i = 0; i < task->top[task->cur]; i++) {
1191 280691793 : if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
1192 21 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
1193 21 : return -1;
1194 : }
1195 : }
1196 326197 : BATsetcount(fmt[col].c, BATcount(fmt[col].c));
1197 :
1198 326197 : return 0;
1199 : }
1200 :
/*
 * Split row idx of the current buffer (task->rows[task->cur][idx]) into
 * its as->nr_attrs fields, in place.
 *
 * Each column separator is overwritten with a NUL, and
 * task->fields[i][idx] is pointed at the start of field i.  A field that
 * starts with the quote character has the opening quote skipped and the
 * closing quote overwritten with a NUL.  Backslash escapes are only
 * stepped over here; they are removed later, when the value is converted.
 * A field that equals the user-defined NULL string (case-insensitive)
 * is set to NULL.
 *
 * Errors are reported through tablet_error(): a missing closing quote,
 * too few fields, or leftover data after the last field.  On the first
 * two, the remaining fields of the row are set to NULL so that loading
 * can continue, skipping as few rows as possible.  The details about the
 * locations can be inspected from the error table.
 *
 * Returns 0 if the row was split without errors, -1 otherwise.
 */
static int
SQLload_parse_row(READERtask *task, int idx)
{
	BUN i;
	char errmsg[BUFSIZ];
	char ch = *task->csep;		/* first byte of the column separator */
	char *row = task->rows[task->cur][idx];
	lng startlineno = task->startlineno[task->cur][idx];
	Tablet *as = task->as;
	/* NOTE(review): fmt is the format of the FIRST column and is used
	 * for the NULL-string checks of every field below; presumably all
	 * columns share the same NULL string -- confirm. */
	Column *fmt = as->format;
	bool error = false;
	str errline = NULL;

	assert(idx < task->top[task->cur]);
	assert(row);
	errmsg[0] = 0;

	if (task->quote || task->seplen != 1) {
		/* general case: quoted fields and/or multi-byte separator */
		for (i = 0; i < as->nr_attrs; i++) {
			bool quote = false;
			task->fields[i][idx] = row;
			/* recognize fields starting with a quote, keep them */
			if (*row && *row == task->quote) {
				quote = true;
				task->fields[i][idx] = row + 1;
				/* row now points at the closing quote, or is NULL if none */
				row = tablet_skip_string(row + 1, task->quote, task->escape);

				if (!row) {
					errline = SQLload_error(task, idx, i + 1);
					snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
					tablet_error(task, idx, startlineno, (int) i, errmsg,
								 errline);
					GDKfree(errline);
					error = true;
					goto errors1;
				} else
					*row++ = 0;	/* terminate the field at the closing quote */
			}

			/* eat away the column separator; for a quoted field anything
			 * between the closing quote and the separator is skipped */
			for (; *row; row++)
				if (*row == '\\' && task->escape) {
					if (row[1])
						row++;
				} else if (*row == ch
						   && (task->seplen == 1
							   || strncmp(row, task->csep,
										  task->seplen) == 0)) {
					*row = 0;
					row += task->seplen;
					goto endoffieldcheck;
				}

			/* end of row reached: fine for the last field, otherwise
			 * not enough fields */
			if (i < as->nr_attrs - 1) {
				errline = SQLload_error(task, idx, i + 1);
				/* it's the next value that is missing */
				tablet_error(task, idx, startlineno, (int) i + 1,
							 "Column value missing", errline);
				GDKfree(errline);
				error = true;
			  errors1:
				/* we save all errors detected as NULL values */
				for (; i < as->nr_attrs; i++)
					task->fields[i][idx] = NULL;
				/* undo the overshoot; the outer loop's i++ then ends it */
				i--;
			}
		  endoffieldcheck:
			;
			/* check for user defined NULL string; a quoted field is only
			 * compared when fmt->null_length is 0 (presumably: an empty
			 * NULL string -- confirm) */
			if ((!quote || !fmt->null_length) && fmt->nullstr
				&& task->fields[i][idx]
				&& GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0)
				task->fields[i][idx] = 0;
		}
	} else {
		/* fast path: no quoting and a single-byte column separator */
		assert(!task->quote);
		assert(task->seplen == 1);
		for (i = 0; i < as->nr_attrs; i++) {
			task->fields[i][idx] = row;

			/* eat away the column separator */
			for (; *row; row++)
				if (*row == '\\' && task->escape) {
					if (row[1])
						row++;
				} else if (*row == ch) {
					*row = 0;
					row++;
					goto endoffield2;
				}

			/* end of row reached: fine for the last field, otherwise
			 * not enough fields */
			if (i < as->nr_attrs - 1) {
				errline = SQLload_error(task, idx, i + 1);
				/* it's the next value that is missing */
				tablet_error(task, idx, startlineno, (int) i + 1,
							 "Column value missing", errline);
				GDKfree(errline);
				error = true;
				/* we save all errors detected */
				for (; i < as->nr_attrs; i++)
					task->fields[i][idx] = NULL;
				/* undo the overshoot; the outer loop's i++ then ends it */
				i--;
			}
		  endoffield2:
			;
			/* check for user defined NULL string */
			if (fmt->nullstr && task->fields[i][idx]
				&& GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0) {
				task->fields[i][idx] = 0;
			}
		}
	}
	/* check for too many values as well: data left after the separator
	 * that followed the last field (row is NULL after a missing quote) */
	if (row && *row && i == as->nr_attrs) {
		errline = SQLload_error(task, idx, task->as->nr_attrs);
		snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
		tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
		GDKfree(errline);
		error = true;
	}
	return error ? -1 : 0;
}
1332 :
/*
 * Body of a COPY INTO worker thread (arg is this worker's READERtask).
 *
 * The thread repeatedly waits on task->sema, executes the command in
 * task->state and acknowledges it by raising task->reply:
 *  - BREAKROW: split this worker's share of the rows of the current
 *    buffer into fields (SQLload_parse_row);
 *  - UPDATEBAT: convert the columns assigned to this worker
 *    (task->cols[i] holds column number + 1, 0 = not ours) and append
 *    them to their BATs, accumulating per-column times in task->time;
 *  - ENDOFCOPY: acknowledge and stop.
 * The loop also ends when task->top[task->cur] is negative.
 *
 * Each worker gets its own GDK error buffer, exposed as task->errbuf,
 * which is freed on exit.
 */
static void
SQLworker(void *arg)
{
	READERtask *task = (READERtask *) arg;
	unsigned int i;
	int j, piece;
	lng t0;

	GDKsetbuf(GDKmalloc(GDKMAXERRLEN));	/* where to leave errors */
	GDKclrerr();
	task->errbuf = GDKerrbuf;
	MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);

	MT_sema_down(&task->sema);
	while (task->top[task->cur] >= 0) {
		/* stage one, break the rows spread the work over the workers */
		switch (task->state) {
		case BREAKROW:
			t0 = GDKusec();
			/* rows per worker: floor(top / workers) + 1; worker `id`
			 * handles rows [piece * id, piece * (id + 1)) */
			piece = (task->top[task->cur] + task->workers) / task->workers;

			for (j = piece * task->id;
				 j < task->top[task->cur] && j < piece * (task->id + 1); j++)
				if (task->rows[task->cur][j]) {
					if (SQLload_parse_row(task, j) < 0) {
						task->errorcnt++;
						// early break unless best effort
						if (!task->besteffort) {
							/* clear the fields of the rest of our share so
							 * nothing stale is looked at later */
							for (j++;
								 j < task->top[task->cur]
								 && j < piece * (task->id + 1); j++)
								for (i = 0; i < task->as->nr_attrs; i++)
									task->fields[i][j] = NULL;
							break;	/* leaves the row loop, not the switch */
						}
					}
				}
			task->wtime = GDKusec() - t0;
			break;
		case UPDATEBAT:
			/* after a parse error without best effort, do not load */
			if (!task->besteffort && task->errorcnt)
				break;
			/* stage two, updating the BATs */
			for (i = 0; i < task->as->nr_attrs; i++)
				if (task->cols[i]) {
					t0 = GDKusec();
					/* on failure, stop with the remaining columns */
					if (SQLworker_column(task, task->cols[i] - 1) < 0)
						break;
					t0 = GDKusec() - t0;
					task->time[i] += t0;	/* input for SQLworkdivider */
					task->wtime += t0;
				}
			break;
		case ENDOFCOPY:
			MT_sema_up(&task->reply);
			goto do_return;
		}
		MT_sema_up(&task->reply);
		MT_sema_down(&task->sema);
	}
	MT_sema_up(&task->reply);

  do_return:
	GDKfree(GDKerrbuf);
	GDKsetbuf(NULL);
	MT_thread_set_qry_ctx(NULL);
}
1400 :
1401 : static void
1402 103364 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
1403 : {
1404 103364 : int i, j, mi;
1405 103364 : lng loc[MAXWORKERS];
1406 :
1407 : /* after a few rounds we stick to the work assignment */
1408 103364 : if (task->rounds > 8)
1409 103225 : return;
1410 : /* simple round robin the first time */
1411 2757 : if (threads == 1 || task->rounds++ == 0) {
1412 16461 : for (i = j = 0; i < nr_attrs; i++, j++)
1413 13843 : ptask[j % threads].cols[i] = task->cols[i];
1414 : return;
1415 : }
1416 139 : memset(loc, 0, sizeof(loc));
1417 : /* use of load directives */
1418 2054 : for (i = 0; i < nr_attrs; i++)
1419 14624 : for (j = 0; j < threads; j++)
1420 12709 : ptask[j].cols[i] = 0;
1421 :
1422 : /* now allocate the work to the threads */
1423 2054 : for (i = 0; i < nr_attrs; i++, j++) {
1424 : mi = 0;
1425 12709 : for (j = 1; j < threads; j++)
1426 10794 : if (loc[j] < loc[mi])
1427 3098 : mi = j;
1428 :
1429 1915 : ptask[mi].cols[i] = task->cols[i];
1430 1915 : loc[mi] += task->time[i];
1431 : }
1432 : /* reset the timer */
1433 2054 : for (i = 0; i < nr_attrs; i++, j++)
1434 1915 : task->time[i] = 0;
1435 : }
1436 :
/*
 * Reading is handled by a separate task as a preparation for more parallelism.
 * A buffer is filled with proper rows.
 * If we are reading from a file then a double buffering scheme is activated.
 * Reading from the console (stdin) remains single buffered only.
 * If we end up with unfinished records, then the rowlimit will terminate the process.
 */
1444 :
/* Transition table for recognizing a separator string:
 * dfa[state][byte] is the next state, where state is the number of
 * separator bytes matched so far. */
typedef unsigned char (*dfa_t)[256];

/*
 * Build the transition table that recognizes the separator sep of
 * seplen bytes (a KMP-style automaton).  The table has seplen rows
 * (states 0 .. seplen-1); a transition yielding seplen means the whole
 * separator has just been matched, so that state is never used as a row
 * index.  All transitions not set explicitly are 0 (no partial match),
 * because the table is zero-allocated.
 *
 * Returns the GDKzalloc'ed table (the caller frees it with GDKfree), or
 * NULL if the allocation failed.
 * NOTE(review): states are stored in unsigned char, so this presumably
 * requires seplen <= 255 -- confirm the separator length is bounded.
 */
static dfa_t
mkdfa(const unsigned char *sep, size_t seplen)
{
	dfa_t dfa;
	size_t i, j, k;

	dfa = GDKzalloc(seplen * sizeof(*dfa));
	if (dfa == NULL)
		return NULL;
	/* Each character in the separator string advances the state by
	 * one. If state reaches seplen, the separator was recognized.
	 *
	 * The first loop and the nested loop make sure that if in any
	 * state we encounter an invalid character, but part of what we've
	 * matched so far is a prefix of the separator, we go to the
	 * appropriate state. */
	/* in every state, the first separator byte starts a new match */
	for (i = 0; i < seplen; i++)
		dfa[i][sep[0]] = 1;
	for (j = 0; j < seplen; j++) {
		/* the expected next byte extends the match */
		dfa[j][sep[j]] = (unsigned char) (j + 1);
		/* k is the start of a suffix of the j bytes matched so far; if
		 * sep[k .. j-1] equals the prefix sep[0 .. j-k-1], then the
		 * byte sep[j-k] leads to state j-k+1 (unless a longer match
		 * was already recorded for that byte) */
		for (k = 0; k < j; k++) {
			for (i = 0; i < j - k; i++)
				if (sep[k + i] != sep[i])
					break;
			if (i == j - k && dfa[j][sep[i]] <= i)
				dfa[j][sep[i]] = (unsigned char) (i + 1);
		}
	}
	return dfa;
}
1477 :
/*
 * Branch-prediction hints: unlikely(expr) and likely(expr) yield the
 * truth value of expr and only tell the compiler which outcome to
 * optimize for.  They must never change the result of expr.
 */
#ifdef __has_builtin
#if __has_builtin(__builtin_expect)
/* __builtin_expect returns its first argument; it is expected to be
 * equal to the second argument */
#define unlikely(expr)	__builtin_expect((expr) != 0, 0)
#define likely(expr)	__builtin_expect((expr) != 0, 1)
#endif
#endif
#ifndef unlikely
/* No hint available.  MSVC's __assume() must NOT be used here:
 * __assume(cond) is a promise that cond always holds (violating it is
 * undefined behaviour), so __assume(!(expr)) would allow the compiler to
 * remove the code that runs when an "unlikely" condition is true, such
 * as error handling. */
#define unlikely(expr)	(expr)
#define likely(expr)	(expr)
#endif
1495 :
/*
 * Producer thread of COPY INTO: reads the input and cuts it into
 * records for the worker threads.
 *
 * Input is copied from the bstream task->b into one of the MAXBUFFERS
 * buffers task->input[cur].  Each complete record is found by scanning
 * for the record separator with a DFA built by mkdfa(), outside quotes
 * and, when task->escape is set, not directly after a backslash.  The
 * record's separator is overwritten with NUL, and the record is
 * registered in task->rows[cur] with its starting line number in
 * task->startlineno[cur].  While scanning, the input is checked to be
 * UTF-8 encoded.  The first task->skip records are dropped, and at most
 * task->maxrow records are delivered.
 *
 * Buffers are handed to the consumer through the semaphores
 * task->consumer (buffer ready) and task->producer (consumer finished).
 * With console input (task->as->filename == NULL) a single buffer is
 * used and the producer waits for each buffer to be consumed.
 * Otherwise the buffers alternate, so one can be filled while the other
 * is being processed.
 *
 * Errors are reported through tablet_error(); most of them end the input
 * as if end-of-file had been reached.
 */
static void
SQLproducer(void *p)
{
	READERtask *task = (READERtask *) p;
	bool consoleinput = false;
	int cur = 0;		// buffer being filled
	bool blocked[MAXBUFFERS] = { false };	/* buffer handed to consumers */
	bool ateof[MAXBUFFERS] = { false };
	BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };	/* records delivered so far */
	char *end = NULL, *e = NULL, *s = NULL, *base;
	const char *rsep = task->rsep;
	size_t rseplen = strlen(rsep), partial = 0;
	char quote = task->quote;
	dfa_t rdfa;
	lng rowno = 0;
	lng lineno = 1;
	lng startlineno = 1;
	int more = 0;

	MT_sema_down(&task->producer);
	/* a negative id apparently means "do not start" -- confirm with caller */
	if (task->id < 0) {
		return;
	}

	MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
	rdfa = mkdfa((const unsigned char *) rsep, rseplen);
	if (rdfa == NULL) {
		tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
					 "");
		ateof[cur] = true;
		goto reportlackofinput;
	}

/*	TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/

	base = end = s = task->input[cur];
	*s = 0;
	task->cur = cur;
	if (task->as->filename == NULL) {
		/* console input: data is already in the stream buffer, so skip
		 * reading and jump straight into the record scanning */
		consoleinput = true;
		goto parseSTDIN;
	}
	for (;;) {
		startlineno = lineno;
		ateof[cur] = !tablet_read_more(task);

		// we may be reading from standard input and may be out of input
		// warn the consumers
		if (task->aborted || ((lineno & 8191) == 0 && bstream_getoob(task->cntxt->fdin))) {
			tablet_error(task, rowno, lineno, int_nil,
						 "problem reported by client", s);
			ateof[cur] = true;
			goto reportlackofinput;
		}

		/* end of input while an unterminated record is pending */
		if (ateof[cur] && partial) {
			if (unlikely(partial)) {
				tablet_error(task, rowno, lineno, int_nil,
							 "incomplete record at end of file", s);
				task->b->pos += partial;	/* skip the incomplete data */
			}
			goto reportlackofinput;
		}

		if (task->errbuf && task->errbuf[0]) {
			if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
				tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
							 "SQLload_file");
/*				TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
				ateof[cur] = true;
				break;
			}
		}

	  parseSTDIN:

		/* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
		partial = 0;
		task->top[cur] = 0;
		s = task->input[cur];
		base = end;
		/* avoid too long records */
		if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
			/* the input buffer should be extended, but 'base' is not shared
			   between the threads, which we can not now update.
			   Mimic an ateof instead; */
			tablet_error(task, rowno, lineno, int_nil, "record too long", "");
			ateof[cur] = true;
/*			TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
			goto reportlackofinput;
		}
		memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
		end = end + task->b->len - task->b->pos;
		*end = '\0';			/* this is safe, as the stream ensures an extra byte */
		/* Note that we rescan from the start of a record (the last
		 * partial buffer from the previous iteration), even if in the
		 * previous iteration we have already established that there
		 * is no record separator in the first, perhaps significant,
		 * part of the buffer. This is because if the record separator
		 * is longer than one byte, it is too complex (i.e. would
		 * require more state) to be sure what the state of the quote
		 * status is when we back off a few bytes from where the last
		 * scan ended (we need to back off some since we could be in
		 * the middle of the record separator). If this is too
		 * costly, we have to rethink the matter. */
		/* an empty line on stdin without a row limit ends the input
		 * (presumably the end-of-data marker -- confirm) */
		if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
			ateof[cur] = true;
			goto reportlackofinput;
		}
		for (e = s; *e && e < end && cnt < task->maxrow;) {
			/* tokenize the record completely
			 *
			 * The format of the input should comply to the following
			 * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
			 * where quote is a single user-defined character.
			 * Within the quoted fields a character may be escaped
			 * with a backslash. The correct number of fields should
			 * be supplied. In the first phase we simply break the
			 * rows at the record boundary. */
			int nutf = 0;		/* continuation bytes still expected */
			int m = 0;			/* mask the next continuation byte must hit
								 * (rejects overlong 3/4-byte encodings) */
			bool bs = false;	/* previous byte was an escaping backslash */
			char q = 0;			/* quote char while inside a quoted field */
			size_t i = 0;		/* DFA state: separator bytes matched */
			while (*e) {
				if (task->skip > 0) {
					/* no interpretation of data we're skipping, just
					 * look for newline */
					if (*e == '\n') {
						lineno++;
						break;
					}
				} else {
					/* check for correctly encoded UTF-8 */
					/* NOTE(review): lead bytes 0xF5-0xF7, 0xF4 followed by
					 * >= 0x90, and surrogates (0xED 0xA0-0xBF) pass this
					 * check -- confirm whether that is intended. */
					if (nutf > 0) {
						if (unlikely((*e & 0xC0) != 0x80))
							goto badutf8;
						if (unlikely(m != 0 && (*e & m) == 0))
							goto badutf8;
						m = 0;
						nutf--;
					} else if ((*e & 0x80) != 0) {
						if ((*e & 0xE0) == 0xC0) {
							nutf = 1;
							if (unlikely((e[0] & 0x1E) == 0))
								goto badutf8;	/* overlong: 0xC0/0xC1 */
						} else if ((*e & 0xF0) == 0xE0) {
							nutf = 2;
							if ((e[0] & 0x0F) == 0)
								m = 0x20;
						} else if (likely((*e & 0xF8) == 0xF0)) {
							nutf = 3;
							if ((e[0] & 0x07) == 0)
								m = 0x30;
						} else {
							goto badutf8;
						}
					} else if (*e == '\n')
						lineno++;
					/* check for quoting and the row separator */
					if (bs) {
						bs = false;
					} else if (task->escape && *e == '\\') {
						bs = true;
						i = 0;
					} else if (*e == q) {
						q = 0;
					} else if (*e == quote) {
						q = quote;
						i = 0;
					} else if (q == 0) {
						i = rdfa[i][(unsigned char) *e];
						if (i == rseplen)
							break;	/* e is at the last separator byte */
					}
				}
				e++;
			}
			if (*e == 0) {
				partial = e - s;
				/* found an incomplete record, saved for next round */
				if (unlikely(s + partial < end)) {
					/* found a EOS in the input */
					tablet_error(task, rowno, startlineno, int_nil,
								 "record too long (EOS found)", "");
					ateof[cur] = true;
					goto reportlackofinput;
				}
				break;
			} else {
				rowno++;
				if (task->skip > 0) {
					task->skip--;
				} else {
					if (cnt < task->maxrow) {
						task->startlineno[cur][task->top[cur]] = startlineno;
						task->rows[cur][task->top[cur]++] = s;
						startlineno = lineno;
						cnt++;
					}
					/* cut the record at the first byte of its separator */
					*(e + 1 - rseplen) = 0;
				}
				s = ++e;
				/* consume the record from the stream buffer */
				task->b->pos += (size_t) (e - base);
				base = e;
				if (task->top[cur] == task->limit)
					break;
			}
		}

	  reportlackofinput:
/*		TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/

		if (consoleinput) {
			task->cur = cur;
			task->ateof = ateof[cur];
			task->cnt = bufcnt[cur];
			/* tell consumer to go ahead */
			MT_sema_up(&task->consumer);
			/* then wait until it is done */
			MT_sema_down(&task->producer);
			if (cnt == task->maxrow) {
				GDKfree(rdfa);
				MT_thread_set_qry_ctx(NULL);
				return;
			}
		} else {
			assert(!blocked[cur]);
			if (blocked[(cur + 1) % MAXBUFFERS]) {
				/* first wait until other buffer is done */
/*				TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/

				MT_sema_down(&task->producer);
				blocked[(cur + 1) % MAXBUFFERS] = false;
				if (task->state == ENDOFCOPY) {
					GDKfree(rdfa);
					MT_thread_set_qry_ctx(NULL);
					return;
				}
			}
			/* other buffer is done, proceed with current buffer */
			assert(!blocked[(cur + 1) % MAXBUFFERS]);
			blocked[cur] = true;
			task->cur = cur;
			task->ateof = ateof[cur];
			task->cnt = bufcnt[cur];
			/* more input remains if not at EOF, or if the buffer was cut
			 * short because it reached the row limit */
			more = !ateof[cur] || (e && e < end
								   && task->top[cur] == task->limit);
/*			TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/

			MT_sema_up(&task->consumer);

			cur = (cur + 1) % MAXBUFFERS;
/*			TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/

			if (cnt == task->maxrow) {
				MT_sema_down(&task->producer);
/*				TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
				GDKfree(rdfa);
				MT_thread_set_qry_ctx(NULL);
				return;
			}
		}
/*		TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/

		/* we ran out of input? */
		if (task->ateof && !more) {
/*			TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
			GDKfree(rdfa);
			MT_thread_set_qry_ctx(NULL);
			return;
		}
		/* consumers ask us to stop? */
		if (task->state == ENDOFCOPY) {
			GDKfree(rdfa);
			MT_thread_set_qry_ctx(NULL);
			return;
		}
		bufcnt[cur] = cnt;
		/* move the non-parsed correct row data to the head of the next buffer */
		end = s = task->input[cur];
	}
	/* only reached through the `break` after a GDK error above */
	if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
		char msg[256];
		snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
		/* NOTE(review): this overwrites task->as->error without freeing
		 * it; tablet_error() may already have set it -- possible leak */
		task->as->error = GDKstrdup(msg);
		tablet_error(task, rowno, startlineno, int_nil,
					 "incomplete record at end of file", s);
		task->b->pos += partial;
	}
	GDKfree(rdfa);
	MT_thread_set_qry_ctx(NULL);

	return;

  badutf8:
	tablet_error(task, rowno, startlineno, int_nil,
				 "input not properly encoded UTF-8", "");
	ateof[cur] = true;
	goto reportlackofinput;
}
1797 :
1798 : static void
1799 1158 : create_rejects_table(Client cntxt)
1800 : {
1801 1158 : MT_lock_set(&mal_contextLock);
1802 1158 : if (cntxt->error_row == NULL) {
1803 490 : cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
1804 490 : cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
1805 490 : cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
1806 490 : cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
1807 490 : if (cntxt->error_row == NULL || cntxt->error_fld == NULL
1808 490 : || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
1809 0 : BBPreclaim(cntxt->error_row);
1810 0 : BBPreclaim(cntxt->error_fld);
1811 0 : BBPreclaim(cntxt->error_msg);
1812 0 : BBPreclaim(cntxt->error_input);
1813 0 : cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
1814 : }
1815 : }
1816 1158 : MT_lock_unset(&mal_contextLock);
1817 1158 : }
1818 :
1819 : BUN
1820 1128 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
1821 : const char *csep, const char *rsep, char quote, lng skip,
1822 : lng maxrow, int best, bool from_stdin, const char *tabnam,
1823 : bool escape)
1824 : {
1825 1128 : BUN cnt = 0, cntstart = 0, leftover = 0;
1826 1128 : int res = 0; /* < 0: error, > 0: success, == 0: continue processing */
1827 1128 : int j;
1828 1128 : BUN firstcol;
1829 1128 : BUN i, attr;
1830 1128 : READERtask task;
1831 1128 : READERtask ptask[MAXWORKERS];
1832 1128 : int threads = 1;
1833 1128 : lng tio, t1 = 0;
1834 1128 : char name[MT_NAME_LEN];
1835 :
1836 1128 : if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
1837 116 : threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
1838 116 : if (threads > 1)
1839 116 : threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
1840 : else
1841 : threads = 1;
1842 : }
1843 :
1844 : /* TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
1845 :
1846 1128 : memset(ptask, 0, sizeof(ptask));
1847 2256 : task = (READERtask) {
1848 : .cntxt = cntxt,
1849 : .from_stdin = from_stdin,
1850 : .as = as,
1851 : .escape = escape, /* TODO: implement feature!!! */
1852 1128 : .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
1853 : };
1854 :
1855 : /* create the reject tables */
1856 1128 : create_rejects_table(task.cntxt);
1857 1128 : if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
1858 1128 : || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
1859 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1860 : "SQLload initialization failed", "");
1861 : /* nothing allocated yet, so nothing to free */
1862 0 : return BUN_NONE;
1863 : }
1864 :
1865 1128 : assert(rsep);
1866 1128 : assert(csep);
1867 1128 : assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
1868 1128 : task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
1869 1128 : task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1870 1128 : task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
1871 1128 : if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
1872 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1873 : "memory allocation failed", "SQLload_file");
1874 0 : goto bailout;
1875 : }
1876 1128 : task.cur = 0;
1877 3384 : for (i = 0; i < MAXBUFFERS; i++) {
1878 2256 : task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
1879 2256 : task.rowlimit[i] = MAXROWSIZE(2 * b->size);
1880 2256 : if (task.base[i] == NULL) {
1881 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1882 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1883 0 : goto bailout;
1884 : }
1885 2256 : task.base[i][0] = task.base[i][b->size + 1] = 0;
1886 2256 : task.input[i] = task.base[i] + 1; /* wrap the buffer with null bytes */
1887 : }
1888 1128 : task.besteffort = best;
1889 :
1890 1128 : if (maxrow < 0)
1891 82 : task.maxrow = BUN_MAX;
1892 : else
1893 1046 : task.maxrow = (BUN) maxrow;
1894 :
1895 1128 : task.skip = skip;
1896 1128 : task.quote = quote;
1897 1128 : task.csep = csep;
1898 1128 : task.seplen = strlen(csep);
1899 1128 : task.rsep = rsep;
1900 1128 : task.rseplen = strlen(rsep);
1901 1128 : task.errbuf = cntxt->errbuf;
1902 :
1903 1128 : MT_sema_init(&task.producer, 0, "task.producer");
1904 1128 : MT_sema_init(&task.consumer, 0, "task.consumer");
1905 1128 : task.ateof = false;
1906 1128 : task.b = b;
1907 1128 : task.out = out;
1908 :
1909 1128 : as->error = NULL;
1910 :
1911 : /* there is no point in creating more threads than we have columns */
1912 1128 : if (as->nr_attrs < (BUN) threads)
1913 74 : threads = (int) as->nr_attrs;
1914 :
1915 : /* allocate enough space for pointers into the buffer pool. */
1916 : /* the record separator is considered a column */
1917 1128 : task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
1918 11506 : for (i = 0; i < as->nr_attrs; i++) {
1919 10378 : task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
1920 10378 : if (task.fields[i] == NULL) {
1921 0 : if (task.as->error == NULL)
1922 0 : as->error = createException(MAL, "sql.copy_from",
1923 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1924 0 : goto bailout;
1925 : }
1926 10378 : task.cols[i] = (int) (i + 1); /* to distinguish non initialized later with zero */
1927 : }
1928 3384 : for (i = 0; i < MAXBUFFERS; i++) {
1929 2256 : task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
1930 2256 : task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
1931 2256 : if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
1932 0 : GDKfree(task.rows[i]);
1933 0 : GDKfree(task.startlineno[i]);
1934 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1935 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1936 : "SQLload_file:failed to alloc buffers");
1937 0 : goto bailout;
1938 : }
1939 : }
1940 1128 : task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
1941 1128 : if (task.rowerror == NULL) {
1942 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1943 : SQLSTATE(HY013) MAL_MALLOC_FAIL,
1944 : "SQLload_file:failed to alloc rowerror buffer");
1945 0 : goto bailout;
1946 : }
1947 :
1948 1128 : task.id = 0;
1949 1128 : snprintf(name, sizeof(name), "prod-%s", tabnam);
1950 1128 : if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
1951 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1952 : SQLSTATE(42000) "failed to start producer thread",
1953 : "SQLload_file");
1954 0 : goto bailout;
1955 : }
1956 : /* TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
1957 :
1958 1128 : task.workers = threads;
1959 2625 : for (j = 0; j < threads; j++) {
1960 1497 : ptask[j] = task;
1961 1497 : ptask[j].id = j;
1962 1497 : ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
1963 1497 : if (ptask[j].cols == NULL) {
1964 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1965 : SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
1966 0 : task.id = -1;
1967 0 : MT_sema_up(&task.producer);
1968 0 : goto bailout;
1969 : }
1970 1497 : snprintf(name, sizeof(name), "ptask%d.sema", j);
1971 1497 : MT_sema_init(&ptask[j].sema, 0, name);
1972 1497 : snprintf(name, sizeof(name), "ptask%d.repl", j);
1973 1497 : MT_sema_init(&ptask[j].reply, 0, name);
1974 1497 : snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
1975 1497 : if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
1976 0 : tablet_error(&task, lng_nil, lng_nil, int_nil,
1977 : SQLSTATE(42000) "failed to start worker thread",
1978 : "SQLload_file");
1979 0 : threads = j;
1980 0 : for (j = 0; j < threads; j++)
1981 0 : ptask[j].workers = threads;
1982 : }
1983 : }
1984 1128 : if (threads == 0) {
1985 : /* no threads started */
1986 0 : task.id = -1;
1987 0 : MT_sema_up(&task.producer);
1988 0 : goto bailout;
1989 : }
1990 1128 : MT_sema_up(&task.producer);
1991 :
1992 1128 : tio = GDKusec();
1993 1128 : tio = GDKusec() - tio;
1994 1128 : t1 = GDKusec();
1995 2259 : for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
1996 1131 : if (task.as->format[firstcol].c != NULL)
1997 : break;
1998 104495 : while (res == 0 && cnt < task.maxrow) {
1999 :
2000 : // track how many elements are in the aggregated BATs
2001 103552 : cntstart = BATcount(task.as->format[firstcol].c);
2002 : /* block until the producer has data available */
2003 103552 : MT_sema_down(&task.consumer);
2004 103552 : cnt += task.top[task.cur];
2005 103552 : if (task.ateof && !task.top[task.cur])
2006 : break;
2007 103393 : t1 = GDKusec() - t1;
2008 : /* TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
2009 :
2010 103393 : t1 = GDKusec();
2011 103393 : if (task.top[task.cur]) {
2012 : /* activate the workers to break rows */
2013 410607 : for (j = 0; j < threads; j++) {
2014 : /* stage one, break the rows in parallel */
2015 307243 : ptask[j].error = 0;
2016 307243 : ptask[j].state = BREAKROW;
2017 307243 : ptask[j].next = task.top[task.cur];
2018 307243 : ptask[j].fields = task.fields;
2019 307243 : ptask[j].limit = task.limit;
2020 307243 : ptask[j].cnt = task.cnt;
2021 307243 : ptask[j].cur = task.cur;
2022 307243 : ptask[j].top[task.cur] = task.top[task.cur];
2023 307243 : MT_sema_up(&ptask[j].sema);
2024 : }
2025 : }
2026 103393 : if (task.top[task.cur]) {
2027 : /* await completion of row break phase */
2028 410607 : for (j = 0; j < threads; j++) {
2029 307243 : MT_sema_down(&ptask[j].reply);
2030 307243 : if (ptask[j].error) {
2031 0 : res = -1;
2032 : /* TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
2033 : }
2034 : }
2035 : }
2036 :
2037 : /* TRC_DEBUG(MAL_SERVER,
2038 : "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
2039 : task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
2040 :
2041 103393 : if (task.top[task.cur]) {
2042 103364 : if (res == 0) {
2043 103364 : SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
2044 :
2045 : /* activate the workers to update the BATs */
2046 513971 : for (j = 0; j < threads; j++) {
2047 : /* stage two, update the BATs */
2048 307243 : ptask[j].state = UPDATEBAT;
2049 307243 : MT_sema_up(&ptask[j].sema);
2050 : }
2051 : }
2052 : }
2053 103393 : tio = GDKusec();
2054 103393 : tio = t1 - tio;
2055 :
2056 : /* await completion of the BAT updates */
2057 103393 : if (res == 0 && task.top[task.cur]) {
2058 410607 : for (j = 0; j < threads; j++) {
2059 307243 : MT_sema_down(&ptask[j].reply);
2060 307243 : if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
2061 307243 : res = -1;
2062 307243 : best = 0;
2063 : }
2064 : }
2065 : }
2066 :
2067 : /* trim the BATs discarding error tuples */
2068 : #define trimerrors(TYPE) \
2069 : do { \
2070 : TYPE *src, *dst; \
2071 : leftover= BATcount(task.as->format[attr].c); \
2072 : limit = leftover - cntstart; \
2073 : dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
2074 : for(j = 0; j < (int) limit; j++, src++){ \
2075 : if ( task.rowerror[j]){ \
2076 : leftover--; \
2077 : continue; \
2078 : } \
2079 : *dst++ = *src; \
2080 : } \
2081 : BATsetcount(task.as->format[attr].c, leftover ); \
2082 : } while (0)
2083 :
2084 : /* TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
2085 : best, BATcount(as->format[firstcol].c), task.cnt); */
2086 :
2087 103393 : if (best && BATcount(as->format[firstcol].c)) {
2088 : BUN limit;
2089 : int width;
2090 :
2091 53 : for (attr = 0; attr < as->nr_attrs; attr++) {
2092 37 : if (as->format[attr].skip)
2093 5 : continue;
2094 32 : width = as->format[attr].c->twidth;
2095 32 : as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
2096 32 : switch (width) {
2097 5 : case 1:
2098 16 : trimerrors(bte);
2099 5 : break;
2100 0 : case 2:
2101 0 : trimerrors(sht);
2102 0 : break;
2103 27 : case 4:
2104 30061 : trimerrors(int);
2105 27 : break;
2106 0 : case 8:
2107 0 : trimerrors(lng);
2108 0 : break;
2109 : #ifdef HAVE_HGE
2110 0 : case 16:
2111 0 : trimerrors(hge);
2112 0 : break;
2113 : #endif
2114 0 : default:
2115 : {
2116 0 : char *src, *dst;
2117 0 : leftover = BATcount(task.as->format[attr].c);
2118 0 : limit = leftover - cntstart;
2119 0 : dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
2120 0 : for (j = 0; j < (int) limit; j++, src += width) {
2121 0 : if (task.rowerror[j]) {
2122 0 : leftover--;
2123 0 : continue;
2124 : }
2125 0 : if (dst != src)
2126 0 : memcpy(dst, src, width);
2127 0 : dst += width;
2128 : }
2129 0 : BATsetcount(task.as->format[attr].c, leftover);
2130 : }
2131 0 : break;
2132 : }
2133 : }
2134 : // re-initialize the error vector;
2135 16 : memset(task.rowerror, 0, task.limit);
2136 16 : task.errorcnt = 0;
2137 : }
2138 :
2139 103393 : if (res < 0) {
2140 : /* producer should stop */
2141 26 : task.maxrow = cnt;
2142 26 : task.state = ENDOFCOPY;
2143 26 : task.ateof = true;
2144 : }
2145 103393 : if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
2146 : break;
2147 103393 : task.top[task.cur] = 0;
2148 103393 : if (cnt == task.maxrow)
2149 969 : task.ateof = true;
2150 104521 : MT_sema_up(&task.producer);
2151 : }
2152 :
2153 : /* TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
2154 :
2155 1128 : cnt = BATcount(task.as->format[firstcol].c);
2156 :
2157 1128 : task.state = ENDOFCOPY;
2158 : /* TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
2159 :
2160 1128 : if (!task.ateof || cnt < task.maxrow) {
2161 : /* TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
2162 177 : MT_sema_up(&task.producer);
2163 : }
2164 1128 : MT_join_thread(task.tid);
2165 :
2166 : /* TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
2167 :
2168 3753 : for (j = 0; j < threads; j++) {
2169 1497 : ptask[j].state = ENDOFCOPY;
2170 1497 : MT_sema_up(&ptask[j].sema);
2171 : }
2172 : /* wait for their death */
2173 2625 : for (j = 0; j < threads; j++)
2174 1497 : MT_sema_down(&ptask[j].reply);
2175 :
2176 : /* TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
2177 :
2178 2625 : for (j = 0; j < threads; j++) {
2179 1497 : MT_join_thread(ptask[j].tid);
2180 1497 : GDKfree(ptask[j].cols);
2181 1497 : MT_sema_destroy(&ptask[j].sema);
2182 1497 : MT_sema_destroy(&ptask[j].reply);
2183 : }
2184 :
2185 : /* TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
2186 : /* TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
2187 :
2188 11506 : for (i = 0; i < as->nr_attrs; i++) {
2189 10378 : BAT *b = task.as->format[i].c;
2190 10378 : if (b)
2191 10372 : BATsettrivprop(b);
2192 10378 : GDKfree(task.fields[i]);
2193 : }
2194 1128 : GDKfree(task.fields);
2195 1128 : GDKfree(task.cols);
2196 1128 : GDKfree(task.time);
2197 4512 : for (i = 0; i < MAXBUFFERS; i++) {
2198 2256 : GDKfree(task.base[i]);
2199 2256 : GDKfree(task.rows[i]);
2200 2256 : GDKfree(task.startlineno[i]);
2201 : }
2202 1128 : if (task.rowerror)
2203 1128 : GDKfree(task.rowerror);
2204 1128 : MT_sema_destroy(&task.producer);
2205 1128 : MT_sema_destroy(&task.consumer);
2206 :
2207 1128 : return res < 0 ? BUN_NONE : cnt;
2208 :
2209 0 : bailout:
2210 0 : if (task.fields) {
2211 0 : for (i = 0; i < as->nr_attrs; i++)
2212 0 : GDKfree(task.fields[i]);
2213 0 : GDKfree(task.fields);
2214 : }
2215 0 : GDKfree(task.time);
2216 0 : GDKfree(task.cols);
2217 0 : GDKfree(task.base[task.cur]);
2218 0 : GDKfree(task.rowerror);
2219 0 : for (i = 0; i < MAXWORKERS; i++)
2220 0 : GDKfree(ptask[i].cols);
2221 : return BUN_NONE;
2222 : }
2223 :
2224 : /* return the latest reject table */
2225 : str
2226 30 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2227 : {
2228 30 : bat *row = getArgReference_bat(stk, pci, 0);
2229 30 : bat *fld = getArgReference_bat(stk, pci, 1);
2230 30 : bat *msg = getArgReference_bat(stk, pci, 2);
2231 30 : bat *inp = getArgReference_bat(stk, pci, 3);
2232 :
2233 30 : create_rejects_table(cntxt);
2234 30 : if (cntxt->error_row == NULL)
2235 0 : throw(MAL, "sql.rejects", "No reject table available");
2236 30 : MT_lock_set(&errorlock);
2237 30 : BAT *bn1 = COLcopy(cntxt->error_row, cntxt->error_row->ttype, true, TRANSIENT);
2238 30 : BAT *bn2 = COLcopy(cntxt->error_fld, cntxt->error_fld->ttype, true, TRANSIENT);
2239 30 : BAT *bn3 = COLcopy(cntxt->error_msg, cntxt->error_msg->ttype, true, TRANSIENT);
2240 30 : BAT *bn4 = COLcopy(cntxt->error_input, cntxt->error_input->ttype, true, TRANSIENT);
2241 30 : MT_lock_unset(&errorlock);
2242 30 : if (bn1 == NULL || bn2 == NULL || bn3 == NULL || bn4 == NULL) {
2243 0 : BBPreclaim(bn1);
2244 0 : BBPreclaim(bn2);
2245 0 : BBPreclaim(bn3);
2246 0 : BBPreclaim(bn4);
2247 0 : throw(MAL, "sql.rejects", GDK_EXCEPTION);
2248 : }
2249 30 : *row = bn1->batCacheid;
2250 30 : *fld = bn2->batCacheid;
2251 30 : *msg = bn3->batCacheid;
2252 30 : *inp = bn4->batCacheid;
2253 30 : BBPkeepref(bn1);
2254 30 : BBPkeepref(bn2);
2255 30 : BBPkeepref(bn3);
2256 30 : BBPkeepref(bn4);
2257 30 : (void) mb;
2258 30 : return MAL_SUCCEED;
2259 : }
2260 :
2261 : str
2262 14 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2263 : {
2264 14 : if (cntxt->error_row) {
2265 13 : MT_lock_set(&errorlock);
2266 13 : BATclear(cntxt->error_row, true);
2267 13 : if (cntxt->error_fld)
2268 13 : BATclear(cntxt->error_fld, true);
2269 13 : if (cntxt->error_msg)
2270 13 : BATclear(cntxt->error_msg, true);
2271 13 : if (cntxt->error_input)
2272 13 : BATclear(cntxt->error_input, true);
2273 13 : MT_lock_unset(&errorlock);
2274 : }
2275 14 : (void) mb;
2276 14 : (void) stk;
2277 14 : (void) pci;
2278 14 : return MAL_SUCCEED;
2279 : }
|