Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
21 : static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
22 : static gdk_return log_del_bat(logger *lg, log_bid bid);
23 : /*
24 : * The logger uses a directory to store its log files. One master log
25 : * file stores information about the version of the logger and the
26 : * type mapping it uses. This file is a simple ascii file with the
27 : * following format:
28 : * {6DIGIT-VERSION\n[id,type_name\n]*}
29 : * The transaction log files have a binary format.
30 : */
31 :
/* Record kinds; the value is what the `flag` byte of a logformat
 * header is compared against when records are read back. */
#define LOG_START 0
#define LOG_END 1
#define LOG_UPDATE_CONST 2
#define LOG_UPDATE_BULK 3
#define LOG_UPDATE 4
#define LOG_CREATE 5
#define LOG_DESTROY 6
#define LOG_SEQ 7
#define LOG_CLEAR 8 /* DEPRECATED */
#define LOG_BAT_GROUP 9

/* Select the function used to query the current file position:
 * _ftelli64 on native Windows, ftello when HAVE_FSEEKO is defined,
 * plain ftell otherwise. */
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
#else
#ifdef HAVE_FSEEKO
#define getfilepos ftello
#else
#define getfilepos ftell
#endif
#endif

/* initial capacity requested for BATs created in la_bat_create */
#define BATSIZE 0

/* true when debug bit 128 is set, or the logger is in-memory, or its
 * flushnow flag is set */
#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
/* Printable names of the record kinds; the position of each entry
 * equals the numeric value of the corresponding LOG_* macro. */
static const char *log_commands[] = {
	"LOG_START",
	"LOG_END",
	"LOG_UPDATE_CONST",
	"LOG_UPDATE_BULK",
	"LOG_UPDATE",
	"LOG_CREATE",
	"LOG_DESTROY",
	"LOG_SEQ",
	"", /* LOG_CLEAR IS DEPRECATED */
	"LOG_BAT_GROUP",
};
69 :
/* One pending change of a transaction that is being read back from
 * the log; filled in by the log_read_* functions and executed by
 * la_apply. */
typedef struct logaction {
	int type;		/* type of change */
	lng nr;			/* number of values covered by the change */
	int tt;			/* atom type number of the values */
	lng id;			/* NOTE(review): not used by the code in view */
	lng offset;		/* position in the target BAT where the change starts */
	log_id cid;		/* id of object */
	BAT *b;			/* temporary bat with changes */
	BAT *uid;		/* temporary bat with bun positions to update */
} logaction;
80 :
/* during the recover process a number of transactions could be active */
typedef struct trans {
	int tid;		/* transaction id */
	int sz;			/* sz of the changes array */
	int nr;			/* nr of changes */

	logaction *changes;	/* array of sz entries, the first nr are in use */

	struct trans *tr;	/* link handed to tr_create; returned by tr_destroy */
} trans;
91 :
/* Header of a log record as handled by log_read_format and
 * log_write_format: a one byte flag followed by an int id. */
typedef struct logformat_t {
	bte flag;		/* record kind, one of the LOG_* values */
	int id;			/* meaning depends on the record kind */
} logformat;

/* Result of the log_read_* functions.  In the code in view LOG_EOF is
 * returned when a read from the input stream fails and LOG_ERR for
 * other failures such as failed allocations or failed BAT updates. */
typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
98 :
/* forward declarations; tr_grow is used by the log_read_* functions
 * before its definition */
static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
static gdk_return tr_grow(trans *tr);

/* take and release the lock stored in the `lock` member of the logger */
#define log_lock(lg) MT_lock_set(&(lg)->lock)
#define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 950598 : find_type(logger *lg, int tpe)
107 : {
108 950598 : assert(tpe >= 0 && tpe < MAXATOMS);
109 950598 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 552969 : find_type_nr(logger *lg, bte tpe)
114 : {
115 552969 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 552969 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
121 : static BUN
122 422275 : log_find(BAT *b, BAT *d, int val)
123 : {
124 422275 : BUN p;
125 :
126 422275 : assert(b->ttype == TYPE_int);
127 422275 : assert(d->ttype == TYPE_oid);
128 422275 : BATiter bi = bat_iterator(b);
129 422275 : if (BAThash(b) == GDK_SUCCEED) {
130 422275 : MT_rwlock_rdlock(&b->thashlock);
131 688618 : HASHloop_int(bi, b->thash, p, &val) {
132 184779 : oid pos = p;
133 184779 : if (BUNfnd(d, &pos) == BUN_NONE) {
134 184779 : MT_rwlock_rdunlock(&b->thashlock);
135 184779 : bat_iterator_end(&bi);
136 184779 : return p;
137 : }
138 : }
139 237496 : MT_rwlock_rdunlock(&b->thashlock);
140 : } else { /* unlikely: BAThash failed */
141 0 : int *t = (int *) bi.base;
142 :
143 0 : for (p = 0; p < bi.count; p++) {
144 0 : if (t[p] == val) {
145 0 : oid pos = p;
146 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
147 0 : bat_iterator_end(&bi);
148 0 : return p;
149 : }
150 : }
151 : }
152 : }
153 237496 : bat_iterator_end(&bi);
154 237496 : return BUN_NONE;
155 : }
156 :
157 : static log_bid
158 659169 : internal_find_bat(logger *lg, log_id id, int tid)
159 : {
160 659169 : BUN p;
161 :
162 659169 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
163 659169 : BATiter cni = bat_iterator(lg->catalog_id);
164 659169 : MT_rwlock_rdlock(&cni.b->thashlock);
165 659169 : if (tid < 0) {
166 417652 : HASHloop_int(cni, cni.b->thash, p, &id) {
167 210727 : oid pos = p;
168 210727 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
169 210727 : MT_rwlock_rdunlock(&cni.b->thashlock);
170 210727 : bat_iterator_end(&cni);
171 210727 : return *(log_bid *) Tloc(lg->catalog_bid, p);
172 : }
173 : }
174 : } else {
175 367191 : BUN cp = BUN_NONE;
176 3254102 : HASHloop_int(cni, cni.b->thash, p, &id) {
177 2916609 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
178 2916609 : if (lid != lng_nil && lid <= tid) {
179 : break;
180 : }
181 : cp = p;
182 : }
183 367191 : if (cp != BUN_NONE) {
184 366935 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 366935 : bat_iterator_end(&cni);
186 366935 : return *(log_bid *) Tloc(lg->catalog_bid, cp);
187 : }
188 : }
189 81507 : MT_rwlock_rdunlock(&cni.b->thashlock);
190 81507 : bat_iterator_end(&cni);
191 81507 : return 0; /* not found */
192 : }
193 : return -1; /* error creating hash */
194 : }
195 :
/* Hand BAT `b` to BBPreclaim; the logger's way of letting go of a BAT
 * it holds. */
static inline void
logbat_destroy(BAT *b)
{
	BBPreclaim(b);
}
201 :
202 : static BAT *
203 13267 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 13267 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 13267 : if (nb) {
208 13267 : BBP_pid(nb->batCacheid) = 0;
209 13267 : if (role == PERSISTENT) {
210 8365 : BATmode(nb, false);
211 8365 : nb = BATsetaccess(nb, BAT_READ);
212 : }
213 : } else {
214 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
215 : }
216 13267 : return nb;
217 : }
218 :
219 : static bool
220 767753 : log_read_format(logger *lg, logformat *data)
221 : {
222 767753 : assert(!lg->inmemory);
223 767753 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
224 755476 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
225 : return true;
226 : /* could only read part, so complain */
227 0 : TRC_CRITICAL(GDK, "read failed\n");
228 : }
229 : return false;
230 : }
231 :
232 : static gdk_return
233 760345 : log_write_format(logger *lg, logformat *data)
234 : {
235 760345 : assert(data->id || data->flag);
236 760345 : assert(!lg->inmemory);
237 760345 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
238 1520690 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
239 1520690 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
240 760345 : mnstr_writeInt(lg->current->output_log, data->id))
241 : return GDK_SUCCEED;
242 0 : TRC_CRITICAL(GDK, "write failed\n");
243 0 : return GDK_FAIL;
244 : }
245 :
246 : static log_return
247 2780 : log_read_seq(logger *lg, logformat *l)
248 : {
249 2780 : int seq = l->id;
250 2780 : lng val;
251 2780 : BUN p;
252 :
253 2780 : assert(!lg->inmemory);
254 2780 : if (mnstr_readLng(lg->input_log, &val) != 1) {
255 0 : TRC_CRITICAL(GDK, "read failed\n");
256 0 : return LOG_EOF;
257 : }
258 2780 : if (lg->flushing)
259 : return LOG_OK;
260 :
261 65 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
262 64 : p >= lg->seqs_id->batInserted) {
263 40 : assert(lg->seqs_val->hseqbase == 0);
264 40 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
265 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
266 0 : return LOG_ERR;
267 : }
268 : } else {
269 24 : if (p != BUN_NONE) {
270 24 : oid pos = p;
271 24 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
272 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
273 0 : return LOG_ERR;
274 : }
275 : }
276 50 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
277 25 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
278 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
279 0 : return LOG_ERR;
280 : }
281 : }
282 : return LOG_OK;
283 : }
284 :
/* The two helpers below are compiled out by the surrounding #if 0;
 * they write respectively read a single int id to/from the log. */
#if 0
static gdk_return
log_write_id(logger *lg, int id)
{
	assert(!lg->inmemory);
	assert(id >= 0);
	assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
	if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
	    mnstr_writeInt(lg->current->output_log, id))
		return GDK_SUCCEED;
	TRC_CRITICAL(GDK, "write failed\n");
	return GDK_FAIL;
}

static log_return
log_read_id(logger *lg, log_id *id)
{
	assert(!lg->inmemory);
	if (mnstr_readInt(lg->input_log, id) != 1) {
		TRC_CRITICAL(GDK, "read failed\n");
		return LOG_EOF;
	}
	return LOG_OK;
}
#endif
310 :
311 : static log_return
312 19180 : string_reader(logger *lg, BAT *b, lng nr)
313 : {
314 19180 : size_t sz = 0;
315 19180 : lng SZ = 0;
316 19180 : log_return res = LOG_OK;
317 :
318 38527 : while (nr && res == LOG_OK) {
319 19347 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
320 0 : TRC_CRITICAL(GDK, "read failed\n");
321 0 : return LOG_EOF;
322 : }
323 19347 : sz = (size_t) SZ;
324 19347 : char *buf = lg->rbuf;
325 19347 : if (lg->rbufsize < sz) {
326 2 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
327 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
328 0 : return LOG_ERR;
329 : }
330 2 : lg->rbuf = buf;
331 2 : lg->rbufsize = sz;
332 : }
333 :
334 19347 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
335 0 : TRC_CRITICAL(GDK, "read failed\n");
336 0 : return LOG_EOF;
337 : }
338 : /* handle strings */
339 : char *t = buf;
340 : /* chunked */
341 : #define CHUNK_SIZE 1024
342 : char *strings[CHUNK_SIZE];
343 : int cur = 0;
344 :
345 70866 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
346 51519 : strings[cur++] = t;
347 51519 : if (cur == CHUNK_SIZE &&
348 4 : b &&
349 4 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
350 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
351 0 : res = LOG_ERR;
352 : }
353 51519 : if (cur == CHUNK_SIZE)
354 14 : cur = 0;
355 : /* find next */
356 5539823 : while (*t)
357 5488304 : t++;
358 51519 : t++;
359 : }
360 19347 : if (cur &&
361 332 : b &&
362 332 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
363 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
364 0 : res = LOG_ERR;
365 : }
366 : }
367 : return res;
368 : }
369 :
370 :
/* Describes a range of values to copy from a BAT in the log file to a
 * BAT in the database.
 * NOTE(review): not referenced by the code in view. */
struct offset {
	lng os; /* offset within source BAT in logfile */
	lng nr; /* number of values to be copied */
	lng od; /* offset within destination BAT in database */
};
376 :
377 : static log_return
378 395397 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
379 : {
380 395397 : log_return res = LOG_OK;
381 395397 : lng nr, pnr;
382 395397 : bte type_id = -1;
383 395397 : int tpe;
384 :
385 395397 : assert(!lg->inmemory);
386 395397 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
387 :
388 790794 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
389 395397 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
390 0 : TRC_CRITICAL(GDK, "read failed\n");
391 0 : return LOG_EOF;
392 : }
393 :
394 395397 : pnr = nr;
395 395397 : tpe = find_type_nr(lg, type_id);
396 395397 : if (tpe >= 0) {
397 395397 : BAT *uid = NULL;
398 395397 : BAT *r = NULL;
399 395397 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
400 395397 : lng offset;
401 :
402 395397 : assert(nr <= (lng) BUN_MAX);
403 395397 : if (!lg->flushing && l->flag == LOG_UPDATE) {
404 0 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
405 0 : if (uid == NULL) {
406 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
407 28518 : return LOG_ERR;
408 : }
409 : }
410 :
411 395397 : if (l->flag == LOG_UPDATE_CONST) {
412 125014 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
413 0 : TRC_CRITICAL(GDK, "read failed\n");
414 0 : return LOG_EOF;
415 : }
416 125014 : if (cands) {
417 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
418 :
419 28518 : if (BATcount(*cands) == 0 || lg->flushing) {
420 : /* when flushing, we only need the offset and count of the last segment of inserts. */
421 28518 : assert((*cands)->ttype == TYPE_void);
422 28518 : BATtseqbase(*cands, (oid) offset);
423 28518 : BATsetcount(*cands, (BUN) nr);
424 0 : } else if (!lg->flushing) {
425 0 : assert(BATcount(*cands) > 0);
426 0 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
427 0 : BAT *newcands = NULL;
428 0 : if (!dense) {
429 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
430 0 : res = LOG_ERR;
431 0 : } else if ((*cands)->ttype == TYPE_void) {
432 0 : if ((newcands = BATmergecand(*cands, dense))) {
433 0 : BBPreclaim(*cands);
434 0 : *cands = newcands;
435 : } else {
436 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
437 0 : res = LOG_ERR;
438 : }
439 : } else {
440 0 : assert((*cands)->ttype == TYPE_oid);
441 0 : assert(BATcount(*cands) > 0);
442 0 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
443 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
444 0 : res = LOG_ERR;
445 : }
446 : }
447 0 : BBPreclaim(dense);
448 : }
449 :
450 : /* We have to read the value to update the read cursor */
451 28518 : size_t tlen = lg->rbufsize;
452 28518 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
453 28518 : if (t == NULL) {
454 0 : TRC_CRITICAL(GDK, "read failed\n");
455 0 : res = LOG_EOF;
456 : }
457 28518 : return res;
458 : }
459 : }
460 :
461 366879 : if (!lg->flushing) {
462 1397 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
463 1397 : if (r == NULL) {
464 0 : if (uid)
465 0 : BBPreclaim(uid);
466 0 : return LOG_ERR;
467 : }
468 : }
469 :
470 366879 : if (l->flag == LOG_UPDATE_CONST) {
471 96496 : size_t tlen = lg->rbufsize;
472 96496 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
473 96496 : if (t == NULL) {
474 0 : TRC_CRITICAL(GDK, "read failed\n");
475 0 : res = LOG_EOF;
476 : } else {
477 96496 : lg->rbuf = t;
478 96496 : lg->rbufsize = tlen;
479 96496 : if (r) {
480 8175 : for (BUN p = 0; p < (BUN) nr; p++) {
481 7904 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
482 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
483 0 : res = LOG_ERR;
484 : }
485 : }
486 : }
487 : }
488 270383 : } else if (l->flag == LOG_UPDATE_BULK) {
489 270365 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
490 0 : if (r)
491 0 : BBPreclaim(r);
492 0 : TRC_CRITICAL(GDK, "read failed\n");
493 0 : return LOG_EOF;
494 : }
495 270365 : if (tpe == TYPE_msk) {
496 0 : if (r) {
497 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
498 0 : BATsetcount(r, (BUN) nr);
499 : else {
500 0 : TRC_CRITICAL(GDK, "read failed\n");
501 0 : res = LOG_EOF;
502 : }
503 : } else {
504 0 : size_t tlen = lg->rbufsize / sizeof(int);
505 0 : size_t cnt = 0, snr = (size_t) nr;
506 0 : snr = (snr + 31) / 32;
507 0 : assert(tlen);
508 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
509 0 : cnt = snr > tlen ? tlen : snr;
510 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
511 0 : TRC_CRITICAL(GDK, "read failed\n");
512 0 : res = LOG_EOF;
513 : }
514 : }
515 : }
516 : } else {
517 270365 : if (!ATOMvarsized(tpe)) {
518 250603 : size_t cnt = 0, snr = (size_t) nr;
519 250603 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
520 250603 : assert(tlen);
521 : /* read in chunks of max
522 : * BUFSIZE/width rows */
523 501209 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
524 250606 : cnt = snr > tlen ? tlen : snr;
525 250606 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
526 :
527 250606 : if (t == NULL) {
528 : res = LOG_EOF;
529 : break;
530 : }
531 250606 : assert(t == lg->rbuf);
532 250606 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
533 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
534 0 : res = LOG_ERR;
535 : }
536 : }
537 19762 : } else if (tpe == TYPE_str) {
538 : /* efficient string */
539 19174 : res = string_reader(lg, r, nr);
540 : } else {
541 1218 : for (; res == LOG_OK && nr > 0; nr--) {
542 630 : size_t tlen = lg->rbufsize;
543 630 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
544 :
545 630 : if (t == NULL) {
546 : /* see if failure was due to
547 : * malloc or something less
548 : * serious (in the current
549 : * context) */
550 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
551 : res = LOG_EOF;
552 : else
553 0 : res = LOG_ERR;
554 0 : TRC_CRITICAL(GDK, "read failed\n");
555 : } else {
556 630 : lg->rbuf = t;
557 630 : lg->rbufsize = tlen;
558 630 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
559 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
560 0 : res = LOG_ERR;
561 : }
562 : }
563 : }
564 : }
565 : }
566 : } else {
567 18 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
568 18 : void *hv = ATOMnil(TYPE_oid);
569 18 : offset = 0;
570 :
571 18 : if (hv == NULL) {
572 0 : TRC_CRITICAL(GDK, "read failed\n");
573 0 : res = LOG_EOF;
574 : }
575 1071 : for (; res == LOG_OK && nr > 0; nr--) {
576 1053 : size_t hlen = sizeof(oid);
577 1053 : void *h = rh(hv, &hlen, lg->input_log, 1);
578 1053 : assert(hlen == sizeof(oid));
579 1053 : assert(h == hv);
580 1053 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
581 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
582 0 : res = LOG_ERR;
583 : }
584 : }
585 18 : nr = pnr;
586 18 : if (tpe == TYPE_msk) {
587 0 : if (r) {
588 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
589 0 : BATsetcount(r, (BUN) nr);
590 : else {
591 0 : TRC_CRITICAL(GDK, "read failed\n");
592 0 : res = LOG_EOF;
593 : }
594 : } else {
595 0 : for (lng i = 0; i < nr; i += 32) {
596 0 : int v;
597 0 : switch (mnstr_readInt(lg->input_log, &v)) {
598 0 : case 1:
599 0 : continue;
600 : case 0:
601 : res = LOG_EOF;
602 : break;
603 : default:
604 : res = LOG_ERR;
605 : break;
606 : }
607 0 : TRC_CRITICAL(GDK, "read failed\n");
608 0 : break;
609 : }
610 : }
611 18 : } else if (tpe == TYPE_str) {
612 : /* efficient string */
613 6 : res = string_reader(lg, r, nr);
614 : } else {
615 58 : for (; res == LOG_OK && nr > 0; nr--) {
616 46 : size_t tlen = lg->rbufsize;
617 46 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
618 :
619 46 : if (t == NULL) {
620 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
621 : res = LOG_EOF;
622 : else
623 0 : res = LOG_ERR;
624 0 : TRC_CRITICAL(GDK, "read failed\n");
625 : } else {
626 46 : lg->rbuf = t;
627 46 : lg->rbufsize = tlen;
628 46 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
629 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
630 0 : res = LOG_ERR;
631 : }
632 : }
633 : }
634 : }
635 18 : GDKfree(hv);
636 : }
637 :
638 366879 : if (res == LOG_OK) {
639 366879 : if (tr_grow(tr) == GDK_SUCCEED) {
640 366879 : tr->changes[tr->nr].type = l->flag;
641 366879 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
642 140593 : assert(cands); /* bat r is part of a group of bats logged together. */
643 140593 : struct canditer ci;
644 140593 : canditer_init(&ci, NULL, *cands);
645 140593 : const oid first = canditer_peek(&ci);
646 140593 : const oid last = canditer_last(&ci);
647 140593 : offset = (lng) first;
648 140593 : pnr = (lng) (last - first) + 1;
649 140593 : if (!lg->flushing) {
650 1022 : assert(uid == NULL);
651 1022 : uid = *cands;
652 1022 : BBPfix((*cands)->batCacheid);
653 1022 : tr->changes[tr->nr].type = LOG_UPDATE;
654 : }
655 : }
656 366879 : if (l->flag == LOG_UPDATE_CONST) {
657 96496 : assert(!cands); /* TODO: This might change in the future. */
658 96496 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
659 : }
660 366879 : tr->changes[tr->nr].nr = pnr;
661 366879 : tr->changes[tr->nr].tt = tpe;
662 366879 : tr->changes[tr->nr].cid = id;
663 366879 : tr->changes[tr->nr].offset = offset;
664 366879 : tr->changes[tr->nr].b = r;
665 366879 : tr->changes[tr->nr].uid = uid;
666 366879 : tr->nr++;
667 : } else {
668 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
669 0 : res = LOG_ERR;
670 : }
671 : }
672 366879 : if (res != LOG_OK) {
673 0 : if (r)
674 0 : BBPreclaim(r);
675 0 : if (cands && uid)
676 0 : BBPunfix((*cands)->batCacheid);
677 0 : else if (uid)
678 0 : BBPreclaim(uid);
679 : }
680 : } else {
681 : /* bat missing ERROR or ignore ? currently error. */
682 0 : TRC_CRITICAL(GDK, "unknown type\n");
683 0 : res = LOG_ERR;
684 : }
685 : return res;
686 : }
687 :
688 :
689 : static gdk_return
690 595899 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
691 : {
692 595899 : BATiter cni = bat_iterator_nolock(lg->catalog_id);
693 :
694 595899 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
695 595899 : MT_rwlock_rdlock(&cni.b->thashlock);
696 595899 : BUN p, cp = BUN_NONE;
697 :
698 3890038 : HASHloop_int(cni, cni.b->thash, p, &id) {
699 3146630 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
700 :
701 3146630 : if (lid != lng_nil && lid <= tid)
702 : break;
703 : cp = p;
704 : }
705 595899 : if (cp != BUN_NONE) {
706 595697 : lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
707 595697 : assert(lg->catalog_cnt->hseqbase == 0);
708 595697 : if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
709 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
710 0 : return GDK_FAIL;
711 : }
712 : }
713 595899 : MT_rwlock_rdunlock(&cni.b->thashlock);
714 595899 : return GDK_SUCCEED;
715 : }
716 : return GDK_FAIL;
717 : }
718 :
719 : static gdk_return
720 366879 : la_bat_updates(logger *lg, logaction *la, int tid)
721 : {
722 366879 : log_bid bid = internal_find_bat(lg, la->cid, tid);
723 366879 : BAT *b = NULL;
724 :
725 366879 : if (bid < 0)
726 : return GDK_FAIL;
727 366879 : if (!bid) {
728 : /* object already gone, nothing needed */
729 : return GDK_SUCCEED;
730 : }
731 :
732 366879 : if (!lg->flushing) {
733 1397 : b = BATdescriptor(bid);
734 1397 : if (b == NULL)
735 : return GDK_FAIL;
736 : }
737 366879 : BUN cnt = 0;
738 366879 : if (la->type == LOG_UPDATE_BULK) {
739 365839 : if (!lg->flushing) {
740 375 : cnt = BATcount(b);
741 375 : int is_msk = (b->ttype == TYPE_msk);
742 : /* handle offset 0 ie clear */
743 375 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
744 : BATclear(b, true);
745 : /* handle offset */
746 375 : if (cnt <= (BUN) la->offset) {
747 234 : msk t = 1;
748 234 : if (cnt < (BUN) la->offset) { /* insert nils */
749 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
750 0 : lng i, d = la->offset - BATcount(b);
751 0 : for (i = 0; i < d; i++) {
752 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
753 0 : logbat_destroy(b);
754 0 : return GDK_FAIL;
755 : }
756 : }
757 : }
758 234 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
759 0 : logbat_destroy(b);
760 0 : return GDK_FAIL;
761 : }
762 : } else {
763 141 : BATiter vi = bat_iterator(la->b);
764 141 : BUN p, q;
765 :
766 2199 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
767 2058 : const void *t = BUNtail(vi, p);
768 :
769 2058 : if (q < cnt) {
770 2057 : if (b->tnosorted == q)
771 7 : b->tnosorted = 0;
772 2057 : if (b->tnorevsorted == q)
773 7 : b->tnorevsorted = 0;
774 2057 : if (b->tnokey[0] == q ||
775 2050 : b->tnokey[1] == q) {
776 7 : b->tnokey[0] = 0;
777 7 : b->tnokey[1] = 0;
778 : }
779 2057 : b->tkey = false;
780 2057 : b->tsorted = false;
781 2057 : b->tkey = false;
782 2057 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
783 0 : logbat_destroy(b);
784 0 : bat_iterator_end(&vi);
785 0 : return GDK_FAIL;
786 : }
787 : } else {
788 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
789 0 : logbat_destroy(b);
790 0 : bat_iterator_end(&vi);
791 0 : return GDK_FAIL;
792 : }
793 : }
794 : }
795 141 : bat_iterator_end(&vi);
796 : }
797 : }
798 1040 : } else if (la->type == LOG_UPDATE) {
799 1040 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
800 : return GDK_FAIL;
801 : }
802 : }
803 366879 : cnt = (BUN) (la->offset + la->nr);
804 366879 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
805 0 : if (b)
806 0 : logbat_destroy(b);
807 0 : return GDK_FAIL;
808 : }
809 366879 : if (b)
810 1397 : logbat_destroy(b);
811 : return GDK_SUCCEED;
812 : }
813 :
814 : static log_return
815 9931 : log_read_destroy(logger *lg, trans *tr, log_id id)
816 : {
817 9931 : (void) lg;
818 9931 : assert(!lg->inmemory);
819 9931 : if (tr_grow(tr) == GDK_SUCCEED) {
820 9931 : tr->changes[tr->nr].type = LOG_DESTROY;
821 9931 : tr->changes[tr->nr].cid = id;
822 9931 : tr->nr++;
823 9931 : return LOG_OK;
824 : }
825 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
826 0 : return LOG_ERR;
827 : }
828 :
829 : static gdk_return
830 48 : la_bat_destroy(logger *lg, logaction *la, int tid)
831 : {
832 48 : log_bid bid = internal_find_bat(lg, la->cid, tid);
833 :
834 48 : if (bid < 0)
835 : return GDK_FAIL;
836 48 : if (!bid) {
837 : #ifndef NDEBUG
838 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
839 : #endif
840 0 : return GDK_SUCCEED;
841 : }
842 48 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
843 : return GDK_FAIL;
844 : return GDK_SUCCEED;
845 : }
846 :
847 : static log_return
848 157572 : log_read_create(logger *lg, trans *tr, log_id id)
849 : {
850 157572 : bte tt;
851 157572 : int tpe;
852 :
853 157572 : assert(!lg->inmemory);
854 157572 : TRC_DEBUG(WAL, "create %d", id);
855 :
856 157572 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
857 0 : TRC_CRITICAL(GDK, "read failed\n");
858 0 : return LOG_EOF;
859 : }
860 :
861 157572 : tpe = find_type_nr(lg, tt);
862 : /* read create */
863 157572 : if (tr_grow(tr) == GDK_SUCCEED) {
864 157572 : tr->changes[tr->nr].type = LOG_CREATE;
865 157572 : tr->changes[tr->nr].tt = tpe;
866 157572 : tr->changes[tr->nr].cid = id;
867 157572 : tr->nr++;
868 157572 : return LOG_OK;
869 : }
870 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
871 0 : return LOG_ERR;
872 : }
873 :
874 : static gdk_return
875 264 : la_bat_create(logger *lg, logaction *la, int tid)
876 : {
877 264 : BAT *b;
878 :
879 : /* formerly head column type, should be void */
880 264 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
881 : return GDK_FAIL;
882 :
883 264 : if (la->tt < 0)
884 0 : BATtseqbase(b, 0);
885 :
886 528 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
887 264 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
888 0 : logbat_destroy(b);
889 0 : return GDK_FAIL;
890 : }
891 264 : logbat_destroy(b);
892 264 : return GDK_SUCCEED;
893 : }
894 :
895 : static gdk_return
896 242 : log_write_new_types(logger *lg, FILE *fp)
897 : {
898 242 : bte id = 0;
899 :
900 : /* write types and insert into bats */
901 242 : memset(lg->type_id, -1, sizeof(lg->type_id));
902 242 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
903 : /* first the fixed sized types */
904 6774 : for (int i = 0; i < GDKatomcnt; i++) {
905 6532 : if (ATOMvarsized(i))
906 1451 : continue;
907 5081 : lg->type_id[i] = id;
908 5081 : lg->type_nr[id] = i;
909 5081 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
910 : return GDK_FAIL;
911 5081 : id++;
912 : }
913 : /* second the var sized types */
914 : id = -127; /* start after nil */
915 6774 : for (int i = 0; i < GDKatomcnt; i++) {
916 6532 : if (!ATOMvarsized(i))
917 5081 : continue;
918 1451 : lg->type_id[i] = id;
919 1451 : lg->type_nr[256 + id] = i;
920 1451 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
921 : return GDK_FAIL;
922 1451 : id++;
923 : }
924 : return GDK_SUCCEED;
925 : }
926 :
927 : #define TR_SIZE 1024
928 :
929 : static trans *
930 56950 : tr_create(trans *tr, int tid)
931 : {
932 56950 : trans *ntr = GDKmalloc(sizeof(trans));
933 :
934 56950 : if (ntr == NULL)
935 : return NULL;
936 56950 : ntr->tid = tid;
937 56950 : ntr->sz = TR_SIZE;
938 56950 : ntr->nr = 0;
939 56950 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
940 56950 : if (ntr->changes == NULL) {
941 0 : GDKfree(ntr);
942 0 : return NULL;
943 : }
944 56950 : ntr->tr = tr;
945 56950 : return ntr;
946 : }
947 :
948 : static gdk_return
949 534382 : la_apply(logger *lg, logaction *c, int tid)
950 : {
951 534382 : gdk_return ret = GDK_SUCCEED;
952 :
953 534382 : switch (c->type) {
954 366879 : case LOG_UPDATE_BULK:
955 : case LOG_UPDATE:
956 366879 : ret = la_bat_updates(lg, c, tid);
957 366879 : break;
958 157572 : case LOG_CREATE:
959 157572 : if (!lg->flushing)
960 264 : ret = la_bat_create(lg, c, tid);
961 : break;
962 9931 : case LOG_DESTROY:
963 9931 : if (!lg->flushing)
964 48 : ret = la_bat_destroy(lg, c, tid);
965 : break;
966 : default:
967 0 : MT_UNREACHABLE();
968 : }
969 534382 : return ret;
970 : }
971 :
972 : static void
973 534382 : la_destroy(logaction *c)
974 : {
975 534382 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
976 1397 : logbat_destroy(c->b);
977 534382 : if (c->type == LOG_UPDATE && c->uid)
978 1022 : logbat_destroy(c->uid);
979 534382 : }
980 :
981 : static gdk_return
982 534382 : tr_grow(trans *tr)
983 : {
984 534382 : if (tr->nr == tr->sz) {
985 8 : logaction *changes;
986 8 : tr->sz <<= 1;
987 8 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
988 8 : if (changes == NULL)
989 : return GDK_FAIL;
990 8 : tr->changes = changes;
991 : }
992 : /* cleanup the next */
993 534382 : tr->changes[tr->nr].b = NULL;
994 534382 : return GDK_SUCCEED;
995 : }
996 :
997 : static trans *
998 56950 : tr_destroy(trans *tr)
999 : {
1000 56950 : trans *r = tr->tr;
1001 :
1002 56950 : GDKfree(tr->changes);
1003 56950 : GDKfree(tr);
1004 56950 : return r;
1005 : }
1006 :
1007 : static trans *
1008 0 : tr_abort_(logger *lg, trans *tr, int s)
1009 : {
1010 0 : int i;
1011 :
1012 0 : (void) lg;
1013 :
1014 0 : TRC_DEBUG(WAL, "abort");
1015 :
1016 0 : for (i = s; i < tr->nr; i++)
1017 0 : la_destroy(&tr->changes[i]);
1018 0 : return tr_destroy(tr);
1019 : }
1020 :
/* Abort transaction `tr` completely: all of its changes are released
 * (tr_abort_ starting at index 0).  Returns what tr_abort_ returns. */
static trans *
tr_abort(logger *lg, trans *tr)
{
	return tr_abort_(lg, tr, 0);
}
1026 :
1027 : static trans *
1028 56950 : tr_commit(logger *lg, trans *tr)
1029 : {
1030 56950 : int i;
1031 :
1032 56950 : TRC_DEBUG(WAL, "commit");
1033 :
1034 591332 : for (i = 0; i < tr->nr; i++) {
1035 534382 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1036 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1037 0 : do {
1038 0 : tr = tr_abort_(lg, tr, i);
1039 0 : } while (tr != NULL);
1040 : return (trans *) -1;
1041 : }
1042 534382 : la_destroy(&tr->changes[i]);
1043 : }
1044 56950 : lg->saved_tid = tr->tid;
1045 56950 : return tr_destroy(tr);
1046 : }
1047 :
1048 : static gdk_return
1049 125 : log_read_types_file(logger *lg, FILE *fp, int version, bool *needsnew)
1050 : {
1051 125 : int id = 0;
1052 125 : char atom_name[IDLENGTH];
1053 125 : bool seen_geom = false;
1054 :
1055 : /* scanf should use IDLENGTH somehow */
1056 3528 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1057 3403 : if (version < 52303 && strcmp(atom_name, "BAT") == 0) {
1058 8 : *needsnew = true;
1059 8 : continue;
1060 : }
1061 3395 : if (version < 52304 && strcmp(atom_name, "color") == 0) {
1062 16 : *needsnew = true;
1063 16 : continue;
1064 : }
1065 456 : if (version < 52304 && strcmp(atom_name, "identifier") == 0) {
1066 16 : *needsnew = true;
1067 16 : continue;
1068 : }
1069 3363 : if (version < 52304 && strcmp(atom_name, "wkba") == 0) {
1070 16 : *needsnew = true;
1071 16 : continue;
1072 : }
1073 3347 : int i = ATOMindex(atom_name);
1074 :
1075 3347 : if (id < -127 || id > 127 || i < 0) {
1076 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1077 0 : return GDK_FAIL;
1078 : }
1079 3347 : seen_geom |= strcmp(atom_name, "mbr") == 0 || strcmp(atom_name, "wkb") == 0;
1080 3347 : lg->type_id[i] = (int8_t) id;
1081 3347 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1082 : }
1083 : #ifdef HAVE_GEOM
1084 125 : if (!seen_geom && ATOMindex("mbr") > 0) {
1085 0 : GDKerror("incompatible database: server supports GEOM, but database does not\n");
1086 0 : return GDK_FAIL;
1087 : }
1088 : #endif
1089 : (void) seen_geom;
1090 : return GDK_SUCCEED;
1091 : }
1092 :
1093 :
1094 : static gdk_return
1095 242 : log_create_types_file(logger *lg, const char *filename)
1096 : {
1097 242 : FILE *fp;
1098 :
1099 242 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1100 0 : GDKerror("cannot create log file %s\n", filename);
1101 0 : return GDK_FAIL;
1102 : }
1103 242 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1104 0 : fclose(fp);
1105 0 : GDKerror("writing log file %s failed", filename);
1106 0 : if (MT_remove(filename) < 0)
1107 0 : GDKsyserror("remove %s failed\n", filename);
1108 0 : return GDK_FAIL;
1109 : }
1110 :
1111 242 : if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
1112 0 : fclose(fp);
1113 0 : GDKerror("writing log file %s failed", filename);
1114 0 : if (MT_remove(filename) < 0)
1115 0 : GDKsyserror("remove %s failed\n", filename);
1116 0 : return GDK_FAIL;
1117 : }
1118 242 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1119 : #if defined(_MSC_VER)
1120 : && _commit(_fileno(fp)) < 0
1121 : #elif defined(HAVE_FDATASYNC)
1122 0 : && fdatasync(fileno(fp)) < 0
1123 : #elif defined(HAVE_FSYNC)
1124 : && fsync(fileno(fp)) < 0
1125 : #endif
1126 : )) {
1127 0 : GDKsyserror("flushing log file %s failed", filename);
1128 0 : fclose(fp);
1129 0 : if (MT_remove(filename) < 0)
1130 0 : GDKsyserror("remove %s failed\n", filename);
1131 0 : return GDK_FAIL;
1132 : }
1133 242 : if (fclose(fp) < 0) {
1134 0 : GDKsyserror("closing log file %s failed", filename);
1135 0 : if (MT_remove(filename) < 0)
1136 0 : GDKsyserror("remove %s failed\n", filename);
1137 0 : return GDK_FAIL;
1138 : }
1139 : return GDK_SUCCEED;
1140 : }
1141 :
/* Shorthands for operating on the rotation_lock member of a logger:
 * set, unset, and MT_lock_trytime() with `ms` passed through unchanged
 * (presumably a timeout in milliseconds -- confirm against
 * MT_lock_trytime). */
1142 : #define rotation_lock(lg) MT_lock_set(&(lg)->rotation_lock)
1143 : #define rotation_unlock(lg) MT_lock_unset(&(lg)->rotation_lock)
1144 : #define rotation_trylock(lg, ms) MT_lock_trytime(&(lg)->rotation_lock, ms)
1145 :
1146 : static gdk_return
1147 12530 : log_open_output(logger *lg)
1148 : {
1149 12530 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1150 :
1151 12530 : if (!new_range) {
1152 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1153 0 : return GDK_FAIL;
1154 : }
1155 25059 : if (!LOG_DISABLED(lg)) {
1156 12529 : char id[32];
1157 12529 : char filename[MAXPATH];
1158 :
1159 12529 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1160 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1161 0 : GDKfree(new_range);
1162 0 : return GDK_FAIL;
1163 : }
1164 12529 : if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
1165 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1166 0 : GDKfree(new_range);
1167 0 : return GDK_FAIL;
1168 : }
1169 :
1170 12529 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1171 12529 : new_range->output_log = open_wstream(filename);
1172 12529 : if (new_range->output_log) {
1173 12529 : short byteorder = 1234;
1174 12529 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1175 : }
1176 :
1177 12529 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1178 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1179 0 : close_stream(new_range->output_log);
1180 0 : GDKfree(new_range);
1181 0 : return GDK_FAIL;
1182 : }
1183 : } else {
1184 1 : new_range->output_log = NULL;
1185 : }
1186 12530 : ATOMIC_INIT(&new_range->refcount, 1);
1187 12530 : ATOMIC_INIT(&new_range->last_ts, 0);
1188 12530 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1189 12530 : ATOMIC_INIT(&new_range->drops, 0);
1190 12530 : new_range->id = lg->id;
1191 12530 : new_range->next = NULL;
1192 12530 : logged_range *current = lg->current;
1193 12530 : assert(current && current->next == NULL);
1194 12530 : new_range->cnt = current->cnt;
1195 12530 : current->next = new_range;
1196 12530 : lg->file_age = GDKusec();
1197 12530 : return GDK_SUCCEED;
1198 : }
1199 :
1200 : static inline void
1201 12758 : log_close_input(logger *lg)
1202 : {
1203 12758 : if (!lg->inmemory && lg->input_log) {
1204 12282 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1205 12282 : close_stream(lg->input_log);
1206 : }
1207 12758 : lg->input_log = NULL;
1208 12758 : }
1209 :
1210 : static inline void
1211 351 : log_close_output(logger *lg)
1212 : {
1213 351 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1214 350 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1215 350 : close_stream(lg->current->output_log);
1216 : }
1217 351 : lg->current->output_log = NULL;
1218 351 : }
1219 :
1220 : static gdk_return
1221 12407 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1222 : {
1223 12407 : TRC_INFO(WAL, "opening input log %s", filename);
1224 12407 : lg->input_log = open_rstream(filename);
1225 :
1226 : /* if the file doesn't exist, there is nothing to be read back */
1227 12407 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1228 125 : log_close_input(lg);
1229 125 : *filemissing = true;
1230 125 : return GDK_SUCCEED;
1231 : }
1232 12282 : short byteorder;
1233 12282 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1234 0 : case -1:
1235 0 : log_close_input(lg);
1236 0 : TRC_CRITICAL(GDK, "read failed\n");
1237 0 : return GDK_FAIL;
1238 5 : case 0:
1239 : /* empty file is ok */
1240 5 : log_close_input(lg);
1241 5 : return GDK_SUCCEED;
1242 12277 : case 1:
1243 : /* if not empty, must start with correct byte order mark */
1244 12277 : if (byteorder != 1234) {
1245 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1246 0 : log_close_input(lg);
1247 0 : return GDK_FAIL;
1248 : }
1249 : break;
1250 : }
1251 : return GDK_SUCCEED;
1252 : }
1253 :
/* Read records from lg->input_log and replay them, until
 * log_read_format() fails, a record handler reports something other
 * than LOG_OK, or the end marker (flag 0 and id 0) is found.
 *
 * lg         the logger whose input log is read
 * updated    optional bitmap (32 bits per word); when non-NULL the bit
 *            of every catalog position touched by an update, create or
 *            destroy record is set
 * maxupdated number of bits available in `updated`; only used in
 *            assertions
 *
 * While not flushing, the CHECKMASK bits of GDKdebug are cleared for
 * the duration of the call and the previous value is restored at the
 * end.  Transactions still open when reading stops are aborted.
 *
 * Returns LOG_EOF when log_read_format() failed or the end marker was
 * seen, otherwise the last status (LOG_ERR on a serious failure). */
1254 : static log_return
1255 12277 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
1256 : {
1257 12277 : logformat l;
1258 12277 : trans *tr = NULL;
1259 12277 : log_return err = LOG_OK;
1260 12277 : bool ok = true;
1261 12277 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1262 :
1263 12277 : (void) maxupdated; /* only used inside assert() */
1264 :
/* outside of a flush, clear the CHECKMASK debug bits while reading;
 * the value saved in dbg is put back before returning */
1265 12277 : if (!lg->flushing)
1266 214 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1267 :
1268 12277 : BAT *cands = NULL; /* used in case of LOG_BAT_GROUP */
1269 :
1270 767753 : while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
/* a record with flag 0 and id 0 ends the readable part of the log */
1271 755476 : if (l.flag == 0 && l.id == 0) {
1272 12277 : err = LOG_EOF;
1273 : break;
1274 : }
1275 :
1276 755476 : TRC_DEBUG_IF(WAL) {
1277 0 : if (l.flag > 0 && l.flag != LOG_CLEAR &&
1278 : l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
1279 0 : TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
1280 : else
1281 0 : TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
1282 : }
/* first look at the record: when a bitmap was supplied and a
 * transaction is open, mark the catalog positions for l.id */
1283 755476 : switch (l.flag) {
1284 562900 : case LOG_UPDATE_CONST:
1285 : case LOG_UPDATE_BULK:
1286 : case LOG_UPDATE:
1287 : case LOG_CREATE:
1288 : case LOG_DESTROY:
1289 562900 : if (tr != NULL && updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
1290 561015 : BATiter cni = bat_iterator(lg->catalog_id);
1291 561015 : BUN p;
1292 561015 : BUN posnew = BUN_NONE;
1293 561015 : BUN posold = BUN_NONE;
1294 561015 : MT_rwlock_rdlock(&cni.b->thashlock);
1295 11153040 : HASHloop_int(cni, cni.b->thash, p, &l.id) {
1296 10339668 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
1297 10339668 : if (lid == lng_nil || lid > tr->tid)
1298 : posnew = p;
1299 4997183 : else if (lid == tr->tid)
1300 10521268 : posold = p;
1301 : }
1302 561015 : MT_rwlock_rdunlock(&cni.b->thashlock);
1303 561015 : bat_iterator_end(&cni);
1304 : /* Normally at this point, posnew is the
1305 : * location of the bat that this
1306 : * transaction is working on, and posold
1307 : * is the location of the previous
1308 : * version of the bat. If LOG_CREATE,
1309 : * both are relevant, since the latter
1310 : * is the new bat, and the former is the
1311 : * to-be-destroyed bat. For
1312 : * LOG_DESTROY, only posnew should be
1313 : * relevant, but for the other types, if
1314 : * the table is destroyed later in the
1315 : * same transaction, we need posold, and
1316 : * else (the normal case) we need
1317 : * posnew. */
1318 561015 : if (posnew != BUN_NONE) {
1319 551132 : assert(posnew < maxupdated);
1320 551132 : updated[posnew / 32] |= 1U << (posnew % 32);
1321 : }
1322 561015 : if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
1323 163881 : assert(posold < maxupdated);
1324 163881 : updated[posold / 32] |= 1U << (posold % 32);
1325 : }
1326 : }
1327 : break;
1328 : default:
1329 : /* do nothing */
1330 : break;
1331 : }
1332 : /* the functions we call here can succeed (LOG_OK),
1333 : * but they can also fail for two different reasons:
1334 : * they can run out of input (LOG_EOF -- this is not
1335 : * serious, we just abort the remaining transactions),
1336 : * or some malloc or BAT update fails (LOG_ERR -- this
1337 : * is serious, we must abort the complete process);
1338 : * the latter failure causes the current function to
1339 : * return GDK_FAIL */
/* NOTE(review): this function itself returns LOG_ERR in that case; the
 * GDK_FAIL mentioned above is what log_readlog() makes of it */
1340 755476 : switch (l.flag) {
1341 56950 : case LOG_START:
1342 56950 : assert(!lg->flushing || l.id <= lg->tid);
1343 56950 : if (!lg->flushing && l.id > lg->tid)
1344 127 : lg->tid = l.id; /* should only happen during initialization */
1345 56950 : if ((tr = tr_create(tr, l.id)) == NULL) {
1346 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
1347 0 : err = LOG_ERR;
1348 0 : break;
1349 : }
1350 56950 : TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
1351 : break;
1352 56950 : case LOG_END:
1353 56950 : if (tr == NULL)
1354 : err = LOG_EOF;
1355 56950 : else if (tr->tid != l.id) /* abort record */
1356 0 : tr = tr_abort(lg, tr);
1357 : else
1358 56950 : tr = tr_commit(lg, tr);
1359 : break;
1360 2780 : case LOG_SEQ:
1361 2780 : err = log_read_seq(lg, &l);
1362 2780 : break;
1363 395397 : case LOG_UPDATE_CONST:
1364 : case LOG_UPDATE_BULK:
1365 : case LOG_UPDATE:
1366 395397 : if (tr == NULL)
1367 : err = LOG_EOF;
1368 : else {
1369 621668 : err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
1370 : }
1371 : break;
1372 157572 : case LOG_CREATE:
1373 157572 : if (tr == NULL)
1374 : err = LOG_EOF;
1375 : else
1376 157572 : err = log_read_create(lg, tr, l.id);
1377 : break;
1378 9931 : case LOG_DESTROY:
1379 9931 : if (tr == NULL)
1380 : err = LOG_EOF;
1381 : else
1382 9931 : err = log_read_destroy(lg, tr, l.id);
1383 : break;
1384 75896 : case LOG_BAT_GROUP:
1385 75896 : if (tr == NULL)
1386 : err = LOG_EOF;
1387 : else {
1388 75896 : if (l.id > 0) {
1389 : /* START OF LOG_BAT_GROUP */
1390 37948 : cands = COLnew(0, TYPE_void, 0, SYSTRANS);
1391 37948 : if (!cands) {
1392 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
1393 0 : err = LOG_ERR;
1394 : }
1395 37948 : } else if (cands == NULL) {
1396 : /* should have gone through the
1397 : * above option earlier */
1398 0 : TRC_CRITICAL(GDK, "unexpected error\n");
1399 0 : err = LOG_ERR;
1400 : } else {
1401 : /* END OF LOG_BAT_GROUP */
1402 37948 : BBPunfix(cands->batCacheid);
1403 37948 : cands = NULL;
1404 : }
1405 : }
1406 : break;
1407 0 : default:
1408 0 : TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
1409 0 : err = LOG_ERR;
1410 : }
/* (trans *) -1 is the failure sentinel returned by tr_commit() */
1411 755476 : if (tr == (trans *) -1) {
1412 : /* message already generated by tr_commit */
1413 : err = LOG_ERR;
1414 12277 : tr = NULL;
1415 : break;
1416 : }
1417 : }
/* transactions that are still open at this point are aborted */
1418 12277 : while (tr) {
1419 0 : TRC_WARNING(GDK, "aborting transaction\n");
1420 0 : tr = tr_abort(lg, tr);
1421 : }
1422 12277 : if (!lg->flushing)
1423 214 : ATOMIC_SET(&GDKdebug, dbg);
1424 :
1425 12277 : BBPreclaim(cands);
/* a failed log_read_format() is reported as end of file, whatever err
 * holds */
1426 12277 : if (!ok)
1427 12277 : return LOG_EOF;
1428 : return err;
1429 : }
1430 :
1431 : static gdk_return
1432 344 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1433 : {
1434 344 : log_return err = LOG_OK;
1435 344 : time_t t0, t1;
1436 344 : struct stat sb;
1437 :
1438 344 : assert(!lg->inmemory);
1439 :
1440 344 : TRC_INFO(WAL, "opening %s\n", filename);
1441 :
1442 344 : gdk_return res = log_open_input(lg, filename, filemissing);
1443 344 : if (!lg->input_log || res != GDK_SUCCEED)
1444 : return res;
1445 214 : int fd;
1446 214 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1447 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1448 0 : log_close_input(lg);
1449 : /* If the file could be opened, but fstat fails,
1450 : * something weird is going on */
1451 0 : return GDK_FAIL;
1452 : }
1453 214 : t0 = time(NULL);
1454 214 : TRC_INFO_IF(WAL) {
1455 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1456 0 : GDKtracer_flush_buffer();
1457 : }
1458 428 : while (err != LOG_EOF && err != LOG_ERR) {
1459 214 : t1 = time(NULL);
1460 214 : if (t1 - t0 > 10) {
1461 0 : lng fpos;
1462 0 : t0 = t1;
1463 : /* not more than once every 10 seconds */
1464 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1465 0 : TRC_INFO_IF(WAL) {
1466 0 : if (fpos >= 0) {
1467 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1468 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1469 0 : GDKtracer_flush_buffer();
1470 : }
1471 : }
1472 : }
1473 214 : err = log_read_transaction(lg, NULL, 0);
1474 : }
1475 214 : log_close_input(lg);
1476 214 : lg->input_log = NULL;
1477 :
1478 : /* remaining transactions are not committed, ie abort */
1479 214 : TRC_INFO_IF(WAL) {
1480 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1481 0 : GDKtracer_flush_buffer();
1482 : }
1483 : /* we cannot distinguish errors from incomplete transactions
1484 : * (even if we would log aborts in the logs). So we simply
1485 : * abort and move to the next log file */
1486 214 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1487 : }
1488 :
1489 : /*
1490 : * The log files are incrementally numbered, starting from 2. They are
1491 : * processed in the same sequence.
1492 : */
1493 : static gdk_return
1494 125 : log_readlogs(logger *lg, const char *filename)
1495 : {
1496 125 : gdk_return res = GDK_SUCCEED;
1497 :
1498 125 : assert(!lg->inmemory);
1499 125 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1500 :
1501 125 : char log_filename[FILENAME_MAX];
1502 125 : if (lg->saved_id >= lg->id) {
1503 125 : bool filemissing = false;
1504 :
1505 125 : lg->id = lg->saved_id + 1;
1506 469 : while (res == GDK_SUCCEED && !filemissing) {
1507 344 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1508 0 : GDKerror("Logger filename path is too large\n");
1509 0 : return GDK_FAIL;
1510 : }
1511 344 : res = log_readlog(lg, log_filename, &filemissing);
1512 344 : if (!filemissing) {
1513 219 : lg->saved_id++;
1514 219 : lg->id++;
1515 : }
1516 : }
1517 : }
1518 : return res;
1519 : }
1520 :
1521 : static gdk_return
1522 11563 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1523 : {
1524 11563 : TRC_DEBUG(WAL, "commit");
1525 :
1526 11563 : return bm_commit(lg, pending, updated, maxupdated);
1527 : }
1528 :
1529 : static gdk_return
1530 125 : check_version(logger *lg, FILE *fp, bool *needsnew)
1531 : {
1532 125 : int version = 0;
1533 :
1534 125 : assert(!lg->inmemory);
1535 125 : if (fscanf(fp, "%6d", &version) != 1) {
1536 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1537 0 : fclose(fp);
1538 0 : return GDK_FAIL;
1539 : }
1540 125 : if (version != lg->version) {
1541 32 : if (lg->prefuncp == NULL ||
1542 16 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1543 0 : GDKerror("Incompatible database version %06d, "
1544 : "this server supports version %06d.\n%s",
1545 : version, lg->version,
1546 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1547 0 : fclose(fp);
1548 0 : return GDK_FAIL;
1549 : }
1550 16 : *needsnew = true; /* we need to write a new log file */
1551 : } else {
1552 109 : lg->postfuncp = NULL; /* don't call */
1553 109 : *needsnew = false; /* log file already up-to-date */
1554 : }
1555 250 : if (fgetc(fp) != '\n' || /* skip \n */
1556 125 : fgetc(fp) != '\n') { /* skip \n */
1557 0 : GDKerror("Badly formatted log file");
1558 0 : fclose(fp);
1559 0 : return GDK_FAIL;
1560 : }
1561 125 : if (log_read_types_file(lg, fp, version, needsnew) != GDK_SUCCEED) {
1562 0 : fclose(fp);
1563 0 : return GDK_FAIL;
1564 : }
1565 125 : fclose(fp);
1566 125 : return GDK_SUCCEED;
1567 : }
1568 :
1569 : static BAT *
1570 353 : bm_tids(BAT *b, BAT *d)
1571 : {
1572 353 : BUN sz = BATcount(b);
1573 353 : BAT *tids = BATdense(0, 0, sz);
1574 :
1575 353 : if (tids == NULL)
1576 : return NULL;
1577 :
1578 353 : if (BATcount(d)) {
1579 353 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1580 353 : logbat_destroy(tids);
1581 353 : tids = diff;
1582 : }
1583 : return tids;
1584 : }
1585 :
1586 :
1587 : static gdk_return
1588 7003 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1589 : {
1590 7003 : char bak[IDLENGTH];
1591 :
1592 7003 : if (BATmode(old, true) != GDK_SUCCEED) {
1593 0 : GDKerror("cannot convert old %s to transient", name);
1594 0 : return GDK_FAIL;
1595 : }
1596 7003 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1597 0 : GDKerror("name %s_%s too long\n", fn, name);
1598 0 : return GDK_FAIL;
1599 : }
1600 7003 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1601 0 : GDKerror("rename (%s) failed\n", bak);
1602 0 : return GDK_FAIL;
1603 : }
1604 7003 : BBPretain(new->batCacheid);
1605 7003 : return GDK_SUCCEED;
1606 : }
1607 :
1608 : static gdk_return
1609 352 : bm_get_counts(logger *lg)
1610 : {
1611 352 : BUN p, q;
1612 352 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1613 :
1614 32666 : BATloop(lg->catalog_bid, p, q) {
1615 32314 : oid pos = p;
1616 32314 : lng cnt = 0;
1617 32314 : lng lid = lng_nil;
1618 :
1619 32314 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1620 32314 : BAT *b = BBPquickdesc(bids[p]);
1621 32314 : assert(b);
1622 32314 : cnt = BATcount(b);
1623 : } else {
1624 0 : lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1625 : }
1626 32314 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1627 0 : return GDK_FAIL;
1628 32314 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1629 : return GDK_FAIL;
1630 : }
1631 : return GDK_SUCCEED;
1632 : }
1633 :
1634 : static int
1635 7003 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1636 : {
1637 7003 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1638 1630439 : for (int i = 0; i < next; i++) {
1639 1623436 : if (n[i] == bid) {
1640 0 : sizes[i] = sz;
1641 0 : return next;
1642 : }
1643 : }
1644 7003 : n[next] = bid;
1645 7003 : sizes[next++] = sz;
1646 7003 : return next;
1647 : }
1648 :
/* Rebuild the logger catalog without the entries that can be dropped,
 * and switch the logger over to the rebuilt bats.
 *
 * lg          the logger; its catalog_* and dcatalog members are
 *             replaced on success
 * r           output array; receives the ids of the bats that were set
 *             transient here, followed by the ids of the three
 *             replaced catalog bats
 * bids        bat ids of the catalog entries (tail of catalog_bid)
 * lids, cnts  per-entry lid and count arrays; lids is modified: entries
 *             that are dropped get lid -1
 * catalog_bid, catalog_id, dcatalog
 *             the current catalog bats, which are replaced
 * cleanup     number of entries the caller counted for removal; used
 *             to size the new bats and subtracted from the cnt of
 *             every pending logged range
 *
 * Returns the number of ids stored in r, 0 when creating or filling
 * the new bats failed (the logger keeps its old catalog, but lids may
 * already have been changed), or -1 when switching the bats failed. */
1649 : static int
1650 2099 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
1651 : BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
1652 : {
1653 2099 : BAT *nbids, *noids, *ncnts, *nlids, *ndels;
1654 2099 : BUN p, q;
1655 2099 : int err = 0, rcnt = 0;
1656 :
1657 2099 : BUN ocnt = BATcount(catalog_bid);
1658 2099 : nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1659 2099 : noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1660 2099 : ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1661 2099 : nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1662 2099 : ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
1663 :
1664 2099 : if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
1665 0 : logbat_destroy(nbids);
1666 0 : logbat_destroy(noids);
1667 0 : logbat_destroy(ncnts);
1668 0 : logbat_destroy(nlids);
1669 0 : logbat_destroy(ndels);
1670 0 : return 0;
1671 : }
1672 :
/* pass 1: walk the positions listed in dcatalog; an entry whose lid is
 * set, not newer than lg->saved_tid and not negative gets its bat
 * switched with BATmode(.., true), is marked with lid -1 and has its
 * bat id queued in r[] */
1673 2099 : oid *poss = Tloc(dcatalog, 0);
1674 179516 : BATloop(dcatalog, p, q) {
1675 177417 : oid pos = poss[p];
1676 :
1677 177417 : if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
1678 0 : continue;
1679 :
1680 177417 : if (lids[pos] >= 0) {
1681 177417 : bat bid = bids[pos];
1682 177417 : BAT *lb = BBP_desc(bid);
1683 :
1684 177417 : if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
1685 0 : GDKwarning("Failed to set bat(%d) transient\n", bid);
1686 : } else {
1687 177417 : lids[pos] = -1; /* mark as transient */
1688 177417 : r[rcnt++] = bid;
1689 : }
1690 : }
1691 : }
1692 :
/* pass 2: copy every entry that is not marked -1 into the new bats; an
 * entry that is still listed in lg->dcatalog is listed in ndels under
 * its position in the new bats */
1693 2099 : int *oids = (int *) Tloc(catalog_id, 0);
1694 2099 : q = BATcount(catalog_bid);
1695 741427 : for (p = 0; p < q && !err; p++) {
1696 739328 : bat col = bids[p];
1697 739328 : int nid = oids[p];
1698 739328 : lng lid = lids[p];
1699 739328 : lng cnt = cnts[p];
1700 739328 : oid pos = p;
1701 :
1702 : /* only project out the deleted with lid == -1
1703 : * update dcatalog */
1704 739328 : if (lid == -1)
1705 177417 : continue; /* remove */
1706 :
1707 1123822 : if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
1708 1123822 : BUNappend(noids, &nid, true) != GDK_SUCCEED ||
1709 1123822 : BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
1710 561911 : BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
1711 : err = 1;
1712 561911 : if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
1713 0 : pos = (oid) (BATcount(nbids) - 1);
1714 0 : if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
1715 561911 : err = 1;
1716 : }
1717 : }
1718 :
1719 2099 : if (err) {
1720 0 : logbat_destroy(nbids);
1721 0 : logbat_destroy(noids);
1722 0 : logbat_destroy(ndels);
1723 0 : logbat_destroy(ncnts);
1724 0 : logbat_destroy(nlids);
1725 0 : return 0;
1726 : }
1727 : /* point of no return */
1728 4198 : if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
1729 4198 : log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
1730 2099 : log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
1731 0 : logbat_destroy(nbids);
1732 0 : logbat_destroy(noids);
1733 0 : logbat_destroy(ndels);
1734 0 : logbat_destroy(ncnts);
1735 0 : logbat_destroy(nlids);
1736 0 : return -1;
1737 : }
/* the replaced catalog bats are queued in r[] as well */
1738 2099 : r[rcnt++] = lg->catalog_bid->batCacheid;
1739 2099 : r[rcnt++] = lg->catalog_id->batCacheid;
1740 2099 : r[rcnt++] = lg->dcatalog->batCacheid;
1741 :
1742 2099 : assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
1743 :
1744 2099 : logbat_destroy(lg->catalog_bid);
1745 2099 : logbat_destroy(lg->catalog_id);
1746 2099 : logbat_destroy(lg->dcatalog);
1747 :
1748 2099 : lg->catalog_bid = nbids;
1749 2099 : lg->catalog_id = noids;
1750 2099 : lg->dcatalog = ndels;
1751 :
1752 : /* failing to rename these two bats is not fatal */
1753 2099 : if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
1754 2099 : GDKclrerr();
1755 2099 : if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
1756 2099 : GDKclrerr();
1757 2099 : BBPunfix(lg->catalog_cnt->batCacheid);
1758 2099 : BBPunfix(lg->catalog_lid->batCacheid);
1759 :
1760 2099 : lg->catalog_cnt = ncnts;
1761 2099 : lg->catalog_lid = nlids;
/* NOTE(review): the results of the two strconcat_len() calls below are
 * not checked for truncation */
1762 2099 : char bak[FILENAME_MAX];
1763 2099 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
1764 2099 : if (BBPrename(lg->catalog_cnt, bak) < 0)
1765 0 : GDKclrerr();
1766 2099 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
1767 2099 : if (BBPrename(lg->catalog_lid, bak) < 0)
1768 0 : GDKclrerr();
/* the catalog shrank by `cleanup` entries: lower the count recorded in
 * every pending range, under the rotation lock */
1769 2099 : rotation_lock(lg);
1770 9538 : for (logged_range *p = lg->pending; p; p = p->next) {
1771 7439 : p->cnt -= cleanup;
1772 : }
1773 2099 : rotation_unlock(lg);
1774 2099 : return rcnt;
1775 : }
1776 :
/* Commit the logger's bats to disk with TMsubcommit_list().
 *
 * lg         the logger
 * pending    optional logged range; when given, its cnt is used as the
 *            size of the catalog bats, else BATcount(lg->catalog_bid)
 * updated    optional bitmap (32 bits per word); when given, catalog
 *            entries below maxupdated whose bit is clear are left out
 *            of the commit
 * maxupdated number of entries covered by `updated`
 *
 * The commit list consists of the selected catalog entries, the
 * catalog bats themselves and, when present, the sequence bats.  When
 * catalog entries can be dropped, cleanup_and_swap() rebuilds the
 * catalog first; otherwise the sequence bats are compacted when enough
 * of their entries are deleted.  After a successful commit the bats
 * collected in r[] are released with BBPrelease().
 *
 * Every return path calls log_unlock(lg).  Returns GDK_FAIL on an
 * allocation failure, a failed bat operation or a failed subcommit. */
1777 : /* this function is called with log_lock() held; it releases the lock
1778 : * before returning */
1779 : static gdk_return
1780 12017 : bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1781 : {
1782 12017 : BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
1783 12017 : BUN dcnt = BATcount(lg->dcatalog);
1784 12017 : BUN p, q;
1785 12017 : BAT *catalog_bid = lg->catalog_bid;
1786 12017 : BAT *catalog_id = lg->catalog_id;
1787 12017 : BAT *dcatalog = lg->dcatalog;
/* NOTE(review): 13 presumably covers the fixed entries added below
 * (n[0], old and new catalog bats, sequence bats); the bound is only
 * verified by the assert before the subcommit -- confirm */
1788 12017 : BUN nn = 13 + cnt;
1789 12017 : bat *n = GDKmalloc(sizeof(bat) * nn);
1790 12017 : bat *r = GDKmalloc(sizeof(bat) * nn);
1791 12017 : BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
1792 12017 : int i = 0, rcnt = 0;
1793 12017 : gdk_return res;
1794 12017 : const log_bid *bids;
1795 12017 : lng *cnts = NULL, *lids = NULL;
1796 12017 : BUN cleanup = 0;
1797 12017 : lng t0 = 0;
1798 :
1799 12017 : if (n == NULL || r == NULL || sizes == NULL) {
1800 0 : GDKfree(n);
1801 0 : GDKfree(r);
1802 0 : GDKfree(sizes);
1803 0 : log_unlock(lg);
1804 0 : return GDK_FAIL;
1805 : }
1806 :
1807 12017 : sizes[i] = 0;
1808 12017 : n[i++] = 0; /* n[0] is not used */
1809 12017 : bids = (const log_bid *) Tloc(catalog_bid, 0);
1810 12017 : if (lg->catalog_cnt)
1811 11790 : cnts = (lng *) Tloc(lg->catalog_cnt, 0);
1812 12017 : if (lg->catalog_lid)
1813 11790 : lids = (lng *) Tloc(lg->catalog_lid, 0);
/* collect the bats to commit; along the way, count in `cleanup` the
 * entries whose lid is set and not newer than lg->saved_tid, and make
 * sure those (unless already marked -1) are listed in dcatalog */
1814 3504146 : BATloop(catalog_bid, p, q) {
1815 3492129 : if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
1816 177417 : cleanup++;
1817 177417 : if (lids[p] == -1)
1818 0 : continue;
1819 354802 : if (BUNfnd(dcatalog, &(oid){p}) == BUN_NONE &&
1820 177385 : BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
/* shrink dcatalog back to the size it had on entry */
1821 0 : while (BATcount(dcatalog) > dcnt) {
1822 0 : if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
1823 0 : TRC_CRITICAL(WAL, "delete after failed append failed\n");
1824 0 : break;
1825 : }
1826 : }
1827 0 : GDKfree(n);
1828 0 : GDKfree(r);
1829 0 : GDKfree(sizes);
1830 0 : log_unlock(lg);
1831 0 : return GDK_FAIL;
1832 : }
1833 : }
/* with a bitmap, entries whose bit is not set are not committed */
1834 3492129 : if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
1835 1674260 : continue;
1836 : }
1837 1817869 : bat col = bids[p];
1838 :
1839 1817869 : TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
1840 1817869 : assert(col);
1841 1817869 : sizes[i] = cnts ? (BUN) cnts[p] : 0;
1842 1817869 : n[i++] = col;
1843 : }
1844 : /* now commit catalog, so it's also up to date on disk */
1845 12017 : sizes[i] = cnt;
1846 12017 : n[i++] = catalog_bid->batCacheid;
1847 12017 : sizes[i] = cnt;
1848 12017 : n[i++] = catalog_id->batCacheid;
1849 12017 : sizes[i] = BATcount(dcatalog);
1850 12017 : n[i++] = dcatalog->batCacheid;
1851 :
1852 12017 : if (cleanup) {
1853 2099 : if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
1854 : catalog_bid, catalog_id, dcatalog,
1855 : cleanup)) < 0) {
1856 0 : GDKfree(n);
1857 0 : GDKfree(r);
1858 0 : GDKfree(sizes);
1859 0 : log_unlock(lg);
1860 0 : return GDK_FAIL;
1861 : }
1862 2099 : cnt -= cleanup;
1863 : }
/* lg->dcatalog differs from the local copy only when
 * cleanup_and_swap() installed new catalog bats; add those too */
1864 12017 : if (dcatalog != lg->dcatalog) {
1865 2099 : i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
1866 2099 : i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
1867 2099 : i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
1868 : }
1869 12017 : if (lg->seqs_id) {
1870 11790 : sizes[i] = BATcount(lg->seqs_id);
1871 11790 : n[i++] = lg->seqs_id->batCacheid;
1872 11790 : sizes[i] = BATcount(lg->seqs_id);
1873 11790 : n[i++] = lg->seqs_val->batCacheid;
1874 : }
/* compact the sequence bats when more than half of their entries, and
 * more than 10, are listed in dseqs; never in the same call as a
 * catalog cleanup */
1875 12017 : if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
1876 353 : BAT *tids, *ids, *vals;
1877 :
1878 353 : tids = bm_tids(lg->seqs_id, lg->dseqs);
1879 353 : if (tids == NULL) {
1880 0 : GDKfree(n);
1881 0 : GDKfree(r);
1882 0 : GDKfree(sizes);
1883 0 : log_unlock(lg);
1884 0 : return GDK_FAIL;
1885 : }
1886 353 : ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
1887 353 : vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
1888 :
1889 353 : if (ids == NULL || vals == NULL) {
1890 0 : logbat_destroy(tids);
1891 0 : logbat_destroy(ids);
1892 0 : logbat_destroy(vals);
1893 0 : GDKfree(n);
1894 0 : GDKfree(r);
1895 0 : GDKfree(sizes);
1896 0 : log_unlock(lg);
1897 0 : return GDK_FAIL;
1898 : }
1899 :
1900 706 : if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
1901 353 : BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
1902 0 : logbat_destroy(tids);
1903 0 : logbat_destroy(ids);
1904 0 : logbat_destroy(vals);
1905 0 : GDKfree(n);
1906 0 : GDKfree(r);
1907 0 : GDKfree(sizes);
1908 0 : log_unlock(lg);
1909 0 : return GDK_FAIL;
1910 : }
1911 353 : logbat_destroy(tids);
1912 353 : BATclear(lg->dseqs, true);
1913 :
1914 706 : if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
1915 353 : log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
1916 0 : logbat_destroy(ids);
1917 0 : logbat_destroy(vals);
1918 0 : GDKfree(n);
1919 0 : GDKfree(r);
1920 0 : GDKfree(sizes);
1921 0 : log_unlock(lg);
1922 0 : return GDK_FAIL;
1923 : }
1924 353 : i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
1925 353 : i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
1926 :
/* the replaced sequence bats are queued for release only when they
 * still hold logical references */
1927 353 : if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
1928 279 : r[rcnt++] = lg->seqs_id->batCacheid;
1929 353 : if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
1930 279 : r[rcnt++] = lg->seqs_val->batCacheid;
1931 :
1932 353 : logbat_destroy(lg->seqs_id);
1933 353 : logbat_destroy(lg->seqs_val);
1934 :
1935 353 : lg->seqs_id = ids;
1936 353 : lg->seqs_val = vals;
1937 : }
1938 12017 : if (lg->seqs_id) {
1939 11790 : sizes[i] = BATcount(lg->dseqs);
1940 11790 : n[i++] = lg->dseqs->batCacheid;
1941 : }
1942 :
1943 12017 : assert((BUN) i <= nn);
/* the lock is given up before the subcommit itself is performed */
1944 12017 : log_unlock(lg);
1945 12017 : TRC_DEBUG_IF(WAL)
1946 0 : t0 = GDKusec();
/* without a catalog_cnt bat no sizes are passed along */
1947 12244 : res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
1948 12017 : TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
1949 12017 : if (res == GDK_SUCCEED) { /* now cleanup */
1950 196289 : for (i = 0; i < rcnt; i++) {
1951 184272 : TRC_DEBUG_IF(WAL) {
1952 0 : TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
1953 0 : if (BBP_lrefs(r[i]) != 2)
1954 0 : TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
1955 : }
1956 184272 : BBPrelease(r[i]);
1957 : }
1958 : }
1959 12017 : GDKfree(n);
1960 12017 : GDKfree(r);
1961 12017 : GDKfree(sizes);
1962 12017 : if (res != GDK_SUCCEED)
1963 0 : TRC_CRITICAL(GDK, "commit failed\n");
1964 : return res;
1965 : }
1966 :
1967 : static gdk_return
1968 351 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1969 : {
1970 351 : if (GDKfilepath(filename, FILENAME_MAX, 0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED) {
1971 0 : GDKerror("Logger filename path is too large\n");
1972 0 : return GDK_FAIL;
1973 : }
1974 351 : if (bak) {
1975 351 : if (strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL) >= FILENAME_MAX) {
1976 0 : GDKerror("Logger filename path is too large\n");
1977 0 : return GDK_FAIL;
1978 : }
1979 : }
1980 : return GDK_SUCCEED;
1981 : }
1982 :
1983 : static gdk_return
1984 12282 : log_cleanup(logger *lg, lng id)
1985 : {
1986 12282 : char log_id[FILENAME_MAX];
1987 :
1988 12282 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1989 0 : GDKerror("log_id filename is too large\n");
1990 0 : return GDK_FAIL;
1991 : }
1992 12282 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1993 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
1994 0 : GDKclrerr(); /* clear error from unlink */
1995 : }
1996 : return GDK_SUCCEED;
1997 : }
1998 :
#ifdef GDKLIBRARY_JSON
/* Finish the JSON upgrade: unless running in memory, remove the
 * "jsonupgradeneeded" signal file from BATDIR, then install strRead as
 * the atomRead function of the "json" atom.
 *
 * Returns GDK_FAIL when the signal file cannot be removed.  When no
 * "json" atom is registered there is nothing to patch and the atom
 * table is left untouched. */
static gdk_return
log_json_upgrade_finalize(void)
{
	int json_tpe = ATOMindex("json");

	if (!GDKinmemory(0) &&
	    GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
		TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
		return GDK_FAIL;
	}
	/* ATOMindex() reports an unknown name with a negative value
	 * (see the check in log_read_types_file); never use that as an
	 * index into BATatoms */
	if (json_tpe >= 0)
		BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;

	return GDK_SUCCEED;
}
#endif
2014 :
2015 : /* Load data from the logger logdir
2016 : * Initialize new directories and catalog files if none are present,
2017 : * unless running in read-only mode
2018 : * Load data and persist it in the BATs */
2019 : static gdk_return
2020 352 : log_load(const char *fn, logger *lg, char filename[FILENAME_MAX])
2021 : {
2022 352 : FILE *fp = NULL;
2023 352 : char bak[FILENAME_MAX];
2024 352 : bat catalog_bid, catalog_id, dcatalog;
2025 352 : bool needcommit = false;
2026 352 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2027 352 : bool readlogs = false;
2028 352 : bool needsnew = false; /* need to write new log file? */
2029 :
2030 : /* refactor */
2031 352 : if (!LOG_DISABLED(lg)) {
2032 351 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2033 0 : goto error;
2034 : }
2035 :
2036 352 : lg->catalog_bid = NULL;
2037 352 : lg->catalog_id = NULL;
2038 352 : lg->catalog_cnt = NULL;
2039 352 : lg->catalog_lid = NULL;
2040 352 : lg->dcatalog = NULL;
2041 :
2042 352 : lg->seqs_id = NULL;
2043 352 : lg->seqs_val = NULL;
2044 352 : lg->dseqs = NULL;
2045 :
2046 352 : if (!LOG_DISABLED(lg)) {
2047 : /* try to open logfile backup, or failing that, the file
2048 : * itself. we need to know whether this file exists when
2049 : * checking the database consistency later on */
2050 351 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2051 0 : fclose(fp);
2052 0 : fp = NULL;
2053 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2054 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2055 0 : goto error;
2056 351 : } else if (errno != ENOENT) {
2057 0 : GDKsyserror("open %s failed", bak);
2058 0 : goto error;
2059 : }
2060 351 : fp = MT_fopen(filename, "r");
2061 351 : if (fp == NULL && errno != ENOENT) {
2062 0 : GDKsyserror("open %s failed", filename);
2063 0 : goto error;
2064 : }
2065 : }
2066 :
2067 352 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2068 352 : catalog_bid = BBPindex(bak);
2069 :
2070 : /* initialize arrays for type mapping, to be read from disk */
2071 352 : memset(lg->type_id, -1, sizeof(lg->type_id));
2072 352 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2073 :
2074 : /* this is intentional - if catalog_bid is 0, force it to find
2075 : * the persistent catalog */
2076 352 : if (catalog_bid == 0) {
2077 : /* catalog does not exist, so the log file also
2078 : * shouldn't exist */
2079 227 : if (fp != NULL) {
2080 0 : GDKerror("there is no logger catalog, "
2081 : "but there is a log file.\n");
2082 0 : goto error;
2083 : }
2084 :
2085 227 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2086 227 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2087 227 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2088 :
2089 227 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2090 0 : GDKerror("cannot create catalog bats");
2091 0 : goto error;
2092 : }
2093 227 : TRC_INFO(WAL, "create %s catalog\n", fn);
2094 :
2095 : /* give the catalog bats names so we can find them
2096 : * next time */
2097 227 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2098 227 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2099 0 : goto error;
2100 : }
2101 :
2102 227 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2103 227 : if (BBPrename(lg->catalog_id, bak) < 0) {
2104 0 : goto error;
2105 : }
2106 :
2107 227 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2108 227 : if (BBPrename(lg->dcatalog, bak) < 0) {
2109 0 : goto error;
2110 : }
2111 :
2112 227 : if (!LOG_DISABLED(lg)) {
2113 226 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2114 0 : GDKerror("cannot create directory for log file %s\n", filename);
2115 0 : goto error;
2116 : }
2117 226 : if (log_create_types_file(lg, filename) != GDK_SUCCEED)
2118 0 : goto error;
2119 : }
2120 :
2121 227 : BBPretain(lg->catalog_bid->batCacheid);
2122 227 : BBPretain(lg->catalog_id->batCacheid);
2123 227 : BBPretain(lg->dcatalog->batCacheid);
2124 :
2125 227 : log_lock(lg);
2126 : /* bm_subcommit releases the lock */
2127 227 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2128 : /* cannot commit catalog, so remove log */
2129 0 : if (MT_remove(filename) < 0)
2130 0 : GDKsyserror("remove %s failed\n", filename);
2131 0 : BBPrelease(lg->catalog_bid->batCacheid);
2132 0 : BBPrelease(lg->catalog_id->batCacheid);
2133 0 : BBPrelease(lg->dcatalog->batCacheid);
2134 0 : goto error;
2135 : }
2136 : } else {
2137 : /* find the persistent catalog. As non persistent bats
2138 : * require a logical reference we also add a logical
2139 : * reference for the persistent bats */
2140 125 : BUN p, q;
2141 125 : BAT *b, *o, *d;
2142 :
2143 125 : assert(!lg->inmemory);
2144 :
2145 : /* the catalog exists, and so should the log file */
2146 125 : if (fp == NULL && !LOG_DISABLED(lg)) {
2147 0 : GDKerror("There is a logger catalog, but no log file.\n");
2148 0 : goto error;
2149 : }
2150 : if (fp != NULL) {
2151 : /* check_version always closes fp */
2152 125 : if (check_version(lg, fp, &needsnew) != GDK_SUCCEED) {
2153 0 : fp = NULL;
2154 0 : goto error;
2155 : }
2156 : readlogs = true;
2157 : fp = NULL;
2158 : }
2159 :
2160 125 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2161 125 : b = BATdescriptor(catalog_bid);
2162 125 : if (b == NULL) {
2163 0 : GDKerror("inconsistent database, catalog does not exist");
2164 0 : goto error;
2165 : }
2166 :
2167 125 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2168 125 : catalog_id = BBPindex(bak);
2169 125 : o = BATdescriptor(catalog_id);
2170 125 : if (o == NULL) {
2171 0 : BBPunfix(b->batCacheid);
2172 0 : GDKerror("inconsistent database, catalog_id does not exist");
2173 0 : goto error;
2174 : }
2175 :
2176 125 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2177 125 : dcatalog = BBPindex(bak);
2178 125 : d = BATdescriptor(dcatalog);
2179 125 : if (d == NULL) {
2180 0 : GDKerror("cannot create dcatalog bat");
2181 0 : BBPunfix(b->batCacheid);
2182 0 : BBPunfix(o->batCacheid);
2183 0 : goto error;
2184 : }
2185 :
2186 125 : lg->catalog_bid = b;
2187 125 : lg->catalog_id = o;
2188 125 : lg->dcatalog = d;
2189 125 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2190 32439 : BATloop(lg->catalog_bid, p, q) {
2191 32314 : bat bid = bids[p];
2192 32314 : oid pos = p;
2193 :
2194 32314 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2195 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2196 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2197 0 : goto error;
2198 : }
2199 : }
2200 125 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2201 125 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2202 125 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2203 0 : goto error;
2204 : }
2205 125 : BBPretain(lg->catalog_bid->batCacheid);
2206 125 : BBPretain(lg->catalog_id->batCacheid);
2207 125 : BBPretain(lg->dcatalog->batCacheid);
2208 : }
2209 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2210 : * fatal */
2211 352 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2212 352 : if (lg->catalog_cnt == NULL) {
2213 0 : GDKerror("failed to create catalog_cnt bat");
2214 0 : goto error;
2215 : }
2216 352 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2217 352 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2218 0 : GDKclrerr();
2219 352 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2220 352 : if (lg->catalog_lid == NULL) {
2221 0 : GDKerror("failed to create catalog_lid bat");
2222 0 : goto error;
2223 : }
2224 352 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2225 352 : if (BBPrename(lg->catalog_lid, bak) < 0)
2226 0 : GDKclrerr();
2227 352 : if (bm_get_counts(lg) != GDK_SUCCEED)
2228 0 : goto error;
2229 :
2230 352 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2231 352 : if (BBPindex(bak)) {
2232 125 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2233 125 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2234 125 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2235 125 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2236 125 : lg->dseqs = BATdescriptor(BBPindex(bak));
2237 125 : if (lg->seqs_id == NULL ||
2238 125 : lg->seqs_val == NULL ||
2239 : lg->dseqs == NULL) {
2240 0 : GDKerror("Logger_new: cannot load seqs bats");
2241 0 : goto error;
2242 : }
2243 125 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2244 125 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2245 125 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2246 0 : goto error;
2247 : }
2248 : } else {
2249 227 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2250 227 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2251 227 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2252 227 : if (lg->seqs_id == NULL ||
2253 227 : lg->seqs_val == NULL ||
2254 : lg->dseqs == NULL) {
2255 0 : GDKerror("Logger_new: cannot create seqs bats");
2256 0 : goto error;
2257 : }
2258 :
2259 227 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2260 227 : if (BBPrename(lg->seqs_id, bak) < 0) {
2261 0 : goto error;
2262 : }
2263 :
2264 227 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2265 227 : if (BBPrename(lg->seqs_val, bak) < 0) {
2266 0 : goto error;
2267 : }
2268 :
2269 227 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2270 227 : if (BBPrename(lg->dseqs, bak) < 0) {
2271 0 : goto error;
2272 : }
2273 : needcommit = true;
2274 : }
2275 352 : dbg = ATOMIC_GET(&GDKdebug);
2276 352 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2277 352 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2278 0 : GDKerror("Logger_new: commit failed");
2279 0 : goto error;
2280 : }
2281 352 : ATOMIC_SET(&GDKdebug, dbg);
2282 :
2283 352 : if (readlogs) {
2284 125 : ulng log_id = lg->saved_id + 1;
2285 125 : bool earlyexit = GDKgetenv_isyes("process-wal-and-exit");
2286 125 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2287 0 : goto error;
2288 : }
2289 125 : if (!earlyexit) {
2290 125 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2291 0 : goto error;
2292 125 : if (needsnew) {
2293 16 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2294 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2295 0 : return GDK_FAIL;
2296 : }
2297 16 : if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
2298 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2299 0 : return GDK_FAIL;
2300 : }
2301 : }
2302 : }
2303 125 : dbg = ATOMIC_GET(&GDKdebug);
2304 125 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2305 125 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2306 0 : goto error;
2307 : }
2308 125 : ATOMIC_SET(&GDKdebug, dbg);
2309 344 : for (; log_id <= lg->saved_id; log_id++)
2310 219 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2311 125 : if (earlyexit) {
2312 0 : printf("# mserver5 exiting\n");
2313 0 : exit(0);
2314 : }
2315 141 : if (needsnew &&
2316 16 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2317 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2318 0 : return GDK_FAIL;
2319 : }
2320 : } else {
2321 227 : lg->id = lg->saved_id + 1;
2322 227 : if (GDKgetenv_isyes("process-wal-and-exit")) {
2323 0 : printf("# mserver5 exiting\n");
2324 0 : exit(0);
2325 : }
2326 : }
2327 : #ifdef GDKLIBRARY_JSON
2328 352 : if (log_json_upgrade_finalize() == GDK_FAIL)
2329 0 : goto error;
2330 : #endif
2331 : return GDK_SUCCEED;
2332 0 : error:
2333 0 : if (fp)
2334 0 : fclose(fp);
2335 0 : logbat_destroy(lg->catalog_bid);
2336 0 : logbat_destroy(lg->catalog_id);
2337 0 : logbat_destroy(lg->dcatalog);
2338 0 : logbat_destroy(lg->seqs_id);
2339 0 : logbat_destroy(lg->seqs_val);
2340 0 : logbat_destroy(lg->dseqs);
2341 0 : MT_lock_destroy(&lg->lock);
2342 0 : MT_lock_destroy(&lg->rotation_lock);
2343 0 : GDKfree(lg->fn);
2344 0 : GDKfree(lg->dir);
2345 0 : GDKfree(lg->rbuf);
2346 0 : GDKfree(lg->wbuf);
2347 0 : GDKfree(lg);
2348 0 : ATOMIC_SET(&GDKdebug, dbg);
2349 : /* We do not call log_json_upgrade_finalize here because we want
2350 : * the upgrade to run again next time we try, so we do not want
2351 : * to remove the signal file just yet.
2352 : */
2353 0 : return GDK_FAIL;
2354 : }
2355 :
2356 : /* Initialize a new logger
2357 : * It will load any data in the logdir and persist it in the BATs*/
2358 : static logger *
2359 352 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2360 : postversionfix_fptr postfuncp, void *funcdata)
2361 : {
2362 352 : logger *lg;
2363 352 : char filename[FILENAME_MAX];
2364 :
2365 352 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2366 352 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2367 352 : lng max_file_size = 0;
2368 :
2369 352 : if (GDKdebug & TESTINGMASK) {
2370 : max_file_size = 2048; /* 2 KiB */
2371 : } else {
2372 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2373 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2374 : }
2375 :
2376 352 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2377 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2378 0 : return NULL;
2379 : }
2380 :
2381 352 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2382 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2383 0 : return NULL;
2384 : }
2385 :
2386 352 : lg = GDKmalloc(sizeof(struct logger));
2387 352 : if (lg == NULL) {
2388 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2389 0 : return NULL;
2390 : }
2391 :
2392 704 : *lg = (logger) {
2393 352 : .inmemory = GDKinmemory(0),
2394 : .debug = debug,
2395 : .version = version,
2396 : .prefuncp = prefuncp,
2397 : .postfuncp = postfuncp,
2398 : .funcdata = funcdata,
2399 :
2400 352 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2401 : .file_age = 0,
2402 352 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2403 352 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2404 :
2405 : .id = 0,
2406 352 : .saved_id = getBBPlogno(), /* get saved log number from bbp */
2407 : .nr_flushers = ATOMIC_VAR_INIT(0),
2408 352 : .fn = GDKstrdup(fn),
2409 352 : .dir = GDKstrdup(filename),
2410 : .rbufsize = 64 * 1024,
2411 352 : .rbuf = GDKmalloc(64 * 1024),
2412 : .wbufsize = 64 * 1024,
2413 352 : .wbuf = GDKmalloc(64 * 1024),
2414 : };
2415 :
2416 : /* probably open file and check version first, then call call old logger code */
2417 352 : if (lg->fn == NULL ||
2418 352 : lg->dir == NULL ||
2419 352 : lg->rbuf == NULL ||
2420 : lg->wbuf == NULL) {
2421 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2422 0 : GDKfree(lg->fn);
2423 0 : GDKfree(lg->dir);
2424 0 : GDKfree(lg->rbuf);
2425 0 : GDKfree(lg->wbuf);
2426 0 : GDKfree(lg);
2427 0 : return NULL;
2428 : }
2429 352 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2430 :
2431 352 : MT_lock_init(&lg->lock, fn);
2432 352 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2433 352 : MT_lock_init(&lg->flush_lock, "flush_lock");
2434 352 : MT_cond_init(&lg->excl_flush_cv);
2435 :
2436 352 : if (log_load(fn, lg, filename) == GDK_SUCCEED) {
2437 : return lg;
2438 : }
2439 : return NULL;
2440 : }
2441 :
2442 : static logged_range *
2443 68668 : do_flush_range_cleanup(logger *lg)
2444 : {
2445 68668 : logged_range *frange = lg->flush_ranges;
2446 68668 : logged_range *first = frange;
2447 :
2448 68668 : if (frange == NULL)
2449 : return NULL;
2450 80846 : while (frange->next) {
2451 12385 : if (ATOMIC_GET(&frange->refcount) > 1)
2452 : break;
2453 12178 : frange = frange->next;
2454 : }
2455 68668 : if (first == frange) {
2456 : return first;
2457 : }
2458 :
2459 12178 : logged_range *flast = frange;
2460 :
2461 12178 : lg->flush_ranges = flast;
2462 :
2463 24356 : for (frange = first; frange && frange != flast; frange = frange->next) {
2464 12178 : ATOMIC_DEC(&frange->refcount);
2465 12178 : if (!LOG_DISABLED(lg) && frange->output_log) {
2466 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2467 0 : close_stream(frange->output_log);
2468 0 : frange->output_log = NULL;
2469 : }
2470 : }
2471 : return flast;
2472 : }
2473 :
2474 : void
2475 351 : log_destroy(logger *lg)
2476 : {
2477 351 : log_close_input(lg);
2478 351 : logged_range *last = do_flush_range_cleanup(lg);
2479 351 : (void) last;
2480 351 : assert(last == lg->current && last == lg->flush_ranges);
2481 351 : log_close_output(lg);
2482 817 : for (logged_range * p = lg->pending; p; p = lg->pending) {
2483 466 : lg->pending = p->next;
2484 466 : GDKfree(p);
2485 : }
2486 351 : if (LOG_DISABLED(lg)) {
2487 1 : lg->saved_id = lg->id;
2488 1 : lg->saved_tid = lg->tid;
2489 1 : log_commit(lg, NULL, NULL, 0);
2490 : }
2491 351 : if (lg->catalog_bid) {
2492 351 : log_lock(lg);
2493 351 : BUN p, q;
2494 351 : BAT *b = lg->catalog_bid;
2495 :
2496 : /* free resources */
2497 351 : const log_bid *bids = (const log_bid *) Tloc(b, 0);
2498 92100 : BATloop(b, p, q) {
2499 91749 : bat bid = bids[p];
2500 :
2501 91749 : BBPrelease(bid);
2502 : }
2503 :
2504 351 : BBPrelease(lg->catalog_bid->batCacheid);
2505 351 : BBPrelease(lg->catalog_id->batCacheid);
2506 351 : BBPrelease(lg->dcatalog->batCacheid);
2507 351 : logbat_destroy(lg->catalog_bid);
2508 351 : logbat_destroy(lg->catalog_id);
2509 351 : logbat_destroy(lg->dcatalog);
2510 :
2511 351 : logbat_destroy(lg->catalog_cnt);
2512 351 : logbat_destroy(lg->catalog_lid);
2513 351 : log_unlock(lg);
2514 : }
2515 351 : MT_lock_destroy(&lg->lock);
2516 351 : MT_lock_destroy(&lg->rotation_lock);
2517 351 : MT_lock_destroy(&lg->flush_lock);
2518 351 : GDKfree(lg->fn);
2519 351 : GDKfree(lg->dir);
2520 351 : GDKfree(lg->rbuf);
2521 351 : GDKfree(lg->wbuf);
2522 351 : GDKfree(lg);
2523 351 : }
2524 :
2525 : /* Create a new logger */
2526 : logger *
2527 352 : log_create(int debug, const char *fn, const char *logdir, int version,
2528 : preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
2529 : void *funcdata)
2530 : {
2531 352 : logger *lg;
2532 352 : TRC_INFO_IF(WAL) {
2533 0 : TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
2534 0 : GDKtracer_flush_buffer();
2535 : }
2536 352 : lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
2537 352 : if (lg == NULL)
2538 : return NULL;
2539 352 : TRC_INFO_IF(WAL) {
2540 0 : TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
2541 0 : GDKtracer_flush_buffer();
2542 : }
2543 352 : if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2544 0 : log_destroy(lg);
2545 0 : return NULL;
2546 : }
2547 352 : assert(lg->current == NULL);
2548 352 : logged_range dummy = {
2549 352 : .cnt = BATcount(lg->catalog_bid),
2550 : };
2551 352 : lg->current = &dummy;
2552 352 : if (log_open_output(lg) != GDK_SUCCEED) {
2553 0 : lg->current = NULL;
2554 0 : log_destroy(lg);
2555 0 : return NULL;
2556 : }
2557 352 : lg->current = lg->current->next;
2558 352 : assert(lg->pending == NULL && lg->flush_ranges == NULL);
2559 352 : lg->pending = lg->current;
2560 352 : lg->flush_ranges = lg->current;
2561 352 : return lg;
2562 : }
2563 :
2564 : static logged_range *
2565 6533 : log_next_logfile(logger *lg, ulng ts)
2566 : {
2567 6533 : int m = (ATOMIC_GET(&GDKdebug) & TESTINGMASK) ? 1000 : 100;
2568 6533 : if (!lg->pending || !lg->pending->next)
2569 : return NULL;
2570 6533 : rotation_lock(lg);
2571 6533 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2572 6533 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2573 6533 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2574 5975 : rotation_unlock(lg);
2575 5975 : logged_range *p = lg->pending;
2576 5975 : for (int i = 1;
2577 12063 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2578 6090 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2579 18153 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2580 6088 : p = p->next;
2581 5975 : return p;
2582 : }
2583 558 : rotation_unlock(lg);
2584 558 : return NULL;
2585 : }
2586 :
2587 : static void
2588 5975 : log_cleanup_range(logger *lg, ulng id)
2589 : {
2590 5975 : rotation_lock(lg);
2591 18038 : while (lg->pending && lg->pending->id <= id) {
2592 12063 : logged_range *p;
2593 12063 : p = lg->pending;
2594 12063 : if (p)
2595 12063 : lg->pending = p->next;
2596 12063 : GDKfree(p);
2597 : }
2598 5975 : rotation_unlock(lg);
2599 5975 : }
2600 :
2601 : static void
2602 68320 : do_rotate(logger *lg)
2603 : {
2604 68320 : logged_range *cur = lg->current;
2605 68320 : logged_range *next = cur->next;
2606 68320 : if (next) {
2607 12178 : assert(ATOMIC_GET(&next->refcount) == 1);
2608 12178 : lg->current = next;
2609 12178 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2610 12045 : close_stream(cur->output_log);
2611 12045 : cur->output_log = NULL;
2612 : }
2613 : }
2614 68320 : }
2615 :
2616 : gdk_return
2617 7181 : log_activate(logger *lg)
2618 : {
2619 7181 : bool flush_cleanup = false;
2620 7181 : gdk_return res = GDK_SUCCEED;
2621 :
2622 7181 : rotation_lock(lg);
2623 7181 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2624 :
2625 7181 : if (current_file_size == -1) {
2626 0 : rotation_unlock(lg);
2627 0 : return GDK_FAIL;
2628 : }
2629 : /* file size of 2 means only endian indicator present
2630 : * (i.e. effectively empty) */
2631 7181 : if (current_file_size <= 2) {
2632 5252 : rotation_unlock(lg);
2633 5252 : return GDK_SUCCEED;
2634 : }
2635 :
2636 1929 : if (!lg->flushnow &&
2637 1929 : !lg->current->next &&
2638 1929 : current_file_size > 2 &&
2639 1929 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2640 1920 : current_file_size > lg->max_file_size ||
2641 1902 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2642 27 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2643 27 : lg->saved_id + 1 == lg->id &&
2644 25 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2645 25 : lg->id++;
2646 : /* start new file */
2647 25 : res = log_open_output(lg);
2648 25 : flush_cleanup = true;
2649 25 : do_rotate(lg);
2650 : }
2651 25 : if (flush_cleanup)
2652 25 : (void) do_flush_range_cleanup(lg);
2653 1929 : rotation_unlock(lg);
2654 1929 : return res;
2655 : }
2656 :
2657 : gdk_return
2658 6533 : log_flush(logger *lg, ulng ts)
2659 : {
2660 6533 : logged_range *pending = log_next_logfile(lg, ts);
2661 6533 : ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
2662 6533 : if (LOG_DISABLED(lg)) {
2663 0 : lg->saved_id = lid;
2664 0 : lg->saved_tid = lg->tid;
2665 0 : if (lid)
2666 0 : log_cleanup_range(lg, lg->saved_id);
2667 0 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
2668 0 : TRC_ERROR(GDK, "failed to commit");
2669 0 : return GDK_SUCCEED;
2670 : }
2671 6533 : if (lg->saved_id >= lid)
2672 : return GDK_SUCCEED;
2673 5975 : rotation_lock(lg);
2674 5975 : ulng lgid = lg->id;
2675 5975 : rotation_unlock(lg);
2676 5975 : if (lg->saved_id + 1 >= lgid) /* logger should first release the file */
2677 : return GDK_SUCCEED;
2678 5975 : log_return res = LOG_OK;
2679 5975 : ulng cid = olid;
2680 5975 : assert(lid <= lgid);
2681 : uint32_t *updated = NULL;
2682 : BUN nupdated = 0;
2683 : size_t allocated = 0;
2684 18038 : while (cid < lid && res == LOG_OK) {
2685 12063 : if (!lg->input_log) {
2686 12063 : char filename[MAXPATH];
2687 12063 : char id[32];
2688 12063 : if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
2689 0 : GDKfree(updated);
2690 0 : TRC_CRITICAL(GDK, "log_id filename is too large\n");
2691 0 : return GDK_FAIL;
2692 : }
2693 12063 : if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
2694 0 : GDKfree(updated);
2695 0 : return GDK_FAIL;
2696 : }
2697 12063 : if (strlen(filename) >= FILENAME_MAX) {
2698 : GDKfree(updated);
2699 : TRC_CRITICAL(GDK, "Logger filename path is too large\n");
2700 : return GDK_FAIL;
2701 : }
2702 :
2703 12063 : bool filemissing = false;
2704 12063 : if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
2705 0 : GDKfree(updated);
2706 0 : return GDK_FAIL;
2707 : }
2708 : }
2709 : /* we read the full file because skipping is impossible with current log format */
2710 12063 : log_lock(lg);
2711 12063 : if (updated == NULL) {
2712 5975 : nupdated = BATcount(lg->catalog_id);
2713 5975 : allocated = ((nupdated + 31) & ~31) / 8;
2714 5975 : if (allocated == 0)
2715 0 : allocated = 4;
2716 5975 : updated = GDKzalloc(allocated);
2717 5975 : if (updated == NULL) {
2718 0 : log_unlock(lg);
2719 0 : return GDK_FAIL;
2720 : }
2721 6088 : } else if (nupdated < BATcount(lg->catalog_id)) {
2722 536 : BUN n = BATcount(lg->catalog_id);
2723 536 : size_t a = ((n + 31) & ~31) / 8;
2724 536 : if (a > allocated) {
2725 59 : uint32_t *p = GDKrealloc(updated, a);
2726 59 : if (p == NULL) {
2727 0 : GDKfree(updated);
2728 0 : log_unlock(lg);
2729 0 : return GDK_FAIL;
2730 : }
2731 59 : updated = p;
2732 59 : memset(updated + allocated / 4, 0, a - allocated);
2733 59 : allocated = a;
2734 : }
2735 : nupdated = n;
2736 : }
2737 12063 : lg->flushing = true;
2738 12063 : res = log_read_transaction(lg, updated, nupdated);
2739 12063 : lg->flushing = false;
2740 12063 : log_unlock(lg);
2741 12063 : if (res == LOG_EOF) {
2742 12063 : log_close_input(lg);
2743 12063 : res = LOG_OK;
2744 : }
2745 12063 : cid++;
2746 : }
2747 5975 : if (lid > olid && res == LOG_OK) {
2748 5975 : rotation_lock(lg); /* protect against concurrent log_tflush rotate check */
2749 5975 : lg->saved_id = lid;
2750 5975 : rotation_unlock(lg);
2751 5975 : if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
2752 0 : TRC_ERROR(GDK, "failed to commit");
2753 0 : res = LOG_ERR;
2754 0 : rotation_lock(lg);
2755 0 : lg->saved_id = olid; /* reset !! */
2756 0 : rotation_unlock(lg);
2757 : }
2758 0 : if (res != LOG_ERR) {
2759 18038 : while (olid < lid) {
2760 : /* Try to cleanup, remove old log file, continue on failure! */
2761 12063 : olid++;
2762 12063 : (void) log_cleanup(lg, olid);
2763 : }
2764 : }
2765 5975 : if (res == LOG_OK)
2766 5975 : log_cleanup_range(lg, lg->saved_id);
2767 : }
2768 5975 : GDKfree(updated);
2769 5975 : return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
2770 : }
2771 :
2772 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2773 : * Only the bak- files are deleted for the preserved WAL files.
2774 : */
2775 : lng
2776 17034 : log_changes(logger *lg)
2777 : {
2778 17034 : if (LOG_DISABLED(lg))
2779 : return 0;
2780 17034 : rotation_lock(lg);
2781 17034 : lng changes = lg->id - lg->saved_id - 1;
2782 17034 : rotation_unlock(lg);
2783 17034 : return changes;
2784 : }
2785 :
2786 : int
2787 483 : log_sequence(logger *lg, int seq, lng *id)
2788 : {
2789 483 : log_lock(lg);
2790 483 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2791 :
2792 483 : if (p != BUN_NONE) {
2793 364 : *id = *(lng *) Tloc(lg->seqs_val, p);
2794 :
2795 364 : log_unlock(lg);
2796 364 : return 1;
2797 : }
2798 119 : log_unlock(lg);
2799 119 : return 0;
2800 : }
2801 :
2802 : gdk_return
2803 199193 : log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
2804 : {
2805 199193 : bte tpe = find_type(lg, type);
2806 199193 : gdk_return ok = GDK_SUCCEED;
2807 199193 : logformat l;
2808 199193 : lng nr;
2809 199193 : l.flag = LOG_UPDATE_CONST;
2810 199193 : l.id = id;
2811 199193 : nr = cnt;
2812 :
2813 199193 : if (LOG_DISABLED(lg) || !nr) {
2814 : /* logging is switched off */
2815 73009 : if (nr) {
2816 73009 : log_lock(lg);
2817 73009 : ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
2818 73009 : log_unlock(lg);
2819 : }
2820 73009 : return ok;
2821 : }
2822 :
2823 126184 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
2824 :
2825 126184 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2826 252368 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2827 252368 : log_write_format(lg, &l) != GDK_SUCCEED ||
2828 252368 : !mnstr_writeLng(lg->current->output_log, nr) ||
2829 252368 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2830 126184 : !mnstr_writeLng(lg->current->output_log, offset)) {
2831 0 : ATOMIC_DEC(&lg->current->refcount);
2832 0 : ok = GDK_FAIL;
2833 0 : goto bailout;
2834 : }
2835 :
2836 126184 : ok = wt(val, lg->current->output_log, 1);
2837 :
2838 126184 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2839 :
2840 126184 : bailout:
2841 126184 : if (ok != GDK_SUCCEED) {
2842 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2843 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2844 : }
2845 : return ok;
2846 : }
2847 :
2848 : static gdk_return
2849 19675 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2850 : {
2851 19675 : size_t bufsz = lg->wbufsize, resize = 0;
2852 19675 : BUN end = (BUN) (offset + nr);
2853 19675 : char *buf = lg->wbuf;
2854 19675 : gdk_return res = GDK_SUCCEED;
2855 :
2856 19675 : if (!buf)
2857 : return GDK_FAIL;
2858 19675 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2859 19675 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2860 : return GDK_FAIL;
2861 19675 : BATiter bi = bat_iterator(b);
2862 19675 : BUN p = (BUN) offset;
2863 39402 : for (; p < end;) {
2864 19727 : size_t sz = 0;
2865 19727 : if (resize) {
2866 2 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2867 : res = GDK_FAIL;
2868 : break;
2869 : }
2870 2 : lg->wbuf = buf;
2871 2 : lg->wbufsize = bufsz = resize;
2872 2 : resize = 0;
2873 : }
2874 19727 : char *dst = buf;
2875 71667 : for (; p < end && sz < bufsz; p++) {
2876 51992 : const char *s = BUNtvar(bi, p);
2877 51992 : size_t len = strlen(s) + 1;
2878 51992 : if ((sz + len) > bufsz) {
2879 52 : if (len > bufsz)
2880 2 : resize = len + bufsz;
2881 : break;
2882 : } else {
2883 51940 : memcpy(dst, s, len);
2884 51940 : dst += len;
2885 51940 : sz += len;
2886 : }
2887 : }
2888 39452 : if (sz &&
2889 39450 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2890 19725 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2891 : res = GDK_FAIL;
2892 : break;
2893 : }
2894 : }
2895 19675 : bat_iterator_end(&bi);
2896 19675 : return res;
2897 : }
2898 :
2899 : static gdk_return
2900 511855 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2901 : {
2902 511855 : bte tpe = find_type(lg, b->ttype);
2903 511855 : gdk_return ok = GDK_SUCCEED;
2904 511855 : logformat l;
2905 511855 : BUN p;
2906 511855 : lng nr;
2907 511855 : l.flag = LOG_UPDATE_BULK;
2908 511855 : l.id = id;
2909 511855 : nr = cnt;
2910 :
2911 511855 : if (LOG_DISABLED(lg) || !nr) {
2912 : /* logging is switched off */
2913 210760 : if (nr)
2914 156011 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2915 : return GDK_SUCCEED;
2916 : }
2917 :
2918 271797 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2919 :
2920 271797 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2921 271797 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2922 0 : ok = GDK_FAIL;
2923 0 : goto bailout;
2924 : }
2925 :
2926 271797 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2927 542966 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2928 672820 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2929 542966 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2930 413112 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2931 0 : ok = GDK_FAIL;
2932 0 : goto bailout;
2933 : }
2934 271797 : if (!total_cnt)
2935 129854 : total_cnt = cnt;
2936 271797 : lg->total_cnt += cnt;
2937 :
2938 271797 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2939 271483 : lg->total_cnt = 0;
2940 :
2941 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2942 271797 : if (sliced)
2943 1512 : offset = 0;
2944 271797 : if (b->ttype == TYPE_msk) {
2945 0 : BATiter bi = bat_iterator(b);
2946 0 : if (offset % 32 == 0) {
2947 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
2948 0 : (size_t) ((nr + 31) / 32)))
2949 0 : ok = GDK_FAIL;
2950 : } else {
2951 0 : for (lng i = 0; i < nr; i += 32) {
2952 : uint32_t v = 0;
2953 0 : for (int j = 0; j < 32 && i + j < nr; j++)
2954 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
2955 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
2956 : ok = GDK_FAIL;
2957 : break;
2958 : }
2959 : }
2960 : }
2961 0 : bat_iterator_end(&bi);
2962 523275 : } else if (b->ttype < TYPE_str && !isVIEW(b)) {
2963 251478 : BATiter bi = bat_iterator(b);
2964 251478 : const void *t = BUNtail(bi, (BUN) offset);
2965 :
2966 251478 : ok = wt(t, lg->current->output_log, (size_t) nr);
2967 251478 : bat_iterator_end(&bi);
2968 20319 : } else if (b->ttype == TYPE_str) {
2969 : /* efficient string writes */
2970 19669 : ok = string_writer(lg, b, offset, nr);
2971 : } else {
2972 650 : BATiter bi = bat_iterator(b);
2973 650 : BUN end = (BUN) (offset + nr);
2974 1658 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
2975 1008 : const void *t = BUNtail(bi, p);
2976 :
2977 1008 : ok = wt(t, lg->current->output_log, 1);
2978 : }
2979 650 : bat_iterator_end(&bi);
2980 : }
2981 :
2982 271797 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2983 :
2984 271797 : bailout:
2985 271797 : if (ok != GDK_SUCCEED) {
2986 0 : ATOMIC_DEC(&lg->current->refcount);
2987 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2988 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2989 : }
2990 : return ok;
2991 : }
2992 :
2993 : /*
2994 : * Changes made to the BAT descriptor should be stored in the log
2995 : * files. Actually, we need to save the descriptor file, perhaps we
2996 : * should simply introduce a versioning scheme.
2997 : */
2998 : gdk_return
2999 236750 : log_bat_persists(logger *lg, BAT *b, log_id id)
3000 : {
3001 236750 : log_lock(lg);
3002 236750 : bte ta = find_type(lg, b->ttype);
3003 236750 : logformat l;
3004 :
3005 236750 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
3006 0 : log_unlock(lg);
3007 0 : if (!LOG_DISABLED(lg))
3008 0 : ATOMIC_DEC(&lg->current->refcount);
3009 0 : return GDK_FAIL;
3010 : }
3011 :
3012 236750 : l.flag = LOG_CREATE;
3013 236750 : l.id = id;
3014 236750 : if (!LOG_DISABLED(lg)) {
3015 157628 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3016 315256 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3017 315256 : log_write_format(lg, &l) != GDK_SUCCEED ||
3018 157628 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3019 0 : log_unlock(lg);
3020 0 : ATOMIC_DEC(&lg->current->refcount);
3021 0 : return GDK_FAIL;
3022 : }
3023 : }
3024 236750 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3025 236750 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3026 236750 : log_unlock(lg);
3027 236750 : if (r != GDK_SUCCEED)
3028 0 : ATOMIC_DEC(&lg->current->refcount);
3029 : return r;
3030 : }
3031 :
3032 : gdk_return
3033 22210 : log_bat_transient(logger *lg, log_id id)
3034 : {
3035 22210 : log_lock(lg);
3036 22210 : log_bid bid = internal_find_bat(lg, id, -1);
3037 22210 : logformat l;
3038 :
3039 22210 : if (bid < 0) {
3040 0 : log_unlock(lg);
3041 0 : return GDK_FAIL;
3042 : }
3043 22210 : if (!bid) {
3044 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3045 0 : log_unlock(lg);
3046 0 : return GDK_FAIL;
3047 : }
3048 22210 : l.flag = LOG_DESTROY;
3049 22210 : l.id = id;
3050 :
3051 22210 : if (!LOG_DISABLED(lg)) {
3052 10249 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3053 0 : TRC_CRITICAL(GDK, "write failed\n");
3054 0 : log_unlock(lg);
3055 0 : ATOMIC_DEC(&lg->current->refcount);
3056 0 : return GDK_FAIL;
3057 : }
3058 : }
3059 22210 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3060 22210 : BAT *b = BBPquickdesc(bid);
3061 22210 : assert(b);
3062 22210 : BUN cnt = BATcount(b);
3063 22210 : ATOMIC_ADD(&lg->current->drops, cnt);
3064 22210 : gdk_return r = log_del_bat(lg, bid);
3065 22210 : log_unlock(lg);
3066 22210 : if (r != GDK_SUCCEED)
3067 0 : ATOMIC_DEC(&lg->current->refcount);
3068 : return r;
3069 : }
3070 :
3071 : static gdk_return
3072 118302 : log_bat_group(logger *lg, log_id id)
3073 : {
3074 118302 : if (LOG_DISABLED(lg))
3075 : return GDK_SUCCEED;
3076 :
3077 77200 : logformat l;
3078 77200 : l.flag = LOG_BAT_GROUP;
3079 77200 : l.id = id;
3080 77200 : gdk_return r = log_write_format(lg, &l);
3081 77200 : return r;
3082 : }
3083 :
3084 : gdk_return
3085 59151 : log_bat_group_start(logger *lg, log_id id)
3086 : {
3087 : /*positive table id represent start of logged table */
3088 59151 : return log_bat_group(lg, id);
3089 : }
3090 :
3091 : gdk_return
3092 59151 : log_bat_group_end(logger *lg, log_id id)
3093 : {
3094 : /*negative table id represent end of logged table */
3095 59151 : return log_bat_group(lg, -id);
3096 : }
3097 :
3098 : gdk_return
3099 272389 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3100 : {
3101 272389 : log_lock(lg);
3102 272389 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3103 272389 : log_unlock(lg);
3104 272389 : return r;
3105 : }
3106 :
3107 : gdk_return
3108 2800 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3109 : {
3110 2800 : log_lock(lg);
3111 2800 : bte tpe = find_type(lg, uval->ttype);
3112 2800 : gdk_return ok = GDK_SUCCEED;
3113 2800 : logformat l;
3114 2800 : BUN p;
3115 2800 : lng nr;
3116 :
3117 2800 : if (BATtdense(uid)) {
3118 2716 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3119 2716 : log_unlock(lg);
3120 2716 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3121 0 : ATOMIC_DEC(&lg->current->refcount);
3122 2716 : return ok;
3123 : }
3124 :
3125 84 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3126 :
3127 84 : l.flag = LOG_UPDATE;
3128 84 : l.id = id;
3129 84 : nr = (BATcount(uval));
3130 84 : assert(nr);
3131 :
3132 84 : if (LOG_DISABLED(lg)) {
3133 : /* logging is switched off */
3134 65 : log_unlock(lg);
3135 65 : return GDK_SUCCEED;
3136 : }
3137 :
3138 19 : BATiter vi = bat_iterator(uval);
3139 19 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3140 19 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3141 :
3142 19 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3143 38 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3144 38 : log_write_format(lg, &l) != GDK_SUCCEED ||
3145 38 : !mnstr_writeLng(lg->current->output_log, nr) ||
3146 19 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3147 0 : ok = GDK_FAIL;
3148 0 : goto bailout;
3149 : }
3150 1074 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3151 1055 : const oid id = BUNtoid(uid, p);
3152 :
3153 1055 : ok = wh(&id, lg->current->output_log, 1);
3154 : }
3155 19 : if (uval->ttype == TYPE_msk) {
3156 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3157 0 : (BATcount(uval) + 31) / 32))
3158 0 : ok = GDK_FAIL;
3159 32 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3160 13 : const void *t = BUNtail(vi, 0);
3161 :
3162 13 : ok = wt(t, lg->current->output_log, (size_t) nr);
3163 6 : } else if (uval->ttype == TYPE_str) {
3164 : /* efficient string writes */
3165 6 : ok = string_writer(lg, uval, 0, nr);
3166 : } else {
3167 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3168 0 : const void *val = BUNtail(vi, p);
3169 :
3170 0 : ok = wt(val, lg->current->output_log, 1);
3171 : }
3172 : }
3173 :
3174 19 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3175 :
3176 19 : bailout:
3177 19 : bat_iterator_end(&vi);
3178 19 : if (ok != GDK_SUCCEED) {
3179 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3180 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3181 0 : ATOMIC_DEC(&lg->current->refcount);
3182 : }
3183 19 : log_unlock(lg);
3184 19 : return ok;
3185 : }
3186 :
3187 : static inline bool
3188 57371 : check_rotation_conditions(logger *lg)
3189 : {
3190 57371 : if (LOG_DISABLED(lg))
3191 : return false;
3192 :
3193 57368 : if (lg->current->next)
3194 : return false; /* do not rotate if there is already a prepared next current */
3195 57368 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3196 : return true;
3197 57368 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3198 :
3199 57368 : if (current_file_size == -1)
3200 : return false;
3201 :
3202 57368 : assert(current_file_size >= 0);
3203 :
3204 57368 : if (current_file_size == 2)
3205 : return false;
3206 :
3207 1419 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3208 56552 : current_file_size > lg->max_file_size ||
3209 50492 : (GDKusec() - lg->file_age) > lg->max_file_age;
3210 :
3211 55141 : return res;
3212 : }
3213 :
3214 : gdk_return
3215 62833 : log_tend(logger *lg)
3216 : {
3217 62833 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3218 :
3219 62833 : if (LOG_DISABLED(lg))
3220 : return GDK_SUCCEED;
3221 :
3222 57368 : gdk_return result;
3223 57368 : logformat l;
3224 57368 : l.flag = LOG_END;
3225 57368 : l.id = lg->tid;
3226 :
3227 57368 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3228 57368 : ATOMIC_INC(&lg->nr_flushers);
3229 : return result;
3230 : }
3231 :
/* take / release the logger's flush lock; do_flush expects it to be held */
#define flush_lock(lg)		MT_lock_set(&(lg)->flush_lock)
#define flush_unlock(lg)	MT_lock_unset(&(lg)->flush_lock)
3234 :
3235 : static inline gdk_return
3236 56979 : do_flush(logged_range *range)
3237 : {
3238 : /* assumes flush lock */
3239 56979 : stream *output_log = range->output_log;
3240 56979 : ulng ts = ATOMIC_GET(&range->last_ts);
3241 :
3242 56979 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3243 56979 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3244 0 : return GDK_FAIL;
3245 56979 : ATOMIC_SET(&range->flushed_ts, ts);
3246 56979 : return GDK_SUCCEED;
3247 : }
3248 :
3249 : static inline void
3250 62830 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3251 : {
3252 62830 : (void) lg;
3253 62830 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3254 :
3255 62830 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3256 62441 : ATOMIC_SET(&range->last_ts, commit_ts);
3257 62830 : }
3258 :
3259 : gdk_return
3260 62833 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
3261 : {
3262 62833 : rotation_lock(lg);
3263 62833 : if (lg->flushnow) {
3264 5462 : logged_range *p = lg->current;
3265 5462 : assert(lg->flush_ranges == lg->current);
3266 5462 : assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
3267 5462 : log_tdone(lg, lg->current, commit_ts);
3268 5462 : ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
3269 5462 : lg->id++;
3270 5462 : lg->flushnow = false;
3271 5462 : if (log_open_output(lg) != GDK_SUCCEED)
3272 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3273 5462 : do_rotate(lg);
3274 5462 : (void) do_flush_range_cleanup(lg);
3275 5462 : assert(lg->flush_ranges == lg->current);
3276 5462 : rotation_unlock(lg);
3277 5462 : return log_commit(lg, p, NULL, 0);
3278 : }
3279 :
3280 57371 : if (LOG_DISABLED(lg)) {
3281 3 : rotation_unlock(lg);
3282 3 : return GDK_SUCCEED;
3283 : }
3284 :
3285 57368 : logged_range *frange = do_flush_range_cleanup(lg);
3286 :
3287 57437 : while (frange->next && frange->id < file_id) {
3288 : assert(frange->next);
3289 : frange = frange->next;
3290 : }
3291 :
3292 57368 : log_tdone(lg, frange, commit_ts);
3293 :
3294 57368 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
3295 : /* delay needed ? */
3296 :
3297 56979 : flush_lock(lg);
3298 : /* check it one more time */
3299 56979 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
3300 56979 : do_flush(frange);
3301 56979 : flush_unlock(lg);
3302 : }
3303 : /* else somebody else has flushed our log file */
3304 :
3305 57368 : if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
3306 55992 : if (frange != lg->current && frange->output_log) {
3307 133 : close_stream(frange->output_log);
3308 133 : frange->output_log = NULL;
3309 : }
3310 : }
3311 :
3312 57368 : if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
3313 : /* I am the last flusher
3314 : * if present,
3315 : * wake up the exclusive flusher in log_tstart */
3316 : /* rotation_lock is still being held */
3317 56739 : MT_cond_signal(&lg->excl_flush_cv);
3318 : }
3319 57368 : rotation_unlock(lg);
3320 :
3321 57368 : return GDK_SUCCEED;
3322 : }
3323 :
3324 : static gdk_return
3325 6954 : log_tsequence_(logger *lg, int seq, lng val)
3326 : {
3327 6954 : logformat l;
3328 :
3329 6954 : if (LOG_DISABLED(lg))
3330 : return GDK_SUCCEED;
3331 2846 : l.flag = LOG_SEQ;
3332 2846 : l.id = seq;
3333 :
3334 2846 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3335 :
3336 2846 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3337 5692 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3338 5692 : log_write_format(lg, &l) != GDK_SUCCEED ||
3339 2846 : !mnstr_writeLng(lg->current->output_log, val)) {
3340 0 : TRC_CRITICAL(GDK, "write failed\n");
3341 0 : ATOMIC_DEC(&lg->current->refcount);
3342 0 : return GDK_FAIL;
3343 : }
3344 : return GDK_SUCCEED;
3345 : }
3346 :
3347 : /* a transaction in it self */
3348 : gdk_return
3349 6954 : log_tsequence(logger *lg, int seq, lng val)
3350 : {
3351 6954 : BUN p;
3352 :
3353 6954 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3354 :
3355 6954 : log_lock(lg);
3356 6954 : MT_lock_set(&lg->seqs_id->theaplock);
3357 6954 : BUN inserted = lg->seqs_id->batInserted;
3358 6954 : MT_lock_unset(&lg->seqs_id->theaplock);
3359 6954 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
3360 1689 : assert(lg->seqs_val->hseqbase == 0);
3361 1689 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
3362 0 : log_unlock(lg);
3363 0 : return GDK_FAIL;
3364 : }
3365 : } else {
3366 4900 : if (p != BUN_NONE) {
3367 4900 : oid pos = p;
3368 4900 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
3369 0 : log_unlock(lg);
3370 0 : return GDK_FAIL;
3371 : }
3372 : }
3373 10530 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
3374 5265 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
3375 0 : log_unlock(lg);
3376 0 : return GDK_FAIL;
3377 : }
3378 : }
3379 6954 : gdk_return r = log_tsequence_(lg, seq, val);
3380 6954 : log_unlock(lg);
3381 6954 : return r;
3382 : }
3383 :
3384 : static gdk_return
3385 11790 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3386 : {
3387 11790 : log_lock(lg);
3388 11790 : BAT *b = lg->catalog_bid;
3389 11790 : const log_bid *bids;
3390 :
3391 11790 : bids = (log_bid *) Tloc(b, 0);
3392 248538 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3393 236748 : log_bid bid = bids[p];
3394 236748 : BAT *lb = BBP_desc(bid);
3395 :
3396 236748 : assert(bid);
3397 236748 : if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
3398 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3399 0 : log_unlock(lg);
3400 0 : return GDK_FAIL;
3401 : }
3402 :
3403 236748 : assert(lb->batRestricted != BAT_WRITE);
3404 :
3405 236748 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3406 : }
3407 : /* bm_subcommit releases the lock */
3408 11790 : return bm_subcommit(lg, pending, updated, maxupdated);
3409 : }
3410 :
3411 : static gdk_return
3412 237014 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3413 : {
3414 237014 : log_bid bid = internal_find_bat(lg, id, tid);
3415 237014 : lng cnt = 0;
3416 237014 : lng lid = lng_nil;
3417 :
3418 237014 : assert(b->batRestricted != BAT_WRITE);
3419 237014 : assert(b->batRole == PERSISTENT);
3420 237014 : if (bid < 0)
3421 : return GDK_FAIL;
3422 237014 : if (bid) {
3423 155507 : if (bid != b->batCacheid) {
3424 155504 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3425 : return GDK_FAIL;
3426 : } else {
3427 : return GDK_SUCCEED;
3428 : }
3429 : }
3430 237011 : bid = b->batCacheid;
3431 237011 : TRC_DEBUG(WAL, "create %d\n", id);
3432 237011 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3433 474022 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3434 474022 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3435 474022 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3436 237011 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3437 0 : return GDK_FAIL;
3438 237011 : if (lg->current)
3439 236747 : lg->current->cnt++;
3440 237011 : BBPretain(bid);
3441 237011 : return GDK_SUCCEED;
3442 : }
3443 :
3444 : static gdk_return
3445 177762 : log_del_bat(logger *lg, log_bid bid)
3446 : {
3447 177762 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3448 177762 : lng lid = lg->tid;
3449 :
3450 177762 : assert(p != BUN_NONE);
3451 177762 : if (p == BUN_NONE) {
3452 : GDKerror("cannot find BAT\n");
3453 : return GDK_FAIL;
3454 : }
3455 :
3456 177762 : assert(lg->catalog_lid->hseqbase == 0);
3457 177762 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3458 : }
3459 :
3460 : /* returns -1 on failure, 0 when not found, > 0 when found */
3461 : log_bid
3462 33018 : log_find_bat(logger *lg, log_id id)
3463 : {
3464 33018 : log_lock(lg);
3465 33018 : log_bid bid = internal_find_bat(lg, id, -1);
3466 33018 : log_unlock(lg);
3467 33018 : if (!bid) {
3468 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3469 0 : return GDK_FAIL;
3470 : }
3471 : return bid;
3472 : }
3473 :
3474 :
3475 :
3476 : gdk_return
3477 62834 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
3478 : {
3479 62834 : rotation_lock(lg);
3480 62834 : if (flushnow) {
3481 5463 : if (file_id == NULL) {
3482 : /* special case: ask store_manager to rotate log file */
3483 1 : lg->file_age = 0;
3484 1 : rotation_unlock(lg);
3485 1 : return GDK_SUCCEED;
3486 : }
3487 : /* I am now the exclusive flusher */
3488 5462 : while (ATOMIC_GET(&lg->nr_flushers)) {
3489 : /* I am waiting until all existing flushers are done */
3490 0 : MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
3491 : }
3492 5462 : assert(ATOMIC_GET(&lg->nr_flushers) == 0);
3493 :
3494 5462 : if (ATOMIC_GET(&lg->current->last_ts)) {
3495 2041 : lg->id++;
3496 2041 : if (log_open_output(lg) != GDK_SUCCEED)
3497 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3498 : }
3499 5462 : do_rotate(lg);
3500 5462 : (void) do_flush_range_cleanup(lg);
3501 5462 : rotation_unlock(lg);
3502 :
3503 5462 : if (lg->saved_id + 1 < lg->id)
3504 5040 : log_flush(lg, (1ULL << 63));
3505 5462 : lg->flushnow = flushnow;
3506 : } else {
3507 57371 : if (check_rotation_conditions(lg)) {
3508 4650 : lg->id++;
3509 4650 : if (log_open_output(lg) != GDK_SUCCEED)
3510 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3511 : }
3512 57371 : do_rotate(lg);
3513 57371 : rotation_unlock(lg);
3514 : }
3515 :
3516 62833 : if (LOG_DISABLED(lg))
3517 : return GDK_SUCCEED;
3518 :
3519 57368 : ATOMIC_INC(&lg->current->refcount);
3520 57368 : *file_id = lg->current->id;
3521 57368 : logformat l;
3522 57368 : l.flag = LOG_START;
3523 57368 : l.id = ++lg->tid;
3524 :
3525 57368 : TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
3526 57368 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3527 0 : ATOMIC_DEC(&lg->current->refcount);
3528 0 : return GDK_FAIL;
3529 : }
3530 :
3531 : return GDK_SUCCEED;
3532 : }
3533 :
3534 : void
3535 116 : log_printinfo(logger *lg)
3536 : {
3537 116 : if (!rotation_trylock(lg, 1000)) {
3538 0 : printf("Logger is currently locked, so no logger information\n");
3539 0 : return;
3540 : }
3541 116 : printf("logger %s:\n", lg->fn);
3542 116 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3543 : lg->id, lg->saved_id);
3544 116 : printf("current transaction id %d, saved transaction id %d\n",
3545 : lg->tid, lg->saved_tid);
3546 116 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3547 116 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3548 116 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3549 329 : for (logged_range *p = lg->pending; p; p = p->next) {
3550 213 : char buf[32];
3551 213 : if ((lg->debug & 128 || lg->inmemory) ||
3552 213 : p->output_log == NULL ||
3553 116 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3554 97 : buf[0] = 0;
3555 310 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3556 : }
3557 116 : rotation_unlock(lg);
3558 : }
|