LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - tablet.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 784 1083 72.4 %
Date: 2024-10-07 21:21:43 Functions: 25 30 83.3 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  *  Niels Nes, Martin Kersten
      15             :  *
      16             :  * Parallel bulk load for SQL
      17             :  * The COPY INTO command for SQL is heavily CPU bound, which means
      18             :  * that ideally we would like to exploit the multi-cores to do that
      19             :  * work in parallel.
      20             :  * Complicating factors are the initial record offset, the
      21             :  * possible variable length of the input, and the original sort order
      22             :  * that should preferable be maintained.
      23             :  *
      24             :  * The code below consists of a file reader, which breaks up the
      25             :  * file into chunks of distinct rows. Then multiple parallel threads
      26             :  * grab them, and break them on the field boundaries.
      27             :  * After all fields are identified this way, the columns are converted
      28             :  * and stored in the BATs.
      29             :  *
      30             :  * The threads get a reference to a private copy of the READERtask.
      31             :  * It includes a list of columns they should handle. This is a basis
      32             :  * to distributed cheap and expensive columns over threads.
      33             :  *
      34             :  * The file reader overlaps IO with updates of the BAT.
      35             :  * Also the buffer size of the block stream might be a little small for
      36             :  * this task (1MB). It has been increased to 8MB, which indeed improved.
      37             :  *
      38             :  * The work divider allocates subtasks to threads based on the
      39             :  * observed time spending so far.
      40             :  */
      41             : 
      42             : #include "monetdb_config.h"
      43             : #include "tablet.h"
      44             : #include "mapi_prompt.h"
      45             : #include "mal_internal.h"
      46             : 
      47             : #include <string.h>
      48             : #include <ctype.h>
      49             : 
      50             : #define MAXWORKERS      64
      51             : #define MAXBUFFERS 2
      52             : /* We restrict the row length to be 32MB for the time being */
      53             : #define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024)
      54             : 
      55             : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
      56             : 
      57             : static BAT *
      58       10328 : void_bat_create(int adt, BUN nr)
      59             : {
      60       10328 :         BAT *b = COLnew(0, adt, nr, TRANSIENT);
      61             : 
      62             :         /* check for correct structures */
      63       10328 :         if (b == NULL)
      64             :                 return NULL;
      65       10328 :         if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
      66             :                 return NULL;
      67             :         }
      68             : 
      69             :         /* disable all properties here */
      70       10328 :         b->tsorted = false;
      71       10328 :         b->trevsorted = false;
      72       10328 :         b->tnosorted = 0;
      73       10328 :         b->tnorevsorted = 0;
      74       10328 :         b->tseqbase = oid_nil;
      75       10328 :         b->tkey = false;
      76       10328 :         b->tnokey[0] = 0;
      77       10328 :         b->tnokey[1] = 0;
      78       10328 :         return b;
      79             : }
      80             : 
      81             : void
      82      123869 : TABLETdestroy_format(Tablet *as)
      83             : {
      84      123869 :         BUN p;
      85      123869 :         Column *fmt = as->format;
      86             : 
      87      592179 :         for (p = 0; p < as->nr_attrs; p++) {
      88      468257 :                 BBPreclaim(fmt[p].c);
      89      468256 :                 if (fmt[p].data)
      90       10333 :                         GDKfree(fmt[p].data);
      91             :         }
      92      123922 :         GDKfree(fmt);
      93      123925 : }
      94             : 
      95             : static oid
      96      122797 : check_BATs(Tablet *as)
      97             : {
      98      122797 :         Column *fmt = as->format;
      99      122797 :         BUN i = 0;
     100      122797 :         BUN cnt;
     101      122797 :         oid base;
     102             : 
     103      122797 :         if (fmt[i].c == NULL)
     104      122762 :                 i++;
     105      122797 :         cnt = BATcount(fmt[i].c);
     106      122797 :         base = fmt[i].c->hseqbase;
     107             : 
     108      122797 :         if (as->nr != cnt) {
     109        6653 :                 for (i = 0; i < as->nr_attrs; i++)
     110        5959 :                         if (fmt[i].c)
     111        5265 :                                 fmt[i].p = as->offset;
     112         694 :                 return oid_nil;
     113             :         }
     114             : 
     115      574004 :         for (i = 0; i < as->nr_attrs; i++) {
     116      451901 :                 BAT *b = fmt[i].c;
     117             : 
     118      451901 :                 if (b == NULL)
     119      122077 :                         continue;
     120             : 
     121      329824 :                 if (BATcount(b) != cnt || b->hseqbase != base)
     122           0 :                         return oid_nil;
     123             : 
     124      329824 :                 fmt[i].p = as->offset;
     125             :         }
     126             :         return base;
     127             : }
     128             : 
     129             : str
     130        1118 : TABLETcreate_bats(Tablet *as, BUN est)
     131             : {
     132        1118 :         Column *fmt = as->format;
     133        1118 :         BUN i, nr = 0;
     134             : 
     135       11452 :         for (i = 0; i < as->nr_attrs; i++) {
     136       10334 :                 if (fmt[i].skip)
     137           6 :                         continue;
     138       10328 :                 fmt[i].c = void_bat_create(fmt[i].adt, est);
     139       10328 :                 if (!fmt[i].c) {
     140           0 :                         while (i > 0) {
     141           0 :                                 if (!fmt[--i].skip) {
     142           0 :                                         BBPreclaim(fmt[i].c);
     143           0 :                                         fmt[i].c = NULL;
     144             :                                 }
     145             :                         }
     146           0 :                         throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
     147             :                                   est);
     148             :                 }
     149       10328 :                 fmt[i].ci = bat_iterator_nolock(fmt[i].c);
     150       10328 :                 nr++;
     151             :         }
     152        1118 :         if (!nr)
     153           0 :                 throw(SQL, "copy",
     154             :                           "At least one column should be read from the input\n");
     155             :         return MAL_SUCCEED;
     156             : }
     157             : 
     158             : str
     159        1093 : TABLETcollect(BAT **bats, Tablet *as)
     160             : {
     161        1093 :         Column *fmt = as->format;
     162        1093 :         BUN i, j;
     163        1093 :         BUN cnt = 0;
     164             : 
     165        1093 :         if (bats == NULL)
     166           0 :                 throw(SQL, "copy", "Missing container");
     167        2590 :         for (i = 0; i < as->nr_attrs && !cnt; i++)
     168        1497 :                 if (!fmt[i].skip)
     169        1494 :                         cnt = BATcount(fmt[i].c);
     170       11334 :         for (i = 0, j = 0; i < as->nr_attrs; i++) {
     171       10241 :                 if (fmt[i].skip)
     172           6 :                         continue;
     173       10235 :                 bats[j] = fmt[i].c;
     174       10235 :                 BBPfix(bats[j]->batCacheid);
     175       10235 :                 if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
     176           0 :                         throw(SQL, "copy",
     177             :                                   "Failed to set access at tablet part " BUNFMT "\n", cnt);
     178       10235 :                 fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
     179       10235 :                 fmt[i].c->tkey = false;
     180       10235 :                 BATsettrivprop(fmt[i].c);
     181             : 
     182       10235 :                 if (cnt != BATcount(fmt[i].c))
     183           0 :                         throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
     184             :                                   BATcount(fmt[i].c), cnt);
     185       10235 :                 j++;
     186             :         }
     187             :         return MAL_SUCCEED;
     188             : }
     189             : 
     190             : // the starting quote character has already been skipped
     191             : 
     192             : static char *
     193     4677583 : tablet_skip_string(char *s, char quote, bool escape)
     194             : {
     195     4677583 :         size_t i = 0, j = 0;
     196   101373137 :         while (s[i]) {
     197   101323204 :                 if (escape && s[i] == '\\' && s[i + 1] != '\0')
     198      500383 :                         s[j++] = s[i++];
     199   100822821 :                 else if (s[i] == quote) {
     200     3252358 :                         if (s[i + 1] != quote)
     201             :                                 break;
     202             :                         i++;                            /* skip the first quote */
     203             :                 }
     204    96695554 :                 s[j++] = s[i++];
     205             :         }
     206     4677583 :         assert(s[i] == quote || s[i] == '\0');
     207     4677583 :         if (s[i] == 0)
     208             :                 return NULL;
     209     4677583 :         s[j] = 0;
     210     4677583 :         return s + i;
     211             : }
     212             : 
     213             : static int
     214           0 : TABLET_error(stream *s)
     215             : {
     216           0 :         const char *err = mnstr_peek_error(s);
     217           0 :         if (err)
     218           0 :                 TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
     219           0 :         return -1;
     220             : }
     221             : 
     222             : /* The output line is first built before being sent. It solves a problem
     223             :    with UDP, where you may loose most of the information using short writes
     224             : */
     225             : static inline int
     226           0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
     227             :                         Column *fmt, stream *fd, BUN nr_attrs, oid id)
     228             : {
     229           0 :         BUN i;
     230           0 :         ssize_t fill = 0;
     231             : 
     232           0 :         for (i = 0; i < nr_attrs; i++) {
     233           0 :                 if (fmt[i].c == NULL)
     234           0 :                         continue;
     235           0 :                 if (id < fmt[i].c->hseqbase
     236           0 :                         || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
     237             :                         break;
     238           0 :                 fmt[i].p = id - fmt[i].c->hseqbase;
     239             :         }
     240           0 :         if (i == nr_attrs) {
     241           0 :                 for (i = 0; i < nr_attrs; i++) {
     242           0 :                         Column *f = fmt + i;
     243           0 :                         const char *p;
     244           0 :                         ssize_t l;
     245             : 
     246           0 :                         if (f->c) {
     247           0 :                                 p = BUNtail(f->ci, f->p);
     248             : 
     249           0 :                                 if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     250           0 :                                         p = f->nullstr;
     251           0 :                                         l = (ssize_t) strlen(f->nullstr);
     252             :                                 } else {
     253           0 :                                         l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     254           0 :                                         if (l < 0)
     255             :                                                 return -1;
     256           0 :                                         p = *localbuf;
     257             :                                 }
     258           0 :                                 if (fill + l + f->seplen >= (ssize_t) * len) {
     259             :                                         /* extend the buffer */
     260           0 :                                         char *nbuf;
     261           0 :                                         nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     262           0 :                                         if (nbuf == NULL)
     263             :                                                 return -1;      /* *buf freed by caller */
     264           0 :                                         *buf = nbuf;
     265           0 :                                         *len = fill + l + f->seplen + BUFSIZ;
     266             :                                 }
     267           0 :                                 strncpy(*buf + fill, p, l);
     268           0 :                                 fill += l;
     269             :                         }
     270           0 :                         strncpy(*buf + fill, f->sep, f->seplen);
     271           0 :                         fill += f->seplen;
     272             :                 }
     273             :         }
     274           0 :         if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
     275           0 :                 return TABLET_error(fd);
     276             :         return 0;
     277             : }
     278             : 
     279             : static inline int
     280     6207109 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
     281             :                                   Column *fmt, stream *fd, BUN nr_attrs)
     282             : {
     283     6207109 :         BUN i;
     284     6207109 :         ssize_t fill = 0;
     285             : 
     286    27914331 :         for (i = 0; i < nr_attrs; i++) {
     287    21707252 :                 Column *f = fmt + i;
     288    21707252 :                 const char *p;
     289    21707252 :                 ssize_t l;
     290             : 
     291    21707252 :                 if (f->c) {
     292    15500141 :                         p = BUNtail(f->ci, f->p);
     293             : 
     294    15500141 :                         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     295      850922 :                                 p = f->nullstr;
     296      850922 :                                 l = (ssize_t) strlen(p);
     297             :                         } else {
     298    14649174 :                                 l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     299    14649189 :                                 if (l < 0)
     300             :                                         return -1;
     301    14649189 :                                 p = *localbuf;
     302             :                         }
     303    15500111 :                         if (fill + l + f->seplen >= (ssize_t) * len) {
     304             :                                 /* extend the buffer */
     305          78 :                                 char *nbuf;
     306          78 :                                 nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     307          78 :                                 if (nbuf == NULL)
     308             :                                         return -1;      /* *buf freed by caller */
     309          78 :                                 *buf = nbuf;
     310          78 :                                 *len = fill + l + f->seplen + BUFSIZ;
     311             :                         }
     312    15500111 :                         strncpy(*buf + fill, p, l);
     313    15500111 :                         fill += l;
     314    15500111 :                         f->p++;
     315             :                 }
     316    21707222 :                 strncpy(*buf + fill, f->sep, f->seplen);
     317    21707222 :                 fill += f->seplen;
     318             :         }
     319     6207079 :         if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
     320           0 :                 return TABLET_error(fd);
     321             :         return 0;
     322             : }
     323             : 
     324             : static inline int
     325           0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
     326             :                                    BUN nr_attrs, oid id)
     327             : {
     328           0 :         BUN i;
     329             : 
     330           0 :         for (i = 0; i < nr_attrs; i++) {
     331           0 :                 Column *f = fmt + i;
     332             : 
     333           0 :                 if (f->c) {
     334           0 :                         const void *p = BUNtail(f->ci, id - f->c->hseqbase);
     335             : 
     336           0 :                         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     337           0 :                                 size_t l = strlen(f->nullstr);
     338           0 :                                 if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
     339           0 :                                         return TABLET_error(fd);
     340             :                         } else {
     341           0 :                                 ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
     342             : 
     343           0 :                                 if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
     344           0 :                                         return TABLET_error(fd);
     345             :                         }
     346             :                 }
     347           0 :                 if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
     348           0 :                         return TABLET_error(fd);
     349             :         }
     350             :         return 0;
     351             : }
     352             : 
     353             : /*
     354             :  * Fast Load
     355             :  * To speedup the CPU intensive loading of files we have to break
     356             :  * the file into pieces and perform parallel analysis. Experimentation
     357             :  * against lineitem SF1 showed that half of the time goes into very
     358             :  * basis atom analysis (41 out of 102 B instructions).
     359             :  * Furthermore, the actual insertion into the BATs takes only
     360             :  * about 10% of the total. With multi-core processors around
     361             :  * it seems we can gain here significantly.
     362             :  *
     363             :  * The approach taken is to fork a parallel scan over the text file.
     364             :  * We assume that the blocked stream is already
     365             :  * positioned correctly at the reading position. The start and limit
     366             :  * indicates the byte range to search for tuples.
     367             :  * If start> 0 then we first skip to the next record separator.
     368             :  * If necessary we read more than 'limit' bytes to ensure parsing a complete
     369             :  * record and stop at the record boundary.
     370             :  * Beware, we should allocate Tablet descriptors for each file segment,
     371             :  * otherwise we end up with a gross concurrency control problem.
     372             :  * The resulting BATs should be glued at the final phase.
     373             :  *
     374             :  * Raw Load
     375             :  * Front-ends can bypass most of the overhead in loading the BATs
     376             :  * by preparing the corresponding files directly and replace those
     377             :  * created by e.g. the SQL frontend.
     378             :  * This strategy is only advisable for cases where we have very
     379             :  * large files >200GB and/or are created by a well debugged code.
     380             :  *
     381             :  * To experiment with this approach, the code base responds
     382             :  * on negative number of cores by dumping the data directly in BAT
     383             :  * storage format into a collections of files on disk.
     384             :  * It reports on the actions to be taken to replace BATs.
     385             :  * This technique is initially only supported for fixed-sized columns.
     386             :  * The rawmode() indicator acts as the internal switch.
     387             :  */
     388             : 
     389             : /*
     390             :  * To speed up loading ascii files we have to determine the number of blocks.
     391             :  * This depends on the number of cores available.
     392             :  * For the time being we hardwire this decision based on our own
     393             :  * platforms.
     394             :  * Furthermore, we only consider parallel load for file-based requests.
     395             :  *
     396             :  * To simplify our world, we assume a single producer process.
     397             :  */
     398             : 
     399             : static int
     400           0 : output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
     401             : {
     402           0 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     403           0 :         int res = 0;
     404           0 :         char *buf = GDKmalloc(len);
     405           0 :         char *localbuf = GDKmalloc(len);
     406           0 :         BUN p, q;
     407           0 :         oid id;
     408           0 :         BUN offset = as->offset;
     409             : 
     410           0 :         if (buf == NULL || localbuf == NULL) {
     411           0 :                 GDKfree(buf);
     412           0 :                 GDKfree(localbuf);
     413           0 :                 return -1;
     414             :         }
     415           0 :         for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
     416           0 :                  p++, id++) {
     417           0 :                 if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
     418             :                         res = -5;
     419             :                         break;
     420             :                 }
     421           0 :                 if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
     422             :                         break;
     423             :                 }
     424             :         }
     425           0 :         GDKfree(localbuf);
     426           0 :         GDKfree(buf);
     427           0 :         return res;
     428             : }
     429             : 
     430             : static int
     431      122785 : output_file_dense(Tablet *as, stream *fd, bstream *in)
     432             : {
     433      122785 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     434      122785 :         int res = 0;
     435      122785 :         char *buf = GDKmalloc(len);
     436      122793 :         char *localbuf = GDKmalloc(len);
     437      122777 :         BUN i = 0;
     438             : 
     439      122777 :         if (buf == NULL || localbuf == NULL) {
     440           0 :                 GDKfree(buf);
     441           0 :                 GDKfree(localbuf);
     442           0 :                 return -1;
     443             :         }
     444     6329879 :         for (i = 0; i < as->nr; i++) {
     445     6207100 :                 if ((i & 8191) == 8191 && bstream_getoob(in)) {
     446             :                         res = -5;                       /* "Query aborted" */
     447             :                         break;
     448             :                 }
     449     6207100 :                 if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
     450             :                         break;
     451             :                 }
     452             :         }
     453      122779 :         GDKfree(localbuf);
     454      122807 :         GDKfree(buf);
     455      122807 :         return res;
     456             : }
     457             : 
     458             : static int
     459           0 : output_file_ordered(Tablet *as, BAT *order, stream *fd, bstream *in)
     460             : {
     461           0 :         size_t len = BUFSIZ;
     462           0 :         int res = 0;
     463           0 :         char *buf = GDKmalloc(len);
     464           0 :         BUN p, q;
     465           0 :         BUN i = 0;
     466           0 :         BUN offset = as->offset;
     467             : 
     468           0 :         if (buf == NULL)
     469             :                 return -1;
     470           0 :         for (q = offset + as->nr, p = offset; p < q; p++, i++) {
     471           0 :                 oid h = order->hseqbase + p;
     472             : 
     473           0 :                 if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
     474             :                         res = -5;
     475             :                         break;
     476             :                 }
     477           0 :                 if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
     478           0 :                         GDKfree(buf);
     479           0 :                         return res;
     480             :                 }
     481             :         }
     482           0 :         GDKfree(buf);
     483           0 :         return res;
     484             : }
     485             : 
     486             : int
     487      122758 : TABLEToutput_file(Tablet *as, BAT *order, stream *s, bstream *in)
     488             : {
     489      122758 :         oid base = oid_nil;
     490      122758 :         int ret = 0;
     491             : 
     492             :         /* only set nr if it is zero or lower (bogus) to the maximum value
     493             :          * possible (BATcount), if already set within BATcount range,
     494             :          * preserve value such that for instance SQL's reply_size still
     495             :          * works
     496             :          */
     497      122758 :         if (order) {
     498           0 :                 BUN maxnr = BATcount(order);
     499           0 :                 if (as->nr == BUN_NONE || as->nr > maxnr)
     500           0 :                         as->nr = maxnr;
     501             :         }
     502      122758 :         assert(as->nr != BUN_NONE);
     503             : 
     504      122758 :         base = check_BATs(as);
     505      122801 :         if (!order || !is_oid_nil(base)) {
     506      122801 :                 if (!order || order->hseqbase == base)
     507      122801 :                         ret = output_file_dense(as, s, in);
     508             :                 else
     509           0 :                         ret = output_file_ordered(as, order, s, in);
     510             :         } else {
     511           0 :                 ret = output_file_default(as, order, s, in);
     512             :         }
     513      122807 :         return ret;
     514             : }
     515             : 
     516             : /*
     517             :  *  Niels Nes, Martin Kersten
     518             :  *
     519             :  * Parallel bulk load for SQL
     520             :  * The COPY INTO command for SQL is heavily CPU bound, which means
     521             :  * that ideally we would like to exploit the multi-cores to do that
     522             :  * work in parallel.
     523             :  * Complicating factors are the initial record offset, the
     524             :  * possible variable length of the input, and the original sort order
     525             :  * that should preferable be maintained.
     526             :  *
     527             :  * The code below consists of a file reader, which breaks up the
     528             :  * file into chunks of distinct rows. Then multiple parallel threads
     529             :  * grab them, and break them on the field boundaries.
     530             :  * After all fields are identified this way, the columns are converted
     531             :  * and stored in the BATs.
     532             :  *
     533             :  * The threads get a reference to a private copy of the READERtask.
     534             :  * It includes a list of columns they should handle. This is a basis
     535             :  * to distributed cheap and expensive columns over threads.
     536             :  *
     537             :  * The file reader overlaps IO with updates of the BAT.
     538             :  * Also the buffer size of the block stream might be a little small for
     539             :  * this task (1MB). It has been increased to 8MB, which indeed improved.
     540             :  *
     541             :  * The work divider allocates subtasks to threads based on the
     542             :  * observed time spending so far.
     543             :  */
     544             : 
     545             : #define BREAKROW 1
     546             : #define UPDATEBAT 2
     547             : #define ENDOFCOPY 3
     548             : 
     549             : typedef struct {
     550             :         Client cntxt;
     551             :         int id;                                         /* for self reference */
     552             :         int state;                                      /* row break=1 , 2 = update bat */
     553             :         int workers;                            /* how many concurrent ones */
     554             :         int error;                                      /* error during row break */
     555             :         int next;
     556             :         int limit;
     557             :         BUN cnt, maxrow;                        /* first row in file chunk. */
     558             :         lng skip;                                       /* number of lines to be skipped */
     559             :         lng *time, wtime;                       /* time per col + time per thread */
     560             :         int rounds;                                     /* how often did we divide the work */
     561             :         bool ateof;                                     /* io control */
     562             :         bool from_stdin;
     563             :         bool escape;                            /* whether to handle \ escapes */
     564             :         bool besteffort;
     565             :         char quote;
     566             :         bstream *b;
     567             :         stream *out;
     568             :         MT_Id tid;
     569             :         MT_Sema producer;                       /* reader waits for call */
     570             :         MT_Sema consumer;                       /* reader waits for call */
     571             :         MT_Sema sema;                           /* threads wait for work , negative next implies exit */
     572             :         MT_Sema reply;                          /* let reader continue */
     573             :         Tablet *as;
     574             :         char *errbuf;
     575             :         const char *csep, *rsep;
     576             :         size_t seplen, rseplen;
     577             : 
     578             :         char *base[MAXBUFFERS], *input[MAXBUFFERS];     /* buffers for row splitter and tokenizer */
     579             :         size_t rowlimit[MAXBUFFERS];    /* determines maximal record length buffer */
     580             :         char **rows[MAXBUFFERS];
     581             :         lng *startlineno[MAXBUFFERS];
     582             :         int top[MAXBUFFERS];            /* number of rows in this buffer */
     583             :         int cur;                                        /* current buffer used by splitter and update threads */
     584             : 
     585             :         int *cols;                                      /* columns to handle */
     586             :         char ***fields;
     587             :         bte *rowerror;
     588             :         int errorcnt;
     589             :         bool aborted;
     590             :         bool set_qry_ctx;
     591             : } READERtask;
     592             : 
     593             : /* returns TRUE if there is/might be more */
     594             : static bool
     595      102786 : tablet_read_more(READERtask *task)
     596             : {
     597      102786 :         bstream *in = task->b;
     598      102786 :         stream *out = task->out;
     599      102786 :         size_t n =  task->b->size;
     600      102786 :         if (out) {
     601      101175 :                 do {
     602             :                         /* query is not finished ask for more */
     603             :                         /* we need more query text */
     604      101175 :                         if (bstream_next(in) < 0) {
     605           0 :                                 if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
     606           0 :                                         task->aborted = true;
     607           0 :                                         mnstr_clearerr(in->s);
     608             :                                 }
     609           0 :                                 return false;
     610             :                         }
     611      101175 :                         if (in->eof) {
     612      101173 :                                 if (bstream_getoob(in)) {
     613           0 :                                         task->aborted = true;
     614           0 :                                         return false;
     615             :                                 }
     616      101173 :                                 if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
     617      101173 :                                         mnstr_flush(out, MNSTR_FLUSH_DATA);
     618      101173 :                                 in->eof = false;
     619             :                                 /* we need more query text */
     620      101173 :                                 if (bstream_next(in) < 0) {
     621           0 :                                         if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
     622           0 :                                                 task->aborted = true;
     623           0 :                                                 mnstr_clearerr(in->s);
     624             :                                         }
     625           0 :                                         return false;
     626             :                                 }
     627      101173 :                                 if (in->eof)
     628             :                                         return false;
     629             :                         }
     630      101173 :                 } while (in->len <= in->pos);
     631        1611 :         } else if (bstream_read(in, n) <= 0) {
     632             :                 return false;
     633             :         }
     634             :         return true;
     635             : }
     636             : 
     637             : /* note, the column value that is passed here is the 0 based value; the
     638             :  * lineno value on the other hand is 1 based */
     639             : static void
     640          34 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
     641             :                          const char *fcn)
     642             : {
     643          34 :         assert(is_int_nil(col) || col >= 0);
     644          34 :         assert(is_lng_nil(lineno) || lineno >= 1);
     645          34 :         MT_lock_set(&errorlock);
     646          34 :         if (task->cntxt->error_row != NULL
     647          34 :                 && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
     648          34 :                         || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
     649             :                                                  false) != GDK_SUCCEED
     650          34 :                         || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
     651          34 :                         || BUNappend(task->cntxt->error_input, fcn,
     652             :                                                  false) != GDK_SUCCEED)) {
     653           0 :                 task->besteffort = false;
     654             :         }
     655          34 :         if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
     656          34 :                 task->rowerror[idx]++;
     657          34 :         if (task->as->error == NULL) {
     658          60 :                 const char *colnam = is_int_nil(col) || col < 0
     659          30 :                                 || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
     660          30 :                 if (msg == NULL) {
     661           0 :                         task->besteffort = false;
     662          30 :                 } else if (!is_lng_nil(lineno)) {
     663          30 :                         if (!is_int_nil(col)) {
     664          28 :                                 if (colnam)
     665          28 :                                         task->as->error = createException(MAL, "sql.copy_from",
     666             :                                                                                                           "line " LLFMT ": column %d %s: %s",
     667             :                                                                                                           lineno, col + 1, colnam, msg);
     668             :                                 else
     669           0 :                                         task->as->error = createException(MAL, "sql.copy_from",
     670             :                                                                                                           "line " LLFMT ": column %d: %s",
     671             :                                                                                                           lineno, col + 1, msg);
     672             :                         } else {
     673           2 :                                 task->as->error = createException(MAL, "sql.copy_from",
     674             :                                                                                                   "line " LLFMT ": %s", lineno, msg);
     675             :                         }
     676             :                 } else {
     677           0 :                         if (!is_int_nil(col)) {
     678           0 :                                 if (colnam)
     679           0 :                                         task->as->error = createException(MAL, "sql.copy_from",
     680             :                                                                                                           "column %d %s: %s", col + 1, colnam,
     681             :                                                                                                           msg);
     682             :                                 else
     683           0 :                                         task->as->error = createException(MAL, "sql.copy_from",
     684             :                                                                                                           "column %d: %s", col + 1, msg);
     685             :                         } else {
     686           0 :                                 task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
     687             :                         }
     688             :                 }
     689             :         }
     690          34 :         task->errorcnt++;
     691          34 :         MT_lock_unset(&errorlock);
     692          34 : }
     693             : 
     694             : /*
     695             :  * The row is broken into pieces directly on their field separators. It assumes that we have
     696             :  * the record in the cache already, so we can do most work quickly.
     697             :  * Furthermore, it assume a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
     698             :  */
     699             : 
     700             : static size_t
     701         115 : mystrlen(const char *s)
     702             : {
     703             :         /* Calculate and return the space that is needed for the function
     704             :          * mycpstr below to do its work. */
     705         115 :         size_t len = 0;
     706         115 :         const char *s0 = s;
     707             : 
     708       30878 :         while (*s) {
     709       30763 :                 if ((*s & 0x80) == 0) {
     710             :                         ;
     711           6 :                 } else if ((*s & 0xC0) == 0x80) {
     712             :                         /* continuation byte */
     713           0 :                         len += 3;
     714           6 :                 } else if ((*s & 0xE0) == 0xC0) {
     715             :                         /* two-byte sequence */
     716           6 :                         if ((s[1] & 0xC0) != 0x80)
     717           0 :                                 len += 3;
     718             :                         else
     719           6 :                                 s += 2;
     720           0 :                 } else if ((*s & 0xF0) == 0xE0) {
     721             :                         /* three-byte sequence */
     722           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
     723           0 :                                 len += 3;
     724             :                         else
     725           0 :                                 s += 3;
     726           0 :                 } else if ((*s & 0xF8) == 0xF0) {
     727             :                         /* four-byte sequence */
     728           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
     729           0 :                                 || (s[3] & 0xC0) != 0x80)
     730           0 :                                 len += 3;
     731             :                         else
     732           0 :                                 s += 4;
     733             :                 } else {
     734             :                         /* not a valid start byte */
     735           0 :                         len += 3;
     736             :                 }
     737       30763 :                 s++;
     738             :         }
     739         115 :         len += s - s0;
     740         115 :         return len;
     741             : }
     742             : 
     743             : static char *
     744         181 : mycpstr(char *t, const char *s)
     745             : {
     746             :         /* Copy the string pointed to by s into the buffer pointed to by
     747             :          * t, and return a pointer to the NULL byte at the end.  During
     748             :          * the copy we translate incorrect UTF-8 sequences to escapes
     749             :          * looking like <XX> where XX is the hexadecimal representation of
     750             :          * the incorrect byte.  The buffer t needs to be large enough to
     751             :          * hold the result, but the correct length can be calculated by
     752             :          * the function mystrlen above.*/
     753       31016 :         while (*s) {
     754       30835 :                 if ((*s & 0x80) == 0) {
     755       30829 :                         *t++ = *s++;
     756           6 :                 } else if ((*s & 0xC0) == 0x80) {
     757           0 :                         t += sprintf(t, "<%02X>", (uint8_t) * s++);
     758           6 :                 } else if ((*s & 0xE0) == 0xC0) {
     759             :                         /* two-byte sequence */
     760           6 :                         if ((s[1] & 0xC0) != 0x80)
     761           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) * s++);
     762             :                         else {
     763           6 :                                 *t++ = *s++;
     764           6 :                                 *t++ = *s++;
     765             :                         }
     766           0 :                 } else if ((*s & 0xF0) == 0xE0) {
     767             :                         /* three-byte sequence */
     768           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
     769           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) * s++);
     770             :                         else {
     771           0 :                                 *t++ = *s++;
     772           0 :                                 *t++ = *s++;
     773           0 :                                 *t++ = *s++;
     774             :                         }
     775           0 :                 } else if ((*s & 0xF8) == 0xF0) {
     776             :                         /* four-byte sequence */
     777           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
     778           0 :                                 || (s[3] & 0xC0) != 0x80)
     779           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) * s++);
     780             :                         else {
     781           0 :                                 *t++ = *s++;
     782           0 :                                 *t++ = *s++;
     783           0 :                                 *t++ = *s++;
     784           0 :                                 *t++ = *s++;
     785             :                         }
     786             :                 } else {
     787             :                         /* not a valid start byte */
     788           0 :                         t += sprintf(t, "<%02X>", (uint8_t) * s++);
     789             :                 }
     790             :         }
     791         181 :         *t = 0;
     792         181 :         return t;
     793             : }
     794             : 
     795             : static str
     796          32 : SQLload_error(READERtask *task, lng idx, BUN attrs)
     797             : {
     798          32 :         str line;
     799          32 :         char *s;
     800          32 :         size_t sz = 0;
     801          32 :         BUN i;
     802             : 
     803         130 :         for (i = 0; i < attrs; i++) {
     804          98 :                 if (task->fields[i][idx])
     805          88 :                         sz += mystrlen(task->fields[i][idx]);
     806          98 :                 sz += task->seplen;
     807             :         }
     808             : 
     809          32 :         s = line = GDKmalloc(sz + task->rseplen + 1);
     810          32 :         if (line == NULL) {
     811           0 :                 tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
     812             :                                          "SQLload_error");
     813           0 :                 return NULL;
     814             :         }
     815         130 :         for (i = 0; i < attrs; i++) {
     816          98 :                 if (task->fields[i][idx])
     817          88 :                         s = mycpstr(s, task->fields[i][idx]);
     818          98 :                 if (i < attrs - 1)
     819          66 :                         s = mycpstr(s, task->csep);
     820             :         }
     821          32 :         strcpy(s, task->rsep);
     822          32 :         return line;
     823             : }
     824             : 
     825             : /*
     826             :  * The parsing of the individual values is straightforward. If the value represents
     827             :  * the null-replacement string then we grab the underlying nil.
     828             :  * If the string starts with the quote identified from SQL, we locate the tail
     829             :  * and interpret the body.
     830             :  *
     831             :  * If inserting fails, we return -1; if the value cannot be parsed, we
     832             :  * return -1 if besteffort is not set, otherwise we return 0, but in
     833             :  * either case an entry is added to the error table.
     834             :  */
     835             : static inline int
     836   279932328 : SQLinsert_val(READERtask *task, int col, int idx)
     837             : {
     838   279932328 :         Column *fmt = task->as->format + col;
     839   279932328 :         const void *adt;
     840   279932328 :         char buf[BUFSIZ];
     841   279932328 :         char *s = task->fields[col][idx];
     842   279932328 :         char *err = NULL;
     843   279932328 :         int ret = 0;
     844             : 
     845             :         /* include testing on the terminating null byte !! */
     846   279932328 :         if (s == NULL) {
     847     6390328 :                 adt = fmt->nildata;
     848     6390328 :                 fmt->c->tnonil = false;
     849             :         } else {
     850   273542000 :                 if (task->escape) {
     851   273341861 :                         size_t slen = strlen(s) + 1;
     852   273341861 :                         char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
     853          18 :                         if (data == NULL
     854   306047914 :                                 || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
     855   273341861 :                                                                  strlen(s), '\0') < 0)
     856             :                                 adt = NULL;
     857             :                         else
     858   306047912 :                                 adt = fmt->frstr(fmt, fmt->adt, data);
     859   311242779 :                         if (data != buf)
     860          18 :                                 GDKfree(data);
     861             :                 } else
     862      200139 :                         adt = fmt->frstr(fmt, fmt->adt, s);
     863             :         }
     864             : 
     865   317833246 :         lng row = BATcount(fmt->c) + 1;
     866   317833246 :         if (adt == NULL) {
     867          27 :                 if (task->rowerror) {
     868          27 :                         err = SQLload_error(task, idx, task->as->nr_attrs);
     869          27 :                         if (s) {
     870          27 :                                 size_t slen = mystrlen(s);
     871          27 :                                 char *scpy = GDKmalloc(slen + 1);
     872          27 :                                 if (scpy == NULL) {
     873           0 :                                         tablet_error(task, idx, row, col,
     874             :                                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
     875           0 :                                         task->besteffort = false;    /* no longer best effort */
     876           0 :                                         GDKfree(err);
     877           0 :                                         return -1;
     878             :                                 }
     879          27 :                                 mycpstr(scpy, s);
     880          27 :                                 s = scpy;
     881             :                         }
     882          27 :                         snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
     883             :                                          s ? " in '" : "", s ? s : "", s ? "'" : "");
     884          27 :                         GDKfree(s);
     885          27 :                         tablet_error(task, idx, row, col, buf, err);
     886          27 :                         GDKfree(err);
     887          27 :                         if (!task->besteffort)
     888             :                                 return -1;
     889             :                 }
     890           6 :                 ret = -!task->besteffort;    /* yep, two unary operators ;-) */
     891             :                 /* replace it with a nil */
     892           6 :                 adt = fmt->nildata;
     893           6 :                 fmt->c->tnonil = false;
     894             :         }
     895   317833225 :         if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
     896             :                 return ret;
     897             : 
     898             :         /* failure */
     899           0 :         if (task->rowerror) {
     900           0 :                 char *msg = GDKerrbuf;
     901           0 :                 err = SQLload_error(task, idx, task->as->nr_attrs);
     902           0 :                 tablet_error(task, idx, row, col, msg
     903           0 :                                          && *msg ? msg : "insert failed", err);
     904           0 :                 GDKfree(err);
     905             :         }
     906           0 :         task->besteffort = false;    /* no longer best effort */
     907           0 :         return -1;
     908             : }
     909             : 
     910             : static int
     911      324547 : SQLworker_column(READERtask *task, int col)
     912             : {
     913      324547 :         int i;
     914      324547 :         Column *fmt = task->as->format;
     915             : 
     916      324547 :         if (fmt[col].c == NULL)
     917             :                 return 0;
     918             : 
     919             :         /* watch out for concurrent threads */
     920      324541 :         MT_lock_set(&mal_copyLock);
     921      327576 :         if (!fmt[col].skip
     922      327576 :                 && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
     923         286 :                 if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
     924           0 :                         tablet_error(task, lng_nil, lng_nil, col,
     925             :                                                  "Failed to extend the BAT\n", "SQLworker_column");
     926           0 :                         MT_lock_unset(&mal_copyLock);
     927           0 :                         return -1;
     928             :                 }
     929             :         }
     930      327576 :         MT_lock_unset(&mal_copyLock);
     931             : 
     932   283334021 :         for (i = 0; i < task->top[task->cur]; i++) {
     933   282680194 :                 if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
     934          21 :                         BATsetcount(fmt[col].c, BATcount(fmt[col].c));
     935          21 :                         return -1;
     936             :                 }
     937             :         }
     938      326251 :         BATsetcount(fmt[col].c, BATcount(fmt[col].c));
     939             : 
     940      326251 :         return 0;
     941             : }
     942             : 
     943             : /*
     944             :  * The rows are broken on the column separator. Any error is shown and reflected with
     945             :  * setting the reference of the offending row fields to NULL.
     946             :  * This allows the loading to continue, skipping the minimal number of rows.
     947             :  * The details about the locations can be inspected from the error table.
     948             :  * We also trim the quotes around strings.
     949             :  */
     950             : static int
     951   136115137 : SQLload_parse_row(READERtask *task, int idx)
     952             : {
     953   136115137 :         BUN i;
     954   136115137 :         char errmsg[BUFSIZ];
     955   136115137 :         char ch = *task->csep;
     956   136115137 :         char *row = task->rows[task->cur][idx];
     957   136115137 :         lng startlineno = task->startlineno[task->cur][idx];
     958   136115137 :         Tablet *as = task->as;
     959   136115137 :         Column *fmt = as->format;
     960   136115137 :         bool error = false;
     961   136115137 :         str errline = NULL;
     962             : 
     963   136115137 :         assert(idx < task->top[task->cur]);
     964   136115137 :         assert(row);
     965   136115137 :         errmsg[0] = 0;
     966             : 
     967   136115137 :         if (task->quote || task->seplen != 1) {
     968    11354452 :                 for (i = 0; i < as->nr_attrs; i++) {
     969     8547153 :                         bool quote = false;
     970     8547153 :                         task->fields[i][idx] = row;
     971             :                         /* recognize fields starting with a quote, keep them */
     972     8547153 :                         if (*row && *row == task->quote) {
     973     4372389 :                                 quote = true;
     974     4372389 :                                 task->fields[i][idx] = row + 1;
     975     4372389 :                                 row = tablet_skip_string(row + 1, task->quote, task->escape);
     976             : 
     977     4635509 :                                 if (!row) {
     978           0 :                                         errline = SQLload_error(task, idx, i + 1);
     979           0 :                                         snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
     980           0 :                                         tablet_error(task, idx, startlineno, (int) i, errmsg,
     981             :                                                                  errline);
     982           0 :                                         GDKfree(errline);
     983           0 :                                         error = true;
     984           0 :                                         goto errors1;
     985             :                                 } else
     986     4635509 :                                         *row++ = 0;
     987             :                         }
     988             : 
     989             :                         /* eat away the column separator */
     990    50212042 :                         for (; *row; row++)
     991    49420447 :                                 if (*row == '\\' && task->escape) {
     992           2 :                                         if (row[1])
     993           2 :                                                 row++;
     994    49420445 :                                 } else if (*row == ch
     995     8018678 :                                                    && (task->seplen == 1
     996           4 :                                                            || strncmp(row, task->csep,
     997             :                                                                                   task->seplen) == 0)) {
     998     8018678 :                                         *row = 0;
     999     8018678 :                                         row += task->seplen;
    1000     8018678 :                                         goto endoffieldcheck;
    1001             :                                 }
    1002             : 
    1003             :                         /* not enough fields */
    1004      791595 :                         if (i < as->nr_attrs - 1) {
    1005           0 :                                 errline = SQLload_error(task, idx, i + 1);
    1006             :                                 /* it's the next value that is missing */
    1007           0 :                                 tablet_error(task, idx, startlineno, (int) i + 1,
    1008             :                                                          "Column value missing", errline);
    1009           0 :                                 GDKfree(errline);
    1010           0 :                                 error = true;
    1011           0 :   errors1:
    1012             :                                 /* we save all errors detected  as NULL values */
    1013           0 :                                 for (; i < as->nr_attrs; i++)
    1014           0 :                                         task->fields[i][idx] = NULL;
    1015           0 :                                 i--;
    1016             :                         }
    1017      791595 :   endoffieldcheck:
    1018     8810273 :                         ;
    1019             :                         /* check for user defined NULL string */
    1020     8810273 :                         if ((!quote || !fmt->null_length) && fmt->nullstr
    1021     7295373 :                                 && task->fields[i][idx]
    1022     7295373 :                                 && GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0)
    1023     1956755 :                                 task->fields[i][idx] = 0;
    1024             :                 }
    1025             :         } else {
    1026             :                 assert(!task->quote);
    1027             :                 assert(task->seplen == 1);
    1028   327712935 :                 for (i = 0; i < as->nr_attrs; i++) {
    1029   195925838 :                         task->fields[i][idx] = row;
    1030             : 
    1031             :                         /* eat away the column separator */
    1032  1861946839 :                         for (; *row; row++)
    1033  1730134885 :                                 if (*row == '\\' && task->escape) {
    1034         382 :                                         if (row[1])
    1035         382 :                                                 row++;
    1036  1730134503 :                                 } else if (*row == ch) {
    1037    64113884 :                                         *row = 0;
    1038    64113884 :                                         row++;
    1039    64113884 :                                         goto endoffield2;
    1040             :                                 }
    1041             : 
    1042             :                         /* not enough fields */
    1043   131811954 :                         if (i < as->nr_attrs - 1) {
    1044           4 :                                 errline = SQLload_error(task, idx, i + 1);
    1045             :                                 /* it's the next value that is missing */
    1046           4 :                                 tablet_error(task, idx, startlineno, (int) i + 1,
    1047             :                                                          "Column value missing", errline);
    1048           4 :                                 GDKfree(errline);
    1049           4 :                                 error = true;
    1050             :                                 /* we save all errors detected */
    1051          16 :                                 for (; i < as->nr_attrs; i++)
    1052           8 :                                         task->fields[i][idx] = NULL;
    1053           4 :                                 i--;
    1054             :                         }
    1055   131811950 :   endoffield2:
    1056   195925838 :                         ;
    1057             :                         /* check for user defined NULL string */
    1058   195925838 :                         if (fmt->nullstr && task->fields[i][idx]
    1059   179887425 :                                 && GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0) {
    1060     3662685 :                                 task->fields[i][idx] = 0;
    1061             :                         }
    1062             :                 }
    1063             :         }
    1064             :         /* check for too many values as well */
    1065   134594396 :         if (row && *row && i == as->nr_attrs) {
    1066           1 :                 errline = SQLload_error(task, idx, task->as->nr_attrs);
    1067           1 :                 snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
    1068           1 :                 tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
    1069           1 :                 GDKfree(errline);
    1070           1 :                 error = true;
    1071             :         }
    1072   134594396 :         return error ? -1 : 0;
    1073             : }
    1074             : 
    1075             : static void
    1076        1477 : SQLworker(void *arg)
    1077             : {
    1078        1477 :         READERtask *task = (READERtask *) arg;
    1079        1477 :         unsigned int i;
    1080        1477 :         int j, piece;
    1081        1477 :         lng t0;
    1082             : 
    1083        1477 :         GDKsetbuf(GDKmalloc(GDKMAXERRLEN));     /* where to leave errors */
    1084        1476 :         GDKclrerr();
    1085        1476 :         task->errbuf = GDKerrbuf;
    1086        1475 :         MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
    1087             : 
    1088        1475 :         MT_sema_down(&task->sema);
    1089      605182 :         while (task->top[task->cur] >= 0) {
    1090             :                 /* stage one, break the rows spread the work over the workers */
    1091      605182 :                 switch (task->state) {
    1092      300549 :                 case BREAKROW:
    1093      300549 :                         t0 = GDKusec();
    1094      301995 :                         piece = (task->top[task->cur] + task->workers) / task->workers;
    1095             : 
    1096      301995 :                         for (j = piece * task->id;
    1097   135141767 :                                  j < task->top[task->cur] && j < piece * (task->id + 1); j++)
    1098   134842087 :                                 if (task->rows[task->cur][j]) {
    1099   134842087 :                                         if (SQLload_parse_row(task, j) < 0) {
    1100           5 :                                                 task->errorcnt++;
    1101             :                                                 // early break unless best effort
    1102           5 :                                                 if (!task->besteffort) {
    1103           2 :                                                         for (j++;
    1104           4 :                                                                  j < task->top[task->cur]
    1105           4 :                                                                  && j < piece * (task->id + 1); j++)
    1106           8 :                                                                 for (i = 0; i < task->as->nr_attrs; i++)
    1107           6 :                                                                         task->fields[i][j] = NULL;
    1108             :                                                         break;
    1109             :                                                 }
    1110             :                                         }
    1111             :                                 }
    1112      299682 :                         task->wtime = GDKusec() - t0;
    1113      302321 :                         break;
    1114      303190 :                 case UPDATEBAT:
    1115      303190 :                         if (!task->besteffort && task->errorcnt)
    1116             :                                 break;
    1117             :                         /* stage two, updating the BATs */
    1118     1284257 :                         for (i = 0; i < task->as->nr_attrs; i++)
    1119      978904 :                                 if (task->cols[i]) {
    1120      323976 :                                         t0 = GDKusec();
    1121      324121 :                                         if (SQLworker_column(task, task->cols[i] - 1) < 0)
    1122             :                                                 break;
    1123      326044 :                                         t0 = GDKusec() - t0;
    1124      325999 :                                         task->time[i] += t0;
    1125      325999 :                                         task->wtime += t0;
    1126             :                                 }
    1127             :                         break;
    1128        1443 :                 case ENDOFCOPY:
    1129        1443 :                         MT_sema_up(&task->reply);
    1130        1464 :                         goto do_return;
    1131             :                 }
    1132      607555 :                 MT_sema_up(&task->reply);
    1133     1216029 :                 MT_sema_down(&task->sema);
    1134             :         }
    1135           0 :         MT_sema_up(&task->reply);
    1136             : 
    1137        1464 :   do_return:
    1138        1464 :         GDKfree(GDKerrbuf);
    1139        1476 :         GDKsetbuf(NULL);
    1140        1465 :         MT_thread_set_qry_ctx(NULL);
    1141        1462 : }
    1142             : 
    1143             : static void
    1144      103379 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
    1145             : {
    1146      103379 :         int i, j, mi;
    1147      103379 :         lng loc[MAXWORKERS];
    1148             : 
    1149             :         /* after a few rounds we stick to the work assignment */
    1150      103379 :         if (task->rounds > 8)
    1151      103247 :                 return;
    1152             :         /* simple round robin the first time */
    1153        2772 :         if (threads == 1 || task->rounds++ == 0) {
    1154       16714 :                 for (i = j = 0; i < nr_attrs; i++, j++)
    1155       14074 :                         ptask[j % threads].cols[i] = task->cols[i];
    1156             :                 return;
    1157             :         }
    1158         132 :         memset(loc, 0, sizeof(loc));
    1159             :         /* use of load directives */
    1160        1930 :         for (i = 0; i < nr_attrs; i++)
    1161       13700 :                 for (j = 0; j < threads; j++)
    1162       11902 :                         ptask[j].cols[i] = 0;
    1163             : 
    1164             :         /* now allocate the work to the threads */
    1165        1930 :         for (i = 0; i < nr_attrs; i++, j++) {
    1166             :                 mi = 0;
    1167       11902 :                 for (j = 1; j < threads; j++)
    1168       10104 :                         if (loc[j] < loc[mi])
    1169        2806 :                                 mi = j;
    1170             : 
    1171        1798 :                 ptask[mi].cols[i] = task->cols[i];
    1172        1798 :                 loc[mi] += task->time[i];
    1173             :         }
    1174             :         /* reset the timer */
    1175        1930 :         for (i = 0; i < nr_attrs; i++, j++)
    1176        1798 :                 task->time[i] = 0;
    1177             : }
    1178             : 
    1179             : /*
    1180             :  * Reading is handled by a separate task as a preparation for more parallelism.
    1181             :  * A buffer is filled with proper rows.
    1182             :  * If we are reading from a file then a double buffering scheme ia activated.
    1183             :  * Reading from the console (stdin) remains single buffered only.
    1184             :  * If we end up with unfinished records, then the rowlimit will terminate the process.
    1185             :  */
    1186             : 
    1187             : typedef unsigned char (*dfa_t)[256];
    1188             : 
    1189             : static dfa_t
    1190        1118 : mkdfa(const unsigned char *sep, size_t seplen)
    1191             : {
    1192        1118 :         dfa_t dfa;
    1193        1118 :         size_t i, j, k;
    1194             : 
    1195        1118 :         dfa = GDKzalloc(seplen * sizeof(*dfa));
    1196        1118 :         if (dfa == NULL)
    1197             :                 return NULL;
    1198             :         /* Each character in the separator string advances the state by
    1199             :          * one.  If state reaches seplen, the separator was recognized.
    1200             :          *
    1201             :          * The first loop and the nested loop make sure that if in any
    1202             :          * state we encounter an invalid character, but part of what we've
    1203             :          * matched so far is a prefix of the separator, we go to the
    1204             :          * appropriate state. */
    1205        2258 :         for (i = 0; i < seplen; i++)
    1206        1140 :                 dfa[i][sep[0]] = 1;
    1207        2258 :         for (j = 0; j < seplen; j++) {
    1208        1140 :                 dfa[j][sep[j]] = (unsigned char) (j + 1);
    1209        1162 :                 for (k = 0; k < j; k++) {
    1210          44 :                         for (i = 0; i < j - k; i++)
    1211          22 :                                 if (sep[k + i] != sep[i])
    1212             :                                         break;
    1213          22 :                         if (i == j - k && dfa[j][sep[i]] <= i)
    1214           0 :                                 dfa[j][sep[i]] = (unsigned char) (i + 1);
    1215             :                 }
    1216             :         }
    1217             :         return dfa;
    1218             : }
    1219             : 
    1220             : #ifdef __has_builtin
    1221             : #if __has_builtin(__builtin_expect)
    1222             : /* __builtin_expect returns its first argument; it is expected to be
    1223             :  * equal to the second argument */
    1224             : #define unlikely(expr)  __builtin_expect((expr) != 0, 0)
    1225             : #define likely(expr)    __builtin_expect((expr) != 0, 1)
    1226             : #endif
    1227             : #endif
    1228             : #ifndef unlikely
    1229             : #ifdef _MSC_VER
    1230             : #define unlikely(expr)  (__assume(!(expr)), (expr))
    1231             : #define likely(expr)    (__assume((expr)), (expr))
    1232             : #else
    1233             : #define unlikely(expr)  (expr)
    1234             : #define likely(expr)    (expr)
    1235             : #endif
    1236             : #endif
    1237             : 
    1238             : static void
    1239        1118 : SQLproducer(void *p)
    1240             : {
    1241        1118 :         READERtask *task = (READERtask *) p;
    1242        1118 :         bool consoleinput = false;
    1243        1118 :         int cur = 0;                            // buffer being filled
    1244        1118 :         bool blocked[MAXBUFFERS] = { false };
    1245        1118 :         bool ateof[MAXBUFFERS] = { false };
    1246        1118 :         BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };
    1247        1118 :         char *end = NULL, *e = NULL, *s = NULL, *base;
    1248        1118 :         const char *rsep = task->rsep;
    1249        1118 :         size_t rseplen = strlen(rsep), partial = 0;
    1250        1118 :         char quote = task->quote;
    1251        1118 :         dfa_t rdfa;
    1252        1118 :         lng rowno = 0;
    1253        1118 :         lng lineno = 1;
    1254        1118 :         lng startlineno = 1;
    1255        1118 :         int more = 0;
    1256             : 
    1257        1118 :         MT_sema_down(&task->producer);
    1258        1118 :         if (task->id < 0) {
    1259             :                 return;
    1260             :         }
    1261             : 
    1262        1118 :         MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
    1263        1118 :         rdfa = mkdfa((const unsigned char *) rsep, rseplen);
    1264        1118 :         if (rdfa == NULL) {
    1265           0 :                 tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
    1266             :                                          "");
    1267           0 :                 ateof[cur] = true;
    1268           0 :                 goto reportlackofinput;
    1269             :         }
    1270             : 
    1271             : /*      TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/
    1272             : 
    1273        1118 :         base = end = s = task->input[cur];
    1274        1118 :         *s = 0;
    1275        1118 :         task->cur = cur;
    1276        1118 :         if (task->as->filename == NULL) {
    1277         781 :                 consoleinput = true;
    1278         781 :                 goto parseSTDIN;
    1279             :         }
    1280      205235 :         for (;;) {
    1281      102786 :                 startlineno = lineno;
    1282      102786 :                 ateof[cur] = !tablet_read_more(task);
    1283             : 
    1284             :                 // we may be reading from standard input and may be out of input
    1285             :                 // warn the consumers
    1286      102786 :                 if (task->aborted || ((lineno & 8191) == 0 && bstream_getoob(task->cntxt->fdin))) {
    1287           0 :                         tablet_error(task, rowno, lineno, int_nil,
    1288             :                                                  "problem reported by client", s);
    1289           0 :                         ateof[cur] = true;
    1290           0 :                         goto reportlackofinput;
    1291             :                 }
    1292             : 
    1293      102786 :                 if (ateof[cur] && partial) {
    1294           1 :                         if (unlikely(partial)) {
    1295           1 :                                 tablet_error(task, rowno, lineno, int_nil,
    1296             :                                                          "incomplete record at end of file", s);
    1297           1 :                                 task->b->pos += partial;
    1298             :                         }
    1299           1 :                         goto reportlackofinput;
    1300             :                 }
    1301             : 
    1302      102785 :                 if (task->errbuf && task->errbuf[0]) {
    1303           0 :                         if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
    1304           0 :                                 tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
    1305             :                                                          "SQLload_file");
    1306             : /*                              TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
    1307           0 :                                 ateof[cur] = true;
    1308           0 :                                 break;
    1309             :                         }
    1310             :                 }
    1311             : 
    1312      102785 :   parseSTDIN:
    1313             : 
    1314             :                 /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
    1315      103566 :                 partial = 0;
    1316      103566 :                 task->top[cur] = 0;
    1317      103566 :                 s = task->input[cur];
    1318      103566 :                 base = end;
    1319             :                 /* avoid too long records */
    1320      103566 :                 if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
    1321             :                         /* the input buffer should be extended, but 'base' is not shared
    1322             :                            between the threads, which we can not now update.
    1323             :                            Mimic an ateof instead; */
    1324           0 :                         tablet_error(task, rowno, lineno, int_nil, "record too long", "");
    1325           0 :                         ateof[cur] = true;
    1326             : /*                      TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
    1327           0 :                         goto reportlackofinput;
    1328             :                 }
    1329      103566 :                 memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
    1330      103566 :                 end = end + task->b->len - task->b->pos;
    1331      103566 :                 *end = '\0';                    /* this is safe, as the stream ensures an extra byte */
    1332             :                 /* Note that we rescan from the start of a record (the last
    1333             :                  * partial buffer from the previous iteration), even if in the
    1334             :                  * previous iteration we have already established that there
    1335             :                  * is no record separator in the first, perhaps significant,
    1336             :                  * part of the buffer. This is because if the record separator
    1337             :                  * is longer than one byte, it is too complex (i.e. would
    1338             :                  * require more state) to be sure what the state of the quote
    1339             :                  * status is when we back off a few bytes from where the last
    1340             :                  * scan ended (we need to back off some since we could be in
    1341             :                  * the middle of the record separator).  If this is too
    1342             :                  * costly, we have to rethink the matter. */
    1343      103566 :                 if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
    1344           0 :                         ateof[cur] = true;
    1345           0 :                         goto reportlackofinput;
    1346             :                 }
    1347   145186502 :                 for (e = s; *e && e < end && cnt < task->maxrow;) {
    1348             :                         /* tokenize the record completely
    1349             :                          *
    1350             :                          * The format of the input should comply to the following
    1351             :                          * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
    1352             :                          * where quote is a single user-defined character.
    1353             :                          * Within the quoted fields a character may be escaped
    1354             :                          * with a backslash.  The correct number of fields should
    1355             :                          * be supplied.  In the first phase we simply break the
    1356             :                          * rows at the record boundary. */
    1357             :                         int nutf = 0;
    1358             :                         int m = 0;
    1359             :                         bool bs = false;
    1360             :                         char q = 0;
    1361             :                         size_t i = 0;
    1362  3133543873 :                         while (*e) {
    1363  3133542844 :                                 if (task->skip > 0) {
    1364             :                                         /* no interpretation of data we're skipping, just
    1365             :                                          * look for newline */
    1366        1827 :                                         if (*e == '\n') {
    1367          27 :                                                 lineno++;
    1368          27 :                                                 break;
    1369             :                                         }
    1370             :                                 } else {
    1371             :                                         /* check for correctly encoded UTF-8 */
    1372  3133541017 :                                         if (nutf > 0) {
    1373        1841 :                                                 if (unlikely((*e & 0xC0) != 0x80))
    1374           1 :                                                         goto badutf8;
    1375        3680 :                                                 if (unlikely(m != 0 && (*e & m) == 0))
    1376           0 :                                                         goto badutf8;
    1377        1840 :                                                 m = 0;
    1378        1840 :                                                 nutf--;
    1379  3133539176 :                                         } else if ((*e & 0x80) != 0) {
    1380        1709 :                                                 if ((*e & 0xE0) == 0xC0) {
    1381        1582 :                                                         nutf = 1;
    1382        1582 :                                                         if (unlikely((e[0] & 0x1E) == 0))
    1383           0 :                                                                 goto badutf8;
    1384         127 :                                                 } else if ((*e & 0xF0) == 0xE0) {
    1385         122 :                                                         nutf = 2;
    1386         122 :                                                         if ((e[0] & 0x0F) == 0)
    1387           3 :                                                                 m = 0x20;
    1388           5 :                                                 } else if (likely((*e & 0xF8) == 0xF0)) {
    1389           5 :                                                         nutf = 3;
    1390           5 :                                                         if ((e[0] & 0x07) == 0)
    1391           5 :                                                                 m = 0x30;
    1392             :                                                 } else {
    1393           0 :                                                         goto badutf8;
    1394             :                                                 }
    1395  3133537467 :                                         } else if (*e == '\n')
    1396   145082766 :                                                 lineno++;
    1397             :                                         /* check for quoting and the row separator */
    1398  3133541016 :                                         if (bs) {
    1399             :                                                 bs = false;
    1400  3132997800 :                                         } else if (task->escape && *e == '\\') {
    1401             :                                                 bs = true;
    1402             :                                                 i = 0;
    1403  3132454584 :                                         } else if (*e == q) {
    1404             :                                                 q = 0;
    1405  3126895913 :                                         } else if (*e == quote) {
    1406             :                                                 q = quote;
    1407             :                                                 i = 0;
    1408  3121337160 :                                         } else if (q == 0) {
    1409  2940394300 :                                                 i = rdfa[i][(unsigned char) *e];
    1410  2940394300 :                                                 if (i == rseplen)
    1411             :                                                         break;
    1412             :                                         }
    1413             :                                 }
    1414  2988459907 :                                 e++;
    1415             :                         }
    1416   145083965 :                         if (*e == 0) {
    1417        1029 :                                 partial = e - s;
    1418             :                                 /* found an incomplete record, saved for next round */
    1419        1029 :                                 if (unlikely(s + partial < end)) {
    1420             :                                         /* found a EOS in the input */
    1421           0 :                                         tablet_error(task, rowno, startlineno, int_nil,
    1422             :                                                                  "record too long (EOS found)", "");
    1423           0 :                                         ateof[cur] = true;
    1424           0 :                                         goto reportlackofinput;
    1425             :                                 }
    1426             :                                 break;
    1427             :                         } else {
    1428   145082936 :                                 rowno++;
    1429   145082936 :                                 if (task->skip > 0) {
    1430          27 :                                         task->skip--;
    1431             :                                 } else {
    1432   145082909 :                                         if (cnt < task->maxrow) {
    1433   145082909 :                                                 task->startlineno[cur][task->top[cur]] = startlineno;
    1434   145082909 :                                                 task->rows[cur][task->top[cur]++] = s;
    1435   145082909 :                                                 startlineno = lineno;
    1436   145082909 :                                                 cnt++;
    1437             :                                         }
    1438   145082909 :                                         *(e + 1 - rseplen) = 0;
    1439             :                                 }
    1440   145082936 :                                 s = ++e;
    1441   145082936 :                                 task->b->pos += (size_t) (e - base);
    1442   145082936 :                                 base = e;
    1443   145082936 :                                 if (task->top[cur] == task->limit)
    1444             :                                         break;
    1445             :                         }
    1446             :                 }
    1447             : 
    1448      102536 :   reportlackofinput:
    1449             : /*        TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
    1450             : 
    1451      103567 :                 if (consoleinput) {
    1452      101956 :                         task->cur = cur;
    1453      101956 :                         task->ateof = ateof[cur];
    1454      101956 :                         task->cnt = bufcnt[cur];
    1455             :                         /* tell consumer to go ahead */
    1456      101956 :                         MT_sema_up(&task->consumer);
    1457             :                         /* then wait until it is done */
    1458      101956 :                         MT_sema_down(&task->producer);
    1459      101956 :                         if (cnt == task->maxrow) {
    1460         778 :                                 GDKfree(rdfa);
    1461         778 :                                 MT_thread_set_qry_ctx(NULL);
    1462         778 :                                 return;
    1463             :                         }
    1464             :                 } else {
    1465        1611 :                         assert(!blocked[cur]);
    1466        1611 :                         if (blocked[(cur + 1) % MAXBUFFERS]) {
    1467             :                                 /* first wait until other buffer is done */
    1468             : /*                              TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/
    1469             : 
    1470        1274 :                                 MT_sema_down(&task->producer);
    1471        1274 :                                 blocked[(cur + 1) % MAXBUFFERS] = false;
    1472        1274 :                                 if (task->state == ENDOFCOPY) {
    1473           0 :                                         GDKfree(rdfa);
    1474           0 :                                         MT_thread_set_qry_ctx(NULL);
    1475           0 :                                         return;
    1476             :                                 }
    1477             :                         }
    1478             :                         /* other buffer is done, proceed with current buffer */
    1479        1611 :                         assert(!blocked[(cur + 1) % MAXBUFFERS]);
    1480        1611 :                         blocked[cur] = true;
    1481        1611 :                         task->cur = cur;
    1482        1611 :                         task->ateof = ateof[cur];
    1483        1611 :                         task->cnt = bufcnt[cur];
    1484        1611 :                         more = !ateof[cur] || (e && e < end
    1485           0 :                                                                    && task->top[cur] == task->limit);
    1486             : /*                      TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
    1487             : 
    1488        1611 :                         MT_sema_up(&task->consumer);
    1489             : 
    1490        1611 :                         cur = (cur + 1) % MAXBUFFERS;
    1491             : /*                      TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/
    1492             : 
    1493        1611 :                         if (cnt == task->maxrow) {
    1494         182 :                                 MT_sema_down(&task->producer);
    1495             : /*                              TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
    1496         182 :                                 GDKfree(rdfa);
    1497         182 :                                 MT_thread_set_qry_ctx(NULL);
    1498         182 :                                 return;
    1499             :                         }
    1500             :                 }
    1501             : /*              TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/
    1502             : 
    1503             :                 /* we ran out of input? */
    1504      102607 :                 if (task->ateof && !more) {
    1505             : /*                      TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
    1506         158 :                         GDKfree(rdfa);
    1507         158 :                         MT_thread_set_qry_ctx(NULL);
    1508         158 :                         return;
    1509             :                 }
    1510             :                 /* consumers ask us to stop? */
    1511      102449 :                 if (task->state == ENDOFCOPY) {
    1512           0 :                         GDKfree(rdfa);
    1513           0 :                         MT_thread_set_qry_ctx(NULL);
    1514           0 :                         return;
    1515             :                 }
    1516      102449 :                 bufcnt[cur] = cnt;
    1517             :                 /* move the non-parsed correct row data to the head of the next buffer */
    1518      102449 :                 end = s = task->input[cur];
    1519             :         }
    1520           0 :         if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
    1521           0 :                 char msg[256];
    1522           0 :                 snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
    1523           0 :                 task->as->error = GDKstrdup(msg);
    1524           0 :                 tablet_error(task, rowno, startlineno, int_nil,
    1525             :                                          "incomplete record at end of file", s);
    1526           0 :                 task->b->pos += partial;
    1527             :         }
    1528           0 :         GDKfree(rdfa);
    1529           0 :         MT_thread_set_qry_ctx(NULL);
    1530             : 
    1531           0 :         return;
    1532             : 
    1533           1 :   badutf8:
    1534           1 :         tablet_error(task, rowno, startlineno, int_nil,
    1535             :                                  "input not properly encoded UTF-8", "");
    1536           1 :         ateof[cur] = true;
    1537           1 :         goto reportlackofinput;
    1538             : }
    1539             : 
    1540             : static void
    1541        1146 : create_rejects_table(Client cntxt)
    1542             : {
    1543        1146 :         MT_lock_set(&mal_contextLock);
    1544        1146 :         if (cntxt->error_row == NULL) {
    1545         482 :                 cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
    1546         482 :                 cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
    1547         482 :                 cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
    1548         482 :                 cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
    1549         482 :                 if (cntxt->error_row == NULL || cntxt->error_fld == NULL
    1550         482 :                         || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
    1551           0 :                         BBPreclaim(cntxt->error_row);
    1552           0 :                         BBPreclaim(cntxt->error_fld);
    1553           0 :                         BBPreclaim(cntxt->error_msg);
    1554           0 :                         BBPreclaim(cntxt->error_input);
    1555           0 :                         cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
    1556             :                 }
    1557             :         }
    1558        1146 :         MT_lock_unset(&mal_contextLock);
    1559        1146 : }
    1560             : 
    1561             : BUN
    1562        1118 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
    1563             :                          const char *csep, const char *rsep, char quote, lng skip,
    1564             :                          lng maxrow, int best, bool from_stdin, const char *tabnam,
    1565             :                          bool escape)
    1566             : {
    1567        1118 :         BUN cnt = 0, cntstart = 0, leftover = 0;
    1568        1118 :         int res = 0;                            /* < 0: error, > 0: success, == 0: continue processing */
    1569        1118 :         int j;
    1570        1118 :         BUN firstcol;
    1571        1118 :         BUN i, attr;
    1572        1118 :         READERtask task;
    1573        1118 :         READERtask ptask[MAXWORKERS];
    1574        1118 :         int threads = 1;
    1575        1118 :         lng tio, t1 = 0;
    1576        1118 :         char name[MT_NAME_LEN];
    1577             : 
    1578        1118 :         if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
    1579         112 :                 threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
    1580         112 :                 if (threads > 1)
    1581         112 :                         threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
    1582             :                 else
    1583             :                         threads = 1;
    1584             :         }
    1585             : 
    1586             : /*      TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
    1587             : 
    1588        1118 :         memset(ptask, 0, sizeof(ptask));
    1589        2236 :         task = (READERtask) {
    1590             :                 .cntxt = cntxt,
    1591             :                 .from_stdin = from_stdin,
    1592             :                 .as = as,
    1593             :                 .escape = escape,       /* TODO: implement feature!!! */
    1594        1118 :                 .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
    1595             :         };
    1596             : 
    1597             :         /* create the reject tables */
    1598        1118 :         create_rejects_table(task.cntxt);
    1599        1118 :         if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
    1600        1118 :                 || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
    1601           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1602             :                                          "SQLload initialization failed", "");
    1603             :                 /* nothing allocated yet, so nothing to free */
    1604           0 :                 return BUN_NONE;
    1605             :         }
    1606             : 
    1607        1118 :         assert(rsep);
    1608        1118 :         assert(csep);
    1609        1118 :         assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
    1610        1118 :         task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
    1611        1118 :         task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
    1612        1118 :         task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
    1613        1118 :         if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
    1614           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1615             :                                          "memory allocation failed", "SQLload_file");
    1616           0 :                 goto bailout;
    1617             :         }
    1618        1118 :         task.cur = 0;
    1619        3354 :         for (i = 0; i < MAXBUFFERS; i++) {
    1620        2236 :                 task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
    1621        2236 :                 task.rowlimit[i] = MAXROWSIZE(2 * b->size);
    1622        2236 :                 if (task.base[i] == NULL) {
    1623           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1624             :                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1625           0 :                         goto bailout;
    1626             :                 }
    1627        2236 :                 task.base[i][0] = task.base[i][b->size + 1] = 0;
    1628        2236 :                 task.input[i] = task.base[i] + 1;       /* wrap the buffer with null bytes */
    1629             :         }
    1630        1118 :         task.besteffort = best;
    1631             : 
    1632        1118 :         if (maxrow < 0)
    1633          78 :                 task.maxrow = BUN_MAX;
    1634             :         else
    1635        1040 :                 task.maxrow = (BUN) maxrow;
    1636             : 
    1637        1118 :         task.skip = skip;
    1638        1118 :         task.quote = quote;
    1639        1118 :         task.csep = csep;
    1640        1118 :         task.seplen = strlen(csep);
    1641        1118 :         task.rsep = rsep;
    1642        1118 :         task.rseplen = strlen(rsep);
    1643        1118 :         task.errbuf = cntxt->errbuf;
    1644             : 
    1645        1118 :         MT_sema_init(&task.producer, 0, "task.producer");
    1646        1118 :         MT_sema_init(&task.consumer, 0, "task.consumer");
    1647        1118 :         task.ateof = false;
    1648        1118 :         task.b = b;
    1649        1118 :         task.out = out;
    1650             : 
    1651        1118 :         as->error = NULL;
    1652             : 
    1653             :         /* there is no point in creating more threads than we have columns */
    1654        1118 :         if (as->nr_attrs < (BUN) threads)
    1655          70 :                 threads = (int) as->nr_attrs;
    1656             : 
    1657             :         /* allocate enough space for pointers into the buffer pool.  */
    1658             :         /* the record separator is considered a column */
    1659        1118 :         task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
    1660       11452 :         for (i = 0; i < as->nr_attrs; i++) {
    1661       10334 :                 task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
    1662       10334 :                 if (task.fields[i] == NULL) {
    1663           0 :                         if (task.as->error == NULL)
    1664           0 :                                 as->error = createException(MAL, "sql.copy_from",
    1665             :                                                                                         SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1666           0 :                         goto bailout;
    1667             :                 }
    1668       10334 :                 task.cols[i] = (int) (i + 1);   /* to distinguish non initialized later with zero */
    1669             :         }
    1670        3354 :         for (i = 0; i < MAXBUFFERS; i++) {
    1671        2236 :                 task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
    1672        2236 :                 task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
    1673        2236 :                 if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
    1674           0 :                         GDKfree(task.rows[i]);
    1675           0 :                         GDKfree(task.startlineno[i]);
    1676           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1677             :                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL,
    1678             :                                                  "SQLload_file:failed to alloc buffers");
    1679           0 :                         goto bailout;
    1680             :                 }
    1681             :         }
    1682        1118 :         task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
    1683        1118 :         if (task.rowerror == NULL) {
    1684           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1685             :                                          SQLSTATE(HY013) MAL_MALLOC_FAIL,
    1686             :                                          "SQLload_file:failed to alloc rowerror buffer");
    1687           0 :                 goto bailout;
    1688             :         }
    1689             : 
    1690        1118 :         task.id = 0;
    1691        1118 :         snprintf(name, sizeof(name), "prod-%s", tabnam);
    1692        1118 :         if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
    1693           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1694             :                                          SQLSTATE(42000) "failed to start producer thread",
    1695             :                                          "SQLload_file");
    1696           0 :                 goto bailout;
    1697             :         }
    1698             : /*      TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
    1699             : 
    1700        1118 :         task.workers = threads;
    1701        2595 :         for (j = 0; j < threads; j++) {
    1702        1477 :                 ptask[j] = task;
    1703        1477 :                 ptask[j].id = j;
    1704        1477 :                 ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
    1705        1477 :                 if (ptask[j].cols == NULL) {
    1706           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1707             :                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1708           0 :                         task.id = -1;
    1709           0 :                         MT_sema_up(&task.producer);
    1710           0 :                         goto bailout;
    1711             :                 }
    1712        1477 :                 snprintf(name, sizeof(name), "ptask%d.sema", j);
    1713        1477 :                 MT_sema_init(&ptask[j].sema, 0, name);
    1714        1477 :                 snprintf(name, sizeof(name), "ptask%d.repl", j);
    1715        1477 :                 MT_sema_init(&ptask[j].reply, 0, name);
    1716        1477 :                 snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
    1717        1477 :                 if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
    1718           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1719             :                                                  SQLSTATE(42000) "failed to start worker thread",
    1720             :                                                  "SQLload_file");
    1721           0 :                         threads = j;
    1722           0 :                         for (j = 0; j < threads; j++)
    1723           0 :                                 ptask[j].workers = threads;
    1724             :                 }
    1725             :         }
    1726        1118 :         if (threads == 0) {
    1727             :                 /* no threads started */
    1728           0 :                 task.id = -1;
    1729           0 :                 MT_sema_up(&task.producer);
    1730           0 :                 goto bailout;
    1731             :         }
    1732        1118 :         MT_sema_up(&task.producer);
    1733             : 
    1734        1118 :         tio = GDKusec();
    1735        1118 :         tio = GDKusec() - tio;
    1736        1118 :         t1 = GDKusec();
    1737        2239 :         for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
    1738        1121 :                 if (task.as->format[firstcol].c != NULL)
    1739             :                         break;
    1740      104504 :         while (res == 0 && cnt < task.maxrow) {
    1741             : 
    1742             :                 // track how many elements are in the aggregated BATs
    1743      103567 :                 cntstart = BATcount(task.as->format[firstcol].c);
    1744             :                 /* block until the producer has data available */
    1745      103567 :                 MT_sema_down(&task.consumer);
    1746      103567 :                 cnt += task.top[task.cur];
    1747      103567 :                 if (task.ateof && !task.top[task.cur])
    1748             :                         break;
    1749      103409 :                 t1 = GDKusec() - t1;
    1750             : /*              TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
    1751             : 
    1752      103409 :                 t1 = GDKusec();
    1753      103409 :                 if (task.top[task.cur]) {
    1754             :                         /* activate the workers to break rows */
    1755      410589 :                         for (j = 0; j < threads; j++) {
    1756             :                                 /* stage one, break the rows in parallel */
    1757      307210 :                                 ptask[j].error = 0;
    1758      307210 :                                 ptask[j].state = BREAKROW;
    1759      307210 :                                 ptask[j].next = task.top[task.cur];
    1760      307210 :                                 ptask[j].fields = task.fields;
    1761      307210 :                                 ptask[j].limit = task.limit;
    1762      307210 :                                 ptask[j].cnt = task.cnt;
    1763      307210 :                                 ptask[j].cur = task.cur;
    1764      307210 :                                 ptask[j].top[task.cur] = task.top[task.cur];
    1765      307210 :                                 MT_sema_up(&ptask[j].sema);
    1766             :                         }
    1767             :                 }
    1768      103409 :                 if (task.top[task.cur]) {
    1769             :                         /* await completion of row break phase */
    1770      410589 :                         for (j = 0; j < threads; j++) {
    1771      307210 :                                 MT_sema_down(&ptask[j].reply);
    1772      307210 :                                 if (ptask[j].error) {
    1773           0 :                                         res = -1;
    1774             : /*                                      TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
    1775             :                                 }
    1776             :                         }
    1777             :                 }
    1778             : 
    1779             : /*              TRC_DEBUG(MAL_SERVER,
    1780             :                         "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
    1781             :                         task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
    1782             : 
    1783      103409 :                 if (task.top[task.cur]) {
    1784      103379 :                         if (res == 0) {
    1785      103379 :                                 SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
    1786             : 
    1787             :                                 /* activate the workers to update the BATs */
    1788      513968 :                                 for (j = 0; j < threads; j++) {
    1789             :                                         /* stage two, update the BATs */
    1790      307210 :                                         ptask[j].state = UPDATEBAT;
    1791      307210 :                                         MT_sema_up(&ptask[j].sema);
    1792             :                                 }
    1793             :                         }
    1794             :                 }
    1795      103409 :                 tio = GDKusec();
    1796      103409 :                 tio = t1 - tio;
    1797             : 
    1798             :                 /* await completion of the BAT updates */
    1799      103409 :                 if (res == 0 && task.top[task.cur]) {
    1800      410589 :                         for (j = 0; j < threads; j++) {
    1801      307210 :                                 MT_sema_down(&ptask[j].reply);
    1802      307210 :                                 if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
    1803      307210 :                                         res = -1;
    1804      307210 :                                         best = 0;
    1805             :                                 }
    1806             :                         }
    1807             :                 }
    1808             : 
    1809             :                 /* trim the BATs discarding error tuples */
    1810             : #define trimerrors(TYPE)                                                                                                \
    1811             :                 do {                                                                                                                    \
    1812             :                         TYPE *src, *dst;                                                                                        \
    1813             :                         leftover= BATcount(task.as->format[attr].c);                         \
    1814             :                         limit = leftover - cntstart;                                                            \
    1815             :                         dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
    1816             :                         for(j = 0; j < (int) limit; j++, src++){                                     \
    1817             :                                 if ( task.rowerror[j]){                                                                 \
    1818             :                                         leftover--;                                                                                     \
    1819             :                                         continue;                                                                                       \
    1820             :                                 }                                                                                                               \
    1821             :                                 *dst++ = *src;                                                                                  \
    1822             :                         }                                                                                                                       \
    1823             :                         BATsetcount(task.as->format[attr].c, leftover );                     \
    1824             :                 } while (0)
    1825             : 
    1826             : /*              TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
    1827             :                                          best, BATcount(as->format[firstcol].c), task.cnt); */
    1828             : 
    1829      103409 :                 if (best && BATcount(as->format[firstcol].c)) {
    1830             :                         BUN limit;
    1831             :                         int width;
    1832             : 
    1833          45 :                         for (attr = 0; attr < as->nr_attrs; attr++) {
    1834          31 :                                 if (as->format[attr].skip)
    1835           5 :                                         continue;
    1836          26 :                                 width = as->format[attr].c->twidth;
    1837          26 :                                 as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
    1838          26 :                                 switch (width) {
    1839           5 :                                 case 1:
    1840          16 :                                         trimerrors(bte);
    1841           5 :                                         break;
    1842           0 :                                 case 2:
    1843           0 :                                         trimerrors(sht);
    1844           0 :                                         break;
    1845          21 :                                 case 4:
    1846          55 :                                         trimerrors(int);
    1847          21 :                                         break;
    1848           0 :                                 case 8:
    1849           0 :                                         trimerrors(lng);
    1850           0 :                                         break;
    1851             : #ifdef HAVE_HGE
    1852           0 :                                 case 16:
    1853           0 :                                         trimerrors(hge);
    1854           0 :                                         break;
    1855             : #endif
    1856           0 :                                 default:
    1857             :                                 {
    1858           0 :                                         char *src, *dst;
    1859           0 :                                         leftover = BATcount(task.as->format[attr].c);
    1860           0 :                                         limit = leftover - cntstart;
    1861           0 :                                         dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
    1862           0 :                                         for (j = 0; j < (int) limit; j++, src += width) {
    1863           0 :                                                 if (task.rowerror[j]) {
    1864           0 :                                                         leftover--;
    1865           0 :                                                         continue;
    1866             :                                                 }
    1867           0 :                                                 if (dst != src)
    1868           0 :                                                         memcpy(dst, src, width);
    1869           0 :                                                 dst += width;
    1870             :                                         }
    1871           0 :                                         BATsetcount(task.as->format[attr].c, leftover);
    1872             :                                 }
    1873           0 :                                         break;
    1874             :                                 }
    1875             :                         }
    1876             :                         // re-initialize the error vector;
    1877          14 :                         memset(task.rowerror, 0, task.limit);
    1878          14 :                         task.errorcnt = 0;
    1879             :                 }
    1880             : 
    1881      103409 :                 if (res < 0) {
    1882             :                         /* producer should stop */
    1883          23 :                         task.maxrow = cnt;
    1884          23 :                         task.state = ENDOFCOPY;
    1885          23 :                         task.ateof = true;
    1886             :                 }
    1887      103409 :                 if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
    1888             :                         break;
    1889      103409 :                 task.top[task.cur] = 0;
    1890      103409 :                 if (cnt == task.maxrow)
    1891         960 :                         task.ateof = true;
    1892      104527 :                 MT_sema_up(&task.producer);
    1893             :         }
    1894             : 
    1895             : /*      TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
    1896             : 
    1897        1118 :         cnt = BATcount(task.as->format[firstcol].c);
    1898             : 
    1899        1118 :         task.state = ENDOFCOPY;
    1900             : /*      TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
    1901             : 
    1902        1118 :         if (!task.ateof || cnt < task.maxrow) {
    1903             : /*              TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
    1904         175 :                 MT_sema_up(&task.producer);
    1905             :         }
    1906        1118 :         MT_join_thread(task.tid);
    1907             : 
    1908             : /*      TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
    1909             : 
    1910        3713 :         for (j = 0; j < threads; j++) {
    1911        1477 :                 ptask[j].state = ENDOFCOPY;
    1912        1477 :                 MT_sema_up(&ptask[j].sema);
    1913             :         }
    1914             :         /* wait for their death */
    1915        2595 :         for (j = 0; j < threads; j++)
    1916        1477 :                 MT_sema_down(&ptask[j].reply);
    1917             : 
    1918             : /*      TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
    1919             : 
    1920        2595 :         for (j = 0; j < threads; j++) {
    1921        1477 :                 MT_join_thread(ptask[j].tid);
    1922        1477 :                 GDKfree(ptask[j].cols);
    1923        1477 :                 MT_sema_destroy(&ptask[j].sema);
    1924        1477 :                 MT_sema_destroy(&ptask[j].reply);
    1925             :         }
    1926             : 
    1927             : /*      TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
    1928             : /*      TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
    1929             : 
    1930       11452 :         for (i = 0; i < as->nr_attrs; i++) {
    1931       10334 :                 BAT *b = task.as->format[i].c;
    1932       10334 :                 if (b)
    1933       10328 :                         BATsettrivprop(b);
    1934       10334 :                 GDKfree(task.fields[i]);
    1935             :         }
    1936        1118 :         GDKfree(task.fields);
    1937        1118 :         GDKfree(task.cols);
    1938        1118 :         GDKfree(task.time);
    1939        4472 :         for (i = 0; i < MAXBUFFERS; i++) {
    1940        2236 :                 GDKfree(task.base[i]);
    1941        2236 :                 GDKfree(task.rows[i]);
    1942        2236 :                 GDKfree(task.startlineno[i]);
    1943             :         }
    1944        1118 :         if (task.rowerror)
    1945        1118 :                 GDKfree(task.rowerror);
    1946        1118 :         MT_sema_destroy(&task.producer);
    1947        1118 :         MT_sema_destroy(&task.consumer);
    1948             : 
    1949        1118 :         return res < 0 ? BUN_NONE : cnt;
    1950             : 
    1951           0 :   bailout:
    1952           0 :         if (task.fields) {
    1953           0 :                 for (i = 0; i < as->nr_attrs; i++)
    1954           0 :                         GDKfree(task.fields[i]);
    1955           0 :                 GDKfree(task.fields);
    1956             :         }
    1957           0 :         GDKfree(task.time);
    1958           0 :         GDKfree(task.cols);
    1959           0 :         GDKfree(task.base[task.cur]);
    1960           0 :         GDKfree(task.rowerror);
    1961           0 :         for (i = 0; i < MAXWORKERS; i++)
    1962           0 :                 GDKfree(ptask[i].cols);
    1963             :         return BUN_NONE;
    1964             : }
    1965             : 
    1966             : /* return the latest reject table */
    1967             : str
    1968          28 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1969             : {
    1970          28 :         bat *row = getArgReference_bat(stk, pci, 0);
    1971          28 :         bat *fld = getArgReference_bat(stk, pci, 1);
    1972          28 :         bat *msg = getArgReference_bat(stk, pci, 2);
    1973          28 :         bat *inp = getArgReference_bat(stk, pci, 3);
    1974             : 
    1975          28 :         create_rejects_table(cntxt);
    1976          28 :         if (cntxt->error_row == NULL)
    1977           0 :                 throw(MAL, "sql.rejects", "No reject table available");
    1978          28 :         MT_lock_set(&errorlock);
    1979          28 :         BAT *bn1 = COLcopy(cntxt->error_row, cntxt->error_row->ttype, true, TRANSIENT);
    1980          28 :         BAT *bn2 = COLcopy(cntxt->error_fld, cntxt->error_fld->ttype, true, TRANSIENT);
    1981          28 :         BAT *bn3 = COLcopy(cntxt->error_msg, cntxt->error_msg->ttype, true, TRANSIENT);
    1982          28 :         BAT *bn4 = COLcopy(cntxt->error_input, cntxt->error_input->ttype, true, TRANSIENT);
    1983          28 :         MT_lock_unset(&errorlock);
    1984          28 :         if (bn1 == NULL || bn2 == NULL || bn3 == NULL || bn4 == NULL) {
    1985           0 :                 BBPreclaim(bn1);
    1986           0 :                 BBPreclaim(bn2);
    1987           0 :                 BBPreclaim(bn3);
    1988           0 :                 BBPreclaim(bn4);
    1989           0 :                 throw(MAL, "sql.rejects", GDK_EXCEPTION);
    1990             :         }
    1991          28 :         *row = bn1->batCacheid;
    1992          28 :         *fld = bn2->batCacheid;
    1993          28 :         *msg = bn3->batCacheid;
    1994          28 :         *inp = bn4->batCacheid;
    1995          28 :         BBPkeepref(bn1);
    1996          28 :         BBPkeepref(bn2);
    1997          28 :         BBPkeepref(bn3);
    1998          28 :         BBPkeepref(bn4);
    1999          28 :         (void) mb;
    2000          28 :         return MAL_SUCCEED;
    2001             : }
    2002             : 
    2003             : str
    2004          13 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    2005             : {
    2006          13 :         if (cntxt->error_row) {
    2007          13 :                 MT_lock_set(&errorlock);
    2008          13 :                 BATclear(cntxt->error_row, true);
    2009          13 :                 if (cntxt->error_fld)
    2010          13 :                         BATclear(cntxt->error_fld, true);
    2011          13 :                 if (cntxt->error_msg)
    2012          13 :                         BATclear(cntxt->error_msg, true);
    2013          13 :                 if (cntxt->error_input)
    2014          13 :                         BATclear(cntxt->error_input, true);
    2015          13 :                 MT_lock_unset(&errorlock);
    2016             :         }
    2017          13 :         (void) mb;
    2018          13 :         (void) stk;
    2019          13 :         (void) pci;
    2020          13 :         return MAL_SUCCEED;
    2021             : }

Generated by: LCOV version 1.14