LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - tablet.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 939 1246 75.4 %
Date: 2025-03-25 21:27:32 Functions: 31 36 86.1 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024, 2025 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  *  Niels Nes, Martin Kersten
      15             :  *
      16             :  * Parallel bulk load for SQL
      17             :  * The COPY INTO command for SQL is heavily CPU bound, which means
      18             :  * that ideally we would like to exploit the multi-cores to do that
      19             :  * work in parallel.
      20             :  * Complicating factors are the initial record offset, the
      21             :  * possible variable length of the input, and the original sort order
      22             :  * that should preferably be maintained.
      23             :  *
      24             :  * The code below consists of a file reader, which breaks up the
      25             :  * file into chunks of distinct rows. Then multiple parallel threads
      26             :  * grab them, and break them on the field boundaries.
      27             :  * After all fields are identified this way, the columns are converted
      28             :  * and stored in the BATs.
      29             :  *
      30             :  * The threads get a reference to a private copy of the READERtask.
      31             :  * It includes a list of columns they should handle. This is a basis
      32             :  * to distribute cheap and expensive columns over threads.
      33             :  *
      34             :  * The file reader overlaps IO with updates of the BAT.
      35             :  * Also the buffer size of the block stream might be a little small for
      36             :  * this task (1MB). It has been increased to 8MB, which indeed helped.
      37             :  *
      38             :  * The work divider allocates subtasks to threads based on the
      39             :  * observed time spent so far.
      40             :  */
      41             : 
      42             : #include "monetdb_config.h"
      43             : #include "tablet.h"
      44             : #include "mapi_prompt.h"
      45             : #include "mal_internal.h"
      46             : 
      47             : #include <string.h>
      48             : #include <ctype.h>
      49             : 
      50             : #define MAXWORKERS      64
      51             : #define MAXBUFFERS 2
      52             : /* We restrict the row length to be 32MB for the time being, unless X is larger (the macro yields the larger of X and 32MB) */
      53             : #define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024)
      54             : 
      55             : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
      56             : 
      57             : static BAT *
      58       10372 : void_bat_create(int adt, BUN nr)
      59             : {
      60       10372 :         BAT *b = COLnew(0, adt, nr, TRANSIENT);
      61             : 
      62             :         /* check for correct structures */
      63       10372 :         if (b == NULL)
      64             :                 return NULL;
      65       10372 :         if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
      66             :                 return NULL;
      67             :         }
      68             : 
      69             :         /* disable all properties here */
      70       10372 :         b->tsorted = false;
      71       10372 :         b->trevsorted = false;
      72       10372 :         b->tnosorted = 0;
      73       10372 :         b->tnorevsorted = 0;
      74       10372 :         b->tseqbase = oid_nil;
      75       10372 :         b->tkey = false;
      76       10372 :         b->tnokey[0] = 0;
      77       10372 :         b->tnokey[1] = 0;
      78       10372 :         return b;
      79             : }
      80             : 
      81             : void
      82      127857 : TABLETdestroy_format(Tablet *as)
      83             : {
      84      127857 :         BUN p;
      85      127857 :         Column *fmt = as->format;
      86             : 
      87      609748 :         for (p = 0; p < as->nr_attrs; p++) {
      88      481853 :                 BBPreclaim(fmt[p].c);
      89      481849 :                 if (fmt[p].data)
      90       10377 :                         GDKfree(fmt[p].data);
      91             :         }
      92      127895 :         GDKfree(fmt);
      93      127903 : }
      94             : 
      95             : static oid
      96      126740 : check_BATs(Tablet *as)
      97             : {
      98      126740 :         Column *fmt = as->format;
      99      126740 :         BUN i = 0;
     100      126740 :         BUN cnt;
     101      126740 :         oid base;
     102             : 
     103      126740 :         if (fmt[i].c == NULL)
     104      126700 :                 i++;
     105      126740 :         cnt = BATcount(fmt[i].c);
     106      126740 :         base = fmt[i].c->hseqbase;
     107             : 
     108      126740 :         if (as->nr != cnt) {
     109        6697 :                 for (i = 0; i < as->nr_attrs; i++)
     110        5997 :                         if (fmt[i].c)
     111        5297 :                                 fmt[i].p = as->offset;
     112         700 :                 return oid_nil;
     113             :         }
     114             : 
     115      591164 :         for (i = 0; i < as->nr_attrs; i++) {
     116      465124 :                 BAT *b = fmt[i].c;
     117             : 
     118      465124 :                 if (b == NULL)
     119      125991 :                         continue;
     120             : 
     121      339133 :                 if (BATcount(b) != cnt || b->hseqbase != base)
     122           0 :                         return oid_nil;
     123             : 
     124      339133 :                 fmt[i].p = as->offset;
     125             :         }
     126             :         return base;
     127             : }
     128             : 
     129             : str
     130        1128 : TABLETcreate_bats(Tablet *as, BUN est)
     131             : {
     132        1128 :         Column *fmt = as->format;
     133        1128 :         BUN i, nr = 0;
     134             : 
     135       11506 :         for (i = 0; i < as->nr_attrs; i++) {
     136       10378 :                 if (fmt[i].skip)
     137           6 :                         continue;
     138       10372 :                 fmt[i].c = void_bat_create(fmt[i].adt, est);
     139       10372 :                 if (!fmt[i].c) {
     140           0 :                         while (i > 0) {
     141           0 :                                 if (!fmt[--i].skip) {
     142           0 :                                         BBPreclaim(fmt[i].c);
     143           0 :                                         fmt[i].c = NULL;
     144             :                                 }
     145             :                         }
     146           0 :                         throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n",
     147             :                                   est);
     148             :                 }
     149       10372 :                 fmt[i].ci = bat_iterator_nolock(fmt[i].c);
     150       10372 :                 nr++;
     151             :         }
     152        1128 :         if (!nr)
     153           0 :                 throw(SQL, "copy",
     154             :                           "At least one column should be read from the input\n");
     155             :         return MAL_SUCCEED;
     156             : }
     157             : 
     158             : str
     159        1100 : TABLETcollect(BAT **bats, Tablet *as)
     160             : {
     161        1100 :         Column *fmt = as->format;
     162        1100 :         BUN i, j;
     163        1100 :         BUN cnt = 0;
     164             : 
     165        1100 :         if (bats == NULL)
     166           0 :                 throw(SQL, "copy", "Missing container");
     167        2604 :         for (i = 0; i < as->nr_attrs && !cnt; i++)
     168        1504 :                 if (!fmt[i].skip)
     169        1501 :                         cnt = BATcount(fmt[i].c);
     170       11374 :         for (i = 0, j = 0; i < as->nr_attrs; i++) {
     171       10274 :                 if (fmt[i].skip)
     172           6 :                         continue;
     173       10268 :                 bats[j] = fmt[i].c;
     174       10268 :                 BBPfix(bats[j]->batCacheid);
     175       10268 :                 if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
     176           0 :                         throw(SQL, "copy",
     177             :                                   "Failed to set access at tablet part " BUNFMT "\n", cnt);
     178       10268 :                 fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
     179       10268 :                 fmt[i].c->tkey = false;
     180       10268 :                 BATsettrivprop(fmt[i].c);
     181             : 
     182       10268 :                 if (cnt != BATcount(fmt[i].c))
     183           0 :                         throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n",
     184             :                                   BATcount(fmt[i].c), cnt);
     185       10268 :                 j++;
     186             :         }
     187             :         return MAL_SUCCEED;
     188             : }
     189             : 
     190             : // the starting quote character has already been skipped
     191             : 
     192             : static char *
     193     4688961 : tablet_skip_string(char *s, char quote, bool escape)
     194             : {
     195     4688961 :         size_t i = 0, j = 0;
     196   101817311 :         while (s[i]) {
     197   101769994 :                 if (escape && s[i] == '\\' && s[i + 1] != '\0')
     198      502365 :                         s[j++] = s[i++];
     199   101267629 :                 else if (s[i] == quote) {
     200     3282553 :                         if (s[i + 1] != quote)
     201             :                                 break;
     202             :                         i++;                            /* skip the first quote */
     203             :                 }
     204    97128350 :                 s[j++] = s[i++];
     205             :         }
     206     4688961 :         assert(s[i] == quote || s[i] == '\0');
     207     4688961 :         if (s[i] == 0)
     208             :                 return NULL;
     209     4688961 :         s[j] = 0;
     210     4688961 :         return s + i;
     211             : }
     212             : 
     213             : static int
     214           0 : TABLET_error(stream *s)
     215             : {
     216           0 :         const char *err = mnstr_peek_error(s);
     217           0 :         if (err)
     218           0 :                 TRC_ERROR(MAL_SERVER, "Stream error: %s\n", err);
     219           0 :         return -1;
     220             : }
     221             : 
     222             : /* The output line is first built before being sent. It solves a problem
     223             :    with UDP, where you may lose most of the information using short writes
     224             : */
     225             : 
     226             : static inline int
     227           0 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen,
     228             :                         Column *fmt, stream *fd, BUN nr_attrs, oid id)
     229             : {
     230           0 :         BUN i;
     231           0 :         ssize_t fill = 0;
     232             : 
     233           0 :         for (i = 0; i < nr_attrs; i++) {
     234           0 :                 if (fmt[i].c == NULL || fmt[i].virt)
     235           0 :                         continue;
     236           0 :                 if (id < fmt[i].c->hseqbase
     237           0 :                         || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
     238             :                         break;
     239           0 :                 fmt[i].p = id - fmt[i].c->hseqbase;
     240             :         }
     241           0 :         if (i == nr_attrs) {
     242           0 :                 for (i = 0; i < nr_attrs; i++) {
     243           0 :                         Column *f = fmt + i;
     244           0 :                         const char *p;
     245           0 :                         ssize_t l;
     246             : 
     247           0 :                         if (f->c) {
     248           0 :                                 p = BUNtail(f->ci, f->p);
     249             : 
     250           0 :                                 if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     251           0 :                                         p = f->nullstr;
     252           0 :                                         l = (ssize_t) strlen(f->nullstr);
     253             :                                 } else {
     254           0 :                                         l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     255           0 :                                         if (l < 0)
     256             :                                                 return -1;
     257           0 :                                         p = *localbuf;
     258             :                                 }
     259           0 :                                 if (fill + l + f->seplen >= (ssize_t) * len) {
     260             :                                         /* extend the buffer */
     261           0 :                                         char *nbuf;
     262           0 :                                         nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     263           0 :                                         if (nbuf == NULL)
     264             :                                                 return -1;      /* *buf freed by caller */
     265           0 :                                         *buf = nbuf;
     266           0 :                                         *len = fill + l + f->seplen + BUFSIZ;
     267             :                                 }
     268           0 :                                 strncpy(*buf + fill, p, l);
     269           0 :                                 fill += l;
     270             :                         }
     271           0 :                         strncpy(*buf + fill, f->sep, f->seplen);
     272           0 :                         fill += f->seplen;
     273             :                 }
     274             :         }
     275           0 :         if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
     276           0 :                 return TABLET_error(fd);
     277             :         return 0;
     278             : }
     279             : 
     280             : static ssize_t output_multiset_dense(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id);
     281             : static ssize_t output_multiset_sorted(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id);
     282             : 
     283             : static inline ssize_t
     284         329 : output_value(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *f)
     285             : {
     286         329 :         assert (!f->virt && !f->composite && !f->multiset);
     287             : 
     288         329 :         const char *p = BUNtail(f->ci, f->p);
     289         329 :         ssize_t l = 0;
     290             : 
     291         329 :         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     292           0 :                 p = f->nullstr;
     293           0 :                 l = (ssize_t) strlen(p);
     294             :         } else {
     295         329 :                 l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     296         329 :                 if (l < 0)
     297             :                         return -1;
     298         329 :                 p = *localbuf;
     299             :         }
     300         329 :         if (fill + l + f->seplen >= (ssize_t) * len) {
     301             :                 /* extend the buffer */
     302           0 :                 char *nbuf;
     303           0 :                 nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     304           0 :                 if (nbuf == NULL)
     305             :                         return -1;      /* *buf freed by caller */
     306           0 :                 *buf = nbuf;
     307           0 :                 *len = fill + l + f->seplen + BUFSIZ;
     308             :         }
     309         329 :         if (l) {
     310         329 :                 strncpy(*buf + fill, p, l);
     311         329 :                 fill += l;
     312         329 :                 f->p++;
     313             :         }
     314             :         return fill;
     315             : }
     316             : 
     317             : static ssize_t
     318         111 : output_composite(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen, Column *fmt, BUN nr_attrs, int composite, bool quoted)
     319             : {
     320         111 :         if (!quoted)
     321          25 :                 (*buf)[fill++] = '\'';
     322         111 :         (*buf)[fill++] = '(';
     323         111 :         (void)nr_attrs;
     324             :         /*
     325             :         if (fmt->virt) {
     326             :                 assert(0);
     327             :                 fmt++;
     328             :                 nr_attrs--;
     329             :         }
     330             :         */
     331         111 :         int first = 1, j = 0;
     332         309 :         for( ; composite; composite--) {
     333         198 :                 Column *f = fmt + j;
     334             : 
     335         198 :                 if (!first) {
     336          87 :                         (*buf)[fill++] = ',';
     337          87 :                         (*buf)[fill++] = ' ';
     338          87 :                         (*buf)[fill] = 0;
     339             :                 }
     340         198 :                 if (f->multiset) {
     341          10 :                         int nr_attrs = f->nrfields - 1;
     342          10 :                         Column *rowid = fmt+j+nr_attrs;
     343          10 :                         const char *p = BUNtail(rowid->ci, rowid->p);
     344             : 
     345             :                         /* various cases:
     346             :                          *      rowid column dense (but for ints !!
     347             :                          *      rowid column sorted
     348             :                          *      else
     349             :                          */
     350          10 :                         assert(rowid->c->tsorted);
     351          10 :                         if (rowid->c->tkey)
     352          10 :                                 fill = output_multiset_dense(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, true, *(int*)p);
     353             :                         else
     354           0 :                                 fill = output_multiset_sorted(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, true, *(int*)p);
     355          10 :                         fmt[j+nr_attrs].p++;
     356          10 :                         f = fmt + j + nr_attrs; /* closing bracket */
     357          10 :                         j += nr_attrs + 1;
     358         188 :                 } else if (f->composite) {
     359          44 :                         int nr_attrs = f->nrfields - 1;
     360          44 :                         fill = output_composite(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-j-1, f->composite, true);
     361          44 :                         f = fmt + j + nr_attrs; /* closing bracket */
     362          44 :                         j += nr_attrs + 1;
     363         144 :                 } else if (f->c) {
     364         144 :                         fill = output_value(buf, len, fill, localbuf, locallen, f);
     365         144 :                         j++;
     366             :                 }
     367         198 :                 first = 0;
     368             :         }
     369         111 :         (*buf)[fill++] = ')';
     370         111 :         if (!quoted)
     371          25 :                 (*buf)[fill++] = '\'';
     372         111 :         (*buf)[fill] = 0;
     373         111 :         return fill;
     374             : }
     375             : 
     376             : #define MS_ARRAY 2
     377             : /* id is prev id + 1 */
     378             : static ssize_t
     379          59 : output_multiset_dense(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen,
     380             :                                   Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id)
     381             : {
     382          59 :         nr_attrs -= (multiset == MS_ARRAY)?2:1;
     383          59 :         Column *msid = fmt + nr_attrs;
     384          59 :         int *idp = (int*)Tloc(msid->c, msid->p);
     385          59 :         int first = 1;
     386             : 
     387          59 :         if (!quoted)
     388          49 :                 (*buf)[fill++] = '\'';
     389          59 :         (*buf)[fill++] = '{';
     390          59 :         (*buf)[fill] = 0;
     391         190 :         for (; *idp == id && fill > 0; idp++, msid->p++) {
     392         131 :                 if (!first)
     393          72 :                         (*buf)[fill++] = ',';
     394         131 :                 if (composite) {
     395          42 :                         fill = output_composite(buf, len, fill, localbuf, locallen, fmt, nr_attrs, composite, true);
     396             :                 } else {
     397          89 :                         fill = output_value(buf, len, fill, localbuf, locallen, fmt);
     398             :                 }
     399         131 :                 first = 0;
     400             :         }
     401          59 :         if (fill < 0)
     402             :                 return fill;
     403          59 :         (*buf)[fill++] = '}';
     404          59 :         if (!quoted)
     405          49 :                 (*buf)[fill++] = '\'';
     406          59 :         (*buf)[fill] = 0;
     407          59 :         return fill;
     408             : }
     409             : 
     410             : /* id >= prev id */
     411             : static ssize_t
     412           6 : output_multiset_sorted(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen,
     413             :                                   Column *fmt, BUN nr_attrs, int multiset, int composite, bool quoted, int id)
     414             : {
     415           6 :         nr_attrs -= (multiset == MS_ARRAY)?2:1;
     416           6 :         Column *msid = fmt + nr_attrs;
     417             :         /* how to also keep prev id */
     418           6 :         BUN pos = msid->p;
     419           6 :         int *idp = (int*)Tloc(msid->c, pos);
     420           6 :         int first = 1;
     421             : 
     422           6 :         if (msid->p) {
     423          17 :                 while (pos > 0 && idp[-1] >= id) {
     424          12 :                         idp--;
     425          12 :                         pos--;
     426             :                 }
     427           5 :                 msid->p = pos;
     428             :         }
     429             : 
     430           6 :         if (!quoted)
     431           6 :                 (*buf)[fill++] = '\'';
     432           6 :         (*buf)[fill++] = '{';
     433           6 :         (*buf)[fill] = 0;
     434          24 :         for (; *idp == id && fill > 0; idp++, msid->p++, pos++) {
     435          18 :                 if (!first)
     436          12 :                         (*buf)[fill++] = ',';
     437          18 :                 if (composite) {
     438           0 :                         fill = output_composite(buf, len, fill, localbuf, locallen, fmt, nr_attrs, composite, true);
     439             :                 } else {
     440          18 :                         fmt->p = pos;
     441          18 :                         fill = output_value(buf, len, fill, localbuf, locallen, fmt);
     442             :                 }
     443          18 :                 first = 0;
     444             :         }
     445           6 :         if (fill < 0)
     446             :                 return fill;
     447           6 :         (*buf)[fill++] = '}';
     448           6 :         if (!quoted)
     449           6 :                 (*buf)[fill++] = '\'';
     450           6 :         (*buf)[fill] = 0;
     451           6 :         return fill;
     452             : }
     453             : 
     454             : static ssize_t
     455          80 : output_line_complex(char **buf, size_t *len, ssize_t fill, char **localbuf, size_t *locallen,
     456             :                                   Column *fmt, BUN nr_attrs)
     457             : {
     458          80 :         BUN j;
     459             : 
     460         318 :         for (j = 0; j < nr_attrs; ) {
     461         238 :                 Column *f = fmt + j;
     462         238 :                 const char *p;
     463             : 
     464         238 :                 if (f->multiset) {
     465          55 :                         int nr_attrs = f->nrfields - 1;
     466          55 :                         Column *rowid = fmt+j+nr_attrs;
     467          55 :                         p = BUNtail(rowid->ci, rowid->p);
     468             : 
     469          55 :                         assert(rowid->c->tsorted);
     470          55 :                         if (rowid->c->tkey)
     471          49 :                             fill = output_multiset_dense(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, false, *(int*)p);
     472             :                         else
     473           6 :                             fill = output_multiset_sorted(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-1, f->multiset, f->composite, false, *(int*)p);
     474          55 :                         fmt[j+nr_attrs].p++;
     475          55 :                         f = fmt + j + nr_attrs; /* closing bracket */
     476          55 :                         j += nr_attrs + 1;
     477         183 :                 } else if (f->composite) {
     478          25 :                         int nr_attrs = f->nrfields - 1;
     479          25 :                         fill = output_composite(buf, len, fill, localbuf, locallen, fmt + j + 1, nr_attrs-j, f->composite, false);
     480          25 :                         f = fmt + j + nr_attrs; /* closing bracket */
     481          25 :                         j += nr_attrs + 1;
     482         158 :                 } else if (f->c) {
     483          78 :                         fill = output_value(buf, len, fill, localbuf, locallen, f);
     484          78 :                         j++;
     485             :                 } else {
     486          80 :                         j++;
     487             :                 }
     488         238 :                 strncpy(*buf + fill, f->sep, f->seplen);
     489         238 :                 fill += f->seplen;
     490             :         }
     491          80 :         return fill;
     492             : }
     493             : 
     494             : static inline int
     495     6210605 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen,
     496             :                                   Column *fmt, stream *fd, BUN nr_attrs)
     497             : {
     498     6210605 :         BUN i;
     499     6210605 :         ssize_t fill = 0;
     500             : 
     501    27939363 :         for (i = 0; i < nr_attrs; i++) {
     502    21728784 :                 Column *f = fmt + i;
     503    21728784 :                 const char *p;
     504    21728784 :                 ssize_t l;
     505             : 
     506    21728784 :                 if (f->virt)
     507           0 :                         continue;
     508    21728784 :                 if (f->c) {
     509    15518174 :                         p = BUNtail(f->ci, f->p);
     510             : 
     511    15518174 :                         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     512      854566 :                                 p = f->nullstr;
     513      854566 :                                 l = (ssize_t) strlen(p);
     514             :                         } else {
     515    14663533 :                                 l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     516    14663582 :                                 if (l < 0)
     517             :                                         return -1;
     518    14663582 :                                 p = *localbuf;
     519             :                         }
     520    15518148 :                         if (fill + l + f->seplen >= (ssize_t) * len) {
     521             :                                 /* extend the buffer */
     522          78 :                                 char *nbuf;
     523          78 :                                 nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     524          78 :                                 if (nbuf == NULL)
     525             :                                         return -1;      /* *buf freed by caller */
     526          78 :                                 *buf = nbuf;
     527          78 :                                 *len = fill + l + f->seplen + BUFSIZ;
     528             :                         }
     529    15518148 :                         strncpy(*buf + fill, p, l);
     530    15518148 :                         fill += l;
     531    15518148 :                         f->p++;
     532             :                 }
     533    21728758 :                 strncpy(*buf + fill, f->sep, f->seplen);
     534    21728758 :                 fill += f->seplen;
     535             :         }
     536     6210579 :         if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
     537           0 :                 return TABLET_error(fd);
     538             :         return 0;
     539             : }
     540             : 
     541             : static inline int
     542           0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd,
     543             :                                    BUN nr_attrs, oid id)
     544             : {
     545           0 :         BUN i;
     546             : 
     547           0 :         for (i = 0; i < nr_attrs; i++) {
     548           0 :                 Column *f = fmt + i;
     549             : 
     550           0 :                 if (f->virt)
     551           0 :                         continue;
     552           0 :                 if (f->c) {
     553           0 :                         const void *p = BUNtail(f->ci, id - f->c->hseqbase);
     554             : 
     555           0 :                         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     556           0 :                                 size_t l = strlen(f->nullstr);
     557           0 :                                 if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
     558           0 :                                         return TABLET_error(fd);
     559             :                         } else {
     560           0 :                                 ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
     561             : 
     562           0 :                                 if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
     563           0 :                                         return TABLET_error(fd);
     564             :                         }
     565             :                 }
     566           0 :                 if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
     567           0 :                         return TABLET_error(fd);
     568             :         }
     569             :         return 0;
     570             : }
     571             : 
     572             : /*
     573             :  * Fast Load
     574             :  * To speedup the CPU intensive loading of files we have to break
     575             :  * the file into pieces and perform parallel analysis. Experimentation
     576             :  * against lineitem SF1 showed that half of the time goes into very
     577             :  * basic atom analysis (41 out of 102 B instructions).
     578             :  * Furthermore, the actual insertion into the BATs takes only
     579             :  * about 10% of the total. With multi-core processors around
     580             :  * it seems we can gain here significantly.
     581             :  *
     582             :  * The approach taken is to fork a parallel scan over the text file.
     583             :  * We assume that the blocked stream is already
     584             :  * positioned correctly at the reading position. The start and limit
     585             :  * indicates the byte range to search for tuples.
     586             :  * If start> 0 then we first skip to the next record separator.
     587             :  * If necessary we read more than 'limit' bytes to ensure parsing a complete
     588             :  * record and stop at the record boundary.
     589             :  * Beware, we should allocate Tablet descriptors for each file segment,
     590             :  * otherwise we end up with a gross concurrency control problem.
     591             :  * The resulting BATs should be glued at the final phase.
     592             :  *
     593             :  * Raw Load
     594             :  * Front-ends can bypass most of the overhead in loading the BATs
     595             :  * by preparing the corresponding files directly and replace those
     596             :  * created by e.g. the SQL frontend.
     597             :  * This strategy is only advisable for cases where we have very
     598             :  * large files (>200GB) and/or files created by well-debugged code.
     599             :  *
     600             :  * To experiment with this approach, the code base responds
     601             :  * to a negative number of cores by dumping the data directly in BAT
     602             :  * storage format into a collection of files on disk.
     603             :  * It reports on the actions to be taken to replace BATs.
     604             :  * This technique is initially only supported for fixed-sized columns.
     605             :  * The rawmode() indicator acts as the internal switch.
     606             :  */
     607             : 
     608             : /*
     609             :  * To speed up loading ascii files we have to determine the number of blocks.
     610             :  * This depends on the number of cores available.
     611             :  * For the time being we hardwire this decision based on our own
     612             :  * platforms.
     613             :  * Furthermore, we only consider parallel load for file-based requests.
     614             :  *
     615             :  * To simplify our world, we assume a single producer process.
     616             :  */
     617             : 
     618             : static int
     619           0 : output_file_default(Tablet *as, BAT *order, stream *fd, bstream *in)
     620             : {
     621           0 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     622           0 :         int res = 0;
     623           0 :         char *buf = GDKmalloc(len);
     624           0 :         char *localbuf = GDKmalloc(len);
     625           0 :         BUN p, q;
     626           0 :         oid id;
     627           0 :         BUN offset = as->offset;
     628             : 
     629           0 :         if (buf == NULL || localbuf == NULL) {
     630           0 :                 GDKfree(buf);
     631           0 :                 GDKfree(localbuf);
     632           0 :                 return -1;
     633             :         }
     634           0 :         for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q;
     635           0 :                  p++, id++) {
     636           0 :                 if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
     637             :                         res = -5;
     638             :                         break;
     639             :                 }
     640           0 :                 if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
     641             :                         break;
     642             :                 }
     643             :         }
     644           0 :         GDKfree(localbuf);
     645           0 :         GDKfree(buf);
     646           0 :         return res;
     647             : }
     648             : 
     649             : static int
     650          24 : output_complex(Tablet *as, stream *fd, bstream *in)
     651             : {
     652          24 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     653          24 :         ssize_t res = 0;
     654          24 :         char *buf = GDKmalloc(len);
     655          24 :         char *localbuf = GDKmalloc(len);
     656          24 :         BUN i = 0;
     657             : 
     658          24 :         if (buf == NULL || localbuf == NULL) {
     659           0 :                 GDKfree(buf);
     660           0 :                 GDKfree(localbuf);
     661           0 :                 return -1;
     662             :         }
     663         104 :         for (i = 0; i < as->nr; i++) {
     664          80 :                 if ((i & 8191) == 8191 && bstream_getoob(in)) {
     665             :                         res = -5;                       /* "Query aborted" */
     666             :                         break;
     667             :                 }
     668          80 :                 if ((res = output_line_complex(&buf, &len, 0, &localbuf, &locallen, as->format, as->nr_attrs)) < 0) {
     669             :                         break;
     670             :                 }
     671          80 :                 if (fd && mnstr_write(fd, buf, 1, res) != res)
     672           0 :                         return TABLET_error(fd);
     673             :         }
     674          24 :         GDKfree(localbuf);
     675          24 :         GDKfree(buf);
     676          24 :         if (res < 0)
     677           0 :                 return (int)res;
     678             :         return 0;
     679             : }
     680             : 
     681             : static int
     682      126692 : output_file_dense(Tablet *as, stream *fd, bstream *in)
     683             : {
     684      126692 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     685      126692 :         int res = 0;
     686      126692 :         char *buf = GDKmalloc(len);
     687      126735 :         char *localbuf = GDKmalloc(len);
     688      126733 :         BUN i = 0;
     689             : 
     690      126733 :         if (buf == NULL || localbuf == NULL) {
     691           0 :                 GDKfree(buf);
     692           0 :                 GDKfree(localbuf);
     693           0 :                 return -1;
     694             :         }
     695     6337326 :         for (i = 0; i < as->nr; i++) {
     696     6210593 :                 if ((i & 8191) == 8191 && bstream_getoob(in)) {
     697             :                         res = -5;                       /* "Query aborted" */
     698             :                         break;
     699             :                 }
     700     6210593 :                 if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
     701             :                         break;
     702             :                 }
     703             :         }
     704      126733 :         GDKfree(localbuf);
     705      126751 :         GDKfree(buf);
     706      126751 :         return res;
     707             : }
     708             : 
     709             : static int
     710           0 : output_file_ordered(Tablet *as, BAT *order, stream *fd, bstream *in)
     711             : {
     712           0 :         size_t len = BUFSIZ;
     713           0 :         int res = 0;
     714           0 :         char *buf = GDKmalloc(len);
     715           0 :         BUN p, q;
     716           0 :         BUN i = 0;
     717           0 :         BUN offset = as->offset;
     718             : 
     719           0 :         if (buf == NULL)
     720             :                 return -1;
     721           0 :         for (q = offset + as->nr, p = offset; p < q; p++, i++) {
     722           0 :                 oid h = order->hseqbase + p;
     723             : 
     724           0 :                 if (((p - offset) & 8191) == 8191 && bstream_getoob(in)) {
     725             :                         res = -5;
     726             :                         break;
     727             :                 }
     728           0 :                 if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
     729           0 :                         GDKfree(buf);
     730           0 :                         return res;
     731             :                 }
     732             :         }
     733           0 :         GDKfree(buf);
     734           0 :         return res;
     735             : }
     736             : 
     737             : int
     738      126722 : TABLEToutput_file(Tablet *as, BAT *order, stream *s, bstream *in)
     739             : {
     740      126722 :         oid base = oid_nil;
     741      126722 :         int ret = 0;
     742             : 
     743             :         /* only set nr if it is zero or lower (bogus) to the maximum value
     744             :          * possible (BATcount), if already set within BATcount range,
     745             :          * preserve value such that for instance SQL's reply_size still
     746             :          * works
     747             :          */
     748      126722 :         if (order) {
     749           0 :                 BUN maxnr = BATcount(order);
     750           0 :                 if (as->nr == BUN_NONE || as->nr > maxnr)
     751           0 :                         as->nr = maxnr;
     752             :         }
     753      126722 :         assert(as->nr != BUN_NONE);
     754             :         int complex = 0;
     755      471241 :         for (unsigned int i = 1; i < as->nr_attrs && !complex; i++)
     756      344519 :            complex |= (as->format[i].multiset || as->format[i].composite);
     757             : 
     758      126722 :         if (complex) {
     759           0 :                 assert(!order);
     760           0 :                 return output_complex(as, s, in);
     761             :         }
     762      126740 :         base = check_BATs(as);
     763      126696 :         if (!order || !is_oid_nil(base)) {
     764      126696 :                 if (!order || order->hseqbase == base)
     765      126696 :                         ret = output_file_dense(as, s, in);
     766             :                 else
     767           0 :                         ret = output_file_ordered(as, order, s, in);
     768             :         } else {
     769           0 :                 ret = output_file_default(as, order, s, in);
     770             :         }
     771             :         return ret;
     772             : }
     773             : 
     774             : /*
     775             :  *  Niels Nes, Martin Kersten
     776             :  *
     777             :  * Parallel bulk load for SQL
     778             :  * The COPY INTO command for SQL is heavily CPU bound, which means
     779             :  * that ideally we would like to exploit the multi-cores to do that
     780             :  * work in parallel.
     781             :  * Complicating factors are the initial record offset, the
     782             :  * possible variable length of the input, and the original sort order
     783             :  * that should preferably be maintained.
     784             :  *
     785             :  * The code below consists of a file reader, which breaks up the
     786             :  * file into chunks of distinct rows. Then multiple parallel threads
     787             :  * grab them, and break them on the field boundaries.
     788             :  * After all fields are identified this way, the columns are converted
     789             :  * and stored in the BATs.
     790             :  *
     791             :  * The threads get a reference to a private copy of the READERtask.
     792             :  * It includes a list of columns they should handle. This is a basis
     793             :  * to distribute cheap and expensive columns over threads.
     794             :  *
     795             :  * The file reader overlaps IO with updates of the BAT.
     796             :  * Also the buffer size of the block stream might be a little small for
     797             :  * this task (1MB). It has been increased to 8MB, which indeed improved.
     798             :  *
     799             :  * The work divider allocates subtasks to threads based on the
     800             :  * observed time spending so far.
     801             :  */
     802             : 
     803             : #define BREAKROW 1
     804             : #define UPDATEBAT 2
     805             : #define ENDOFCOPY 3
     806             : 
     807             : typedef struct {
     808             :         Client cntxt;
     809             :         int id;                                         /* for self reference */
     810             :         int state;                                      /* row break=1 , 2 = update bat */
     811             :         int workers;                            /* how many concurrent ones */
     812             :         int error;                                      /* error during row break */
     813             :         int next;
     814             :         int limit;
     815             :         BUN cnt, maxrow;                        /* first row in file chunk. */
     816             :         lng skip;                                       /* number of lines to be skipped */
     817             :         lng *time, wtime;                       /* time per col + time per thread */
     818             :         int rounds;                                     /* how often did we divide the work */
     819             :         bool ateof;                                     /* io control */
     820             :         bool from_stdin;
     821             :         bool escape;                            /* whether to handle \ escapes */
     822             :         bool besteffort;
     823             :         char quote;
     824             :         bstream *b;
     825             :         stream *out;
     826             :         MT_Id tid;
     827             :         MT_Sema producer;                       /* reader waits for call */
     828             :         MT_Sema consumer;                       /* reader waits for call */
     829             :         MT_Sema sema;                           /* threads wait for work , negative next implies exit */
     830             :         MT_Sema reply;                          /* let reader continue */
     831             :         Tablet *as;
     832             :         char *errbuf;
     833             :         const char *csep, *rsep;
     834             :         size_t seplen, rseplen;
     835             : 
     836             :         char *base[MAXBUFFERS], *input[MAXBUFFERS];     /* buffers for row splitter and tokenizer */
     837             :         size_t rowlimit[MAXBUFFERS];    /* determines maximal record length buffer */
     838             :         char **rows[MAXBUFFERS];
     839             :         lng *startlineno[MAXBUFFERS];
     840             :         int top[MAXBUFFERS];            /* number of rows in this buffer */
     841             :         int cur;                                        /* current buffer used by splitter and update threads */
     842             : 
     843             :         int *cols;                                      /* columns to handle */
     844             :         char ***fields;
     845             :         bte *rowerror;
     846             :         int errorcnt;
     847             :         bool aborted;
     848             :         bool set_qry_ctx;
     849             : } READERtask;
     850             : 
     851             : /* returns TRUE if there is/might be more */
     852             : static bool
     853      102768 : tablet_read_more(READERtask *task)
     854             : {
     855      102768 :         bstream *in = task->b;
     856      102768 :         stream *out = task->out;
     857      102768 :         size_t n =  task->b->size;
     858      102768 :         if (out) {
     859      101136 :                 do {
     860             :                         /* query is not finished ask for more */
     861             :                         /* we need more query text */
     862      101136 :                         if (bstream_next(in) < 0) {
     863           0 :                                 if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
     864           0 :                                         task->aborted = true;
     865           0 :                                         mnstr_clearerr(in->s);
     866             :                                 }
     867           0 :                                 return false;
     868             :                         }
     869      101136 :                         if (in->eof) {
     870      101134 :                                 if (bstream_getoob(in)) {
     871           0 :                                         task->aborted = true;
     872           0 :                                         return false;
     873             :                                 }
     874      101134 :                                 if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
     875      101134 :                                         mnstr_flush(out, MNSTR_FLUSH_DATA);
     876      101134 :                                 in->eof = false;
     877             :                                 /* we need more query text */
     878      101134 :                                 if (bstream_next(in) < 0) {
     879           0 :                                         if (mnstr_errnr(in->s) == MNSTR_INTERRUPT) {
     880           0 :                                                 task->aborted = true;
     881           0 :                                                 mnstr_clearerr(in->s);
     882             :                                         }
     883           0 :                                         return false;
     884             :                                 }
     885      101134 :                                 if (in->eof)
     886             :                                         return false;
     887             :                         }
     888      101134 :                 } while (in->len <= in->pos);
     889        1632 :         } else if (bstream_read(in, n) <= 0) {
     890             :                 return false;
     891             :         }
     892             :         return true;
     893             : }
     894             : 
     895             : /* note, the column value that is passed here is the 0 based value; the
     896             :  * lineno value on the other hand is 1 based */
     897             : static void
     898         151 : tablet_error(READERtask *task, lng idx, lng lineno, int col, const char *msg,
     899             :                          const char *fcn)
     900             : {
     901         151 :         assert(is_int_nil(col) || col >= 0);
     902         151 :         assert(is_lng_nil(lineno) || lineno >= 1);
     903         151 :         MT_lock_set(&errorlock);
     904         151 :         if (task->cntxt->error_row != NULL
     905         151 :                 && (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED
     906         151 :                         || BUNappend(task->cntxt->error_fld, &(int) { col + 1 },
     907             :                                                  false) != GDK_SUCCEED
     908         151 :                         || BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED
     909         151 :                         || BUNappend(task->cntxt->error_input, fcn,
     910             :                                                  false) != GDK_SUCCEED)) {
     911           0 :                 task->besteffort = false;
     912             :         }
     913         151 :         if (!is_lng_nil(idx) && task->rowerror && idx < task->limit)
     914         151 :                 task->rowerror[idx]++;
     915         151 :         if (task->as->error == NULL) {
     916          68 :                 const char *colnam = is_int_nil(col) || col < 0
     917          34 :                                 || (BUN) col >= task->as->nr_attrs ? NULL : task->as->format[col].name;
     918          34 :                 if (msg == NULL) {
     919           0 :                         task->besteffort = false;
     920          34 :                 } else if (!is_lng_nil(lineno)) {
     921          34 :                         if (!is_int_nil(col)) {
     922          32 :                                 if (colnam)
     923          32 :                                         task->as->error = createException(MAL, "sql.copy_from",
     924             :                                                                                                           "line " LLFMT ": column %d %s: %s",
     925             :                                                                                                           lineno, col + 1, colnam, msg);
     926             :                                 else
     927           0 :                                         task->as->error = createException(MAL, "sql.copy_from",
     928             :                                                                                                           "line " LLFMT ": column %d: %s",
     929             :                                                                                                           lineno, col + 1, msg);
     930             :                         } else {
     931           2 :                                 task->as->error = createException(MAL, "sql.copy_from",
     932             :                                                                                                   "line " LLFMT ": %s", lineno, msg);
     933             :                         }
     934             :                 } else {
     935           0 :                         if (!is_int_nil(col)) {
     936           0 :                                 if (colnam)
     937           0 :                                         task->as->error = createException(MAL, "sql.copy_from",
     938             :                                                                                                           "column %d %s: %s", col + 1, colnam,
     939             :                                                                                                           msg);
     940             :                                 else
     941           0 :                                         task->as->error = createException(MAL, "sql.copy_from",
     942             :                                                                                                           "column %d: %s", col + 1, msg);
     943             :                         } else {
     944           0 :                                 task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
     945             :                         }
     946             :                 }
     947             :         }
     948         151 :         task->errorcnt++;
     949         151 :         MT_lock_unset(&errorlock);
     950         151 : }
     951             : 
     952             : /*
     953             :  * The row is broken into pieces directly on their field separators. It assumes that we have
     954             :  * the record in the cache already, so we can do most work quickly.
     955             :  * Furthermore, it assume a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
     956             :  */
     957             : 
     958             : static size_t
     959         566 : mystrlen(const char *s)
     960             : {
     961             :         /* Calculate and return the space that is needed for the function
     962             :          * mycpstr below to do its work. */
     963         566 :         size_t len = 0;
     964         566 :         const char *s0 = s;
     965             : 
     966       44715 :         while (*s) {
     967       44149 :                 if ((*s & 0x80) == 0) {
     968             :                         ;
     969           6 :                 } else if ((*s & 0xC0) == 0x80) {
     970             :                         /* continuation byte */
     971           0 :                         len += 3;
     972           6 :                 } else if ((*s & 0xE0) == 0xC0) {
     973             :                         /* two-byte sequence */
     974           6 :                         if ((s[1] & 0xC0) != 0x80)
     975           0 :                                 len += 3;
     976             :                         else
     977           6 :                                 s += 2;
     978           0 :                 } else if ((*s & 0xF0) == 0xE0) {
     979             :                         /* three-byte sequence */
     980           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
     981           0 :                                 len += 3;
     982             :                         else
     983           0 :                                 s += 3;
     984           0 :                 } else if ((*s & 0xF8) == 0xF0) {
     985             :                         /* four-byte sequence */
     986           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
     987           0 :                                 || (s[3] & 0xC0) != 0x80)
     988           0 :                                 len += 3;
     989             :                         else
     990           0 :                                 s += 4;
     991             :                 } else {
     992             :                         /* not a valid start byte */
     993           0 :                         len += 3;
     994             :                 }
     995       44149 :                 s++;
     996             :         }
     997         566 :         len += s - s0;
     998         566 :         return len;
     999             : }
    1000             : 
    1001             : static char *
    1002         855 : mycpstr(char *t, const char *s)
    1003             : {
    1004             :         /* Copy the string pointed to by s into the buffer pointed to by
    1005             :          * t, and return a pointer to the NULL byte at the end.  During
    1006             :          * the copy we translate incorrect UTF-8 sequences to escapes
    1007             :          * looking like <XX> where XX is the hexadecimal representation of
    1008             :          * the incorrect byte.  The buffer t needs to be large enough to
    1009             :          * hold the result, but the correct length can be calculated by
    1010             :          * the function mystrlen above.*/
    1011       45299 :         while (*s) {
    1012       44444 :                 if ((*s & 0x80) == 0) {
    1013       44438 :                         *t++ = *s++;
    1014           6 :                 } else if ((*s & 0xC0) == 0x80) {
    1015           0 :                         t += sprintf(t, "<%02X>", (uint8_t) * s++);
    1016           6 :                 } else if ((*s & 0xE0) == 0xC0) {
    1017             :                         /* two-byte sequence */
    1018           6 :                         if ((s[1] & 0xC0) != 0x80)
    1019           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) * s++);
    1020             :                         else {
    1021           6 :                                 *t++ = *s++;
    1022           6 :                                 *t++ = *s++;
    1023             :                         }
    1024           0 :                 } else if ((*s & 0xF0) == 0xE0) {
    1025             :                         /* three-byte sequence */
    1026           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
    1027           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) * s++);
    1028             :                         else {
    1029           0 :                                 *t++ = *s++;
    1030           0 :                                 *t++ = *s++;
    1031           0 :                                 *t++ = *s++;
    1032             :                         }
    1033           0 :                 } else if ((*s & 0xF8) == 0xF0) {
    1034             :                         /* four-byte sequence */
    1035           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80
    1036           0 :                                 || (s[3] & 0xC0) != 0x80)
    1037           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) * s++);
    1038             :                         else {
    1039           0 :                                 *t++ = *s++;
    1040           0 :                                 *t++ = *s++;
    1041           0 :                                 *t++ = *s++;
    1042           0 :                                 *t++ = *s++;
    1043             :                         }
    1044             :                 } else {
    1045             :                         /* not a valid start byte */
    1046           0 :                         t += sprintf(t, "<%02X>", (uint8_t) * s++);
    1047             :                 }
    1048             :         }
    1049         855 :         *t = 0;
    1050         855 :         return t;
    1051             : }
    1052             : 
    1053             : static str
    1054         149 : SQLload_error(READERtask *task, lng idx, BUN attrs)
    1055             : {
    1056         149 :         str line;
    1057         149 :         char *s;
    1058         149 :         size_t sz = 0;
    1059         149 :         BUN i;
    1060             : 
    1061         587 :         for (i = 0; i < attrs; i++) {
    1062         438 :                 if (task->fields[i][idx])
    1063         428 :                         sz += mystrlen(task->fields[i][idx]);
    1064         438 :                 sz += task->seplen;
    1065             :         }
    1066             : 
    1067         149 :         s = line = GDKmalloc(sz + task->rseplen + 1);
    1068         149 :         if (line == NULL) {
    1069           0 :                 tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error",
    1070             :                                          "SQLload_error");
    1071           0 :                 return NULL;
    1072             :         }
    1073         587 :         for (i = 0; i < attrs; i++) {
    1074         438 :                 if (task->fields[i][idx])
    1075         428 :                         s = mycpstr(s, task->fields[i][idx]);
    1076         438 :                 if (i < attrs - 1)
    1077         289 :                         s = mycpstr(s, task->csep);
    1078             :         }
    1079         149 :         strcpy(s, task->rsep);
    1080         149 :         return line;
    1081             : }
    1082             : 
    1083             : /*
    1084             :  * The parsing of the individual values is straightforward. If the value represents
    1085             :  * the null-replacement string then we grab the underlying nil.
    1086             :  * If the string starts with the quote identified from SQL, we locate the tail
    1087             :  * and interpret the body.
    1088             :  *
    1089             :  * If inserting fails, we return -1; if the value cannot be parsed, we
    1090             :  * return -1 if besteffort is not set, otherwise we return 0, but in
    1091             :  * either case an entry is added to the error table.
    1092             :  */
    1093             : static inline int
    1094   275770996 : SQLinsert_val(READERtask *task, int col, int idx)
    1095             : {
    1096   275770996 :         Column *fmt = task->as->format + col;
    1097   275770996 :         const void *adt;
    1098   275770996 :         char buf[BUFSIZ];
    1099   275770996 :         char *s = task->fields[col][idx];
    1100   275770996 :         char *err = NULL;
    1101   275770996 :         int ret = 0;
    1102             : 
    1103             :         /* include testing on the terminating null byte !! */
    1104   275770996 :         if (s == NULL) {
    1105     6364758 :                 adt = fmt->nildata;
    1106     6364758 :                 fmt->c->tnonil = false;
    1107             :         } else {
    1108   269406238 :                 if (task->escape) {
    1109   269176238 :                         size_t slen = strlen(s) + 1;
    1110   269176238 :                         char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
    1111          18 :                         if (data == NULL
    1112   302502308 :                                 || GDKstrFromStr((unsigned char *) data, (unsigned char *) s,
    1113   269176238 :                                                                  strlen(s), '\0') < 0)
    1114             :                                 adt = NULL;
    1115             :                         else
    1116   302502306 :                                 adt = fmt->frstr(fmt, fmt->adt, data);
    1117   309109493 :                         if (data != buf)
    1118          18 :                                 GDKfree(data);
    1119             :                 } else
    1120      230000 :                         adt = fmt->frstr(fmt, fmt->adt, s);
    1121             :         }
    1122             : 
    1123   315704290 :         lng row = task->cnt + idx + 1;
    1124   315704290 :         if (adt == NULL) {
    1125         138 :                 if (task->rowerror) {
    1126         138 :                         err = SQLload_error(task, idx, task->as->nr_attrs);
    1127         138 :                         if (s) {
    1128         138 :                                 size_t slen = mystrlen(s);
    1129         138 :                                 char *scpy = GDKmalloc(slen + 1);
    1130         138 :                                 if (scpy == NULL) {
    1131           0 :                                         tablet_error(task, idx, row, col,
    1132             :                                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL, err);
    1133           0 :                                         task->besteffort = false;    /* no longer best effort */
    1134           0 :                                         GDKfree(err);
    1135           0 :                                         return -1;
    1136             :                                 }
    1137         138 :                                 mycpstr(scpy, s);
    1138         138 :                                 s = scpy;
    1139             :                         }
    1140         138 :                         snprintf(buf, sizeof(buf), "'%s' expected%s%s%s", fmt->type,
    1141             :                                          s ? " in '" : "", s ? s : "", s ? "'" : "");
    1142         138 :                         GDKfree(s);
    1143         138 :                         tablet_error(task, idx, row, col, buf, err);
    1144         138 :                         GDKfree(err);
    1145         138 :                         if (!task->besteffort)
    1146             :                                 return -1;
    1147             :                 }
    1148         117 :                 ret = -!task->besteffort;    /* yep, two unary operators ;-) */
    1149             :                 /* replace it with a nil */
    1150         117 :                 adt = fmt->nildata;
    1151         117 :                 fmt->c->tnonil = false;
    1152             :         }
    1153   315704269 :         if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
    1154             :                 return ret;
    1155             : 
    1156             :         /* failure */
    1157           0 :         if (task->rowerror) {
    1158           0 :                 char *msg = GDKerrbuf;
    1159           0 :                 err = SQLload_error(task, idx, task->as->nr_attrs);
    1160           0 :                 tablet_error(task, idx, row, col, msg
    1161           0 :                                          && *msg ? msg : "insert failed", err);
    1162           0 :                 GDKfree(err);
    1163             :         }
    1164           0 :         task->besteffort = false;    /* no longer best effort */
    1165           0 :         return -1;
    1166             : }
    1167             : 
    1168             : static int
    1169      325739 : SQLworker_column(READERtask *task, int col)
    1170             : {
    1171      325739 :         int i;
    1172      325739 :         Column *fmt = task->as->format;
    1173             : 
    1174      325739 :         if (fmt[col].c == NULL)
    1175             :                 return 0;
    1176             : 
    1177             :         /* watch out for concurrent threads */
    1178      325733 :         MT_lock_set(&mal_copyLock);
    1179      327456 :         if (!fmt[col].skip
    1180      327456 :                 && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
    1181         289 :                 if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
    1182           0 :                         tablet_error(task, lng_nil, lng_nil, col,
    1183             :                                                  "Failed to extend the BAT\n", "SQLworker_column");
    1184           0 :                         MT_lock_unset(&mal_copyLock);
    1185           0 :                         return -1;
    1186             :                 }
    1187             :         }
    1188      327456 :         MT_lock_unset(&mal_copyLock);
    1189             : 
    1190   281345446 :         for (i = 0; i < task->top[task->cur]; i++) {
    1191   280691793 :                 if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
    1192          21 :                         BATsetcount(fmt[col].c, BATcount(fmt[col].c));
    1193          21 :                         return -1;
    1194             :                 }
    1195             :         }
    1196      326197 :         BATsetcount(fmt[col].c, BATcount(fmt[col].c));
    1197             : 
    1198      326197 :         return 0;
    1199             : }
    1200             : 
    1201             : /*
    1202             :  * The rows are broken on the column separator. Any error is shown and reflected with
    1203             :  * setting the reference of the offending row fields to NULL.
    1204             :  * This allows the loading to continue, skipping the minimal number of rows.
    1205             :  * The details about the locations can be inspected from the error table.
    1206             :  * We also trim the quotes around strings.
    1207             :  */
    1208             : static int
    1209   135273489 : SQLload_parse_row(READERtask *task, int idx)
    1210             : {
    1211   135273489 :         BUN i;
    1212   135273489 :         char errmsg[BUFSIZ];
    1213   135273489 :         char ch = *task->csep;
    1214   135273489 :         char *row = task->rows[task->cur][idx];
    1215   135273489 :         lng startlineno = task->startlineno[task->cur][idx];
    1216   135273489 :         Tablet *as = task->as;
    1217   135273489 :         Column *fmt = as->format;
    1218   135273489 :         bool error = false;
    1219   135273489 :         str errline = NULL;
    1220             : 
    1221   135273489 :         assert(idx < task->top[task->cur]);
    1222   135273489 :         assert(row);
    1223   135273489 :         errmsg[0] = 0;
    1224             : 
    1225   135273489 :         if (task->quote || task->seplen != 1) {
    1226    11437216 :                 for (i = 0; i < as->nr_attrs; i++) {
    1227     8673546 :                         bool quote = false;
    1228     8673546 :                         task->fields[i][idx] = row;
    1229             :                         /* recognize fields starting with a quote, keep them */
    1230     8673546 :                         if (*row && *row == task->quote) {
    1231     4442535 :                                 quote = true;
    1232     4442535 :                                 task->fields[i][idx] = row + 1;
    1233     4442535 :                                 row = tablet_skip_string(row + 1, task->quote, task->escape);
    1234             : 
    1235     4644337 :                                 if (!row) {
    1236           0 :                                         errline = SQLload_error(task, idx, i + 1);
    1237           0 :                                         snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
    1238           0 :                                         tablet_error(task, idx, startlineno, (int) i, errmsg,
    1239             :                                                                  errline);
    1240           0 :                                         GDKfree(errline);
    1241           0 :                                         error = true;
    1242           0 :                                         goto errors1;
    1243             :                                 } else
    1244     4644337 :                                         *row++ = 0;
    1245             :                         }
    1246             : 
    1247             :                         /* eat away the column separator */
    1248    50368888 :                         for (; *row; row++)
    1249    49592171 :                                 if (*row == '\\' && task->escape) {
    1250           2 :                                         if (row[1])
    1251           2 :                                                 row++;
    1252    49592169 :                                 } else if (*row == ch
    1253     8098631 :                                                    && (task->seplen == 1
    1254           4 :                                                            || strncmp(row, task->csep,
    1255             :                                                                                   task->seplen) == 0)) {
    1256     8098631 :                                         *row = 0;
    1257     8098631 :                                         row += task->seplen;
    1258     8098631 :                                         goto endoffieldcheck;
    1259             :                                 }
    1260             : 
    1261             :                         /* not enough fields */
    1262      776717 :                         if (i < as->nr_attrs - 1) {
    1263           1 :                                 errline = SQLload_error(task, idx, i + 1);
    1264             :                                 /* it's the next value that is missing */
    1265           1 :                                 tablet_error(task, idx, startlineno, (int) i + 1,
    1266             :                                                          "Column value missing", errline);
    1267           1 :                                 GDKfree(errline);
    1268           1 :                                 error = true;
    1269           1 :   errors1:
    1270             :                                 /* we save all errors detected  as NULL values */
    1271           4 :                                 for (; i < as->nr_attrs; i++)
    1272           3 :                                         task->fields[i][idx] = NULL;
    1273           1 :                                 i--;
    1274             :                         }
    1275      776716 :   endoffieldcheck:
    1276     8875348 :                         ;
    1277             :                         /* check for user defined NULL string */
    1278     8875348 :                         if ((!quote || !fmt->null_length) && fmt->nullstr
    1279     7378671 :                                 && task->fields[i][idx]
    1280     7378670 :                                 && GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0)
    1281     1960415 :                                 task->fields[i][idx] = 0;
    1282             :                 }
    1283             :         } else {
    1284             :                 assert(!task->quote);
    1285             :                 assert(task->seplen == 1);
    1286   326143620 :                 for (i = 0; i < as->nr_attrs; i++) {
    1287   195413124 :                         task->fields[i][idx] = row;
    1288             : 
    1289             :                         /* eat away the column separator */
    1290  1885916832 :                         for (; *row; row++)
    1291  1754973294 :                                 if (*row == '\\' && task->escape) {
    1292         382 :                                         if (row[1])
    1293         382 :                                                 row++;
    1294  1754972912 :                                 } else if (*row == ch) {
    1295    64469586 :                                         *row = 0;
    1296    64469586 :                                         row++;
    1297    64469586 :                                         goto endoffield2;
    1298             :                                 }
    1299             : 
    1300             :                         /* not enough fields */
    1301   130943538 :                         if (i < as->nr_attrs - 1) {
    1302           9 :                                 errline = SQLload_error(task, idx, i + 1);
    1303             :                                 /* it's the next value that is missing */
    1304           9 :                                 tablet_error(task, idx, startlineno, (int) i + 1,
    1305             :                                                          "Column value missing", errline);
    1306           9 :                                 GDKfree(errline);
    1307           9 :                                 error = true;
    1308             :                                 /* we save all errors detected */
    1309          43 :                                 for (; i < as->nr_attrs; i++)
    1310          25 :                                         task->fields[i][idx] = NULL;
    1311           9 :                                 i--;
    1312             :                         }
    1313   130943529 :   endoffield2:
    1314   195413124 :                         ;
    1315             :                         /* check for user defined NULL string */
    1316   195413124 :                         if (fmt->nullstr && task->fields[i][idx]
    1317   181136481 :                                 && GDKstrcasecmp(task->fields[i][idx], fmt->nullstr) == 0) {
    1318     3673724 :                                 task->fields[i][idx] = 0;
    1319             :                         }
    1320             :                 }
    1321             :         }
    1322             :         /* check for too many values as well */
    1323   133494166 :         if (row && *row && i == as->nr_attrs) {
    1324           1 :                 errline = SQLload_error(task, idx, task->as->nr_attrs);
    1325           1 :                 snprintf(errmsg, BUFSIZ, "Leftover data '%s'", row);
    1326           1 :                 tablet_error(task, idx, startlineno, (int) i, errmsg, errline);
    1327           1 :                 GDKfree(errline);
    1328           1 :                 error = true;
    1329             :         }
    1330   133494166 :         return error ? -1 : 0;
    1331             : }
    1332             : 
    1333             : static void
    1334        1495 : SQLworker(void *arg)
    1335             : {
    1336        1495 :         READERtask *task = (READERtask *) arg;
    1337        1495 :         unsigned int i;
    1338        1495 :         int j, piece;
    1339        1495 :         lng t0;
    1340             : 
    1341        1495 :         GDKsetbuf(GDKmalloc(GDKMAXERRLEN));     /* where to leave errors */
    1342        1497 :         GDKclrerr();
    1343        1497 :         task->errbuf = GDKerrbuf;
    1344        1497 :         MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
    1345             : 
    1346        1497 :         MT_sema_down(&task->sema);
    1347      613090 :         while (task->top[task->cur] >= 0) {
    1348             :                 /* stage one, break the rows spread the work over the workers */
    1349      613090 :                 switch (task->state) {
    1350      305881 :                 case BREAKROW:
    1351      305881 :                         t0 = GDKusec();
    1352      305760 :                         piece = (task->top[task->cur] + task->workers) / task->workers;
    1353             : 
    1354      305760 :                         for (j = piece * task->id;
    1355   134017125 :                                  j < task->top[task->cur] && j < piece * (task->id + 1); j++)
    1356   133712251 :                                 if (task->rows[task->cur][j]) {
    1357   133712251 :                                         if (SQLload_parse_row(task, j) < 0) {
    1358          11 :                                                 task->errorcnt++;
    1359             :                                                 // early break unless best effort
    1360          11 :                                                 if (!task->besteffort) {
    1361           8 :                                                         for (j++;
    1362        9316 :                                                                  j < task->top[task->cur]
    1363        9316 :                                                                  && j < piece * (task->id + 1); j++)
    1364       37233 :                                                                 for (i = 0; i < task->as->nr_attrs; i++)
    1365       27925 :                                                                         task->fields[i][j] = NULL;
    1366             :                                                         break;
    1367             :                                                 }
    1368             :                                         }
    1369             :                                 }
    1370      304882 :                         task->wtime = GDKusec() - t0;
    1371      305815 :                         break;
    1372      305727 :                 case UPDATEBAT:
    1373      305727 :                         if (!task->besteffort && task->errorcnt)
    1374             :                                 break;
    1375             :                         /* stage two, updating the BATs */
    1376     1297605 :                         for (i = 0; i < task->as->nr_attrs; i++)
    1377      991645 :                                 if (task->cols[i]) {
    1378      325961 :                                         t0 = GDKusec();
    1379      325282 :                                         if (SQLworker_column(task, task->cols[i] - 1) < 0)
    1380             :                                                 break;
    1381      326028 :                                         t0 = GDKusec() - t0;
    1382      326132 :                                         task->time[i] += t0;
    1383      326132 :                                         task->wtime += t0;
    1384             :                                 }
    1385             :                         break;
    1386        1482 :                 case ENDOFCOPY:
    1387        1482 :                         MT_sema_up(&task->reply);
    1388        1485 :                         goto do_return;
    1389             :                 }
    1390      611734 :                 MT_sema_up(&task->reply);
    1391     1222482 :                 MT_sema_down(&task->sema);
    1392             :         }
    1393           0 :         MT_sema_up(&task->reply);
    1394             : 
    1395        1485 :   do_return:
    1396        1485 :         GDKfree(GDKerrbuf);
    1397        1491 :         GDKsetbuf(NULL);
    1398        1480 :         MT_thread_set_qry_ctx(NULL);
    1399        1480 : }
    1400             : 
    1401             : static void
    1402      103364 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
    1403             : {
    1404      103364 :         int i, j, mi;
    1405      103364 :         lng loc[MAXWORKERS];
    1406             : 
    1407             :         /* after a few rounds we stick to the work assignment */
    1408      103364 :         if (task->rounds > 8)
    1409      103225 :                 return;
    1410             :         /* simple round robin the first time */
    1411        2757 :         if (threads == 1 || task->rounds++ == 0) {
    1412       16461 :                 for (i = j = 0; i < nr_attrs; i++, j++)
    1413       13843 :                         ptask[j % threads].cols[i] = task->cols[i];
    1414             :                 return;
    1415             :         }
    1416         139 :         memset(loc, 0, sizeof(loc));
    1417             :         /* use of load directives */
    1418        2054 :         for (i = 0; i < nr_attrs; i++)
    1419       14624 :                 for (j = 0; j < threads; j++)
    1420       12709 :                         ptask[j].cols[i] = 0;
    1421             : 
    1422             :         /* now allocate the work to the threads */
    1423        2054 :         for (i = 0; i < nr_attrs; i++, j++) {
    1424             :                 mi = 0;
    1425       12709 :                 for (j = 1; j < threads; j++)
    1426       10794 :                         if (loc[j] < loc[mi])
    1427        3098 :                                 mi = j;
    1428             : 
    1429        1915 :                 ptask[mi].cols[i] = task->cols[i];
    1430        1915 :                 loc[mi] += task->time[i];
    1431             :         }
    1432             :         /* reset the timer */
    1433        2054 :         for (i = 0; i < nr_attrs; i++, j++)
    1434        1915 :                 task->time[i] = 0;
    1435             : }
    1436             : 
    1437             : /*
    1438             :  * Reading is handled by a separate task as a preparation for more parallelism.
    1439             :  * A buffer is filled with proper rows.
    1440             :  * If we are reading from a file then a double buffering scheme is activated.
    1441             :  * Reading from the console (stdin) remains single buffered only.
    1442             :  * If we end up with unfinished records, then the rowlimit will terminate the process.
    1443             :  */
    1444             : 
    1445             : typedef unsigned char (*dfa_t)[256];
    1446             : 
    1447             : static dfa_t
    1448        1128 : mkdfa(const unsigned char *sep, size_t seplen)
    1449             : {
    1450        1128 :         dfa_t dfa;
    1451        1128 :         size_t i, j, k;
    1452             : 
    1453        1128 :         dfa = GDKzalloc(seplen * sizeof(*dfa));
    1454        1128 :         if (dfa == NULL)
    1455             :                 return NULL;
    1456             :         /* Each character in the separator string advances the state by
    1457             :          * one.  If state reaches seplen, the separator was recognized.
    1458             :          *
    1459             :          * The first loop and the nested loop make sure that if in any
    1460             :          * state we encounter an invalid character, but part of what we've
    1461             :          * matched so far is a prefix of the separator, we go to the
    1462             :          * appropriate state. */
    1463        2278 :         for (i = 0; i < seplen; i++)
    1464        1150 :                 dfa[i][sep[0]] = 1;
    1465        2278 :         for (j = 0; j < seplen; j++) {
    1466        1150 :                 dfa[j][sep[j]] = (unsigned char) (j + 1);
    1467        1172 :                 for (k = 0; k < j; k++) {
    1468          44 :                         for (i = 0; i < j - k; i++)
    1469          22 :                                 if (sep[k + i] != sep[i])
    1470             :                                         break;
    1471          22 :                         if (i == j - k && dfa[j][sep[i]] <= i)
    1472           0 :                                 dfa[j][sep[i]] = (unsigned char) (i + 1);
    1473             :                 }
    1474             :         }
    1475             :         return dfa;
    1476             : }
    1477             : 
    1478             : #ifdef __has_builtin
    1479             : #if __has_builtin(__builtin_expect)
    1480             : /* __builtin_expect returns its first argument; it is expected to be
    1481             :  * equal to the second argument */
    1482             : #define unlikely(expr)  __builtin_expect((expr) != 0, 0)
    1483             : #define likely(expr)    __builtin_expect((expr) != 0, 1)
    1484             : #endif
    1485             : #endif
    1486             : #ifndef unlikely
    1487             : #ifdef _MSC_VER
    1488             : #define unlikely(expr)  (__assume(!(expr)), (expr))
    1489             : #define likely(expr)    (__assume((expr)), (expr))
    1490             : #else
    1491             : #define unlikely(expr)  (expr)
    1492             : #define likely(expr)    (expr)
    1493             : #endif
    1494             : #endif
    1495             : 
    1496             : static void
    1497        1128 : SQLproducer(void *p)
    1498             : {
    1499        1128 :         READERtask *task = (READERtask *) p;
    1500        1128 :         bool consoleinput = false;
    1501        1128 :         int cur = 0;                            // buffer being filled
    1502        1128 :         bool blocked[MAXBUFFERS] = { false };
    1503        1128 :         bool ateof[MAXBUFFERS] = { false };
    1504        1128 :         BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };
    1505        1128 :         char *end = NULL, *e = NULL, *s = NULL, *base;
    1506        1128 :         const char *rsep = task->rsep;
    1507        1128 :         size_t rseplen = strlen(rsep), partial = 0;
    1508        1128 :         char quote = task->quote;
    1509        1128 :         dfa_t rdfa;
    1510        1128 :         lng rowno = 0;
    1511        1128 :         lng lineno = 1;
    1512        1128 :         lng startlineno = 1;
    1513        1128 :         int more = 0;
    1514             : 
    1515        1128 :         MT_sema_down(&task->producer);
    1516        1128 :         if (task->id < 0) {
    1517             :                 return;
    1518             :         }
    1519             : 
    1520        1128 :         MT_thread_set_qry_ctx(task->set_qry_ctx ? &task->cntxt->qryctx : NULL);
    1521        1128 :         rdfa = mkdfa((const unsigned char *) rsep, rseplen);
    1522        1128 :         if (rdfa == NULL) {
    1523           0 :                 tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory",
    1524             :                                          "");
    1525           0 :                 ateof[cur] = true;
    1526           0 :                 goto reportlackofinput;
    1527             :         }
    1528             : 
    1529             : /*      TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/
    1530             : 
    1531        1128 :         base = end = s = task->input[cur];
    1532        1128 :         *s = 0;
    1533        1128 :         task->cur = cur;
    1534        1128 :         if (task->as->filename == NULL) {
    1535         787 :                 consoleinput = true;
    1536         787 :                 goto parseSTDIN;
    1537             :         }
    1538      205195 :         for (;;) {
    1539      102768 :                 startlineno = lineno;
    1540      102768 :                 ateof[cur] = !tablet_read_more(task);
    1541             : 
    1542             :                 // we may be reading from standard input and may be out of input
    1543             :                 // warn the consumers
    1544      102768 :                 if (task->aborted || ((lineno & 8191) == 0 && bstream_getoob(task->cntxt->fdin))) {
    1545           0 :                         tablet_error(task, rowno, lineno, int_nil,
    1546             :                                                  "problem reported by client", s);
    1547           0 :                         ateof[cur] = true;
    1548           0 :                         goto reportlackofinput;
    1549             :                 }
    1550             : 
    1551      102768 :                 if (ateof[cur] && partial) {
    1552           1 :                         if (unlikely(partial)) {
    1553           1 :                                 tablet_error(task, rowno, lineno, int_nil,
    1554             :                                                          "incomplete record at end of file", s);
    1555           1 :                                 task->b->pos += partial;
    1556             :                         }
    1557           1 :                         goto reportlackofinput;
    1558             :                 }
    1559             : 
    1560      102767 :                 if (task->errbuf && task->errbuf[0]) {
    1561           0 :                         if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
    1562           0 :                                 tablet_error(task, rowno, lineno, int_nil, GDKerrbuf,
    1563             :                                                          "SQLload_file");
    1564             : /*                              TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
    1565           0 :                                 ateof[cur] = true;
    1566           0 :                                 break;
    1567             :                         }
    1568             :                 }
    1569             : 
    1570      102767 :   parseSTDIN:
    1571             : 
    1572             :                 /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
    1573      103554 :                 partial = 0;
    1574      103554 :                 task->top[cur] = 0;
    1575      103554 :                 s = task->input[cur];
    1576      103554 :                 base = end;
    1577             :                 /* avoid too long records */
    1578      103554 :                 if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
    1579             :                         /* the input buffer should be extended, but 'base' is not shared
    1580             :                            between the threads, which we can not now update.
    1581             :                            Mimic an ateof instead; */
    1582           0 :                         tablet_error(task, rowno, lineno, int_nil, "record too long", "");
    1583           0 :                         ateof[cur] = true;
    1584             : /*                      TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
    1585           0 :                         goto reportlackofinput;
    1586             :                 }
    1587      103554 :                 memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
    1588      103554 :                 end = end + task->b->len - task->b->pos;
    1589      103554 :                 *end = '\0';                    /* this is safe, as the stream ensures an extra byte */
    1590             :                 /* Note that we rescan from the start of a record (the last
    1591             :                  * partial buffer from the previous iteration), even if in the
    1592             :                  * previous iteration we have already established that there
    1593             :                  * is no record separator in the first, perhaps significant,
    1594             :                  * part of the buffer. This is because if the record separator
    1595             :                  * is longer than one byte, it is too complex (i.e. would
    1596             :                  * require more state) to be sure what the state of the quote
    1597             :                  * status is when we back off a few bytes from where the last
    1598             :                  * scan ended (we need to back off some since we could be in
    1599             :                  * the middle of the record separator).  If this is too
    1600             :                  * costly, we have to rethink the matter. */
    1601      103554 :                 if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
    1602           0 :                         ateof[cur] = true;
    1603           0 :                         goto reportlackofinput;
    1604             :                 }
    1605   145229275 :                 for (e = s; *e && e < end && cnt < task->maxrow;) {
    1606             :                         /* tokenize the record completely
    1607             :                          *
    1608             :                          * The format of the input should comply to the following
    1609             :                          * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
    1610             :                          * where quote is a single user-defined character.
    1611             :                          * Within the quoted fields a character may be escaped
    1612             :                          * with a backslash.  The correct number of fields should
    1613             :                          * be supplied.  In the first phase we simply break the
    1614             :                          * rows at the record boundary. */
    1615             :                         int nutf = 0;
    1616             :                         int m = 0;
    1617             :                         bool bs = false;
    1618             :                         char q = 0;
    1619             :                         size_t i = 0;
    1620  3153552235 :                         while (*e) {
    1621  3153551192 :                                 if (task->skip > 0) {
    1622             :                                         /* no interpretation of data we're skipping, just
    1623             :                                          * look for newline */
    1624        1827 :                                         if (*e == '\n') {
    1625          27 :                                                 lineno++;
    1626          27 :                                                 break;
    1627             :                                         }
    1628             :                                 } else {
    1629             :                                         /* check for correctly encoded UTF-8 */
    1630  3153549365 :                                         if (nutf > 0) {
    1631        1841 :                                                 if (unlikely((*e & 0xC0) != 0x80))
    1632           1 :                                                         goto badutf8;
    1633        3680 :                                                 if (unlikely(m != 0 && (*e & m) == 0))
    1634           0 :                                                         goto badutf8;
    1635        1840 :                                                 m = 0;
    1636        1840 :                                                 nutf--;
    1637  3153547524 :                                         } else if ((*e & 0x80) != 0) {
    1638        1709 :                                                 if ((*e & 0xE0) == 0xC0) {
    1639        1582 :                                                         nutf = 1;
    1640        1582 :                                                         if (unlikely((e[0] & 0x1E) == 0))
    1641           0 :                                                                 goto badutf8;
    1642         127 :                                                 } else if ((*e & 0xF0) == 0xE0) {
    1643         122 :                                                         nutf = 2;
    1644         122 :                                                         if ((e[0] & 0x0F) == 0)
    1645           3 :                                                                 m = 0x20;
    1646           5 :                                                 } else if (likely((*e & 0xF8) == 0xF0)) {
    1647           5 :                                                         nutf = 3;
    1648           5 :                                                         if ((e[0] & 0x07) == 0)
    1649           5 :                                                                 m = 0x30;
    1650             :                                                 } else {
    1651           0 :                                                         goto badutf8;
    1652             :                                                 }
    1653  3153545815 :                                         } else if (*e == '\n')
    1654   145126151 :                                                 lineno++;
    1655             :                                         /* check for quoting and the row separator */
    1656  3153549364 :                                         if (bs) {
    1657             :                                                 bs = false;
    1658  3153006292 :                                         } else if (task->escape && *e == '\\') {
    1659             :                                                 bs = true;
    1660             :                                                 i = 0;
    1661  3152463220 :                                         } else if (*e == q) {
    1662             :                                                 q = 0;
    1663  3146894538 :                                         } else if (*e == quote) {
    1664             :                                                 q = quote;
    1665             :                                                 i = 0;
    1666  3141325773 :                                         } else if (q == 0) {
    1667  2959386861 :                                                 i = rdfa[i][(unsigned char) *e];
    1668  2959386861 :                                                 if (i == rseplen)
    1669             :                                                         break;
    1670             :                                         }
    1671             :                                 }
    1672  3008425470 :                                 e++;
    1673             :                         }
    1674   145126764 :                         if (*e == 0) {
    1675        1043 :                                 partial = e - s;
    1676             :                                 /* found an incomplete record, saved for next round */
    1677        1043 :                                 if (unlikely(s + partial < end)) {
    1678             :                                         /* found a EOS in the input */
    1679           0 :                                         tablet_error(task, rowno, startlineno, int_nil,
    1680             :                                                                  "record too long (EOS found)", "");
    1681           0 :                                         ateof[cur] = true;
    1682           0 :                                         goto reportlackofinput;
    1683             :                                 }
    1684             :                                 break;
    1685             :                         } else {
    1686   145125721 :                                 rowno++;
    1687   145125721 :                                 if (task->skip > 0) {
    1688          27 :                                         task->skip--;
    1689             :                                 } else {
    1690   145125694 :                                         if (cnt < task->maxrow) {
    1691   145125694 :                                                 task->startlineno[cur][task->top[cur]] = startlineno;
    1692   145125694 :                                                 task->rows[cur][task->top[cur]++] = s;
    1693   145125694 :                                                 startlineno = lineno;
    1694   145125694 :                                                 cnt++;
    1695             :                                         }
    1696   145125694 :                                         *(e + 1 - rseplen) = 0;
    1697             :                                 }
    1698   145125721 :                                 s = ++e;
    1699   145125721 :                                 task->b->pos += (size_t) (e - base);
    1700   145125721 :                                 base = e;
    1701   145125721 :                                 if (task->top[cur] == task->limit)
    1702             :                                         break;
    1703             :                         }
    1704             :                 }
    1705             : 
    1706      102510 :   reportlackofinput:
    1707             : /*        TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
    1708             : 
    1709      103555 :                 if (consoleinput) {
    1710      101923 :                         task->cur = cur;
    1711      101923 :                         task->ateof = ateof[cur];
    1712      101923 :                         task->cnt = bufcnt[cur];
    1713             :                         /* tell consumer to go ahead */
    1714      101923 :                         MT_sema_up(&task->consumer);
    1715             :                         /* then wait until it is done */
    1716      101923 :                         MT_sema_down(&task->producer);
    1717      101923 :                         if (cnt == task->maxrow) {
    1718         784 :                                 GDKfree(rdfa);
    1719         784 :                                 MT_thread_set_qry_ctx(NULL);
    1720         784 :                                 return;
    1721             :                         }
    1722             :                 } else {
    1723        1632 :                         assert(!blocked[cur]);
    1724        1632 :                         if (blocked[(cur + 1) % MAXBUFFERS]) {
    1725             :                                 /* first wait until other buffer is done */
    1726             : /*                              TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/
    1727             : 
    1728        1291 :                                 MT_sema_down(&task->producer);
    1729        1291 :                                 blocked[(cur + 1) % MAXBUFFERS] = false;
    1730        1291 :                                 if (task->state == ENDOFCOPY) {
    1731           3 :                                         GDKfree(rdfa);
    1732           3 :                                         MT_thread_set_qry_ctx(NULL);
    1733           3 :                                         return;
    1734             :                                 }
    1735             :                         }
    1736             :                         /* other buffer is done, proceed with current buffer */
    1737        1629 :                         assert(!blocked[(cur + 1) % MAXBUFFERS]);
    1738        1629 :                         blocked[cur] = true;
    1739        1629 :                         task->cur = cur;
    1740        1629 :                         task->ateof = ateof[cur];
    1741        1629 :                         task->cnt = bufcnt[cur];
    1742        1629 :                         more = !ateof[cur] || (e && e < end
    1743           0 :                                                                    && task->top[cur] == task->limit);
    1744             : /*                      TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
    1745             : 
    1746        1629 :                         MT_sema_up(&task->consumer);
    1747             : 
    1748        1629 :                         cur = (cur + 1) % MAXBUFFERS;
    1749             : /*                      TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/
    1750             : 
    1751        1629 :                         if (cnt == task->maxrow) {
    1752         182 :                                 MT_sema_down(&task->producer);
    1753             : /*                              TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
    1754         182 :                                 GDKfree(rdfa);
    1755         182 :                                 MT_thread_set_qry_ctx(NULL);
    1756         182 :                                 return;
    1757             :                         }
    1758             :                 }
    1759             : /*              TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/
    1760             : 
    1761             :                 /* we ran out of input? */
    1762      102586 :                 if (task->ateof && !more) {
    1763             : /*                      TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
    1764         159 :                         GDKfree(rdfa);
    1765         159 :                         MT_thread_set_qry_ctx(NULL);
    1766         159 :                         return;
    1767             :                 }
    1768             :                 /* consumers ask us to stop? */
    1769      102427 :                 if (task->state == ENDOFCOPY) {
    1770           0 :                         GDKfree(rdfa);
    1771           0 :                         MT_thread_set_qry_ctx(NULL);
    1772           0 :                         return;
    1773             :                 }
    1774      102427 :                 bufcnt[cur] = cnt;
    1775             :                 /* move the non-parsed correct row data to the head of the next buffer */
    1776      102427 :                 end = s = task->input[cur];
    1777             :         }
    1778           0 :         if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
    1779           0 :                 char msg[256];
    1780           0 :                 snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
    1781           0 :                 task->as->error = GDKstrdup(msg);
    1782           0 :                 tablet_error(task, rowno, startlineno, int_nil,
    1783             :                                          "incomplete record at end of file", s);
    1784           0 :                 task->b->pos += partial;
    1785             :         }
    1786           0 :         GDKfree(rdfa);
    1787           0 :         MT_thread_set_qry_ctx(NULL);
    1788             : 
    1789           0 :         return;
    1790             : 
    1791           1 :   badutf8:
    1792           1 :         tablet_error(task, rowno, startlineno, int_nil,
    1793             :                                  "input not properly encoded UTF-8", "");
    1794           1 :         ateof[cur] = true;
    1795           1 :         goto reportlackofinput;
    1796             : }
    1797             : 
    /*
     * create_rejects_table: make sure the four per-client error columns
     * (error_row, error_fld, error_msg, error_input) exist on `cntxt`.
     *
     * The check and the creation are done while holding mal_contextLock.
     * Nothing is done when cntxt->error_row is already set.  Otherwise the
     * four columns are created with COLnew as TRANSIENT, with types lng, int,
     * str and str respectively.
     *
     * There is no return value: if any of the four creations fails, all four
     * pointers are passed to BBPreclaim and reset to NULL, so a caller detects
     * failure by finding NULL pointers afterwards (SQLload_file checks exactly
     * that right after calling this function).
     */
    1798             : static void
    1799        1158 : create_rejects_table(Client cntxt)
    1800             : {
    1801        1158 :         MT_lock_set(&mal_contextLock);
                               /* error_row doubles as the "already created" marker for all four */
    1802        1158 :         if (cntxt->error_row == NULL) {
    1803         490 :                 cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
    1804         490 :                 cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
    1805         490 :                 cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
    1806         490 :                 cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
                                       /* all-or-nothing: a single failed creation undoes the other three */
    1807         490 :                 if (cntxt->error_row == NULL || cntxt->error_fld == NULL
    1808         490 :                         || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
                                               /* NOTE(review): some of these pointers may be NULL here;
                                                * presumably BBPreclaim accepts NULL -- confirm */
    1809           0 :                         BBPreclaim(cntxt->error_row);
    1810           0 :                         BBPreclaim(cntxt->error_fld);
    1811           0 :                         BBPreclaim(cntxt->error_msg);
    1812           0 :                         BBPreclaim(cntxt->error_input);
                                               /* leave no dangling pointers; NULL also signals failure to the caller */
    1813           0 :                         cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
    1814             :                 }
    1815             :         }
    1816        1158 :         MT_lock_unset(&mal_contextLock);
    1817        1158 : }
    1818             : 
    1819             : BUN
    1820        1128 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out,
    1821             :                          const char *csep, const char *rsep, char quote, lng skip,
    1822             :                          lng maxrow, int best, bool from_stdin, const char *tabnam,
    1823             :                          bool escape)
    1824             : {
    1825        1128 :         BUN cnt = 0, cntstart = 0, leftover = 0;
    1826        1128 :         int res = 0;                            /* < 0: error, > 0: success, == 0: continue processing */
    1827        1128 :         int j;
    1828        1128 :         BUN firstcol;
    1829        1128 :         BUN i, attr;
    1830        1128 :         READERtask task;
    1831        1128 :         READERtask ptask[MAXWORKERS];
    1832        1128 :         int threads = 1;
    1833        1128 :         lng tio, t1 = 0;
    1834        1128 :         char name[MT_NAME_LEN];
    1835             : 
    1836        1128 :         if (maxrow < 0 || maxrow > (LL_CONSTANT(1) << 16)) {
    1837         116 :                 threads = GDKgetenv_int("tablet_threads", GDKnr_threads);
    1838         116 :                 if (threads > 1)
    1839         116 :                         threads = threads < MAXWORKERS ? threads - 1 : MAXWORKERS - 1;
    1840             :                 else
    1841             :                         threads = 1;
    1842             :         }
    1843             : 
    1844             : /*      TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
    1845             : 
    1846        1128 :         memset(ptask, 0, sizeof(ptask));
    1847        2256 :         task = (READERtask) {
    1848             :                 .cntxt = cntxt,
    1849             :                 .from_stdin = from_stdin,
    1850             :                 .as = as,
    1851             :                 .escape = escape,       /* TODO: implement feature!!! */
    1852        1128 :                 .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
    1853             :         };
    1854             : 
    1855             :         /* create the reject tables */
    1856        1128 :         create_rejects_table(task.cntxt);
    1857        1128 :         if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL
    1858        1128 :                 || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
    1859           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1860             :                                          "SQLload initialization failed", "");
    1861             :                 /* nothing allocated yet, so nothing to free */
    1862           0 :                 return BUN_NONE;
    1863             :         }
    1864             : 
    1865        1128 :         assert(rsep);
    1866        1128 :         assert(csep);
    1867        1128 :         assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
    1868        1128 :         task.fields = (char ***) GDKzalloc(as->nr_attrs * sizeof(char **));
    1869        1128 :         task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
    1870        1128 :         task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
    1871        1128 :         if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
    1872           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1873             :                                          "memory allocation failed", "SQLload_file");
    1874           0 :                 goto bailout;
    1875             :         }
    1876        1128 :         task.cur = 0;
    1877        3384 :         for (i = 0; i < MAXBUFFERS; i++) {
    1878        2256 :                 task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
    1879        2256 :                 task.rowlimit[i] = MAXROWSIZE(2 * b->size);
    1880        2256 :                 if (task.base[i] == NULL) {
    1881           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1882             :                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1883           0 :                         goto bailout;
    1884             :                 }
    1885        2256 :                 task.base[i][0] = task.base[i][b->size + 1] = 0;
    1886        2256 :                 task.input[i] = task.base[i] + 1;       /* wrap the buffer with null bytes */
    1887             :         }
    1888        1128 :         task.besteffort = best;
    1889             : 
    1890        1128 :         if (maxrow < 0)
    1891          82 :                 task.maxrow = BUN_MAX;
    1892             :         else
    1893        1046 :                 task.maxrow = (BUN) maxrow;
    1894             : 
    1895        1128 :         task.skip = skip;
    1896        1128 :         task.quote = quote;
    1897        1128 :         task.csep = csep;
    1898        1128 :         task.seplen = strlen(csep);
    1899        1128 :         task.rsep = rsep;
    1900        1128 :         task.rseplen = strlen(rsep);
    1901        1128 :         task.errbuf = cntxt->errbuf;
    1902             : 
    1903        1128 :         MT_sema_init(&task.producer, 0, "task.producer");
    1904        1128 :         MT_sema_init(&task.consumer, 0, "task.consumer");
    1905        1128 :         task.ateof = false;
    1906        1128 :         task.b = b;
    1907        1128 :         task.out = out;
    1908             : 
    1909        1128 :         as->error = NULL;
    1910             : 
    1911             :         /* there is no point in creating more threads than we have columns */
    1912        1128 :         if (as->nr_attrs < (BUN) threads)
    1913          74 :                 threads = (int) as->nr_attrs;
    1914             : 
    1915             :         /* allocate enough space for pointers into the buffer pool.  */
    1916             :         /* the record separator is considered a column */
    1917        1128 :         task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
    1918       11506 :         for (i = 0; i < as->nr_attrs; i++) {
    1919       10378 :                 task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
    1920       10378 :                 if (task.fields[i] == NULL) {
    1921           0 :                         if (task.as->error == NULL)
    1922           0 :                                 as->error = createException(MAL, "sql.copy_from",
    1923             :                                                                                         SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1924           0 :                         goto bailout;
    1925             :                 }
    1926       10378 :                 task.cols[i] = (int) (i + 1);   /* to distinguish non initialized later with zero */
    1927             :         }
    1928        3384 :         for (i = 0; i < MAXBUFFERS; i++) {
    1929        2256 :                 task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
    1930        2256 :                 task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
    1931        2256 :                 if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
    1932           0 :                         GDKfree(task.rows[i]);
    1933           0 :                         GDKfree(task.startlineno[i]);
    1934           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1935             :                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL,
    1936             :                                                  "SQLload_file:failed to alloc buffers");
    1937           0 :                         goto bailout;
    1938             :                 }
    1939             :         }
    1940        1128 :         task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
    1941        1128 :         if (task.rowerror == NULL) {
    1942           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1943             :                                          SQLSTATE(HY013) MAL_MALLOC_FAIL,
    1944             :                                          "SQLload_file:failed to alloc rowerror buffer");
    1945           0 :                 goto bailout;
    1946             :         }
    1947             : 
    1948        1128 :         task.id = 0;
    1949        1128 :         snprintf(name, sizeof(name), "prod-%s", tabnam);
    1950        1128 :         if (MT_create_thread(&task.tid, SQLproducer, (void *) &task, MT_THR_JOINABLE, name) < 0) {
    1951           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil,
    1952             :                                          SQLSTATE(42000) "failed to start producer thread",
    1953             :                                          "SQLload_file");
    1954           0 :                 goto bailout;
    1955             :         }
    1956             : /*      TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
    1957             : 
    1958        1128 :         task.workers = threads;
    1959        2625 :         for (j = 0; j < threads; j++) {
    1960        1497 :                 ptask[j] = task;
    1961        1497 :                 ptask[j].id = j;
    1962        1497 :                 ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
    1963        1497 :                 if (ptask[j].cols == NULL) {
    1964           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1965             :                                                  SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1966           0 :                         task.id = -1;
    1967           0 :                         MT_sema_up(&task.producer);
    1968           0 :                         goto bailout;
    1969             :                 }
    1970        1497 :                 snprintf(name, sizeof(name), "ptask%d.sema", j);
    1971        1497 :                 MT_sema_init(&ptask[j].sema, 0, name);
    1972        1497 :                 snprintf(name, sizeof(name), "ptask%d.repl", j);
    1973        1497 :                 MT_sema_init(&ptask[j].reply, 0, name);
    1974        1497 :                 snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
    1975        1497 :                 if (MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name) < 0) {
    1976           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil,
    1977             :                                                  SQLSTATE(42000) "failed to start worker thread",
    1978             :                                                  "SQLload_file");
    1979           0 :                         threads = j;
    1980           0 :                         for (j = 0; j < threads; j++)
    1981           0 :                                 ptask[j].workers = threads;
    1982             :                 }
    1983             :         }
    1984        1128 :         if (threads == 0) {
    1985             :                 /* no threads started */
    1986           0 :                 task.id = -1;
    1987           0 :                 MT_sema_up(&task.producer);
    1988           0 :                 goto bailout;
    1989             :         }
    1990        1128 :         MT_sema_up(&task.producer);
    1991             : 
    1992        1128 :         tio = GDKusec();
    1993        1128 :         tio = GDKusec() - tio;
    1994        1128 :         t1 = GDKusec();
    1995        2259 :         for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
    1996        1131 :                 if (task.as->format[firstcol].c != NULL)
    1997             :                         break;
    1998      104495 :         while (res == 0 && cnt < task.maxrow) {
    1999             : 
    2000             :                 // track how many elements are in the aggregated BATs
    2001      103552 :                 cntstart = BATcount(task.as->format[firstcol].c);
    2002             :                 /* block until the producer has data available */
    2003      103552 :                 MT_sema_down(&task.consumer);
    2004      103552 :                 cnt += task.top[task.cur];
    2005      103552 :                 if (task.ateof && !task.top[task.cur])
    2006             :                         break;
    2007      103393 :                 t1 = GDKusec() - t1;
    2008             : /*              TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
    2009             : 
    2010      103393 :                 t1 = GDKusec();
    2011      103393 :                 if (task.top[task.cur]) {
    2012             :                         /* activate the workers to break rows */
    2013      410607 :                         for (j = 0; j < threads; j++) {
    2014             :                                 /* stage one, break the rows in parallel */
    2015      307243 :                                 ptask[j].error = 0;
    2016      307243 :                                 ptask[j].state = BREAKROW;
    2017      307243 :                                 ptask[j].next = task.top[task.cur];
    2018      307243 :                                 ptask[j].fields = task.fields;
    2019      307243 :                                 ptask[j].limit = task.limit;
    2020      307243 :                                 ptask[j].cnt = task.cnt;
    2021      307243 :                                 ptask[j].cur = task.cur;
    2022      307243 :                                 ptask[j].top[task.cur] = task.top[task.cur];
    2023      307243 :                                 MT_sema_up(&ptask[j].sema);
    2024             :                         }
    2025             :                 }
    2026      103393 :                 if (task.top[task.cur]) {
    2027             :                         /* await completion of row break phase */
    2028      410607 :                         for (j = 0; j < threads; j++) {
    2029      307243 :                                 MT_sema_down(&ptask[j].reply);
    2030      307243 :                                 if (ptask[j].error) {
    2031           0 :                                         res = -1;
    2032             : /*                                      TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
    2033             :                                 }
    2034             :                         }
    2035             :                 }
    2036             : 
    2037             : /*              TRC_DEBUG(MAL_SERVER,
    2038             :                         "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
    2039             :                         task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
    2040             : 
    2041      103393 :                 if (task.top[task.cur]) {
    2042      103364 :                         if (res == 0) {
    2043      103364 :                                 SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
    2044             : 
    2045             :                                 /* activate the workers to update the BATs */
    2046      513971 :                                 for (j = 0; j < threads; j++) {
    2047             :                                         /* stage two, update the BATs */
    2048      307243 :                                         ptask[j].state = UPDATEBAT;
    2049      307243 :                                         MT_sema_up(&ptask[j].sema);
    2050             :                                 }
    2051             :                         }
    2052             :                 }
    2053      103393 :                 tio = GDKusec();
    2054      103393 :                 tio = t1 - tio;
    2055             : 
    2056             :                 /* await completion of the BAT updates */
    2057      103393 :                 if (res == 0 && task.top[task.cur]) {
    2058      410607 :                         for (j = 0; j < threads; j++) {
    2059      307243 :                                 MT_sema_down(&ptask[j].reply);
    2060      307243 :                                 if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
    2061      307243 :                                         res = -1;
    2062      307243 :                                         best = 0;
    2063             :                                 }
    2064             :                         }
    2065             :                 }
    2066             : 
    2067             :                 /* trim the BATs discarding error tuples */
    2068             : #define trimerrors(TYPE)                                                                                                \
    2069             :                 do {                                                                                                                    \
    2070             :                         TYPE *src, *dst;                                                                                        \
    2071             :                         leftover= BATcount(task.as->format[attr].c);                         \
    2072             :                         limit = leftover - cntstart;                                                            \
    2073             :                         dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
    2074             :                         for(j = 0; j < (int) limit; j++, src++){                                     \
    2075             :                                 if ( task.rowerror[j]){                                                                 \
    2076             :                                         leftover--;                                                                                     \
    2077             :                                         continue;                                                                                       \
    2078             :                                 }                                                                                                               \
    2079             :                                 *dst++ = *src;                                                                                  \
    2080             :                         }                                                                                                                       \
    2081             :                         BATsetcount(task.as->format[attr].c, leftover );                     \
    2082             :                 } while (0)
    2083             : 
    2084             : /*              TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
    2085             :                                          best, BATcount(as->format[firstcol].c), task.cnt); */
    2086             : 
    2087      103393 :                 if (best && BATcount(as->format[firstcol].c)) {
    2088             :                         BUN limit;
    2089             :                         int width;
    2090             : 
    2091          53 :                         for (attr = 0; attr < as->nr_attrs; attr++) {
    2092          37 :                                 if (as->format[attr].skip)
    2093           5 :                                         continue;
    2094          32 :                                 width = as->format[attr].c->twidth;
    2095          32 :                                 as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
    2096          32 :                                 switch (width) {
    2097           5 :                                 case 1:
    2098          16 :                                         trimerrors(bte);
    2099           5 :                                         break;
    2100           0 :                                 case 2:
    2101           0 :                                         trimerrors(sht);
    2102           0 :                                         break;
    2103          27 :                                 case 4:
    2104       30061 :                                         trimerrors(int);
    2105          27 :                                         break;
    2106           0 :                                 case 8:
    2107           0 :                                         trimerrors(lng);
    2108           0 :                                         break;
    2109             : #ifdef HAVE_HGE
    2110           0 :                                 case 16:
    2111           0 :                                         trimerrors(hge);
    2112           0 :                                         break;
    2113             : #endif
    2114           0 :                                 default:
    2115             :                                 {
    2116           0 :                                         char *src, *dst;
    2117           0 :                                         leftover = BATcount(task.as->format[attr].c);
    2118           0 :                                         limit = leftover - cntstart;
    2119           0 :                                         dst = src = BUNtloc(task.as->format[attr].ci, cntstart);
    2120           0 :                                         for (j = 0; j < (int) limit; j++, src += width) {
    2121           0 :                                                 if (task.rowerror[j]) {
    2122           0 :                                                         leftover--;
    2123           0 :                                                         continue;
    2124             :                                                 }
    2125           0 :                                                 if (dst != src)
    2126           0 :                                                         memcpy(dst, src, width);
    2127           0 :                                                 dst += width;
    2128             :                                         }
    2129           0 :                                         BATsetcount(task.as->format[attr].c, leftover);
    2130             :                                 }
    2131           0 :                                         break;
    2132             :                                 }
    2133             :                         }
    2134             :                         // re-initialize the error vector;
    2135          16 :                         memset(task.rowerror, 0, task.limit);
    2136          16 :                         task.errorcnt = 0;
    2137             :                 }
    2138             : 
    2139      103393 :                 if (res < 0) {
    2140             :                         /* producer should stop */
    2141          26 :                         task.maxrow = cnt;
    2142          26 :                         task.state = ENDOFCOPY;
    2143          26 :                         task.ateof = true;
    2144             :                 }
    2145      103393 :                 if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
    2146             :                         break;
    2147      103393 :                 task.top[task.cur] = 0;
    2148      103393 :                 if (cnt == task.maxrow)
    2149         969 :                         task.ateof = true;
    2150      104521 :                 MT_sema_up(&task.producer);
    2151             :         }
    2152             : 
    2153             : /*      TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
    2154             : 
    2155        1128 :         cnt = BATcount(task.as->format[firstcol].c);
    2156             : 
    2157        1128 :         task.state = ENDOFCOPY;
    2158             : /*      TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
    2159             : 
    2160        1128 :         if (!task.ateof || cnt < task.maxrow) {
    2161             : /*              TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
    2162         177 :                 MT_sema_up(&task.producer);
    2163             :         }
    2164        1128 :         MT_join_thread(task.tid);
    2165             : 
    2166             : /*      TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
    2167             : 
    2168        3753 :         for (j = 0; j < threads; j++) {
    2169        1497 :                 ptask[j].state = ENDOFCOPY;
    2170        1497 :                 MT_sema_up(&ptask[j].sema);
    2171             :         }
    2172             :         /* wait for their death */
    2173        2625 :         for (j = 0; j < threads; j++)
    2174        1497 :                 MT_sema_down(&ptask[j].reply);
    2175             : 
    2176             : /*      TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
    2177             : 
    2178        2625 :         for (j = 0; j < threads; j++) {
    2179        1497 :                 MT_join_thread(ptask[j].tid);
    2180        1497 :                 GDKfree(ptask[j].cols);
    2181        1497 :                 MT_sema_destroy(&ptask[j].sema);
    2182        1497 :                 MT_sema_destroy(&ptask[j].reply);
    2183             :         }
    2184             : 
    2185             : /*      TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
    2186             : /*      TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
    2187             : 
    2188       11506 :         for (i = 0; i < as->nr_attrs; i++) {
    2189       10378 :                 BAT *b = task.as->format[i].c;
    2190       10378 :                 if (b)
    2191       10372 :                         BATsettrivprop(b);
    2192       10378 :                 GDKfree(task.fields[i]);
    2193             :         }
    2194        1128 :         GDKfree(task.fields);
    2195        1128 :         GDKfree(task.cols);
    2196        1128 :         GDKfree(task.time);
    2197        4512 :         for (i = 0; i < MAXBUFFERS; i++) {
    2198        2256 :                 GDKfree(task.base[i]);
    2199        2256 :                 GDKfree(task.rows[i]);
    2200        2256 :                 GDKfree(task.startlineno[i]);
    2201             :         }
    2202        1128 :         if (task.rowerror)
    2203        1128 :                 GDKfree(task.rowerror);
    2204        1128 :         MT_sema_destroy(&task.producer);
    2205        1128 :         MT_sema_destroy(&task.consumer);
    2206             : 
    2207        1128 :         return res < 0 ? BUN_NONE : cnt;
    2208             : 
    2209           0 :   bailout:
    2210           0 :         if (task.fields) {
    2211           0 :                 for (i = 0; i < as->nr_attrs; i++)
    2212           0 :                         GDKfree(task.fields[i]);
    2213           0 :                 GDKfree(task.fields);
    2214             :         }
    2215           0 :         GDKfree(task.time);
    2216           0 :         GDKfree(task.cols);
    2217           0 :         GDKfree(task.base[task.cur]);
    2218           0 :         GDKfree(task.rowerror);
    2219           0 :         for (i = 0; i < MAXWORKERS; i++)
    2220           0 :                 GDKfree(ptask[i].cols);
    2221             :         return BUN_NONE;
    2222             : }
    2223             : 
    2224             : /* Return a copy of the client's latest reject table.
                        *
                        * The four result slots of the instruction (arguments 0..3 of pci)
                        * receive the BAT ids of copies of cntxt->error_row, error_fld,
                        * error_msg and error_input, in that order.  The copies are created
                        * with COLcopy(..., TRANSIENT) while errorlock is held.
                        *
                        * Raises a MAL exception ("sql.rejects") when cntxt->error_row is
                        * still NULL after create_rejects_table(), or when any of the four
                        * copies could not be made; returns MAL_SUCCEED otherwise.
                        */
    2225             : str
    2226          30 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    2227             : {
    2228          30 :         bat *row = getArgReference_bat(stk, pci, 0);    /* out: id of the error_row copy */
    2229          30 :         bat *fld = getArgReference_bat(stk, pci, 1);    /* out: id of the error_fld copy */
    2230          30 :         bat *msg = getArgReference_bat(stk, pci, 2);    /* out: id of the error_msg copy */
    2231          30 :         bat *inp = getArgReference_bat(stk, pci, 3);    /* out: id of the error_input copy */
    2232             :         /* NOTE(review): create_rejects_table() is defined outside this view;
                                * presumably it sets up the four cntxt->error_* BATs when they are
                                * missing.  Only error_row is tested afterwards, while error_fld,
                                * error_msg and error_input are dereferenced below without their
                                * own NULL check -- confirm the helper sets all four or none. */
    2233          30 :         create_rejects_table(cntxt);
    2234          30 :         if (cntxt->error_row == NULL)
    2235           0 :                 throw(MAL, "sql.rejects", "No reject table available");
    2236          30 :         MT_lock_set(&errorlock);        /* errorlock is held only for the four copies */
    2237          30 :         BAT *bn1 = COLcopy(cntxt->error_row, cntxt->error_row->ttype, true, TRANSIENT);
    2238          30 :         BAT *bn2 = COLcopy(cntxt->error_fld, cntxt->error_fld->ttype, true, TRANSIENT);
    2239          30 :         BAT *bn3 = COLcopy(cntxt->error_msg, cntxt->error_msg->ttype, true, TRANSIENT);
    2240          30 :         BAT *bn4 = COLcopy(cntxt->error_input, cntxt->error_input->ttype, true, TRANSIENT);
    2241          30 :         MT_lock_unset(&errorlock);
    2242          30 :         if (bn1 == NULL || bn2 == NULL || bn3 == NULL || bn4 == NULL) { /* all-or-nothing: one failed copy discards the rest */
    2243           0 :                 BBPreclaim(bn1);        /* called on every copy, including NULL ones -- presumably NULL-tolerant, confirm */
    2244           0 :                 BBPreclaim(bn2);
    2245           0 :                 BBPreclaim(bn3);
    2246           0 :                 BBPreclaim(bn4);
    2247           0 :                 throw(MAL, "sql.rejects", GDK_EXCEPTION);
    2248             :         }
    2249          30 :         *row = bn1->batCacheid; /* publish the ids of the copies in the result slots */
    2250          30 :         *fld = bn2->batCacheid;
    2251          30 :         *msg = bn3->batCacheid;
    2252          30 :         *inp = bn4->batCacheid;
    2253          30 :         BBPkeepref(bn1);        /* presumably hands the reference over to the result variable -- confirm */
    2254          30 :         BBPkeepref(bn2);
    2255          30 :         BBPkeepref(bn3);
    2256          30 :         BBPkeepref(bn4);
    2257          30 :         (void) mb;      /* mb is not used by this function */
    2258          30 :         return MAL_SUCCEED;
    2259             : }
    2260             : /* Empty the client's reject table.
                        *
                        * When cntxt->error_row is set, error_row and each non-NULL one of
                        * error_fld, error_msg and error_input are cleared with
                        * BATclear(..., true) while errorlock is held.  When error_row is
                        * NULL nothing is done.  Always returns MAL_SUCCEED; mb, stk and pci
                        * are not used.
                        */
    2261             : str
    2262          14 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    2263             : {
    2264          14 :         if (cntxt->error_row) { /* this test is made before errorlock is taken */
    2265          13 :                 MT_lock_set(&errorlock);
    2266          13 :                 BATclear(cntxt->error_row, true);       /* NOTE(review): BATclear's result is not checked -- confirm it cannot fail here */
    2267          13 :                 if (cntxt->error_fld)
    2268          13 :                         BATclear(cntxt->error_fld, true);
    2269          13 :                 if (cntxt->error_msg)
    2270          13 :                         BATclear(cntxt->error_msg, true);
    2271          13 :                 if (cntxt->error_input)
    2272          13 :                         BATclear(cntxt->error_input, true);
    2273          13 :                 MT_lock_unset(&errorlock);
    2274             :         }
    2275          14 :         (void) mb;      /* the remaining parameters are unused */
    2276          14 :         (void) stk;
    2277          14 :         (void) pci;
    2278          14 :         return MAL_SUCCEED;
    2279             : }

Generated by: LCOV version 1.14