LCOV - code coverage report
Current view: top level - gdk - gdk_logger.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1552 2131 72.8 %
Date: 2024-04-25 20:03:45 Functions: 77 78 98.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "gdk.h"
      15             : #include "gdk_private.h"
      16             : #include "gdk_logger.h"
      17             : #include "gdk_logger_internals.h"
      18             : #include "mutils.h"
      19             : #include <string.h>
      20             : 
      21             : static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
      22             : static gdk_return log_del_bat(logger *lg, log_bid bid);
      23             : /*
      24             :  * The logger uses a directory to store its log files. One master log
      25             :  * file stores information about the version of the logger and the
      26             :  * type mapping it uses. This file is a simple ascii file with the
      27             :  * following format:
      28             :  *  {6DIGIT-VERSION\n[id,type_name\n]*}
      29             :  * The transaction log files have a binary format.
      30             :  */
      31             : 
      32             : #define LOG_START       0       /* record kinds; the values double as indices into log_commands[] */
      33             : #define LOG_END         1
      34             : #define LOG_UPDATE_CONST        2       /* update carrying one value that is repeated nr times */
      35             : #define LOG_UPDATE_BULK 3       /* update carrying an offset followed by nr values */
      36             : #define LOG_UPDATE      4
      37             : #define LOG_CREATE      5
      38             : #define LOG_DESTROY     6
      39             : #define LOG_SEQ         7
      40             : #define LOG_CLEAR       8       /* DEPRECATED; log_commands[] keeps an empty name in this slot */
      41             : #define LOG_BAT_GROUP   9
      42             : 
      43             : #ifdef NATIVE_WIN32 /* getfilepos: name of the file-position function available on this platform */
      44             : #define getfilepos _ftelli64
      45             : #else
      46             : #ifdef HAVE_FSEEKO
      47             : #define getfilepos ftello
      48             : #else
      49             : #define getfilepos ftell
      50             : #endif
      51             : #endif
      52             : 
      53             : #define BATSIZE 0
      54             : 
      55             : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow) /* true when debug bit 128 is set, or the logger is in-memory, or flushnow is set */
      56             : 
      57             : static const char *log_commands[] = { /* printable names, positioned to match the LOG_* values 0..9 */
      58             :         "LOG_START",
      59             :         "LOG_END",
      60             :         "LOG_UPDATE_CONST",
      61             :         "LOG_UPDATE_BULK",
      62             :         "LOG_UPDATE",
      63             :         "LOG_CREATE",
      64             :         "LOG_DESTROY",
      65             :         "LOG_SEQ",
      66             :         "",                   /* LOG_CLEAR IS DEPRECATED; the empty entry keeps LOG_BAT_GROUP at index 9 */
      67             :         "LOG_BAT_GROUP",
      68             : };
      69             : 
      70             : typedef struct logaction { /* one recorded change; a trans holds an array of these */
      71             :         int type;               /* type of change */
      72             :         lng nr;                 /* NOTE(review): presumably the number of values in the change -- confirm */
      73             :         int tt;                 /* NOTE(review): presumably the tail (atom) type of the values -- confirm */
      74             :         lng id;
      75             :         lng offset;
      76             :         log_id cid;             /* id of object */
      77             :         BAT *b;                 /* temporary bat with changes */
      78             :         BAT *uid;               /* temporary bat with bun positions to update */
      79             : } logaction;
      80             : 
      81             : /* during the recover process a number of transactions could be active */
      82             : typedef struct trans { /* state of one transaction being replayed */
      83             :         int tid;                /* transaction id */
      84             :         int sz;                 /* sz of the changes array */
      85             :         int nr;                 /* nr of changes */
      86             : 
      87             :         logaction *changes;     /* array of recorded changes (sz slots, nr in use) */
      88             : 
      89             :         struct trans *tr;       /* link to another trans; NOTE(review): presumably chains the active transactions -- confirm */
      90             : } trans;
      91             : 
      92             : typedef struct logformat_t { /* record header: read/written as one flag byte followed by one int */
      93             :         bte flag;               /* record kind; compared against the LOG_* constants */
      94             :         int id;                 /* id belonging to the record (e.g. log_read_seq uses it as the sequence id) */
      95             : } logformat;
      96             : 
      97             : typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return; /* reader status: LOG_EOF is returned when a stream read fails, LOG_ERR when a BAT operation or allocation fails */
      98             : 
      99             : static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
     100             : static gdk_return tr_grow(trans *tr);
     101             : 
     102             : #define log_lock(lg)    MT_lock_set(&(lg)->lock) /* take the logger's lock */
     103             : #define log_unlock(lg)  MT_lock_unset(&(lg)->lock) /* release the logger's lock */
     104             : 
     105             : static inline bte
     106      910009 : find_type(logger *lg, int tpe)
     107             : {
     108      910009 :         assert(tpe >= 0 && tpe < MAXATOMS);
     109      910009 :         return lg->type_id[tpe];
     110             : }
     111             : 
     112             : static inline int
     113      545189 : find_type_nr(logger *lg, bte tpe)
     114             : {
     115      545189 :         int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
     116      545189 :         if (nr == 255)
     117           0 :                 return -1;
     118             :         return nr;
     119             : }
     120             : 
     121             : static BUN
     122      418235 : log_find(BAT *b, BAT *d, int val)
     123             : {
     124      418235 :         BUN p;
     125             : 
     126      418235 :         assert(b->ttype == TYPE_int);
     127      418235 :         assert(d->ttype == TYPE_oid);
     128      418235 :         BATiter bi = bat_iterator(b);
     129      418235 :         if (BAThash(b) == GDK_SUCCEED) {
     130      418235 :                 MT_rwlock_rdlock(&b->thashlock);
     131      661984 :                 HASHloop_int(bi, b->thash, p, &val) {
     132      183615 :                         oid pos = p;
     133      183615 :                         if (BUNfnd(d, &pos) == BUN_NONE) {
     134      183615 :                                 MT_rwlock_rdunlock(&b->thashlock);
     135      183615 :                                 bat_iterator_end(&bi);
     136      183615 :                                 return p;
     137             :                         }
     138             :                 }
     139      234620 :                 MT_rwlock_rdunlock(&b->thashlock);
     140             :         } else {                /* unlikely: BAThash failed */
     141           0 :                 int *t = (int *) bi.base;
     142             : 
     143           0 :                 for (p = 0; p < bi.count; p++) {
     144           0 :                         if (t[p] == val) {
     145           0 :                                 oid pos = p;
     146           0 :                                 if (BUNfnd(d, &pos) == BUN_NONE) {
     147           0 :                                         bat_iterator_end(&bi);
     148           0 :                                         return p;
     149             :                                 }
     150             :                         }
     151             :                 }
     152             :         }
     153      234620 :         bat_iterator_end(&bi);
     154      234620 :         return BUN_NONE;
     155             : }
     156             : 
     157             : static log_bid
     158      645401 : internal_find_bat(logger *lg, log_id id, int tid)
     159             : {
     160      645401 :         BUN p;
     161             : 
     162      645401 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     163      645401 :                 BATiter cni = bat_iterator(lg->catalog_id);
     164      645401 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     165      645401 :                 if (tid < 0) {
     166      448470 :                         HASHloop_int(cni, cni.b->thash, p, &id) {
     167      205444 :                                 oid pos = p;
     168      205444 :                                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
     169      205444 :                                         MT_rwlock_rdunlock(&cni.b->thashlock);
     170      205444 :                                         bat_iterator_end(&cni);
     171      205444 :                                         return *(log_bid *) Tloc(lg->catalog_bid, p);
     172             :                                 }
     173             :                         }
     174             :                 } else {
     175      361561 :                         BUN cp = BUN_NONE;
     176     1651307 :                         HASHloop_int(cni, cni.b->thash, p, &id) {
     177     1077829 :                                 lng lid = *(lng *) Tloc(lg->catalog_lid, p);
     178     1077829 :                                 if (lid != lng_nil && lid <= tid) {
     179             :                                         break;
     180             :                                 }
     181             :                                 cp = p;
     182             :                         }
     183      361561 :                         if (cp != BUN_NONE) {
     184      361299 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     185      361299 :                                 bat_iterator_end(&cni);
     186      361299 :                                 return *(log_bid *) Tloc(lg->catalog_bid, cp);
     187             :                         }
     188             :                 }
     189       78658 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     190       78658 :                 bat_iterator_end(&cni);
     191       78658 :                 return 0;       /* not found */
     192             :         }
     193             :         return -1;              /* error creating hash */
     194             : }
     195             : 
     196             : static inline void /* Release a BAT used by the logger: thin wrapper that passes b to BBPreclaim(). */
     197       15885 : logbat_destroy(BAT *b)
     198             : {
     199       19210 :         BBPreclaim(b);
     200        5631 : }
     201             : 
     202             : static BAT *
     203       14294 : logbat_new(int tt, BUN size, role_t role)
     204             : {
     205       14294 :         BAT *nb = COLnew(0, tt, size, role);
     206             : 
     207       14294 :         if (nb) {
     208       14294 :                 BBP_pid(nb->batCacheid) = 0;
     209       14294 :                 if (role == PERSISTENT)
     210        8978 :                         BATmode(nb, false);
     211             :         } else {
     212           0 :                 TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
     213             :         }
     214       14294 :         return nb;
     215             : }
     216             : 
     217             : static bool
     218      755954 : log_read_format(logger *lg, logformat *data)
     219             : {
     220      755954 :         assert(!lg->inmemory);
     221      755954 :         if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
     222      744204 :                 if (mnstr_readInt(lg->input_log, &data->id) == 1)
     223             :                         return true;
     224             :                 /* could only read part, so complain */
     225           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     226             :         }
     227             :         return false;
     228             : }
     229             : 
     230             : static gdk_return
     231      752417 : log_write_format(logger *lg, logformat *data)
     232             : {
     233      752417 :         assert(data->id || data->flag);
     234      752417 :         assert(!lg->inmemory);
     235      752417 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
     236     1504834 :         if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
     237     1504834 :             mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
     238      752417 :             mnstr_writeInt(lg->current->output_log, data->id))
     239             :                 return GDK_SUCCEED;
     240           0 :         TRC_CRITICAL(GDK, "write failed\n");
     241           0 :         return GDK_FAIL;
     242             : }
     243             : 
     244             : static log_return
     245        2683 : log_read_seq(logger *lg, logformat *l)
     246             : {
     247        2683 :         int seq = l->id;
     248        2683 :         lng val;
     249        2683 :         BUN p;
     250             : 
     251        2683 :         assert(!lg->inmemory);
     252        2683 :         if (mnstr_readLng(lg->input_log, &val) != 1) {
     253           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     254           0 :                 return LOG_EOF;
     255             :         }
     256        2683 :         if (lg->flushing)
     257             :                 return LOG_OK;
     258             : 
     259          50 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
     260          49 :             p >= lg->seqs_id->batInserted) {
     261          19 :                 assert(lg->seqs_val->hseqbase == 0);
     262          19 :                 if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
     263           0 :                         TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
     264           0 :                         return LOG_ERR;
     265             :                 }
     266             :         } else {
     267          30 :                 if (p != BUN_NONE) {
     268          30 :                         oid pos = p;
     269          30 :                         if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
     270           0 :                                 TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
     271           0 :                                 return LOG_ERR;
     272             :                         }
     273             :                 }
     274          62 :                 if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
     275          31 :                     BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
     276           0 :                         TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
     277           0 :                         return LOG_ERR;
     278             :                 }
     279             :         }
     280             :         return LOG_OK;
     281             : }
     282             : 
     283             : #if 0 /* compiled out: log_write_id/log_read_id below are not part of the build */
     284             : static gdk_return
     285             : log_write_id(logger *lg, int id)
     286             : {
     287             :         assert(!lg->inmemory);
     288             :         assert(id >= 0);
     289             :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
     290             :         if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
     291             :             mnstr_writeInt(lg->current->output_log, id))
     292             :                 return GDK_SUCCEED;
     293             :         TRC_CRITICAL(GDK, "write failed\n");
     294             :         return GDK_FAIL;
     295             : }
     296             : 
     297             : static log_return
     298             : log_read_id(logger *lg, log_id *id)
     299             : {
     300             :         assert(!lg->inmemory);
     301             :         if (mnstr_readInt(lg->input_log, id) != 1) {
     302             :                 TRC_CRITICAL(GDK, "read failed\n");
     303             :                 return LOG_EOF;
     304             :         }
     305             :         return LOG_OK;
     306             : }
     307             : #endif
     308             : 
     309             : static log_return
     310       18006 : string_reader(logger *lg, BAT *b, lng nr)
     311             : {
     312       18006 :         size_t sz = 0;
     313       18006 :         lng SZ = 0;
     314       18006 :         log_return res = LOG_OK;
     315             : 
     316       36253 :         while (nr && res == LOG_OK) {
     317       18247 :                 if (mnstr_readLng(lg->input_log, &SZ) != 1) {
     318           0 :                         TRC_CRITICAL(GDK, "read failed\n");
     319           0 :                         return LOG_EOF;
     320             :                 }
     321       18247 :                 sz = (size_t) SZ;
     322       18247 :                 char *buf = lg->rbuf;
     323       18247 :                 if (lg->rbufsize < sz) {
     324           2 :                         if (!(buf = GDKrealloc(lg->rbuf, sz))) {
     325           0 :                                 TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
     326           0 :                                 return LOG_ERR;
     327             :                         }
     328           2 :                         lg->rbuf = buf;
     329           2 :                         lg->rbufsize = sz;
     330             :                 }
     331             : 
     332       18247 :                 if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
     333           0 :                         TRC_CRITICAL(GDK, "read failed\n");
     334           0 :                         return LOG_EOF;
     335             :                 }
     336             :                 /* handle strings */
     337             :                 char *t = buf;
     338             :                 /* chunked */
     339             : #define CHUNK_SIZE 1024
     340             :                 char *strings[CHUNK_SIZE];
     341             :                 int cur = 0;
     342             : 
     343       99189 :                 for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
     344       80942 :                         strings[cur++] = t;
     345       80942 :                         if (cur == CHUNK_SIZE &&
     346           6 :                             b &&
     347           6 :                             BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
     348           0 :                                 TRC_CRITICAL(GDK, "append to string bat failed\n");
     349           0 :                                 res = LOG_ERR;
     350             :                         }
     351       80942 :                         if (cur == CHUNK_SIZE)
     352          35 :                                 cur = 0;
     353             :                         /* find next */
     354     5804390 :                         while (*t)
     355     5723448 :                                 t++;
     356       80942 :                         t++;
     357             :                 }
     358       18247 :                 if (cur &&
     359         489 :                     b &&
     360         489 :                     BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
     361           0 :                         TRC_CRITICAL(GDK, "append to string bat failed\n");
     362           0 :                         res = LOG_ERR;
     363             :                 }
     364             :         }
     365             :         return res;
     366             : }
     367             : 
     368             : 
     369             : struct offset { /* describes a range of values to copy from a BAT in the log file into a BAT in the database */
     370             :         lng os;           /* offset within source BAT in logfile */
     371             :         lng nr;           /* number of values to be copied */
     372             :         lng od;           /* offset within destination BAT in database */
     373             : };
     374             : 
     375             : static log_return
     376      388656 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
     377             : {
     378      388656 :         log_return res = LOG_OK;
     379      388656 :         lng nr, pnr;
     380      388656 :         bte type_id = -1;
     381      388656 :         int tpe;
     382             : 
     383      388656 :         assert(!lg->inmemory);
     384      388656 :         TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
     385             : 
     386      777312 :         if (mnstr_readLng(lg->input_log, &nr) != 1 ||
     387      388656 :             mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
     388           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     389           0 :                 return LOG_EOF;
     390             :         }
     391             : 
     392      388656 :         pnr = nr;
     393      388656 :         tpe = find_type_nr(lg, type_id);
     394      388656 :         if (tpe >= 0) {
     395      388656 :                 BAT *uid = NULL;
     396      388656 :                 BAT *r = NULL;
     397      388656 :                 void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
     398      388656 :                 lng offset;
     399             : 
     400      388656 :                 assert(nr <= (lng) BUN_MAX);
     401      388656 :                 if (!lg->flushing && l->flag == LOG_UPDATE) {
     402           0 :                         uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
     403           0 :                         if (uid == NULL) {
     404           0 :                                 TRC_CRITICAL(GDK, "creating bat failed\n");
     405       27445 :                                 return LOG_ERR;
     406             :                         }
     407             :                 }
     408             : 
     409      388656 :                 if (l->flag == LOG_UPDATE_CONST) {
     410      123950 :                         if (mnstr_readLng(lg->input_log, &offset) != 1) {
     411           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     412           0 :                                 return LOG_EOF;
     413             :                         }
     414      123950 :                         if (cands) {
     415             :                                 /* This const range actually represents a segment of candidates corresponding to updated bat entries */
     416             : 
     417       27445 :                                 if (BATcount(*cands) == 0 || lg->flushing) {
     418             :                                         /* when flushing, we only need the offset and count of the last segment of inserts. */
     419       27394 :                                         assert((*cands)->ttype == TYPE_void);
     420       27394 :                                         BATtseqbase(*cands, (oid) offset);
     421       27394 :                                         BATsetcount(*cands, (BUN) nr);
     422          51 :                                 } else if (!lg->flushing) {
     423          51 :                                         assert(BATcount(*cands) > 0);
     424          51 :                                         BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
     425          51 :                                         BAT *newcands = NULL;
     426          51 :                                         if (!dense) {
     427           0 :                                                 TRC_CRITICAL(GDK, "creating bat failed\n");
     428           0 :                                                 res = LOG_ERR;
     429          51 :                                         } else if ((*cands)->ttype == TYPE_void) {
     430          11 :                                                 if ((newcands = BATmergecand(*cands, dense))) {
     431          11 :                                                         BBPreclaim(*cands);
     432          11 :                                                         *cands = newcands;
     433             :                                                 } else {
     434           0 :                                                         TRC_CRITICAL(GDK, "creating bat failed\n");
     435           0 :                                                         res = LOG_ERR;
     436             :                                                 }
     437             :                                         } else {
     438          40 :                                                 assert((*cands)->ttype == TYPE_oid);
     439          40 :                                                 assert(BATcount(*cands) > 0);
     440          40 :                                                 if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
     441           0 :                                                         TRC_CRITICAL(GDK, "appending to bat failed\n");
     442           0 :                                                         res = LOG_ERR;
     443             :                                                 }
     444             :                                         }
     445          51 :                                         BBPreclaim(dense);
     446             :                                 }
     447             : 
     448             :                                 /* We have to read the value to update the read cursor */
     449       27445 :                                 size_t tlen = lg->rbufsize;
     450       27445 :                                 void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     451       27445 :                                 if (t == NULL) {
     452           0 :                                         TRC_CRITICAL(GDK, "read failed\n");
     453           0 :                                         res = LOG_EOF;
     454             :                                 }
     455       27445 :                                 return res;
     456             :                         }
     457             :                 }
     458             : 
     459      361211 :                 if (!lg->flushing) {
     460        2250 :                         r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
     461        2250 :                         if (r == NULL) {
     462           0 :                                 if (uid)
     463           0 :                                         BBPreclaim(uid);
     464           0 :                                 return LOG_ERR;
     465             :                         }
     466             :                 }
     467             : 
     468      361211 :                 if (l->flag == LOG_UPDATE_CONST) {
     469       96505 :                         size_t tlen = lg->rbufsize;
     470       96505 :                         void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     471       96505 :                         if (t == NULL) {
     472           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     473           0 :                                 res = LOG_EOF;
     474             :                         } else {
     475       96505 :                                 lg->rbuf = t;
     476       96505 :                                 lg->rbufsize = tlen;
     477       96505 :                                 if (r) {
     478       13768 :                                         for (BUN p = 0; p < (BUN) nr; p++) {
     479       12756 :                                                 if (BUNappend(r, t, true) != GDK_SUCCEED) {
     480           0 :                                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     481           0 :                                                         res = LOG_ERR;
     482             :                                                 }
     483             :                                         }
     484             :                                 }
     485             :                         }
     486      264706 :                 } else if (l->flag == LOG_UPDATE_BULK) {
     487      264688 :                         if (mnstr_readLng(lg->input_log, &offset) != 1) {
     488           0 :                                 if (r)
     489           0 :                                         BBPreclaim(r);
     490           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     491           0 :                                 return LOG_EOF;
     492             :                         }
     493      264688 :                         if (tpe == TYPE_msk) {
     494           0 :                                 if (r) {
     495           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     496           0 :                                                 BATsetcount(r, (BUN) nr);
     497             :                                         else {
     498           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     499           0 :                                                 res = LOG_EOF;
     500             :                                         }
     501             :                                 } else {
     502           0 :                                         size_t tlen = lg->rbufsize / sizeof(int);
     503           0 :                                         size_t cnt = 0, snr = (size_t) nr;
     504           0 :                                         snr = (snr + 31) / 32;
     505           0 :                                         assert(tlen);
     506           0 :                                         for (; res == LOG_OK && snr > 0; snr -= cnt) {
     507           0 :                                                 cnt = snr > tlen ? tlen : snr;
     508           0 :                                                 if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
     509           0 :                                                         TRC_CRITICAL(GDK, "read failed\n");
     510           0 :                                                         res = LOG_EOF;
     511             :                                                 }
     512             :                                         }
     513             :                                 }
     514             :                         } else {
     515      264688 :                                 if (!ATOMvarsized(tpe)) {
     516      246106 :                                         size_t cnt = 0, snr = (size_t) nr;
     517      246106 :                                         size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
     518      246106 :                                         assert(tlen);
     519             :                                         /* read in chunks of max
     520             :                                          * BUFSIZE/width rows */
     521      504883 :                                         for (; res == LOG_OK && snr > 0; snr -= cnt) {
     522      258777 :                                                 cnt = snr > tlen ? tlen : snr;
     523      258777 :                                                 void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
     524             : 
     525      258777 :                                                 if (t == NULL) {
     526             :                                                         res = LOG_EOF;
     527             :                                                         break;
     528             :                                                 }
     529      258777 :                                                 assert(t == lg->rbuf);
     530      258777 :                                                 if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
     531           0 :                                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     532           0 :                                                         res = LOG_ERR;
     533             :                                                 }
     534             :                                         }
     535       18582 :                                 } else if (tpe == TYPE_str) {
     536             :                                         /* efficient string */
     537       18000 :                                         res = string_reader(lg, r, nr);
     538             :                                 } else {
     539        1206 :                                         for (; res == LOG_OK && nr > 0; nr--) {
     540         624 :                                                 size_t tlen = lg->rbufsize;
     541         624 :                                                 void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     542             : 
     543         624 :                                                 if (t == NULL) {
     544             :                                                         /* see if failure was due to
     545             :                                                          * malloc or something less
     546             :                                                          * serious (in the current
     547             :                                                          * context) */
     548           0 :                                                         if (strstr(GDKerrbuf, "alloc") == NULL)
     549             :                                                                 res = LOG_EOF;
     550             :                                                         else
     551           0 :                                                                 res = LOG_ERR;
     552           0 :                                                         TRC_CRITICAL(GDK, "read failed\n");
     553             :                                                 } else {
     554         624 :                                                         lg->rbuf = t;
     555         624 :                                                         lg->rbufsize = tlen;
     556         624 :                                                         if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
     557           0 :                                                                 TRC_CRITICAL(GDK, "append to bat failed\n");
     558           0 :                                                                 res = LOG_ERR;
     559             :                                                         }
     560             :                                                 }
     561             :                                         }
     562             :                                 }
     563             :                         }
     564             :                 } else {
     565          18 :                         void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
     566          18 :                         void *hv = ATOMnil(TYPE_oid);
     567          18 :                         offset = 0;
     568             : 
     569          18 :                         if (hv == NULL) {
     570           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     571           0 :                                 res = LOG_EOF;
     572             :                         }
     573        1071 :                         for (; res == LOG_OK && nr > 0; nr--) {
     574        1053 :                                 size_t hlen = sizeof(oid);
     575        1053 :                                 void *h = rh(hv, &hlen, lg->input_log, 1);
     576        1053 :                                 assert(hlen == sizeof(oid));
     577        1053 :                                 assert(h == hv);
     578        1053 :                                 if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
     579           0 :                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     580           0 :                                         res = LOG_ERR;
     581             :                                 }
     582             :                         }
     583          18 :                         nr = pnr;
     584          18 :                         if (tpe == TYPE_msk) {
     585           0 :                                 if (r) {
     586           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     587           0 :                                                 BATsetcount(r, (BUN) nr);
     588             :                                         else {
     589           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     590           0 :                                                 res = LOG_EOF;
     591             :                                         }
     592             :                                 } else {
     593           0 :                                         for (lng i = 0; i < nr; i += 32) {
     594           0 :                                                 int v;
     595           0 :                                                 switch (mnstr_readInt(lg->input_log, &v)) {
     596           0 :                                                 case 1:
     597           0 :                                                         continue;
     598             :                                                 case 0:
     599             :                                                         res = LOG_EOF;
     600             :                                                         break;
     601             :                                                 default:
     602             :                                                         res = LOG_ERR;
     603             :                                                         break;
     604             :                                                 }
     605           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     606           0 :                                                 break;
     607             :                                         }
     608             :                                 }
     609          18 :                         } else if (tpe == TYPE_str) {
     610             :                                 /* efficient string */
     611           6 :                                 res = string_reader(lg, r, nr);
     612             :                         } else {
     613          58 :                                 for (; res == LOG_OK && nr > 0; nr--) {
     614          46 :                                         size_t tlen = lg->rbufsize;
     615          46 :                                         void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     616             : 
     617          46 :                                         if (t == NULL) {
     618           0 :                                                 if (strstr(GDKerrbuf, "malloc") == NULL)
     619             :                                                         res = LOG_EOF;
     620             :                                                 else
     621           0 :                                                         res = LOG_ERR;
     622           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     623             :                                         } else {
     624          46 :                                                 lg->rbuf = t;
     625          46 :                                                 lg->rbufsize = tlen;
     626          46 :                                                 if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
     627           0 :                                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     628           0 :                                                         res = LOG_ERR;
     629             :                                                 }
     630             :                                         }
     631             :                                 }
     632             :                         }
     633          18 :                         GDKfree(hv);
     634             :                 }
     635             : 
     636      361211 :                 if (res == LOG_OK) {
     637      361211 :                         if (tr_grow(tr) == GDK_SUCCEED) {
     638      361211 :                                 tr->changes[tr->nr].type = l->flag;
     639      361211 :                                 if (l->flag == LOG_UPDATE_BULK && offset == -1) {
     640      136057 :                                         assert(cands);  /* bat r is part of a group of bats logged together. */
     641      136057 :                                         struct canditer ci;
     642      136057 :                                         canditer_init(&ci, NULL, *cands);
     643      136057 :                                         const oid first = canditer_peek(&ci);
     644      136057 :                                         const oid last = canditer_last(&ci);
     645      136057 :                                         offset = (lng) first;
     646      136057 :                                         pnr = (lng) (last - first) + 1;
     647      136057 :                                         if (!lg->flushing) {
     648        1131 :                                                 assert(uid == NULL);
     649        1131 :                                                 uid = *cands;
     650        1131 :                                                 BBPfix((*cands)->batCacheid);
     651        1131 :                                                 tr->changes[tr->nr].type = LOG_UPDATE;
     652             :                                         }
     653             :                                 }
     654      361211 :                                 if (l->flag == LOG_UPDATE_CONST) {
     655       96505 :                                         assert(!cands); /* TODO: This might change in the future. */
     656       96505 :                                         tr->changes[tr->nr].type = LOG_UPDATE_BULK;
     657             :                                 }
     658      361211 :                                 tr->changes[tr->nr].nr = pnr;
     659      361211 :                                 tr->changes[tr->nr].tt = tpe;
     660      361211 :                                 tr->changes[tr->nr].cid = id;
     661      361211 :                                 tr->changes[tr->nr].offset = offset;
     662      361211 :                                 tr->changes[tr->nr].b = r;
     663      361211 :                                 tr->changes[tr->nr].uid = uid;
     664      361211 :                                 tr->nr++;
     665             :                         } else {
     666           0 :                                 TRC_CRITICAL(GDK, "memory allocation failed\n");
     667           0 :                                 res = LOG_ERR;
     668             :                         }
     669             :                 }
     670      361211 :                 if (res != LOG_OK) {
     671           0 :                         if (r)
     672           0 :                                 BBPreclaim(r);
     673           0 :                         if (cands && uid)
     674           0 :                                 BBPunfix((*cands)->batCacheid);
     675           0 :                         else if (uid)
     676           0 :                                 BBPreclaim(uid);
     677             :                 }
     678             :         } else {
     679             :                 /* bat missing ERROR or ignore ? currently error. */
     680           0 :                 TRC_CRITICAL(GDK, "unknown type\n");
     681           0 :                 res = LOG_ERR;
     682             :         }
     683             :         return res;
     684             : }
     685             : 
     686             : 
     687             : static gdk_return /* Raise the count stored in lg->catalog_cnt for object id to cnt when the stored count is smaller. */
     688      558077 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
     689             : { /* Returns GDK_FAIL when the hash on lg->catalog_id cannot be built or the replace fails; GDK_SUCCEED otherwise, also when no entry qualifies. */
     690      558077 :         BATiter cni = bat_iterator_nolock(lg->catalog_id);
     691             : 
     692      558077 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     693      558077 :                 MT_rwlock_rdlock(&cni.b->thashlock); /* held across the lookup and the replace; released on both exits below */
     694      558077 :                 BUN p, cp = BUN_NONE;
     695             : 
     696     2153274 :                 HASHloop_int(cni, cni.b->thash, p, &id) { /* visit the catalog positions hashed under id */
     697     1277099 :                         lng lid = *(lng *) Tloc(lg->catalog_lid, p);
     698             : 
     699     1277099 :                         if (lid != lng_nil && lid <= tid) /* stop at the first entry whose lid is set and not above tid */
     700             :                                 break;
     701             :                         cp = p; /* last position visited before the loop stopped */
     702             :                 }
     703      558077 :                 if (cp != BUN_NONE) {
     704      557879 :                         lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
     705      557879 :                         assert(lg->catalog_cnt->hseqbase == 0);
     706      557879 :                         if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) { /* the stored count is never lowered here */
     707           0 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     708           0 :                                 return GDK_FAIL;
     709             :                         }
     710             :                 }
     711      558077 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     712      558077 :                 return GDK_SUCCEED;
     713             :         }
     714             :         return GDK_FAIL; /* building the hash failed */
     715             : }
     716             : 
     717             : static gdk_return /* Apply one logged update action (LOG_UPDATE_BULK or LOG_UPDATE) to the bat found for la->cid, then record offset + nr as its count. */
     718      361211 : la_bat_updates(logger *lg, logaction *la, int tid)
     719             : { /* Returns GDK_FAIL when the lookup, the bat load, a bat modification or the count update fails; GDK_SUCCEED otherwise. */
     720      361211 :         log_bid bid = internal_find_bat(lg, la->cid, tid);
     721      361211 :         BAT *b = NULL;
     722             : 
     723      361211 :         if (bid < 0)
     724             :                 return GDK_FAIL;
     725      361211 :         if (!bid) {
     726             :                 /* object already gone, nothing needed */
     727             :                 return GDK_SUCCEED;
     728             :         }
     729             : 
     730      361206 :         if (!lg->flushing) { /* the target bat is only loaded when not flushing; b stays NULL otherwise */
     731        2250 :                 b = BATdescriptor(bid);
     732        2250 :                 if (b == NULL)
     733             :                         return GDK_FAIL;
     734             :         }
     735      361206 :         BUN cnt = 0;
     736      361206 :         if (la->type == LOG_UPDATE_BULK) {
     737      360057 :                 if (!lg->flushing) {
     738        1119 :                         cnt = BATcount(b);
     739        1119 :                         int is_msk = (b->ttype == TYPE_msk);
     740             :                         /* handle offset 0 ie clear */
     741        1119 :                         if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
     742             :                                 BATclear(b, true);
     743             :                         /* handle offset */
     744        1119 :                         if (cnt <= (BUN) la->offset) { /* target ends at or before offset: pad if needed, then append the logged values */
     745         249 :                                 msk t = 1;
     746         249 :                                 if (cnt < (BUN) la->offset) {     /* insert nils */
     747           0 :                                         const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype); /* filler: the msk value 1 for msk bats, the type nil otherwise */
     748           0 :                                         lng i, d = la->offset - BATcount(b);
     749           0 :                                         for (i = 0; i < d; i++) {
     750           0 :                                                 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
     751           0 :                                                         logbat_destroy(b);
     752           0 :                                                         return GDK_FAIL;
     753             :                                                 }
     754             :                                         }
     755             :                                 }
     756         249 :                                 if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
     757           0 :                                         logbat_destroy(b);
     758           0 :                                         return GDK_FAIL;
     759             :                                 }
     760             :                         } else { /* target extends past offset: overwrite existing rows, append any that fall beyond the old end */
     761         870 :                                 BATiter vi = bat_iterator(la->b);
     762         870 :                                 BUN p, q;
     763             : 
     764        4845 :                                 for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
     765        5816 :                                         const void *t = BUNtail(vi, p);
     766             : 
     767        3975 :                                         if (q < cnt) { /* cnt is the size measured before this loop started */
     768        3974 :                                                 if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
     769           0 :                                                         logbat_destroy(b);
     770           0 :                                                         bat_iterator_end(&vi);
     771           0 :                                                         return GDK_FAIL;
     772             :                                                 }
     773             :                                         } else {
     774           1 :                                                 if (BUNappend(b, t, true) != GDK_SUCCEED) {
     775           0 :                                                         logbat_destroy(b);
     776           0 :                                                         bat_iterator_end(&vi);
     777           0 :                                                         return GDK_FAIL;
     778             :                                                 }
     779             :                                         }
     780             :                                 }
     781         870 :                                 bat_iterator_end(&vi);
     782             :                         }
     783             :                 }
     784        1149 :         } else if (la->type == LOG_UPDATE) {
     785        1149 :                 if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
     786             :                         return GDK_FAIL; /* NOTE(review): b is not passed to logbat_destroy on this path, unlike every other failure path in this function; looks like a leaked reference - confirm */
     787             :                 }
     788             :         }
     789      361206 :         cnt = (BUN) (la->offset + la->nr); /* the catalog count is updated whether or not the bat itself was touched */
     790      361206 :         if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
     791           0 :                 if (b)
     792           0 :                         logbat_destroy(b);
     793           0 :                 return GDK_FAIL;
     794             :         }
     795      361206 :         if (b)
     796        2250 :                 logbat_destroy(b);
     797             :         return GDK_SUCCEED;
     798             : }
     799             : 
     800             : static log_return /* Queue a LOG_DESTROY action for object id in transaction tr. */
     801       11276 : log_read_destroy(logger *lg, trans *tr, log_id id)
     802             : { /* Returns LOG_OK once the action is queued, LOG_ERR when tr_grow fails. */
     803       11276 :         (void) lg; /* lg is otherwise only referenced by the assert below */
     804       11276 :         assert(!lg->inmemory);
     805       11276 :         if (tr_grow(tr) == GDK_SUCCEED) {
     806       11276 :                 tr->changes[tr->nr].type = LOG_DESTROY;
     807       11276 :                 tr->changes[tr->nr].cid = id;
     808       11276 :                 tr->nr++;
     809       11276 :                 return LOG_OK;
     810             :         }
     811           0 :         TRC_CRITICAL(GDK, "memory allocation failed\n");
     812           0 :         return LOG_ERR;
     813             : }
     814             : 
     815             : static gdk_return /* Apply a destroy action: look up the bat for la->cid and hand it to log_del_bat. */
     816          73 : la_bat_destroy(logger *lg, logaction *la, int tid)
     817             : { /* Returns GDK_FAIL when the lookup or log_del_bat fails; a missing bat is not an error. */
     818          73 :         log_bid bid = internal_find_bat(lg, la->cid, tid);
     819             : 
     820          73 :         if (bid < 0)
     821             :                 return GDK_FAIL;
     822          73 :         if (!bid) { /* nothing found: warn in builds without NDEBUG and report success */
     823             : #ifndef NDEBUG
     824           0 :                 GDKwarning("failed to find bid for object %d\n", la->cid);
     825             : #endif
     826           0 :                 return GDK_SUCCEED;
     827             :         }
     828          73 :         if (bid && log_del_bat(lg, bid) != GDK_SUCCEED) /* the bid test is redundant: a zero bid already returned above */
     829             :                 return GDK_FAIL;
     830             :         return GDK_SUCCEED;
     831             : }
     832             : 
     833             : static log_return /* Read the one-byte type id of a create record from lg->input_log and queue a LOG_CREATE action for object id in tr. */
     834      156533 : log_read_create(logger *lg, trans *tr, log_id id)
     835             : { /* Returns LOG_OK once queued, LOG_EOF when the read fails, LOG_ERR when tr_grow fails. */
     836      156533 :         bte tt;
     837      156533 :         int tpe;
     838             : 
     839      156533 :         assert(!lg->inmemory);
     840      156533 :         TRC_DEBUG(WAL, "create %d", id);
     841             : 
     842      156533 :         if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
     843           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     844           0 :                 return LOG_EOF;
     845             :         }
     846             : 
     847      156533 :         tpe = find_type_nr(lg, tt); /* NOTE(review): the result is stored in the action unchecked; confirm whether find_type_nr can report an unknown type */
     848             :         /* read create */
     849      156533 :         if (tr_grow(tr) == GDK_SUCCEED) {
     850      156533 :                 tr->changes[tr->nr].type = LOG_CREATE;
     851      156533 :                 tr->changes[tr->nr].tt = tpe;
     852      156533 :                 tr->changes[tr->nr].cid = id;
     853      156533 :                 tr->nr++;
     854      156533 :                 return LOG_OK;
     855             :         }
     856           0 :         TRC_CRITICAL(GDK, "memory allocation failed\n");
     857           0 :         return LOG_ERR;
     858             : }
     859             : 
     860             : static gdk_return /* Apply a create action: make a new PERSISTENT bat of type la->tt, set it to BAT_READ and register it under la->cid via log_add_bat. */
     861         277 : la_bat_create(logger *lg, logaction *la, int tid)
     862             : { /* Returns GDK_FAIL when creation, the access change or the registration fails; GDK_SUCCEED otherwise. */
     863         277 :         BAT *b;
     864             : 
     865             :         /* formerly head column type, should be void */
     866         277 :         if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
     867             :                 return GDK_FAIL;
     868             : 
     869         277 :         if (la->tt < 0) /* negative type numbers get a tail sequence base of 0 */
     870           0 :                 BATtseqbase(b, 0);
     871             : 
     872         554 :         if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
     873         277 :             log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
     874           0 :                 logbat_destroy(b); /* b is NULL here when BATsetaccess failed; presumably logbat_destroy tolerates that - confirm */
     875           0 :                 return GDK_FAIL;
     876             :         }
     877         277 :         logbat_destroy(b); /* the local reference is dropped on success as well */
     878         277 :         return GDK_SUCCEED;
     879             : }
     880             : 
     881             : static gdk_return /* Write one id,name line per known atom type to fp; when append is set, also record the id mapping in lg->type_id and lg->type_nr. */
     882         223 : log_write_new_types(logger *lg, FILE *fp, bool append)
     883             : { /* Returns GDK_FAIL as soon as an fprintf fails, GDK_SUCCEED otherwise. */
     884         223 :         bte id = 0;
     885             : 
     886             :         /* write types and insert into bats */
     887             :         /* first the fixed sized types */
     888        7133 :         for (int i = 0; i < GDKatomcnt; i++) { /* fixed sized atoms get ids counting up from 0 */
     889        6910 :                 if (ATOMvarsized(i))
     890        1782 :                         continue;
     891        5128 :                 if (append) {
     892        5128 :                         lg->type_id[i] = id;
     893        5128 :                         lg->type_nr[id] = i;
     894             :                 }
     895        5128 :                 if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
     896             :                         return GDK_FAIL;
     897        5128 :                 id++;
     898             :         }
     899             :         /* second the var sized types */
     900             :         id = -127;              /* start after nil */
     901        7133 :         for (int i = 0; i < GDKatomcnt; i++) { /* var sized atoms get negative ids counting up from -127 */
     902        6910 :                 if (!ATOMvarsized(i))
     903        5128 :                         continue;
     904        1782 :                 if (append) {
     905        1782 :                         lg->type_id[i] = id;
     906        1782 :                         lg->type_nr[256 + id] = i; /* negative ids are stored at index 256 + id */
     907             :                 }
     908        1782 :                 if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
     909             :                         return GDK_FAIL;
     910        1782 :                 id++;
     911             :         }
     912             :         return GDK_SUCCEED;
     913             : }
     914             : 
     915             : #define TR_SIZE         1024 /* number of logaction slots a transaction starts with in tr_create */
     916             : 
     917             : static trans * /* Allocate an empty transaction record for tid with room for TR_SIZE actions, linked to tr through its tr field. */
     918       55780 : tr_create(trans *tr, int tid)
     919             : { /* Returns the new record, or NULL when either allocation fails. */
     920       55780 :         trans *ntr = GDKmalloc(sizeof(trans));
     921             : 
     922       55780 :         if (ntr == NULL)
     923             :                 return NULL;
     924       55780 :         ntr->tid = tid;
     925       55780 :         ntr->sz = TR_SIZE;
     926       55780 :         ntr->nr = 0;
     927       55780 :         ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
     928       55780 :         if (ntr->changes == NULL) {
     929           0 :                 GDKfree(ntr); /* do not leak the record when the action array cannot be allocated */
     930           0 :                 return NULL;
     931             :         }
     932       55780 :         ntr->tr = tr;
     933       55780 :         return ntr;
     934             : }
     935             : 
     936             : static gdk_return /* Apply a single queued action by dispatching on its type. */
     937      529020 : la_apply(logger *lg, logaction *c, int tid)
     938             : { /* Returns the result of the handler, or GDK_SUCCEED when the action is skipped. */
     939      529020 :         gdk_return ret = GDK_SUCCEED;
     940             : 
     941      529020 :         switch (c->type) {
     942      361211 :         case LOG_UPDATE_BULK:
     943             :         case LOG_UPDATE:
     944      361211 :                 ret = la_bat_updates(lg, c, tid); /* always called; la_bat_updates checks lg->flushing itself */
     945      361211 :                 break;
     946      156533 :         case LOG_CREATE:
     947      156533 :                 if (!lg->flushing) /* create actions are skipped while flushing */
     948         277 :                         ret = la_bat_create(lg, c, tid);
     949             :                 break;
     950       11276 :         case LOG_DESTROY:
     951       11276 :                 if (!lg->flushing) /* destroy actions are skipped while flushing */
     952          73 :                         ret = la_bat_destroy(lg, c, tid);
     953             :                 break;
     954             :         default:
     955           0 :                 MT_UNREACHABLE();
     956             :         }
     957      529020 :         return ret;
     958             : }
     959             : 
     960             : static void /* Release the bats an action still references. */
     961      529020 : la_destroy(logaction *c)
     962             : {
     963      529020 :         if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b) /* both update kinds may carry a values bat in b */
     964        2250 :                 logbat_destroy(c->b);
     965      529020 :         if (c->type == LOG_UPDATE && c->uid) /* uid is only released for LOG_UPDATE actions */
     966        1131 :                 logbat_destroy(c->uid);
     967      529020 : }
     968             : 
     969             : static gdk_return /* Make sure tr->changes has a free slot at index tr->nr, doubling the array when it is full. */
     970      529020 : tr_grow(trans *tr)
     971             : { /* Returns GDK_FAIL when the reallocation fails, GDK_SUCCEED otherwise. */
     972      529020 :         if (tr->nr == tr->sz) {
     973           9 :                 logaction *changes;
     974           9 :                 tr->sz <<= 1; /* NOTE(review): sz is doubled before the realloc result is known, so on failure it no longer matches the allocation - confirm callers never reuse tr afterwards */
     975           9 :                 changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
     976           9 :                 if (changes == NULL)
     977             :                         return GDK_FAIL; /* tr->changes still points at the old array here */
     978           9 :                 tr->changes = changes;
     979             :         }
     980             :         /* cleanup the next */
     981      529020 :         tr->changes[tr->nr].b = NULL; /* only the b field is reset; the caller fills in the remaining fields */
     982      529020 :         return GDK_SUCCEED;
     983             : }
     984             : 
     985             : static trans * /* Free a transaction record and its action array; returns the record it was linked to through its tr field. */
     986       55780 : tr_destroy(trans *tr)
     987             : {
     988       55780 :         trans *r = tr->tr; /* read the link before tr is freed */
     989             : 
     990       55780 :         GDKfree(tr->changes); /* frees the array only; bats referenced by the actions are not released here */
     991       55780 :         GDKfree(tr);
     992       55780 :         return r;
     993             : }
     994             : 
     995             : static trans * /* Abort a transaction: release the actions from index s up to tr->nr, then free the record. */
     996           0 : tr_abort_(logger *lg, trans *tr, int s)
     997             : { /* Returns whatever tr_destroy returns, i.e. the record tr was linked to. */
     998           0 :         int i;
     999             : 
    1000           0 :         (void) lg; /* lg is not used in this function */
    1001             : 
    1002           0 :         TRC_DEBUG(WAL, "abort");
    1003             : 
    1004           0 :         for (i = s; i < tr->nr; i++) /* actions before s are left alone */
    1005           0 :                 la_destroy(&tr->changes[i]);
    1006           0 :         return tr_destroy(tr);
    1007             : }
    1008             : 
    1009             : static trans * /* Abort a whole transaction: same as tr_abort_ starting at action 0. */
    1010           0 : tr_abort(logger *lg, trans *tr)
    1011             : {
    1012           0 :         return tr_abort_(lg, tr, 0);
    1013             : }
    1014             : 
    1015             : static trans * /* Commit a transaction: apply its actions in order, releasing each one after it has been applied. */
    1016       55780 : tr_commit(logger *lg, trans *tr)
    1017             : { /* Returns the result of tr_destroy on success, or (trans *) -1 after a failed action. */
    1018       55780 :         int i;
    1019             : 
    1020       55780 :         TRC_DEBUG(WAL, "commit");
    1021             : 
    1022      584800 :         for (i = 0; i < tr->nr; i++) {
    1023      529020 :                 if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
    1024           0 :                         TRC_CRITICAL(GDK, "aborting transaction\n");
    1025           0 :                         do { /* abort this record and every record reachable through the tr links */
    1026           0 :                                 tr = tr_abort_(lg, tr, i); /* NOTE(review): the same start index i is also used for the linked records, so their actions below i are not released - confirm this is intended */
    1027           0 :                         } while (tr != NULL);
    1028             :                         return (trans *) -1;
    1029             :                 }
    1030      529020 :                 la_destroy(&tr->changes[i]);
    1031             :         }
    1032       55780 :         lg->saved_tid = tr->tid; /* recorded only after every action has been applied */
    1033       55780 :         return tr_destroy(tr);
    1034             : }
    1035             : 
    1036             : static gdk_return /* Read "id,atom_name" records from fp and fill lg->type_id and lg->type_nr; GDK_FAIL when an id is outside -127..127 or ATOMindex returns a negative value. */
    1037         112 : log_read_types_file(logger *lg, FILE *fp)
    1038             : {
    1039         112 :         int id = 0;
    1040         112 :         char atom_name[IDLENGTH];
    1041             : 
    1042             :         /* scanf should use IDLENGTH somehow */
    1043        3540 :         while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) { /* %63s stores at most 63 chars plus the terminator; NOTE(review): assumes IDLENGTH >= 64 - confirm */
    1044        3428 :                 int i = ATOMindex(atom_name);
    1045             : 
    1046        3428 :                 if (id < -127 || id > 127 || i < 0) { /* the same message is emitted for an out-of-range id and for a negative ATOMindex result */
    1047           0 :                         GDKerror("unknown type in log file '%s'\n", atom_name);
    1048           0 :                         return GDK_FAIL;
    1049             :                 }
    1050        3428 :                 lg->type_id[i] = (int8_t) id; /* mapping from atom index to logged id */
    1051        3428 :                 lg->type_nr[id < 0 ? 256 + id : id] = i; /* reverse mapping; a negative id is stored at index 256 + id */
    1052             :         }
    1053             :         return GDK_SUCCEED; /* reached as soon as fscanf no longer converts both fields */
    1054             : }
    1055             : 
    1056             : 
    1057             : gdk_return /* Write a types file: version line, empty line, then the output of log_write_new_types; flush, sync and close it. After a successful open, every failure path removes the file and returns GDK_FAIL. */
    1058         223 : log_create_types_file(logger *lg, const char *filename, bool append)
    1059             : {
    1060         223 :         FILE *fp;
    1061             : 
    1062         223 :         if ((fp = MT_fopen(filename, "w")) == NULL) { /* mode is "w" whatever append is; append is only handed to log_write_new_types below */
    1063           0 :                 GDKerror("cannot create log file %s\n", filename);
    1064           0 :                 return GDK_FAIL;
    1065             :         }
    1066         223 :         if (fprintf(fp, "%06d\n\n", lg->version) < 0) { /* version as six zero-padded digits followed by an empty line */
    1067           0 :                 fclose(fp);
    1068           0 :                 GDKerror("writing log file %s failed", filename);
    1069           0 :                 if (MT_remove(filename) < 0)
    1070           0 :                         GDKsyserror("remove %s failed\n", filename);
    1071           0 :                 return GDK_FAIL;
    1072             :         }
    1073             : 
    1074         223 :         if (log_write_new_types(lg, fp, append) != GDK_SUCCEED) {
    1075           0 :                 fclose(fp);
    1076           0 :                 GDKerror("writing log file %s failed", filename);
    1077           0 :                 if (MT_remove(filename) < 0)
    1078           0 :                         GDKsyserror("remove %s failed\n", filename);
    1079           0 :                 return GDK_FAIL;
    1080             :         }
    1081         223 :         if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) /* the sync call below is only evaluated when NOSYNCMASK is not set */
    1082             : #if defined(_MSC_VER)
    1083             :                                && _commit(_fileno(fp)) < 0
    1084             : #elif defined(HAVE_FDATASYNC)
    1085           0 :                                && fdatasync(fileno(fp)) < 0
    1086             : #elif defined(HAVE_FSYNC)
    1087             :                                && fsync(fileno(fp)) < 0
    1088             : #endif
    1089             :             )) { /* NOTE(review): if none of the three #if branches is compiled in, a clear NOSYNCMASK alone makes this condition true - confirm one is always available */
    1090           0 :                 GDKsyserror("flushing log file %s failed", filename);
    1091           0 :                 fclose(fp);
    1092           0 :                 if (MT_remove(filename) < 0)
    1093           0 :                         GDKsyserror("remove %s failed\n", filename);
    1094           0 :                 return GDK_FAIL;
    1095             :         }
    1096         223 :         if (fclose(fp) < 0) { /* a failing close is treated like a failed write */
    1097           0 :                 GDKsyserror("closing log file %s failed", filename);
    1098           0 :                 if (MT_remove(filename) < 0)
    1099           0 :                         GDKsyserror("remove %s failed\n", filename);
    1100           0 :                 return GDK_FAIL;
    1101             :         }
    1102             :         return GDK_SUCCEED;
    1103             : }
    1104             : 
    1105             : #define rotation_lock(lg)       MT_lock_set(&(lg)->rotation_lock) /* MT_lock_set on the logger's rotation_lock member */
    1106             : #define rotation_unlock(lg)     MT_lock_unset(&(lg)->rotation_lock) /* MT_lock_unset on the same member */
    1107             : 
    1108             : static gdk_return /* Allocate a logged_range for lg->id, open its log file for writing unless LOG_DISABLED(lg), and link it behind lg->current; GDK_FAIL on allocation or stream errors. */
    1109       11994 : log_open_output(logger *lg)
    1110             : {
    1111       11994 :         logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range)); /* NOTE(review): members are assigned one by one below and output_log only inside the !LOG_DISABLED branch - confirm nothing reads it otherwise */
    1112             : 
    1113       11994 :         if (!new_range) {
    1114           0 :                 TRC_CRITICAL(GDK, "allocation failure\n");
    1115           0 :                 return GDK_FAIL;
    1116             :         }
    1117       11994 :         if (!LOG_DISABLED(lg)) {
    1118       11993 :                 char id[32];
    1119       11993 :                 char *filename;
    1120             : 
    1121       11993 :                 if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) { /* detects truncation of the formatted id */
    1122           0 :                         TRC_CRITICAL(GDK, "filename is too large\n");
    1123           0 :                         GDKfree(new_range);
    1124           0 :                         return GDK_FAIL;
    1125             :                 }
    1126       11993 :                 if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
    1127           0 :                         TRC_CRITICAL(GDK, "allocation failure\n");
    1128           0 :                         GDKfree(new_range);
    1129           0 :                         return GDK_FAIL;
    1130             :                 }
    1131             : 
    1132       11993 :                 TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
    1133       11993 :                 new_range->output_log = open_wstream(filename);
    1134       11993 :                 if (new_range->output_log) {
    1135       11993 :                         short byteorder = 1234; /* marker value that log_open_input compares against when reading the file back */
    1136       11993 :                         mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1); /* return value not checked here; mnstr_errnr is tested below */
    1137             :                 }
    1138             : 
    1139       11993 :                 if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
    1140           0 :                         TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
    1141           0 :                         close_stream(new_range->output_log); /* NOTE(review): output_log can be NULL on this path - confirm close_stream accepts NULL */
    1142           0 :                         GDKfree(new_range);
    1143           0 :                         GDKfree(filename);
    1144           0 :                         return GDK_FAIL;
    1145             :                 }
    1146       11993 :                 GDKfree(filename); /* the path is only needed to open the stream */
    1147             :         }
    1148       11994 :         ATOMIC_INIT(&new_range->refcount, 1);
    1149       11994 :         ATOMIC_INIT(&new_range->last_ts, 0);
    1150       11994 :         ATOMIC_INIT(&new_range->flushed_ts, 0);
    1151       11994 :         ATOMIC_INIT(&new_range->drops, 0);
    1152       11994 :         new_range->id = lg->id;
    1153       11994 :         new_range->next = NULL;
    1154       11994 :         logged_range *current = lg->current;
    1155       11994 :         assert(current && current->next == NULL); /* checked by assert only; current is dereferenced below regardless */
    1156       11994 :         new_range->cnt = current->cnt; /* cnt is copied from the current range */
    1157       11994 :         current->next = new_range; /* linked behind lg->current; lg->current itself is not changed here */
    1158       11994 :         lg->file_age = GDKusec();
    1159       11994 :         return GDK_SUCCEED;
    1160             : }
    1161             : 
    1162             : static inline void /* Close lg->input_log when the logger is not in-memory and a stream is set; the pointer is reset to NULL in every case. */
    1163       12201 : log_close_input(logger *lg)
    1164             : {
    1165       12201 :         if (!lg->inmemory && lg->input_log) {
    1166       11754 :                 TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
    1167       11754 :                 close_stream(lg->input_log);
    1168             :         }
    1169       12201 :         lg->input_log = NULL; /* unconditional, also when nothing was closed */
    1170       12201 : }
    1171             : 
    1172             : static inline void /* Close lg->current->output_log when logging is not disabled and a stream is set; the pointer is reset to NULL in every case. */
    1173         335 : log_close_output(logger *lg)
    1174             : {
    1175         335 :         if (!LOG_DISABLED(lg) && lg->current->output_log) {
    1176         334 :                 TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
    1177         334 :                 close_stream(lg->current->output_log);
    1178             :         }
    1179         335 :         lg->current->output_log = NULL; /* lg->current is dereferenced here without a NULL check */
    1180         335 : }
    1181             : 
    1182             : static gdk_return /* Open filename as lg->input_log and verify the leading byte-order short. An unopenable file sets *filemissing and still returns GDK_SUCCEED; lg->input_log ends up NULL for a missing or empty file. */
    1183       11866 : log_open_input(logger *lg, const char *filename, bool *filemissing)
    1184             : {
    1185       11866 :         TRC_INFO(WAL, "opening input log %s", filename);
    1186       11866 :         lg->input_log = open_rstream(filename);
    1187             : 
    1188             :         /* if the file doesn't exist, there is nothing to be read back */
    1189       11866 :         if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
    1190         112 :                 log_close_input(lg);
    1191         112 :                 *filemissing = true; /* the only place in this function that writes *filemissing */
    1192         112 :                 return GDK_SUCCEED;
    1193             :         }
    1194       11754 :         short byteorder;
    1195       11754 :         switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) { /* NOTE(review): no default case; any other return value reaches the final GDK_SUCCEED with the stream left open */
    1196           0 :         case -1:
    1197           0 :                 log_close_input(lg);
    1198           0 :                 TRC_CRITICAL(GDK, "read failed\n");
    1199           0 :                 return GDK_FAIL;
    1200           4 :         case 0:
    1201             :                 /* empty file is ok */
    1202           4 :                 log_close_input(lg); /* leaves lg->input_log NULL; *filemissing is not touched */
    1203           4 :                 return GDK_SUCCEED;
    1204       11750 :         case 1:
    1205             :                 /* if not empty, must start with correct byte order mark */
    1206       11750 :                 if (byteorder != 1234) { /* same constant that log_open_output writes at the start of a file */
    1207           0 :                         TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
    1208           0 :                         log_close_input(lg);
    1209           0 :                         return GDK_FAIL;
    1210             :                 }
    1211             :                 break;
    1212             :         }
    1213             :         return GDK_SUCCEED; /* the stream stays open in lg->input_log for the caller */
    1214             : }
    1215             : 
    1216             : static log_return /* Read records from the input log until log_read_format fails or err leaves LOG_OK, building trans records and committing or aborting them at LOG_END. Returns LOG_EOF when log_read_format failed, otherwise the non-LOG_OK err that ended the loop. */
    1217       11750 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
    1218             : {
    1219       11750 :         logformat l;
    1220       11750 :         trans *tr = NULL;
    1221       11750 :         log_return err = LOG_OK;
    1222       11750 :         bool ok = true;
    1223       11750 :         ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug); /* saved so the full value can be put back before returning */
    1224             : 
    1225       11750 :         (void) maxupdated;      /* only used inside assert() */
    1226             : 
    1227       11750 :         if (!lg->flushing)
    1228         177 :                 ATOMIC_AND(&GDKdebug, ~CHECKMASK); /* CHECKMASK bits are cleared for the duration of this call */
    1229             : 
    1230       11750 :         BAT *cands = NULL;      /* used in case of LOG_BAT_GROUP */
    1231             : 
    1232      755954 :         while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
    1233      744204 :                 if (l.flag == 0 && l.id == 0) { /* flag 0 is LOG_START; a header with flag 0 and id 0 is taken as end of input */
    1234       11750 :                         err = LOG_EOF;
    1235             :                         break;
    1236             :                 }
    1237             : 
    1238      744204 :                 TRC_DEBUG_IF(WAL) {
    1239           0 :                         if (l.flag > 0 && l.flag != LOG_CLEAR &&
    1240             :                             l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
    1241           0 :                                 TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
    1242             :                         else
    1243           0 :                                 TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
    1244             :                 }
    1245      744204 :                 switch (l.flag) { /* first pass over the record: only maintains the updated bitmap */
    1246      556465 :                 case LOG_UPDATE_CONST:
    1247             :                 case LOG_UPDATE_BULK:
    1248             :                 case LOG_UPDATE:
    1249             :                 case LOG_CREATE:
    1250             :                 case LOG_DESTROY:
    1251      556465 :                         if (updated && BAThash(lg->catalog_id) == GDK_SUCCEED) { /* skipped entirely when the caller passes updated == NULL */
    1252      553626 :                                 BATiter cni = bat_iterator(lg->catalog_id);
    1253      553626 :                                 BUN p;
    1254      553626 :                                 BUN posnew = BUN_NONE;
    1255      553626 :                                 BUN posold = BUN_NONE;
    1256      553626 :                                 MT_rwlock_rdlock(&cni.b->thashlock);
    1257     4267704 :                                 HASHloop_int(cni, cni.b->thash, p, &l.id) {
    1258     3091731 :                                         lng lid = *(lng *) Tloc(lg->catalog_lid, p);
    1259     3091731 :                                         if (lid == lng_nil || lid > tr->tid) /* NOTE(review): tr is dereferenced without the NULL check that the second switch performs - confirm a LOG_START always precedes these records when updated is set */
    1260             :                                                 posnew = p;
    1261     1421259 :                                         else if (lid == tr->tid)
    1262     3430327 :                                                 posold = p;
    1263             :                                 }
    1264      553626 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
    1265      553626 :                                 bat_iterator_end(&cni);
    1266             :                                 /* Normally at this point, posnew is the
    1267             :                                  * location of the bat that this
    1268             :                                  * transaction is working on, and posold
    1269             :                                  * is the location of the previous
    1270             :                                  * version of the bat.  If LOG_CREATE,
    1271             :                                  * both are relevant, since the latter
    1272             :                                  * is the new bat, and the former is the
    1273             :                                  * to-be-destroyed bat.  For
    1274             :                                  * LOG_DESTROY, only posnew should be
    1275             :                                  * relevant, but for the other types, if
    1276             :                                  * the table is destroyed later in the
    1277             :                                  * same transaction, we need posold, and
    1278             :                                  * else (the normal case) we need
    1279             :                                  * posnew. */
    1280      553626 :                                 if (posnew != BUN_NONE) {
    1281      542416 :                                         assert(posnew < maxupdated);
    1282      542416 :                                         updated[posnew / 32] |= 1U << (posnew % 32); /* updated is used as a bitmap with 32 positions per word */
    1283             :                                 }
    1284      553626 :                                 if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
    1285      163715 :                                         assert(posold < maxupdated);
    1286      163715 :                                         updated[posold / 32] |= 1U << (posold % 32);
    1287             :                                 }
    1288             :                         }
    1289             :                         break;
    1290             :                 default:
    1291             :                         /* do nothing */
    1292             :                         break;
    1293             :                 }
    1294             :                 /* the functions we call here can succeed (LOG_OK),
    1295             :                  * but they can also fail for two different reasons:
    1296             :                  * they can run out of input (LOG_EOF -- this is not
    1297             :                  * serious, we just abort the remaining transactions),
    1298             :                  * or some malloc or BAT update fails (LOG_ERR -- this
    1299             :                  * is serious, we must abort the complete process);
    1300             :                  * the latter failure causes the current function to
    1301             :                  * return GDK_FAIL */
    1302      744204 :                 switch (l.flag) { /* second pass over the same record: performs the action */
    1303       55780 :                 case LOG_START:
    1304       55780 :                         assert(!lg->flushing || l.id <= lg->tid);
    1305       55780 :                         if (!lg->flushing && l.id > lg->tid)
    1306         136 :                                 lg->tid = l.id;      /* should only happen during initialization */
    1307       55780 :                         if ((tr = tr_create(tr, l.id)) == NULL) { /* the previous tr is passed in; on failure tr becomes NULL */
    1308           0 :                                 TRC_CRITICAL(GDK, "memory allocation failed\n");
    1309           0 :                                 err = LOG_ERR;
    1310           0 :                                 break;
    1311             :                         }
    1312       55780 :                         TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
    1313             :                         break;
    1314       55780 :                 case LOG_END:
    1315       55780 :                         if (tr == NULL) /* an end record without an open transaction stops reading with LOG_EOF */
    1316             :                                 err = LOG_EOF;
    1317       55780 :                         else if (tr->tid != l.id)    /* abort record */
    1318           0 :                                 tr = tr_abort(lg, tr);
    1319             :                         else
    1320       55780 :                                 tr = tr_commit(lg, tr); /* may yield (trans *) -1, handled after the switch */
    1321             :                         break;
    1322        2683 :                 case LOG_SEQ:
    1323        2683 :                         err = log_read_seq(lg, &l); /* the only data record handled without requiring an open transaction */
    1324        2683 :                         break;
    1325      388656 :                 case LOG_UPDATE_CONST:
    1326             :                 case LOG_UPDATE_BULK:
    1327             :                 case LOG_UPDATE:
    1328      388656 :                         if (tr == NULL)
    1329             :                                 err = LOG_EOF;
    1330             :                         else {
    1331      613795 :                                 err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL); /* &cands is passed only while a LOG_BAT_GROUP is open */
    1332             :                         }
    1333             :                         break;
    1334      156533 :                 case LOG_CREATE:
    1335      156533 :                         if (tr == NULL)
    1336             :                                 err = LOG_EOF;
    1337             :                         else
    1338      156533 :                                 err = log_read_create(lg, tr, l.id);
    1339             :                         break;
    1340       11276 :                 case LOG_DESTROY:
    1341       11276 :                         if (tr == NULL)
    1342             :                                 err = LOG_EOF;
    1343             :                         else
    1344       11276 :                                 err = log_read_destroy(lg, tr, l.id);
    1345             :                         break;
    1346       73496 :                 case LOG_BAT_GROUP:
    1347       73496 :                         if (tr == NULL)
    1348             :                                 err = LOG_EOF;
    1349             :                         else {
    1350       73496 :                                 if (l.id > 0) {
    1351             :                                         /* START OF LOG_BAT_GROUP */
    1352       36748 :                                         cands = COLnew(0, TYPE_void, 0, SYSTRANS); /* NOTE(review): a previous non-NULL cands is overwritten here without being released - confirm group starts cannot repeat without an end */
    1353       36748 :                                         if (!cands) {
    1354           0 :                                                 TRC_CRITICAL(GDK, "creating bat failed\n");
    1355           0 :                                                 err = LOG_ERR;
    1356             :                                         }
    1357       36748 :                                 } else if (cands == NULL) {
    1358             :                                         /* should have gone through the
    1359             :                                          * above option earlier */
    1360           0 :                                         TRC_CRITICAL(GDK, "unexpected error\n");
    1361           0 :                                         err = LOG_ERR;
    1362             :                                 } else {
    1363             :                                         /* END OF LOG_BAT_GROUP */
    1364       36748 :                                         BBPunfix(cands->batCacheid);
    1365       36748 :                                         cands = NULL;
    1366             :                                 }
    1367             :                         }
    1368             :                         break;
    1369           0 :                 default:
    1370           0 :                         TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
    1371           0 :                         err = LOG_ERR;
    1372             :                 }
    1373      744204 :                 if (tr == (trans *) -1) { /* the value tr_commit returns after a failed la_apply */
    1374             :                         /* message already generated by tr_commit */
    1375             :                         err = LOG_ERR;
    1376       11750 :                         tr = NULL; /* keeps the cleanup loop below from treating the sentinel as a pointer */
    1377             :                         break;
    1378             :                 }
    1379             :         }
    1380       11750 :         while (tr) { /* any transaction still open at this point is aborted */
    1381           0 :                 TRC_WARNING(GDK, "aborting transaction\n");
    1382           0 :                 tr = tr_abort(lg, tr);
    1383             :         }
    1384       11750 :         if (!lg->flushing)
    1385         177 :                 ATOMIC_SET(&GDKdebug, dbg); /* writes back the whole value read at entry, not just CHECKMASK */
    1386             : 
    1387       11750 :         BBPreclaim(cands); /* cands is NULL here unless a group was left open */
    1388       11750 :         if (!ok) /* log_read_format failed: reported as LOG_EOF */
    1389       11750 :                 return LOG_EOF;
    1390             :         return err;
    1391             : }
    1392             : 
    1393             : static gdk_return /* Replay one log file: open it, call log_read_transaction until it reports LOG_EOF or LOG_ERR, then close it. Returns the log_open_input result when no stream is left open, GDK_FAIL on fstat failure or LOG_ERR, GDK_SUCCEED otherwise. */
    1394         293 : log_readlog(logger *lg, const char *filename, bool *filemissing)
    1395             : {
    1396         293 :         log_return err = LOG_OK;
    1397         293 :         time_t t0, t1;
    1398         293 :         struct stat sb;
    1399             : 
    1400         293 :         assert(!lg->inmemory);
    1401             : 
    1402         293 :         TRC_INFO(WAL, "opening %s\n", filename);
    1403             : 
    1404         293 :         gdk_return res = log_open_input(lg, filename, filemissing);
    1405         293 :         if (!lg->input_log || res != GDK_SUCCEED) /* missing file, empty file or open error: nothing to replay */
    1406             :                 return res;
    1407         177 :         int fd;
    1408         177 :         if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) { /* sb.st_size is only used for the progress message below */
    1409           0 :                 GDKsyserror("fstat on opened file %s failed\n", filename);
    1410           0 :                 log_close_input(lg);
    1411             :                 /* If the file could be opened, but fstat fails,
    1412             :                  * something weird is going on */
    1413           0 :                 return GDK_FAIL;
    1414             :         }
    1415         177 :         t0 = time(NULL);
    1416         177 :         TRC_INFO_IF(WAL) {
    1417           0 :                 TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
    1418           0 :                 GDKtracer_flush_buffer();
    1419             :         }
    1420         354 :         while (err != LOG_EOF && err != LOG_ERR) {
    1421         177 :                 t1 = time(NULL);
    1422         177 :                 if (t1 - t0 > 10) {
    1423           0 :                         lng fpos;
    1424           0 :                         t0 = t1;
    1425             :                         /* not more than once every 10 seconds */
    1426           0 :                         fpos = (lng) getfilepos(getFile(lg->input_log));
    1427           0 :                         TRC_INFO_IF(WAL) {
    1428           0 :                                 if (fpos >= 0) {
    1429           0 :                                         TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
    1430             :                                                        filename, (int) ((fpos * 100 + 50) / sb.st_size)); /* NOTE(review): the + 50 only rounds to nearest when st_size is 100; looks cosmetic - confirm intent */
    1431           0 :                                         GDKtracer_flush_buffer();
    1432             :                                 }
    1433             :                         }
    1434             :                 }
    1435         177 :                 err = log_read_transaction(lg, NULL, 0); /* no updated bitmap is collected during replay */
    1436             :         }
    1437         177 :         log_close_input(lg);
    1438         177 :         lg->input_log = NULL; /* log_close_input has already reset this pointer */
    1439             : 
    1440             :         /* remaining transactions are not committed, ie abort */
    1441         177 :         TRC_INFO_IF(WAL) {
    1442           0 :                 TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
    1443           0 :                 GDKtracer_flush_buffer();
    1444             :         }
    1445             :         /* we cannot distinguish errors from incomplete transactions
    1446             :          * (even if we would log aborts in the logs). So we simply
    1447             :          * abort and move to the next log file */
    1448         177 :         return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    1449             : }
    1450             : 
    1451             : /*
    1452             :  * The log files are incrementally numbered, starting from 2. They are
    1453             :  * processed in the same sequence.
    1454             :  */
    1455             : static gdk_return /* Replay "<filename>.<id>" files in increasing id order, starting at lg->saved_id + 1, until a file is missing or log_readlog fails. */
    1456         112 : log_readlogs(logger *lg, const char *filename)
    1457             : {
    1458         112 :         gdk_return res = GDK_SUCCEED;
    1459             : 
    1460         112 :         assert(!lg->inmemory);
    1461         112 :         TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
    1462             : 
    1463         112 :         char log_filename[FILENAME_MAX];
    1464         112 :         if (lg->saved_id >= lg->id) { /* when this does not hold, no file is read and GDK_SUCCEED is returned */
    1465         112 :                 bool filemissing = false;
    1466             : 
    1467         112 :                 lg->id = lg->saved_id + 1;
    1468         405 :                 while (res == GDK_SUCCEED && !filemissing) {
    1469         293 :                         if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) { /* detects a truncated path */
    1470           0 :                                 GDKerror("Logger filename path is too large\n");
    1471           0 :                                 return GDK_FAIL;
    1472             :                         }
    1473         293 :                         res = log_readlog(lg, log_filename, &filemissing);
    1474         293 :                         if (!filemissing) { /* NOTE(review): both counters advance even when res is not GDK_SUCCEED - confirm this is intended */
    1475         181 :                                 lg->saved_id++;
    1476         181 :                                 lg->id++;
    1477             :                         }
    1478             :                 }
    1479             :         }
    1480             :         return res;
    1481             : }
    1482             : 
    1483             : static gdk_return /* Emit a debug trace and forward all arguments unchanged to bm_commit, returning its result. */
    1484       11456 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
    1485             : {
    1486       11456 :         TRC_DEBUG(WAL, "commit");
    1487             : 
    1488       11456 :         return bm_commit(lg, pending, updated, maxupdated);
    1489             : }
    1490             : 
    1491             : static gdk_return
    1492         112 : check_version(logger *lg, FILE *fp, const char *fn, const char *logdir, const char *filename, bool *needsnew)
    1493             : {
    1494         112 :         int version = 0;
    1495             : 
    1496         112 :         assert(!lg->inmemory);
    1497         112 :         if (fscanf(fp, "%6d", &version) != 1) {
    1498           0 :                 GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
    1499           0 :                 fclose(fp);
    1500           0 :                 return GDK_FAIL;
    1501             :         }
    1502         112 :         if (version < 52300) {       /* first CATALOG_VERSION for "new" log format */
    1503           0 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1504           0 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1505           0 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    1506           0 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    1507           0 :                         GDKerror("cannot create catalog bats");
    1508           0 :                         fclose(fp);
    1509           0 :                         return GDK_FAIL;
    1510             :                 }
    1511             :                 /* old_logger_load always closes fp */
    1512           0 :                 if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) {
    1513             :                         /*loads drop no longer needed catalog, snapshots bats */
    1514             :                         /*convert catalog_oid -> catalog_id (lng->int) */
    1515           0 :                         GDKerror("Incompatible database version %06d, "
    1516             :                                  "this server supports version %06d.\n%s",
    1517             :                                  version, lg->version,
    1518             :                                  version <
    1519             :                                  lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1520           0 :                         return GDK_FAIL;
    1521             :                 }
    1522           0 :                 *needsnew = false;      /* already written a new log file */
    1523           0 :                 return GDK_SUCCEED;
    1524         112 :         } else if (version != lg->version) {
    1525           0 :                 if (lg->prefuncp == NULL ||
    1526           0 :                     (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
    1527           0 :                         GDKerror("Incompatible database version %06d, "
    1528             :                                  "this server supports version %06d.\n%s",
    1529             :                                  version, lg->version,
    1530             :                                  version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1531           0 :                         fclose(fp);
    1532           0 :                         return GDK_FAIL;
    1533             :                 }
    1534           0 :                 *needsnew = true;       /* we need to write a new log file */
    1535             :         } else {
    1536         112 :                 lg->postfuncp = NULL;        /* don't call */
    1537         112 :                 *needsnew = false;      /* log file already up-to-date */
    1538             :         }
    1539         224 :         if (fgetc(fp) != '\n' ||        /* skip \n */
    1540         112 :             fgetc(fp) != '\n') {        /* skip \n */
    1541           0 :                 GDKerror("Badly formatted log file");
    1542           0 :                 fclose(fp);
    1543           0 :                 return GDK_FAIL;
    1544             :         }
    1545         112 :         if (log_read_types_file(lg, fp) != GDK_SUCCEED) {
    1546           0 :                 fclose(fp);
    1547           0 :                 return GDK_FAIL;
    1548             :         }
    1549         112 :         fclose(fp);
    1550         112 :         return GDK_SUCCEED;
    1551             : }
    1552             : 
    1553             : static BAT *
    1554         334 : bm_tids(BAT *b, BAT *d)
    1555             : {
    1556         334 :         BUN sz = BATcount(b);
    1557         334 :         BAT *tids = BATdense(0, 0, sz);
    1558             : 
    1559         334 :         if (tids == NULL)
    1560             :                 return NULL;
    1561             : 
    1562         334 :         if (BATcount(d)) {
    1563         334 :                 BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
    1564         334 :                 logbat_destroy(tids);
    1565         334 :                 tids = diff;
    1566             :         }
    1567             :         return tids;
    1568             : }
    1569             : 
    1570             : 
    1571             : static gdk_return
    1572        7634 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
    1573             : {
    1574        7634 :         char bak[IDLENGTH];
    1575             : 
    1576        7634 :         if (BATmode(old, true) != GDK_SUCCEED) {
    1577           0 :                 GDKerror("cannot convert old %s to transient", name);
    1578           0 :                 return GDK_FAIL;
    1579             :         }
    1580        7634 :         if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
    1581           0 :                 GDKerror("name %s_%s too long\n", fn, name);
    1582           0 :                 return GDK_FAIL;
    1583             :         }
    1584        7634 :         if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
    1585           0 :                 GDKerror("rename (%s) failed\n", bak);
    1586           0 :                 return GDK_FAIL;
    1587             :         }
    1588        7634 :         BBPretain(new->batCacheid);
    1589        7634 :         return GDK_SUCCEED;
    1590             : }
    1591             : 
    1592             : static gdk_return
    1593         336 : bm_get_counts(logger *lg)
    1594             : {
    1595         336 :         BUN p, q;
    1596         336 :         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1597             : 
    1598       28780 :         BATloop(lg->catalog_bid, p, q) {
    1599       28444 :                 oid pos = p;
    1600       28444 :                 lng cnt = 0;
    1601       28444 :                 lng lid = lng_nil;
    1602             : 
    1603       28444 :                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
    1604       28444 :                         BAT *b = BBPquickdesc(bids[p]);
    1605       28444 :                         assert(b);
    1606       28444 :                         cnt = BATcount(b);
    1607             :                 } else {
    1608           0 :                         lid = BBP_desc(bids[p]) && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
    1609             :                 }
    1610       28444 :                 if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
    1611           0 :                         return GDK_FAIL;
    1612       28444 :                 if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    1613             :                         return GDK_FAIL;
    1614             :         }
    1615             :         return GDK_SUCCEED;
    1616             : }
    1617             : 
    1618             : static int
    1619        7634 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
    1620             : {
    1621        7634 :         assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
    1622     1602947 :         for (int i = 0; i < next; i++) {
    1623     1595313 :                 if (n[i] == bid) {
    1624           0 :                         sizes[i] = sz;
    1625           0 :                         return next;
    1626             :                 }
    1627             :         }
    1628        7634 :         n[next] = bid;
    1629        7634 :         sizes[next++] = sz;
    1630        7634 :         return next;
    1631             : }
    1632             : 
    1633             : static int
    1634        2322 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
    1635             :                  BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
    1636             : {
    1637        2322 :         BAT *nbids, *noids, *ncnts, *nlids, *ndels;
    1638        2322 :         BUN p, q;
    1639        2322 :         int err = 0, rcnt = 0;
    1640             : 
    1641        2322 :         BUN ocnt = BATcount(catalog_bid);
    1642        2322 :         nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
    1643        2322 :         noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
    1644        2322 :         ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
    1645        2322 :         nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
    1646        2322 :         ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
    1647             : 
    1648        2322 :         if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
    1649           0 :                 logbat_destroy(nbids);
    1650           0 :                 logbat_destroy(noids);
    1651           0 :                 logbat_destroy(ncnts);
    1652           0 :                 logbat_destroy(nlids);
    1653           0 :                 logbat_destroy(ndels);
    1654           0 :                 return 0;
    1655             :         }
    1656             : 
    1657        2322 :         oid *poss = Tloc(dcatalog, 0);
    1658      178110 :         BATloop(dcatalog, p, q) {
    1659      175788 :                 oid pos = poss[p];
    1660             : 
    1661      175788 :                 if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
    1662           0 :                         continue;
    1663             : 
    1664      175788 :                 if (lids[pos] >= 0) {
    1665      175788 :                         BAT *lb;
    1666      175788 :                         bat bid = bids[pos];
    1667             : 
    1668      175788 :                         if ((lb = BBP_desc(bid)) == NULL || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
    1669           0 :                                 GDKwarning("Failed to set bat(%d) transient\n", bid);
    1670             :                         } else {
    1671      175788 :                                 lids[pos] = -1; /* mark as transient */
    1672      175788 :                                 r[rcnt++] = bid;
    1673             :                         }
    1674             :                 }
    1675             :         }
    1676             : 
    1677        2322 :         int *oids = (int *) Tloc(catalog_id, 0);
    1678        2322 :         q = BATcount(catalog_bid);
    1679      824610 :         for (p = 0; p < q && !err; p++) {
    1680      822288 :                 bat col = bids[p];
    1681      822288 :                 int nid = oids[p];
    1682      822288 :                 lng lid = lids[p];
    1683      822288 :                 lng cnt = cnts[p];
    1684      822288 :                 oid pos = p;
    1685             : 
    1686             :                 /* only project out the deleted with lid == -1
    1687             :                  * update dcatalog */
    1688      822288 :                 if (lid == -1)
    1689      175788 :                         continue;       /* remove */
    1690             : 
    1691     1293000 :                 if (BUNappend(nbids, &col, false) != GDK_SUCCEED ||
    1692     1293000 :                     BUNappend(noids, &nid, false) != GDK_SUCCEED ||
    1693     1293000 :                     BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
    1694      646500 :                     BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
    1695             :                         err = 1;
    1696      646500 :                 if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
    1697           0 :                         pos = (oid) (BATcount(nbids) - 1);
    1698           0 :                         if (BUNappend(ndels, &pos, false) != GDK_SUCCEED)
    1699      646500 :                                 err = 1;
    1700             :                 }
    1701             :         }
    1702             : 
    1703        2322 :         if (err) {
    1704           0 :                 logbat_destroy(nbids);
    1705           0 :                 logbat_destroy(noids);
    1706           0 :                 logbat_destroy(ndels);
    1707           0 :                 logbat_destroy(ncnts);
    1708           0 :                 logbat_destroy(nlids);
    1709           0 :                 return 0;
    1710             :         }
    1711             :         /* point of no return */
    1712        4644 :         if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
    1713        4644 :             log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
    1714        2322 :             log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED ||
    1715        2322 :             (nbids = BATsetaccess(nbids, BAT_READ)) == NULL ||
    1716        2322 :             (noids = BATsetaccess(noids, BAT_READ)) == NULL ||
    1717        2322 :             (ndels = BATsetaccess(ndels, BAT_READ)) == NULL) {
    1718           0 :                 logbat_destroy(nbids);
    1719           0 :                 logbat_destroy(noids);
    1720           0 :                 logbat_destroy(ndels);
    1721           0 :                 logbat_destroy(ncnts);
    1722           0 :                 logbat_destroy(nlids);
    1723           0 :                 return -1;
    1724             :         }
    1725        2322 :         r[rcnt++] = lg->catalog_bid->batCacheid;
    1726        2322 :         r[rcnt++] = lg->catalog_id->batCacheid;
    1727        2322 :         r[rcnt++] = lg->dcatalog->batCacheid;
    1728             : 
    1729        2322 :         assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
    1730             : 
    1731        2322 :         logbat_destroy(lg->catalog_bid);
    1732        2322 :         logbat_destroy(lg->catalog_id);
    1733        2322 :         logbat_destroy(lg->dcatalog);
    1734             : 
    1735        2322 :         lg->catalog_bid = nbids;
    1736        2322 :         lg->catalog_id = noids;
    1737        2322 :         lg->dcatalog = ndels;
    1738             : 
    1739             :         /* failing to rename these two bats is not fatal */
    1740        2322 :         if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
    1741        2322 :                 GDKclrerr();
    1742        2322 :         if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
    1743        2322 :                 GDKclrerr();
    1744        2322 :         BBPunfix(lg->catalog_cnt->batCacheid);
    1745        2322 :         BBPunfix(lg->catalog_lid->batCacheid);
    1746             : 
    1747        2322 :         lg->catalog_cnt = ncnts;
    1748        2322 :         lg->catalog_lid = nlids;
    1749        2322 :         char bak[FILENAME_MAX];
    1750        2322 :         strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
    1751        2322 :         if (BBPrename(lg->catalog_cnt, bak) < 0)
    1752           0 :                 GDKclrerr();
    1753        2322 :         strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
    1754        2322 :         if (BBPrename(lg->catalog_lid, bak) < 0)
    1755           0 :                 GDKclrerr();
    1756        2322 :         rotation_lock(lg);
    1757        9971 :         for (logged_range *p = lg->pending; p; p = p->next) {
    1758        7649 :                 p->cnt -= cleanup;
    1759             :         }
    1760        2322 :         rotation_unlock(lg);
    1761        2322 :         return rcnt;
    1762             : }
    1763             : 
    1764             : /* this function is called with log_lock() held; it releases the lock
    1765             :  * before returning */
    1766             : static gdk_return
    1767       11904 : bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
    1768             : {
    1769       11904 :         BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
    1770       11904 :         BUN dcnt = BATcount(lg->dcatalog);
    1771       11904 :         BUN p, q;
    1772       11904 :         BAT *catalog_bid = lg->catalog_bid;
    1773       11904 :         BAT *catalog_id = lg->catalog_id;
    1774       11904 :         BAT *dcatalog = lg->dcatalog;
    1775       11904 :         BUN nn = 13 + cnt;
    1776       11904 :         bat *n = GDKmalloc(sizeof(bat) * nn);
    1777       11904 :         bat *r = GDKmalloc(sizeof(bat) * nn);
    1778       11904 :         BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
    1779       11904 :         int i = 0, rcnt = 0;
    1780       11904 :         gdk_return res;
    1781       11904 :         const log_bid *bids;
    1782       11904 :         lng *cnts = NULL, *lids = NULL;
    1783       11904 :         BUN cleanup = 0;
    1784       11904 :         lng t0 = 0;
    1785             : 
    1786       11904 :         if (n == NULL || r == NULL || sizes == NULL) {
    1787           0 :                 GDKfree(n);
    1788           0 :                 GDKfree(r);
    1789           0 :                 GDKfree(sizes);
    1790           0 :                 log_unlock(lg);
    1791           0 :                 return GDK_FAIL;
    1792             :         }
    1793             : 
    1794       11904 :         sizes[i] = 0;
    1795       11904 :         n[i++] = 0;             /* n[0] is not used */
    1796       11904 :         bids = (const log_bid *) Tloc(catalog_bid, 0);
    1797       11904 :         if (lg->catalog_cnt)
    1798       11680 :                 cnts = (lng *) Tloc(lg->catalog_cnt, 0);
    1799       11904 :         if (lg->catalog_lid)
    1800       11680 :                 lids = (lng *) Tloc(lg->catalog_lid, 0);
    1801     3447550 :         BATloop(catalog_bid, p, q) {
    1802     3435646 :                 if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
    1803      175788 :                         cleanup++;
    1804      175788 :                         if (lids[p] == -1)
    1805           0 :                                 continue;
    1806      175788 :                         if (BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
    1807           0 :                                 while (BATcount(dcatalog) > dcnt) {
    1808           0 :                                         if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
    1809           0 :                                                 TRC_CRITICAL(WAL, "delete after failed append failed\n");
    1810           0 :                                                 break;
    1811             :                                         }
    1812             :                                 }
    1813           0 :                                 GDKfree(n);
    1814           0 :                                 GDKfree(r);
    1815           0 :                                 GDKfree(sizes);
    1816           0 :                                 log_unlock(lg);
    1817           0 :                                 return GDK_FAIL;
    1818             :                         }
    1819             :                 }
    1820     3435646 :                 if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
    1821     1707931 :                         continue;
    1822             :                 }
    1823     1727715 :                 bat col = bids[p];
    1824             : 
    1825     1727715 :                 TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
    1826     1727715 :                 assert(col);
    1827     1727715 :                 sizes[i] = cnts ? (BUN) cnts[p] : 0;
    1828     1727715 :                 n[i++] = col;
    1829             :         }
    1830             :         /* now commit catalog, so it's also up to date on disk */
    1831       11904 :         sizes[i] = cnt;
    1832       11904 :         n[i++] = catalog_bid->batCacheid;
    1833       11904 :         sizes[i] = cnt;
    1834       11904 :         n[i++] = catalog_id->batCacheid;
    1835       11904 :         sizes[i] = BATcount(dcatalog);
    1836       11904 :         n[i++] = dcatalog->batCacheid;
    1837             : 
    1838       11904 :         if (cleanup) {
    1839        2322 :                 if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
    1840             :                                              catalog_bid, catalog_id, dcatalog,
    1841             :                                              cleanup)) < 0) {
    1842           0 :                         GDKfree(n);
    1843           0 :                         GDKfree(r);
    1844           0 :                         GDKfree(sizes);
    1845           0 :                         log_unlock(lg);
    1846           0 :                         return GDK_FAIL;
    1847             :                 }
    1848        2322 :                 cnt -= cleanup;
    1849             :         }
    1850       11904 :         if (dcatalog != lg->dcatalog) {
    1851        2322 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
    1852        2322 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
    1853        2322 :                 i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
    1854             :         }
    1855       11904 :         if (lg->seqs_id) {
    1856       11680 :                 sizes[i] = BATcount(lg->seqs_id);
    1857       11680 :                 n[i++] = lg->seqs_id->batCacheid;
    1858       11680 :                 sizes[i] = BATcount(lg->seqs_id);
    1859       11680 :                 n[i++] = lg->seqs_val->batCacheid;
    1860             :         }
    1861       11904 :         if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
    1862         334 :                 BAT *tids, *ids, *vals;
    1863             : 
    1864         334 :                 tids = bm_tids(lg->seqs_id, lg->dseqs);
    1865         334 :                 if (tids == NULL) {
    1866           0 :                         GDKfree(n);
    1867           0 :                         GDKfree(r);
    1868           0 :                         GDKfree(sizes);
    1869           0 :                         log_unlock(lg);
    1870           0 :                         return GDK_FAIL;
    1871             :                 }
    1872         334 :                 ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
    1873         334 :                 vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
    1874             : 
    1875         334 :                 if (ids == NULL || vals == NULL) {
    1876           0 :                         logbat_destroy(tids);
    1877           0 :                         logbat_destroy(ids);
    1878           0 :                         logbat_destroy(vals);
    1879           0 :                         GDKfree(n);
    1880           0 :                         GDKfree(r);
    1881           0 :                         GDKfree(sizes);
    1882           0 :                         log_unlock(lg);
    1883           0 :                         return GDK_FAIL;
    1884             :                 }
    1885             : 
    1886         668 :                 if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
    1887         334 :                     BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
    1888           0 :                         logbat_destroy(tids);
    1889           0 :                         logbat_destroy(ids);
    1890           0 :                         logbat_destroy(vals);
    1891           0 :                         GDKfree(n);
    1892           0 :                         GDKfree(r);
    1893           0 :                         GDKfree(sizes);
    1894           0 :                         log_unlock(lg);
    1895           0 :                         return GDK_FAIL;
    1896             :                 }
    1897         334 :                 logbat_destroy(tids);
    1898         334 :                 BATclear(lg->dseqs, true);
    1899             : 
    1900         668 :                 if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
    1901         334 :                     log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED ||
    1902         334 :                     (ids = BATsetaccess(ids, BAT_READ)) == NULL ||
    1903         334 :                     (vals = BATsetaccess(vals, BAT_READ)) == NULL) {
    1904           0 :                         logbat_destroy(ids);
    1905           0 :                         logbat_destroy(vals);
    1906           0 :                         GDKfree(n);
    1907           0 :                         GDKfree(r);
    1908           0 :                         GDKfree(sizes);
    1909           0 :                         log_unlock(lg);
    1910           0 :                         return GDK_FAIL;
    1911             :                 }
    1912         334 :                 i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
    1913         334 :                 i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
    1914             : 
    1915         334 :                 if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
    1916         267 :                         r[rcnt++] = lg->seqs_id->batCacheid;
    1917         334 :                 if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
    1918         267 :                         r[rcnt++] = lg->seqs_val->batCacheid;
    1919             : 
    1920         334 :                 logbat_destroy(lg->seqs_id);
    1921         334 :                 logbat_destroy(lg->seqs_val);
    1922             : 
    1923         334 :                 lg->seqs_id = ids;
    1924         334 :                 lg->seqs_val = vals;
    1925             :         }
    1926       11904 :         if (lg->seqs_id) {
    1927       11680 :                 sizes[i] = BATcount(lg->dseqs);
    1928       11680 :                 n[i++] = lg->dseqs->batCacheid;
    1929             :         }
    1930             : 
    1931       11904 :         assert((BUN) i <= nn);
    1932       11904 :         log_unlock(lg);
    1933       11904 :         TRC_DEBUG_IF(WAL)
    1934           0 :                 t0 = GDKusec();
    1935       12128 :         res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id, lg->saved_tid);
    1936       11904 :         TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
    1937       11904 :         if (res == GDK_SUCCEED) {       /* now cleanup */
    1938      195192 :                 for (i = 0; i < rcnt; i++) {
    1939      183288 :                         TRC_DEBUG_IF(WAL) {
    1940           0 :                                 TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
    1941           0 :                                 if (BBP_lrefs(r[i]) != 2)
    1942           0 :                                         TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
    1943             :                         }
    1944      183288 :                         BBPrelease(r[i]);
    1945             :                 }
    1946             :         }
    1947       11904 :         GDKfree(n);
    1948       11904 :         GDKfree(r);
    1949       11904 :         GDKfree(sizes);
    1950       11904 :         if (res != GDK_SUCCEED)
    1951           0 :                 TRC_CRITICAL(GDK, "commit failed\n");
    1952             :         return res;
    1953             : }
    1954             : 
    1955             : static gdk_return
    1956         335 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
    1957             : {
    1958         335 :         str filenamestr = NULL;
    1959             : 
    1960         335 :         if ((filenamestr = GDKfilepath(0, lg->dir, LOGFILE, NULL)) == NULL)
    1961             :                 return GDK_FAIL;
    1962         335 :         size_t len = strcpy_len(filename, filenamestr, FILENAME_MAX);
    1963         335 :         GDKfree(filenamestr);
    1964         335 :         if (len >= FILENAME_MAX) {
    1965           0 :                 GDKerror("Logger filename path is too large\n");
    1966           0 :                 return GDK_FAIL;
    1967             :         }
    1968         335 :         if (bak) {
    1969         335 :                 len = strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL);
    1970         335 :                 if (len >= FILENAME_MAX) {
    1971           0 :                         GDKerror("Logger filename path is too large\n");
    1972           0 :                         return GDK_FAIL;
    1973             :                 }
    1974             :         }
    1975             :         return GDK_SUCCEED;
    1976             : }
    1977             : 
    1978             : static gdk_return
    1979       11754 : log_cleanup(logger *lg, lng id)
    1980             : {
    1981       11754 :         char log_id[FILENAME_MAX];
    1982             : 
    1983       11754 :         if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
    1984           0 :                 GDKerror("log_id filename is too large\n");
    1985           0 :                 return GDK_FAIL;
    1986             :         }
    1987       11754 :         if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
    1988           0 :                 GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
    1989           0 :                 GDKclrerr();    /* clear error from unlink */
    1990             :         }
    1991             :         return GDK_SUCCEED;
    1992             : }
    1993             : 
    1994             : #ifdef GDKLIBRARY_JSON
    1995             : static gdk_return
    1996         336 : log_json_upgrade_finalize(void)
    1997             : {
    1998         336 :         int json_tpe = ATOMindex("json");
    1999         671 :         if (!GDKinmemory(0) &&
    2000         335 :             GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
    2001           0 :                 TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
    2002           0 :                 return GDK_FAIL;
    2003             :         }
    2004         336 :         BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;
    2005             : 
    2006         336 :         return GDK_SUCCEED;
    2007             : }
    2008             : #endif
    2009             : 
    2010             : /* Load data from the logger logdir
    2011             :  * Initialize new directories and catalog files if none are present,
    2012             :  * unless running in read-only mode
    2013             :  * Load data and persist it in the BATs */
    2014             : static gdk_return
    2015         336 : log_load(const char *fn, const char *logdir, logger *lg, char filename[FILENAME_MAX])
    2016             : { /* log_load: set up the catalog and sequence BATs of logger `lg` and,
                     :    * when an existing log file was found, replay the logs.
                     :    *
                     :    * fn       - logger name; used as the prefix of the BBP names of
                     :    *            the logger BATs ("<fn>_catalog_bid", "<fn>_seqs_id", ...)
                     :    * logdir   - only passed on to check_version()
                     :    * lg       - logger structure prepared by the caller
                     :    * filename - buffer handed to log_filename() (presumably filled
                     :    *            with the name of the log file) and afterwards passed
                     :    *            to the functions that open, read or (re)create it
                     :    *
                     :    * Two main cases, selected by BBPindex("<fn>_catalog_bid"):
                     :    *  - 0: no catalog yet; a log file must not exist.  New persistent
                     :    *    catalog BATs are created, named and committed with
                     :    *    bm_subcommit().
                     :    *  - otherwise: the catalog exists; a log file must exist too
                     :    *    (unless logging is disabled).  The version is checked, the
                     :    *    catalog BATs are loaded and every bid in catalog_bid gets a
                     :    *    logical reference.
                     :    * After that the catalog_cnt/catalog_lid BATs are created, the
                     :    * seqs BATs are loaded or created, and if a log file was read the
                     :    * logs are replayed, committed and cleaned up.
                     :    *
                     :    * Returns GDK_SUCCEED or GDK_FAIL.  The `error` path releases
                     :    * BATs, locks and buffers of `lg` and frees `lg` itself, so the
                     :    * caller must not touch `lg` after a failure reported there.
                     :    *
                     :    * NOTE(review): the three `return GDK_FAIL` statements in the
                     :    * `needsnew` handling bypass the `error` label, so on those paths
                     :    * nothing is released and `lg` is not freed.
                     :    * NOTE(review): the `error` path does not destroy catalog_cnt and
                     :    * catalog_lid, which are created in this function, nor does it
                     :    * destroy lg->flush_lock -- confirm whether that is intended.
                     :    */
    2017         336 :         FILE *fp = NULL;
    2018         336 :         char bak[FILENAME_MAX];
    2019         336 :         bat catalog_bid, catalog_id, dcatalog;
    2020         336 :         bool needcommit = false;
    2021         336 :         ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug); /* saved so the error path can restore GDKdebug */
    2022         336 :         bool readlogs = false;
    2023         336 :         bool needsnew = false;  /* need to write new log file? */
    2024             : 
    2025             :         /* refactor */
    2026         336 :         if (!LOG_DISABLED(lg)) {
    2027         335 :                 if (log_filename(lg, bak, filename) != GDK_SUCCEED)
    2028           0 :                         goto error;
    2029             :         }
    2030             : 
    2031         336 :         lg->catalog_bid = NULL;
    2032         336 :         lg->catalog_id = NULL;
    2033         336 :         lg->catalog_cnt = NULL;
    2034         336 :         lg->catalog_lid = NULL;
    2035         336 :         lg->dcatalog = NULL;
    2036             : 
    2037         336 :         lg->seqs_id = NULL;
    2038         336 :         lg->seqs_val = NULL;
    2039         336 :         lg->dseqs = NULL;
    2040             : 
    2041         336 :         if (!LOG_DISABLED(lg)) {
    2042             :                 /* try to open logfile backup, or failing that, the file
    2043             :                  * itself. we need to know whether this file exists when
    2044             :                  * checking the database consistency later on */
    2045         335 :                 if ((fp = MT_fopen(bak, "r")) != NULL) {
    2046           0 :                         fclose(fp);
    2047           0 :                         fp = NULL;
    2048           0 :                         if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
    2049           0 :                             GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
    2050           0 :                                 goto error;
    2051         335 :                 } else if (errno != ENOENT) {
    2052           0 :                         GDKsyserror("open %s failed", bak);
    2053           0 :                         goto error;
    2054             :                 }
    2055         335 :                 fp = MT_fopen(filename, "r");
    2056         335 :                 if (fp == NULL && errno != ENOENT) {
    2057           0 :                         GDKsyserror("open %s failed", filename);
    2058           0 :                         goto error;
    2059             :                 }
    2060             :         }
    2061             : 
    2062         336 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL); /* from here on bak is reused as scratch space for BAT names */
    2063         336 :         catalog_bid = BBPindex(bak);
    2064             : 
    2065             :         /* initialize arrays for type mapping, to be read from disk */
    2066         336 :         memset(lg->type_id, -1, sizeof(lg->type_id));
    2067         336 :         memset(lg->type_nr, 255, sizeof(lg->type_nr));
    2068             : 
    2069             :         /* this is intentional - if catalog_bid is 0, force it to find
    2070             :          * the persistent catalog */
    2071         336 :         if (catalog_bid == 0) {
    2072             :                 /* catalog does not exist, so the log file also
    2073             :                  * shouldn't exist */
    2074         224 :                 if (fp != NULL) {
    2075           0 :                         GDKerror("there is no logger catalog, "
    2076             :                                  "but there is a log file.\n");
    2077           0 :                         goto error;
    2078             :                 }
    2079             : 
    2080         224 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    2081         224 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    2082         224 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    2083             : 
    2084         224 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    2085           0 :                         GDKerror("cannot create catalog bats");
    2086           0 :                         goto error;
    2087             :                 }
    2088         224 :                 if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
    2089         224 :                     (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
    2090         224 :                     (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
    2091           0 :                         goto error;
    2092             :                 }
    2093         224 :                 TRC_INFO(WAL, "create %s catalog\n", fn);
    2094             : 
    2095             :                 /* give the catalog bats names so we can find them
    2096             :                  * next time */
    2097         224 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    2098         224 :                 if (BBPrename(lg->catalog_bid, bak) < 0) {
    2099           0 :                         goto error;
    2100             :                 }
    2101             : 
    2102         224 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    2103         224 :                 if (BBPrename(lg->catalog_id, bak) < 0) {
    2104           0 :                         goto error;
    2105             :                 }
    2106             : 
    2107         224 :                 strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    2108         224 :                 if (BBPrename(lg->dcatalog, bak) < 0) {
    2109           0 :                         goto error;
    2110             :                 }
    2111             : 
    2112         224 :                 if (!LOG_DISABLED(lg)) {
    2113         223 :                         if (GDKcreatedir(filename) != GDK_SUCCEED) {
    2114           0 :                                 GDKerror("cannot create directory for log file %s\n", filename);
    2115           0 :                                 goto error;
    2116             :                         }
    2117         223 :                         if (log_create_types_file(lg, filename, true) != GDK_SUCCEED)
    2118           0 :                                 goto error;
    2119             :                 }
    2120             : 
    2121         224 :                 BBPretain(lg->catalog_bid->batCacheid);
    2122         224 :                 BBPretain(lg->catalog_id->batCacheid);
    2123         224 :                 BBPretain(lg->dcatalog->batCacheid);
    2124             : 
    2125         224 :                 log_lock(lg);
    2126             :                 /* bm_subcommit releases the lock */
    2127         224 :                 if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
    2128             :                         /* cannot commit catalog, so remove log */
    2129           0 :                         if (MT_remove(filename) < 0)
    2130           0 :                                 GDKsyserror("remove %s failed\n", filename);
    2131           0 :                         BBPrelease(lg->catalog_bid->batCacheid);
    2132           0 :                         BBPrelease(lg->catalog_id->batCacheid);
    2133           0 :                         BBPrelease(lg->dcatalog->batCacheid);
    2134           0 :                         goto error;
    2135             :                 }
    2136             :         } else {
    2137             :                 /* find the persistent catalog. As non persistent bats
    2138             :                  * require a logical reference we also add a logical
    2139             :                  * reference for the persistent bats */
    2140         112 :                 BUN p, q;
    2141         112 :                 BAT *b, *o, *d;
    2142             : 
    2143         112 :                 assert(!lg->inmemory);
    2144             : 
    2145             :                 /* the catalog exists, and so should the log file */
    2146         112 :                 if (fp == NULL && !LOG_DISABLED(lg)) {
    2147           0 :                         GDKerror("There is a logger catalog, but no log file.\n");
    2148           0 :                         goto error;
    2149             :                 }
    2150             :                 if (fp != NULL) {
    2151             :                         /* check_version always closes fp */
    2152         112 :                         if (check_version(lg, fp, fn, logdir, filename, &needsnew) != GDK_SUCCEED) {
    2153           0 :                                 fp = NULL;
    2154           0 :                                 goto error;
    2155             :                         }
    2156             :                         readlogs = true;
    2157             :                         fp = NULL;
    2158             :                 }
    2159             : 
    2160         112 :                 if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
    2161         112 :                         b = BATdescriptor(catalog_bid);
    2162         112 :                         if (b == NULL) {
    2163           0 :                                 GDKerror("inconsistent database, catalog does not exist");
    2164           0 :                                 goto error;
    2165             :                         }
    2166             : 
    2167         112 :                         strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    2168         112 :                         catalog_id = BBPindex(bak);
    2169         112 :                         o = BATdescriptor(catalog_id);
    2170         112 :                         if (o == NULL) {
    2171           0 :                                 BBPunfix(b->batCacheid);
    2172           0 :                                 GDKerror("inconsistent database, catalog_id does not exist");
    2173           0 :                                 goto error;
    2174             :                         }
    2175             : 
    2176         112 :                         strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    2177         112 :                         dcatalog = BBPindex(bak);
    2178         112 :                         d = BATdescriptor(dcatalog);
    2179         112 :                         if (d == NULL) {
    2180           0 :                                 GDKerror("cannot create dcatalog bat");
    2181           0 :                                 BBPunfix(b->batCacheid);
    2182           0 :                                 BBPunfix(o->batCacheid);
    2183           0 :                                 goto error;
    2184             :                         }
    2185             : 
    2186         112 :                         lg->catalog_bid = b;
    2187         112 :                         lg->catalog_id = o;
    2188         112 :                         lg->dcatalog = d;
    2189         112 :                         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    2190       28556 :                         BATloop(lg->catalog_bid, p, q) {
    2191       28444 :                                 bat bid = bids[p];
    2192       28444 :                                 oid pos = p;
    2193             : 
    2194       28444 :                                 if (BBPretain(bid) == 0 &&      /* any bid in the catalog_bid, needs one logical ref */
    2195           0 :                                     BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
    2196           0 :                                     BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
    2197           0 :                                         goto error;
    2198             :                         }
    2199             :                 }
    2200         112 :                 if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
    2201         112 :                     (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
    2202         112 :                     (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
    2203           0 :                         goto error;
    2204             :                 }
    2205         112 :                 BBPretain(lg->catalog_bid->batCacheid);
    2206         112 :                 BBPretain(lg->catalog_id->batCacheid);
    2207         112 :                 BBPretain(lg->dcatalog->batCacheid);
    2208             :         }
    2209             :         /* failing to rename the catalog_cnt and catalog_lid bats is not
    2210             :          * fatal */
    2211         336 :         lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
    2212         336 :         if (lg->catalog_cnt == NULL) {
    2213           0 :                 GDKerror("failed to create catalog_cnt bat");
    2214           0 :                 goto error;
    2215             :         }
    2216         336 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
    2217         336 :         if (BBPrename(lg->catalog_cnt, bak) < 0)
    2218           0 :                 GDKclrerr();
    2219         336 :         lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
    2220         336 :         if (lg->catalog_lid == NULL) {
    2221           0 :                 GDKerror("failed to create catalog_lid bat");
    2222           0 :                 goto error;
    2223             :         }
    2224         336 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
    2225         336 :         if (BBPrename(lg->catalog_lid, bak) < 0)
    2226           0 :                 GDKclrerr();
    2227         336 :         if (bm_get_counts(lg) != GDK_SUCCEED)
    2228           0 :                 goto error;
    2229             : 
    2230         336 :         strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    2231         336 :         if (BBPindex(bak)) { /* the seqs BATs already exist: load all three */
    2232         112 :                 lg->seqs_id = BATdescriptor(BBPindex(bak));
    2233         112 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    2234         112 :                 lg->seqs_val = BATdescriptor(BBPindex(bak));
    2235         112 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    2236         112 :                 lg->dseqs = BATdescriptor(BBPindex(bak));
    2237         112 :                 if (lg->seqs_id == NULL ||
    2238         112 :                     lg->seqs_val == NULL ||
    2239             :                     lg->dseqs == NULL) {
    2240           0 :                         GDKerror("Logger_new: cannot load seqs bats");
    2241           0 :                         goto error;
    2242             :                 }
    2243             :         } else { /* no seqs BATs yet: create and name them, then commit below */
    2244         224 :                 lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
    2245         224 :                 lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
    2246         224 :                 lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
    2247         224 :                 if (lg->seqs_id == NULL ||
    2248         224 :                     lg->seqs_val == NULL ||
    2249             :                     lg->dseqs == NULL) {
    2250           0 :                         GDKerror("Logger_new: cannot create seqs bats");
    2251           0 :                         goto error;
    2252             :                 }
    2253             : 
    2254         224 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    2255         224 :                 if (BBPrename(lg->seqs_id, bak) < 0) {
    2256           0 :                         goto error;
    2257             :                 }
    2258             : 
    2259         224 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    2260         224 :                 if (BBPrename(lg->seqs_val, bak) < 0) {
    2261           0 :                         goto error;
    2262             :                 }
    2263             : 
    2264         224 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    2265         224 :                 if (BBPrename(lg->dseqs, bak) < 0) {
    2266           0 :                         goto error;
    2267             :                 }
    2268             :                 needcommit = true;
    2269             :         }
    2270         336 :         if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
    2271         336 :             (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
    2272         336 :             (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
    2273           0 :                 goto error;
    2274             :         }
    2275         336 :         dbg = ATOMIC_GET(&GDKdebug);
    2276         336 :         ATOMIC_AND(&GDKdebug, ~CHECKMASK); /* clear the CHECKMASK bits for the commit; the saved value is put back below */
    2277         336 :         if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
    2278           0 :                 GDKerror("Logger_new: commit failed");
    2279           0 :                 goto error;
    2280             :         }
    2281         336 :         ATOMIC_SET(&GDKdebug, dbg);
    2282             : 
    2283         336 :         if (readlogs) {
    2284         112 :                 ulng log_id = lg->saved_id + 1; /* first log id handed to log_cleanup() below; taken before the logs are read */
    2285         112 :                 if (log_readlogs(lg, filename) != GDK_SUCCEED) {
    2286           0 :                         goto error;
    2287             :                 }
    2288         112 :                 if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
    2289           0 :                         goto error;
    2290         112 :                 if (needsnew) {
    2291           0 :                         if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
    2292           0 :                                 TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
    2293           0 :                                 return GDK_FAIL; /* NOTE(review): bypasses the error label, lg is not released */
    2294             :                         }
    2295           0 :                         if (log_create_types_file(lg, filename, false) != GDK_SUCCEED) {
    2296           0 :                                 TRC_CRITICAL(GDK, "couldn't write new log\n");
    2297           0 :                                 return GDK_FAIL; /* NOTE(review): bypasses the error label, lg is not released */
    2298             :                         }
    2299             :                 }
    2300         112 :                 dbg = ATOMIC_GET(&GDKdebug);
    2301         112 :                 ATOMIC_AND(&GDKdebug, ~CHECKMASK);
    2302         112 :                 if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
    2303           0 :                         goto error;
    2304             :                 }
    2305         112 :                 ATOMIC_SET(&GDKdebug, dbg);
    2306         293 :                 for (; log_id <= lg->saved_id; log_id++)
    2307         181 :                         (void) log_cleanup(lg, log_id); /* ignore error of removing file */
    2308         112 :                 if (needsnew &&
    2309           0 :                     GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
    2310           0 :                         TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
    2311           0 :                         return GDK_FAIL; /* NOTE(review): bypasses the error label, lg is not released */
    2312             :                 }
    2313             :         } else {
    2314         224 :                 lg->id = lg->saved_id + 1;
    2315             :         }
    2316             : #ifdef GDKLIBRARY_JSON
    2317         336 :         if (log_json_upgrade_finalize() == GDK_FAIL)
    2318           0 :                 goto error;
    2319             : #endif
    2320             :         return GDK_SUCCEED;
    2321           0 :   error:
    2322           0 :         if (fp)
    2323           0 :                 fclose(fp);
    2324           0 :         logbat_destroy(lg->catalog_bid);
    2325           0 :         logbat_destroy(lg->catalog_id);
    2326           0 :         logbat_destroy(lg->dcatalog);
    2327           0 :         logbat_destroy(lg->seqs_id);
    2328           0 :         logbat_destroy(lg->seqs_val);
    2329           0 :         logbat_destroy(lg->dseqs);
    2330           0 :         ATOMIC_DESTROY(&lg->current->refcount); /* NOTE(review): lg->current is never assigned in this function -- confirm it cannot be NULL here */
    2331           0 :         ATOMIC_DESTROY(&lg->nr_flushers);
    2332           0 :         MT_lock_destroy(&lg->lock);
    2333           0 :         MT_lock_destroy(&lg->rotation_lock);
    2334           0 :         GDKfree(lg->fn);
    2335           0 :         GDKfree(lg->dir);
    2336           0 :         GDKfree(lg->rbuf);
    2337           0 :         GDKfree(lg->wbuf);
    2338           0 :         GDKfree(lg);
    2339           0 :         ATOMIC_SET(&GDKdebug, dbg);
    2340             :         /* We do not call log_json_upgrade_finalize here because we want
    2341             :          * the upgrade to run again next time we try, so we do not want
    2342             :          * to remove the signal file just yet.
    2343             :          */
    2344           0 :         return GDK_FAIL;
    2345             : }
    2346             : 
    2347             : /* Initialize a new logger
    2348             :  * It will load any data in the logdir and persist it in the BATs.
                     :  *
                     :  * debug, version, prefuncp, postfuncp, funcdata
                     :  *          - stored unchanged in the corresponding logger fields
                     :  * fn       - logger name; a copy is kept in lg->fn and it also names
                     :  *            lg->lock
                     :  * logdir   - must be a relative path unless GDKinmemory(0) is true;
                     :  *            "<logdir><DIR_SEP><fn><DIR_SEP>" becomes lg->dir
                     :  *
                     :  * Tunables read from the GDK environment:
                     :  *   wal_max_dropped   (default 100000)
                     :  *   wal_max_file_age  (default 600, stored multiplied by 1000000)
                     :  *   wal_max_file_size (default 2147483648; forced to 2048 when the
                     :  *                      FORCEMITOMASK debug bit is set)
                     :  * Negative values fall back to the defaults.
                     :  *
                     :  * Returns the logger, or NULL on failure.  When log_load() fails,
                     :  * `lg` is not freed here: the `error` path of log_load() frees it.
                     :  *
                     :  * NOTE(review): log_load() also has direct `return GDK_FAIL` paths
                     :  * that do not free `lg`; on those this function leaks it.
                     :  * NOTE(review): GDKdebug is read directly below, while log_load()
                     :  * and log_next_logfile() go through ATOMIC_GET(&GDKdebug).
                     :  * NOTE(review): the strtoul() result is not validated; text that
                     :  * does not start with a number yields 0.
                     :  */
    2349             : static logger *
    2350         336 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
    2351             :         postversionfix_fptr postfuncp, void *funcdata)
    2352             : {
    2353         336 :         logger *lg;
    2354         336 :         char filename[FILENAME_MAX];
    2355             : 
    2356         336 :         lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
    2357         336 :         lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
    2358         336 :         lng max_file_size = 0;
    2359             : 
    2360         336 :         if (GDKdebug & FORCEMITOMASK) {
    2361             :                 max_file_size = 2048; /* 2 KiB */
    2362             :         } else {
    2363          12 :                 const char *max_file_size_str = GDKgetenv("wal_max_file_size");
    2364          12 :                 max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
    2365             :         }
    2366             : 
    2367         336 :         if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
    2368           0 :                 TRC_CRITICAL(GDK, "logdir must be relative path\n");
    2369           0 :                 return NULL;
    2370             :         }
    2371             : 
    2372         336 :         lg = GDKmalloc(sizeof(struct logger));
    2373         336 :         if (lg == NULL) {
    2374           0 :                 TRC_CRITICAL(GDK, "allocating logger structure failed\n");
    2375           0 :                 return NULL;
    2376             :         }
    2377             : 
    2378         672 :         *lg = (logger) { /* members not listed here are zero-initialized */
    2379         336 :                 .inmemory = GDKinmemory(0),
    2380             :                 .debug = debug,
    2381             :                 .version = version,
    2382             :                 .prefuncp = prefuncp,
    2383             :                 .postfuncp = postfuncp,
    2384             :                 .funcdata = funcdata,
    2385             : 
    2386         336 :                 .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
    2387             :                 .file_age = 0,
    2388         336 :                 .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
    2389         336 :                 .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
    2390             : 
    2391             :                 .id = 0,
    2392         336 :                 .saved_id = getBBPlogno(),      /* get saved log number from bbp */
    2393         336 :                 .saved_tid = (int) getBBPtransid(),     /* get saved transaction id from bbp */
    2394             :         };
    2395             : 
    2396             :         /* probably open file and check version first, then call old logger code */
    2397         336 :         if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
    2398           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    2399           0 :                 GDKfree(lg);
    2400           0 :                 return NULL;
    2401             :         }
    2402         336 :         lg->fn = GDKstrdup(fn);
    2403         336 :         lg->dir = GDKstrdup(filename);
    2404         336 :         lg->rbufsize = 64 * 1024;
    2405         336 :         lg->rbuf = GDKmalloc(lg->rbufsize);
    2406         336 :         lg->wbufsize = 64 * 1024;
    2407         336 :         lg->wbuf = GDKmalloc(lg->wbufsize);
    2408         336 :         if (lg->fn == NULL ||
    2409         336 :             lg->dir == NULL ||
    2410         336 :             lg->rbuf == NULL ||
    2411             :             lg->wbuf == NULL) {
    2412           0 :                 TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
    2413           0 :                 GDKfree(lg->fn);
    2414           0 :                 GDKfree(lg->dir);
    2415           0 :                 GDKfree(lg->rbuf);
    2416           0 :                 GDKfree(lg->wbuf);
    2417           0 :                 GDKfree(lg);
    2418           0 :                 return NULL;
    2419             :         }
    2420         336 :         TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
    2421             : 
    2422         336 :         MT_lock_init(&lg->lock, fn);
    2423         336 :         MT_lock_init(&lg->rotation_lock, "rotation_lock");
    2424         336 :         MT_lock_init(&lg->flush_lock, "flush_lock");
    2425         336 :         MT_cond_init(&lg->excl_flush_cv);
    2426         336 :         ATOMIC_INIT(&lg->nr_flushers, 0);
    2427             : 
    2428         336 :         if (log_load(fn, logdir, lg, filename) == GDK_SUCCEED) {
    2429             :                 return lg;
    2430             :         }
    2431             :         return NULL; /* lg is not freed here; see the notes above */
    2432             : }
    2433             : 
    2434             : static logged_range *
    2435       66833 : do_flush_range_cleanup(logger *lg)
    2436             : { /* Advance lg->flush_ranges past leading ranges that are no longer in
                     :    * use and return the new head of the list.
                     :    *
                     :    * Starting at lg->flush_ranges the list is followed until a range
                     :    * is found that has no successor or whose refcount is above 1.
                     :    * If that is still the first range, nothing changes.  Otherwise
                     :    * lg->flush_ranges is set to it and every range that was skipped
                     :    * has its refcount decremented and, unless logging is disabled,
                     :    * its output_log stream closed and reset to NULL.
                     :    *
                     :    * The skipped ranges are not freed here.
                     :    *
                     :    * NOTE(review): lg->flush_ranges is dereferenced without a NULL
                     :    * check, so the caller has to guarantee it is set.
                     :    */
    2437       66833 :         logged_range *frange = lg->flush_ranges;
    2438       66833 :         logged_range *first = frange;
    2439             : 
    2440       78491 :         while (frange->next) {
    2441       12141 :                 if (ATOMIC_GET(&frange->refcount) > 1)
    2442             :                         break;
    2443       11658 :                 frange = frange->next;
    2444             :         }
    2445       66833 :         if (first == frange) {
    2446             :                 return first;
    2447             :         }
    2448             : 
    2449       11658 :         logged_range *flast = frange; /* new head of the flush list */
    2450             : 
    2451       11658 :         lg->flush_ranges = flast;
    2452             : 
    2453       23316 :         for (frange = first; frange && frange != flast; frange = frange->next) {
    2454       11658 :                 ATOMIC_DEC(&frange->refcount);
    2455       11658 :                 if (!LOG_DISABLED(lg) && frange->output_log) {
    2456           0 :                         TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
    2457           0 :                         close_stream(frange->output_log);
    2458           0 :                         frange->output_log = NULL;
    2459             :                 }
    2460             :         }
    2461             :         return flast;
    2462             : }
    2463             : 
    2464             : void
    2465         335 : log_destroy(logger *lg)
    2466             : { /* Shut down logger `lg` and free it; `lg` is invalid afterwards.
                     :    *
                     :    * Steps, in order:
                     :    *  - close the log input and clean up the flush ranges; the
                     :    *    assert expects that this leaves one range that is both
                     :    *    lg->current and lg->flush_ranges
                     :    *  - close the log output and free every entry of lg->pending
                     :    *  - when logging is disabled, record lg->id / lg->tid as the
                     :    *    saved ids and commit
                     :    *  - if the catalog is loaded: drop one logical reference for
                     :    *    every bid stored in catalog_bid, release and destroy the
                     :    *    catalog BATs and destroy catalog_cnt / catalog_lid
                     :    *  - destroy the locks and free the buffers and `lg` itself
                     :    *
                     :    * NOTE(review): lg->seqs_id, lg->seqs_val, lg->dseqs and
                     :    * lg->excl_flush_cv are not released in this function -- confirm
                     :    * that this is handled elsewhere or not needed.
                     :    */
    2467         335 :         log_close_input(lg);
    2468         335 :         logged_range *last = do_flush_range_cleanup(lg);
    2469         335 :         (void) last; /* otherwise only used by the assert below */
    2470         335 :         assert(last == lg->current && last == lg->flush_ranges);
    2471         335 :         log_close_output(lg);
    2472         755 :         for (logged_range * p = lg->pending; p; p = lg->pending) {
    2473         420 :                 lg->pending = p->next;
    2474         420 :                 ATOMIC_DESTROY(&p->refcount);
    2475         420 :                 ATOMIC_DESTROY(&p->last_ts);
    2476         420 :                 ATOMIC_DESTROY(&p->flushed_ts);
    2477         420 :                 ATOMIC_DESTROY(&p->drops);
    2478         420 :                 GDKfree(p);
    2479             :         }
    2480         335 :         if (LOG_DISABLED(lg)) {
    2481           1 :                 lg->saved_id = lg->id;
    2482           1 :                 lg->saved_tid = lg->tid;
    2483           1 :                 log_commit(lg, NULL, NULL, 0); /* NOTE(review): the result is ignored */
    2484             :         }
    2485         335 :         if (lg->catalog_bid) {
    2486         335 :                 log_lock(lg);
    2487         335 :                 BUN p, q;
    2488         335 :                 BAT *b = lg->catalog_bid;
    2489             : 
    2490             :                 /* free resources */
    2491         335 :                 const log_bid *bids = (const log_bid *) Tloc(b, 0);
    2492       86927 :                 BATloop(b, p, q) {
    2493       86592 :                         bat bid = bids[p];
    2494             : 
    2495       86592 :                         BBPrelease(bid);
    2496             :                 }
    2497             : 
    2498         335 :                 BBPrelease(lg->catalog_bid->batCacheid);
    2499         335 :                 BBPrelease(lg->catalog_id->batCacheid);
    2500         335 :                 BBPrelease(lg->dcatalog->batCacheid);
    2501         335 :                 logbat_destroy(lg->catalog_bid);
    2502         335 :                 logbat_destroy(lg->catalog_id);
    2503         335 :                 logbat_destroy(lg->dcatalog);
    2504             : 
    2505         335 :                 logbat_destroy(lg->catalog_cnt);
    2506         335 :                 logbat_destroy(lg->catalog_lid);
    2507         335 :                 log_unlock(lg);
    2508             :         }
    2509         335 :         MT_lock_destroy(&lg->lock);
    2510         335 :         MT_lock_destroy(&lg->rotation_lock);
    2511         335 :         MT_lock_destroy(&lg->flush_lock);
    2512         335 :         ATOMIC_DESTROY(&lg->nr_flushers);
    2513         335 :         GDKfree(lg->fn);
    2514         335 :         GDKfree(lg->dir);
    2515         335 :         GDKfree(lg->rbuf);
    2516         335 :         GDKfree(lg->wbuf);
    2517         335 :         GDKfree(lg);
    2518         335 : }
    2519             : 
    2520             : /* Create a new logger
                     :  *
                     :  * All arguments are passed unchanged to log_new(), which allocates
                     :  * the logger and processes any existing logs.  On success the GDK
                     :  * environment variable "recovery" is set to "finished" and the
                     :  * first output log is opened; the range created for it becomes
                     :  * lg->current, lg->pending and lg->flush_ranges.
                     :  *
                     :  * Returns the logger, or NULL on failure.
                     :  *
                     :  * NOTE(review): both failure paths after log_new() call
                     :  * log_destroy() while lg->flush_ranges is still NULL (it is only
                     :  * assigned at the very end), and log_destroy() starts by passing
                     :  * lg to do_flush_range_cleanup(), which dereferences
                     :  * lg->flush_ranges.  On the second path lg->current also still
                     :  * points at the stack-allocated `dummy`.
                     :  * NOTE(review): the trace messages print "fn/logdir", whereas
                     :  * log_new() builds the directory as logdir/fn.
                     :  */
    2521             : logger *
    2522         336 : log_create(int debug, const char *fn, const char *logdir, int version,
    2523             :            preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
    2524             :            void *funcdata)
    2525             : {
    2526         336 :         logger *lg;
    2527         336 :         TRC_INFO_IF(WAL) {
    2528           0 :                 TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
    2529           0 :                 GDKtracer_flush_buffer();
    2530             :         }
    2531         336 :         lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
    2532         336 :         if (lg == NULL)
    2533             :                 return NULL;
    2534         336 :         TRC_INFO_IF(WAL) {
    2535           0 :                 TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
    2536           0 :                 GDKtracer_flush_buffer();
    2537             :         }
    2538         336 :         if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
    2539           0 :                 log_destroy(lg);
    2540           0 :                 return NULL;
    2541             :         }
    2542         336 :         assert(lg->current == NULL);
    2543         336 :         logged_range dummy = { /* stack-local placeholder; only .cnt is set, the rest is zero */
    2544         336 :                 .cnt = BATcount(lg->catalog_bid),
    2545             :         };
    2546         336 :         lg->current = &dummy; /* temporary: replaced by dummy.next once the output log is open */
    2547         336 :         if (log_open_output(lg) != GDK_SUCCEED) {
    2548           0 :                 log_destroy(lg);
    2549           0 :                 return NULL;
    2550             :         }
    2551         336 :         lg->current = lg->current->next; /* presumably the range log_open_output() linked after dummy -- confirm */
    2552         336 :         assert(lg->pending == NULL && lg->flush_ranges == NULL);
    2553         336 :         lg->pending = lg->current;
    2554         336 :         lg->flush_ranges = lg->current;
    2555         336 :         return lg;
    2556             : }
    2557             : 
    2558             : static logged_range *
    2559        7258 : log_next_logfile(logger *lg, ulng ts)
    2560             : {
    2561        7258 :         int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) ? 1000 : 100;
    2562        7258 :         if (!lg->pending || !lg->pending->next)
    2563             :                 return NULL;
    2564        7258 :         rotation_lock(lg);
    2565        7258 :         if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
    2566        7258 :             (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
    2567        7258 :             (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
    2568        6233 :                 rotation_unlock(lg);
    2569        6233 :                 logged_range *p = lg->pending;
    2570        6233 :                 for (int i = 1;
    2571       11573 :                      i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
    2572        5358 :                      p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
    2573       16930 :                      && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
    2574        5340 :                         p = p->next;
    2575        6233 :                 return p;
    2576             :         }
    2577        1025 :         rotation_unlock(lg);
    2578        1025 :         return NULL;
    2579             : }
    2580             : 
    2581             : static void
    2582        6233 : log_cleanup_range(logger *lg, ulng id)
    2583             : {
    2584        6233 :         rotation_lock(lg);
    2585       17806 :         while (lg->pending && lg->pending->id <= id) {
    2586       11573 :                 logged_range *p;
    2587       11573 :                 p = lg->pending;
    2588       11573 :                 if (p)
    2589       11573 :                         lg->pending = p->next;
    2590       11573 :                 GDKfree(p);
    2591             :         }
    2592        6233 :         rotation_unlock(lg);
    2593        6233 : }
    2594             : 
    2595             : static void
    2596       66501 : do_rotate(logger *lg)
    2597             : {
    2598       66501 :         logged_range *cur = lg->current;
    2599       66501 :         logged_range *next = cur->next;
    2600       66501 :         if (next) {
    2601       11658 :                 assert(ATOMIC_GET(&next->refcount) == 1);
    2602       11658 :                 lg->current = next;
    2603       11658 :                 if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
    2604       11463 :                         close_stream(cur->output_log);
    2605       11463 :                         cur->output_log = NULL;
    2606             :                 }
    2607             :         }
    2608       66501 : }
    2609             : 
    2610             : gdk_return
    2611       10991 : log_activate(logger *lg)
    2612             : {
    2613       10991 :         bool flush_cleanup = false;
    2614       10991 :         gdk_return res = GDK_SUCCEED;
    2615             : 
    2616       10991 :         rotation_lock(lg);
    2617       10991 :         const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
    2618             : 
    2619       10991 :         if (current_file_size == -1) {
    2620           0 :                 rotation_unlock(lg);
    2621           0 :                 return GDK_FAIL;
    2622             :         }
    2623             :         /* file size of 2 means only endian indicator present
    2624             :          * (i.e. effectively empty) */
    2625       10991 :         if (current_file_size <= 2) {
    2626        6862 :                 rotation_unlock(lg);
    2627        6862 :                 return GDK_SUCCEED;
    2628             :         }
    2629             : 
    2630        4129 :         if (!lg->flushnow &&
    2631        4129 :             !lg->current->next &&
    2632        4129 :             current_file_size > 2 &&
    2633        4129 :             (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
    2634        4120 :                     current_file_size > lg->max_file_size ||
    2635        4074 :                     (GDKusec() - lg->file_age) > lg->max_file_age) &&
    2636          55 :             (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
    2637          55 :             lg->saved_id + 1 == lg->id &&
    2638          55 :             ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
    2639          51 :                 lg->id++;
    2640             :                 /* start new file */
    2641          51 :                 res = log_open_output(lg);
    2642          51 :                 flush_cleanup = true;
    2643          51 :                 do_rotate(lg);
    2644             :         }
    2645          51 :         if (flush_cleanup)
    2646          51 :                 (void) do_flush_range_cleanup(lg);
    2647        4129 :         rotation_unlock(lg);
    2648        4129 :         return res;
    2649             : }
    2650             : 
    2651             : gdk_return
    2652        7258 : log_flush(logger *lg, ulng ts)
    2653             : {
    2654        7258 :         logged_range *pending = log_next_logfile(lg, ts);
    2655        7258 :         ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
    2656        7258 :         if (LOG_DISABLED(lg)) {
    2657           0 :                 lg->saved_id = lid;
    2658           0 :                 lg->saved_tid = lg->tid;
    2659           0 :                 if (lid)
    2660           0 :                         log_cleanup_range(lg, lg->saved_id);
    2661           0 :                 if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
    2662           0 :                         TRC_ERROR(GDK, "failed to commit");
    2663           0 :                 return GDK_SUCCEED;
    2664             :         }
    2665        7258 :         if (lg->saved_id >= lid)
    2666             :                 return GDK_SUCCEED;
    2667        6233 :         rotation_lock(lg);
    2668        6233 :         ulng lgid = lg->id;
    2669        6233 :         rotation_unlock(lg);
    2670        6233 :         if (lg->saved_id + 1 >= lgid)     /* logger should first release the file */
    2671             :                 return GDK_SUCCEED;
    2672        6233 :         log_return res = LOG_OK;
    2673        6233 :         ulng cid = olid;
    2674        6233 :         assert(lid <= lgid);
    2675             :         uint32_t *updated = NULL;
    2676             :         BUN nupdated = 0;
    2677             :         size_t allocated = 0;
    2678       17806 :         while (cid < lid && res == LOG_OK) {
    2679       11573 :                 if (!lg->input_log) {
    2680       11573 :                         char *filename;
    2681       11573 :                         char id[32];
    2682       11573 :                         if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
    2683           0 :                                 GDKfree(updated);
    2684           0 :                                 TRC_CRITICAL(GDK, "log_id filename is too large\n");
    2685           0 :                                 return GDK_FAIL;
    2686             :                         }
    2687       11573 :                         if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
    2688           0 :                                 GDKfree(updated);
    2689           0 :                                 return GDK_FAIL;
    2690             :                         }
    2691       11573 :                         if (strlen(filename) >= FILENAME_MAX) {
    2692           0 :                                 GDKfree(updated);
    2693           0 :                                 TRC_CRITICAL(GDK, "Logger filename path is too large\n");
    2694           0 :                                 GDKfree(filename);
    2695           0 :                                 return GDK_FAIL;
    2696             :                         }
    2697             : 
    2698       11573 :                         bool filemissing = false;
    2699       11573 :                         if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
    2700           0 :                                 GDKfree(updated);
    2701           0 :                                 GDKfree(filename);
    2702           0 :                                 return GDK_FAIL;
    2703             :                         }
    2704       11573 :                         GDKfree(filename);
    2705             :                 }
    2706             :                 /* we read the full file because skipping is impossible with current log format */
    2707       11573 :                 log_lock(lg);
    2708       11573 :                 if (updated == NULL) {
    2709        6233 :                         nupdated = BATcount(lg->catalog_id);
    2710        6233 :                         allocated = ((nupdated + 31) & ~31) / 8;
    2711        6233 :                         if (allocated == 0)
    2712           0 :                                 allocated = 4;
    2713        6233 :                         updated = GDKzalloc(allocated);
    2714        6233 :                         if (updated == NULL) {
    2715           0 :                                 log_unlock(lg);
    2716           0 :                                 return GDK_FAIL;
    2717             :                         }
    2718        5340 :                 } else if (nupdated < BATcount(lg->catalog_id)) {
    2719          91 :                         BUN n = BATcount(lg->catalog_id);
    2720          91 :                         size_t a = ((n + 31) & ~31) / 8;
    2721          91 :                         if (a > allocated) {
    2722          13 :                                 uint32_t *p = GDKrealloc(updated, a);
    2723          13 :                                 if (p == NULL) {
    2724           0 :                                         GDKfree(updated);
    2725           0 :                                         log_unlock(lg);
    2726           0 :                                         return GDK_FAIL;
    2727             :                                 }
    2728          13 :                                 updated = p;
    2729          13 :                                 memset(updated + allocated / 4, 0, a - allocated);
    2730          13 :                                 allocated = a;
    2731             :                         }
    2732             :                         nupdated = n;
    2733             :                 }
    2734       11573 :                 lg->flushing = true;
    2735       11573 :                 res = log_read_transaction(lg, updated, nupdated);
    2736       11573 :                 lg->flushing = false;
    2737       11573 :                 log_unlock(lg);
    2738       11573 :                 if (res == LOG_EOF) {
    2739       11573 :                         log_close_input(lg);
    2740       11573 :                         res = LOG_OK;
    2741             :                 }
    2742       11573 :                 cid++;
    2743             :         }
    2744        6233 :         if (lid > olid && res == LOG_OK) {
    2745        6233 :                 rotation_lock(lg);      /* protect against concurrent log_tflush rotate check */
    2746        6233 :                 lg->saved_id = lid;
    2747        6233 :                 rotation_unlock(lg);
    2748        6233 :                 if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
    2749           0 :                         TRC_ERROR(GDK, "failed to commit");
    2750           0 :                         res = LOG_ERR;
    2751           0 :                         rotation_lock(lg);
    2752           0 :                         lg->saved_id = olid; /* reset !! */
    2753           0 :                         rotation_unlock(lg);
    2754             :                 }
    2755           0 :                 if (res != LOG_ERR) {
    2756       17806 :                         while (olid < lid) {
    2757             :                                 /* Try to cleanup, remove old log file, continue on failure! */
    2758       11573 :                                 olid++;
    2759       11573 :                                 (void) log_cleanup(lg, olid);
    2760             :                         }
    2761             :                 }
    2762        6233 :                 if (res == LOG_OK)
    2763        6233 :                         log_cleanup_range(lg, lg->saved_id);
    2764             :         }
    2765        6233 :         GDKfree(updated);
    2766        6233 :         return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    2767             : }
    2768             : 
    2769             : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
    2770             :  * Only the bak- files are deleted for the preserved WAL files.
    2771             :  */
    2772             : lng
    2773       29746 : log_changes(logger *lg)
    2774             : {
    2775       29746 :         if (LOG_DISABLED(lg))
    2776             :                 return 0;
    2777       29746 :         rotation_lock(lg);
    2778       29746 :         lng changes = lg->id - lg->saved_id - 1;
    2779       29746 :         rotation_unlock(lg);
    2780       29746 :         return changes;
    2781             : }
    2782             : 
    2783             : int
    2784         463 : log_sequence(logger *lg, int seq, lng *id)
    2785             : {
    2786         463 :         log_lock(lg);
    2787         463 :         BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
    2788             : 
    2789         463 :         if (p != BUN_NONE) {
    2790         348 :                 *id = *(lng *) Tloc(lg->seqs_val, p);
    2791             : 
    2792         348 :                 log_unlock(lg);
    2793         348 :                 return 1;
    2794             :         }
    2795         115 :         log_unlock(lg);
    2796         115 :         return 0;
    2797             : }
    2798             : 
    2799             : gdk_return
    2800      182774 : log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt)
    2801             : {
    2802      182774 :         bte tpe = find_type(lg, type);
    2803      182774 :         gdk_return ok = GDK_SUCCEED;
    2804      182774 :         logformat l;
    2805      182774 :         lng nr;
    2806      182774 :         l.flag = LOG_UPDATE_CONST;
    2807      182774 :         l.id = id;
    2808      182774 :         nr = cnt;
    2809             : 
    2810      182774 :         if (LOG_DISABLED(lg) || !nr) {
    2811             :                 /* logging is switched off */
    2812       55698 :                 if (nr) {
    2813       55698 :                         log_lock(lg);
    2814       55698 :                         ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
    2815       55698 :                         log_unlock(lg);
    2816             :                 }
    2817       55698 :                 return ok;
    2818             :         }
    2819             : 
    2820      127076 :         gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
    2821             : 
    2822      127076 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    2823      254152 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    2824      254152 :             log_write_format(lg, &l) != GDK_SUCCEED ||
    2825      254152 :             !mnstr_writeLng(lg->current->output_log, nr) ||
    2826      254152 :             mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
    2827      127076 :             !mnstr_writeLng(lg->current->output_log, offset)) {
    2828           0 :                 ATOMIC_DEC(&lg->current->refcount);
    2829           0 :                 ok = GDK_FAIL;
    2830           0 :                 goto bailout;
    2831             :         }
    2832             : 
    2833      127076 :         ok = wt(val, lg->current->output_log, 1);
    2834             : 
    2835      127076 :         TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
    2836             : 
    2837      127076 :   bailout:
    2838      127076 :         if (ok != GDK_SUCCEED) {
    2839           0 :                 const char *err = mnstr_peek_error(lg->current->output_log);
    2840           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2841             :         }
    2842             :         return ok;
    2843             : }
    2844             : 
    2845             : static gdk_return
    2846       18761 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
    2847             : {
    2848       18761 :         size_t bufsz = lg->wbufsize, resize = 0;
    2849       18761 :         BUN end = (BUN) (offset + nr);
    2850       18761 :         char *buf = lg->wbuf;
    2851       18761 :         gdk_return res = GDK_SUCCEED;
    2852             : 
    2853       18761 :         if (!buf)
    2854             :                 return GDK_FAIL;
    2855       18761 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    2856       18761 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
    2857             :                 return GDK_FAIL;
    2858       18761 :         BATiter bi = bat_iterator(b);
    2859       18761 :         BUN p = (BUN) offset;
    2860       37574 :         for (; p < end;) {
    2861       18813 :                 size_t sz = 0;
    2862       18813 :                 if (resize) {
    2863           2 :                         if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
    2864             :                                 res = GDK_FAIL;
    2865             :                                 break;
    2866             :                         }
    2867           2 :                         lg->wbuf = buf;
    2868           2 :                         lg->wbufsize = bufsz = resize;
    2869           2 :                         resize = 0;
    2870             :                 }
    2871       18813 :                 char *dst = buf;
    2872      102873 :                 for (; p < end && sz < bufsz; p++) {
    2873       84112 :                         const char *s = BUNtvar(bi, p);
    2874       84112 :                         size_t len = strlen(s) + 1;
    2875       84112 :                         if ((sz + len) > bufsz) {
    2876          52 :                                 if (len > bufsz)
    2877           2 :                                         resize = len + bufsz;
    2878             :                                 break;
    2879             :                         } else {
    2880       84060 :                                 memcpy(dst, s, len);
    2881       84060 :                                 dst += len;
    2882       84060 :                                 sz += len;
    2883             :                         }
    2884             :                 }
    2885       37624 :                 if (sz &&
    2886       37622 :                     (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
    2887       18811 :                      mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
    2888             :                         res = GDK_FAIL;
    2889             :                         break;
    2890             :                 }
    2891             :         }
    2892       18761 :         bat_iterator_end(&bi);
    2893       18761 :         return res;
    2894             : }
    2895             : 
    2896             : static gdk_return
    2897      490664 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
    2898             : {
    2899      490664 :         bte tpe = find_type(lg, b->ttype);
    2900      490664 :         gdk_return ok = GDK_SUCCEED;
    2901      490664 :         logformat l;
    2902      490664 :         BUN p;
    2903      490664 :         lng nr;
    2904      490664 :         l.flag = LOG_UPDATE_BULK;
    2905      490664 :         l.id = id;
    2906      490664 :         nr = cnt;
    2907             : 
    2908      490664 :         if (LOG_DISABLED(lg) || !nr) {
    2909             :                 /* logging is switched off */
    2910      194106 :                 if (nr)
    2911      141173 :                         return la_bat_update_count(lg, id, offset + cnt, lg->tid);
    2912             :                 return GDK_SUCCEED;
    2913             :         }
    2914             : 
    2915      267134 :         gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
    2916             : 
    2917      267134 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    2918      267134 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
    2919           0 :                 ok = GDK_FAIL;
    2920           0 :                 goto bailout;
    2921             :         }
    2922             : 
    2923      267134 :         if (lg->total_cnt == 0)      /* signals single bulk message or first part of bat logged in parts */
    2924      532220 :                 if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2925      660893 :                     !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
    2926      532220 :                     mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
    2927      403547 :                     !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) {  /* offset = -1 indicates bat was logged in parts */
    2928           0 :                         ok = GDK_FAIL;
    2929           0 :                         goto bailout;
    2930             :                 }
    2931      267134 :         if (!total_cnt)
    2932      128673 :                 total_cnt = cnt;
    2933      267134 :         lg->total_cnt += cnt;
    2934             : 
    2935      267134 :         if (lg->total_cnt == total_cnt)      /* This is the last to be logged part of this bat, we can already reset the total_cnt */
    2936      266110 :                 lg->total_cnt = 0;
    2937             : 
    2938             :         /* if offset is just for the log, but BAT is already sliced, reset offset */
    2939      267134 :         if (sliced)
    2940        1478 :                 offset = 0;
    2941      267134 :         if (b->ttype == TYPE_msk) {
    2942           0 :                 BATiter bi = bat_iterator(b);
    2943           0 :                 if (offset % 32 == 0) {
    2944           0 :                         if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
    2945           0 :                              (size_t) ((nr + 31) / 32)))
    2946           0 :                                 ok = GDK_FAIL;
    2947             :                 } else {
    2948           0 :                         for (lng i = 0; i < nr; i += 32) {
    2949             :                                 uint32_t v = 0;
    2950           0 :                                 for (int j = 0; j < 32 && i + j < nr; j++)
    2951           0 :                                         v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
    2952           0 :                                 if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
    2953             :                                         ok = GDK_FAIL;
    2954             :                                         break;
    2955             :                                 }
    2956             :                         }
    2957             :                 }
    2958           0 :                 bat_iterator_end(&bi);
    2959      514871 :         } else if (b->ttype < TYPE_str && !isVIEW(b)) {
    2960      247737 :                 BATiter bi = bat_iterator(b);
    2961      247737 :                 const void *t = BUNtail(bi, (BUN) offset);
    2962             : 
    2963      247737 :                 ok = wt(t, lg->current->output_log, (size_t) nr);
    2964      247737 :                 bat_iterator_end(&bi);
    2965       19397 :         } else if (b->ttype == TYPE_str) {
    2966             :                 /* efficient string writes */
    2967       18755 :                 ok = string_writer(lg, b, offset, nr);
    2968             :         } else {
    2969         642 :                 BATiter bi = bat_iterator(b);
    2970         642 :                 BUN end = (BUN) (offset + nr);
    2971        1642 :                 for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
    2972        1000 :                         const void *t = BUNtail(bi, p);
    2973             : 
    2974        1000 :                         ok = wt(t, lg->current->output_log, 1);
    2975             :                 }
    2976         642 :                 bat_iterator_end(&bi);
    2977             :         }
    2978             : 
    2979      267134 :         TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
    2980             : 
    2981      267134 :   bailout:
    2982      267134 :         if (ok != GDK_SUCCEED) {
    2983           0 :                 ATOMIC_DEC(&lg->current->refcount);
    2984           0 :                 const char *err = mnstr_peek_error(lg->current->output_log);
    2985           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2986             :         }
    2987             :         return ok;
    2988             : }
    2989             : 
    2990             : /*
    2991             :  * Changes made to the BAT descriptor should be stored in the log
    2992             :  * files.  Actually, we need to save the descriptor file, perhaps we
    2993             :  * should simply introduce a versioning scheme.
    2994             :  */
    2995             : gdk_return
    2996      233872 : log_bat_persists(logger *lg, BAT *b, log_id id)
    2997             : {
    2998      233872 :         log_lock(lg);
    2999      233872 :         bte ta = find_type(lg, b->ttype);
    3000      233872 :         logformat l;
    3001             : 
    3002      233872 :         if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
    3003           0 :                 log_unlock(lg);
    3004           0 :                 if (!LOG_DISABLED(lg))
    3005           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3006           0 :                 return GDK_FAIL;
    3007             :         }
    3008             : 
    3009      233872 :         l.flag = LOG_CREATE;
    3010      233872 :         l.id = id;
    3011      233872 :         if (!LOG_DISABLED(lg)) {
    3012      156607 :                 assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    3013      313214 :                 if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    3014      313214 :                     log_write_format(lg, &l) != GDK_SUCCEED ||
    3015      156607 :                     mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
    3016           0 :                         log_unlock(lg);
    3017           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3018           0 :                         return GDK_FAIL;
    3019             :                 }
    3020             :         }
    3021      233872 :         TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
    3022      233872 :         gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
    3023      233872 :         log_unlock(lg);
    3024      233872 :         if (r != GDK_SUCCEED)
    3025           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3026             :         return r;
    3027             : }
    3028             : 
    3029             : gdk_return
    3030       21352 : log_bat_transient(logger *lg, log_id id)
    3031             : {
    3032       21352 :         log_lock(lg);
    3033       21352 :         log_bid bid = internal_find_bat(lg, id, -1);
    3034       21352 :         logformat l;
    3035             : 
    3036       21352 :         if (bid < 0) {
    3037           0 :                 log_unlock(lg);
    3038           0 :                 return GDK_FAIL;
    3039             :         }
    3040       21352 :         if (!bid) {
    3041           0 :                 GDKerror("log_bat_transient failed to find bid for object %d\n", id);
    3042           0 :                 log_unlock(lg);
    3043           0 :                 return GDK_FAIL;
    3044             :         }
    3045       21352 :         l.flag = LOG_DESTROY;
    3046       21352 :         l.id = id;
    3047             : 
    3048       21352 :         if (!LOG_DISABLED(lg)) {
    3049       12304 :                 if (log_write_format(lg, &l) != GDK_SUCCEED) {
    3050           0 :                         TRC_CRITICAL(GDK, "write failed\n");
    3051           0 :                         log_unlock(lg);
    3052           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3053           0 :                         return GDK_FAIL;
    3054             :                 }
    3055             :         }
    3056       21352 :         TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
    3057       21352 :         BAT *b = BBPquickdesc(bid);
    3058       21352 :         assert(b);
    3059       21352 :         BUN cnt = BATcount(b);
    3060       21352 :         ATOMIC_ADD(&lg->current->drops, cnt);
    3061       21352 :         gdk_return r = log_del_bat(lg, bid);
    3062       21352 :         log_unlock(lg);
    3063       21352 :         if (r != GDK_SUCCEED)
    3064           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3065             :         return r;
    3066             : }
    3067             : 
    3068             : static gdk_return
    3069      112486 : log_bat_group(logger *lg, log_id id)
    3070             : {
    3071      112486 :         if (LOG_DISABLED(lg))
    3072             :                 return GDK_SUCCEED;
    3073             : 
    3074       75086 :         logformat l;
    3075       75086 :         l.flag = LOG_BAT_GROUP;
    3076       75086 :         l.id = id;
    3077       75086 :         gdk_return r = log_write_format(lg, &l);
    3078       75086 :         return r;
    3079             : }
    3080             : 
    3081             : gdk_return
    3082       56243 : log_bat_group_start(logger *lg, log_id id)
    3083             : {
    3084             :         /*positive table id represent start of logged table */
    3085       56243 :         return log_bat_group(lg, id);
    3086             : }
    3087             : 
    3088             : gdk_return
    3089       56243 : log_bat_group_end(logger *lg, log_id id)
    3090             : {
    3091             :         /*negative table id represent end of logged table */
    3092       56243 :         return log_bat_group(lg, -id);
    3093             : }
    3094             : 
    3095             : gdk_return
    3096      254113 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
    3097             : {
    3098      254113 :         log_lock(lg);
    3099      254113 :         gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
    3100      254113 :         log_unlock(lg);
    3101      254113 :         return r;
    3102             : }
    3103             : 
    3104             : gdk_return
    3105        2699 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
    3106             : {
    3107        2699 :         log_lock(lg);
    3108        2699 :         bte tpe = find_type(lg, uval->ttype);
    3109        2699 :         gdk_return ok = GDK_SUCCEED;
    3110        2699 :         logformat l;
    3111        2699 :         BUN p;
    3112        2699 :         lng nr;
    3113             : 
    3114        2699 :         if (BATtdense(uid)) {
    3115        2679 :                 ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
    3116        2679 :                 log_unlock(lg);
    3117        2679 :                 if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
    3118           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3119        2679 :                 return ok;
    3120             :         }
    3121             : 
    3122          20 :         assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
    3123             : 
    3124          20 :         l.flag = LOG_UPDATE;
    3125          20 :         l.id = id;
    3126          20 :         nr = (BATcount(uval));
    3127          20 :         assert(nr);
    3128             : 
    3129          20 :         if (LOG_DISABLED(lg)) {
    3130             :                 /* logging is switched off */
    3131           1 :                 log_unlock(lg);
    3132           1 :                 return GDK_SUCCEED;
    3133             :         }
    3134             : 
    3135          19 :         BATiter vi = bat_iterator(uval);
    3136          19 :         gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
    3137          19 :         gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
    3138             : 
    3139          19 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    3140          38 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    3141          38 :             log_write_format(lg, &l) != GDK_SUCCEED ||
    3142          38 :             !mnstr_writeLng(lg->current->output_log, nr) ||
    3143          19 :             mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
    3144           0 :                 ok = GDK_FAIL;
    3145           0 :                 goto bailout;
    3146             :         }
    3147        1074 :         for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
    3148        1055 :                 const oid id = BUNtoid(uid, p);
    3149             : 
    3150        1055 :                 ok = wh(&id, lg->current->output_log, 1);
    3151             :         }
    3152          19 :         if (uval->ttype == TYPE_msk) {
    3153           0 :                 if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
    3154           0 :                                          (BATcount(uval) + 31) / 32))
    3155           0 :                         ok = GDK_FAIL;
    3156          32 :         } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
    3157          13 :                 const void *t = BUNtail(vi, 0);
    3158             : 
    3159          13 :                 ok = wt(t, lg->current->output_log, (size_t) nr);
    3160           6 :         } else if (uval->ttype == TYPE_str) {
    3161             :                 /* efficient string writes */
    3162           6 :                 ok = string_writer(lg, uval, 0, nr);
    3163             :         } else {
    3164           0 :                 for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
    3165           0 :                         const void *val = BUNtail(vi, p);
    3166             : 
    3167           0 :                         ok = wt(val, lg->current->output_log, 1);
    3168             :                 }
    3169             :         }
    3170             : 
    3171          19 :         TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
    3172             : 
    3173          19 :   bailout:
    3174          19 :         bat_iterator_end(&vi);
    3175          19 :         if (ok != GDK_SUCCEED) {
    3176           0 :                 const char *err = mnstr_peek_error(lg->current->output_log);
    3177           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    3178           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3179             :         }
    3180          19 :         log_unlock(lg);
    3181          19 :         return ok;
    3182             : }
    3183             : 
    3184             : static inline bool
    3185       56230 : check_rotation_conditions(logger *lg)
    3186             : {
    3187       56230 :         if (LOG_DISABLED(lg))
    3188             :                 return false;
    3189             : 
    3190       56227 :         if (lg->current->next)
    3191             :                 return false;   /* do not rotate if there is already a prepared next current */
    3192       56227 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
    3193             :                 return true;
    3194       56227 :         const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
    3195             : 
    3196       56227 :         if (current_file_size == -1)
    3197             :                 return false;
    3198             : 
    3199       56227 :         assert(current_file_size >= 0);
    3200             : 
    3201       56227 :         if (current_file_size == 2)
    3202             :                 return false;
    3203             : 
    3204        3309 :         bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
    3205       57321 :                 current_file_size > lg->max_file_size ||
    3206       49511 :                 (GDKusec() - lg->file_age) > lg->max_file_age;
    3207             : 
    3208       54019 :         return res;
    3209             : }
    3210             : 
    3211             : gdk_return
    3212       61340 : log_tend(logger *lg)
    3213             : {
    3214       61340 :         TRC_DEBUG(WAL, "tend %d\n", lg->tid);
    3215             : 
    3216       61340 :         if (LOG_DISABLED(lg))
    3217             :                 return GDK_SUCCEED;
    3218             : 
    3219       56227 :         gdk_return result;
    3220       56227 :         logformat l;
    3221       56227 :         l.flag = LOG_END;
    3222       56227 :         l.id = lg->tid;
    3223             : 
    3224       56227 :         if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
    3225       56227 :                 ATOMIC_INC(&lg->nr_flushers);
    3226             :         return result;
    3227             : }
    3228             : 
    3229             : #define flush_lock(lg)          MT_lock_set(&(lg)->flush_lock)
    3230             : #define flush_unlock(lg)        MT_lock_unset(&(lg)->flush_lock)
    3231             : 
    3232             : static inline gdk_return
    3233       55380 : do_flush(logged_range *range)
    3234             : {
    3235             :         /* assumes flush lock */
    3236       55380 :         stream *output_log = range->output_log;
    3237       55380 :         ulng ts = ATOMIC_GET(&range->last_ts);
    3238             : 
    3239       55380 :         if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
    3240       55380 :             (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
    3241           0 :                 return GDK_FAIL;
    3242       55380 :         ATOMIC_SET(&range->flushed_ts, ts);
    3243       55380 :         return GDK_SUCCEED;
    3244             : }
    3245             : 
    3246             : static inline void
    3247       61337 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
    3248             : {
    3249       61337 :         (void) lg;
    3250       61337 :         TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
    3251             : 
    3252       61337 :         if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
    3253       60490 :                 ATOMIC_SET(&range->last_ts, commit_ts);
    3254       61337 : }
    3255             : 
    3256             : gdk_return
    3257       61340 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
    3258             : {
    3259       61340 :         if (lg->flushnow) {
    3260        5110 :                 rotation_lock(lg);
    3261        5110 :                 logged_range *p = lg->current;
    3262        5110 :                 assert(lg->flush_ranges == lg->current);
    3263        5110 :                 assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
    3264        5110 :                 log_tdone(lg, lg->current, commit_ts);
    3265        5110 :                 ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
    3266        5110 :                 lg->id++;
    3267        5110 :                 lg->flushnow = false;
    3268        5110 :                 if (log_open_output(lg) != GDK_SUCCEED)
    3269           0 :                         GDKfatal("Could not create new log file\n");  /* TODO: does not have to be fatal (yet) */
    3270        5110 :                 do_rotate(lg);
    3271        5110 :                 (void) do_flush_range_cleanup(lg);
    3272        5110 :                 assert(lg->flush_ranges == lg->current);
    3273        5110 :                 rotation_unlock(lg);
    3274        5110 :                 return log_commit(lg, p, NULL, 0);
    3275             :         }
    3276             : 
    3277       56230 :         if (LOG_DISABLED(lg))
    3278             :                 return GDK_SUCCEED;
    3279             : 
    3280       56227 :         rotation_lock(lg);
    3281       56227 :         logged_range *frange = do_flush_range_cleanup(lg);
    3282             : 
    3283       56507 :         while (frange->next && frange->id < file_id) {
    3284             :                 assert(frange->next);
    3285             :                 frange = frange->next;
    3286             :         }
    3287             : 
    3288       56227 :         log_tdone(lg, frange, commit_ts);
    3289             : 
    3290       56227 :         if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
    3291             :                 /* delay needed ? */
    3292             : 
    3293       55380 :                 flush_lock(lg);
    3294             :                 /* check it one more time */
    3295       55380 :                 if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
    3296       55380 :                         do_flush(frange);
    3297       55380 :                 flush_unlock(lg);
    3298             :         }
    3299             :         /* else somebody else has flushed our log file */
    3300             : 
    3301       56227 :         if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
    3302       54133 :                 if (frange != lg->current && frange->output_log) {
    3303         195 :                         close_stream(frange->output_log);
    3304         195 :                         frange->output_log = NULL;
    3305             :                 }
    3306             :         }
    3307             : 
    3308       56227 :         if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
    3309             :                 /* I am the last flusher
    3310             :                  * if present,
    3311             :                  * wake up the exclusive flusher in log_tstart */
    3312             :                 /* rotation_lock is still being held */
    3313       54442 :                 MT_cond_signal(&lg->excl_flush_cv);
    3314             :         }
    3315       56227 :         rotation_unlock(lg);
    3316             : 
    3317       56227 :         return GDK_SUCCEED;
    3318             : }
    3319             : 
    3320             : static gdk_return
    3321        6658 : log_tsequence_(logger *lg, int seq, lng val)
    3322             : {
    3323        6658 :         logformat l;
    3324             : 
    3325        6658 :         if (LOG_DISABLED(lg))
    3326             :                 return GDK_SUCCEED;
    3327        2761 :         l.flag = LOG_SEQ;
    3328        2761 :         l.id = seq;
    3329             : 
    3330        2761 :         TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
    3331             : 
    3332        2761 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    3333        5522 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    3334        5522 :             log_write_format(lg, &l) != GDK_SUCCEED ||
    3335        2761 :             !mnstr_writeLng(lg->current->output_log, val)) {
    3336           0 :                 TRC_CRITICAL(GDK, "write failed\n");
    3337           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3338           0 :                 return GDK_FAIL;
    3339             :         }
    3340             :         return GDK_SUCCEED;
    3341             : }
    3342             : 
    3343             : /* a transaction in it self */
    3344             : gdk_return
    3345        6658 : log_tsequence(logger *lg, int seq, lng val)
    3346             : {
    3347        6658 :         BUN p;
    3348             : 
    3349        6658 :         TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
    3350             : 
    3351        6658 :         log_lock(lg);
    3352        6658 :         MT_lock_set(&lg->seqs_id->theaplock);
    3353        6658 :         BUN inserted = lg->seqs_id->batInserted;
    3354        6658 :         MT_lock_unset(&lg->seqs_id->theaplock);
    3355        6658 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
    3356        1607 :                 assert(lg->seqs_val->hseqbase == 0);
    3357        1607 :                 if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
    3358           0 :                         log_unlock(lg);
    3359           0 :                         return GDK_FAIL;
    3360             :                 }
    3361             :         } else {
    3362        4693 :                 if (p != BUN_NONE) {
    3363        4693 :                         oid pos = p;
    3364        4693 :                         if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
    3365           0 :                                 log_unlock(lg);
    3366           0 :                                 return GDK_FAIL;
    3367             :                         }
    3368             :                 }
    3369       10102 :                 if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
    3370        5051 :                     BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
    3371           0 :                         log_unlock(lg);
    3372           0 :                         return GDK_FAIL;
    3373             :                 }
    3374             :         }
    3375        6658 :         gdk_return r = log_tsequence_(lg, seq, val);
    3376        6658 :         log_unlock(lg);
    3377        6658 :         return r;
    3378             : }
    3379             : 
    3380             : static gdk_return
    3381       11680 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
    3382             : {
    3383       11680 :         log_lock(lg);
    3384       11680 :         BAT *b = lg->catalog_bid;
    3385       11680 :         const log_bid *bids;
    3386             : 
    3387       11680 :         bids = (log_bid *) Tloc(b, 0);
    3388      245476 :         for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
    3389      233796 :                 log_bid bid = bids[p];
    3390      233796 :                 BAT *lb;
    3391             : 
    3392      233796 :                 assert(bid);
    3393      233796 :                 if ((lb = BBP_desc(bid)) == NULL || BATmode(lb, false) != GDK_SUCCEED) {
    3394           0 :                         GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
    3395           0 :                         log_unlock(lg);
    3396           0 :                         return GDK_FAIL;
    3397             :                 }
    3398             : 
    3399      233796 :                 assert(lb->batRestricted != BAT_WRITE);
    3400             : 
    3401      233796 :                 TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
    3402             :         }
    3403             :         /* bm_subcommit releases the lock */
    3404       11680 :         return bm_subcommit(lg, pending, updated, maxupdated);
    3405             : }
    3406             : 
    3407             : static gdk_return
    3408      234149 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
    3409             : {
    3410      234149 :         log_bid bid = internal_find_bat(lg, id, tid);
    3411      234149 :         lng cnt = 0;
    3412      234149 :         lng lid = lng_nil;
    3413             : 
    3414      234149 :         assert(b->batRestricted != BAT_WRITE);
    3415      234149 :         assert(b->batRole == PERSISTENT);
    3416      234149 :         if (bid < 0)
    3417             :                 return GDK_FAIL;
    3418      234149 :         if (bid) {
    3419      155496 :                 if (bid != b->batCacheid) {
    3420      155493 :                         if (log_del_bat(lg, bid) != GDK_SUCCEED)
    3421             :                                 return GDK_FAIL;
    3422             :                 } else {
    3423             :                         return GDK_SUCCEED;
    3424             :                 }
    3425             :         }
    3426      234146 :         bid = b->batCacheid;
    3427      234146 :         TRC_DEBUG(WAL, "create %d\n", id);
    3428      234146 :         assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
    3429      468292 :         if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
    3430      468292 :             BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
    3431      468292 :             BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
    3432      234146 :             BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    3433           0 :                 return GDK_FAIL;
    3434      234146 :         if (lg->current)
    3435      233869 :                 lg->current->cnt++;
    3436      234146 :         BBPretain(bid);
    3437      234146 :         return GDK_SUCCEED;
    3438             : }
    3439             : 
    3440             : static gdk_return
    3441      176918 : log_del_bat(logger *lg, log_bid bid)
    3442             : {
    3443      176918 :         BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
    3444      176918 :         lng lid = lg->tid;
    3445             : 
    3446      176918 :         assert(p != BUN_NONE);
    3447      176918 :         if (p == BUN_NONE) {
    3448             :                 GDKerror("cannot find BAT\n");
    3449             :                 return GDK_FAIL;
    3450             :         }
    3451             : 
    3452      176918 :         assert(lg->catalog_lid->hseqbase == 0);
    3453      176918 :         return BUNreplace(lg->catalog_lid, p, &lid, false);
    3454             : }
    3455             : 
    3456             : /* returns -1 on failure, 0 when not found, > 0 when found */
    3457             : log_bid
    3458       28616 : log_find_bat(logger *lg, log_id id)
    3459             : {
    3460       28616 :         log_lock(lg);
    3461       28616 :         log_bid bid = internal_find_bat(lg, id, -1);
    3462       28616 :         log_unlock(lg);
    3463       28616 :         if (!bid) {
    3464           0 :                 GDKerror("logger_find_bat failed to find bid for object %d\n", id);
    3465           0 :                 return GDK_FAIL;
    3466             :         }
    3467             :         return bid;
    3468             : }
    3469             : 
    3470             : 
    3471             : 
    3472             : gdk_return
    3473       61340 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
    3474             : {
    3475       61340 :         rotation_lock(lg);
    3476       61340 :         if (flushnow) {
    3477             :                 /* I am now the exclusive flusher */
    3478        5110 :                 if (ATOMIC_GET(&lg->nr_flushers)) {
    3479             :                         /* I am waiting until all existing flushers are done */
    3480           0 :                         MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
    3481             :                 }
    3482        5110 :                 assert(ATOMIC_GET(&lg->nr_flushers) == 0);
    3483             : 
    3484        5110 :                 if (ATOMIC_GET(&lg->current->last_ts)) {
    3485        1989 :                         lg->id++;
    3486        1989 :                         if (log_open_output(lg) != GDK_SUCCEED)
    3487           0 :                                 GDKfatal("Could not create new log file\n");  /* TODO: does not have to be fatal (yet) */
    3488             :                 }
    3489        5110 :                 do_rotate(lg);
    3490        5110 :                 (void) do_flush_range_cleanup(lg);
    3491        5110 :                 rotation_unlock(lg);
    3492             : 
    3493        5110 :                 if (lg->saved_id + 1 < lg->id)
    3494        4705 :                         log_flush(lg, (1ULL << 63));
    3495        5110 :                 lg->flushnow = flushnow;
    3496             :         } else {
    3497       56230 :                 if (check_rotation_conditions(lg)) {
    3498        4508 :                         lg->id++;
    3499        4508 :                         if (log_open_output(lg) != GDK_SUCCEED)
    3500           0 :                                 GDKfatal("Could not create new log file\n");  /* TODO: does not have to be fatal (yet) */
    3501             :                 }
    3502       56230 :                 do_rotate(lg);
    3503       56230 :                 rotation_unlock(lg);
    3504             :         }
    3505             : 
    3506       61340 :         if (LOG_DISABLED(lg))
    3507             :                 return GDK_SUCCEED;
    3508             : 
    3509       56227 :         ATOMIC_INC(&lg->current->refcount);
    3510       56227 :         *file_id = lg->current->id;
    3511       56227 :         logformat l;
    3512       56227 :         l.flag = LOG_START;
    3513       56227 :         l.id = ++lg->tid;
    3514             : 
    3515       56227 :         TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
    3516       56227 :         if (log_write_format(lg, &l) != GDK_SUCCEED) {
    3517           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3518           0 :                 return GDK_FAIL;
    3519             :         }
    3520             : 
    3521             :         return GDK_SUCCEED;
    3522             : }
    3523             : 
    3524             : void
    3525         114 : log_printinfo(logger *lg)
    3526             : {
    3527         114 :         printf("logger %s:\n", lg->fn);
    3528         114 :         rotation_lock(lg);
    3529         114 :         printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
    3530             :                lg->id, lg->saved_id);
    3531         114 :         rotation_unlock(lg);
    3532         114 :         printf("current transaction id %d, saved transaction id %d\n",
    3533             :                lg->tid, lg->saved_tid);
    3534         114 :         printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
    3535         114 :         printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
    3536         114 :                lg->catalog_bid->batCount, lg->dcatalog->batCount);
    3537         269 :         for (logged_range *p = lg->pending; p; p = p->next) {
    3538         155 :                 char buf[32];
    3539         155 :                 if (p->output_log == NULL ||
    3540         114 :                     snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
    3541          41 :                         buf[0] = 0;
    3542         196 :                 printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
    3543             :         }
    3544         114 : }

Generated by: LCOV version 1.14