LCOV - code coverage report
Current view: top level - gdk - gdk_logger.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1586 2162 73.4 %
Date: 2025-03-26 20:06:40 Functions: 78 79 98.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024, 2025 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "gdk.h"
      15             : #include "gdk_private.h"
      16             : #include "gdk_logger.h"
      17             : #include "gdk_logger_internals.h"
      18             : #include "mutils.h"
      19             : #include <string.h>
      20             : 
      21             : static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
      22             : static gdk_return log_del_bat(logger *lg, log_bid bid);
      23             : /*
      24             :  * The logger uses a directory to store its log files. One master log
      25             :  * file stores information about the version of the logger and the
      26             :  * type mapping it uses. This file is a simple ascii file with the
      27             :  * following format:
      28             :  *  {6DIGIT-VERSION\n[id,type_name\n]*}
      29             :  * The transaction log files have a binary format.
      30             :  */
      31             : 
      32             : #define LOG_START       0
      33             : #define LOG_END         1
      34             : #define LOG_UPDATE_CONST        2
      35             : #define LOG_UPDATE_BULK 3
      36             : #define LOG_UPDATE      4
      37             : #define LOG_CREATE      5
      38             : #define LOG_DESTROY     6
      39             : #define LOG_SEQ         7
      40             : #define LOG_CLEAR       8       /* DEPRECATED */
      41             : #define LOG_BAT_GROUP   9
      42             : 
      43             : #ifdef NATIVE_WIN32
      44             : #define getfilepos _ftelli64
      45             : #else
      46             : #ifdef HAVE_FSEEKO
      47             : #define getfilepos ftello
      48             : #else
      49             : #define getfilepos ftell
      50             : #endif
      51             : #endif
      52             : 
      53             : #define BATSIZE 0
      54             : 
      55             : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
      56             : 
      57             : static const char *log_commands[] = {
      58             :         "LOG_START",
      59             :         "LOG_END",
      60             :         "LOG_UPDATE_CONST",
      61             :         "LOG_UPDATE_BULK",
      62             :         "LOG_UPDATE",
      63             :         "LOG_CREATE",
      64             :         "LOG_DESTROY",
      65             :         "LOG_SEQ",
      66             :         "",                   /* LOG_CLEAR IS DEPRECATED */
      67             :         "LOG_BAT_GROUP",
      68             : };
      69             : 
      70             : typedef struct logaction {
      71             :         int type;               /* type of change */
      72             :         lng nr;
      73             :         int tt;
      74             :         lng id;
      75             :         lng offset;
      76             :         log_id cid;             /* id of object */
      77             :         BAT *b;                 /* temporary bat with changes */
      78             :         BAT *uid;               /* temporary bat with bun positions to update */
      79             : } logaction;
      80             : 
      81             : /* during the recover process a number of transactions could be active */
      82             : typedef struct trans {
      83             :         int tid;                /* transaction id */
      84             :         int sz;                 /* sz of the changes array */
      85             :         int nr;                 /* nr of changes */
      86             : 
      87             :         logaction *changes;
      88             : 
      89             :         struct trans *tr;
      90             : } trans;
      91             : 
      92             : typedef struct logformat_t {
      93             :         bte flag;
      94             :         int id;
      95             : } logformat;
      96             : 
      97             : typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
      98             : 
      99             : static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
     100             : static gdk_return tr_grow(trans *tr);
     101             : 
     102             : #define log_lock(lg)    MT_lock_set(&(lg)->lock)
     103             : #define log_unlock(lg)  MT_lock_unset(&(lg)->lock)
     104             : 
     105             : static inline bte
     106      951131 : find_type(logger *lg, int tpe)
     107             : {
     108      951131 :         assert(tpe >= 0 && tpe < MAXATOMS);
     109      951131 :         return lg->type_id[tpe];
     110             : }
     111             : 
     112             : static inline int
     113      543758 : find_type_nr(logger *lg, bte tpe)
     114             : {
     115      543758 :         int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
     116      543758 :         if (nr == 255)
     117           0 :                 return -1;
     118             :         return nr;
     119             : }
     120             : 
     121             : static BUN
     122      422755 : log_find(BAT *b, BAT *d, int val)
     123             : {
     124      422755 :         BUN p;
     125             : 
     126      422755 :         assert(b->ttype == TYPE_int);
     127      422755 :         assert(d->ttype == TYPE_oid);
     128      422755 :         BATiter bi = bat_iterator(b);
     129      422755 :         if (BAThash(b) == GDK_SUCCEED) {
     130      422755 :                 MT_rwlock_rdlock(&b->thashlock);
     131      710908 :                 HASHloop_int(bi, b->thash, p, &val) {
     132      184673 :                         oid pos = p;
     133      184673 :                         if (BUNfnd(d, &pos) == BUN_NONE) {
     134      184673 :                                 MT_rwlock_rdunlock(&b->thashlock);
     135      184673 :                                 bat_iterator_end(&bi);
     136      184673 :                                 return p;
     137             :                         }
     138             :                 }
     139      238082 :                 MT_rwlock_rdunlock(&b->thashlock);
     140             :         } else {                /* unlikely: BAThash failed */
     141           0 :                 int *t = (int *) bi.base;
     142             : 
     143           0 :                 for (p = 0; p < bi.count; p++) {
     144           0 :                         if (t[p] == val) {
     145           0 :                                 oid pos = p;
     146           0 :                                 if (BUNfnd(d, &pos) == BUN_NONE) {
     147           0 :                                         bat_iterator_end(&bi);
     148           0 :                                         return p;
     149             :                                 }
     150             :                         }
     151             :                 }
     152             :         }
     153      238082 :         bat_iterator_end(&bi);
     154      238082 :         return BUN_NONE;
     155             : }
     156             : 
     157             : static log_bid
     158      654354 : internal_find_bat(logger *lg, log_id id, int tid)
     159             : {
     160      654354 :         BUN p;
     161             : 
     162      654354 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     163      654354 :                 BATiter cni = bat_iterator(lg->catalog_id);
     164      654354 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     165      654354 :                 if (tid < 0) {
     166      403415 :                         HASHloop_int(cni, cni.b->thash, p, &id) {
     167      211093 :                                 oid pos = p;
     168      211093 :                                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
     169      211093 :                                         MT_rwlock_rdunlock(&cni.b->thashlock);
     170      211093 :                                         bat_iterator_end(&cni);
     171      211093 :                                         return *(log_bid *) Tloc(lg->catalog_bid, p);
     172             :                                 }
     173             :                         }
     174             :                 } else {
     175      361199 :                         BUN cp = BUN_NONE;
     176     1390528 :                         HASHloop_int(cni, cni.b->thash, p, &id) {
     177      785682 :                                 lng lid = *(lng *) Tloc(lg->catalog_lid, p);
     178      785682 :                                 if (lid != lng_nil && lid <= tid) {
     179             :                                         break;
     180             :                                 }
     181             :                                 cp = p;
     182             :                         }
     183      361199 :                         if (cp != BUN_NONE) {
     184      360943 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     185      360943 :                                 bat_iterator_end(&cni);
     186      360943 :                                 return *(log_bid *) Tloc(lg->catalog_bid, cp);
     187             :                         }
     188             :                 }
     189       82318 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     190       82318 :                 bat_iterator_end(&cni);
     191       82318 :                 return 0;       /* not found */
     192             :         }
     193             :         return -1;              /* error creating hash */
     194             : }
     195             : 
     196             : static inline void
     197       16612 : logbat_destroy(BAT *b)
     198             : {
     199       20790 :         BBPreclaim(b);
     200        3816 : }
     201             : 
     202             : static BAT *
     203       18348 : logbat_new(int tt, BUN size, role_t role)
     204             : {
     205       18348 :         BAT *nb = COLnew(0, tt, size, role);
     206             : 
     207       18348 :         if (nb) {
     208       18348 :                 BBP_pid(nb->batCacheid) = 0;
     209       18348 :                 if (role == PERSISTENT) {
     210       11418 :                         BATmode(nb, false);
     211       11418 :                         nb = BATsetaccess(nb, BAT_READ);
     212             :                 }
     213             :         } else {
     214           0 :                 TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
     215             :         }
     216       18348 :         return nb;
     217             : }
     218             : 
     219             : static bool
     220      756669 : log_read_format(logger *lg, logformat *data)
     221             : {
     222      756669 :         assert(!lg->inmemory);
     223      756669 :         if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
     224      743784 :                 if (mnstr_readInt(lg->input_log, &data->id) == 1)
     225             :                         return true;
     226             :                 /* could only read part, so complain */
     227           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     228             :         }
     229             :         return false;
     230             : }
     231             : 
     232             : static gdk_return
     233      748466 : log_write_format(logger *lg, logformat *data)
     234             : {
     235      748466 :         assert(data->id || data->flag);
     236      748466 :         assert(!lg->inmemory);
     237      748466 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
     238     1496932 :         if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
     239     1496932 :             mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
     240      748466 :             mnstr_writeInt(lg->current->output_log, data->id))
     241             :                 return GDK_SUCCEED;
     242           0 :         TRC_CRITICAL(GDK, "write failed\n");
     243           0 :         return GDK_FAIL;
     244             : }
     245             : 
     246             : static log_return
     247        2773 : log_read_seq(logger *lg, logformat *l)
     248             : {
     249        2773 :         int seq = l->id;
     250        2773 :         lng val;
     251        2773 :         BUN p;
     252             : 
     253        2773 :         assert(!lg->inmemory);
     254        2773 :         if (mnstr_readLng(lg->input_log, &val) != 1) {
     255           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     256           0 :                 return LOG_EOF;
     257             :         }
     258        2773 :         if (lg->flushing)
     259             :                 return LOG_OK;
     260             : 
     261          65 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
     262          64 :             p >= lg->seqs_id->batInserted) {
     263          40 :                 assert(lg->seqs_val->hseqbase == 0);
     264          40 :                 if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
     265           0 :                         TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
     266           0 :                         return LOG_ERR;
     267             :                 }
     268             :         } else {
     269          24 :                 if (p != BUN_NONE) {
     270          24 :                         oid pos = p;
     271          24 :                         if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
     272           0 :                                 TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
     273           0 :                                 return LOG_ERR;
     274             :                         }
     275             :                 }
     276          50 :                 if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
     277          25 :                     BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
     278           0 :                         TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
     279           0 :                         return LOG_ERR;
     280             :                 }
     281             :         }
     282             :         return LOG_OK;
     283             : }
     284             : 
     285             : #if 0
     286             : static gdk_return
     287             : log_write_id(logger *lg, int id)
     288             : {
     289             :         assert(!lg->inmemory);
     290             :         assert(id >= 0);
     291             :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
     292             :         if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
     293             :             mnstr_writeInt(lg->current->output_log, id))
     294             :                 return GDK_SUCCEED;
     295             :         TRC_CRITICAL(GDK, "write failed\n");
     296             :         return GDK_FAIL;
     297             : }
     298             : 
     299             : static log_return
     300             : log_read_id(logger *lg, log_id *id)
     301             : {
     302             :         assert(!lg->inmemory);
     303             :         if (mnstr_readInt(lg->input_log, id) != 1) {
     304             :                 TRC_CRITICAL(GDK, "read failed\n");
     305             :                 return LOG_EOF;
     306             :         }
     307             :         return LOG_OK;
     308             : }
     309             : #endif
     310             : 
     311             : static log_return
     312       19225 : string_reader(logger *lg, BAT *b, lng nr)
     313             : {
     314       19225 :         size_t sz = 0;
     315       19225 :         lng SZ = 0;
     316       19225 :         log_return res = LOG_OK;
     317             : 
     318       38621 :         while (nr && res == LOG_OK) {
     319       19396 :                 if (mnstr_readLng(lg->input_log, &SZ) != 1) {
     320           0 :                         TRC_CRITICAL(GDK, "read failed\n");
     321           0 :                         return LOG_EOF;
     322             :                 }
     323       19396 :                 sz = (size_t) SZ;
     324       19396 :                 char *buf = lg->rbuf;
     325       19396 :                 if (lg->rbufsize < sz) {
     326           2 :                         if (!(buf = GDKrealloc(lg->rbuf, sz))) {
     327           0 :                                 TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
     328           0 :                                 return LOG_ERR;
     329             :                         }
     330           2 :                         lg->rbuf = buf;
     331           2 :                         lg->rbufsize = sz;
     332             :                 }
     333             : 
     334       19396 :                 if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
     335           0 :                         TRC_CRITICAL(GDK, "read failed\n");
     336           0 :                         return LOG_EOF;
     337             :                 }
     338             :                 /* handle strings */
     339             :                 char *t = buf;
     340             :                 /* chunked */
     341             : #define CHUNK_SIZE 1024
     342             :                 char *strings[CHUNK_SIZE];
     343             :                 int cur = 0;
     344             : 
     345       70823 :                 for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
     346       51427 :                         strings[cur++] = t;
     347       51427 :                         if (cur == CHUNK_SIZE &&
     348           4 :                             b &&
     349           4 :                             BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
     350           0 :                                 TRC_CRITICAL(GDK, "append to string bat failed\n");
     351           0 :                                 res = LOG_ERR;
     352             :                         }
     353       51427 :                         if (cur == CHUNK_SIZE)
     354          14 :                                 cur = 0;
     355             :                         /* find next */
     356     5540972 :                         while (*t)
     357     5489545 :                                 t++;
     358       51427 :                         t++;
     359             :                 }
     360       19396 :                 if (cur &&
     361         332 :                     b &&
     362         332 :                     BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
     363           0 :                         TRC_CRITICAL(GDK, "append to string bat failed\n");
     364           0 :                         res = LOG_ERR;
     365             :                 }
     366             :         }
     367             :         return res;
     368             : }
     369             : 
     370             : 
     371             : struct offset {
     372             :         lng os;           /* offset within source BAT in logfile */
     373             :         lng nr;           /* number of values to be copied */
     374             :         lng od;           /* offset within destination BAT in database */
     375             : };
     376             : 
     377             : static log_return
     378      389096 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
     379             : {
     380      389096 :         log_return res = LOG_OK;
     381      389096 :         lng nr, pnr;
     382      389096 :         bte type_id = -1;
     383      389096 :         int tpe;
     384             : 
     385      389096 :         assert(!lg->inmemory);
     386      389096 :         TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
     387             : 
     388      778192 :         if (mnstr_readLng(lg->input_log, &nr) != 1 ||
     389      389096 :             mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
     390           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     391           0 :                 return LOG_EOF;
     392             :         }
     393             : 
     394      389096 :         pnr = nr;
     395      389096 :         tpe = find_type_nr(lg, type_id);
     396      389096 :         if (tpe >= 0) {
     397      389096 :                 BAT *uid = NULL;
     398      389096 :                 BAT *r = NULL;
     399      389096 :                 void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
     400      389096 :                 lng offset;
     401             : 
     402      389096 :                 assert(nr <= (lng) BUN_MAX);
     403      389096 :                 if (!lg->flushing && l->flag == LOG_UPDATE) {
     404           0 :                         uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
     405           0 :                         if (uid == NULL) {
     406           0 :                                 TRC_CRITICAL(GDK, "creating bat failed\n");
     407       28209 :                                 return LOG_ERR;
     408             :                         }
     409             :                 }
     410             : 
     411      389096 :                 if (l->flag == LOG_UPDATE_CONST) {
     412      122703 :                         if (mnstr_readLng(lg->input_log, &offset) != 1) {
     413           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     414           0 :                                 return LOG_EOF;
     415             :                         }
     416      122703 :                         if (cands) {
     417             :                                 /* This const range actually represents a segment of candidates corresponding to updated bat entries */
     418             : 
     419       28209 :                                 if (BATcount(*cands) == 0 || lg->flushing) {
     420             :                                         /* when flushing, we only need the offset and count of the last segment of inserts. */
     421       28209 :                                         assert((*cands)->ttype == TYPE_void);
     422       28209 :                                         BATtseqbase(*cands, (oid) offset);
     423       28209 :                                         BATsetcount(*cands, (BUN) nr);
     424           0 :                                 } else if (!lg->flushing) {
     425           0 :                                         assert(BATcount(*cands) > 0);
     426           0 :                                         BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
     427           0 :                                         BAT *newcands = NULL;
     428           0 :                                         if (!dense) {
     429           0 :                                                 TRC_CRITICAL(GDK, "creating bat failed\n");
     430           0 :                                                 res = LOG_ERR;
     431           0 :                                         } else if ((*cands)->ttype == TYPE_void) {
     432           0 :                                                 if ((newcands = BATmergecand(*cands, dense))) {
     433           0 :                                                         BBPreclaim(*cands);
     434           0 :                                                         *cands = newcands;
     435             :                                                 } else {
     436           0 :                                                         TRC_CRITICAL(GDK, "creating bat failed\n");
     437           0 :                                                         res = LOG_ERR;
     438             :                                                 }
     439             :                                         } else {
     440           0 :                                                 assert((*cands)->ttype == TYPE_oid);
     441           0 :                                                 assert(BATcount(*cands) > 0);
     442           0 :                                                 if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
     443           0 :                                                         TRC_CRITICAL(GDK, "appending to bat failed\n");
     444           0 :                                                         res = LOG_ERR;
     445             :                                                 }
     446             :                                         }
     447           0 :                                         BBPreclaim(dense);
     448             :                                 }
     449             : 
     450             :                                 /* We have to read the value to update the read cursor */
     451       28209 :                                 size_t tlen = lg->rbufsize;
     452       28209 :                                 void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     453       28209 :                                 if (t == NULL) {
     454           0 :                                         TRC_CRITICAL(GDK, "read failed\n");
     455           0 :                                         res = LOG_EOF;
     456             :                                 }
     457       28209 :                                 return res;
     458             :                         }
     459             :                 }
     460             : 
     461      360887 :                 if (!lg->flushing) {
     462        1397 :                         r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
     463        1397 :                         if (r == NULL) {
     464           0 :                                 if (uid)
     465           0 :                                         BBPreclaim(uid);
     466           0 :                                 return LOG_ERR;
     467             :                         }
     468             :                 }
     469             : 
     470      360887 :                 if (l->flag == LOG_UPDATE_CONST) {
     471       94494 :                         size_t tlen = lg->rbufsize;
     472       94494 :                         void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     473       94494 :                         if (t == NULL) {
     474           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     475           0 :                                 res = LOG_EOF;
     476             :                         } else {
     477       94494 :                                 lg->rbuf = t;
     478       94494 :                                 lg->rbufsize = tlen;
     479       94494 :                                 if (r) {
     480        8189 :                                         for (BUN p = 0; p < (BUN) nr; p++) {
     481        7918 :                                                 if (BUNappend(r, t, true) != GDK_SUCCEED) {
     482           0 :                                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     483           0 :                                                         res = LOG_ERR;
     484             :                                                 }
     485             :                                         }
     486             :                                 }
     487             :                         }
     488      266393 :                 } else if (l->flag == LOG_UPDATE_BULK) {
     489      266375 :                         if (mnstr_readLng(lg->input_log, &offset) != 1) {
     490           0 :                                 if (r)
     491           0 :                                         BBPreclaim(r);
     492           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     493           0 :                                 return LOG_EOF;
     494             :                         }
     495      266375 :                         if (tpe == TYPE_msk) {
     496           0 :                                 if (r) {
     497           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     498           0 :                                                 BATsetcount(r, (BUN) nr);
     499             :                                         else {
     500           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     501           0 :                                                 res = LOG_EOF;
     502             :                                         }
     503             :                                 } else {
     504           0 :                                         size_t tlen = lg->rbufsize / sizeof(int);
     505           0 :                                         size_t cnt = 0, snr = (size_t) nr;
     506           0 :                                         snr = (snr + 31) / 32;
     507           0 :                                         assert(tlen);
     508           0 :                                         for (; res == LOG_OK && snr > 0; snr -= cnt) {
     509           0 :                                                 cnt = snr > tlen ? tlen : snr;
     510           0 :                                                 if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
     511           0 :                                                         TRC_CRITICAL(GDK, "read failed\n");
     512           0 :                                                         res = LOG_EOF;
     513             :                                                 }
     514             :                                         }
     515             :                                 }
     516             :                         } else {
     517      266375 :                                 if (!ATOMvarsized(tpe)) {
     518      246573 :                                         size_t cnt = 0, snr = (size_t) nr;
     519      246573 :                                         size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
     520      246573 :                                         assert(tlen);
     521             :                                         /* read in chunks of max
     522             :                                          * BUFSIZE/width rows */
     523      493149 :                                         for (; res == LOG_OK && snr > 0; snr -= cnt) {
     524      246576 :                                                 cnt = snr > tlen ? tlen : snr;
     525      246576 :                                                 void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
     526             : 
     527      246576 :                                                 if (t == NULL) {
     528             :                                                         res = LOG_EOF;
     529             :                                                         break;
     530             :                                                 }
     531      246576 :                                                 assert(t == lg->rbuf);
     532      246576 :                                                 if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
     533           0 :                                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     534           0 :                                                         res = LOG_ERR;
     535             :                                                 }
     536             :                                         }
     537       19802 :                                 } else if (tpe == TYPE_str) {
     538             :                                         /* efficient string */
     539       19219 :                                         res = string_reader(lg, r, nr);
     540             :                                 } else {
     541        1208 :                                         for (; res == LOG_OK && nr > 0; nr--) {
     542         625 :                                                 size_t tlen = lg->rbufsize;
     543         625 :                                                 void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     544             : 
     545         625 :                                                 if (t == NULL) {
     546             :                                                         /* see if failure was due to
     547             :                                                          * malloc or something less
     548             :                                                          * serious (in the current
     549             :                                                          * context) */
     550           0 :                                                         if (strstr(GDKerrbuf, "alloc") == NULL)
     551             :                                                                 res = LOG_EOF;
     552             :                                                         else
     553           0 :                                                                 res = LOG_ERR;
     554           0 :                                                         TRC_CRITICAL(GDK, "read failed\n");
     555             :                                                 } else {
     556         625 :                                                         lg->rbuf = t;
     557         625 :                                                         lg->rbufsize = tlen;
     558         625 :                                                         if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
     559           0 :                                                                 TRC_CRITICAL(GDK, "append to bat failed\n");
     560           0 :                                                                 res = LOG_ERR;
     561             :                                                         }
     562             :                                                 }
     563             :                                         }
     564             :                                 }
     565             :                         }
     566             :                 } else {
     567          18 :                         void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
     568          18 :                         void *hv = ATOMnil(TYPE_oid);
     569          18 :                         offset = 0;
     570             : 
     571          18 :                         if (hv == NULL) {
     572           0 :                                 TRC_CRITICAL(GDK, "read failed\n");
     573           0 :                                 res = LOG_EOF;
     574             :                         }
     575        1071 :                         for (; res == LOG_OK && nr > 0; nr--) {
     576        1053 :                                 size_t hlen = sizeof(oid);
     577        1053 :                                 void *h = rh(hv, &hlen, lg->input_log, 1);
     578        1053 :                                 assert(hlen == sizeof(oid));
     579        1053 :                                 assert(h == hv);
     580        1053 :                                 if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
     581           0 :                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     582           0 :                                         res = LOG_ERR;
     583             :                                 }
     584             :                         }
     585          18 :                         nr = pnr;
     586          18 :                         if (tpe == TYPE_msk) {
     587           0 :                                 if (r) {
     588           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     589           0 :                                                 BATsetcount(r, (BUN) nr);
     590             :                                         else {
     591           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     592           0 :                                                 res = LOG_EOF;
     593             :                                         }
     594             :                                 } else {
     595           0 :                                         for (lng i = 0; i < nr; i += 32) {
     596           0 :                                                 int v;
     597           0 :                                                 switch (mnstr_readInt(lg->input_log, &v)) {
     598           0 :                                                 case 1:
     599           0 :                                                         continue;
     600             :                                                 case 0:
     601             :                                                         res = LOG_EOF;
     602             :                                                         break;
     603             :                                                 default:
     604             :                                                         res = LOG_ERR;
     605             :                                                         break;
     606             :                                                 }
     607           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     608           0 :                                                 break;
     609             :                                         }
     610             :                                 }
     611          18 :                         } else if (tpe == TYPE_str) {
     612             :                                 /* efficient string */
     613           6 :                                 res = string_reader(lg, r, nr);
     614             :                         } else {
     615          58 :                                 for (; res == LOG_OK && nr > 0; nr--) {
     616          46 :                                         size_t tlen = lg->rbufsize;
     617          46 :                                         void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
     618             : 
     619          46 :                                         if (t == NULL) {
     620           0 :                                                 if (strstr(GDKerrbuf, "malloc") == NULL)
     621             :                                                         res = LOG_EOF;
     622             :                                                 else
     623           0 :                                                         res = LOG_ERR;
     624           0 :                                                 TRC_CRITICAL(GDK, "read failed\n");
     625             :                                         } else {
     626          46 :                                                 lg->rbuf = t;
     627          46 :                                                 lg->rbufsize = tlen;
     628          46 :                                                 if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
     629           0 :                                                         TRC_CRITICAL(GDK, "append to bat failed\n");
     630           0 :                                                         res = LOG_ERR;
     631             :                                                 }
     632             :                                         }
     633             :                                 }
     634             :                         }
     635          18 :                         GDKfree(hv);
     636             :                 }
     637             : 
     638      360887 :                 if (res == LOG_OK) {
     639      360887 :                         if (tr_grow(tr) == GDK_SUCCEED) {
     640      360887 :                                 tr->changes[tr->nr].type = l->flag;
     641      360887 :                                 if (l->flag == LOG_UPDATE_BULK && offset == -1) {
     642      138983 :                                         assert(cands);  /* bat r is part of a group of bats logged together. */
     643      138983 :                                         struct canditer ci;
     644      138983 :                                         canditer_init(&ci, NULL, *cands);
     645      138983 :                                         const oid first = canditer_peek(&ci);
     646      138983 :                                         const oid last = canditer_last(&ci);
     647      138983 :                                         offset = (lng) first;
     648      138983 :                                         pnr = (lng) (last - first) + 1;
     649      138983 :                                         if (!lg->flushing) {
     650        1022 :                                                 assert(uid == NULL);
     651        1022 :                                                 uid = *cands;
     652        1022 :                                                 BBPfix((*cands)->batCacheid);
     653        1022 :                                                 tr->changes[tr->nr].type = LOG_UPDATE;
     654             :                                         }
     655             :                                 }
     656      360887 :                                 if (l->flag == LOG_UPDATE_CONST) {
     657       94494 :                                         assert(!cands); /* TODO: This might change in the future. */
     658       94494 :                                         tr->changes[tr->nr].type = LOG_UPDATE_BULK;
     659             :                                 }
     660      360887 :                                 tr->changes[tr->nr].nr = pnr;
     661      360887 :                                 tr->changes[tr->nr].tt = tpe;
     662      360887 :                                 tr->changes[tr->nr].cid = id;
     663      360887 :                                 tr->changes[tr->nr].offset = offset;
     664      360887 :                                 tr->changes[tr->nr].b = r;
     665      360887 :                                 tr->changes[tr->nr].uid = uid;
     666      360887 :                                 tr->nr++;
     667             :                         } else {
     668           0 :                                 TRC_CRITICAL(GDK, "memory allocation failed\n");
     669           0 :                                 res = LOG_ERR;
     670             :                         }
     671             :                 }
     672      360887 :                 if (res != LOG_OK) {
     673           0 :                         if (r)
     674           0 :                                 BBPreclaim(r);
     675           0 :                         if (cands && uid)
     676           0 :                                 BBPunfix((*cands)->batCacheid);
     677           0 :                         else if (uid)
     678           0 :                                 BBPreclaim(uid);
     679             :                 }
     680             :         } else {
     681             :                 /* bat missing ERROR or ignore ? currently error. */
     682           0 :                 TRC_CRITICAL(GDK, "unknown type\n");
     683           0 :                 res = LOG_ERR;
     684             :         }
     685             :         return res;
     686             : }
     687             : 
     688             : 
     689             : static gdk_return
     690      595696 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
     691             : {
     692      595696 :         BATiter cni = bat_iterator_nolock(lg->catalog_id);
     693             : 
     694      595696 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     695      595696 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     696      595696 :                 BUN p, cp = BUN_NONE;
     697             : 
     698     2040770 :                 HASHloop_int(cni, cni.b->thash, p, &id) {
     699     1024144 :                         lng lid = *(lng *) Tloc(lg->catalog_lid, p);
     700             : 
     701     1024144 :                         if (lid != lng_nil && lid <= tid)
     702             :                                 break;
     703             :                         cp = p;
     704             :                 }
     705      595696 :                 if (cp != BUN_NONE) {
     706      595494 :                         lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
     707      595494 :                         assert(lg->catalog_cnt->hseqbase == 0);
     708      595494 :                         if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
     709           0 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     710           0 :                                 return GDK_FAIL;
     711             :                         }
     712             :                 }
     713      595696 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     714      595696 :                 return GDK_SUCCEED;
     715             :         }
     716             :         return GDK_FAIL;
     717             : }
     718             : 
     719             : static gdk_return
     720      360887 : la_bat_updates(logger *lg, logaction *la, int tid)
     721             : {
     722      360887 :         log_bid bid = internal_find_bat(lg, la->cid, tid);
     723      360887 :         BAT *b = NULL;
     724             : 
     725      360887 :         if (bid < 0)
     726             :                 return GDK_FAIL;
     727      360887 :         if (!bid) {
     728             :                 /* object already gone, nothing needed */
     729             :                 return GDK_SUCCEED;
     730             :         }
     731             : 
     732      360887 :         if (!lg->flushing) {
     733        1397 :                 b = BATdescriptor(bid);
     734        1397 :                 if (b == NULL)
     735             :                         return GDK_FAIL;
     736             :         }
     737      360887 :         BUN cnt = 0;
     738      360887 :         if (la->type == LOG_UPDATE_BULK) {
     739      359847 :                 if (!lg->flushing) {
     740         375 :                         cnt = BATcount(b);
     741         375 :                         int is_msk = (b->ttype == TYPE_msk);
     742             :                         /* handle offset 0 ie clear */
     743         375 :                         if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
     744             :                                 BATclear(b, true);
     745             :                         /* handle offset */
     746         375 :                         if (cnt <= (BUN) la->offset) {
     747         234 :                                 msk t = 1;
     748         234 :                                 if (cnt < (BUN) la->offset) {     /* insert nils */
     749           0 :                                         const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
     750           0 :                                         lng i, d = la->offset - BATcount(b);
     751           0 :                                         for (i = 0; i < d; i++) {
     752           0 :                                                 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
     753           0 :                                                         logbat_destroy(b);
     754           0 :                                                         return GDK_FAIL;
     755             :                                                 }
     756             :                                         }
     757             :                                 }
     758         234 :                                 if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
     759           0 :                                         logbat_destroy(b);
     760           0 :                                         return GDK_FAIL;
     761             :                                 }
     762             :                         } else {
     763         141 :                                 BATiter vi = bat_iterator(la->b);
     764         141 :                                 BUN p, q;
     765             : 
     766        2202 :                                 for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
     767        2061 :                                         const void *t = BUNtail(vi, p);
     768             : 
     769        2061 :                                         if (q < cnt) {
     770        2060 :                                                 if (b->tnosorted == q ||
     771        2053 :                                                     b->tnosorted == q + 1)
     772           7 :                                                         b->tnosorted = 0;
     773        2060 :                                                 if (b->tnorevsorted == q ||
     774        2053 :                                                     b->tnorevsorted == q + 1)
     775           7 :                                                         b->tnorevsorted = 0;
     776        2060 :                                                 if (b->tnokey[0] == q ||
     777        2053 :                                                     b->tnokey[1] == q) {
     778           7 :                                                         b->tnokey[0] = 0;
     779           7 :                                                         b->tnokey[1] = 0;
     780             :                                                 }
     781        2060 :                                                 if (b->tminpos == q)
     782           5 :                                                         b->tminpos = BUN_NONE;
     783        2060 :                                                 if (b->tmaxpos == q)
     784           3 :                                                         b->tmaxpos = BUN_NONE;
     785        2060 :                                                 b->tkey = false;
     786        2060 :                                                 b->tsorted = false;
     787        2060 :                                                 b->tkey = false;
     788        2060 :                                                 if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
     789           0 :                                                         logbat_destroy(b);
     790           0 :                                                         bat_iterator_end(&vi);
     791           0 :                                                         return GDK_FAIL;
     792             :                                                 }
     793             :                                         } else {
     794           1 :                                                 if (BUNappend(b, t, true) != GDK_SUCCEED) {
     795           0 :                                                         logbat_destroy(b);
     796           0 :                                                         bat_iterator_end(&vi);
     797           0 :                                                         return GDK_FAIL;
     798             :                                                 }
     799             :                                         }
     800             :                                 }
     801         141 :                                 bat_iterator_end(&vi);
     802             :                         }
     803             :                 }
     804        1040 :         } else if (la->type == LOG_UPDATE) {
     805        1040 :                 if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
     806             :                         return GDK_FAIL;
     807             :                 }
     808             :         }
     809      360887 :         cnt = (BUN) (la->offset + la->nr);
     810      360887 :         if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
     811           0 :                 if (b)
     812           0 :                         logbat_destroy(b);
     813           0 :                 return GDK_FAIL;
     814             :         }
     815      360887 :         if (b)
     816        1397 :                 logbat_destroy(b);
     817             :         return GDK_SUCCEED;
     818             : }
     819             : 
     820             : static log_return
     821        9407 : log_read_destroy(logger *lg, trans *tr, log_id id)
     822             : {
     823        9407 :         (void) lg;
     824        9407 :         assert(!lg->inmemory);
     825        9407 :         if (tr_grow(tr) == GDK_SUCCEED) {
     826        9407 :                 tr->changes[tr->nr].type = LOG_DESTROY;
     827        9407 :                 tr->changes[tr->nr].cid = id;
     828        9407 :                 tr->nr++;
     829        9407 :                 return LOG_OK;
     830             :         }
     831           0 :         TRC_CRITICAL(GDK, "memory allocation failed\n");
     832           0 :         return LOG_ERR;
     833             : }
     834             : 
     835             : static gdk_return
     836          48 : la_bat_destroy(logger *lg, logaction *la, int tid)
     837             : {
     838          48 :         log_bid bid = internal_find_bat(lg, la->cid, tid);
     839             : 
     840          48 :         if (bid < 0)
     841             :                 return GDK_FAIL;
     842          48 :         if (!bid) {
     843             : #ifndef NDEBUG
     844           0 :                 GDKwarning("failed to find bid for object %d\n", la->cid);
     845             : #endif
     846           0 :                 return GDK_SUCCEED;
     847             :         }
     848          48 :         if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
     849             :                 return GDK_FAIL;
     850             :         return GDK_SUCCEED;
     851             : }
     852             : 
     853             : static log_return
     854      154662 : log_read_create(logger *lg, trans *tr, log_id id)
     855             : {
     856      154662 :         bte tt;
     857      154662 :         int tpe;
     858             : 
     859      154662 :         assert(!lg->inmemory);
     860      154662 :         TRC_DEBUG(WAL, "create %d", id);
     861             : 
     862      154662 :         if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
     863           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     864           0 :                 return LOG_EOF;
     865             :         }
     866             : 
     867      154662 :         tpe = find_type_nr(lg, tt);
     868             :         /* read create */
     869      154662 :         if (tr_grow(tr) == GDK_SUCCEED) {
     870      154662 :                 tr->changes[tr->nr].type = LOG_CREATE;
     871      154662 :                 tr->changes[tr->nr].tt = tpe;
     872      154662 :                 tr->changes[tr->nr].cid = id;
     873      154662 :                 tr->nr++;
     874      154662 :                 return LOG_OK;
     875             :         }
     876           0 :         TRC_CRITICAL(GDK, "memory allocation failed\n");
     877           0 :         return LOG_ERR;
     878             : }
     879             : 
     880             : static gdk_return
     881         264 : la_bat_create(logger *lg, logaction *la, int tid)
     882             : {
     883         264 :         BAT *b;
     884             : 
     885             :         /* formerly head column type, should be void */
     886         264 :         if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
     887             :                 return GDK_FAIL;
     888             : 
     889         264 :         if (la->tt < 0)
     890           0 :                 BATtseqbase(b, 0);
     891             : 
     892         528 :         if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
     893         264 :             log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
     894           0 :                 logbat_destroy(b);
     895           0 :                 return GDK_FAIL;
     896             :         }
     897         264 :         logbat_destroy(b);
     898         264 :         return GDK_SUCCEED;
     899             : }
     900             : 
     901             : static gdk_return
     902         245 : log_write_new_types(logger *lg, FILE *fp)
     903             : {
     904         245 :         bte id = 0;
     905             : 
     906             :         /* write types and insert into bats */
     907         245 :         memset(lg->type_id, -1, sizeof(lg->type_id));
     908         245 :         memset(lg->type_nr, 255, sizeof(lg->type_nr));
     909             :         /* first the fixed sized types */
     910        6858 :         for (int i = 0; i < GDKatomcnt; i++) {
     911        6613 :                 if (ATOMvarsized(i))
     912        1469 :                         continue;
     913        5144 :                 lg->type_id[i] = id;
     914        5144 :                 lg->type_nr[id] = i;
     915        5144 :                 if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
     916             :                         return GDK_FAIL;
     917        5144 :                 id++;
     918             :         }
     919             :         /* second the var sized types */
     920             :         id = -127;              /* start after nil */
     921        6858 :         for (int i = 0; i < GDKatomcnt; i++) {
     922        6613 :                 if (!ATOMvarsized(i))
     923        5144 :                         continue;
     924        1469 :                 lg->type_id[i] = id;
     925        1469 :                 lg->type_nr[256 + id] = i;
     926        1469 :                 if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
     927             :                         return GDK_FAIL;
     928        1469 :                 id++;
     929             :         }
     930             :         return GDK_SUCCEED;
     931             : }
     932             : 
     933             : #define TR_SIZE         1024
     934             : 
     935             : static trans *
     936       56234 : tr_create(trans *tr, int tid)
     937             : {
     938       56234 :         trans *ntr = GDKmalloc(sizeof(trans));
     939             : 
     940       56234 :         if (ntr == NULL)
     941             :                 return NULL;
     942       56234 :         ntr->tid = tid;
     943       56234 :         ntr->sz = TR_SIZE;
     944       56234 :         ntr->nr = 0;
     945       56234 :         ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
     946       56234 :         if (ntr->changes == NULL) {
     947           0 :                 GDKfree(ntr);
     948           0 :                 return NULL;
     949             :         }
     950       56234 :         ntr->tr = tr;
     951       56234 :         return ntr;
     952             : }
     953             : 
     954             : static gdk_return
     955      524956 : la_apply(logger *lg, logaction *c, int tid)
     956             : {
     957      524956 :         gdk_return ret = GDK_SUCCEED;
     958             : 
     959      524956 :         switch (c->type) {
     960      360887 :         case LOG_UPDATE_BULK:
     961             :         case LOG_UPDATE:
     962      360887 :                 ret = la_bat_updates(lg, c, tid);
     963      360887 :                 break;
     964      154662 :         case LOG_CREATE:
     965      154662 :                 if (!lg->flushing)
     966         264 :                         ret = la_bat_create(lg, c, tid);
     967             :                 break;
     968        9407 :         case LOG_DESTROY:
     969        9407 :                 if (!lg->flushing)
     970          48 :                         ret = la_bat_destroy(lg, c, tid);
     971             :                 break;
     972             :         default:
     973           0 :                 MT_UNREACHABLE();
     974             :         }
     975      524956 :         return ret;
     976             : }
     977             : 
     978             : static void
     979      524956 : la_destroy(logaction *c)
     980             : {
     981      524956 :         if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
     982        1397 :                 logbat_destroy(c->b);
     983      524956 :         if (c->type == LOG_UPDATE && c->uid)
     984        1022 :                 logbat_destroy(c->uid);
     985      524956 : }
     986             : 
     987             : static gdk_return
     988      524956 : tr_grow(trans *tr)
     989             : {
     990      524956 :         if (tr->nr == tr->sz) {
     991           7 :                 logaction *changes;
     992           7 :                 tr->sz <<= 1;
     993           7 :                 changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
     994           7 :                 if (changes == NULL)
     995             :                         return GDK_FAIL;
     996           7 :                 tr->changes = changes;
     997             :         }
     998             :         /* cleanup the next */
     999      524956 :         tr->changes[tr->nr].b = NULL;
    1000      524956 :         return GDK_SUCCEED;
    1001             : }
    1002             : 
    1003             : static trans *
    1004       56234 : tr_destroy(trans *tr)
    1005             : {
    1006       56234 :         trans *r = tr->tr;
    1007             : 
    1008       56234 :         GDKfree(tr->changes);
    1009       56234 :         GDKfree(tr);
    1010       56234 :         return r;
    1011             : }
    1012             : 
    1013             : static trans *
    1014           0 : tr_abort_(logger *lg, trans *tr, int s)
    1015             : {
    1016           0 :         int i;
    1017             : 
    1018           0 :         (void) lg;
    1019             : 
    1020           0 :         TRC_DEBUG(WAL, "abort");
    1021             : 
    1022           0 :         for (i = s; i < tr->nr; i++)
    1023           0 :                 la_destroy(&tr->changes[i]);
    1024           0 :         return tr_destroy(tr);
    1025             : }
    1026             : 
    1027             : static trans *
    1028           0 : tr_abort(logger *lg, trans *tr)
    1029             : {
    1030           0 :         return tr_abort_(lg, tr, 0);
    1031             : }
    1032             : 
    1033             : static trans *
    1034       56234 : tr_commit(logger *lg, trans *tr)
    1035             : {
    1036       56234 :         int i;
    1037             : 
    1038       56234 :         TRC_DEBUG(WAL, "commit");
    1039             : 
    1040      581190 :         for (i = 0; i < tr->nr; i++) {
    1041      524956 :                 if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
    1042           0 :                         TRC_CRITICAL(GDK, "aborting transaction\n");
    1043           0 :                         do {
    1044           0 :                                 tr = tr_abort_(lg, tr, i);
    1045           0 :                         } while (tr != NULL);
    1046             :                         return (trans *) -1;
    1047             :                 }
    1048      524956 :                 la_destroy(&tr->changes[i]);
    1049             :         }
    1050       56234 :         lg->saved_tid = tr->tid;
    1051       56234 :         return tr_destroy(tr);
    1052             : }
    1053             : 
    1054             : static gdk_return
    1055         127 : log_read_types_file(logger *lg, FILE *fp, int version, bool *needsnew)
    1056             : {
    1057         127 :         int id = 0;
    1058         127 :         char atom_name[IDLENGTH];
    1059         127 :         bool seen_geom = false;
    1060             : 
    1061             :         /* scanf should use IDLENGTH somehow */
    1062        3584 :         while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
    1063        3457 :                 if (version < 52303 && strcmp(atom_name, "BAT") == 0) {
    1064           8 :                         *needsnew = true;
    1065           8 :                         continue;
    1066             :                 }
    1067        3449 :                 if (version < 52304 && strcmp(atom_name, "color") == 0) {
    1068          16 :                         *needsnew = true;
    1069          16 :                         continue;
    1070             :                 }
    1071         456 :                 if (version < 52304 && strcmp(atom_name, "identifier") == 0) {
    1072          16 :                         *needsnew = true;
    1073          16 :                         continue;
    1074             :                 }
    1075        3417 :                 if (version < 52304 && strcmp(atom_name, "wkba") == 0) {
    1076          16 :                         *needsnew = true;
    1077          16 :                         continue;
    1078             :                 }
    1079        3401 :                 int i = ATOMindex(atom_name);
    1080             : 
    1081        3401 :                 if (id < -127 || id > 127 || i < 0) {
    1082           0 :                         GDKerror("unknown type in log file '%s'\n", atom_name);
    1083           0 :                         return GDK_FAIL;
    1084             :                 }
    1085        3401 :                 seen_geom |= strcmp(atom_name, "mbr") == 0 || strcmp(atom_name, "wkb") == 0;
    1086        3401 :                 lg->type_id[i] = (int8_t) id;
    1087        3401 :                 lg->type_nr[id < 0 ? 256 + id : id] = i;
    1088             :         }
    1089             : #ifdef HAVE_GEOM
    1090         127 :         if (!seen_geom && ATOMindex("mbr") > 0) {
    1091           0 :                 GDKerror("incompatible database: server supports GEOM, but database does not\n");
    1092           0 :                 return GDK_FAIL;
    1093             :         }
    1094             : #endif
    1095             :         (void) seen_geom;
    1096             :         return GDK_SUCCEED;
    1097             : }
    1098             : 
    1099             : 
    1100             : static gdk_return
    1101         245 : log_create_types_file(logger *lg, const char *filename)
    1102             : {
    1103         245 :         FILE *fp;
    1104             : 
    1105         245 :         if ((fp = MT_fopen(filename, "w")) == NULL) {
    1106           0 :                 GDKerror("cannot create log file %s\n", filename);
    1107           0 :                 return GDK_FAIL;
    1108             :         }
    1109         245 :         if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
    1110           0 :                 fclose(fp);
    1111           0 :                 GDKerror("writing log file %s failed", filename);
    1112           0 :                 if (MT_remove(filename) < 0)
    1113           0 :                         GDKsyserror("remove %s failed\n", filename);
    1114           0 :                 return GDK_FAIL;
    1115             :         }
    1116             : 
    1117         245 :         if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
    1118           0 :                 fclose(fp);
    1119           0 :                 GDKerror("writing log file %s failed", filename);
    1120           0 :                 if (MT_remove(filename) < 0)
    1121           0 :                         GDKsyserror("remove %s failed\n", filename);
    1122           0 :                 return GDK_FAIL;
    1123             :         }
    1124         245 :         if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
    1125             : #if defined(_MSC_VER)
    1126             :                                && _commit(_fileno(fp)) < 0
    1127             : #elif defined(HAVE_FDATASYNC)
    1128           0 :                                && fdatasync(fileno(fp)) < 0
    1129             : #elif defined(HAVE_FSYNC)
    1130             :                                && fsync(fileno(fp)) < 0
    1131             : #endif
    1132             :             )) {
    1133           0 :                 GDKsyserror("flushing log file %s failed", filename);
    1134           0 :                 fclose(fp);
    1135           0 :                 if (MT_remove(filename) < 0)
    1136           0 :                         GDKsyserror("remove %s failed\n", filename);
    1137           0 :                 return GDK_FAIL;
    1138             :         }
    1139         245 :         if (fclose(fp) < 0) {
    1140           0 :                 GDKsyserror("closing log file %s failed", filename);
    1141           0 :                 if (MT_remove(filename) < 0)
    1142           0 :                         GDKsyserror("remove %s failed\n", filename);
    1143           0 :                 return GDK_FAIL;
    1144             :         }
    1145             :         return GDK_SUCCEED;
    1146             : }
    1147             : 
                       /* Convenience wrappers around the logger's rotation_lock member:
                        * rotation_lock/rotation_unlock expand to MT_lock_set/MT_lock_unset,
                        * rotation_trylock passes ms on to MT_lock_trytime. */
    1148             : #define rotation_lock(lg)       MT_lock_set(&(lg)->rotation_lock)
    1149             : #define rotation_unlock(lg)     MT_lock_unset(&(lg)->rotation_lock)
    1150             : #define rotation_trylock(lg, ms) MT_lock_trytime(&(lg)->rotation_lock, ms)
    1151             : 
    1152             : static gdk_return
    1153       13134 : log_open_output(logger *lg)
    1154             : {
    1155       13134 :         logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
    1156             : 
    1157       13134 :         if (!new_range) {
    1158           0 :                 TRC_CRITICAL(GDK, "allocation failure\n");
    1159           0 :                 return GDK_FAIL;
    1160             :         }
    1161       26267 :         if (!LOG_DISABLED(lg)) {
    1162       13133 :                 char id[32];
    1163       13133 :                 char filename[MAXPATH];
    1164             : 
    1165       13133 :                 if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
    1166           0 :                         TRC_CRITICAL(GDK, "filename is too large\n");
    1167           0 :                         GDKfree(new_range);
    1168           0 :                         return GDK_FAIL;
    1169             :                 }
    1170       13133 :                 if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
    1171           0 :                         TRC_CRITICAL(GDK, "allocation failure\n");
    1172           0 :                         GDKfree(new_range);
    1173           0 :                         return GDK_FAIL;
    1174             :                 }
    1175             : 
    1176       13133 :                 TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
    1177       13133 :                 new_range->output_log = open_wstream(filename);
    1178       13133 :                 if (new_range->output_log) {
    1179       13133 :                         short byteorder = 1234;
    1180       13133 :                         mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
    1181             :                 }
    1182             : 
    1183       13133 :                 if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
    1184           0 :                         TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
    1185           0 :                         close_stream(new_range->output_log);
    1186           0 :                         GDKfree(new_range);
    1187           0 :                         return GDK_FAIL;
    1188             :                 }
    1189             :         } else {
    1190           1 :                 new_range->output_log = NULL;
    1191             :         }
    1192       13134 :         ATOMIC_INIT(&new_range->refcount, 1);
    1193       13134 :         ATOMIC_INIT(&new_range->last_ts, 0);
    1194       13134 :         ATOMIC_INIT(&new_range->flushed_ts, 0);
    1195       13134 :         ATOMIC_INIT(&new_range->drops, 0);
    1196       13134 :         new_range->id = lg->id;
    1197       13134 :         new_range->next = NULL;
    1198       13134 :         logged_range *current = lg->current;
    1199       13134 :         assert(current && current->next == NULL);
    1200       13134 :         new_range->cnt = current->cnt;
    1201       13134 :         current->next = new_range;
    1202       13134 :         lg->file_age = GDKusec();
    1203       13134 :         return GDK_SUCCEED;
    1204             : }
    1205             : 
    1206             : static inline void
    1207       13373 : log_close_input(logger *lg)
    1208             : {
    1209       13373 :         if (!lg->inmemory && lg->input_log) {
    1210       12890 :                 TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
    1211       12890 :                 close_stream(lg->input_log);
    1212             :         }
    1213       13373 :         lg->input_log = NULL;
    1214       13373 : }
    1215             : 
    1216             : static inline void
    1217         356 : log_close_output(logger *lg)
    1218             : {
    1219         356 :         if (!LOG_DISABLED(lg) && lg->current->output_log) {
    1220         355 :                 TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
    1221         355 :                 close_stream(lg->current->output_log);
    1222             :         }
    1223         356 :         lg->current->output_log = NULL;
    1224         356 : }
    1225             : 
    1226             : static gdk_return
    1227       13017 : log_open_input(logger *lg, const char *filename, bool *filemissing)
    1228             : {
    1229       13017 :         TRC_INFO(WAL, "opening input log %s", filename);
    1230       13017 :         lg->input_log = open_rstream(filename);
    1231             : 
    1232             :         /* if the file doesn't exist, there is nothing to be read back */
    1233       13017 :         if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
    1234         127 :                 log_close_input(lg);
    1235         127 :                 *filemissing = true;
    1236         127 :                 return GDK_SUCCEED;
    1237             :         }
    1238       12890 :         short byteorder;
    1239       12890 :         switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
    1240           0 :         case -1:
    1241           0 :                 log_close_input(lg);
    1242           0 :                 TRC_CRITICAL(GDK, "read failed\n");
    1243           0 :                 return GDK_FAIL;
    1244           5 :         case 0:
    1245             :                 /* empty file is ok */
    1246           5 :                 log_close_input(lg);
    1247           5 :                 return GDK_SUCCEED;
    1248       12885 :         case 1:
    1249             :                 /* if not empty, must start with correct byte order mark */
    1250       12885 :                 if (byteorder != 1234) {
    1251           0 :                         TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
    1252           0 :                         log_close_input(lg);
    1253           0 :                         return GDK_FAIL;
    1254             :                 }
    1255             :                 break;
    1256             :         }
    1257             :         return GDK_SUCCEED;
    1258             : }
    1259             : 
                       /* Read log records from the input log with log_read_format and
                        * process them until a record with flag 0 and id 0 is seen, reading
                        * a record fails, or one of the handlers reports something other
                        * than LOG_OK.
                        *
                        * updated, when not NULL, is used as a bitmap: for records that
                        * touch a bat while a transaction is open, the bit for the matching
                        * position(s) in lg->catalog_id is set.  maxupdated is only used in
                        * assertions on those positions.
                        *
                        * Returns LOG_EOF when log_read_format failed, and otherwise the
                        * last value of err (LOG_EOF or LOG_ERR).  Transactions that are
                        * still open when the loop ends are aborted. */
    1260             : static log_return
    1261       12885 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
    1262             : {
    1263       12885 :         logformat l;
    1264       12885 :         trans *tr = NULL;
    1265       12885 :         log_return err = LOG_OK;
    1266       12885 :         bool ok = true;
    1267       12885 :         ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
    1268             : 
    1269       12885 :         (void) maxupdated;      /* only used inside assert() */
    1270             : 
                               /* when not flushing, CHECKMASK is cleared in GDKdebug for the
                                * duration of this call; the value saved in dbg is put back
                                * before returning */
    1271       12885 :         if (!lg->flushing)
    1272         202 :                 ATOMIC_AND(&GDKdebug, ~CHECKMASK);
    1273             : 
    1274       12885 :         BAT *cands = NULL;      /* used in case of LOG_BAT_GROUP */
    1275             : 
    1276      756669 :         while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
                                       /* a record with flag 0 and id 0 ends the loop with
                                        * LOG_EOF */
    1277      743784 :                 if (l.flag == 0 && l.id == 0) {
    1278       12885 :                         err = LOG_EOF;
    1279             :                         break;
    1280             :                 }
    1281             : 
    1282      743784 :                 TRC_DEBUG_IF(WAL) {
    1283           0 :                         if (l.flag > 0 && l.flag != LOG_CLEAR &&
    1284             :                             l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
    1285           0 :                                 TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
    1286             :                         else
    1287           0 :                                 TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
    1288             :                 }
                                       /* first pass over the record: only bookkeeping in the
                                        * updated bitmap, done when a transaction is open, the
                                        * caller passed a bitmap and the hash on
                                        * lg->catalog_id is available */
    1289      743784 :                 switch (l.flag) {
    1290      553165 :                 case LOG_UPDATE_CONST:
    1291             :                 case LOG_UPDATE_BULK:
    1292             :                 case LOG_UPDATE:
    1293             :                 case LOG_CREATE:
    1294             :                 case LOG_DESTROY:
    1295      553165 :                         if (tr != NULL && updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
    1296      551280 :                                 BATiter cni = bat_iterator(lg->catalog_id);
    1297      551280 :                                 BUN p;
    1298      551280 :                                 BUN posnew = BUN_NONE;
    1299      551280 :                                 BUN posold = BUN_NONE;
    1300      551280 :                                 MT_rwlock_rdlock(&cni.b->thashlock);
                                                       /* walk all catalog entries with id l.id and
                                                        * classify them by the value found at the
                                                        * same position in lg->catalog_lid */
    1301     3047648 :                                 HASHloop_int(cni, cni.b->thash, p, &l.id) {
    1302     1932477 :                                         lng lid = *(lng *) Tloc(lg->catalog_lid, p);
    1303     1932477 :                                         if (lid == lng_nil || lid > tr->tid)
    1304             :                                                 posnew = p;
    1305      843337 :                                         else if (lid == tr->tid)
    1306     2343074 :                                                 posold = p;
    1307             :                                 }
    1308      551280 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
    1309      551280 :                                 bat_iterator_end(&cni);
    1310             :                                 /* Normally at this point, posnew is the
    1311             :                                  * location of the bat that this
    1312             :                                  * transaction is working on, and posold
    1313             :                                  * is the location of the previous
    1314             :                                  * version of the bat.  If LOG_CREATE,
    1315             :                                  * both are relevant, since the latter
    1316             :                                  * is the new bat, and the former is the
    1317             :                                  * to-be-destroyed bat.  For
    1318             :                                  * LOG_DESTROY, only posnew should be
    1319             :                                  * relevant, but for the other types, if
    1320             :                                  * the table is destroyed later in the
    1321             :                                  * same transaction, we need posold, and
    1322             :                                  * else (the normal case) we need
    1323             :                                  * posnew. */
    1324      551280 :                                 if (posnew != BUN_NONE) {
    1325      541921 :                                         assert(posnew < maxupdated);
    1326      541921 :                                         updated[posnew / 32] |= 1U << (posnew % 32);
    1327             :                                 }
    1328      551280 :                                 if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
    1329      160477 :                                         assert(posold < maxupdated);
    1330      160477 :                                         updated[posold / 32] |= 1U << (posold % 32);
    1331             :                                 }
    1332             :                         }
    1333             :                         break;
    1334             :                 default:
    1335             :                         /* do nothing */
    1336             :                         break;
    1337             :                 }
    1338             :                 /* the functions we call here can succeed (LOG_OK),
    1339             :                  * but they can also fail for two different reasons:
    1340             :                  * they can run out of input (LOG_EOF -- this is not
    1341             :                  * serious, we just abort the remaining transactions),
    1342             :                  * or some malloc or BAT update fails (LOG_ERR -- this
    1343             :                  * is serious, we must abort the complete process);
    1344             :                  * the latter failure causes the current function to
    1345             :                  * return GDK_FAIL */
    1346      743784 :                 switch (l.flag) {
    1347       56234 :                 case LOG_START:
    1348       56234 :                         assert(!lg->flushing || l.id <= lg->tid);
    1349       56234 :                         if (!lg->flushing && l.id > lg->tid)
    1350         127 :                                 lg->tid = l.id;      /* should only happen during initialization */
    1351       56234 :                         if ((tr = tr_create(tr, l.id)) == NULL) {
    1352           0 :                                 TRC_CRITICAL(GDK, "memory allocation failed\n");
    1353           0 :                                 err = LOG_ERR;
    1354           0 :                                 break;
    1355             :                         }
    1356       56234 :                         TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
    1357             :                         break;
    1358       56234 :                 case LOG_END:
    1359       56234 :                         if (tr == NULL)
    1360             :                                 err = LOG_EOF;
    1361       56234 :                         else if (tr->tid != l.id)    /* abort record */
    1362           0 :                                 tr = tr_abort(lg, tr);
    1363             :                         else
    1364       56234 :                                 tr = tr_commit(lg, tr);
    1365             :                         break;
    1366        2773 :                 case LOG_SEQ:
    1367        2773 :                         err = log_read_seq(lg, &l);
    1368        2773 :                         break;
    1369      389096 :                 case LOG_UPDATE_CONST:
    1370             :                 case LOG_UPDATE_BULK:
    1371             :                 case LOG_UPDATE:
    1372      389096 :                         if (tr == NULL)
    1373             :                                 err = LOG_EOF;
    1374             :                         else {
    1375      610985 :                                 err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
    1376             :                         }
    1377             :                         break;
    1378      154662 :                 case LOG_CREATE:
    1379      154662 :                         if (tr == NULL)
    1380             :                                 err = LOG_EOF;
    1381             :                         else
    1382      154662 :                                 err = log_read_create(lg, tr, l.id);
    1383             :                         break;
    1384        9407 :                 case LOG_DESTROY:
    1385        9407 :                         if (tr == NULL)
    1386             :                                 err = LOG_EOF;
    1387             :                         else
    1388        9407 :                                 err = log_read_destroy(lg, tr, l.id);
    1389             :                         break;
    1390       75378 :                 case LOG_BAT_GROUP:
    1391       75378 :                         if (tr == NULL)
    1392             :                                 err = LOG_EOF;
    1393             :                         else {
    1394       75378 :                                 if (l.id > 0) {
    1395             :                                         /* START OF LOG_BAT_GROUP */
    1396       37689 :                                         cands = COLnew(0, TYPE_void, 0, SYSTRANS);
    1397       37689 :                                         if (!cands) {
    1398           0 :                                                 TRC_CRITICAL(GDK, "creating bat failed\n");
    1399           0 :                                                 err = LOG_ERR;
    1400             :                                         }
    1401       37689 :                                 } else if (cands == NULL) {
    1402             :                                         /* should have gone through the
    1403             :                                          * above option earlier */
    1404           0 :                                         TRC_CRITICAL(GDK, "unexpected error\n");
    1405           0 :                                         err = LOG_ERR;
    1406             :                                 } else {
    1407             :                                         /* END OF LOG_BAT_GROUP */
    1408       37689 :                                         BBPunfix(cands->batCacheid);
    1409       37689 :                                         cands = NULL;
    1410             :                                 }
    1411             :                         }
    1412             :                         break;
    1413           0 :                 default:
    1414           0 :                         TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
    1415           0 :                         err = LOG_ERR;
    1416             :                 }
                                       /* tr_commit returns (trans *) -1 as an error marker
                                        * after it has already aborted the transactions */
    1417      743784 :                 if (tr == (trans *) -1) {
    1418             :                         /* message already generated by tr_commit */
    1419             :                         err = LOG_ERR;
    1420       12885 :                         tr = NULL;
    1421             :                         break;
    1422             :                 }
    1423             :         }
                               /* whatever is still open was never completed: abort it */
    1424       12885 :         while (tr) {
    1425           0 :                 TRC_WARNING(GDK, "aborting transaction\n");
    1426           0 :                 tr = tr_abort(lg, tr);
    1427             :         }
    1428       12885 :         if (!lg->flushing)
    1429         202 :                 ATOMIC_SET(&GDKdebug, dbg);
    1430             : 
    1431       12885 :         BBPreclaim(cands);
                               /* ok is false only when log_read_format failed */
    1432       12885 :         if (!ok)
    1433       12885 :                 return LOG_EOF;
    1434             :         return err;
    1435             : }
    1436             : 
    1437             : static gdk_return
    1438         334 : log_readlog(logger *lg, const char *filename, bool *filemissing)
    1439             : {
    1440         334 :         log_return err = LOG_OK;
    1441         334 :         time_t t0, t1;
    1442         334 :         struct stat sb;
    1443             : 
    1444         334 :         assert(!lg->inmemory);
    1445             : 
    1446         334 :         TRC_INFO(WAL, "opening %s\n", filename);
    1447             : 
    1448         334 :         gdk_return res = log_open_input(lg, filename, filemissing);
    1449         334 :         if (!lg->input_log || res != GDK_SUCCEED)
    1450             :                 return res;
    1451         202 :         int fd;
    1452         202 :         if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
    1453           0 :                 GDKsyserror("fstat on opened file %s failed\n", filename);
    1454           0 :                 log_close_input(lg);
    1455             :                 /* If the file could be opened, but fstat fails,
    1456             :                  * something weird is going on */
    1457           0 :                 return GDK_FAIL;
    1458             :         }
    1459         202 :         t0 = time(NULL);
    1460         202 :         TRC_INFO_IF(WAL) {
    1461           0 :                 TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
    1462           0 :                 GDKtracer_flush_buffer();
    1463             :         }
    1464         404 :         while (err != LOG_EOF && err != LOG_ERR) {
    1465         202 :                 t1 = time(NULL);
    1466         202 :                 if (t1 - t0 > 10) {
    1467           0 :                         lng fpos;
    1468           0 :                         t0 = t1;
    1469             :                         /* not more than once every 10 seconds */
    1470           0 :                         fpos = (lng) getfilepos(getFile(lg->input_log));
    1471           0 :                         TRC_INFO_IF(WAL) {
    1472           0 :                                 if (fpos >= 0) {
    1473           0 :                                         TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
    1474             :                                                        filename, (int) ((fpos * 100 + 50) / sb.st_size));
    1475           0 :                                         GDKtracer_flush_buffer();
    1476             :                                 }
    1477             :                         }
    1478             :                 }
    1479         202 :                 err = log_read_transaction(lg, NULL, 0);
    1480             :         }
    1481         202 :         log_close_input(lg);
    1482         202 :         lg->input_log = NULL;
    1483             : 
    1484             :         /* remaining transactions are not committed, ie abort */
    1485         202 :         TRC_INFO_IF(WAL) {
    1486           0 :                 TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
    1487           0 :                 GDKtracer_flush_buffer();
    1488             :         }
    1489             :         /* we cannot distinguish errors from incomplete transactions
    1490             :          * (even if we would log aborts in the logs). So we simply
    1491             :          * abort and move to the next log file */
    1492         202 :         return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    1493             : }
    1494             : 
    1495             : /*
    1496             :  * The log files are incrementally numbered, starting from 2. They are
    1497             :  * processed in the same sequence.
    1498             :  */
    1499             : static gdk_return
    1500         127 : log_readlogs(logger *lg, const char *filename)
    1501             : {
                               /* Replay the numbered log files "<filename>.<id>" one after
                                * the other by calling log_readlog on each of them.  Nothing
                                * is read unless lg->saved_id >= lg->id; in that case the
                                * first file tried is number lg->saved_id + 1.  The loop
                                * ends when log_readlog reports a missing file or returns
                                * something other than GDK_SUCCEED.
                                *
                                * Returns GDK_FAIL when a file name does not fit in
                                * log_filename, otherwise the result of the last
                                * log_readlog call (GDK_SUCCEED when no call was made). */
    1502         127 :         gdk_return res = GDK_SUCCEED;
    1503             : 
    1504         127 :         assert(!lg->inmemory);
    1505         127 :         TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
    1506             : 
    1507         127 :         char log_filename[FILENAME_MAX];
    1508         127 :         if (lg->saved_id >= lg->id) {
    1509         127 :                 bool filemissing = false;
    1510             : 
    1511         127 :                 lg->id = lg->saved_id + 1;
    1512         461 :                 while (res == GDK_SUCCEED && !filemissing) {
                                               /* snprintf returns the length it wanted to write,
                                                * so a value >= FILENAME_MAX means truncation */
    1513         334 :                         if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
    1514           0 :                                 GDKerror("Logger filename path is too large\n");
    1515           0 :                                 return GDK_FAIL;
    1516             :                         }
    1517         334 :                         res = log_readlog(lg, log_filename, &filemissing);
                                               /* both counters advance whenever the file was not
                                                * reported missing; res is not consulted here */
    1518         334 :                         if (!filemissing) {
    1519         207 :                                 lg->saved_id++;
    1520         207 :                                 lg->id++;
    1521             :                         }
    1522             :                 }
    1523             :         }
    1524             :         return res;
    1525             : }
    1526             : 
    1527             : static gdk_return
    1528       13112 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
    1529             : {
                               /* Thin wrapper: emits a WAL debug trace and forwards all
                                * arguments unchanged to bm_commit, whose result is
                                * returned as is. */
    1530       13112 :         TRC_DEBUG(WAL, "commit");
    1531             : 
    1532       13112 :         return bm_commit(lg, pending, updated, maxupdated);
    1533             : }
    1534             : 
    1535             : static gdk_return
    1536         127 : check_version(logger *lg, FILE *fp, bool *needsnew)
    1537             : {
                               /* Parse the start of the already opened log file fp: a
                                * version number of at most 6 digits, two newline
                                * characters, and then the part that is handed to
                                * log_read_types_file.
                                *
                                * fp is closed on every return path, so the caller must
                                * neither use nor close it afterwards.
                                *
                                * *needsnew is set to true when the stored version differs
                                * from lg->version and lg->prefuncp returned GDK_SUCCEED,
                                * and to false when the versions are equal; the pointer is
                                * also passed on to log_read_types_file.
                                *
                                * Returns GDK_FAIL when the version cannot be read, when
                                * the versions differ and prefuncp is NULL or fails, when
                                * the two newlines are missing, or when
                                * log_read_types_file fails; GDK_SUCCEED otherwise. */
    1538         127 :         int version = 0;
    1539             : 
    1540         127 :         assert(!lg->inmemory);
    1541         127 :         if (fscanf(fp, "%6d", &version) != 1) {
    1542           0 :                 GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
    1543           0 :                 fclose(fp);
    1544           0 :                 return GDK_FAIL;
    1545             :         }
    1546         127 :         if (version != lg->version) {
                                       /* a version mismatch is only acceptable when a
                                        * pre-upgrade callback exists and it succeeds */
    1547          32 :                 if (lg->prefuncp == NULL ||
    1548          16 :                     (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
    1549           0 :                         GDKerror("Incompatible database version %06d, "
    1550             :                                  "this server supports version %06d.\n%s",
    1551             :                                  version, lg->version,
    1552             :                                  version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1553           0 :                         fclose(fp);
    1554           0 :                         return GDK_FAIL;
    1555             :                 }
    1556          16 :                 *needsnew = true;       /* we need to write a new log file */
    1557             :         } else {
                                       /* same version: the post-upgrade callback is cleared */
    1558         111 :                 lg->postfuncp = NULL;        /* don't call */
    1559         111 :                 *needsnew = false;      /* log file already up-to-date */
    1560             :         }
    1561         254 :         if (fgetc(fp) != '\n' ||        /* skip \n */
    1562         127 :             fgetc(fp) != '\n') {        /* skip \n */
    1563           0 :                 GDKerror("Badly formatted log file");
    1564           0 :                 fclose(fp);
    1565           0 :                 return GDK_FAIL;
    1566             :         }
    1567         127 :         if (log_read_types_file(lg, fp, version, needsnew) != GDK_SUCCEED) {
    1568           0 :                 fclose(fp);
    1569           0 :                 return GDK_FAIL;
    1570             :         }
    1571         127 :         fclose(fp);
    1572         127 :         return GDK_SUCCEED;
    1573             : }
    1574             : 
    1575             : static BAT *
    1576         357 : bm_tids(BAT *b, BAT *d)
    1577             : {
                               /* Build the list of positions of b that are not listed in
                                * d.  It starts as BATdense(0, 0, BATcount(b)) (presumably
                                * the dense range 0 .. BATcount(b)-1 -- confirm against
                                * BATdense) and, when d is not empty, the values of d are
                                * removed from it with BATdiff.
                                *
                                * Returns NULL when BATdense fails.  When BATdiff is used
                                * its result is returned unchecked (the dense BAT has been
                                * destroyed by then), so the caller must test for NULL.
                                * The visible caller destroys the result with
                                * logbat_destroy. */
    1578         357 :         BUN sz = BATcount(b);
    1579         357 :         BAT *tids = BATdense(0, 0, sz);
    1580             : 
    1581         357 :         if (tids == NULL)
    1582             :                 return NULL;
    1583             : 
    1584         357 :         if (BATcount(d)) {
    1585         357 :                 BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
    1586         357 :                 logbat_destroy(tids);
    1587         357 :                 tids = diff;
    1588             :         }
    1589             :         return tids;
    1590             : }
    1591             : 
    1592             : 
    1593             : static gdk_return
    1594       10038 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
    1595             : {
                               /* Let BAT new take over the logical name "<fn>_<name>" from
                                * BAT old.  In order: old goes through BATmode(old, true)
                                * (made transient, as the error message says), the name is
                                * composed in bak, old is renamed with BBPrename(old, NULL),
                                * new is renamed to bak, and BBPretain is called on new.
                                *
                                * Returns GDK_FAIL (after GDKerror) as soon as one step
                                * fails; steps already done are not undone.  Neither BAT
                                * is destroyed here. */
    1596       10038 :         char bak[IDLENGTH];
    1597             : 
    1598       10038 :         if (BATmode(old, true) != GDK_SUCCEED) {
    1599           0 :                 GDKerror("cannot convert old %s to transient", name);
    1600           0 :                 return GDK_FAIL;
    1601             :         }
                               /* a result >= sizeof(bak) is treated as "did not fit" */
    1602       10038 :         if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
    1603           0 :                 GDKerror("name %s_%s too long\n", fn, name);
    1604           0 :                 return GDK_FAIL;
    1605             :         }
    1606       10038 :         if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
    1607           0 :                 GDKerror("rename (%s) failed\n", bak);
    1608           0 :                 return GDK_FAIL;
    1609             :         }
    1610       10038 :         BBPretain(new->batCacheid);
    1611       10038 :         return GDK_SUCCEED;
    1612             : }
    1613             : 
    1614             : static gdk_return
    1615         357 : bm_get_counts(logger *lg)
    1616             : {
                               /* Append one value to lg->catalog_cnt and one to
                                * lg->catalog_lid for every position of lg->catalog_bid.
                                *
                                * Position not found in lg->dcatalog: cnt is the BATcount of
                                * the bat and lid stays lng_nil.
                                * Position found in lg->dcatalog: cnt stays 0 and lid is 1
                                * when the descriptor has a non-zero batCacheid and
                                * log_find returns BUN_NONE for the bat, otherwise -1.
                                *
                                * Returns GDK_FAIL as soon as an append fails. */
    1617         357 :         BUN p, q;
    1618         357 :         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1619             : 
    1620       33173 :         BATloop(lg->catalog_bid, p, q) {
    1621       32816 :                 oid pos = p;
    1622       32816 :                 lng cnt = 0;
    1623       32816 :                 lng lid = lng_nil;
    1624             : 
    1625       32816 :                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
    1626       32816 :                         BAT *b = BBPquickdesc(bids[p]);
    1627       32816 :                         assert(b);
    1628       32816 :                         cnt = BATcount(b);
    1629             :                 } else {
    1630           0 :                         lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
    1631             :                 }
    1632       32816 :                 if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
    1633           0 :                         return GDK_FAIL;
    1634       32816 :                 if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    1635             :                         return GDK_FAIL;
    1636             :         }
    1637             :         return GDK_SUCCEED;
    1638             : }
    1639             : 
    1640             : static int
    1641       10038 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
    1642             : {
                               /* Record (bid, sz) in the parallel arrays n[] and sizes[],
                                * which currently hold next entries.  When bid is already
                                * present only its size is overwritten; otherwise the pair
                                * is stored at index next.  Returns the resulting number
                                * of entries.
                                *
                                * The search is linear and there is no bounds check: the
                                * caller has to make sure the arrays have room for one
                                * more entry. */
    1643       10038 :         assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
    1644     2243970 :         for (int i = 0; i < next; i++) {
    1645     2233932 :                 if (n[i] == bid) {
    1646           0 :                         sizes[i] = sz;
    1647           0 :                         return next;
    1648             :                 }
    1649             :         }
    1650       10038 :         n[next] = bid;
    1651       10038 :         sizes[next++] = sz;
    1652       10038 :         return next;
    1653             : }
    1654             : 
    1655             : static int
    1656        3108 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
    1657             :                  BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
    1658             : {
                               /* Build new catalog BATs that no longer contain the entries
                                * whose lid is -1 and install them in lg in place of the
                                * current ones.
                                *
                                * r        output array of bat ids: the bats switched to
                                *          transient here plus the three replaced catalog
                                *          BATs
                                * bids, lids, cnts
                                *          arrays indexed by catalog position (in the
                                *          visible caller these are the Tloc pointers of
                                *          catalog_bid, catalog_lid and catalog_cnt)
                                * cleanup  number of entries expected to disappear; used to
                                *          size the new BATs, in the assert below and to
                                *          lower the cnt of every pending range
                                *
                                * Returns the number of ids stored in r, 0 when creating or
                                * filling the new BATs failed (nothing is swapped), and -1
                                * when log_switch_bat failed.
                                *
                                * NOTE(review): the visible caller bm_subcommit only treats
                                * a negative result as failure, so the "return 0" paths are
                                * indistinguishable from a successful call that released
                                * nothing; bats already switched to transient in the first
                                * loop are then not reported through r either. */
    1659        3108 :         BAT *nbids, *noids, *ncnts, *nlids, *ndels;
    1660        3108 :         BUN p, q;
    1661        3108 :         int err = 0, rcnt = 0;
    1662             : 
    1663        3108 :         BUN ocnt = BATcount(catalog_bid);
    1664        3108 :         nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
    1665        3108 :         noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
    1666        3108 :         ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
    1667        3108 :         nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
    1668        3108 :         ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
    1669             : 
    1670        3108 :         if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
    1671           0 :                 logbat_destroy(nbids);
    1672           0 :                 logbat_destroy(noids);
    1673           0 :                 logbat_destroy(ncnts);
    1674           0 :                 logbat_destroy(nlids);
    1675           0 :                 logbat_destroy(ndels);
    1676           0 :                 return 0;
    1677             :         }
    1678             : 
                               /* first pass, over the positions listed in dcatalog: entries
                                * with a lid that is set, not larger than lg->saved_tid and
                                * not negative are switched to transient; on success their
                                * lid becomes -1 and the bat id is recorded in r */
    1679        3108 :         oid *poss = Tloc(dcatalog, 0);
    1680      180389 :         BATloop(dcatalog, p, q) {
    1681      177281 :                 oid pos = poss[p];
    1682             : 
    1683      177281 :                 if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
    1684           0 :                         continue;
    1685             : 
    1686      177281 :                 if (lids[pos] >= 0) {
    1687      177281 :                         bat bid = bids[pos];
    1688      177281 :                         BAT *lb = BBP_desc(bid);
    1689             : 
    1690      177281 :                         if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
    1691           0 :                                 GDKwarning("Failed to set bat(%d) transient\n", bid);
    1692             :                         } else {
    1693      177281 :                                 lids[pos] = -1; /* mark as transient */
    1694      177281 :                                 r[rcnt++] = bid;
    1695             :                         }
    1696             :                 }
    1697             :         }
    1698             : 
                               /* second pass, over all catalog positions: every entry
                                * whose lid is not -1 is copied to the new BATs; the loop
                                * stops after the first failed append */
    1699        3108 :         int *oids = (int *) Tloc(catalog_id, 0);
    1700        3108 :         q = BATcount(catalog_bid);
    1701     1089323 :         for (p = 0; p < q && !err; p++) {
    1702     1086215 :                 bat col = bids[p];
    1703     1086215 :                 int nid = oids[p];
    1704     1086215 :                 lng lid = lids[p];
    1705     1086215 :                 lng cnt = cnts[p];
    1706     1086215 :                 oid pos = p;
    1707             : 
    1708             :                 /* only project out the deleted with lid == -1
    1709             :                  * update dcatalog */
    1710     1086215 :                 if (lid == -1)
    1711      177281 :                         continue;       /* remove */
    1712             : 
    1713     1817868 :                 if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
    1714     1817868 :                     BUNappend(noids, &nid, true) != GDK_SUCCEED ||
    1715     1817868 :                     BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
    1716      908934 :                     BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
    1717             :                         err = 1;
                                       /* a kept entry that is still listed in dcatalog is
                                        * re-listed in ndels under its new position (the
                                        * last row of nbids).  This lookup uses
                                        * lg->dcatalog, which in the visible caller is the
                                        * same BAT as the dcatalog argument. */
    1718      908934 :                 if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
    1719           0 :                         pos = (oid) (BATcount(nbids) - 1);
    1720           0 :                         if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
    1721      908934 :                                 err = 1;
    1722             :                 }
    1723             :         }
    1724             : 
    1725        3108 :         if (err) {
    1726           0 :                 logbat_destroy(nbids);
    1727           0 :                 logbat_destroy(noids);
    1728           0 :                 logbat_destroy(ndels);
    1729           0 :                 logbat_destroy(ncnts);
    1730           0 :                 logbat_destroy(nlids);
    1731           0 :                 return 0;
    1732             :         }
    1733             :         /* point of no return */
    1734        6216 :         if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
    1735        6216 :             log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
    1736        3108 :             log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
    1737           0 :                 logbat_destroy(nbids);
    1738           0 :                 logbat_destroy(noids);
    1739           0 :                 logbat_destroy(ndels);
    1740           0 :                 logbat_destroy(ncnts);
    1741           0 :                 logbat_destroy(nlids);
    1742           0 :                 return -1;
    1743             :         }
                               /* lg still points at the old catalog BATs here (the new
                                * ones are assigned further down), so these are the ids
                                * of the BATs being replaced */
    1744        3108 :         r[rcnt++] = lg->catalog_bid->batCacheid;
    1745        3108 :         r[rcnt++] = lg->catalog_id->batCacheid;
    1746        3108 :         r[rcnt++] = lg->dcatalog->batCacheid;
    1747             : 
    1748        3108 :         assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
    1749             : 
    1750        3108 :         logbat_destroy(lg->catalog_bid);
    1751        3108 :         logbat_destroy(lg->catalog_id);
    1752        3108 :         logbat_destroy(lg->dcatalog);
    1753             : 
    1754        3108 :         lg->catalog_bid = nbids;
    1755        3108 :         lg->catalog_id = noids;
    1756        3108 :         lg->dcatalog = ndels;
    1757             : 
    1758             :         /* failing to rename these two bats is not fatal */
    1759        3108 :         if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
    1760        3108 :                 GDKclrerr();
    1761        3108 :         if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
    1762        3108 :                 GDKclrerr();
    1763        3108 :         BBPunfix(lg->catalog_cnt->batCacheid);
    1764        3108 :         BBPunfix(lg->catalog_lid->batCacheid);
    1765             : 
                               /* install the new cnt/lid BATs and give them their names;
                                * a failed rename is only cleared, not reported, and the
                                * result of strconcat_len is not checked here */
    1766        3108 :         lg->catalog_cnt = ncnts;
    1767        3108 :         lg->catalog_lid = nlids;
    1768        3108 :         char bak[FILENAME_MAX];
    1769        3108 :         strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
    1770        3108 :         if (BBPrename(lg->catalog_cnt, bak) < 0)
    1771           0 :                 GDKclrerr();
    1772        3108 :         strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
    1773        3108 :         if (BBPrename(lg->catalog_lid, bak) < 0)
    1774           0 :                 GDKclrerr();
                               /* the catalog shrank by cleanup rows: lower the row count
                                * stored in every pending range, under the rotation lock */
    1775        3108 :         rotation_lock(lg);
    1776       12333 :         for (logged_range *p = lg->pending; p; p = p->next) {
    1777        9225 :                 p->cnt -= cleanup;
    1778             :         }
    1779        3108 :         rotation_unlock(lg);
    1780        3108 :         return rcnt;
    1781             : }
    1782             : 
    1783             : /* this function is called with log_lock() held; it releases the lock
    1784             :  * before returning */
    1785             : static gdk_return
    1786       13572 : bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
    1787             : {
                               /* Collect in n[] (with matching row counts in sizes[]) the
                                * bats to be committed and hand them to TMsubcommit_list.
                                * r[] collects the bats that get a BBPrelease once that
                                * commit has succeeded.
                                *
                                * pending  when not NULL, supplies the catalog row count
                                *          used for catalog_bid and catalog_id
                                * updated, maxupdated
                                *          optional bitmap with one bit per catalog
                                *          position below maxupdated; positions whose bit
                                *          is clear are left out of the commit list
                                *
                                * Every early failure return frees n, r and sizes and calls
                                * log_unlock; the normal path calls log_unlock just before
                                * the commit.  Returns the result of TMsubcommit_list or
                                * GDK_FAIL. */
    1788       13572 :         BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
    1789       13572 :         BUN dcnt = BATcount(lg->dcatalog);
    1790       13572 :         BUN p, q;
    1791       13572 :         BAT *catalog_bid = lg->catalog_bid;
    1792       13572 :         BAT *catalog_id = lg->catalog_id;
    1793       13572 :         BAT *dcatalog = lg->dcatalog;
                               /* NOTE(review): the constant 13 presumably reserves room for
                                * the fixed entries added below (n[0], old and new catalog
                                * BATs, seqs BATs, dseqs); the only check is the assert
                                * right before the commit */
    1794       13572 :         BUN nn = 13 + cnt;
    1795       13572 :         bat *n = GDKmalloc(sizeof(bat) * nn);
    1796       13572 :         bat *r = GDKmalloc(sizeof(bat) * nn);
    1797       13572 :         BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
    1798       13572 :         int i = 0, rcnt = 0;
    1799       13572 :         gdk_return res;
    1800       13572 :         const log_bid *bids;
    1801       13572 :         lng *cnts = NULL, *lids = NULL;
    1802       13572 :         BUN cleanup = 0;
    1803       13572 :         lng t0 = 0;
    1804             : 
    1805       13572 :         if (n == NULL || r == NULL || sizes == NULL) {
    1806           0 :                 GDKfree(n);
    1807           0 :                 GDKfree(r);
    1808           0 :                 GDKfree(sizes);
    1809           0 :                 log_unlock(lg);
    1810           0 :                 return GDK_FAIL;
    1811             :         }
    1812             : 
    1813       13572 :         sizes[i] = 0;
    1814       13572 :         n[i++] = 0;             /* n[0] is not used */
    1815       13572 :         bids = (const log_bid *) Tloc(catalog_bid, 0);
    1816       13572 :         if (lg->catalog_cnt)
    1817       13342 :                 cnts = (lng *) Tloc(lg->catalog_cnt, 0);
    1818       13572 :         if (lg->catalog_lid)
    1819       13342 :                 lids = (lng *) Tloc(lg->catalog_lid, 0);
    1820     3991209 :         BATloop(catalog_bid, p, q) {
                                       /* entries whose lid is set and not larger than
                                        * lg->saved_tid count towards cleanup; those with
                                        * lid == -1 are skipped altogether, the others are
                                        * added to dcatalog unless already listed there */
    1821     3977637 :                 if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
    1822      177281 :                         cleanup++;
    1823      177281 :                         if (lids[p] == -1)
    1824           0 :                                 continue;
    1825      354530 :                         if (BUNfnd(dcatalog, &(oid){p}) == BUN_NONE &&
    1826      177249 :                             BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
                                                       /* append failed: shrink dcatalog back to
                                                        * the size it had on entry (dcnt) */
    1827           0 :                                 while (BATcount(dcatalog) > dcnt) {
    1828           0 :                                         if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
    1829           0 :                                                 TRC_CRITICAL(WAL, "delete after failed append failed\n");
    1830           0 :                                                 break;
    1831             :                                         }
    1832             :                                 }
    1833           0 :                                 GDKfree(n);
    1834           0 :                                 GDKfree(r);
    1835           0 :                                 GDKfree(sizes);
    1836           0 :                                 log_unlock(lg);
    1837           0 :                                 return GDK_FAIL;
    1838             :                         }
    1839             :                 }
                                       /* with a bitmap, leave out positions below maxupdated
                                        * whose bit is not set */
    1840     3977637 :                 if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
    1841     1953078 :                         continue;
    1842             :                 }
    1843     2024559 :                 bat col = bids[p];
    1844             : 
    1845     2024559 :                 TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
    1846     2024559 :                 assert(col);
    1847     2024559 :                 sizes[i] = cnts ? (BUN) cnts[p] : 0;
    1848     2024559 :                 n[i++] = col;
    1849             :         }
    1850             :         /* now commit catalog, so it's also up to date on disk */
    1851       13572 :         sizes[i] = cnt;
    1852       13572 :         n[i++] = catalog_bid->batCacheid;
    1853       13572 :         sizes[i] = cnt;
    1854       13572 :         n[i++] = catalog_id->batCacheid;
    1855       13572 :         sizes[i] = BATcount(dcatalog);
    1856       13572 :         n[i++] = dcatalog->batCacheid;
    1857             : 
                               /* some catalog entries can go: build trimmed catalog BATs
                                * and install them; only a negative result is treated as
                                * failure */
    1858       13572 :         if (cleanup) {
    1859        3108 :                 if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
    1860             :                                              catalog_bid, catalog_id, dcatalog,
    1861             :                                              cleanup)) < 0) {
    1862           0 :                         GDKfree(n);
    1863           0 :                         GDKfree(r);
    1864           0 :                         GDKfree(sizes);
    1865           0 :                         log_unlock(lg);
    1866           0 :                         return GDK_FAIL;
    1867             :                 }
    1868        3108 :                 cnt -= cleanup;
    1869             :         }
                               /* lg->dcatalog differs from the local copy only when the
                                * catalog BATs were replaced above; the new BATs then have
                                * to be part of the commit as well */
    1870       13572 :         if (dcatalog != lg->dcatalog) {
    1871        3108 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
    1872        3108 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
    1873        3108 :                 i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
    1874             :         }
    1875       13572 :         if (lg->seqs_id) {
                                       /* both sequence BATs are entered with the row count
                                        * of seqs_id */
    1876       13342 :                 sizes[i] = BATcount(lg->seqs_id);
    1877       13342 :                 n[i++] = lg->seqs_id->batCacheid;
    1878       13342 :                 sizes[i] = BATcount(lg->seqs_id);
    1879       13342 :                 n[i++] = lg->seqs_val->batCacheid;
    1880             :         }
                               /* compact the sequence BATs when no catalog cleanup happened
                                * in this call and dseqs holds more than 10 entries and
                                * more than half the number of entries of seqs_id */
    1881       13572 :         if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
    1882         357 :                 BAT *tids, *ids, *vals;
    1883             : 
    1884         357 :                 tids = bm_tids(lg->seqs_id, lg->dseqs);
    1885         357 :                 if (tids == NULL) {
    1886           0 :                         GDKfree(n);
    1887           0 :                         GDKfree(r);
    1888           0 :                         GDKfree(sizes);
    1889           0 :                         log_unlock(lg);
    1890           0 :                         return GDK_FAIL;
    1891             :                 }
    1892         357 :                 ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
    1893         357 :                 vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
    1894             : 
    1895         357 :                 if (ids == NULL || vals == NULL) {
    1896           0 :                         logbat_destroy(tids);
    1897           0 :                         logbat_destroy(ids);
    1898           0 :                         logbat_destroy(vals);
    1899           0 :                         GDKfree(n);
    1900           0 :                         GDKfree(r);
    1901           0 :                         GDKfree(sizes);
    1902           0 :                         log_unlock(lg);
    1903           0 :                         return GDK_FAIL;
    1904             :                 }
    1905             : 
                                       /* copy the rows selected by tids into the new BATs */
    1906         714 :                 if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
    1907         357 :                     BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
    1908           0 :                         logbat_destroy(tids);
    1909           0 :                         logbat_destroy(ids);
    1910           0 :                         logbat_destroy(vals);
    1911           0 :                         GDKfree(n);
    1912           0 :                         GDKfree(r);
    1913           0 :                         GDKfree(sizes);
    1914           0 :                         log_unlock(lg);
    1915           0 :                         return GDK_FAIL;
    1916             :                 }
    1917         357 :                 logbat_destroy(tids);
    1918         357 :                 BATclear(lg->dseqs, true);
    1919             : 
    1920         714 :                 if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
    1921         357 :                     log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
    1922           0 :                         logbat_destroy(ids);
    1923           0 :                         logbat_destroy(vals);
    1924           0 :                         GDKfree(n);
    1925           0 :                         GDKfree(r);
    1926           0 :                         GDKfree(sizes);
    1927           0 :                         log_unlock(lg);
    1928           0 :                         return GDK_FAIL;
    1929             :                 }
    1930         357 :                 i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
    1931         357 :                 i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
    1932             : 
                                       /* the replaced BATs are only queued for release when
                                        * their BBP_lrefs value is positive */
    1933         357 :                 if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
    1934         282 :                         r[rcnt++] = lg->seqs_id->batCacheid;
    1935         357 :                 if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
    1936         282 :                         r[rcnt++] = lg->seqs_val->batCacheid;
    1937             : 
    1938         357 :                 logbat_destroy(lg->seqs_id);
    1939         357 :                 logbat_destroy(lg->seqs_val);
    1940             : 
    1941         357 :                 lg->seqs_id = ids;
    1942         357 :                 lg->seqs_val = vals;
    1943             :         }
    1944       13572 :         if (lg->seqs_id) {
    1945       13342 :                 sizes[i] = BATcount(lg->dseqs);
    1946       13342 :                 n[i++] = lg->dseqs->batCacheid;
    1947             :         }
    1948             : 
    1949       13572 :         assert((BUN) i <= nn);
                               /* the lock is released before the commit itself */
    1950       13572 :         log_unlock(lg);
                               /* t0 is only set when WAL debugging is on and is 0
                                * otherwise */
    1951       13572 :         TRC_DEBUG_IF(WAL)
    1952           0 :                 t0 = GDKusec();
                               /* sizes are only passed along when lg->catalog_cnt existed */
    1953       13802 :         res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
    1954       13572 :         TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
    1955       13572 :         if (res == GDK_SUCCEED) {       /* now cleanup */
    1956      200741 :                 for (i = 0; i < rcnt; i++) {
    1957      187169 :                         TRC_DEBUG_IF(WAL) {
    1958           0 :                                 TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
    1959           0 :                                 if (BBP_lrefs(r[i]) != 2)
    1960           0 :                                         TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
    1961             :                         }
    1962      187169 :                         BBPrelease(r[i]);
    1963             :                 }
    1964             :         }
    1965       13572 :         GDKfree(n);
    1966       13572 :         GDKfree(r);
    1967       13572 :         GDKfree(sizes);
    1968       13572 :         if (res != GDK_SUCCEED)
    1969           0 :                 TRC_CRITICAL(GDK, "commit failed\n");
    1970             :         return res;
    1971             : }
    1972             : 
    1973             : static gdk_return
    1974         356 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
    1975             : {
                               /* Compose in filename the path that GDKfilepath produces
                                * for LOGFILE in lg->dir and, when bak is not NULL, the
                                * same path with ".bak" appended in bak.  Both buffers are
                                * treated as FILENAME_MAX bytes long.
                                *
                                * Returns GDK_FAIL (after GDKerror) when GDKfilepath fails
                                * or the ".bak" name does not fit, else GDK_SUCCEED. */
    1976         356 :         if (GDKfilepath(filename, FILENAME_MAX, 0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED) {
    1977           0 :                 GDKerror("Logger filename path is too large\n");
    1978           0 :                 return GDK_FAIL;
    1979             :         }
    1980         356 :         if (bak) {
    1981         356 :                 if (strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL) >= FILENAME_MAX) {
    1982           0 :                         GDKerror("Logger filename path is too large\n");
    1983           0 :                         return GDK_FAIL;
    1984             :                 }
    1985             :         }
    1986             :         return GDK_SUCCEED;
    1987             : }
    1988             : 
    1989             : static gdk_return
    1990       12890 : log_cleanup(logger *lg, lng id)
    1991             : {
                               /* Remove the write-ahead log file with number id from
                                * lg->dir: GDKunlink is called with LOGFILE as name and the
                                * decimal id as extension.
                                *
                                * A failed unlink only produces a warning and the error is
                                * cleared, so the function still returns GDK_SUCCEED.
                                * GDK_FAIL is returned only when the id does not fit in
                                * the local buffer. */
    1992       12890 :         char log_id[FILENAME_MAX];
    1993             : 
    1994       12890 :         if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
    1995           0 :                 GDKerror("log_id filename is too large\n");
    1996           0 :                 return GDK_FAIL;
    1997             :         }
    1998       12890 :         if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
    1999           0 :                 GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
    2000           0 :                 GDKclrerr();    /* clear error from unlink */
    2001             :         }
    2002             :         return GDK_SUCCEED;
    2003             : }
    2004             : 
    2005             : #ifdef GDKLIBRARY_JSON
    2006             : static gdk_return
    2007         357 : log_json_upgrade_finalize(void)
    2008             : {
                               /* Final step of the json upgrade: unless GDKinmemory(0)
                                * holds, the file "jsonupgradeneeded" in BATDIR is removed
                                * (failure is reported as critical and yields GDK_FAIL);
                                * after that the atomRead function of the "json" atom is
                                * set to strRead.
                                *
                                * NOTE(review): the result of ATOMindex is used as an array
                                * index without a check; presumably the json atom is
                                * always registered when this runs -- confirm. */
    2009         357 :         int json_tpe = ATOMindex("json");
    2010         713 :         if (!GDKinmemory(0) &&
    2011         356 :             GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
    2012           0 :                 TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
    2013           0 :                 return GDK_FAIL;
    2014             :         }
    2015         357 :         BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;
    2016             : 
    2017         357 :         return GDK_SUCCEED;
    2018             : }
    2019             : #endif
    2020             : 
    2021             : /* clean up old junk left over from old upgrades: bats that are
    2022             :  * persistent but not in the SQL catalog and that have no name, and bats
    2023             :  * that do have a name that starts with "stat_opt_" (from the statistics
    2024             :  * optimizer that was removed in 2017) are removed here
    2025             :  *
    2026             :  * this function ignores any errors */
    2027             : static void
    2028           8 : clean_bbp(logger *lg)
    2029             : {
                               /* b is a transient int BAT that collects the bat ids handed
                                * to TMsubcommit_list at the end; its first value is 0
                                * (NOTE(review): presumably an unused slot 0, like n[0] in
                                * bm_subcommit -- confirm).
                                *
                                * Bit 31 of the BBP status word serves as a temporary mark
                                * in the three loops below.  Progress and failure messages
                                * are written with printf. */
    2030           8 :         BAT *b = COLnew(0, TYPE_int, 256, TRANSIENT);
    2031           8 :         if (b == NULL)
    2032             :                 return;
    2033           8 :         if (BUNappend(b, &(int){0}, false) != GDK_SUCCEED) {
    2034           0 :                 BBPreclaim(b);
    2035           0 :                 return;
    2036             :         }
    2037             :         /* mark persistent bats that have no name or have a name
    2038             :          * starting with "stat_opt_" */
                               /* in the test below a logical name that is NULL or starts
                                * with "tmp_" is what counts as having no name */
    2039       16926 :         for (bat bid = 1, bsz = getBBPsize(); bid < bsz; bid++)
    2040       16918 :                 if (BBP_status(bid) & BBPEXISTING &&
    2041        2576 :                     (BBP_logical(bid) == NULL ||
    2042        2576 :                      strncmp(BBP_logical(bid), "tmp_", 4) == 0 ||
    2043          80 :                      strncmp(BBP_logical(bid), "stat_opt_", 9) == 0))
    2044        2528 :                         BBP_status_on(bid, 1U << 31);
    2045             :         /* remove mark from bats that are in the SQL catalog */
    2046        2216 :         for (BUN i = 0, n = BATcount(lg->catalog_bid); i < n; i++)
    2047        2208 :                 BBP_status_off(((int *) lg->catalog_bid->theap->base)[i], 1U << 31);
    2048             :         /* what's left over are junk bats */
    2049       16926 :         for (bat bid = 1, bsz = getBBPsize(); bid < bsz; bid++)
    2050       16918 :                 if (BBP_status(bid) & (1U << 31)) {
    2051         320 :                         BBP_status_off(bid, 1U << 31);
                                               /* on the first failure the function gives up;
                                                * bats already switched by BATmode stay that
                                                * way and nothing is committed */
    2052         640 :                         if (BATmode(BBP_desc(bid), true) != GDK_SUCCEED ||
    2053         320 :                             BUNappend(b, &bid, false) != GDK_SUCCEED) {
    2054           0 :                                 BBPreclaim(b);
    2055           0 :                                 return;
    2056             :                         }
    2057         320 :                         printf("# removing bat %d (tmp_%o)\n", bid, bid);
    2058             :                 }
    2059             :         /* if there were any junk bats, commit their removal */
    2060          16 :         if (b->batCount > 1 &&
    2061           8 :             TMsubcommit_list(Tloc(b, 0), NULL, (int) b->batCount, -1) != GDK_SUCCEED)
    2062           0 :                 printf("clean_bbp transaction failed\n");
    2063           8 :         BBPreclaim(b);
    2064             : }
    2065             : 
    2066             : /* Load data from the logger logdir.
    2067             :  * Initialize new directories and catalog files if none are present,
    2068             :  * unless running in read-only mode.
    2069             :  * Load data and persist it in the BATs.
                        *
                        * fn        name of the logger; used as prefix for the names of the
                        *           catalog and sequence bats (fn_catalog_bid, fn_seqs_id, ...)
                        * lg        logger structure to fill in
                        * filename  buffer of FILENAME_MAX bytes; passed to log_filename and
                        *           afterwards used as the name of the log file
                        *
                        * Returns GDK_SUCCEED or GDK_FAIL.  On the paths that jump to the
                        * error label, lg itself and the buffers it owns are freed, so the
                        * caller must not touch lg after such a failure.
                        * NOTE(review): three failure paths below (in the needsnew
                        * handling) return GDK_FAIL directly without passing through the
                        * error label, so on those paths lg is not released here. */
    2070             : static gdk_return
    2071         357 : log_load(const char *fn, logger *lg, char filename[FILENAME_MAX])
    2072             : {
    2073         357 :         FILE *fp = NULL;
                               /* bak first holds the name of the log file backup and is
                                * afterwards reused as scratch buffer for bat names */
    2074         357 :         char bak[FILENAME_MAX];
    2075         357 :         bat catalog_bid, catalog_id, dcatalog;
    2076         357 :         bool needcommit = false;
                               /* saved value of GDKdebug, restored after the commits and in
                                * the error path */
    2077         357 :         ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
    2078         357 :         bool readlogs = false;
    2079         357 :         bool needsnew = false;  /* need to write new log file? */
    2080             : 
    2081             :         /* refactor: let log_filename fill in bak and filename, which are used below as the names of the log file backup and the log file */
    2082         357 :         if (!LOG_DISABLED(lg)) {
    2083         356 :                 if (log_filename(lg, bak, filename) != GDK_SUCCEED)
    2084           0 :                         goto error;
    2085             :         }
    2086             : 
                               /* start with all bat pointers NULL so that the error path can
                                * pass them to logbat_destroy unconditionally */
    2087         357 :         lg->catalog_bid = NULL;
    2088         357 :         lg->catalog_id = NULL;
    2089         357 :         lg->catalog_cnt = NULL;
    2090         357 :         lg->catalog_lid = NULL;
    2091         357 :         lg->dcatalog = NULL;
    2092             : 
    2093         357 :         lg->seqs_id = NULL;
    2094         357 :         lg->seqs_val = NULL;
    2095         357 :         lg->dseqs = NULL;
    2096             : 
    2097         357 :         if (!LOG_DISABLED(lg)) {
    2098             :                 /* try to open logfile backup, or failing that, the file
    2099             :                  * itself. we need to know whether this file exists when
    2100             :                  * checking the database consistency later on */
    2101         356 :                 if ((fp = MT_fopen(bak, "r")) != NULL) {
                                               /* a backup exists: remove the current log file and
                                                * move the backup into its place */
    2102           0 :                         fclose(fp);
    2103           0 :                         fp = NULL;
    2104           0 :                         if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
    2105           0 :                             GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
    2106           0 :                                 goto error;
    2107         356 :                 } else if (errno != ENOENT) {
    2108           0 :                         GDKsyserror("open %s failed", bak);
    2109           0 :                         goto error;
    2110             :                 }
                                       /* a missing log file (ENOENT) is not an error here; fp
                                        * stays NULL and is checked against the catalog below */
    2111         356 :                 fp = MT_fopen(filename, "r");
    2112         356 :                 if (fp == NULL && errno != ENOENT) {
    2113           0 :                         GDKsyserror("open %s failed", filename);
    2114           0 :                         goto error;
    2115             :                 }
    2116             :         }
    2117             : 
                               /* look up the catalog bat by name; 0 means it does not exist */
    2118         357 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    2119         357 :         catalog_bid = BBPindex(bak);
    2120             : 
    2121             :         /* initialize arrays for type mapping, to be read from disk */
    2122         357 :         memset(lg->type_id, -1, sizeof(lg->type_id));
    2123         357 :         memset(lg->type_nr, 255, sizeof(lg->type_nr));
    2124             : 
    2125             :         /* this is intentional - if catalog_bid is 0, force it to find
    2126             :          * the persistent catalog */
    2127         357 :         if (catalog_bid == 0) {
    2128             :                 /* catalog does not exist, so the log file also
    2129             :                  * shouldn't exist */
    2130         230 :                 if (fp != NULL) {
    2131           0 :                         GDKerror("there is no logger catalog, "
    2132             :                                  "but there is a log file.\n");
    2133           0 :                         goto error;
    2134             :                 }
    2135             : 
    2136         230 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    2137         230 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    2138         230 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    2139             : 
    2140         230 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    2141           0 :                         GDKerror("cannot create catalog bats");
    2142           0 :                         goto error;
    2143             :                 }
    2144         230 :                 TRC_INFO(WAL, "create %s catalog\n", fn);
    2145             : 
    2146             :                 /* give the catalog bats names so we can find them
    2147             :                  * next time */
    2148         230 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    2149         230 :                 if (BBPrename(lg->catalog_bid, bak) < 0) {
    2150           0 :                         goto error;
    2151             :                 }
    2152             : 
    2153         230 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    2154         230 :                 if (BBPrename(lg->catalog_id, bak) < 0) {
    2155           0 :                         goto error;
    2156             :                 }
    2157             : 
    2158         230 :                 strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    2159         230 :                 if (BBPrename(lg->dcatalog, bak) < 0) {
    2160           0 :                         goto error;
    2161             :                 }
    2162             : 
                                       /* create the log directory and write the initial log
                                        * file, unless logging is disabled */
    2163         230 :                 if (!LOG_DISABLED(lg)) {
    2164         229 :                         if (GDKcreatedir(filename) != GDK_SUCCEED) {
    2165           0 :                                 GDKerror("cannot create directory for log file %s\n", filename);
    2166           0 :                                 goto error;
    2167             :                         }
    2168         229 :                         if (log_create_types_file(lg, filename) != GDK_SUCCEED)
    2169           0 :                                 goto error;
    2170             :                 }
    2171             : 
                                       /* take a reference on each catalog bat; released again
                                        * below if the commit fails */
    2172         230 :                 BBPretain(lg->catalog_bid->batCacheid);
    2173         230 :                 BBPretain(lg->catalog_id->batCacheid);
    2174         230 :                 BBPretain(lg->dcatalog->batCacheid);
    2175             : 
    2176         230 :                 log_lock(lg);
    2177             :                 /* bm_subcommit releases the lock */
    2178         230 :                 if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
    2179             :                         /* cannot commit catalog, so remove log */
    2180           0 :                         if (MT_remove(filename) < 0)
    2181           0 :                                 GDKsyserror("remove %s failed\n", filename);
    2182           0 :                         BBPrelease(lg->catalog_bid->batCacheid);
    2183           0 :                         BBPrelease(lg->catalog_id->batCacheid);
    2184           0 :                         BBPrelease(lg->dcatalog->batCacheid);
    2185           0 :                         goto error;
    2186             :                 }
    2187             :         } else {
    2188             :                 /* find the persistent catalog. As non persistent bats
    2189             :                  * require a logical reference we also add a logical
    2190             :                  * reference for the persistent bats */
    2191         127 :                 BUN p, q;
    2192         127 :                 BAT *b, *o, *d;
    2193             : 
    2194         127 :                 assert(!lg->inmemory);
    2195             : 
    2196             :                 /* the catalog exists, and so should the log file */
    2197         127 :                 if (fp == NULL && !LOG_DISABLED(lg)) {
    2198           0 :                         GDKerror("There is a logger catalog, but no log file.\n");
    2199           0 :                         goto error;
    2200             :                 }
    2201             :                 if (fp != NULL) {
    2202             :                         /* check_version always closes fp */
    2203         127 :                         if (check_version(lg, fp, &needsnew) != GDK_SUCCEED) {
    2204           0 :                                 fp = NULL;
    2205           0 :                                 goto error;
    2206             :                         }
                                               /* a log file with an acceptable version exists, so
                                                * the logs are read further down */
    2207             :                         readlogs = true;
    2208             :                         fp = NULL;
    2209             :                 }
    2210             : 
                                       /* load the three catalog bats by name; each failure
                                        * unfixes the descriptors obtained so far */
    2211         127 :                 if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
    2212         127 :                         b = BATdescriptor(catalog_bid);
    2213         127 :                         if (b == NULL) {
    2214           0 :                                 GDKerror("inconsistent database, catalog does not exist");
    2215           0 :                                 goto error;
    2216             :                         }
    2217             : 
    2218         127 :                         strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    2219         127 :                         catalog_id = BBPindex(bak);
    2220         127 :                         o = BATdescriptor(catalog_id);
    2221         127 :                         if (o == NULL) {
    2222           0 :                                 BBPunfix(b->batCacheid);
    2223           0 :                                 GDKerror("inconsistent database, catalog_id does not exist");
    2224           0 :                                 goto error;
    2225             :                         }
    2226             : 
    2227         127 :                         strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    2228         127 :                         dcatalog = BBPindex(bak);
    2229         127 :                         d = BATdescriptor(dcatalog);
    2230         127 :                         if (d == NULL) {
    2231           0 :                                 GDKerror("cannot create dcatalog bat");
    2232           0 :                                 BBPunfix(b->batCacheid);
    2233           0 :                                 BBPunfix(o->batCacheid);
    2234           0 :                                 goto error;
    2235             :                         }
    2236             : 
    2237         127 :                         lg->catalog_bid = b;
    2238         127 :                         lg->catalog_id = o;
    2239         127 :                         lg->dcatalog = d;
                                               /* retain every bat listed in the catalog; when
                                                * BBPretain returns 0 for an entry, its position is
                                                * added to dcatalog unless it is already in there
                                                * (presumably marking the entry as deleted
                                                * -- confirm) */
    2240         127 :                         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    2241       32943 :                         BATloop(lg->catalog_bid, p, q) {
    2242       32816 :                                 bat bid = bids[p];
    2243       32816 :                                 oid pos = p;
    2244             : 
    2245       32816 :                                 if (BBPretain(bid) == 0 &&      /* any bid in the catalog_bid, needs one logical ref */
    2246           0 :                                     BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
    2247           0 :                                     BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
    2248           0 :                                         goto error;
    2249             :                         }
    2250             :                 }
                                       /* switch the catalog bats to BAT_READ access; BATsetaccess
                                        * returns the bat to use from here on, or NULL on failure */
    2251         127 :                 if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
    2252         127 :                     (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
    2253         127 :                     (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
    2254           0 :                         goto error;
    2255             :                 }
    2256         127 :                 BBPretain(lg->catalog_bid->batCacheid);
    2257         127 :                 BBPretain(lg->catalog_id->batCacheid);
    2258         127 :                 BBPretain(lg->dcatalog->batCacheid);
    2259             :         }
    2260             :         /* failing to rename the catalog_cnt and catalog_lid bats is not
    2261             :          * fatal: the error is cleared and loading continues */
    2262         357 :         lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
    2263         357 :         if (lg->catalog_cnt == NULL) {
    2264           0 :                 GDKerror("failed to create catalog_cnt bat");
    2265           0 :                 goto error;
    2266             :         }
    2267         357 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
    2268         357 :         if (BBPrename(lg->catalog_cnt, bak) < 0)
    2269           0 :                 GDKclrerr();
    2270         357 :         lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
    2271         357 :         if (lg->catalog_lid == NULL) {
    2272           0 :                 GDKerror("failed to create catalog_lid bat");
    2273           0 :                 goto error;
    2274             :         }
    2275         357 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
    2276         357 :         if (BBPrename(lg->catalog_lid, bak) < 0)
    2277           0 :                 GDKclrerr();
    2278         357 :         if (bm_get_counts(lg) != GDK_SUCCEED)
    2279           0 :                 goto error;
    2280             : 
                               /* sequence bats: load them if fn_seqs_id exists, otherwise
                                * create, name and (via needcommit) commit new ones */
    2281         357 :         strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    2282         357 :         if (BBPindex(bak)) {
    2283         127 :                 lg->seqs_id = BATdescriptor(BBPindex(bak));
    2284         127 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    2285         127 :                 lg->seqs_val = BATdescriptor(BBPindex(bak));
    2286         127 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    2287         127 :                 lg->dseqs = BATdescriptor(BBPindex(bak));
    2288         127 :                 if (lg->seqs_id == NULL ||
    2289         127 :                     lg->seqs_val == NULL ||
    2290             :                     lg->dseqs == NULL) {
    2291           0 :                         GDKerror("Logger_new: cannot load seqs bats");
    2292           0 :                         goto error;
    2293             :                 }
    2294         127 :                 if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
    2295         127 :                     (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
    2296         127 :                     (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
    2297           0 :                         goto error;
    2298             :                 }
    2299             :         } else {
    2300         230 :                 lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
    2301         230 :                 lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
    2302         230 :                 lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
    2303         230 :                 if (lg->seqs_id == NULL ||
    2304         230 :                     lg->seqs_val == NULL ||
    2305             :                     lg->dseqs == NULL) {
    2306           0 :                         GDKerror("Logger_new: cannot create seqs bats");
    2307           0 :                         goto error;
    2308             :                 }
    2309             : 
    2310         230 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    2311         230 :                 if (BBPrename(lg->seqs_id, bak) < 0) {
    2312           0 :                         goto error;
    2313             :                 }
    2314             : 
    2315         230 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    2316         230 :                 if (BBPrename(lg->seqs_val, bak) < 0) {
    2317           0 :                         goto error;
    2318             :                 }
    2319             : 
    2320         230 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    2321         230 :                 if (BBPrename(lg->dseqs, bak) < 0) {
    2322           0 :                         goto error;
    2323             :                 }
    2324             :                 needcommit = true;
    2325             :         }
                               /* clear the CHECKMASK bits in GDKdebug for the duration of the
                                * commit; the saved value is put back right after, or at the
                                * error label when the commit fails */
    2326         357 :         dbg = ATOMIC_GET(&GDKdebug);
    2327         357 :         ATOMIC_AND(&GDKdebug, ~CHECKMASK);
    2328         357 :         if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
    2329           0 :                 GDKerror("Logger_new: commit failed");
    2330           0 :                 goto error;
    2331             :         }
    2332         357 :         ATOMIC_SET(&GDKdebug, dbg);
    2333             : 
    2334         357 :         if (readlogs) {
                                       /* first log id after the last saved one; after the
                                        * commit below the files from this id up to the new
                                        * saved_id are cleaned up */
    2335         127 :                 ulng log_id = lg->saved_id + 1;
                                       /* with process-wal-and-exit set, the post function and
                                        * the log file rewrite are skipped and the process exits
                                        * after the commit and cleanup */
    2336         127 :                 bool earlyexit = GDKgetenv_isyes("process-wal-and-exit");
    2337         127 :                 if (log_readlogs(lg, filename) != GDK_SUCCEED) {
    2338           0 :                         goto error;
    2339             :                 }
    2340         127 :                 if (!earlyexit) {
    2341         127 :                         if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
    2342           0 :                                 goto error;
    2343         127 :                         if (needsnew) {
                                                       /* keep the old log file as backup and write
                                                        * a new one; the backup is removed after the
                                                        * commit below.
                                                        * NOTE(review): the two returns here skip the
                                                        * cleanup at the error label. */
    2344          16 :                                 if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
    2345           0 :                                         TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
    2346           0 :                                         return GDK_FAIL;
    2347             :                                 }
    2348          16 :                                 if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
    2349           0 :                                         TRC_CRITICAL(GDK, "couldn't write new log\n");
    2350           0 :                                         return GDK_FAIL;
    2351             :                                 }
    2352             :                         }
    2353             :                 }
                                       /* same GDKdebug handling as for the commit above.
                                        * NOTE(review): dbg is re-read here, so if an earlier
                                        * goto error left CHECKMASK cleared this would not apply;
                                        * on the goto error below, the value written back at the
                                        * label is the one read here. */
    2354         127 :                 dbg = ATOMIC_GET(&GDKdebug);
    2355         127 :                 ATOMIC_AND(&GDKdebug, ~CHECKMASK);
    2356         127 :                 if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
    2357           0 :                         goto error;
    2358             :                 }
    2359         127 :                 ATOMIC_SET(&GDKdebug, dbg);
    2360         334 :                 for (; log_id <= lg->saved_id; log_id++)
    2361         207 :                         (void) log_cleanup(lg, log_id); /* ignore error of removing file */
    2362         127 :                 if (earlyexit) {
    2363           0 :                         printf("# mserver5 exiting\n");
    2364           0 :                         exit(0);
    2365             :                 }
                                       /* NOTE(review): this return also bypasses the error label */
    2366         143 :                 if (needsnew &&
    2367          16 :                     GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
    2368           0 :                         TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
    2369           0 :                         return GDK_FAIL;
    2370             :                 }
    2371             :         } else {
                                       /* nothing to read: the next log id follows the saved one */
    2372         230 :                 lg->id = lg->saved_id + 1;
    2373         230 :                 if (GDKgetenv_isyes("process-wal-and-exit")) {
    2374           0 :                         printf("# mserver5 exiting\n");
    2375           0 :                         exit(0);
    2376             :                 }
    2377             :         }
    2378             : #ifdef GDKLIBRARY_JSON
    2379         357 :         if (log_json_upgrade_finalize() == GDK_FAIL)
    2380           0 :                 goto error;
    2381             : #endif
                               /* optional cleanup of junk bats, enabled with clean-BBP */
    2382         357 :         if (GDKgetenv_isyes("clean-BBP"))
    2383           8 :                 clean_bbp(lg);
    2384             :         return GDK_SUCCEED;
    2385           0 :   error:
                               /* release what was set up so far, including lg itself, and
                                * restore the saved GDKdebug value.
                                * NOTE(review): catalog_cnt and catalog_lid are not destroyed
                                * here, and neither are flush_lock and excl_flush_cv which
                                * log_new initializes before calling this function -- confirm
                                * whether that is intended. */
    2386           0 :         if (fp)
    2387           0 :                 fclose(fp);
    2388           0 :         logbat_destroy(lg->catalog_bid);
    2389           0 :         logbat_destroy(lg->catalog_id);
    2390           0 :         logbat_destroy(lg->dcatalog);
    2391           0 :         logbat_destroy(lg->seqs_id);
    2392           0 :         logbat_destroy(lg->seqs_val);
    2393           0 :         logbat_destroy(lg->dseqs);
    2394           0 :         MT_lock_destroy(&lg->lock);
    2395           0 :         MT_lock_destroy(&lg->rotation_lock);
    2396           0 :         GDKfree(lg->fn);
    2397           0 :         GDKfree(lg->dir);
    2398           0 :         GDKfree(lg->rbuf);
    2399           0 :         GDKfree(lg->wbuf);
    2400           0 :         GDKfree(lg);
    2401           0 :         ATOMIC_SET(&GDKdebug, dbg);
    2402             :         /* We do not call log_json_upgrade_finalize here because we want
    2403             :          * the upgrade to run again next time we try, so we do not want
    2404             :          * to remove the signal file just yet.
    2405             :          */
    2406           0 :         return GDK_FAIL;
    2407             : }
    2408             : 
    2409             : /* Initialize a new logger.
    2410             :  * It will load any data in the logdir and persist it in the BATs.
                        *
                        * debug      stored in the logger as its debug value
                        * fn         name of the logger; also used as the name of its lock and
                        *            as the last component of the logger directory
                        * logdir     directory under which the logger directory is created;
                        *            must be a relative path unless running in memory
                        * version    stored in the logger as its version
                        * prefuncp, postfuncp, funcdata
                        *            callbacks and their argument, stored in the logger
                        *
                        * Returns the new logger, or NULL on failure.
                        * NOTE(review): when log_load fails by jumping to its error label it
                        * has already freed lg; its direct return GDK_FAIL paths do not, and
                        * nothing is freed here in either case. */
    2411             : static logger *
    2412         357 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
    2413             :         postversionfix_fptr postfuncp, void *funcdata)
    2414             : {
    2415         357 :         logger *lg;
    2416         357 :         char filename[FILENAME_MAX];
    2417             : 
                               /* tunables taken from the environment, with their defaults */
    2418         357 :         lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
    2419         357 :         lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
    2420         357 :         lng max_file_size = 0;
    2421             : 
                               /* with TESTINGMASK set a fixed small limit is used (presumably
                                * to force frequent log rotation during tests -- confirm);
                                * otherwise wal_max_file_size is parsed as a decimal number,
                                * with 2147483648 as default when it is not set */
    2422         357 :         if (GDKdebug & TESTINGMASK) {
    2423             :                 max_file_size = 2048; /* 2 KiB */
    2424             :         } else {
    2425          12 :                 const char *max_file_size_str = GDKgetenv("wal_max_file_size");
    2426          12 :                 max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
    2427             :         }
    2428             : 
    2429         357 :         if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
    2430           0 :                 TRC_CRITICAL(GDK, "logdir must be relative path\n");
    2431           0 :                 return NULL;
    2432             :         }
    2433             : 
                               /* logger directory: logdir, separator, fn, trailing separator */
    2434         357 :         if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
    2435           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    2436           0 :                 return NULL;
    2437             :         }
    2438             : 
    2439         357 :         lg = GDKmalloc(sizeof(struct logger));
    2440         357 :         if (lg == NULL) {
    2441           0 :                 TRC_CRITICAL(GDK, "allocating logger structure failed\n");
    2442           0 :                 return NULL;
    2443             :         }
    2444             : 
                               /* fields not mentioned are zero; negative tunables fall back
                                * to their defaults.  max_file_age is scaled by 1000000
                                * (presumably seconds to microseconds -- confirm).  The four
                                * allocations done in here are checked right below. */
    2445         714 :         *lg = (logger) {
    2446         357 :                 .inmemory = GDKinmemory(0),
    2447             :                 .debug = debug,
    2448             :                 .version = version,
    2449             :                 .prefuncp = prefuncp,
    2450             :                 .postfuncp = postfuncp,
    2451             :                 .funcdata = funcdata,
    2452             : 
    2453         357 :                 .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
    2454             :                 .file_age = 0,
    2455         357 :                 .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
    2456         357 :                 .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
    2457             : 
    2458             :                 .id = 0,
    2459         357 :                 .saved_id = getBBPlogno(),      /* get saved log number from bbp */
    2460             :                 .nr_flushers = ATOMIC_VAR_INIT(0),
    2461         357 :                 .fn = GDKstrdup(fn),
    2462         357 :                 .dir = GDKstrdup(filename),
    2463             :                 .rbufsize = 64 * 1024,
    2464         357 :                 .rbuf = GDKmalloc(64 * 1024),
    2465             :                 .wbufsize = 64 * 1024,
    2466         357 :                 .wbuf = GDKmalloc(64 * 1024),
    2467             :         };
    2468             : 
    2469             :         /* probably open file and check version first, then call old logger code */
                               /* if any of the allocations above failed, free the ones that
                                * succeeded and give up */
    2470         357 :         if (lg->fn == NULL ||
    2471         357 :             lg->dir == NULL ||
    2472         357 :             lg->rbuf == NULL ||
    2473             :             lg->wbuf == NULL) {
    2474           0 :                 TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
    2475           0 :                 GDKfree(lg->fn);
    2476           0 :                 GDKfree(lg->dir);
    2477           0 :                 GDKfree(lg->rbuf);
    2478           0 :                 GDKfree(lg->wbuf);
    2479           0 :                 GDKfree(lg);
    2480           0 :                 return NULL;
    2481             :         }
    2482         357 :         TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
    2483             : 
                               /* locks and condition variable must exist before log_load runs */
    2484         357 :         MT_lock_init(&lg->lock, fn);
    2485         357 :         MT_lock_init(&lg->rotation_lock, "rotation_lock");
    2486         357 :         MT_lock_init(&lg->flush_lock, "flush_lock");
    2487         357 :         MT_cond_init(&lg->excl_flush_cv, "flush_cond");
    2488             : 
    2489         357 :         if (log_load(fn, lg, filename) == GDK_SUCCEED) {
    2490             :                 return lg;
    2491             :         }
                               /* lg must not be used here: log_load may have freed it */
    2492             :         return NULL;
    2493             : }
    2494             : /* Drop the leading entries of lg->flush_ranges that are no longer in use.
                        *
                        * Starting at the head of the list, entries are skipped as long as
                        * they have a successor and their refcount is not greater than 1.
                        * The entry at which the scan stops becomes the new head.  For every
                        * skipped entry the refcount is decremented and, unless logging is
                        * disabled, its output_log stream is closed.  The skipped entries
                        * themselves are not freed in this function.
                        *
                        * Returns the (possibly unchanged) head of lg->flush_ranges, or NULL
                        * when the list is empty. */
    2495             : static logged_range *
    2496       68869 : do_flush_range_cleanup(logger *lg)
    2497             : {
    2498       68869 :         logged_range *frange = lg->flush_ranges;
    2499       68869 :         logged_range *first = frange;
    2500             : 
    2501       68869 :         if (frange == NULL)
    2502             :                 return NULL;
                               /* advance to the first entry that is still referenced
                                * (refcount > 1) or, failing that, to the last entry */
    2503       81646 :         while (frange->next) {
    2504       13149 :                 if (ATOMIC_GET(&frange->refcount) > 1)
    2505             :                         break;
    2506       12777 :                 frange = frange->next;
    2507             :         }
                               /* nothing was skipped, so there is nothing to clean up */
    2508       68869 :         if (first == frange) {
    2509             :                 return first;
    2510             :         }
    2511             : 
    2512       12777 :         logged_range *flast = frange;
    2513             : 
                               /* unlink the skipped entries by moving the head forward */
    2514       12777 :         lg->flush_ranges = flast;
    2515             : 
                               /* give up the reference on each skipped entry and close its
                                * output stream if it still has one */
    2516       25554 :         for (frange = first; frange && frange != flast; frange = frange->next) {
    2517       12777 :                 ATOMIC_DEC(&frange->refcount);
    2518       12777 :                 if (!LOG_DISABLED(lg) && frange->output_log) {
    2519           0 :                         TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
    2520           0 :                         close_stream(frange->output_log);
    2521           0 :                         frange->output_log = NULL;
    2522             :                 }
    2523             :         }
    2524             :         return flast;
    2525             : }
    2526             : 
    2527             : void
    2528         356 : log_destroy(logger *lg)
    2529             : {
                               /* Shut the logger down and release what it owns: close the input
                                * and output logs, free the list of pending ranges, release the
                                * BATs listed in the catalog and the catalog BATs themselves,
                                * destroy the three locks and finally free lg itself.  lg must
                                * not be used after this call. */
    2530         356 :         log_close_input(lg);
    2531         356 :         logged_range *last = do_flush_range_cleanup(lg);
                               /* last is only inspected by the assert below; the cast keeps it
                                * "used" when assertions are compiled out */
    2532         356 :         (void) last;
                               /* at shutdown a single range is expected to remain, being both
                                * the current range and the head of the flush ranges */
    2533         356 :         assert(last == lg->current && last == lg->flush_ranges);
    2534         356 :         log_close_output(lg);
                               /* pop and free the pending list one element at a time */
    2535         806 :         for (logged_range * p = lg->pending; p; p = lg->pending) {
    2536         450 :                 lg->pending = p->next;
    2537         450 :                 GDKfree(p);
    2538             :         }
                               /* with logging disabled, mark everything up to the current id/tid
                                * as saved and do a final commit; the result of log_commit() is
                                * not checked here */
    2539         356 :         if (LOG_DISABLED(lg)) {
    2540           1 :                 lg->saved_id = lg->id;
    2541           1 :                 lg->saved_tid = lg->tid;
    2542           1 :                 log_commit(lg, NULL, NULL, 0);
    2543             :         }
    2544         356 :         if (lg->catalog_bid) {
    2545         356 :                 log_lock(lg);
    2546         356 :                 BUN p, q;
    2547         356 :                 BAT *b = lg->catalog_bid;
    2548             : 
    2549             :                 /* free resources */
                                       /* every value stored in catalog_bid is a BAT id; release
                                        * each of them */
    2550         356 :                 const log_bid *bids = (const log_bid *) Tloc(b, 0);
    2551       93326 :                 BATloop(b, p, q) {
    2552       92970 :                         bat bid = bids[p];
    2553             : 
    2554       92970 :                         BBPrelease(bid);
    2555             :                 }
    2556             : 
                                       /* catalog_bid, catalog_id and dcatalog get a BBPrelease()
                                        * in addition to logbat_destroy(); catalog_cnt and
                                        * catalog_lid only get logbat_destroy() */
    2557         356 :                 BBPrelease(lg->catalog_bid->batCacheid);
    2558         356 :                 BBPrelease(lg->catalog_id->batCacheid);
    2559         356 :                 BBPrelease(lg->dcatalog->batCacheid);
    2560         356 :                 logbat_destroy(lg->catalog_bid);
    2561         356 :                 logbat_destroy(lg->catalog_id);
    2562         356 :                 logbat_destroy(lg->dcatalog);
    2563             : 
    2564         356 :                 logbat_destroy(lg->catalog_cnt);
    2565         356 :                 logbat_destroy(lg->catalog_lid);
    2566         356 :                 log_unlock(lg);
    2567             :         }
    2568         356 :         MT_lock_destroy(&lg->lock);
    2569         356 :         MT_lock_destroy(&lg->rotation_lock);
    2570         356 :         MT_lock_destroy(&lg->flush_lock);
    2571         356 :         GDKfree(lg->fn);
    2572         356 :         GDKfree(lg->dir);
    2573         356 :         GDKfree(lg->rbuf);
    2574         356 :         GDKfree(lg->wbuf);
    2575         356 :         GDKfree(lg);
    2576         356 : }
    2577             : 
    2578             : /* Create a new logger */
    2579             : logger *
    2580         357 : log_create(int debug, const char *fn, const char *logdir, int version,
    2581             :            preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
    2582             :            void *funcdata)
    2583             : {
                               /* Build a logger with log_new(), record in the environment that
                                * recovery has finished, and open the first output log.
                                * Returns the new logger, or NULL on failure; a logger that was
                                * already built is destroyed again before NULL is returned. */
    2584         357 :         logger *lg;
    2585         357 :         TRC_INFO_IF(WAL) {
    2586           0 :                 TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
    2587           0 :                 GDKtracer_flush_buffer();
    2588             :         }
    2589         357 :         lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
    2590         357 :         if (lg == NULL)
    2591             :                 return NULL;
    2592         357 :         TRC_INFO_IF(WAL) {
    2593           0 :                 TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
    2594           0 :                 GDKtracer_flush_buffer();
    2595             :         }
    2596         357 :         if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
    2597           0 :                 log_destroy(lg);
    2598           0 :                 return NULL;
    2599             :         }
    2600         357 :         assert(lg->current == NULL);
                               /* dummy lives in this stack frame and only serves as a temporary
                                * lg->current while log_open_output() runs.  lg->current is
                                * pointed away from it again (to NULL or to dummy.next) on every
                                * path before this function returns.
                                * NOTE(review): presumably log_open_output() links the range it
                                * creates as lg->current->next -- confirm there. */
    2601         357 :         logged_range dummy = {
    2602         357 :                 .cnt = BATcount(lg->catalog_bid),
    2603             :         };
    2604         357 :         lg->current = &dummy;
    2605         357 :         if (log_open_output(lg) != GDK_SUCCEED) {
    2606           0 :                 lg->current = NULL;
    2607           0 :                 log_destroy(lg);
    2608           0 :                 return NULL;
    2609             :         }
                               /* step off the dummy; the first real range becomes the current
                                * range, the head of the pending list and the head of the flush
                                * ranges */
    2610         357 :         lg->current = lg->current->next;
    2611         357 :         assert(lg->pending == NULL && lg->flush_ranges == NULL);
    2612         357 :         lg->pending = lg->current;
    2613         357 :         lg->flush_ranges = lg->current;
    2614         357 :         return lg;
    2615             : }
    2616             : 
    2617             : static logged_range *
    2618        8312 : log_next_logfile(logger *lg, ulng ts)
    2619             : {
                               /* Pick the pending ranges that can be flushed now.  Returns the
                                * last range of a run that starts at lg->pending, or NULL when
                                * there are fewer than two pending ranges or the head is not
                                * eligible.  The head is eligible when its refcount is 0, it is
                                * neither the current range nor the head of the flush ranges, its
                                * last_ts equals its flushed_ts, and flushed_ts <= ts.
                                * The run is limited to m ranges: 1000 when TESTINGMASK is set in
                                * GDKdebug, 100 otherwise. */
    2620        8312 :         int m = (ATOMIC_GET(&GDKdebug) & TESTINGMASK) ? 1000 : 100;
                               /* NOTE(review): this first look at lg->pending happens before the
                                * rotation lock is taken */
    2621        8312 :         if (!lg->pending || !lg->pending->next)
    2622             :                 return NULL;
    2623        8312 :         rotation_lock(lg);
    2624        8312 :         if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
    2625        8312 :             (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
    2626        8312 :             (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
    2627        7044 :                 logged_range *p = lg->pending;
                                       /* step to the successor as long as the range at hand still
                                        * passes the refcount/timestamp tests and the successor is
                                        * neither the current range nor the head of the flush
                                        * ranges.
                                        * NOTE(review): the refcount/timestamp tests are applied to
                                        * the range being left, so the range finally returned has
                                        * only been tested for not being current/flush_ranges. */
    2628        7044 :                 for (int i = 1;
    2629       12683 :                      i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
    2630        5661 :                      p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
    2631       18344 :                      && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
    2632        5639 :                         p = p->next;
    2633        7044 :                 rotation_unlock(lg);
    2634        7044 :                 return p;
    2635             :         }
    2636        1268 :         rotation_unlock(lg);
    2637        1268 :         return NULL;
    2638             : }
    2639             : 
    2640             : static void
    2641        7044 : log_cleanup_range(logger *lg, ulng id)
    2642             : {
                               /* Unlink and free, under the rotation lock, every range at the
                                * head of lg->pending whose id is at most id. */
    2643        7044 :         rotation_lock(lg);
    2644       19727 :         while (lg->pending && lg->pending->id <= id) {
                                       /* the loop condition already guarantees a non-NULL head,
                                        * so no further NULL test is needed before unlinking */
    2645       12683 :                 logged_range *p = lg->pending;
    2648       12683 :                 lg->pending = p->next;
    2649       12683 :                 GDKfree(p);
    2650             :         }
    2651        7044 :         rotation_unlock(lg);
    2652        7044 : }
    2653             : 
    2654             : static void
    2655       68516 : do_rotate(logger *lg)
    2656             : {
                               /* Make the successor of the current range (if there is one) the
                                * new current range.  The output stream of the range that is
                                * left is closed right away when logging is enabled, the stream
                                * is still open and the range's refcount is exactly 1.
                                * No lock is taken here; log_activate() calls this while holding
                                * the rotation lock. */
    2657       68516 :         logged_range *cur = lg->current;
    2658       68516 :         logged_range *next = cur->next;
    2659       68516 :         if (next) {
    2660       12777 :                 assert(ATOMIC_GET(&next->refcount) == 1);
    2661       12777 :                 lg->current = next;
    2662       12777 :                 if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
    2663       12624 :                         close_stream(cur->output_log);
    2664       12624 :                         cur->output_log = NULL;
    2665             :                 }
    2666             :         }
    2667       68516 : }
    2668             : 
    2669             : gdk_return
    2670       14011 : log_activate(logger *lg)
    2671             : {
                               /* Decide, under the rotation lock, whether a new log file must be
                                * started, and if so start it.
                                * Returns GDK_FAIL when the position in the current output file
                                * cannot be determined, the result of log_open_output() when a
                                * new file is started, and GDK_SUCCEED otherwise. */
    2672       14011 :         bool flush_cleanup = false;
    2673       14011 :         gdk_return res = GDK_SUCCEED;
    2674             : 
    2675       14011 :         rotation_lock(lg);
                               /* with logging disabled the size is taken to be 0, so the
                                * "effectively empty" test below returns early */
    2676       14011 :         const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
    2677             : 
    2678       14011 :         if (current_file_size == -1) {
    2679           0 :                 rotation_unlock(lg);
    2680           0 :                 return GDK_FAIL;
    2681             :         }
    2682             :         /* file size of 2 means only endian indicator present
    2683             :          * (i.e. effectively empty) */
    2684       14011 :         if (current_file_size <= 2) {
    2685       10669 :                 rotation_unlock(lg);
    2686       10669 :                 return GDK_SUCCEED;
    2687             :         }
    2688             : 
                               /* A new file is started only when all of the following hold:
                                *  - no immediate flush was requested (flushnow is not set);
                                *  - the current range has no successor yet;
                                *  - one of the limits is exceeded: number of drops, file size,
                                *    or file age;
                                *  - last_ts of the current range is non-zero;
                                *  - saved_id + 1 == id;
                                *  - the refcount of the current range is exactly 1.
                                * The current_file_size > 2 test is always true at this point
                                * because of the early return above. */
    2689        3342 :         if (!lg->flushnow &&
    2690        3342 :             !lg->current->next &&
    2691        3342 :             current_file_size > 2 &&
    2692        3342 :             (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
    2693        3332 :                     current_file_size > lg->max_file_size ||
    2694        3299 :                     (GDKusec() - lg->file_age) > lg->max_file_age) &&
    2695          43 :             (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
    2696          43 :             lg->saved_id + 1 == lg->id &&
    2697          42 :             ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
    2698          41 :                 lg->id++;
    2699             :                 /* start new file */
                                       /* NOTE(review): do_rotate() and the flush-range cleanup
                                        * below run even when log_open_output() fails, and lg->id
                                        * stays incremented in that case -- confirm this is
                                        * intended. */
    2700          41 :                 res = log_open_output(lg);
    2701          41 :                 flush_cleanup = true;
    2702          41 :                 do_rotate(lg);
    2703             :         }
    2704          41 :         if (flush_cleanup)
    2705          41 :                 (void) do_flush_range_cleanup(lg);
    2706        3342 :         rotation_unlock(lg);
    2707        3342 :         return res;
    2708             : }
    2709             : 
    2710             : gdk_return
    2711        8312 : log_flush(logger *lg, ulng ts)
    2712             : {
                               /* Process the completed log files that are eligible at timestamp
                                * ts: select them with log_next_logfile(), read every file with
                                * an id in (saved_id, lid] through log_read_transaction(), commit
                                * the result with log_commit(), then clean up the files and free
                                * their range descriptors.
                                * Returns GDK_SUCCEED when there is nothing to do or everything
                                * went well, GDK_FAIL on error. */
    2713        8312 :         logged_range *pending = log_next_logfile(lg, ts);
                               /* lid: id of the last range to process (0 when none was selected);
                                * olid: saved_id on entry, needed to roll back and to know which
                                * files to clean up */
    2714        8312 :         ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
                               /* with logging disabled there are no files to read: record the
                                * progress and commit.  A failed commit is only reported; this
                                * path always returns GDK_SUCCEED. */
    2715        8312 :         if (LOG_DISABLED(lg)) {
    2716           0 :                 lg->saved_id = lid;
    2717           0 :                 lg->saved_tid = lg->tid;
    2718           0 :                 if (lid)
    2719           0 :                         log_cleanup_range(lg, lg->saved_id);
    2720           0 :                 if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
    2721           0 :                         TRC_ERROR(GDK, "failed to commit");
    2722           0 :                 return GDK_SUCCEED;
    2723             :         }
                               /* nothing beyond what was saved already (this also covers the
                                * case that no range was selected, where lid is 0) */
    2724        8312 :         if (lg->saved_id >= lid)
    2725             :                 return GDK_SUCCEED;
                               /* lg->id is read under the rotation lock */
    2726        7044 :         rotation_lock(lg);
    2727        7044 :         ulng lgid = lg->id;
    2728        7044 :         rotation_unlock(lg);
    2729        7044 :         if (lg->saved_id + 1 >= lgid)     /* logger should first release the file */
    2730             :                 return GDK_SUCCEED;
    2731        7044 :         log_return res = LOG_OK;
    2732        7044 :         ulng cid = olid;
    2733        7044 :         assert(lid <= lgid);
                               /* updated is a bitmap with one bit per entry of lg->catalog_id
                                * (nupdated bits, allocated bytes); it is handed to
                                * log_read_transaction() and later to log_commit() */
    2734             :         uint32_t *updated = NULL;
    2735             :         BUN nupdated = 0;
    2736             :         size_t allocated = 0;
                               /* one iteration per log file: cid + 1 is the id of the file that
                                * is read in this iteration */
    2737       19727 :         while (cid < lid && res == LOG_OK) {
                                       /* open the file unless an input log is still open */
    2738       12683 :                 if (!lg->input_log) {
    2739       12683 :                         char filename[MAXPATH];
    2740       12683 :                         char id[32];
                                               /* NOTE(review): cid is a ulng while the format is
                                                * LLFMT, presumably the signed lng format --
                                                * confirm the macro matches the argument type */
    2741       12683 :                         if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
    2742           0 :                                 GDKfree(updated);
    2743           0 :                                 TRC_CRITICAL(GDK, "log_id filename is too large\n");
    2744           0 :                                 return GDK_FAIL;
    2745             :                         }
    2746       12683 :                         if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
    2747           0 :                                 GDKfree(updated);
    2748           0 :                                 return GDK_FAIL;
    2749             :                         }
    2750       12683 :                         if (strlen(filename) >= FILENAME_MAX) {
    2751             :                                 GDKfree(updated);
    2752             :                                 TRC_CRITICAL(GDK, "Logger filename path is too large\n");
    2753             :                                 return GDK_FAIL;
    2754             :                         }
    2755             : 
    2756       12683 :                         bool filemissing = false;
    2757       12683 :                         if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
    2758           0 :                                 GDKfree(updated);
    2759           0 :                                 return GDK_FAIL;
    2760             :                         }
    2761             :                 }
    2762             :                 /* we read the full file because skipping is impossible with current log format */
    2763       12683 :                 log_lock(lg);
                                       /* (re)size the bitmap under the logger lock: one bit per
                                        * catalog entry, rounded up to whole 32-bit words, and at
                                        * least 4 bytes */
    2764       12683 :                 if (updated == NULL) {
    2765        7044 :                         nupdated = BATcount(lg->catalog_id);
    2766        7044 :                         allocated = ((nupdated + 31) & ~31) / 8;
    2767        7044 :                         if (allocated == 0)
    2768           0 :                                 allocated = 4;
    2769        7044 :                         updated = GDKzalloc(allocated);
    2770        7044 :                         if (updated == NULL) {
    2771           0 :                                 log_unlock(lg);
    2772           0 :                                 return GDK_FAIL;
    2773             :                         }
    2774        5639 :                 } else if (nupdated < BATcount(lg->catalog_id)) {
                                               /* the catalog grew while earlier files were read */
    2775          54 :                         BUN n = BATcount(lg->catalog_id);
    2776          54 :                         size_t a = ((n + 31) & ~31) / 8;
    2777          54 :                         if (a > allocated) {
    2778          11 :                                 uint32_t *p = GDKrealloc(updated, a);
    2779          11 :                                 if (p == NULL) {
    2780           0 :                                         GDKfree(updated);
    2781           0 :                                         log_unlock(lg);
    2782           0 :                                         return GDK_FAIL;
    2783             :                                 }
    2784          11 :                                 updated = p;
                                                       /* clear only the newly added tail; allocated
                                                        * is in bytes, updated is indexed in 4-byte
                                                        * words */
    2785          11 :                                 memset(updated + allocated / 4, 0, a - allocated);
    2786          11 :                                 allocated = a;
    2787             :                         }
    2788             :                         nupdated = n;
    2789             :                 }
                                       /* lg->flushing is set only for the duration of the read */
    2790       12683 :                 lg->flushing = true;
    2791       12683 :                 res = log_read_transaction(lg, updated, nupdated);
    2792       12683 :                 lg->flushing = false;
    2793       12683 :                 log_unlock(lg);
                                       /* reaching the end of the file is the normal outcome: close
                                        * it and go on with the next one.  Any other result that is
                                        * not LOG_OK ends the loop; the input log is not closed
                                        * here in that case. */
    2794       12683 :                 if (res == LOG_EOF) {
    2795       12683 :                         log_close_input(lg);
    2796       12683 :                         res = LOG_OK;
    2797             :                 }
    2798       12683 :                 cid++;
    2799             :         }
                               /* all files were read: publish the new saved_id before the commit
                                * and put the old value back when the commit fails */
    2800        7044 :         if (lid > olid && res == LOG_OK) {
    2801        7044 :                 rotation_lock(lg);      /* protect against concurrent log_tflush rotate check */
    2802        7044 :                 lg->saved_id = lid;
    2803        7044 :                 rotation_unlock(lg);
    2804        7044 :                 if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
    2805           0 :                         TRC_ERROR(GDK, "failed to commit");
    2806           0 :                         res = LOG_ERR;
    2807           0 :                         rotation_lock(lg);
    2808           0 :                         lg->saved_id = olid; /* reset !! */
    2809           0 :                         rotation_unlock(lg);
    2810             :                 }
    2811           0 :                 if (res != LOG_ERR) {
    2812       19727 :                         while (olid < lid) {
    2813             :                                 /* Try to cleanup, remove old log file, continue on failure! */
    2814       12683 :                                 olid++;
    2815       12683 :                                 (void) log_cleanup(lg, olid);
    2816             :                         }
    2817             :                 }
                                       /* finally free the descriptors of the processed ranges */
    2818        7044 :                 if (res == LOG_OK)
    2819        7044 :                         log_cleanup_range(lg, lg->saved_id);
    2820             :         }
    2821        7044 :         GDKfree(updated);
    2822        7044 :         return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    2823             : }
    2824             : 
    2825             : /* Report how far flushing lags behind logging: returns lg->id - lg->saved_id - 1, or 0 when logging is disabled.
    2826             :  * (The text that used to stand here described cleaning up write-ahead log files and their bak- files, which this function does not do.)
    2827             :  */
    2828             : lng
    2829       33430 : log_changes(logger *lg)
    2830             : {
                               /* Returns lg->id - lg->saved_id - 1, read under the rotation lock,
                                * or 0 when logging is disabled.
                                * log_activate() increments lg->id when it starts a new file and
                                * log_flush() sets lg->saved_id to the id of the last file it
                                * processed, so this is presumably the number of completed log
                                * files that still have to be flushed -- confirm with callers. */
    2831       33430 :         if (LOG_DISABLED(lg))
    2832             :                 return 0;
    2833       33430 :         rotation_lock(lg);
    2834       33430 :         lng changes = lg->id - lg->saved_id - 1;
    2835       33430 :         rotation_unlock(lg);
    2836       33430 :         return changes;
    2837             : }
    2838             : 
    2839             : int
    2840         488 : log_sequence(logger *lg, int seq, lng *id)
    2841             : {
                               /* Look up sequence seq under the logger lock.  On a hit the value
                                * found in lg->seqs_val is stored in *id and 1 is returned;
                                * otherwise 0 is returned and *id is left untouched. */
    2842         488 :         log_lock(lg);
                               /* p is the position of seq in seqs_id, BUN_NONE when log_find()
                                * reports no match */
    2843         488 :         BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
    2844             : 
    2845         488 :         if (p != BUN_NONE) {
    2846         369 :                 *id = *(lng *) Tloc(lg->seqs_val, p);
    2847             : 
    2848         369 :                 log_unlock(lg);
    2849         369 :                 return 1;
    2850             :         }
    2851         119 :         log_unlock(lg);
    2852         119 :         return 0;
    2853             : }
    2854             : 
    2855             : gdk_return
    2856      199111 : log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
    2857             : {
                               /* Log that cnt positions of object id, starting at offset, are set
                                * to the single value *val of atom type type.  The record written
                                * is: the LOG_UPDATE_CONST header, cnt, the logger's type code,
                                * offset, and the value itself.
                                * With logging disabled only the logged count of the object is
                                * updated; with cnt == 0 nothing is done.
                                * Returns GDK_SUCCEED or GDK_FAIL.  When writing fails the
                                * reference on the current range is dropped, whichever part of
                                * the record failed. */
    2858      199111 :         bte tpe = find_type(lg, type);
    2859      199111 :         gdk_return ok = GDK_SUCCEED;
    2860      199111 :         logformat l;
    2861      199111 :         lng nr;
    2862      199111 :         l.flag = LOG_UPDATE_CONST;
    2863      199111 :         l.id = id;
    2864      199111 :         nr = cnt;
    2865             : 
    2866      199111 :         if (LOG_DISABLED(lg) || !nr) {
    2867             :                 /* logging is switched off */
    2868       75248 :                 if (nr) {
    2869       75248 :                         log_lock(lg);
    2870       75248 :                         ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
    2871       75248 :                         log_unlock(lg);
    2872             :                 }
    2873       75248 :                 return ok;
    2874             :         }
    2875             : 
    2876      123863 :         gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
    2877             : 
    2878      123863 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    2879      247726 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    2880      247726 :             log_write_format(lg, &l) != GDK_SUCCEED ||
    2881      247726 :             !mnstr_writeLng(lg->current->output_log, nr) ||
    2882      247726 :             mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
    2883      123863 :             !mnstr_writeLng(lg->current->output_log, offset)) {
    2885           0 :                 ok = GDK_FAIL;
    2886           0 :                 goto bailout;
    2887             :         }
    2888             : 
    2889      123863 :         ok = wt(val, lg->current->output_log, 1);
    2890             : 
    2891      123863 :         TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
    2892             : 
    2893      123863 :   bailout:
    2894      123863 :         if (ok != GDK_SUCCEED) {
                                       /* drop the reference on every write failure, also when
                                        * writing the value itself failed; internal_log_bat()
                                        * handles its failures the same way */
                                       ATOMIC_DEC(&lg->current->refcount);
    2895           0 :                 const char *err = mnstr_peek_error(lg->current->output_log);
    2896           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2897             :         }
    2898             :         return ok;
    2899             : }
    2900             : 
    2901             : static gdk_return
    2902       19677 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
    2903             : {
                               /* Write the strings at positions [offset, offset + nr) of b to the
                                * current output log as a series of chunks.  Each chunk is a lng
                                * byte count followed by that many bytes holding NUL-terminated
                                * strings packed back to back.  lg->wbuf is the staging buffer;
                                * it is enlarged when a single string (terminator included) is
                                * longer than the buffer.
                                * Returns GDK_FAIL when there is no buffer, the stream is already
                                * in error, the buffer cannot be enlarged, or a write fails. */
    2904       19677 :         size_t bufsz = lg->wbufsize, resize = 0;
    2905       19677 :         BUN end = (BUN) (offset + nr);
    2906       19677 :         char *buf = lg->wbuf;
    2907       19677 :         gdk_return res = GDK_SUCCEED;
    2908             : 
    2909       19677 :         if (!buf)
    2910             :                 return GDK_FAIL;
    2911       19677 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    2912       19677 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
    2913             :                 return GDK_FAIL;
    2914       19677 :         BATiter bi = bat_iterator(b);
    2915       19677 :         BUN p = (BUN) offset;
                               /* one iteration of the outer loop per chunk */
    2916       39406 :         for (; p < end;) {
    2917       19729 :                 size_t sz = 0;
                                       /* the previous round hit a string that cannot fit in the
                                        * buffer at all: grow the buffer first.  The result goes
                                        * into the local buf, so lg->wbuf keeps the old buffer when
                                        * GDKrealloc() fails. */
    2918       19729 :                 if (resize) {
    2919           2 :                         if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
    2920             :                                 res = GDK_FAIL;
    2921             :                                 break;
    2922             :                         }
    2923           2 :                         lg->wbuf = buf;
    2924           2 :                         lg->wbufsize = bufsz = resize;
    2925           2 :                         resize = 0;
    2926             :                 }
    2927       19729 :                 char *dst = buf;
                                       /* pack strings until the next one no longer fits */
    2928       71532 :                 for (; p < end && sz < bufsz; p++) {
    2929       51855 :                         const char *s = BUNtvar(bi, p);
    2930       51855 :                         size_t len = strlen(s) + 1;
    2931       51855 :                         if ((sz + len) > bufsz) {
                                                       /* p is not advanced, so this string is
                                                        * retried in the next chunk; a resize is
                                                        * requested only when it would not even fit
                                                        * in an empty buffer */
    2932          52 :                                 if (len > bufsz)
    2933           2 :                                         resize = len + bufsz;
    2934             :                                 break;
    2935             :                         } else {
    2936       51803 :                                 memcpy(dst, s, len);
    2937       51803 :                                 dst += len;
    2938       51803 :                                 sz += len;
    2939             :                         }
    2940             :                 }
                                       /* emit the chunk; sz is 0 when the very first string of the
                                        * round needs a bigger buffer, in which case nothing is
                                        * written in this round */
    2941       39456 :                 if (sz &&
    2942       39454 :                     (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
    2943       19727 :                      mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
    2944             :                         res = GDK_FAIL;
    2945             :                         break;
    2946             :                 }
    2947             :         }
    2948       19677 :         bat_iterator_end(&bi);
    2949       19677 :         return res;
    2950             : }
    2951             : 
    2952             : static gdk_return
    2953      511868 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
    2954             : {
                               /* Write cnt values of b, starting at offset, to the current output
                                * log as (part of) a LOG_UPDATE_BULK record for object id.
                                * With logging disabled only the logged count of the object is
                                * updated; with cnt == 0 nothing is done.
                                * total_cnt != 0 means the BAT is logged in several parts: the
                                * record header is written for the first part only (recognised
                                * by lg->total_cnt == 0) and then carries total_cnt and offset -1.
                                * lg->total_cnt accumulates the parts and is reset to 0 once it
                                * reaches total_cnt.
                                * sliced != 0 means b already starts at the logged position, so
                                * the values are read from position 0 of b.
                                * Returns GDK_SUCCEED or GDK_FAIL; when writing fails the
                                * reference on the current range is dropped here. */
    2955      511868 :         bte tpe = find_type(lg, b->ttype);
    2956      511868 :         gdk_return ok = GDK_SUCCEED;
    2957      511868 :         logformat l;
    2958      511868 :         BUN p;
    2959      511868 :         lng nr;
    2960      511868 :         l.flag = LOG_UPDATE_BULK;
    2961      511868 :         l.id = id;
    2962      511868 :         nr = cnt;
    2963             : 
    2964      511868 :         if (LOG_DISABLED(lg) || !nr) {
    2965             :                 /* logging is switched off */
    2966      215269 :                 if (nr)
    2967      159561 :                         return la_bat_update_count(lg, id, offset + cnt, lg->tid);
    2968             :                 return GDK_SUCCEED;
    2969             :         }
    2970             : 
    2971      267777 :         gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
    2972             : 
    2973      267777 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    2974      267777 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
    2975           0 :                 ok = GDK_FAIL;
    2976           0 :                 goto bailout;
    2977             :         }
    2978             : 
    2979      267777 :         if (lg->total_cnt == 0)      /* signals single bulk message or first part of bat logged in parts */
    2980      534890 :                 if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2981      662310 :                     !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
    2982      534890 :                     mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
    2983      407470 :                     !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) {  /* offset = -1 indicates bat was logged in parts */
    2984           0 :                         ok = GDK_FAIL;
    2985           0 :                         goto bailout;
    2986             :                 }
    2987      267777 :         if (!total_cnt)
    2988      127420 :                 total_cnt = cnt;
    2989      267777 :         lg->total_cnt += cnt;
    2990             : 
    2991      267777 :         if (lg->total_cnt == total_cnt)      /* This is the last to be logged part of this bat, we can already reset the total_cnt */
    2992      267445 :                 lg->total_cnt = 0;
    2993             : 
    2994             :         /* if offset is just for the log, but BAT is already sliced, reset offset */
    2995      267777 :         if (sliced)
    2996        1512 :                 offset = 0;
    2997      267777 :         BATiter bi = bat_iterator(b);
    2998      267777 :         if (b->ttype == TYPE_msk) {
                                       /* msk values are bits; they are written as 32-bit words */
    2999           0 :                 if (offset % 32 == 0) {
                                               /* word-aligned start: write the words in place.
                                                * Bit offset lives in word offset / 32, so the
                                                * start address must be computed in units of
                                                * 32-bit words, not bytes; this then yields the
                                                * same words as the bit-by-bit path below. */
    3000           0 :                         if (!mnstr_writeIntArray(lg->current->output_log, (int *) bi.base + offset / 32,
    3001           0 :                              (size_t) ((nr + 31) / 32)))
    3002           0 :                                 ok = GDK_FAIL;
    3003             :                 } else {
                                               /* unaligned start: assemble each output word from
                                                * up to 32 consecutive bits, lowest bit first */
    3004           0 :                         for (lng i = 0; i < nr; i += 32) {
    3005             :                                 uint32_t v = 0;
    3006           0 :                                 for (int j = 0; j < 32 && i + j < nr; j++)
    3007           0 :                                         v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
    3008           0 :                                 if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
    3009             :                                         ok = GDK_FAIL;
    3010             :                                         break;
    3011             :                                 }
    3012             :                         }
    3013             :                 }
    3014      515240 :         } else if (b->ttype < TYPE_str && bi.h->parentid == b->batCacheid) {
                                       /* type below TYPE_str and b is the parent of its own heap:
                                        * all nr values are written with a single call */
    3015      247463 :                 const void *t = BUNtail(bi, (BUN) offset);
    3016             : 
    3017      247463 :                 ok = wt(t, lg->current->output_log, (size_t) nr);
    3018       20314 :         } else if (b->ttype == TYPE_str) {
    3019             :                 /* efficient string writes */
    3020       19671 :                 ok = string_writer(lg, b, offset, nr);
    3021             :         } else {
                                       /* everything else: one value at a time */
    3022         643 :                 BUN end = (BUN) (offset + nr);
    3023        1644 :                 for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
    3024        1001 :                         const void *t = BUNtail(bi, p);
    3025             : 
    3026        1001 :                         ok = wt(t, lg->current->output_log, 1);
    3027             :                 }
    3028             :         }
    3029      267777 :         bat_iterator_end(&bi);
    3030             : 
    3031      267777 :         TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
    3032             : 
    3033      267777 :   bailout:
    3034      267777 :         if (ok != GDK_SUCCEED) {
    3035           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3036           0 :                 const char *err = mnstr_peek_error(lg->current->output_log);
    3037           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    3038             :         }
    3039             :         return ok;
    3040             : }
    3041             : 
    3042             : /*
    3043             :  * Changes made to the BAT descriptor should be stored in the log
    3044             :  * files.  Actually, we need to save the descriptor file, perhaps we
    3045             :  * should simply introduce a versioning scheme.
    3046             :  */
    3047             : gdk_return
    3048      237333 : log_bat_persists(logger *lg, BAT *b, log_id id)
    3049             : {
                               /* Register BAT b as persistent object id with the logger and log
                                * it: under the logger lock the BAT is added to the catalog, a
                                * LOG_CREATE record (header plus the logger's type code) is
                                * written when logging is enabled, and the complete contents of
                                * b are logged through internal_log_bat().
                                * Returns GDK_SUCCEED or GDK_FAIL.  On failure with logging
                                * enabled the reference on the current range is dropped exactly
                                * once; with logging disabled the refcount is not touched. */
    3050      237333 :         log_lock(lg);
    3051      237333 :         bte ta = find_type(lg, b->ttype);
    3052      237333 :         logformat l;
    3053             : 
    3054      237333 :         if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
    3055           0 :                 log_unlock(lg);
    3056           0 :                 if (!LOG_DISABLED(lg))
    3057           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3058           0 :                 return GDK_FAIL;
    3059             :         }
    3060             : 
    3061      237333 :         l.flag = LOG_CREATE;
    3062      237333 :         l.id = id;
    3063      237333 :         if (!LOG_DISABLED(lg)) {
    3064      154718 :                 assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    3065      309436 :                 if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    3066      309436 :                     log_write_format(lg, &l) != GDK_SUCCEED ||
    3067      154718 :                     mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
    3068           0 :                         log_unlock(lg);
    3069           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3070           0 :                         return GDK_FAIL;
    3071             :                 }
    3072             :         }
    3073      237333 :         TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
    3074      237333 :         gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
    3075      237333 :         log_unlock(lg);
                               /* No ATOMIC_DEC here.  With logging enabled internal_log_bat()
                                * has already dropped the reference on its failure path, so a
                                * second decrement would release it twice.  With logging
                                * disabled it fails without touching the refcount, which matches
                                * the !LOG_DISABLED guard on the first error path above. */
    3078             :         return r;
    3079             : }
    3080             : 
    3081             : gdk_return
    3082       22302 : log_bat_transient(logger *lg, log_id id)
    3083             : {
                               /* Log the destruction of the BAT registered under log id `id`.
                                * The bid is looked up with internal_find_bat (tid -1); when
                                * logging is not disabled a LOG_DESTROY record is written.  The
                                * BAT's row count is added to lg->current->drops and the catalog
                                * entry is stamped through log_del_bat.  Runs under the log lock.
                                * Returns GDK_FAIL when the lookup fails (bid < 0), nothing is
                                * found (bid == 0), the write fails or log_del_bat fails. */
    3084       22302 :         log_lock(lg);
    3085       22302 :         log_bid bid = internal_find_bat(lg, id, -1);
    3086       22302 :         logformat l;
    3087             : 
    3088       22302 :         if (bid < 0) {
    3089           0 :                 log_unlock(lg);
    3090           0 :                 return GDK_FAIL;
    3091             :         }
    3092       22302 :         if (!bid) {
    3093           0 :                 GDKerror("log_bat_transient failed to find bid for object %d\n", id);
    3094           0 :                 log_unlock(lg);
    3095           0 :                 return GDK_FAIL;
    3096             :         }
    3097       22302 :         l.flag = LOG_DESTROY;
    3098       22302 :         l.id = id;
    3099             : 
    3100       22302 :         if (!LOG_DISABLED(lg)) {
    3101        9725 :                 if (log_write_format(lg, &l) != GDK_SUCCEED) {
    3102           0 :                         TRC_CRITICAL(GDK, "write failed\n");
    3103           0 :                         log_unlock(lg);
    3104           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3105           0 :                         return GDK_FAIL;
    3106             :                 }
    3107             :         }
    3108       22302 :         TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
    3109       22302 :         BAT *b = BBPquickdesc(bid);
    3110       22302 :         assert(b);
    3111       22302 :         BUN cnt = BATcount(b);
                               /* NOTE(review): lg->current is dereferenced here (and in the
                                * failure path below) without a LOG_DISABLED guard, while
                                * log_add_bat NULL-checks lg->current -- confirm it is always
                                * set on this path. */
    3112       22302 :         ATOMIC_ADD(&lg->current->drops, cnt);
    3113       22302 :         gdk_return r = log_del_bat(lg, bid);
    3114       22302 :         log_unlock(lg);
    3115       22302 :         if (r != GDK_SUCCEED)
    3116           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3117             :         return r;
    3118             : }
    3119             : 
    3120             : static gdk_return
    3121      118116 : log_bat_group(logger *lg, log_id id)
    3122             : {
                               /* Write a LOG_BAT_GROUP record carrying `id`.  Does nothing and
                                * returns GDK_SUCCEED when logging is disabled; otherwise returns
                                * the result of log_write_format.  The sign of `id` is chosen by
                                * the callers log_bat_group_start / log_bat_group_end. */
    3123      118116 :         if (LOG_DISABLED(lg))
    3124             :                 return GDK_SUCCEED;
    3125             : 
    3126       76672 :         logformat l;
    3127       76672 :         l.flag = LOG_BAT_GROUP;
    3128       76672 :         l.id = id;
    3129       76672 :         gdk_return r = log_write_format(lg, &l);
    3130       76672 :         return r;
    3131             : }
    3132             : 
    3133             : gdk_return
    3134       59058 : log_bat_group_start(logger *lg, log_id id)
    3135             : {
    3136             :         /* a positive table id marks the start of a logged table; the id is passed through unchanged */
    3137       59058 :         return log_bat_group(lg, id);
    3138             : }
    3139             : 
    3140             : gdk_return
    3141       59058 : log_bat_group_end(logger *lg, log_id id)
    3142             : {
    3143             :         /* a negative table id marks the end of a logged table, so the id is negated before it is written */
    3144       59058 :         return log_bat_group(lg, -id);
    3145             : }
    3146             : 
    3147             : gdk_return
    3148      271800 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
    3149             : {
                               /* Locked wrapper: takes the log lock, forwards to
                                * internal_log_bat (sixth argument fixed to 0) and returns its
                                * result after releasing the lock. */
    3150      271800 :         log_lock(lg);
    3151      271800 :         gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
    3152      271800 :         log_unlock(lg);
    3153      271800 :         return r;
    3154             : }
    3155             : 
    3156             : gdk_return
    3157        2819 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
    3158             : {
                               /* Log updates for the object with log id `id`: uid holds the
                                * oids that are written first, uval the values written after
                                * them.  A dense uid is delegated to internal_log_bat starting
                                * at uid->tseqbase.  Otherwise, unless logging is disabled, a
                                * LOG_UPDATE record is written followed by the row count, the
                                * one-byte type id, BATcount(uid) oids and finally the values.
                                * Runs under the log lock.  Returns GDK_SUCCEED or GDK_FAIL. */
    3159        2819 :         log_lock(lg);
    3160        2819 :         bte tpe = find_type(lg, uval->ttype);
    3161        2819 :         gdk_return ok = GDK_SUCCEED;
    3162        2819 :         logformat l;
    3163        2819 :         BUN p;
    3164        2819 :         lng nr;
    3165             : 
    3166        2819 :         if (BATtdense(uid)) {
    3167        2735 :                 ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
    3168        2735 :                 log_unlock(lg);
                                       /* NOTE(review): the visible tail of internal_log_bat also
                                        * decrements the refcount on failure -- confirm this is
                                        * not a second decrement. */
    3169        2735 :                 if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
    3170           0 :                         ATOMIC_DEC(&lg->current->refcount);
    3171        2735 :                 return ok;
    3172             :         }
    3173             : 
    3174          84 :         assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
    3175             : 
    3176          84 :         l.flag = LOG_UPDATE;
    3177          84 :         l.id = id;
    3178          84 :         nr = (BATcount(uval));
    3179          84 :         assert(nr);
    3180             : 
    3181          84 :         if (LOG_DISABLED(lg)) {
    3182             :                 /* logging is switched off */
    3183          65 :                 log_unlock(lg);
    3184          65 :                 return GDK_SUCCEED;
    3185             :         }
    3186             : 
                               /* the iterator is only created on the logging path and is
                                * released at the bailout label */
    3187          19 :         BATiter vi = bat_iterator(uval);
                               /* wh writes the oids, wt writes values of uval's type */
    3188          19 :         gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
    3189          19 :         gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
    3190             : 
    3191          19 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    3192          38 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    3193          38 :             log_write_format(lg, &l) != GDK_SUCCEED ||
    3194          38 :             !mnstr_writeLng(lg->current->output_log, nr) ||
    3195          19 :             mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
    3196           0 :                 ok = GDK_FAIL;
    3197           0 :                 goto bailout;
    3198             :         }
                               /* oids are written one at a time; the loop stops at the first failure */
    3199        1074 :         for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
    3200        1055 :                 const oid id = BUNtoid(uid, p);
    3201             : 
    3202        1055 :                 ok = wh(&id, lg->current->output_log, 1);
    3203             :         }
                               /* NOTE(review): if the oid loop above failed, the bulk and
                                * string branches below still run and assign `ok` again, so a
                                * later successful write would hide the earlier failure --
                                * confirm the stream stays in error state in that case. */
    3204          19 :         if (uval->ttype == TYPE_msk) {
                                       /* msk values are written as an int array of (count + 31) / 32 elements */
    3205           0 :                 if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
    3206           0 :                                          (BATcount(uval) + 31) / 32))
    3207           0 :                         ok = GDK_FAIL;
    3208          32 :         } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
                                       /* types below TYPE_str that are not views: one bulk write of nr values */
    3209          13 :                 const void *t = BUNtail(vi, 0);
    3210             : 
    3211          13 :                 ok = wt(t, lg->current->output_log, (size_t) nr);
    3212           6 :         } else if (uval->ttype == TYPE_str) {
    3213             :                 /* efficient string writes */
    3214           6 :                 ok = string_writer(lg, uval, 0, nr);
    3215             :         } else {
                                       /* everything else: one value per atomWrite call */
    3216           0 :                 for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
    3217           0 :                         const void *val = BUNtail(vi, p);
    3218             : 
    3219           0 :                         ok = wt(val, lg->current->output_log, 1);
    3220             :                 }
    3221             :         }
    3222             : 
    3223          19 :         TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
    3224             : 
    3225          19 :   bailout:
    3226          19 :         bat_iterator_end(&vi);
    3227          19 :         if (ok != GDK_SUCCEED) {
    3228           0 :                 const char *err = mnstr_peek_error(lg->current->output_log);
    3229           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    3230           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3231             :         }
    3232          19 :         log_unlock(lg);
    3233          19 :         return ok;
    3234             : }
    3235             : 
    3236             : static inline bool
    3237       56595 : check_rotation_conditions(logger *lg)
    3238             : {
                               /* Tell log_tstart whether it should open a new log file.
                                * Returns false when logging is disabled, when a next range is
                                * already prepared, or when the file position is -1 or 2.
                                * Returns true when the output stream is in an error state.
                                * Otherwise the drop, size and age thresholds below decide. */
    3239       56595 :         if (LOG_DISABLED(lg))
    3240             :                 return false;
    3241             : 
    3242       56592 :         if (lg->current->next)
    3243             :                 return false;   /* do not rotate if there is already a prepared next current */
    3244       56592 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
    3245             :                 return true;
    3246       56592 :         const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
    3247             : 
    3248       56592 :         if (current_file_size == -1)
    3249             :                 return false;
    3250             : 
    3251       56592 :         assert(current_file_size >= 0);
    3252             : 
                               /* presumably a position of 2 means the file holds nothing but
                                * its initial bytes -- TODO confirm against log_open_output */
    3253       56592 :         if (current_file_size == 2)
    3254             :                 return false;
    3255             : 
                               /* rotate when (a) enough rows were dropped and saved_id + 1 >= id,
                                * (b) the file exceeds max_file_size, or (c) the time since
                                * lg->file_age exceeds max_file_age */
    3256        3173 :         bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
    3257       57097 :                 current_file_size > lg->max_file_size ||
    3258       49595 :                 (GDKusec() - lg->file_age) > lg->max_file_age;
    3259             : 
    3260       53932 :         return res;
    3261             : }
    3262             : 
    3263             : gdk_return
    3264       62535 : log_tend(logger *lg)
    3265             : {
                               /* Write the LOG_END record for the current transaction id
                                * (lg->tid).  Returns GDK_SUCCEED immediately when logging is
                                * disabled; otherwise returns the result of log_write_format. */
    3266       62535 :         TRC_DEBUG(WAL, "tend %d\n", lg->tid);
    3267             : 
    3268       62535 :         if (LOG_DISABLED(lg))
    3269             :                 return GDK_SUCCEED;
    3270             : 
    3271       56592 :         gdk_return result;
    3272       56592 :         logformat l;
    3273       56592 :         l.flag = LOG_END;
    3274       56592 :         l.id = lg->tid;
    3275             : 
                               /* nr_flushers is only incremented on success; log_tflush
                                * decrements it again */
    3276       56592 :         if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
    3277       56592 :                 ATOMIC_INC(&lg->nr_flushers);
    3278             :         return result;
    3279             : }
    3280             : 
    3281             : #define flush_lock(lg)          MT_lock_set(&(lg)->flush_lock)	/* acquire the logger's flush_lock */
    3282             : #define flush_unlock(lg)        MT_lock_unset(&(lg)->flush_lock)	/* release the logger's flush_lock */
    3283             : 
    3284             : static inline gdk_return
    3285       55962 : do_flush(logged_range *range)
    3286             : {
                               /* Flush the range's output stream and, unless NOSYNCMASK is set
                                * in GDKdebug, fsync it.  On success flushed_ts is set to the
                                * value of last_ts that was read before the flush.  Returns
                                * GDK_FAIL if mnstr_flush or mnstr_fsync reports an error. */
    3287             :         /* assumes flush lock */
    3288       55962 :         stream *output_log = range->output_log;
                               /* snapshot last_ts first, so flushed_ts never claims more than
                                * was known before the flush started */
    3289       55962 :         ulng ts = ATOMIC_GET(&range->last_ts);
    3290             : 
    3291       55962 :         if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
    3292       55962 :             (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
    3293           0 :                 return GDK_FAIL;
    3294       55962 :         ATOMIC_SET(&range->flushed_ts, ts);
    3295       55962 :         return GDK_SUCCEED;
    3296             : }
    3297             : 
    3298             : static inline void
    3299       62532 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
    3300             : {
                               /* Raise range->last_ts to commit_ts when it is currently
                                * smaller; lg is not used. */
    3301       62532 :         (void) lg;
    3302       62532 :         TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
    3303             : 
    3304       62532 :         if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
    3305       61902 :                 ATOMIC_SET(&range->last_ts, commit_ts);
    3306       62532 : }
    3307             : 
    3308             : gdk_return
    3309       62535 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
    3310             : {
                               /* Finish a transaction that was written to log file `file_id`
                                * and committed at `commit_ts`.
                                *
                                * When lg->flushnow is set, the current range is marked as
                                * flushed up to commit_ts, a new log file is opened, the log is
                                * rotated and the old range is handed to log_commit.
                                *
                                * Otherwise the range with id >= file_id (or the last range) is
                                * located, its last_ts is advanced, the file is flushed when
                                * flushed_ts is still behind commit_ts, the range's refcount and
                                * lg->nr_flushers are decremented, and GDK_SUCCEED is returned.
                                * Everything runs under the rotation lock. */
    3311       62535 :         rotation_lock(lg);
    3312       62535 :         if (lg->flushnow) {
    3313        5940 :                 logged_range *p = lg->current;
    3314        5940 :                 assert(lg->flush_ranges == lg->current);
    3315        5940 :                 assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
    3316        5940 :                 log_tdone(lg, lg->current, commit_ts);
    3317        5940 :                 ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
    3318        5940 :                 lg->id++;
    3319        5940 :                 lg->flushnow = false;
    3320        5940 :                 if (log_open_output(lg) != GDK_SUCCEED)
    3321           0 :                         GDKfatal("Could not create new log file\n");  /* TODO: does not have to be fatal (yet) */
    3322        5940 :                 do_rotate(lg);
    3323        5940 :                 (void) do_flush_range_cleanup(lg);
    3324        5940 :                 assert(lg->flush_ranges == lg->current);
    3325        5940 :                 rotation_unlock(lg);
                                       /* p is the range that was current before the rotation */
    3326        5940 :                 return log_commit(lg, p, NULL, 0);
    3327             :         }
    3328             : 
    3329       56595 :         if (LOG_DISABLED(lg)) {
    3330           3 :                 rotation_unlock(lg);
    3331           3 :                 return GDK_SUCCEED;
    3332             :         }
    3333             : 
    3334       56592 :         logged_range *frange = do_flush_range_cleanup(lg);
    3335             : 
                               /* walk forward to the range of our file, stopping at the last
                                * range if no id reaches file_id */
    3336       56806 :         while (frange->next && frange->id < file_id) {
    3337             :                 assert(frange->next);
    3338             :                 frange = frange->next;
    3339             :         }
    3340             : 
    3341       56592 :         log_tdone(lg, frange, commit_ts);
    3342             : 
    3343       56592 :         if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
    3344             :                 /* delay needed ? */
    3345             : 
    3346       55962 :                 flush_lock(lg);
    3347             :                 /* check it one more time */
                                       /* NOTE(review): the gdk_return of do_flush is discarded,
                                        * so a failed flush or fsync still ends in GDK_SUCCEED
                                        * below -- confirm this is intended. */
    3348       55962 :                 if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
    3349       55962 :                         do_flush(frange);
    3350       55962 :                 flush_unlock(lg);
    3351             :         }
    3352             :         /* else somebody else has flushed our log file */
    3353             : 
                               /* the stream of a non-current range is closed once the
                                * decrement reports 1 */
    3354       56592 :         if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
    3355       55025 :                 if (frange != lg->current && frange->output_log) {
    3356         153 :                         close_stream(frange->output_log);
    3357         153 :                         frange->output_log = NULL;
    3358             :                 }
    3359             :         }
    3360             : 
    3361       56592 :         if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
    3362             :                 /* I am the last flusher
    3363             :                  * if present,
    3364             :                  * wake up the exclusive flusher in log_tstart */
    3365             :                 /* rotation_lock is still being held */
    3366       55350 :                 MT_cond_signal(&lg->excl_flush_cv);
    3367             :         }
    3368       56592 :         rotation_unlock(lg);
    3369             : 
    3370       56592 :         return GDK_SUCCEED;
    3371             : }
    3372             : 
    3373             : static gdk_return
    3374        6982 : log_tsequence_(logger *lg, int seq, lng val)
    3375             : {
                               /* Write a LOG_SEQ record for sequence `seq` followed by its
                                * value `val`.  Returns GDK_SUCCEED without writing when logging
                                * is disabled.  On a stream error or failed write it logs a
                                * critical message, decrements lg->current->refcount and returns
                                * GDK_FAIL.  No locking is done here; the visible caller
                                * log_tsequence holds the log lock. */
    3376        6982 :         logformat l;
    3377             : 
    3378        6982 :         if (LOG_DISABLED(lg))
    3379             :                 return GDK_SUCCEED;
    3380        2840 :         l.flag = LOG_SEQ;
    3381        2840 :         l.id = seq;
    3382             : 
    3383        2840 :         TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
    3384             : 
    3385        2840 :         assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
    3386        5680 :         if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
    3387        5680 :             log_write_format(lg, &l) != GDK_SUCCEED ||
    3388        2840 :             !mnstr_writeLng(lg->current->output_log, val)) {
    3389           0 :                 TRC_CRITICAL(GDK, "write failed\n");
    3390           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3391           0 :                 return GDK_FAIL;
    3392             :         }
    3393             :         return GDK_SUCCEED;
    3394             : }
    3395             : 
    3396             : /* a transaction in itself */
    3397             : gdk_return
    3398        6982 : log_tsequence(logger *lg, int seq, lng val)
    3399             : {
                               /* Store value `val` for sequence `seq` in the logger's sequence
                                * BATs and write the matching log record via log_tsequence_.
                                * Runs under the log lock.  Returns GDK_FAIL when a BAT update
                                * fails, otherwise the result of log_tsequence_. */
    3400        6982 :         BUN p;
    3401             : 
    3402        6982 :         TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
    3403             : 
    3404        6982 :         log_lock(lg);
                               /* batInserted is read under the heap lock of seqs_id */
    3405        6982 :         MT_lock_set(&lg->seqs_id->theaplock);
    3406        6982 :         BUN inserted = lg->seqs_id->batInserted;
    3407        6982 :         MT_lock_unset(&lg->seqs_id->theaplock);
                               /* an entry at or beyond batInserted is overwritten in place */
    3408        6982 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
    3409        1624 :                 assert(lg->seqs_val->hseqbase == 0);
    3410        1624 :                 if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
    3411           0 :                         log_unlock(lg);
    3412           0 :                         return GDK_FAIL;
    3413             :                 }
    3414             :         } else {
                                       /* an older entry has its position appended to lg->dseqs
                                        * (presumably the list of deleted entries -- TODO
                                        * confirm) before a fresh (seq, val) pair is appended */
    3415        4990 :                 if (p != BUN_NONE) {
    3416        4990 :                         oid pos = p;
    3417        4990 :                         if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
    3418           0 :                                 log_unlock(lg);
    3419           0 :                                 return GDK_FAIL;
    3420             :                         }
    3421             :                 }
    3422       10716 :                 if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
    3423        5358 :                     BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
    3424           0 :                         log_unlock(lg);
    3425           0 :                         return GDK_FAIL;
    3426             :                 }
    3427             :         }
    3428        6982 :         gdk_return r = log_tsequence_(lg, seq, val);
    3429        6982 :         log_unlock(lg);
    3430        6982 :         return r;
    3431             : }
    3432             : 
    3433             : static gdk_return
    3434       13342 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
    3435             : {
                               /* Walk the catalog_bid entries from batInserted up to
                                * pending->cnt (or BATcount when pending is NULL), call
                                * BATmode(lb, false) on each BAT, and then hand over to
                                * bm_subcommit.  The log lock is taken here; it is released
                                * here on failure and by bm_subcommit otherwise. */
    3436       13342 :         log_lock(lg);
    3437       13342 :         BAT *b = lg->catalog_bid;
    3438       13342 :         const log_bid *bids;
    3439             : 
    3440       13342 :         bids = (log_bid *) Tloc(b, 0);
    3441      250673 :         for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
    3442      237331 :                 log_bid bid = bids[p];
    3443      237331 :                 BAT *lb = BBP_desc(bid);
    3444             : 
    3445      237331 :                 assert(bid);
                                       /* NOTE(review): lb is dereferenced in the condition, so
                                        * the `!lb` test in the warning can never select
                                        * " gone" -- confirm whether a NULL check was intended. */
    3446      237331 :                 if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
    3447           0 :                         GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
    3448           0 :                         log_unlock(lg);
    3449           0 :                         return GDK_FAIL;
    3450             :                 }
    3451             : 
    3452      237331 :                 assert(lb->batRestricted != BAT_WRITE);
    3453             : 
    3454      237331 :                 TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
    3455             :         }
    3456             :         /* bm_subcommit releases the lock */
    3457       13342 :         return bm_subcommit(lg, pending, updated, maxupdated);
    3458             : }
    3459             : 
    3460             : static gdk_return
    3461      237597 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
    3462             : {
                               /* Make BAT b the catalog entry for log id `id`.  If
                                * internal_find_bat already yields b's own bid nothing is done;
                                * a different bid is first removed with log_del_bat.  Then bid,
                                * id, a count of 0 and a nil lid are appended to the four
                                * catalog BATs, lg->current->cnt is incremented when a current
                                * range exists, and the BAT is retained with BBPretain.
                                * No locking is done here; the visible caller log_bat_persists
                                * holds the log lock.  Returns GDK_SUCCEED or GDK_FAIL. */
    3463      237597 :         log_bid bid = internal_find_bat(lg, id, tid);
    3464      237597 :         lng cnt = 0;
    3465      237597 :         lng lid = lng_nil;
    3466             : 
    3467      237597 :         assert(b->batRestricted != BAT_WRITE);
    3468      237597 :         assert(b->batRole == PERSISTENT);
    3469      237597 :         if (bid < 0)
    3470             :                 return GDK_FAIL;
    3471      237597 :         if (bid) {
    3472      155279 :                 if (bid != b->batCacheid) {
    3473      155276 :                         if (log_del_bat(lg, bid) != GDK_SUCCEED)
    3474             :                                 return GDK_FAIL;
    3475             :                 } else {
    3476             :                         return GDK_SUCCEED;
    3477             :                 }
    3478             :         }
    3479      237594 :         bid = b->batCacheid;
    3480      237594 :         TRC_DEBUG(WAL, "create %d\n", id);
    3481      237594 :         assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
                               /* NOTE(review): if a later append fails the earlier ones are
                                * not undone here -- confirm the caller copes with catalog BATs
                                * of unequal length. */
    3482      475188 :         if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
    3483      475188 :             BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
    3484      475188 :             BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
    3485      237594 :             BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    3486           0 :                 return GDK_FAIL;
    3487      237594 :         if (lg->current)
    3488      237330 :                 lg->current->cnt++;
    3489      237594 :         BBPretain(bid);
    3490      237594 :         return GDK_SUCCEED;
    3491             : }
    3492             : 
    3493             : static gdk_return
    3494      177626 : log_del_bat(logger *lg, log_bid bid)
    3495             : {
                               /* Find the catalog position of `bid` and store the current
                                * transaction id (lg->tid) at that position in catalog_lid
                                * (presumably marking the entry as deleted as of that
                                * transaction -- TODO confirm).  Returns GDK_FAIL when bid is
                                * not in the catalog, otherwise the result of BUNreplace. */
    3496      177626 :         BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
    3497      177626 :         lng lid = lg->tid;
    3498             : 
    3499      177626 :         assert(p != BUN_NONE);
                               /* the check is repeated so builds without assertions fail cleanly */
    3500      177626 :         if (p == BUN_NONE) {
    3501             :                 GDKerror("cannot find BAT\n");
    3502             :                 return GDK_FAIL;
    3503             :         }
    3504             : 
    3505      177626 :         assert(lg->catalog_lid->hseqbase == 0);
    3506      177626 :         return BUNreplace(lg->catalog_lid, p, &lid, false);
    3507             : }
    3508             : 
    3509             : /* returns -1 on failure, 0 when not found, > 0 when found */
    3510             : log_bid
    3511       33520 : log_find_bat(logger *lg, log_id id)
    3512             : {
                               /* Locked wrapper around internal_find_bat(lg, id, -1).  A
                                * result of 0 is reported through GDKerror; any other value is
                                * returned unchanged. */
    3513       33520 :         log_lock(lg);
    3514       33520 :         log_bid bid = internal_find_bat(lg, id, -1);
    3515       33520 :         log_unlock(lg);
    3516       33520 :         if (!bid) {
    3517           0 :                 GDKerror("logger_find_bat failed to find bid for object %d\n", id);
                                       /* NOTE(review): this returns a gdk_return constant from
                                        * a function typed log_bid; the error text also names
                                        * logger_find_bat rather than log_find_bat.  Confirm
                                        * GDK_FAIL has the value callers expect for "not
                                        * found". */
    3518           0 :                 return GDK_FAIL;
    3519             :         }
    3520             :         return bid;
    3521             : }
    3522             : 
    3523             : 
    3524             : 
    3525             : gdk_return
    3526       62536 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
    3527             : {
                               /* Start a transaction.  Rotates the log file when needed,
                                * takes a reference on the current range, stores that range's
                                * id in *file_id and writes a LOG_START record with a freshly
                                * incremented lg->tid.
                                *
                                * flushnow: act as the exclusive flusher.  This waits until
                                *   nr_flushers drops to 0, rotates, and calls log_flush when
                                *   saved_id + 1 < id.
                                * file_id: out parameter.  NULL is only handled together with
                                *   flushnow, where it merely resets lg->file_age and returns.
                                *
                                * Returns GDK_SUCCEED, or GDK_FAIL when the LOG_START record
                                * cannot be written. */
    3528       62536 :         rotation_lock(lg);
    3529       62536 :         if (flushnow) {
    3530        5941 :                 if (file_id == NULL) {
    3531             :                         /* special case: ask store_manager to rotate log file */
    3532           1 :                         lg->file_age = 0;
    3533           1 :                         rotation_unlock(lg);
    3534           1 :                         return GDK_SUCCEED;
    3535             :                 }
    3536             :                 /* I am now the exclusive flusher */
    3537        5940 :                 while (ATOMIC_GET(&lg->nr_flushers)) {
    3538             :                         /* I am waiting until all existing flushers are done */
    3539           0 :                         MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
    3540             :                 }
    3541        5940 :                 assert(ATOMIC_GET(&lg->nr_flushers) == 0);
    3542             : 
                                       /* a new file is only opened when the current range has a
                                        * non-zero last_ts */
    3543        5940 :                 if (ATOMIC_GET(&lg->current->last_ts)) {
    3544        2458 :                         lg->id++;
    3545        2458 :                         if (log_open_output(lg) != GDK_SUCCEED)
    3546           0 :                                 GDKfatal("Could not create new log file\n");  /* TODO: does not have to be fatal (yet) */
    3547             :                 }
    3548        5940 :                 do_rotate(lg);
    3549        5940 :                 (void) do_flush_range_cleanup(lg);
    3550        5940 :                 rotation_unlock(lg);
    3551             : 
                                       /* NOTE(review): saved_id, id and flushnow are accessed
                                        * after the rotation lock was released -- confirm this
                                        * is safe for the exclusive flusher. */
    3552        5940 :                 if (lg->saved_id + 1 < lg->id)
    3553        5428 :                         log_flush(lg, (1ULL << 63));
    3554        5940 :                 lg->flushnow = flushnow;
    3555             :         } else {
    3556       56595 :                 if (check_rotation_conditions(lg)) {
    3557        4338 :                         lg->id++;
    3558        4338 :                         if (log_open_output(lg) != GDK_SUCCEED)
    3559           0 :                                 GDKfatal("Could not create new log file\n");  /* TODO: does not have to be fatal (yet) */
    3560             :                 }
    3561       56595 :                 do_rotate(lg);
    3562       56595 :                 rotation_unlock(lg);
    3563             :         }
    3564             : 
    3565       62535 :         if (LOG_DISABLED(lg))
    3566             :                 return GDK_SUCCEED;
    3567             : 
                               /* the reference taken here is released in log_tflush, or below
                                * when the write fails */
    3568       56592 :         ATOMIC_INC(&lg->current->refcount);
    3569       56592 :         *file_id = lg->current->id;
    3570       56592 :         logformat l;
    3571       56592 :         l.flag = LOG_START;
    3572       56592 :         l.id = ++lg->tid;
    3573             : 
    3574       56592 :         TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
    3575       56592 :         if (log_write_format(lg, &l) != GDK_SUCCEED) {
    3576           0 :                 ATOMIC_DEC(&lg->current->refcount);
    3577           0 :                 return GDK_FAIL;
    3578             :         }
    3579             : 
    3580             :         return GDK_SUCCEED;
    3581             : }
    3582             : 
    3583             : void
    3584         118 : log_printinfo(logger *lg)
    3585             : {
                               /* Print a human-readable summary of the logger state to stdout:
                                * file ids, transaction ids, number of flushers, catalog sizes
                                * and one line per pending range.  The rotation lock is tried
                                * with rotation_trylock(lg, 1000); if it cannot be obtained a
                                * short notice is printed instead. */
    3586         118 :         if (!rotation_trylock(lg, 1000)) {
    3587           0 :                 printf("Logger is currently locked, so no logger information\n");
    3588           0 :                 return;
    3589             :         }
    3590         118 :         printf("logger %s:\n", lg->fn);
    3591         118 :         printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
    3592             :                lg->id, lg->saved_id);
    3593         118 :         printf("current transaction id %d, saved transaction id %d\n",
    3594             :                lg->tid, lg->saved_tid);
    3595         118 :         printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
    3596         118 :         printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
    3597         118 :                lg->catalog_bid->batCount, lg->dcatalog->batCount);
    3598         307 :         for (logged_range *p = lg->pending; p; p = p->next) {
    3599         189 :                 char buf[32];
                                       /* The file size is left out (empty buf) in debug or
                                        * in-memory mode, when the range has no open stream, or
                                        * when the text does not fit.  The size is taken from
                                        * the range's own stream p->output_log, the one that was
                                        * just checked for NULL, so each range reports its own
                                        * file rather than the current one. */
    3600         189 :                 if ((lg->debug & 128 || lg->inmemory) ||
    3601         189 :                     p->output_log == NULL ||
    3602         118 :                     snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(p->output_log))) >= (int) sizeof(buf))
    3603          71 :                         buf[0] = 0;
    3604         260 :                 printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
    3605             :         }
    3606         118 :         rotation_unlock(lg);
    3607             : }

Generated by: LCOV version 1.14