Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
21 : static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
22 : static gdk_return log_del_bat(logger *lg, log_bid bid);
23 : /*
24 : * The logger uses a directory to store its log files. One master log
25 : * file stores information about the version of the logger and the
26 : * type mapping it uses. This file is a simple ascii file with the
27 : * following format:
28 : * {6DIGIT-VERSION\n[id,type_name\n]*}
29 : * The transaction log files have a binary format.
30 : */
31 :
/* Record kinds: the values carried in logformat.flag and logaction.type.
 * The numeric order matches the layout of log_commands[]. */
32 : #define LOG_START 0
33 : #define LOG_END 1
34 : #define LOG_UPDATE_CONST 2
35 : #define LOG_UPDATE_BULK 3
36 : #define LOG_UPDATE 4
37 : #define LOG_CREATE 5
38 : #define LOG_DESTROY 6
39 : #define LOG_SEQ 7
40 : #define LOG_CLEAR 8 /* DEPRECATED */
41 : #define LOG_BAT_GROUP 9
42 :
/* getfilepos: platform-specific function used to obtain a file position */
43 : #ifdef NATIVE_WIN32
44 : #define getfilepos _ftelli64
45 : #else
46 : #ifdef HAVE_FSEEKO
47 : #define getfilepos ftello
48 : #else
49 : #define getfilepos ftell
50 : #endif
51 : #endif
52 :
/* initial capacity passed to COLnew when la_bat_create creates a BAT */
53 : #define BATSIZE 0
54 :
/* true when bit 128 of lg->debug is set, or the logger is in-memory or
 * has flushnow set */
55 : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
/* Printable names of the record kinds, laid out in LOG_* numeric order;
 * the slot of the deprecated LOG_CLEAR (8) holds an empty string. */
57 : static const char *log_commands[] = {
58 : "LOG_START",
59 : "LOG_END",
60 : "LOG_UPDATE_CONST",
61 : "LOG_UPDATE_BULK",
62 : "LOG_UPDATE",
63 : "LOG_CREATE",
64 : "LOG_DESTROY",
65 : "LOG_SEQ",
66 : "", /* LOG_CLEAR IS DEPRECATED */
67 : "LOG_BAT_GROUP",
68 : };
69 :
/* One queued change of a transaction that is being replayed; filled in by
 * the log_read_* functions and executed by la_apply(). */
70 : typedef struct logaction {
71 : int type; /* kind of change: one of the LOG_* record kinds */
72 : lng nr; /* number of values covered by the change */
73 : int tt; /* atom type number of the values (see find_type_nr) */
74 : lng id; /* NOTE(review): not referenced in the code visible here */
75 : lng offset; /* start position of the change in the target BAT */
76 : log_id cid; /* id of the object the change applies to */
77 : BAT *b; /* temporary bat with changes (may be NULL) */
78 : BAT *uid; /* temporary bat with bun positions to update */
79 : } logaction;
80 :
81 : /* During the recovery process several transactions can be active at
 * the same time; they are chained through the tr field. */
82 : typedef struct trans {
83 : int tid; /* transaction id */
84 : int sz; /* allocated capacity of the changes array */
85 : int nr; /* number of changes currently stored */
86 :
87 : logaction *changes;
88 :
89 : struct trans *tr;
90 : } trans;
91 :
/* Fixed header of a log record: a one-byte record kind followed by an int
 * (see log_read_format / log_write_format). */
92 : typedef struct logformat_t {
93 : bte flag;
94 : int id;
95 : } logformat;
96 :
/* Result of the log_read_* functions: success, a failed read, or an error. */
97 : typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
98 :
99 : static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
100 : static gdk_return tr_grow(trans *tr);
101 :
102 : #define log_lock(lg) MT_lock_set(&(lg)->lock)
103 : #define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 918530 : find_type(logger *lg, int tpe)
107 : {
108 918530 : assert(tpe >= 0 && tpe < MAXATOMS);
109 918530 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 550346 : find_type_nr(logger *lg, bte tpe)
114 : {
115 550346 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 550346 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
121 : static BUN
122 417319 : log_find(BAT *b, BAT *d, int val)
123 : {
124 417319 : BUN p;
125 :
126 417319 : assert(b->ttype == TYPE_int);
127 417319 : assert(d->ttype == TYPE_oid);
128 417319 : BATiter bi = bat_iterator(b);
129 417319 : if (BAThash(b) == GDK_SUCCEED) {
130 417319 : MT_rwlock_rdlock(&b->thashlock);
131 731108 : HASHloop_int(bi, b->thash, p, &val) {
132 183168 : oid pos = p;
133 183168 : if (BUNfnd(d, &pos) == BUN_NONE) {
134 183168 : MT_rwlock_rdunlock(&b->thashlock);
135 183168 : bat_iterator_end(&bi);
136 183168 : return p;
137 : }
138 : }
139 234151 : MT_rwlock_rdunlock(&b->thashlock);
140 : } else { /* unlikely: BAThash failed */
141 0 : int *t = (int *) bi.base;
142 :
143 0 : for (p = 0; p < bi.count; p++) {
144 0 : if (t[p] == val) {
145 0 : oid pos = p;
146 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
147 0 : bat_iterator_end(&bi);
148 0 : return p;
149 : }
150 : }
151 : }
152 : }
153 234151 : bat_iterator_end(&bi);
154 234151 : return BUN_NONE;
155 : }
156 :
157 : static log_bid
158 649603 : internal_find_bat(logger *lg, log_id id, int tid)
159 : {
160 649603 : BUN p;
161 :
162 649603 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
163 649603 : BATiter cni = bat_iterator(lg->catalog_id);
164 649603 : MT_rwlock_rdlock(&cni.b->thashlock);
165 649603 : if (tid < 0) {
166 394712 : HASHloop_int(cni, cni.b->thash, p, &id) {
167 206902 : oid pos = p;
168 206902 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
169 206902 : MT_rwlock_rdunlock(&cni.b->thashlock);
170 206902 : bat_iterator_end(&cni);
171 206902 : return *(log_bid *) Tloc(lg->catalog_bid, p);
172 : }
173 : }
174 : } else {
175 365010 : BUN cp = BUN_NONE;
176 2719955 : HASHloop_int(cni, cni.b->thash, p, &id) {
177 2355237 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
178 2355237 : if (lid != lng_nil && lid <= tid) {
179 : break;
180 : }
181 : cp = p;
182 : }
183 365010 : if (cp != BUN_NONE) {
184 364743 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 364743 : bat_iterator_end(&cni);
186 364743 : return *(log_bid *) Tloc(lg->catalog_bid, cp);
187 : }
188 : }
189 77958 : MT_rwlock_rdunlock(&cni.b->thashlock);
190 77958 : bat_iterator_end(&cni);
191 77958 : return 0; /* not found */
192 : }
193 : return -1; /* error creating hash */
194 : }
195 :
196 : static inline void
197 18586 : logbat_destroy(BAT *b)
198 : {
199 21348 : BBPreclaim(b);
200 9968 : }
201 :
202 : static BAT *
203 11604 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 11604 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 11604 : if (nb) {
208 11604 : BBP_pid(nb->batCacheid) = 0;
209 11604 : if (role == PERSISTENT) {
210 7338 : BATmode(nb, false);
211 7338 : nb = BATsetaccess(nb, BAT_READ);
212 : }
213 : } else {
214 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
215 : }
216 11604 : return nb;
217 : }
218 :
219 : static bool
220 761188 : log_read_format(logger *lg, logformat *data)
221 : {
222 761188 : assert(!lg->inmemory);
223 761188 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
224 749837 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
225 : return true;
226 : /* could only read part, so complain */
227 0 : TRC_CRITICAL(GDK, "read failed\n");
228 : }
229 : return false;
230 : }
231 :
232 : static gdk_return
233 758725 : log_write_format(logger *lg, logformat *data)
234 : {
235 758725 : assert(data->id || data->flag);
236 758725 : assert(!lg->inmemory);
237 758725 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
238 1517450 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
239 1517450 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
240 758725 : mnstr_writeInt(lg->current->output_log, data->id))
241 : return GDK_SUCCEED;
242 0 : TRC_CRITICAL(GDK, "write failed\n");
243 0 : return GDK_FAIL;
244 : }
245 :
246 : static log_return
247 2524 : log_read_seq(logger *lg, logformat *l)
248 : {
249 2524 : int seq = l->id;
250 2524 : lng val;
251 2524 : BUN p;
252 :
253 2524 : assert(!lg->inmemory);
254 2524 : if (mnstr_readLng(lg->input_log, &val) != 1) {
255 0 : TRC_CRITICAL(GDK, "read failed\n");
256 0 : return LOG_EOF;
257 : }
258 2524 : if (lg->flushing)
259 : return LOG_OK;
260 :
261 55 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
262 54 : p >= lg->seqs_id->batInserted) {
263 22 : assert(lg->seqs_val->hseqbase == 0);
264 22 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
265 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
266 0 : return LOG_ERR;
267 : }
268 : } else {
269 32 : if (p != BUN_NONE) {
270 32 : oid pos = p;
271 32 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
272 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
273 0 : return LOG_ERR;
274 : }
275 : }
276 66 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
277 33 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
278 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
279 0 : return LOG_ERR;
280 : }
281 : }
282 : return LOG_OK;
283 : }
284 :
/* Compiled out by the #if 0 below: helpers that write and read a bare int
 * id on the log streams.  Kept for reference only. */
285 : #if 0
286 : static gdk_return
287 : log_write_id(logger *lg, int id)
288 : {
289 : assert(!lg->inmemory);
290 : assert(id >= 0);
291 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
292 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
293 : mnstr_writeInt(lg->current->output_log, id))
294 : return GDK_SUCCEED;
295 : TRC_CRITICAL(GDK, "write failed\n");
296 : return GDK_FAIL;
297 : }
298 :
299 : static log_return
300 : log_read_id(logger *lg, log_id *id)
301 : {
302 : assert(!lg->inmemory);
303 : if (mnstr_readInt(lg->input_log, id) != 1) {
304 : TRC_CRITICAL(GDK, "read failed\n");
305 : return LOG_EOF;
306 : }
307 : return LOG_OK;
308 : }
309 : #endif
310 :
311 : static log_return
312 17846 : string_reader(logger *lg, BAT *b, lng nr)
313 : {
314 17846 : size_t sz = 0;
315 17846 : lng SZ = 0;
316 17846 : log_return res = LOG_OK;
317 :
318 36151 : while (nr && res == LOG_OK) {
319 18305 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
320 0 : TRC_CRITICAL(GDK, "read failed\n");
321 0 : return LOG_EOF;
322 : }
323 18305 : sz = (size_t) SZ;
324 18305 : char *buf = lg->rbuf;
325 18305 : if (lg->rbufsize < sz) {
326 1 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
327 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
328 0 : return LOG_ERR;
329 : }
330 1 : lg->rbuf = buf;
331 1 : lg->rbufsize = sz;
332 : }
333 :
334 18305 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
335 0 : TRC_CRITICAL(GDK, "read failed\n");
336 0 : return LOG_EOF;
337 : }
338 : /* handle strings */
339 : char *t = buf;
340 : /* chunked */
341 : #define CHUNK_SIZE 1024
342 : char *strings[CHUNK_SIZE];
343 : int cur = 0;
344 :
345 102804 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
346 84499 : strings[cur++] = t;
347 84499 : if (cur == CHUNK_SIZE &&
348 4 : b &&
349 4 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
350 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
351 0 : res = LOG_ERR;
352 : }
353 84499 : if (cur == CHUNK_SIZE)
354 33 : cur = 0;
355 : /* find next */
356 5593113 : while (*t)
357 5508614 : t++;
358 84499 : t++;
359 : }
360 18305 : if (cur &&
361 643 : b &&
362 643 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
363 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
364 0 : res = LOG_ERR;
365 : }
366 : }
367 : return res;
368 : }
369 :
370 :
/* Describes a range of values copied from a BAT in the log file to a BAT
 * in the database.  NOTE(review): no use of this struct is visible in this
 * part of the file. */
371 : struct offset {
372 : lng os; /* offset within source BAT in logfile */
373 : lng nr; /* number of values to be copied */
374 : lng od; /* offset within destination BAT in database */
375 : };
376 :
377 : static log_return
378 392562 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
379 : {
380 392562 : log_return res = LOG_OK;
381 392562 : lng nr, pnr;
382 392562 : bte type_id = -1;
383 392562 : int tpe;
384 :
385 392562 : assert(!lg->inmemory);
386 392562 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
387 :
388 785124 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
389 392562 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
390 0 : TRC_CRITICAL(GDK, "read failed\n");
391 0 : return LOG_EOF;
392 : }
393 :
394 392562 : pnr = nr;
395 392562 : tpe = find_type_nr(lg, type_id);
396 392562 : if (tpe >= 0) {
397 392562 : BAT *uid = NULL;
398 392562 : BAT *r = NULL;
399 392562 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
400 392562 : lng offset;
401 :
402 392562 : assert(nr <= (lng) BUN_MAX);
403 392562 : if (!lg->flushing && l->flag == LOG_UPDATE) {
404 24 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
405 24 : if (uid == NULL) {
406 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
407 27924 : return LOG_ERR;
408 : }
409 : }
410 :
411 392562 : if (l->flag == LOG_UPDATE_CONST) {
412 127689 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
413 0 : TRC_CRITICAL(GDK, "read failed\n");
414 0 : return LOG_EOF;
415 : }
416 127689 : if (cands) {
417 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
418 :
419 27924 : if (BATcount(*cands) == 0 || lg->flushing) {
420 : /* when flushing, we only need the offset and count of the last segment of inserts. */
421 27548 : assert((*cands)->ttype == TYPE_void);
422 27548 : BATtseqbase(*cands, (oid) offset);
423 27548 : BATsetcount(*cands, (BUN) nr);
424 376 : } else if (!lg->flushing) {
425 376 : assert(BATcount(*cands) > 0);
426 376 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
427 376 : BAT *newcands = NULL;
428 376 : if (!dense) {
429 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
430 0 : res = LOG_ERR;
431 376 : } else if ((*cands)->ttype == TYPE_void) {
432 130 : if ((newcands = BATmergecand(*cands, dense))) {
433 130 : BBPreclaim(*cands);
434 130 : *cands = newcands;
435 : } else {
436 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
437 0 : res = LOG_ERR;
438 : }
439 : } else {
440 246 : assert((*cands)->ttype == TYPE_oid);
441 246 : assert(BATcount(*cands) > 0);
442 246 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
443 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
444 0 : res = LOG_ERR;
445 : }
446 : }
447 376 : BBPreclaim(dense);
448 : }
449 :
450 : /* We have to read the value to update the read cursor */
451 27924 : size_t tlen = lg->rbufsize;
452 27924 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
453 27924 : if (t == NULL) {
454 0 : TRC_CRITICAL(GDK, "read failed\n");
455 0 : res = LOG_EOF;
456 : }
457 27924 : return res;
458 : }
459 : }
460 :
461 364638 : if (!lg->flushing) {
462 4381 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
463 4381 : if (r == NULL) {
464 0 : if (uid)
465 0 : BBPreclaim(uid);
466 0 : return LOG_ERR;
467 : }
468 : }
469 :
470 364638 : if (l->flag == LOG_UPDATE_CONST) {
471 99765 : size_t tlen = lg->rbufsize;
472 99765 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
473 99765 : if (t == NULL) {
474 0 : TRC_CRITICAL(GDK, "read failed\n");
475 0 : res = LOG_EOF;
476 : } else {
477 99765 : lg->rbuf = t;
478 99765 : lg->rbufsize = tlen;
479 99765 : if (r) {
480 19280 : for (BUN p = 0; p < (BUN) nr; p++) {
481 16212 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
482 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
483 0 : res = LOG_ERR;
484 : }
485 : }
486 : }
487 : }
488 264873 : } else if (l->flag == LOG_UPDATE_BULK) {
489 264839 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
490 0 : if (r)
491 0 : BBPreclaim(r);
492 0 : TRC_CRITICAL(GDK, "read failed\n");
493 0 : return LOG_EOF;
494 : }
495 264839 : if (tpe == TYPE_msk) {
496 0 : if (r) {
497 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
498 0 : BATsetcount(r, (BUN) nr);
499 : else {
500 0 : TRC_CRITICAL(GDK, "read failed\n");
501 0 : res = LOG_EOF;
502 : }
503 : } else {
504 0 : size_t tlen = lg->rbufsize / sizeof(int);
505 0 : size_t cnt = 0, snr = (size_t) nr;
506 0 : snr = (snr + 31) / 32;
507 0 : assert(tlen);
508 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
509 0 : cnt = snr > tlen ? tlen : snr;
510 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
511 0 : TRC_CRITICAL(GDK, "read failed\n");
512 0 : res = LOG_EOF;
513 : }
514 : }
515 : }
516 : } else {
517 264839 : if (!ATOMvarsized(tpe)) {
518 246421 : size_t cnt = 0, snr = (size_t) nr;
519 246421 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
520 246421 : assert(tlen);
521 : /* read in chunks of max
522 : * BUFSIZE/width rows */
523 505511 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
524 259090 : cnt = snr > tlen ? tlen : snr;
525 259090 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
526 :
527 259090 : if (t == NULL) {
528 : res = LOG_EOF;
529 : break;
530 : }
531 259090 : assert(t == lg->rbuf);
532 259090 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
533 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
534 0 : res = LOG_ERR;
535 : }
536 : }
537 18418 : } else if (tpe == TYPE_str) {
538 : /* efficient string */
539 17830 : res = string_reader(lg, r, nr);
540 : } else {
541 1218 : for (; res == LOG_OK && nr > 0; nr--) {
542 630 : size_t tlen = lg->rbufsize;
543 630 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
544 :
545 630 : if (t == NULL) {
546 : /* see if failure was due to
547 : * malloc or something less
548 : * serious (in the current
549 : * context) */
550 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
551 : res = LOG_EOF;
552 : else
553 0 : res = LOG_ERR;
554 0 : TRC_CRITICAL(GDK, "read failed\n");
555 : } else {
556 630 : lg->rbuf = t;
557 630 : lg->rbufsize = tlen;
558 630 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
559 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
560 0 : res = LOG_ERR;
561 : }
562 : }
563 : }
564 : }
565 : }
566 : } else {
567 34 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
568 34 : void *hv = ATOMnil(TYPE_oid);
569 34 : offset = 0;
570 :
571 34 : if (hv == NULL) {
572 0 : TRC_CRITICAL(GDK, "read failed\n");
573 0 : res = LOG_EOF;
574 : }
575 7977 : for (; res == LOG_OK && nr > 0; nr--) {
576 7943 : size_t hlen = sizeof(oid);
577 7943 : void *h = rh(hv, &hlen, lg->input_log, 1);
578 7943 : assert(hlen == sizeof(oid));
579 7943 : assert(h == hv);
580 7943 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
581 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
582 0 : res = LOG_ERR;
583 : }
584 : }
585 34 : nr = pnr;
586 34 : if (tpe == TYPE_msk) {
587 0 : if (r) {
588 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
589 0 : BATsetcount(r, (BUN) nr);
590 : else {
591 0 : TRC_CRITICAL(GDK, "read failed\n");
592 0 : res = LOG_EOF;
593 : }
594 : } else {
595 0 : for (lng i = 0; i < nr; i += 32) {
596 0 : int v;
597 0 : switch (mnstr_readInt(lg->input_log, &v)) {
598 0 : case 1:
599 0 : continue;
600 : case 0:
601 : res = LOG_EOF;
602 : break;
603 : default:
604 : res = LOG_ERR;
605 : break;
606 : }
607 0 : TRC_CRITICAL(GDK, "read failed\n");
608 0 : break;
609 : }
610 : }
611 34 : } else if (tpe == TYPE_str) {
612 : /* efficient string */
613 16 : res = string_reader(lg, r, nr);
614 : } else {
615 4625 : for (; res == LOG_OK && nr > 0; nr--) {
616 4607 : size_t tlen = lg->rbufsize;
617 4607 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
618 :
619 4607 : if (t == NULL) {
620 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
621 : res = LOG_EOF;
622 : else
623 0 : res = LOG_ERR;
624 0 : TRC_CRITICAL(GDK, "read failed\n");
625 : } else {
626 4607 : lg->rbuf = t;
627 4607 : lg->rbufsize = tlen;
628 4607 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
629 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
630 0 : res = LOG_ERR;
631 : }
632 : }
633 : }
634 : }
635 34 : GDKfree(hv);
636 : }
637 :
638 364638 : if (res == LOG_OK) {
639 364638 : if (tr_grow(tr) == GDK_SUCCEED) {
640 364638 : tr->changes[tr->nr].type = l->flag;
641 364638 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
642 135047 : assert(cands); /* bat r is part of a group of bats logged together. */
643 135047 : struct canditer ci;
644 135047 : canditer_init(&ci, NULL, *cands);
645 135047 : const oid first = canditer_peek(&ci);
646 135047 : const oid last = canditer_last(&ci);
647 135047 : offset = (lng) first;
648 135047 : pnr = (lng) (last - first) + 1;
649 135047 : if (!lg->flushing) {
650 1182 : assert(uid == NULL);
651 1182 : uid = *cands;
652 1182 : BBPfix((*cands)->batCacheid);
653 1182 : tr->changes[tr->nr].type = LOG_UPDATE;
654 : }
655 : }
656 364638 : if (l->flag == LOG_UPDATE_CONST) {
657 99765 : assert(!cands); /* TODO: This might change in the future. */
658 99765 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
659 : }
660 364638 : tr->changes[tr->nr].nr = pnr;
661 364638 : tr->changes[tr->nr].tt = tpe;
662 364638 : tr->changes[tr->nr].cid = id;
663 364638 : tr->changes[tr->nr].offset = offset;
664 364638 : tr->changes[tr->nr].b = r;
665 364638 : tr->changes[tr->nr].uid = uid;
666 364638 : tr->nr++;
667 : } else {
668 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
669 0 : res = LOG_ERR;
670 : }
671 : }
672 364638 : if (res != LOG_OK) {
673 0 : if (r)
674 0 : BBPreclaim(r);
675 0 : if (cands && uid)
676 0 : BBPunfix((*cands)->batCacheid);
677 0 : else if (uid)
678 0 : BBPreclaim(uid);
679 : }
680 : } else {
681 : /* bat missing ERROR or ignore ? currently error. */
682 0 : TRC_CRITICAL(GDK, "unknown type\n");
683 0 : res = LOG_ERR;
684 : }
685 : return res;
686 : }
687 :
688 :
689 : static gdk_return
690 564507 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
691 : {
692 564507 : BATiter cni = bat_iterator_nolock(lg->catalog_id);
693 :
694 564507 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
695 564507 : MT_rwlock_rdlock(&cni.b->thashlock);
696 564507 : BUN p, cp = BUN_NONE;
697 :
698 3285986 : HASHloop_int(cni, cni.b->thash, p, &id) {
699 2556353 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
700 :
701 2556353 : if (lid != lng_nil && lid <= tid)
702 : break;
703 : cp = p;
704 : }
705 564507 : if (cp != BUN_NONE) {
706 564309 : lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
707 564309 : assert(lg->catalog_cnt->hseqbase == 0);
708 564309 : if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
709 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
710 0 : return GDK_FAIL;
711 : }
712 : }
713 564507 : MT_rwlock_rdunlock(&cni.b->thashlock);
714 564507 : return GDK_SUCCEED;
715 : }
716 : return GDK_FAIL;
717 : }
718 :
719 : static gdk_return
720 364638 : la_bat_updates(logger *lg, logaction *la, int tid)
721 : {
722 364638 : log_bid bid = internal_find_bat(lg, la->cid, tid);
723 364638 : BAT *b = NULL;
724 :
725 364638 : if (bid < 0)
726 : return GDK_FAIL;
727 364638 : if (!bid) {
728 : /* object already gone, nothing needed */
729 : return GDK_SUCCEED;
730 : }
731 :
732 364633 : if (!lg->flushing) {
733 4381 : b = BATdescriptor(bid);
734 4381 : if (b == NULL)
735 : return GDK_FAIL;
736 : }
737 364633 : BUN cnt = 0;
738 364633 : if (la->type == LOG_UPDATE_BULK) {
739 363417 : if (!lg->flushing) {
740 3175 : cnt = BATcount(b);
741 3175 : int is_msk = (b->ttype == TYPE_msk);
742 : /* handle offset 0 ie clear */
743 3175 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
744 : BATclear(b, true);
745 : /* handle offset */
746 3175 : if (cnt <= (BUN) la->offset) {
747 470 : msk t = 1;
748 470 : if (cnt < (BUN) la->offset) { /* insert nils */
749 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
750 0 : lng i, d = la->offset - BATcount(b);
751 0 : for (i = 0; i < d; i++) {
752 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
753 0 : logbat_destroy(b);
754 0 : return GDK_FAIL;
755 : }
756 : }
757 : }
758 470 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
759 0 : logbat_destroy(b);
760 0 : return GDK_FAIL;
761 : }
762 : } else {
763 2705 : BATiter vi = bat_iterator(la->b);
764 2705 : BUN p, q;
765 :
766 8847 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
767 10366 : const void *t = BUNtail(vi, p);
768 :
769 6142 : if (q < cnt) {
770 6141 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
771 0 : logbat_destroy(b);
772 0 : bat_iterator_end(&vi);
773 0 : return GDK_FAIL;
774 : }
775 : } else {
776 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
777 0 : logbat_destroy(b);
778 0 : bat_iterator_end(&vi);
779 0 : return GDK_FAIL;
780 : }
781 : }
782 : }
783 2705 : bat_iterator_end(&vi);
784 : }
785 : }
786 1216 : } else if (la->type == LOG_UPDATE) {
787 1216 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
788 : return GDK_FAIL;
789 : }
790 : }
791 364633 : cnt = (BUN) (la->offset + la->nr);
792 364633 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
793 0 : if (b)
794 0 : logbat_destroy(b);
795 0 : return GDK_FAIL;
796 : }
797 364633 : if (b)
798 4381 : logbat_destroy(b);
799 : return GDK_SUCCEED;
800 : }
801 :
802 : static log_return
803 11431 : log_read_destroy(logger *lg, trans *tr, log_id id)
804 : {
805 11431 : (void) lg;
806 11431 : assert(!lg->inmemory);
807 11431 : if (tr_grow(tr) == GDK_SUCCEED) {
808 11431 : tr->changes[tr->nr].type = LOG_DESTROY;
809 11431 : tr->changes[tr->nr].cid = id;
810 11431 : tr->nr++;
811 11431 : return LOG_OK;
812 : }
813 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
814 0 : return LOG_ERR;
815 : }
816 :
817 : static gdk_return
818 90 : la_bat_destroy(logger *lg, logaction *la, int tid)
819 : {
820 90 : log_bid bid = internal_find_bat(lg, la->cid, tid);
821 :
822 90 : if (bid < 0)
823 : return GDK_FAIL;
824 90 : if (!bid) {
825 : #ifndef NDEBUG
826 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
827 : #endif
828 0 : return GDK_SUCCEED;
829 : }
830 90 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
831 : return GDK_FAIL;
832 : return GDK_SUCCEED;
833 : }
834 :
835 : static log_return
836 157784 : log_read_create(logger *lg, trans *tr, log_id id)
837 : {
838 157784 : bte tt;
839 157784 : int tpe;
840 :
841 157784 : assert(!lg->inmemory);
842 157784 : TRC_DEBUG(WAL, "create %d", id);
843 :
844 157784 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
845 0 : TRC_CRITICAL(GDK, "read failed\n");
846 0 : return LOG_EOF;
847 : }
848 :
849 157784 : tpe = find_type_nr(lg, tt);
850 : /* read create */
851 157784 : if (tr_grow(tr) == GDK_SUCCEED) {
852 157784 : tr->changes[tr->nr].type = LOG_CREATE;
853 157784 : tr->changes[tr->nr].tt = tpe;
854 157784 : tr->changes[tr->nr].cid = id;
855 157784 : tr->nr++;
856 157784 : return LOG_OK;
857 : }
858 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
859 0 : return LOG_ERR;
860 : }
861 :
862 : static gdk_return
863 282 : la_bat_create(logger *lg, logaction *la, int tid)
864 : {
865 282 : BAT *b;
866 :
867 : /* formerly head column type, should be void */
868 282 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
869 : return GDK_FAIL;
870 :
871 282 : if (la->tt < 0)
872 0 : BATtseqbase(b, 0);
873 :
874 564 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
875 282 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
876 0 : logbat_destroy(b);
877 0 : return GDK_FAIL;
878 : }
879 282 : logbat_destroy(b);
880 282 : return GDK_SUCCEED;
881 : }
882 :
883 : static gdk_return
884 237 : log_write_new_types(logger *lg, FILE *fp)
885 : {
886 237 : bte id = 0;
887 :
888 : /* write types and insert into bats */
889 237 : memset(lg->type_id, -1, sizeof(lg->type_id));
890 237 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
891 : /* first the fixed sized types */
892 7344 : for (int i = 0; i < GDKatomcnt; i++) {
893 7107 : if (ATOMvarsized(i))
894 1894 : continue;
895 5213 : lg->type_id[i] = id;
896 5213 : lg->type_nr[id] = i;
897 5213 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
898 : return GDK_FAIL;
899 5213 : id++;
900 : }
901 : /* second the var sized types */
902 : id = -127; /* start after nil */
903 7344 : for (int i = 0; i < GDKatomcnt; i++) {
904 7107 : if (!ATOMvarsized(i))
905 5213 : continue;
906 1894 : lg->type_id[i] = id;
907 1894 : lg->type_nr[256 + id] = i;
908 1894 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
909 : return GDK_FAIL;
910 1894 : id++;
911 : }
912 : return GDK_SUCCEED;
913 : }
914 :
915 : #define TR_SIZE 1024
916 :
917 : static trans *
918 56058 : tr_create(trans *tr, int tid)
919 : {
920 56058 : trans *ntr = GDKmalloc(sizeof(trans));
921 :
922 56058 : if (ntr == NULL)
923 : return NULL;
924 56058 : ntr->tid = tid;
925 56058 : ntr->sz = TR_SIZE;
926 56058 : ntr->nr = 0;
927 56058 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
928 56058 : if (ntr->changes == NULL) {
929 0 : GDKfree(ntr);
930 0 : return NULL;
931 : }
932 56058 : ntr->tr = tr;
933 56058 : return ntr;
934 : }
935 :
936 : static gdk_return
937 533853 : la_apply(logger *lg, logaction *c, int tid)
938 : {
939 533853 : gdk_return ret = GDK_SUCCEED;
940 :
941 533853 : switch (c->type) {
942 364638 : case LOG_UPDATE_BULK:
943 : case LOG_UPDATE:
944 364638 : ret = la_bat_updates(lg, c, tid);
945 364638 : break;
946 157784 : case LOG_CREATE:
947 157784 : if (!lg->flushing)
948 282 : ret = la_bat_create(lg, c, tid);
949 : break;
950 11431 : case LOG_DESTROY:
951 11431 : if (!lg->flushing)
952 90 : ret = la_bat_destroy(lg, c, tid);
953 : break;
954 : default:
955 0 : MT_UNREACHABLE();
956 : }
957 533853 : return ret;
958 : }
959 :
960 : static void
961 533853 : la_destroy(logaction *c)
962 : {
963 533853 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
964 4381 : logbat_destroy(c->b);
965 533853 : if (c->type == LOG_UPDATE && c->uid)
966 1206 : logbat_destroy(c->uid);
967 533853 : }
968 :
969 : static gdk_return
970 533853 : tr_grow(trans *tr)
971 : {
972 533853 : if (tr->nr == tr->sz) {
973 10 : logaction *changes;
974 10 : tr->sz <<= 1;
975 10 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
976 10 : if (changes == NULL)
977 : return GDK_FAIL;
978 10 : tr->changes = changes;
979 : }
980 : /* cleanup the next */
981 533853 : tr->changes[tr->nr].b = NULL;
982 533853 : return GDK_SUCCEED;
983 : }
984 :
985 : static trans *
986 56058 : tr_destroy(trans *tr)
987 : {
988 56058 : trans *r = tr->tr;
989 :
990 56058 : GDKfree(tr->changes);
991 56058 : GDKfree(tr);
992 56058 : return r;
993 : }
994 :
995 : static trans *
996 0 : tr_abort_(logger *lg, trans *tr, int s)
997 : {
998 0 : int i;
999 :
1000 0 : (void) lg;
1001 :
1002 0 : TRC_DEBUG(WAL, "abort");
1003 :
1004 0 : for (i = s; i < tr->nr; i++)
1005 0 : la_destroy(&tr->changes[i]);
1006 0 : return tr_destroy(tr);
1007 : }
1008 :
1009 : static trans *
1010 0 : tr_abort(logger *lg, trans *tr)
1011 : {
1012 0 : return tr_abort_(lg, tr, 0);
1013 : }
1014 :
1015 : static trans *
1016 56058 : tr_commit(logger *lg, trans *tr)
1017 : {
1018 56058 : int i;
1019 :
1020 56058 : TRC_DEBUG(WAL, "commit");
1021 :
1022 589911 : for (i = 0; i < tr->nr; i++) {
1023 533853 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1024 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1025 0 : do {
1026 0 : tr = tr_abort_(lg, tr, i);
1027 0 : } while (tr != NULL);
1028 : return (trans *) -1;
1029 : }
1030 533853 : la_destroy(&tr->changes[i]);
1031 : }
1032 56058 : lg->saved_tid = tr->tid;
1033 56058 : return tr_destroy(tr);
1034 : }
1035 :
1036 : static gdk_return
1037 119 : log_read_types_file(logger *lg, FILE *fp, int version)
1038 : {
1039 119 : int id = 0;
1040 119 : char atom_name[IDLENGTH];
1041 :
1042 : /* scanf should use IDLENGTH somehow */
1043 3667 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1044 3548 : if (version < 52303 && strcmp(atom_name, "BAT") == 0)
1045 16 : continue;
1046 3532 : int i = ATOMindex(atom_name);
1047 :
1048 3532 : if (id < -127 || id > 127 || i < 0) {
1049 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1050 0 : return GDK_FAIL;
1051 : }
1052 3532 : lg->type_id[i] = (int8_t) id;
1053 3532 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1054 : }
1055 : return GDK_SUCCEED;
1056 : }
1057 :
1058 :
1059 : gdk_return
1060 237 : log_create_types_file(logger *lg, const char *filename)
1061 : {
1062 237 : FILE *fp;
1063 :
1064 237 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1065 0 : GDKerror("cannot create log file %s\n", filename);
1066 0 : return GDK_FAIL;
1067 : }
1068 237 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1069 0 : fclose(fp);
1070 0 : GDKerror("writing log file %s failed", filename);
1071 0 : if (MT_remove(filename) < 0)
1072 0 : GDKsyserror("remove %s failed\n", filename);
1073 0 : return GDK_FAIL;
1074 : }
1075 :
1076 237 : if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
1077 0 : fclose(fp);
1078 0 : GDKerror("writing log file %s failed", filename);
1079 0 : if (MT_remove(filename) < 0)
1080 0 : GDKsyserror("remove %s failed\n", filename);
1081 0 : return GDK_FAIL;
1082 : }
1083 237 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1084 : #if defined(_MSC_VER)
1085 : && _commit(_fileno(fp)) < 0
1086 : #elif defined(HAVE_FDATASYNC)
1087 0 : && fdatasync(fileno(fp)) < 0
1088 : #elif defined(HAVE_FSYNC)
1089 : && fsync(fileno(fp)) < 0
1090 : #endif
1091 : )) {
1092 0 : GDKsyserror("flushing log file %s failed", filename);
1093 0 : fclose(fp);
1094 0 : if (MT_remove(filename) < 0)
1095 0 : GDKsyserror("remove %s failed\n", filename);
1096 0 : return GDK_FAIL;
1097 : }
1098 237 : if (fclose(fp) < 0) {
1099 0 : GDKsyserror("closing log file %s failed", filename);
1100 0 : if (MT_remove(filename) < 0)
1101 0 : GDKsyserror("remove %s failed\n", filename);
1102 0 : return GDK_FAIL;
1103 : }
1104 : return GDK_SUCCEED;
1105 : }
1106 :
/* acquire/release the logger's rotation_lock; it is taken in this file
 * while the list of pending logged ranges is being walked */
#define rotation_lock(lg)	MT_lock_set(&(lg)->rotation_lock)
#define rotation_unlock(lg)	MT_lock_unset(&(lg)->rotation_lock)
1109 :
1110 : static gdk_return
1111 11602 : log_open_output(logger *lg)
1112 : {
1113 11602 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1114 :
1115 11602 : if (!new_range) {
1116 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1117 0 : return GDK_FAIL;
1118 : }
1119 11602 : if (!LOG_DISABLED(lg)) {
1120 11601 : char id[32];
1121 11601 : char *filename;
1122 :
1123 11601 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1124 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1125 0 : GDKfree(new_range);
1126 0 : return GDK_FAIL;
1127 : }
1128 11601 : if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
1129 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1130 0 : GDKfree(new_range);
1131 0 : return GDK_FAIL;
1132 : }
1133 :
1134 11601 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1135 11601 : new_range->output_log = open_wstream(filename);
1136 11601 : if (new_range->output_log) {
1137 11601 : short byteorder = 1234;
1138 11601 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1139 : }
1140 :
1141 11601 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1142 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1143 0 : close_stream(new_range->output_log);
1144 0 : GDKfree(new_range);
1145 0 : GDKfree(filename);
1146 0 : return GDK_FAIL;
1147 : }
1148 11601 : GDKfree(filename);
1149 : }
1150 11602 : ATOMIC_INIT(&new_range->refcount, 1);
1151 11602 : ATOMIC_INIT(&new_range->last_ts, 0);
1152 11602 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1153 11602 : ATOMIC_INIT(&new_range->drops, 0);
1154 11602 : new_range->id = lg->id;
1155 11602 : new_range->next = NULL;
1156 11602 : logged_range *current = lg->current;
1157 11602 : assert(current && current->next == NULL);
1158 11602 : new_range->cnt = current->cnt;
1159 11602 : current->next = new_range;
1160 11602 : lg->file_age = GDKusec();
1161 11602 : return GDK_SUCCEED;
1162 : }
1163 :
1164 : static inline void
1165 11815 : log_close_input(logger *lg)
1166 : {
1167 11815 : if (!lg->inmemory && lg->input_log) {
1168 11356 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1169 11356 : close_stream(lg->input_log);
1170 : }
1171 11815 : lg->input_log = NULL;
1172 11815 : }
1173 :
1174 : static inline void
1175 340 : log_close_output(logger *lg)
1176 : {
1177 340 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1178 339 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1179 339 : close_stream(lg->current->output_log);
1180 : }
1181 340 : lg->current->output_log = NULL;
1182 340 : }
1183 :
1184 : static gdk_return
1185 11475 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1186 : {
1187 11475 : TRC_INFO(WAL, "opening input log %s", filename);
1188 11475 : lg->input_log = open_rstream(filename);
1189 :
1190 : /* if the file doesn't exist, there is nothing to be read back */
1191 11475 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1192 119 : log_close_input(lg);
1193 119 : *filemissing = true;
1194 119 : return GDK_SUCCEED;
1195 : }
1196 11356 : short byteorder;
1197 11356 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1198 0 : case -1:
1199 0 : log_close_input(lg);
1200 0 : TRC_CRITICAL(GDK, "read failed\n");
1201 0 : return GDK_FAIL;
1202 5 : case 0:
1203 : /* empty file is ok */
1204 5 : log_close_input(lg);
1205 5 : return GDK_SUCCEED;
1206 11351 : case 1:
1207 : /* if not empty, must start with correct byte order mark */
1208 11351 : if (byteorder != 1234) {
1209 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1210 0 : log_close_input(lg);
1211 0 : return GDK_FAIL;
1212 : }
1213 : break;
1214 : }
1215 : return GDK_SUCCEED;
1216 : }
1217 :
1218 : static log_return
1219 11351 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
1220 : {
1221 11351 : logformat l;
1222 11351 : trans *tr = NULL;
1223 11351 : log_return err = LOG_OK;
1224 11351 : bool ok = true;
1225 11351 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1226 :
1227 11351 : (void) maxupdated; /* only used inside assert() */
1228 :
1229 11351 : if (!lg->flushing)
1230 194 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1231 :
1232 11351 : BAT *cands = NULL; /* used in case of LOG_BAT_GROUP */
1233 :
1234 761188 : while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
1235 749837 : if (l.flag == 0 && l.id == 0) {
1236 11351 : err = LOG_EOF;
1237 : break;
1238 : }
1239 :
1240 749837 : TRC_DEBUG_IF(WAL) {
1241 0 : if (l.flag > 0 && l.flag != LOG_CLEAR &&
1242 : l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
1243 0 : TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
1244 : else
1245 0 : TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
1246 : }
1247 749837 : switch (l.flag) {
1248 561777 : case LOG_UPDATE_CONST:
1249 : case LOG_UPDATE_BULK:
1250 : case LOG_UPDATE:
1251 : case LOG_CREATE:
1252 : case LOG_DESTROY:
1253 561777 : if (updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
1254 556448 : BATiter cni = bat_iterator(lg->catalog_id);
1255 556448 : BUN p;
1256 556448 : BUN posnew = BUN_NONE;
1257 556448 : BUN posold = BUN_NONE;
1258 556448 : MT_rwlock_rdlock(&cni.b->thashlock);
1259 9109360 : HASHloop_int(cni, cni.b->thash, p, &l.id) {
1260 8152873 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
1261 8152873 : if (lid == lng_nil || lid > tr->tid)
1262 : posnew = p;
1263 3934556 : else if (lid == tr->tid)
1264 8510422 : posold = p;
1265 : }
1266 556448 : MT_rwlock_rdunlock(&cni.b->thashlock);
1267 556448 : bat_iterator_end(&cni);
1268 : /* Normally at this point, posnew is the
1269 : * location of the bat that this
1270 : * transaction is working on, and posold
1271 : * is the location of the previous
1272 : * version of the bat. If LOG_CREATE,
1273 : * both are relevant, since the latter
1274 : * is the new bat, and the former is the
1275 : * to-be-destroyed bat. For
1276 : * LOG_DESTROY, only posnew should be
1277 : * relevant, but for the other types, if
1278 : * the table is destroyed later in the
1279 : * same transaction, we need posold, and
1280 : * else (the normal case) we need
1281 : * posnew. */
1282 556448 : if (posnew != BUN_NONE) {
1283 545100 : assert(posnew < maxupdated);
1284 545100 : updated[posnew / 32] |= 1U << (posnew % 32);
1285 : }
1286 556448 : if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
1287 165222 : assert(posold < maxupdated);
1288 165222 : updated[posold / 32] |= 1U << (posold % 32);
1289 : }
1290 : }
1291 : break;
1292 : default:
1293 : /* do nothing */
1294 : break;
1295 : }
1296 : /* the functions we call here can succeed (LOG_OK),
1297 : * but they can also fail for two different reasons:
1298 : * they can run out of input (LOG_EOF -- this is not
1299 : * serious, we just abort the remaining transactions),
1300 : * or some malloc or BAT update fails (LOG_ERR -- this
1301 : * is serious, we must abort the complete process);
1302 : * the latter failure causes the current function to
1303 : * return GDK_FAIL */
1304 749837 : switch (l.flag) {
1305 56058 : case LOG_START:
1306 56058 : assert(!lg->flushing || l.id <= lg->tid);
1307 56058 : if (!lg->flushing && l.id > lg->tid)
1308 156 : lg->tid = l.id; /* should only happen during initialization */
1309 56058 : if ((tr = tr_create(tr, l.id)) == NULL) {
1310 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
1311 0 : err = LOG_ERR;
1312 0 : break;
1313 : }
1314 56058 : TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
1315 : break;
1316 56058 : case LOG_END:
1317 56058 : if (tr == NULL)
1318 : err = LOG_EOF;
1319 56058 : else if (tr->tid != l.id) /* abort record */
1320 0 : tr = tr_abort(lg, tr);
1321 : else
1322 56058 : tr = tr_commit(lg, tr);
1323 : break;
1324 2524 : case LOG_SEQ:
1325 2524 : err = log_read_seq(lg, &l);
1326 2524 : break;
1327 392562 : case LOG_UPDATE_CONST:
1328 : case LOG_UPDATE_BULK:
1329 : case LOG_UPDATE:
1330 392562 : if (tr == NULL)
1331 : err = LOG_EOF;
1332 : else {
1333 622138 : err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
1334 : }
1335 : break;
1336 157784 : case LOG_CREATE:
1337 157784 : if (tr == NULL)
1338 : err = LOG_EOF;
1339 : else
1340 157784 : err = log_read_create(lg, tr, l.id);
1341 : break;
1342 11431 : case LOG_DESTROY:
1343 11431 : if (tr == NULL)
1344 : err = LOG_EOF;
1345 : else
1346 11431 : err = log_read_destroy(lg, tr, l.id);
1347 : break;
1348 73420 : case LOG_BAT_GROUP:
1349 73420 : if (tr == NULL)
1350 : err = LOG_EOF;
1351 : else {
1352 73420 : if (l.id > 0) {
1353 : /* START OF LOG_BAT_GROUP */
1354 36710 : cands = COLnew(0, TYPE_void, 0, SYSTRANS);
1355 36710 : if (!cands) {
1356 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
1357 0 : err = LOG_ERR;
1358 : }
1359 36710 : } else if (cands == NULL) {
1360 : /* should have gone through the
1361 : * above option earlier */
1362 0 : TRC_CRITICAL(GDK, "unexpected error\n");
1363 0 : err = LOG_ERR;
1364 : } else {
1365 : /* END OF LOG_BAT_GROUP */
1366 36710 : BBPunfix(cands->batCacheid);
1367 36710 : cands = NULL;
1368 : }
1369 : }
1370 : break;
1371 0 : default:
1372 0 : TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
1373 0 : err = LOG_ERR;
1374 : }
1375 749837 : if (tr == (trans *) -1) {
1376 : /* message already generated by tr_commit */
1377 : err = LOG_ERR;
1378 11351 : tr = NULL;
1379 : break;
1380 : }
1381 : }
1382 11351 : while (tr) {
1383 0 : TRC_WARNING(GDK, "aborting transaction\n");
1384 0 : tr = tr_abort(lg, tr);
1385 : }
1386 11351 : if (!lg->flushing)
1387 194 : ATOMIC_SET(&GDKdebug, dbg);
1388 :
1389 11351 : BBPreclaim(cands);
1390 11351 : if (!ok)
1391 11351 : return LOG_EOF;
1392 : return err;
1393 : }
1394 :
1395 : static gdk_return
1396 318 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1397 : {
1398 318 : log_return err = LOG_OK;
1399 318 : time_t t0, t1;
1400 318 : struct stat sb;
1401 :
1402 318 : assert(!lg->inmemory);
1403 :
1404 318 : TRC_INFO(WAL, "opening %s\n", filename);
1405 :
1406 318 : gdk_return res = log_open_input(lg, filename, filemissing);
1407 318 : if (!lg->input_log || res != GDK_SUCCEED)
1408 : return res;
1409 194 : int fd;
1410 194 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1411 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1412 0 : log_close_input(lg);
1413 : /* If the file could be opened, but fstat fails,
1414 : * something weird is going on */
1415 0 : return GDK_FAIL;
1416 : }
1417 194 : t0 = time(NULL);
1418 194 : TRC_INFO_IF(WAL) {
1419 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1420 0 : GDKtracer_flush_buffer();
1421 : }
1422 388 : while (err != LOG_EOF && err != LOG_ERR) {
1423 194 : t1 = time(NULL);
1424 194 : if (t1 - t0 > 10) {
1425 0 : lng fpos;
1426 0 : t0 = t1;
1427 : /* not more than once every 10 seconds */
1428 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1429 0 : TRC_INFO_IF(WAL) {
1430 0 : if (fpos >= 0) {
1431 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1432 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1433 0 : GDKtracer_flush_buffer();
1434 : }
1435 : }
1436 : }
1437 194 : err = log_read_transaction(lg, NULL, 0);
1438 : }
1439 194 : log_close_input(lg);
1440 194 : lg->input_log = NULL;
1441 :
1442 : /* remaining transactions are not committed, ie abort */
1443 194 : TRC_INFO_IF(WAL) {
1444 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1445 0 : GDKtracer_flush_buffer();
1446 : }
1447 : /* we cannot distinguish errors from incomplete transactions
1448 : * (even if we would log aborts in the logs). So we simply
1449 : * abort and move to the next log file */
1450 194 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1451 : }
1452 :
1453 : /*
1454 : * The log files are incrementally numbered, starting from 2. They are
1455 : * processed in the same sequence.
1456 : */
1457 : static gdk_return
1458 119 : log_readlogs(logger *lg, const char *filename)
1459 : {
1460 119 : gdk_return res = GDK_SUCCEED;
1461 :
1462 119 : assert(!lg->inmemory);
1463 119 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1464 :
1465 119 : char log_filename[FILENAME_MAX];
1466 119 : if (lg->saved_id >= lg->id) {
1467 119 : bool filemissing = false;
1468 :
1469 119 : lg->id = lg->saved_id + 1;
1470 437 : while (res == GDK_SUCCEED && !filemissing) {
1471 318 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1472 0 : GDKerror("Logger filename path is too large\n");
1473 0 : return GDK_FAIL;
1474 : }
1475 318 : res = log_readlog(lg, log_filename, &filemissing);
1476 318 : if (!filemissing) {
1477 199 : lg->saved_id++;
1478 199 : lg->id++;
1479 : }
1480 : }
1481 : }
1482 : return res;
1483 : }
1484 :
1485 : static gdk_return
1486 10278 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1487 : {
1488 10278 : TRC_DEBUG(WAL, "commit");
1489 :
1490 10278 : return bm_commit(lg, pending, updated, maxupdated);
1491 : }
1492 :
1493 : static gdk_return
1494 119 : check_version(logger *lg, FILE *fp, const char *fn, const char *logdir, const char *filename, bool *needsnew)
1495 : {
1496 119 : int version = 0;
1497 :
1498 119 : assert(!lg->inmemory);
1499 119 : if (fscanf(fp, "%6d", &version) != 1) {
1500 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1501 0 : fclose(fp);
1502 0 : return GDK_FAIL;
1503 : }
1504 119 : if (version < 52300) { /* first CATALOG_VERSION for "new" log format */
1505 0 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
1506 0 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
1507 0 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
1508 0 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
1509 0 : GDKerror("cannot create catalog bats");
1510 0 : fclose(fp);
1511 0 : return GDK_FAIL;
1512 : }
1513 : /* old_logger_load always closes fp */
1514 0 : if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) {
1515 : /*loads drop no longer needed catalog, snapshots bats */
1516 : /*convert catalog_oid -> catalog_id (lng->int) */
1517 0 : GDKerror("Incompatible database version %06d, "
1518 : "this server supports version %06d.\n%s",
1519 : version, lg->version,
1520 : version <
1521 : lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1522 0 : return GDK_FAIL;
1523 : }
1524 0 : *needsnew = false; /* already written a new log file */
1525 0 : return GDK_SUCCEED;
1526 119 : } else if (version != lg->version) {
1527 32 : if (lg->prefuncp == NULL ||
1528 16 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1529 0 : GDKerror("Incompatible database version %06d, "
1530 : "this server supports version %06d.\n%s",
1531 : version, lg->version,
1532 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1533 0 : fclose(fp);
1534 0 : return GDK_FAIL;
1535 : }
1536 16 : *needsnew = true; /* we need to write a new log file */
1537 : } else {
1538 103 : lg->postfuncp = NULL; /* don't call */
1539 103 : *needsnew = false; /* log file already up-to-date */
1540 : }
1541 238 : if (fgetc(fp) != '\n' || /* skip \n */
1542 119 : fgetc(fp) != '\n') { /* skip \n */
1543 0 : GDKerror("Badly formatted log file");
1544 0 : fclose(fp);
1545 0 : return GDK_FAIL;
1546 : }
1547 119 : if (log_read_types_file(lg, fp, version) != GDK_SUCCEED) {
1548 0 : fclose(fp);
1549 0 : return GDK_FAIL;
1550 : }
1551 119 : fclose(fp);
1552 119 : return GDK_SUCCEED;
1553 : }
1554 :
1555 : static BAT *
1556 315 : bm_tids(BAT *b, BAT *d)
1557 : {
1558 315 : BUN sz = BATcount(b);
1559 315 : BAT *tids = BATdense(0, 0, sz);
1560 :
1561 315 : if (tids == NULL)
1562 : return NULL;
1563 :
1564 315 : if (BATcount(d)) {
1565 315 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1566 315 : logbat_destroy(tids);
1567 315 : tids = diff;
1568 : }
1569 : return tids;
1570 : }
1571 :
1572 :
1573 : static gdk_return
1574 6006 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1575 : {
1576 6006 : char bak[IDLENGTH];
1577 :
1578 6006 : if (BATmode(old, true) != GDK_SUCCEED) {
1579 0 : GDKerror("cannot convert old %s to transient", name);
1580 0 : return GDK_FAIL;
1581 : }
1582 6006 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1583 0 : GDKerror("name %s_%s too long\n", fn, name);
1584 0 : return GDK_FAIL;
1585 : }
1586 6006 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1587 0 : GDKerror("rename (%s) failed\n", bak);
1588 0 : return GDK_FAIL;
1589 : }
1590 6006 : BBPretain(new->batCacheid);
1591 6006 : return GDK_SUCCEED;
1592 : }
1593 :
1594 : static gdk_return
1595 341 : bm_get_counts(logger *lg)
1596 : {
1597 341 : BUN p, q;
1598 341 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1599 :
1600 30511 : BATloop(lg->catalog_bid, p, q) {
1601 30170 : oid pos = p;
1602 30170 : lng cnt = 0;
1603 30170 : lng lid = lng_nil;
1604 :
1605 30170 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1606 30170 : BAT *b = BBPquickdesc(bids[p]);
1607 30170 : assert(b);
1608 30170 : cnt = BATcount(b);
1609 : } else {
1610 0 : lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1611 : }
1612 30170 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1613 0 : return GDK_FAIL;
1614 30170 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1615 : return GDK_FAIL;
1616 : }
1617 : return GDK_SUCCEED;
1618 : }
1619 :
1620 : static int
1621 6006 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1622 : {
1623 6006 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1624 1329916 : for (int i = 0; i < next; i++) {
1625 1323910 : if (n[i] == bid) {
1626 0 : sizes[i] = sz;
1627 0 : return next;
1628 : }
1629 : }
1630 6006 : n[next] = bid;
1631 6006 : sizes[next++] = sz;
1632 6006 : return next;
1633 : }
1634 :
/* Build compacted copies of the catalog bats without the entries that
 * are marked as removed, and swap them in for the current ones.
 *
 * lg:          the logger whose catalog is compacted
 * r:           output array; receives the ids of the bats that were
 *              made transient here plus the ids of the three replaced
 *              catalog bats
 * bids/lids/cnts: raw arrays of the current catalog_bid, catalog_lid
 *              and catalog_cnt bats (lids is modified: entries that
 *              are dropped get the value -1)
 * catalog_bid/catalog_id/dcatalog: the bats that are being replaced
 * cleanup:     number of entries that are expected to go away; used
 *              to size the new bats and to adjust the counts of the
 *              pending logged ranges
 *
 * Returns the number of entries written to r on success, 0 when the
 * new bats could not be created or filled (nothing was swapped), and
 * -1 when switching the persistent catalog bats failed. */
static int
cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
		 BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
{
	BAT *nbids, *noids, *ncnts, *nlids, *ndels;
	BUN p, q;
	int err = 0, rcnt = 0;

	BUN ocnt = BATcount(catalog_bid);
	nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
	noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
	ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
	nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
	ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);

	if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
		logbat_destroy(nbids);
		logbat_destroy(noids);
		logbat_destroy(ncnts);
		logbat_destroy(nlids);
		logbat_destroy(ndels);
		return 0;
	}

	/* first pass: walk the positions listed in dcatalog and make the
	 * bats whose lid is not nil and not beyond saved_tid transient */
	oid *poss = Tloc(dcatalog, 0);
	BATloop(dcatalog, p, q) {
		oid pos = poss[p];

		if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
			continue;

		if (lids[pos] >= 0) {
			bat bid = bids[pos];
			BAT *lb = BBP_desc(bid);

			if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
				GDKwarning("Failed to set bat(%d) transient\n", bid);
			} else {
				lids[pos] = -1;	/* mark as transient */
				r[rcnt++] = bid;
			}
		}
	}

	/* second pass: copy every entry that is not marked with lid -1
	 * into the new bats */
	int *oids = (int *) Tloc(catalog_id, 0);
	q = BATcount(catalog_bid);
	for (p = 0; p < q && !err; p++) {
		bat col = bids[p];
		int nid = oids[p];
		lng lid = lids[p];
		lng cnt = cnts[p];
		oid pos = p;

		/* only project out the deleted with lid == -1
		 * update dcatalog */
		if (lid == -1)
			continue;	/* remove */

		if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
		    BUNappend(noids, &nid, true) != GDK_SUCCEED ||
		    BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
		    BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
			err = 1;
		/* an entry that stays but is listed in dcatalog is listed
		 * again in the new dcatalog, at its new position */
		if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
			pos = (oid) (BATcount(nbids) - 1);
			if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
				err = 1;
		}
	}

	if (err) {
		logbat_destroy(nbids);
		logbat_destroy(noids);
		logbat_destroy(ndels);
		logbat_destroy(ncnts);
		logbat_destroy(nlids);
		return 0;
	}
	/* point of no return */
	if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
	    log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
	    log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
		logbat_destroy(nbids);
		logbat_destroy(noids);
		logbat_destroy(ndels);
		logbat_destroy(ncnts);
		logbat_destroy(nlids);
		return -1;
	}
	/* the replaced catalog bats are reported to the caller as well */
	r[rcnt++] = lg->catalog_bid->batCacheid;
	r[rcnt++] = lg->catalog_id->batCacheid;
	r[rcnt++] = lg->dcatalog->batCacheid;

	assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));

	logbat_destroy(lg->catalog_bid);
	logbat_destroy(lg->catalog_id);
	logbat_destroy(lg->dcatalog);

	lg->catalog_bid = nbids;
	lg->catalog_id = noids;
	lg->dcatalog = ndels;

	/* failing to rename these two bats is not fatal */
	if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
		GDKclrerr();
	if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
		GDKclrerr();
	BBPunfix(lg->catalog_cnt->batCacheid);
	BBPunfix(lg->catalog_lid->batCacheid);

	lg->catalog_cnt = ncnts;
	lg->catalog_lid = nlids;
	char bak[FILENAME_MAX];
	strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
	if (BBPrename(lg->catalog_cnt, bak) < 0)
		GDKclrerr();
	strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
	if (BBPrename(lg->catalog_lid, bak) < 0)
		GDKclrerr();
	/* the catalog shrank by `cleanup` entries: adjust the counts of
	 * all pending ranges while holding the rotation lock */
	rotation_lock(lg);
	for (logged_range *p = lg->pending; p; p = p->next) {
		p->cnt -= cleanup;
	}
	rotation_unlock(lg);
	return rcnt;
}
1762 :
/* this function is called with log_lock() held; it releases the lock
 * before returning
 *
 * Collect the bats that have to be committed -- the catalog entries
 * (restricted to those flagged in `updated` when that bitmap is
 * given), the catalog bats themselves and the sequence bats -- and
 * commit them in one go with TMsubcommit_list().
 *
 * lg:         the logger
 * pending:    optional logged range; when given, its cnt is used as
 *             the catalog size instead of BATcount(lg->catalog_bid)
 * updated:    optional bitmap of catalog positions to commit; entries
 *             below maxupdated whose bit is not set are skipped
 * maxupdated: number of positions covered by the bitmap
 *
 * Along the way, catalog entries whose lid is not nil and at most
 * lg->saved_tid are cleaned up (see cleanup_and_swap), and the
 * sequence bats are compacted when more than half of them (and more
 * than 10) are marked deleted.  Returns the result of the subcommit,
 * or GDK_FAIL when preparing it fails. */
static gdk_return
bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
{
	BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
	BUN dcnt = BATcount(lg->dcatalog);
	BUN p, q;
	BAT *catalog_bid = lg->catalog_bid;
	BAT *catalog_id = lg->catalog_id;
	BAT *dcatalog = lg->dcatalog;
	/* room for the catalog entries plus a fixed number of extra
	 * slots for the logger's own bats; checked by the assert before
	 * the subcommit */
	BUN nn = 13 + cnt;
	bat *n = GDKmalloc(sizeof(bat) * nn);	/* bats to commit */
	bat *r = GDKmalloc(sizeof(bat) * nn);	/* bats to release afterwards */
	BUN *sizes = GDKmalloc(sizeof(BUN) * nn);	/* size per entry of n */
	int i = 0, rcnt = 0;
	gdk_return res;
	const log_bid *bids;
	lng *cnts = NULL, *lids = NULL;
	BUN cleanup = 0;
	lng t0 = 0;

	if (n == NULL || r == NULL || sizes == NULL) {
		GDKfree(n);
		GDKfree(r);
		GDKfree(sizes);
		log_unlock(lg);
		return GDK_FAIL;
	}

	sizes[i] = 0;
	n[i++] = 0;		/* n[0] is not used */
	bids = (const log_bid *) Tloc(catalog_bid, 0);
	if (lg->catalog_cnt)
		cnts = (lng *) Tloc(lg->catalog_cnt, 0);
	if (lg->catalog_lid)
		lids = (lng *) Tloc(lg->catalog_lid, 0);
	BATloop(catalog_bid, p, q) {
		if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
			/* this entry can be cleaned up */
			cleanup++;
			if (lids[p] == -1)
				continue;
			if (BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
				/* undo the appends done so far so that
				 * dcatalog is back at its original size */
				while (BATcount(dcatalog) > dcnt) {
					if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
						TRC_CRITICAL(WAL, "delete after failed append failed\n");
						break;
					}
				}
				GDKfree(n);
				GDKfree(r);
				GDKfree(sizes);
				log_unlock(lg);
				return GDK_FAIL;
			}
		}
		/* skip entries that the bitmap says were not touched */
		if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
			continue;
		}
		bat col = bids[p];

		TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
		assert(col);
		sizes[i] = cnts ? (BUN) cnts[p] : 0;
		n[i++] = col;
	}
	/* now commit catalog, so it's also up to date on disk */
	sizes[i] = cnt;
	n[i++] = catalog_bid->batCacheid;
	sizes[i] = cnt;
	n[i++] = catalog_id->batCacheid;
	sizes[i] = BATcount(dcatalog);
	n[i++] = dcatalog->batCacheid;

	if (cleanup) {
		if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
					     catalog_bid, catalog_id, dcatalog,
					     cleanup)) < 0) {
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		cnt -= cleanup;
	}
	/* if the catalog bats were replaced, the new ones have to be
	 * committed as well */
	if (dcatalog != lg->dcatalog) {
		i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
		i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
		i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
	}
	if (lg->seqs_id) {
		sizes[i] = BATcount(lg->seqs_id);
		n[i++] = lg->seqs_id->batCacheid;
		sizes[i] = BATcount(lg->seqs_id);
		n[i++] = lg->seqs_val->batCacheid;
	}
	/* compact the sequence bats when more than half of the entries
	 * (and more than 10) are marked deleted; not combined with a
	 * catalog cleanup in the same call */
	if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
		BAT *tids, *ids, *vals;

		tids = bm_tids(lg->seqs_id, lg->dseqs);
		if (tids == NULL) {
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
		vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);

		if (ids == NULL || vals == NULL) {
			logbat_destroy(tids);
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}

		if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
		    BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
			logbat_destroy(tids);
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		logbat_destroy(tids);
		BATclear(lg->dseqs, true);

		if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
		    log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
		i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));

		/* the old sequence bats are released after the commit,
		 * but only if they hold a logical reference */
		if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
			r[rcnt++] = lg->seqs_id->batCacheid;
		if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
			r[rcnt++] = lg->seqs_val->batCacheid;

		logbat_destroy(lg->seqs_id);
		logbat_destroy(lg->seqs_val);

		lg->seqs_id = ids;
		lg->seqs_val = vals;
	}
	if (lg->seqs_id) {
		sizes[i] = BATcount(lg->dseqs);
		n[i++] = lg->dseqs->batCacheid;
	}

	assert((BUN) i <= nn);
	/* the list is complete: the lock is released before the
	 * subcommit itself is carried out */
	log_unlock(lg);
	TRC_DEBUG_IF(WAL)
		t0 = GDKusec();
	res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
	TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
	if (res == GDK_SUCCEED) {	/* now cleanup */
		for (i = 0; i < rcnt; i++) {
			TRC_DEBUG_IF(WAL) {
				TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
				if (BBP_lrefs(r[i]) != 2)
					TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
			}
			BBPrelease(r[i]);
		}
	}
	GDKfree(n);
	GDKfree(r);
	GDKfree(sizes);
	if (res != GDK_SUCCEED)
		TRC_CRITICAL(GDK, "commit failed\n");
	return res;
}
1951 :
1952 : static gdk_return
1953 340 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1954 : {
1955 340 : str filenamestr = NULL;
1956 :
1957 340 : if ((filenamestr = GDKfilepath(0, lg->dir, LOGFILE, NULL)) == NULL)
1958 : return GDK_FAIL;
1959 340 : size_t len = strcpy_len(filename, filenamestr, FILENAME_MAX);
1960 340 : GDKfree(filenamestr);
1961 340 : if (len >= FILENAME_MAX) {
1962 0 : GDKerror("Logger filename path is too large\n");
1963 0 : return GDK_FAIL;
1964 : }
1965 340 : if (bak) {
1966 340 : len = strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL);
1967 340 : if (len >= FILENAME_MAX) {
1968 0 : GDKerror("Logger filename path is too large\n");
1969 0 : return GDK_FAIL;
1970 : }
1971 : }
1972 : return GDK_SUCCEED;
1973 : }
1974 :
1975 : static gdk_return
1976 11356 : log_cleanup(logger *lg, lng id)
1977 : {
1978 11356 : char log_id[FILENAME_MAX];
1979 :
1980 11356 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1981 0 : GDKerror("log_id filename is too large\n");
1982 0 : return GDK_FAIL;
1983 : }
1984 11356 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1985 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
1986 0 : GDKclrerr(); /* clear error from unlink */
1987 : }
1988 : return GDK_SUCCEED;
1989 : }
1990 :
1991 : #ifdef GDKLIBRARY_JSON
1992 : static gdk_return
1993 341 : log_json_upgrade_finalize(void)
1994 : {
1995 341 : int json_tpe = ATOMindex("json");
1996 681 : if (!GDKinmemory(0) &&
1997 340 : GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
1998 0 : TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
1999 0 : return GDK_FAIL;
2000 : }
2001 341 : BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;
2002 :
2003 341 : return GDK_SUCCEED;
2004 : }
2005 : #endif
2006 :
2007 : /* Load data from the logger logdir
2008 : * Initialize new directories and catalog files if none are present,
2009 : * unless running in read-only mode
2010 : * Load data and persist it in the BATs */
2011 : static gdk_return
2012 341 : log_load(const char *fn, const char *logdir, logger *lg, char filename[FILENAME_MAX])
2013 : {
2014 341 : FILE *fp = NULL;
2015 341 : char bak[FILENAME_MAX];
2016 341 : bat catalog_bid, catalog_id, dcatalog;
2017 341 : bool needcommit = false;
2018 341 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2019 341 : bool readlogs = false;
2020 341 : bool needsnew = false; /* need to write new log file? */
2021 :
2022 : /* refactor */
2023 341 : if (!LOG_DISABLED(lg)) {
2024 340 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2025 0 : goto error;
2026 : }
2027 :
2028 341 : lg->catalog_bid = NULL;
2029 341 : lg->catalog_id = NULL;
2030 341 : lg->catalog_cnt = NULL;
2031 341 : lg->catalog_lid = NULL;
2032 341 : lg->dcatalog = NULL;
2033 :
2034 341 : lg->seqs_id = NULL;
2035 341 : lg->seqs_val = NULL;
2036 341 : lg->dseqs = NULL;
2037 :
2038 341 : if (!LOG_DISABLED(lg)) {
2039 : /* try to open logfile backup, or failing that, the file
2040 : * itself. we need to know whether this file exists when
2041 : * checking the database consistency later on */
2042 340 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2043 0 : fclose(fp);
2044 0 : fp = NULL;
2045 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2046 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2047 0 : goto error;
2048 340 : } else if (errno != ENOENT) {
2049 0 : GDKsyserror("open %s failed", bak);
2050 0 : goto error;
2051 : }
2052 340 : fp = MT_fopen(filename, "r");
2053 340 : if (fp == NULL && errno != ENOENT) {
2054 0 : GDKsyserror("open %s failed", filename);
2055 0 : goto error;
2056 : }
2057 : }
2058 :
2059 341 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2060 341 : catalog_bid = BBPindex(bak);
2061 :
2062 : /* initialize arrays for type mapping, to be read from disk */
2063 341 : memset(lg->type_id, -1, sizeof(lg->type_id));
2064 341 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2065 :
2066 : /* this is intentional - if catalog_bid is 0, force it to find
2067 : * the persistent catalog */
2068 341 : if (catalog_bid == 0) {
2069 : /* catalog does not exist, so the log file also
2070 : * shouldn't exist */
2071 222 : if (fp != NULL) {
2072 0 : GDKerror("there is no logger catalog, "
2073 : "but there is a log file.\n");
2074 0 : goto error;
2075 : }
2076 :
2077 222 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2078 222 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2079 222 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2080 :
2081 222 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2082 0 : GDKerror("cannot create catalog bats");
2083 0 : goto error;
2084 : }
2085 222 : TRC_INFO(WAL, "create %s catalog\n", fn);
2086 :
2087 : /* give the catalog bats names so we can find them
2088 : * next time */
2089 222 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2090 222 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2091 0 : goto error;
2092 : }
2093 :
2094 222 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2095 222 : if (BBPrename(lg->catalog_id, bak) < 0) {
2096 0 : goto error;
2097 : }
2098 :
2099 222 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2100 222 : if (BBPrename(lg->dcatalog, bak) < 0) {
2101 0 : goto error;
2102 : }
2103 :
2104 222 : if (!LOG_DISABLED(lg)) {
2105 221 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2106 0 : GDKerror("cannot create directory for log file %s\n", filename);
2107 0 : goto error;
2108 : }
2109 221 : if (log_create_types_file(lg, filename) != GDK_SUCCEED)
2110 0 : goto error;
2111 : }
2112 :
2113 222 : BBPretain(lg->catalog_bid->batCacheid);
2114 222 : BBPretain(lg->catalog_id->batCacheid);
2115 222 : BBPretain(lg->dcatalog->batCacheid);
2116 :
2117 222 : log_lock(lg);
2118 : /* bm_subcommit releases the lock */
2119 222 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2120 : /* cannot commit catalog, so remove log */
2121 0 : if (MT_remove(filename) < 0)
2122 0 : GDKsyserror("remove %s failed\n", filename);
2123 0 : BBPrelease(lg->catalog_bid->batCacheid);
2124 0 : BBPrelease(lg->catalog_id->batCacheid);
2125 0 : BBPrelease(lg->dcatalog->batCacheid);
2126 0 : goto error;
2127 : }
2128 : } else {
2129 : /* find the persistent catalog. As non persistent bats
2130 : * require a logical reference we also add a logical
2131 : * reference for the persistent bats */
2132 119 : BUN p, q;
2133 119 : BAT *b, *o, *d;
2134 :
2135 119 : assert(!lg->inmemory);
2136 :
2137 : /* the catalog exists, and so should the log file */
2138 119 : if (fp == NULL && !LOG_DISABLED(lg)) {
2139 0 : GDKerror("There is a logger catalog, but no log file.\n");
2140 0 : goto error;
2141 : }
2142 : if (fp != NULL) {
2143 : /* check_version always closes fp */
2144 119 : if (check_version(lg, fp, fn, logdir, filename, &needsnew) != GDK_SUCCEED) {
2145 0 : fp = NULL;
2146 0 : goto error;
2147 : }
2148 : readlogs = true;
2149 : fp = NULL;
2150 : }
2151 :
2152 119 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2153 119 : b = BATdescriptor(catalog_bid);
2154 119 : if (b == NULL) {
2155 0 : GDKerror("inconsistent database, catalog does not exist");
2156 0 : goto error;
2157 : }
2158 :
2159 119 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2160 119 : catalog_id = BBPindex(bak);
2161 119 : o = BATdescriptor(catalog_id);
2162 119 : if (o == NULL) {
2163 0 : BBPunfix(b->batCacheid);
2164 0 : GDKerror("inconsistent database, catalog_id does not exist");
2165 0 : goto error;
2166 : }
2167 :
2168 119 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2169 119 : dcatalog = BBPindex(bak);
2170 119 : d = BATdescriptor(dcatalog);
2171 119 : if (d == NULL) {
2172 0 : GDKerror("cannot create dcatalog bat");
2173 0 : BBPunfix(b->batCacheid);
2174 0 : BBPunfix(o->batCacheid);
2175 0 : goto error;
2176 : }
2177 :
2178 119 : lg->catalog_bid = b;
2179 119 : lg->catalog_id = o;
2180 119 : lg->dcatalog = d;
2181 119 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2182 30289 : BATloop(lg->catalog_bid, p, q) {
2183 30170 : bat bid = bids[p];
2184 30170 : oid pos = p;
2185 :
2186 30170 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2187 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2188 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2189 0 : goto error;
2190 : }
2191 : }
2192 119 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2193 119 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2194 119 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2195 0 : goto error;
2196 : }
2197 119 : BBPretain(lg->catalog_bid->batCacheid);
2198 119 : BBPretain(lg->catalog_id->batCacheid);
2199 119 : BBPretain(lg->dcatalog->batCacheid);
2200 : }
2201 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2202 : * fatal */
2203 341 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2204 341 : if (lg->catalog_cnt == NULL) {
2205 0 : GDKerror("failed to create catalog_cnt bat");
2206 0 : goto error;
2207 : }
2208 341 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2209 341 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2210 0 : GDKclrerr();
2211 341 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2212 341 : if (lg->catalog_lid == NULL) {
2213 0 : GDKerror("failed to create catalog_lid bat");
2214 0 : goto error;
2215 : }
2216 341 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2217 341 : if (BBPrename(lg->catalog_lid, bak) < 0)
2218 0 : GDKclrerr();
2219 341 : if (bm_get_counts(lg) != GDK_SUCCEED)
2220 0 : goto error;
2221 :
2222 341 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2223 341 : if (BBPindex(bak)) {
2224 119 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2225 119 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2226 119 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2227 119 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2228 119 : lg->dseqs = BATdescriptor(BBPindex(bak));
2229 119 : if (lg->seqs_id == NULL ||
2230 119 : lg->seqs_val == NULL ||
2231 : lg->dseqs == NULL) {
2232 0 : GDKerror("Logger_new: cannot load seqs bats");
2233 0 : goto error;
2234 : }
2235 119 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2236 119 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2237 119 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2238 0 : goto error;
2239 : }
2240 : } else {
2241 222 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2242 222 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2243 222 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2244 222 : if (lg->seqs_id == NULL ||
2245 222 : lg->seqs_val == NULL ||
2246 : lg->dseqs == NULL) {
2247 0 : GDKerror("Logger_new: cannot create seqs bats");
2248 0 : goto error;
2249 : }
2250 :
2251 222 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2252 222 : if (BBPrename(lg->seqs_id, bak) < 0) {
2253 0 : goto error;
2254 : }
2255 :
2256 222 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2257 222 : if (BBPrename(lg->seqs_val, bak) < 0) {
2258 0 : goto error;
2259 : }
2260 :
2261 222 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2262 222 : if (BBPrename(lg->dseqs, bak) < 0) {
2263 0 : goto error;
2264 : }
2265 : needcommit = true;
2266 : }
2267 341 : dbg = ATOMIC_GET(&GDKdebug);
2268 341 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2269 341 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2270 0 : GDKerror("Logger_new: commit failed");
2271 0 : goto error;
2272 : }
2273 341 : ATOMIC_SET(&GDKdebug, dbg);
2274 :
2275 341 : if (readlogs) {
2276 119 : ulng log_id = lg->saved_id + 1;
2277 119 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2278 0 : goto error;
2279 : }
2280 119 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2281 0 : goto error;
2282 119 : if (needsnew) {
2283 16 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2284 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2285 0 : return GDK_FAIL;
2286 : }
2287 16 : if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
2288 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2289 0 : return GDK_FAIL;
2290 : }
2291 : }
2292 119 : dbg = ATOMIC_GET(&GDKdebug);
2293 119 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2294 119 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2295 0 : goto error;
2296 : }
2297 119 : ATOMIC_SET(&GDKdebug, dbg);
2298 318 : for (; log_id <= lg->saved_id; log_id++)
2299 199 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2300 135 : if (needsnew &&
2301 16 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2302 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2303 0 : return GDK_FAIL;
2304 : }
2305 : } else {
2306 222 : lg->id = lg->saved_id + 1;
2307 : }
2308 : #ifdef GDKLIBRARY_JSON
2309 341 : if (log_json_upgrade_finalize() == GDK_FAIL)
2310 0 : goto error;
2311 : #endif
2312 : return GDK_SUCCEED;
2313 0 : error:
2314 0 : if (fp)
2315 0 : fclose(fp);
2316 0 : logbat_destroy(lg->catalog_bid);
2317 0 : logbat_destroy(lg->catalog_id);
2318 0 : logbat_destroy(lg->dcatalog);
2319 0 : logbat_destroy(lg->seqs_id);
2320 0 : logbat_destroy(lg->seqs_val);
2321 0 : logbat_destroy(lg->dseqs);
2322 0 : MT_lock_destroy(&lg->lock);
2323 0 : MT_lock_destroy(&lg->rotation_lock);
2324 0 : GDKfree(lg->fn);
2325 0 : GDKfree(lg->dir);
2326 0 : GDKfree(lg->rbuf);
2327 0 : GDKfree(lg->wbuf);
2328 0 : GDKfree(lg);
2329 0 : ATOMIC_SET(&GDKdebug, dbg);
2330 : /* We do not call log_json_upgrade_finalize here because we want
2331 : * the upgrade to run again next time we try, so we do not want
2332 : * to remove the signal file just yet.
2333 : */
2334 0 : return GDK_FAIL;
2335 : }
2336 :
2337 : /* Initialize a new logger
2338 : * It will load any data in the logdir and persist it in the BATs*/
2339 : static logger *
2340 341 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2341 : postversionfix_fptr postfuncp, void *funcdata)
2342 : {
2343 341 : logger *lg;
2344 341 : char filename[FILENAME_MAX];
2345 :
2346 341 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2347 341 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2348 341 : lng max_file_size = 0;
2349 :
2350 341 : if (GDKdebug & FORCEMITOMASK) {
2351 : max_file_size = 2048; /* 2 KiB */
2352 : } else {
2353 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2354 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2355 : }
2356 :
2357 341 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2358 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2359 0 : return NULL;
2360 : }
2361 :
2362 341 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2363 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2364 0 : return NULL;
2365 : }
2366 :
2367 341 : lg = GDKmalloc(sizeof(struct logger));
2368 341 : if (lg == NULL) {
2369 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2370 0 : return NULL;
2371 : }
2372 :
2373 682 : *lg = (logger) {
2374 341 : .inmemory = GDKinmemory(0),
2375 : .debug = debug,
2376 : .version = version,
2377 : .prefuncp = prefuncp,
2378 : .postfuncp = postfuncp,
2379 : .funcdata = funcdata,
2380 :
2381 341 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2382 : .file_age = 0,
2383 341 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2384 341 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2385 :
2386 : .id = 0,
2387 341 : .saved_id = getBBPlogno(), /* get saved log numer from bbp */
2388 : .nr_flushers = ATOMIC_VAR_INIT(0),
2389 341 : .fn = GDKstrdup(fn),
2390 341 : .dir = GDKstrdup(filename),
2391 : .rbufsize = 64 * 1024,
2392 341 : .rbuf = GDKmalloc(64 * 1024),
2393 : .wbufsize = 64 * 1024,
2394 341 : .wbuf = GDKmalloc(64 * 1024),
2395 : };
2396 :
2397 : /* probably open file and check version first, then call call old logger code */
2398 341 : if (lg->fn == NULL ||
2399 341 : lg->dir == NULL ||
2400 341 : lg->rbuf == NULL ||
2401 : lg->wbuf == NULL) {
2402 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2403 0 : GDKfree(lg->fn);
2404 0 : GDKfree(lg->dir);
2405 0 : GDKfree(lg->rbuf);
2406 0 : GDKfree(lg->wbuf);
2407 0 : GDKfree(lg);
2408 0 : return NULL;
2409 : }
2410 341 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2411 :
2412 341 : MT_lock_init(&lg->lock, fn);
2413 341 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2414 341 : MT_lock_init(&lg->flush_lock, "flush_lock");
2415 341 : MT_cond_init(&lg->excl_flush_cv);
2416 :
2417 341 : if (log_load(fn, logdir, lg, filename) == GDK_SUCCEED) {
2418 : return lg;
2419 : }
2420 : return NULL;
2421 : }
2422 :
2423 : static logged_range *
2424 66509 : do_flush_range_cleanup(logger *lg)
2425 : {
2426 66509 : logged_range *frange = lg->flush_ranges;
2427 66509 : logged_range *first = frange;
2428 :
2429 77770 : while (frange->next) {
2430 11417 : if (ATOMIC_GET(&frange->refcount) > 1)
2431 : break;
2432 11261 : frange = frange->next;
2433 : }
2434 66509 : if (first == frange) {
2435 : return first;
2436 : }
2437 :
2438 11261 : logged_range *flast = frange;
2439 :
2440 11261 : lg->flush_ranges = flast;
2441 :
2442 22522 : for (frange = first; frange && frange != flast; frange = frange->next) {
2443 11261 : ATOMIC_DEC(&frange->refcount);
2444 11261 : if (!LOG_DISABLED(lg) && frange->output_log) {
2445 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2446 0 : close_stream(frange->output_log);
2447 0 : frange->output_log = NULL;
2448 : }
2449 : }
2450 : return flast;
2451 : }
2452 :
2453 : void
2454 340 : log_destroy(logger *lg)
2455 : {
2456 340 : log_close_input(lg);
2457 340 : logged_range *last = do_flush_range_cleanup(lg);
2458 340 : (void) last;
2459 340 : assert(last == lg->current && last == lg->flush_ranges);
2460 340 : log_close_output(lg);
2461 784 : for (logged_range * p = lg->pending; p; p = lg->pending) {
2462 444 : lg->pending = p->next;
2463 444 : GDKfree(p);
2464 : }
2465 340 : if (LOG_DISABLED(lg)) {
2466 1 : lg->saved_id = lg->id;
2467 1 : lg->saved_tid = lg->tid;
2468 1 : log_commit(lg, NULL, NULL, 0);
2469 : }
2470 340 : if (lg->catalog_bid) {
2471 340 : log_lock(lg);
2472 340 : BUN p, q;
2473 340 : BAT *b = lg->catalog_bid;
2474 :
2475 : /* free resources */
2476 340 : const log_bid *bids = (const log_bid *) Tloc(b, 0);
2477 88421 : BATloop(b, p, q) {
2478 88081 : bat bid = bids[p];
2479 :
2480 88081 : BBPrelease(bid);
2481 : }
2482 :
2483 340 : BBPrelease(lg->catalog_bid->batCacheid);
2484 340 : BBPrelease(lg->catalog_id->batCacheid);
2485 340 : BBPrelease(lg->dcatalog->batCacheid);
2486 340 : logbat_destroy(lg->catalog_bid);
2487 340 : logbat_destroy(lg->catalog_id);
2488 340 : logbat_destroy(lg->dcatalog);
2489 :
2490 340 : logbat_destroy(lg->catalog_cnt);
2491 340 : logbat_destroy(lg->catalog_lid);
2492 340 : log_unlock(lg);
2493 : }
2494 340 : MT_lock_destroy(&lg->lock);
2495 340 : MT_lock_destroy(&lg->rotation_lock);
2496 340 : MT_lock_destroy(&lg->flush_lock);
2497 340 : GDKfree(lg->fn);
2498 340 : GDKfree(lg->dir);
2499 340 : GDKfree(lg->rbuf);
2500 340 : GDKfree(lg->wbuf);
2501 340 : GDKfree(lg);
2502 340 : }
2503 :
2504 : /* Create a new logger */
2505 : logger *
2506 341 : log_create(int debug, const char *fn, const char *logdir, int version,
2507 : preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
2508 : void *funcdata)
2509 : {
2510 341 : logger *lg;
2511 341 : TRC_INFO_IF(WAL) {
2512 0 : TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
2513 0 : GDKtracer_flush_buffer();
2514 : }
2515 341 : lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
2516 341 : if (lg == NULL)
2517 : return NULL;
2518 341 : TRC_INFO_IF(WAL) {
2519 0 : TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
2520 0 : GDKtracer_flush_buffer();
2521 : }
2522 341 : if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2523 0 : log_destroy(lg);
2524 0 : return NULL;
2525 : }
2526 341 : assert(lg->current == NULL);
2527 341 : logged_range dummy = {
2528 341 : .cnt = BATcount(lg->catalog_bid),
2529 : };
2530 341 : lg->current = &dummy;
2531 341 : if (log_open_output(lg) != GDK_SUCCEED) {
2532 0 : log_destroy(lg);
2533 0 : return NULL;
2534 : }
2535 341 : lg->current = lg->current->next;
2536 341 : assert(lg->pending == NULL && lg->flush_ranges == NULL);
2537 341 : lg->pending = lg->current;
2538 341 : lg->flush_ranges = lg->current;
2539 341 : return lg;
2540 : }
2541 :
2542 : static logged_range *
2543 5768 : log_next_logfile(logger *lg, ulng ts)
2544 : {
2545 5768 : int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) ? 1000 : 100;
2546 5768 : if (!lg->pending || !lg->pending->next)
2547 : return NULL;
2548 5768 : rotation_lock(lg);
2549 5768 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2550 5768 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2551 5768 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2552 5332 : rotation_unlock(lg);
2553 5332 : logged_range *p = lg->pending;
2554 5332 : for (int i = 1;
2555 11157 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2556 5827 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2557 16984 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2558 5825 : p = p->next;
2559 5332 : return p;
2560 : }
2561 436 : rotation_unlock(lg);
2562 436 : return NULL;
2563 : }
2564 :
2565 : static void
2566 5332 : log_cleanup_range(logger *lg, ulng id)
2567 : {
2568 5332 : rotation_lock(lg);
2569 16489 : while (lg->pending && lg->pending->id <= id) {
2570 11157 : logged_range *p;
2571 11157 : p = lg->pending;
2572 11157 : if (p)
2573 11157 : lg->pending = p->next;
2574 11157 : GDKfree(p);
2575 : }
2576 5332 : rotation_unlock(lg);
2577 5332 : }
2578 :
2579 : static void
2580 66172 : do_rotate(logger *lg)
2581 : {
2582 66172 : logged_range *cur = lg->current;
2583 66172 : logged_range *next = cur->next;
2584 66172 : if (next) {
2585 11261 : assert(ATOMIC_GET(&next->refcount) == 1);
2586 11261 : lg->current = next;
2587 11261 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2588 11158 : close_stream(cur->output_log);
2589 11158 : cur->output_log = NULL;
2590 : }
2591 : }
2592 66172 : }
2593 :
2594 : gdk_return
2595 5083 : log_activate(logger *lg)
2596 : {
2597 5083 : bool flush_cleanup = false;
2598 5083 : gdk_return res = GDK_SUCCEED;
2599 :
2600 5083 : rotation_lock(lg);
2601 5083 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2602 :
2603 5083 : if (current_file_size == -1) {
2604 0 : rotation_unlock(lg);
2605 0 : return GDK_FAIL;
2606 : }
2607 : /* file size of 2 means only endian indicator present
2608 : * (i.e. effectively empty) */
2609 5083 : if (current_file_size <= 2) {
2610 3069 : rotation_unlock(lg);
2611 3069 : return GDK_SUCCEED;
2612 : }
2613 :
2614 2014 : if (!lg->flushnow &&
2615 2014 : !lg->current->next &&
2616 2014 : current_file_size > 2 &&
2617 2014 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2618 2009 : current_file_size > lg->max_file_size ||
2619 1991 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2620 23 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2621 23 : lg->saved_id + 1 == lg->id &&
2622 22 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2623 22 : lg->id++;
2624 : /* start new file */
2625 22 : res = log_open_output(lg);
2626 22 : flush_cleanup = true;
2627 22 : do_rotate(lg);
2628 : }
2629 22 : if (flush_cleanup)
2630 22 : (void) do_flush_range_cleanup(lg);
2631 2014 : rotation_unlock(lg);
2632 2014 : return res;
2633 : }
2634 :
2635 : gdk_return
2636 5768 : log_flush(logger *lg, ulng ts)
2637 : {
2638 5768 : logged_range *pending = log_next_logfile(lg, ts);
2639 5768 : ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
2640 5768 : if (LOG_DISABLED(lg)) {
2641 0 : lg->saved_id = lid;
2642 0 : lg->saved_tid = lg->tid;
2643 0 : if (lid)
2644 0 : log_cleanup_range(lg, lg->saved_id);
2645 0 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
2646 0 : TRC_ERROR(GDK, "failed to commit");
2647 0 : return GDK_SUCCEED;
2648 : }
2649 5768 : if (lg->saved_id >= lid)
2650 : return GDK_SUCCEED;
2651 5332 : rotation_lock(lg);
2652 5332 : ulng lgid = lg->id;
2653 5332 : rotation_unlock(lg);
2654 5332 : if (lg->saved_id + 1 >= lgid) /* logger should first release the file */
2655 : return GDK_SUCCEED;
2656 5332 : log_return res = LOG_OK;
2657 5332 : ulng cid = olid;
2658 5332 : assert(lid <= lgid);
2659 : uint32_t *updated = NULL;
2660 : BUN nupdated = 0;
2661 : size_t allocated = 0;
2662 16489 : while (cid < lid && res == LOG_OK) {
2663 11157 : if (!lg->input_log) {
2664 11157 : char *filename;
2665 11157 : char id[32];
2666 11157 : if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
2667 0 : GDKfree(updated);
2668 0 : TRC_CRITICAL(GDK, "log_id filename is too large\n");
2669 0 : return GDK_FAIL;
2670 : }
2671 11157 : if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
2672 0 : GDKfree(updated);
2673 0 : return GDK_FAIL;
2674 : }
2675 11157 : if (strlen(filename) >= FILENAME_MAX) {
2676 0 : GDKfree(updated);
2677 0 : TRC_CRITICAL(GDK, "Logger filename path is too large\n");
2678 0 : GDKfree(filename);
2679 0 : return GDK_FAIL;
2680 : }
2681 :
2682 11157 : bool filemissing = false;
2683 11157 : if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
2684 0 : GDKfree(updated);
2685 0 : GDKfree(filename);
2686 0 : return GDK_FAIL;
2687 : }
2688 11157 : GDKfree(filename);
2689 : }
2690 : /* we read the full file because skipping is impossible with current log format */
2691 11157 : log_lock(lg);
2692 11157 : if (updated == NULL) {
2693 5332 : nupdated = BATcount(lg->catalog_id);
2694 5332 : allocated = ((nupdated + 31) & ~31) / 8;
2695 5332 : if (allocated == 0)
2696 0 : allocated = 4;
2697 5332 : updated = GDKzalloc(allocated);
2698 5332 : if (updated == NULL) {
2699 0 : log_unlock(lg);
2700 0 : return GDK_FAIL;
2701 : }
2702 5825 : } else if (nupdated < BATcount(lg->catalog_id)) {
2703 355 : BUN n = BATcount(lg->catalog_id);
2704 355 : size_t a = ((n + 31) & ~31) / 8;
2705 355 : if (a > allocated) {
2706 33 : uint32_t *p = GDKrealloc(updated, a);
2707 33 : if (p == NULL) {
2708 0 : GDKfree(updated);
2709 0 : log_unlock(lg);
2710 0 : return GDK_FAIL;
2711 : }
2712 33 : updated = p;
2713 33 : memset(updated + allocated / 4, 0, a - allocated);
2714 33 : allocated = a;
2715 : }
2716 : nupdated = n;
2717 : }
2718 11157 : lg->flushing = true;
2719 11157 : res = log_read_transaction(lg, updated, nupdated);
2720 11157 : lg->flushing = false;
2721 11157 : log_unlock(lg);
2722 11157 : if (res == LOG_EOF) {
2723 11157 : log_close_input(lg);
2724 11157 : res = LOG_OK;
2725 : }
2726 11157 : cid++;
2727 : }
2728 5332 : if (lid > olid && res == LOG_OK) {
2729 5332 : rotation_lock(lg); /* protect against concurrent log_tflush rotate check */
2730 5332 : lg->saved_id = lid;
2731 5332 : rotation_unlock(lg);
2732 5332 : if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
2733 0 : TRC_ERROR(GDK, "failed to commit");
2734 0 : res = LOG_ERR;
2735 0 : rotation_lock(lg);
2736 0 : lg->saved_id = olid; /* reset !! */
2737 0 : rotation_unlock(lg);
2738 : }
2739 0 : if (res != LOG_ERR) {
2740 16489 : while (olid < lid) {
2741 : /* Try to cleanup, remove old log file, continue on failure! */
2742 11157 : olid++;
2743 11157 : (void) log_cleanup(lg, olid);
2744 : }
2745 : }
2746 5332 : if (res == LOG_OK)
2747 5332 : log_cleanup_range(lg, lg->saved_id);
2748 : }
2749 5332 : GDKfree(updated);
2750 5332 : return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
2751 : }
2752 :
2753 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2754 : * Only the bak- files are deleted for the preserved WAL files.
2755 : */
2756 : lng
2757 13972 : log_changes(logger *lg)
2758 : {
2759 13972 : if (LOG_DISABLED(lg))
2760 : return 0;
2761 13972 : rotation_lock(lg);
2762 13972 : lng changes = lg->id - lg->saved_id - 1;
2763 13972 : rotation_unlock(lg);
2764 13972 : return changes;
2765 : }
2766 :
2767 : int
2768 471 : log_sequence(logger *lg, int seq, lng *id)
2769 : {
2770 471 : log_lock(lg);
2771 471 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2772 :
2773 471 : if (p != BUN_NONE) {
2774 353 : *id = *(lng *) Tloc(lg->seqs_val, p);
2775 :
2776 353 : log_unlock(lg);
2777 353 : return 1;
2778 : }
2779 118 : log_unlock(lg);
2780 118 : return 0;
2781 : }
2782 :
2783 : gdk_return
2784 190295 : log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt)
2785 : {
2786 190295 : bte tpe = find_type(lg, type);
2787 190295 : gdk_return ok = GDK_SUCCEED;
2788 190295 : logformat l;
2789 190295 : lng nr;
2790 190295 : l.flag = LOG_UPDATE_CONST;
2791 190295 : l.id = id;
2792 190295 : nr = cnt;
2793 :
2794 190295 : if (LOG_DISABLED(lg) || !nr) {
2795 : /* logging is switched off */
2796 58692 : if (nr) {
2797 58692 : log_lock(lg);
2798 58692 : ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
2799 58692 : log_unlock(lg);
2800 : }
2801 58692 : return ok;
2802 : }
2803 :
2804 131603 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
2805 :
2806 131603 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2807 263206 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2808 263206 : log_write_format(lg, &l) != GDK_SUCCEED ||
2809 263206 : !mnstr_writeLng(lg->current->output_log, nr) ||
2810 263206 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2811 131603 : !mnstr_writeLng(lg->current->output_log, offset)) {
2812 0 : ATOMIC_DEC(&lg->current->refcount);
2813 0 : ok = GDK_FAIL;
2814 0 : goto bailout;
2815 : }
2816 :
2817 131603 : ok = wt(val, lg->current->output_log, 1);
2818 :
2819 131603 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2820 :
2821 131603 : bailout:
2822 131603 : if (ok != GDK_SUCCEED) {
2823 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2824 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2825 : }
2826 : return ok;
2827 : }
2828 :
2829 : static gdk_return
2830 18841 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2831 : {
2832 18841 : size_t bufsz = lg->wbufsize, resize = 0;
2833 18841 : BUN end = (BUN) (offset + nr);
2834 18841 : char *buf = lg->wbuf;
2835 18841 : gdk_return res = GDK_SUCCEED;
2836 :
2837 18841 : if (!buf)
2838 : return GDK_FAIL;
2839 18841 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2840 18841 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2841 : return GDK_FAIL;
2842 18841 : BATiter bi = bat_iterator(b);
2843 18841 : BUN p = (BUN) offset;
2844 37733 : for (; p < end;) {
2845 18892 : size_t sz = 0;
2846 18892 : if (resize) {
2847 1 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2848 : res = GDK_FAIL;
2849 : break;
2850 : }
2851 1 : lg->wbuf = buf;
2852 1 : lg->wbufsize = bufsz = resize;
2853 1 : resize = 0;
2854 : }
2855 18892 : char *dst = buf;
2856 108000 : for (; p < end && sz < bufsz; p++) {
2857 89159 : const char *s = BUNtvar(bi, p);
2858 89159 : size_t len = strlen(s) + 1;
2859 89159 : if ((sz + len) > bufsz) {
2860 51 : if (len > bufsz)
2861 1 : resize = len + bufsz;
2862 : break;
2863 : } else {
2864 89108 : memcpy(dst, s, len);
2865 89108 : dst += len;
2866 89108 : sz += len;
2867 : }
2868 : }
2869 37783 : if (sz &&
2870 37782 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2871 18891 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2872 : res = GDK_FAIL;
2873 : break;
2874 : }
2875 : }
2876 18841 : bat_iterator_end(&bi);
2877 18841 : return res;
2878 : }
2879 :
2880 : static gdk_return
2881 492122 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2882 : {
2883 492122 : bte tpe = find_type(lg, b->ttype);
2884 492122 : gdk_return ok = GDK_SUCCEED;
2885 492122 : logformat l;
2886 492122 : BUN p;
2887 492122 : lng nr;
2888 492122 : l.flag = LOG_UPDATE_BULK;
2889 492122 : l.id = id;
2890 492122 : nr = cnt;
2891 :
2892 492122 : if (LOG_DISABLED(lg) || !nr) {
2893 : /* logging is switched off */
2894 193425 : if (nr)
2895 141182 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2896 : return GDK_SUCCEED;
2897 : }
2898 :
2899 269223 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2900 :
2901 269223 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2902 269223 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2903 0 : ok = GDK_FAIL;
2904 0 : goto bailout;
2905 : }
2906 :
2907 269223 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2908 532458 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2909 662292 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2910 532458 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2911 402624 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2912 0 : ok = GDK_FAIL;
2913 0 : goto bailout;
2914 : }
2915 269223 : if (!total_cnt)
2916 129834 : total_cnt = cnt;
2917 269223 : lg->total_cnt += cnt;
2918 :
2919 269223 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2920 266229 : lg->total_cnt = 0;
2921 :
2922 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2923 269223 : if (sliced)
2924 1440 : offset = 0;
2925 269223 : if (b->ttype == TYPE_msk) {
2926 0 : BATiter bi = bat_iterator(b);
2927 0 : if (offset % 32 == 0) {
2928 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
2929 0 : (size_t) ((nr + 31) / 32)))
2930 0 : ok = GDK_FAIL;
2931 : } else {
2932 0 : for (lng i = 0; i < nr; i += 32) {
2933 : uint32_t v = 0;
2934 0 : for (int j = 0; j < 32 && i + j < nr; j++)
2935 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
2936 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
2937 : ok = GDK_FAIL;
2938 : break;
2939 : }
2940 : }
2941 : }
2942 0 : bat_iterator_end(&bi);
2943 518977 : } else if (b->ttype < TYPE_str && !isVIEW(b)) {
2944 249754 : BATiter bi = bat_iterator(b);
2945 249754 : const void *t = BUNtail(bi, (BUN) offset);
2946 :
2947 249754 : ok = wt(t, lg->current->output_log, (size_t) nr);
2948 249754 : bat_iterator_end(&bi);
2949 19469 : } else if (b->ttype == TYPE_str) {
2950 : /* efficient string writes */
2951 18821 : ok = string_writer(lg, b, offset, nr);
2952 : } else {
2953 648 : BATiter bi = bat_iterator(b);
2954 648 : BUN end = (BUN) (offset + nr);
2955 1654 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
2956 1006 : const void *t = BUNtail(bi, p);
2957 :
2958 1006 : ok = wt(t, lg->current->output_log, 1);
2959 : }
2960 648 : bat_iterator_end(&bi);
2961 : }
2962 :
2963 269223 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2964 :
2965 269223 : bailout:
2966 269223 : if (ok != GDK_SUCCEED) {
2967 0 : ATOMIC_DEC(&lg->current->refcount);
2968 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2969 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2970 : }
2971 : return ok;
2972 : }
2973 :
2974 : /*
2975 : * Changes made to the BAT descriptor should be stored in the log
2976 : * files. Actually, we need to save the descriptor file, perhaps we
2977 : * should simply introduce a versioning scheme.
2978 : */
2979 : gdk_return
2980 233396 : log_bat_persists(logger *lg, BAT *b, log_id id)
2981 : {
2982 233396 : log_lock(lg);
2983 233396 : bte ta = find_type(lg, b->ttype);
2984 233396 : logformat l;
2985 :
2986 233396 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
2987 0 : log_unlock(lg);
2988 0 : if (!LOG_DISABLED(lg))
2989 0 : ATOMIC_DEC(&lg->current->refcount);
2990 0 : return GDK_FAIL;
2991 : }
2992 :
2993 233396 : l.flag = LOG_CREATE;
2994 233396 : l.id = id;
2995 233396 : if (!LOG_DISABLED(lg)) {
2996 157856 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2997 315712 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2998 315712 : log_write_format(lg, &l) != GDK_SUCCEED ||
2999 157856 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3000 0 : log_unlock(lg);
3001 0 : ATOMIC_DEC(&lg->current->refcount);
3002 0 : return GDK_FAIL;
3003 : }
3004 : }
3005 233396 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3006 233396 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3007 233396 : log_unlock(lg);
3008 233396 : if (r != GDK_SUCCEED)
3009 0 : ATOMIC_DEC(&lg->current->refcount);
3010 : return r;
3011 : }
3012 :
3013 : gdk_return
3014 20873 : log_bat_transient(logger *lg, log_id id)
3015 : {
3016 20873 : log_lock(lg);
3017 20873 : log_bid bid = internal_find_bat(lg, id, -1);
3018 20873 : logformat l;
3019 :
3020 20873 : if (bid < 0) {
3021 0 : log_unlock(lg);
3022 0 : return GDK_FAIL;
3023 : }
3024 20873 : if (!bid) {
3025 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3026 0 : log_unlock(lg);
3027 0 : return GDK_FAIL;
3028 : }
3029 20873 : l.flag = LOG_DESTROY;
3030 20873 : l.id = id;
3031 :
3032 20873 : if (!LOG_DISABLED(lg)) {
3033 12443 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3034 0 : TRC_CRITICAL(GDK, "write failed\n");
3035 0 : log_unlock(lg);
3036 0 : ATOMIC_DEC(&lg->current->refcount);
3037 0 : return GDK_FAIL;
3038 : }
3039 : }
3040 20873 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3041 20873 : BAT *b = BBPquickdesc(bid);
3042 20873 : assert(b);
3043 20873 : BUN cnt = BATcount(b);
3044 20873 : ATOMIC_ADD(&lg->current->drops, cnt);
3045 20873 : gdk_return r = log_del_bat(lg, bid);
3046 20873 : log_unlock(lg);
3047 20873 : if (r != GDK_SUCCEED)
3048 0 : ATOMIC_DEC(&lg->current->refcount);
3049 : return r;
3050 : }
3051 :
3052 : static gdk_return
3053 112430 : log_bat_group(logger *lg, log_id id)
3054 : {
3055 112430 : if (LOG_DISABLED(lg))
3056 : return GDK_SUCCEED;
3057 :
3058 74964 : logformat l;
3059 74964 : l.flag = LOG_BAT_GROUP;
3060 74964 : l.id = id;
3061 74964 : gdk_return r = log_write_format(lg, &l);
3062 74964 : return r;
3063 : }
3064 :
3065 : gdk_return
3066 56215 : log_bat_group_start(logger *lg, log_id id)
3067 : {
3068 : /*positive table id represent start of logged table */
3069 56215 : return log_bat_group(lg, id);
3070 : }
3071 :
3072 : gdk_return
3073 56215 : log_bat_group_end(logger *lg, log_id id)
3074 : {
3075 : /*negative table id represent end of logged table */
3076 56215 : return log_bat_group(lg, -id);
3077 : }
3078 :
3079 : gdk_return
3080 256101 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3081 : {
3082 256101 : log_lock(lg);
3083 256101 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3084 256101 : log_unlock(lg);
3085 256101 : return r;
3086 : }
3087 :
3088 : gdk_return
3089 2717 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3090 : {
3091 2717 : log_lock(lg);
3092 2717 : bte tpe = find_type(lg, uval->ttype);
3093 2717 : gdk_return ok = GDK_SUCCEED;
3094 2717 : logformat l;
3095 2717 : BUN p;
3096 2717 : lng nr;
3097 :
3098 2717 : if (BATtdense(uid)) {
3099 2625 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3100 2625 : log_unlock(lg);
3101 2625 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3102 0 : ATOMIC_DEC(&lg->current->refcount);
3103 2625 : return ok;
3104 : }
3105 :
3106 92 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3107 :
3108 92 : l.flag = LOG_UPDATE;
3109 92 : l.id = id;
3110 92 : nr = (BATcount(uval));
3111 92 : assert(nr);
3112 :
3113 92 : if (LOG_DISABLED(lg)) {
3114 : /* logging is switched off */
3115 49 : log_unlock(lg);
3116 49 : return GDK_SUCCEED;
3117 : }
3118 :
3119 43 : BATiter vi = bat_iterator(uval);
3120 43 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3121 43 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3122 :
3123 43 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3124 86 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3125 86 : log_write_format(lg, &l) != GDK_SUCCEED ||
3126 86 : !mnstr_writeLng(lg->current->output_log, nr) ||
3127 43 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3128 0 : ok = GDK_FAIL;
3129 0 : goto bailout;
3130 : }
3131 10246 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3132 10203 : const oid id = BUNtoid(uid, p);
3133 :
3134 10203 : ok = wh(&id, lg->current->output_log, 1);
3135 : }
3136 43 : if (uval->ttype == TYPE_msk) {
3137 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3138 0 : (BATcount(uval) + 31) / 32))
3139 0 : ok = GDK_FAIL;
3140 66 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3141 23 : const void *t = BUNtail(vi, 0);
3142 :
3143 23 : ok = wt(t, lg->current->output_log, (size_t) nr);
3144 20 : } else if (uval->ttype == TYPE_str) {
3145 : /* efficient string writes */
3146 20 : ok = string_writer(lg, uval, 0, nr);
3147 : } else {
3148 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3149 0 : const void *val = BUNtail(vi, p);
3150 :
3151 0 : ok = wt(val, lg->current->output_log, 1);
3152 : }
3153 : }
3154 :
3155 43 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3156 :
3157 43 : bailout:
3158 43 : bat_iterator_end(&vi);
3159 43 : if (ok != GDK_SUCCEED) {
3160 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3161 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3162 0 : ATOMIC_DEC(&lg->current->refcount);
3163 : }
3164 43 : log_unlock(lg);
3165 43 : return ok;
3166 : }
3167 :
3168 : static inline bool
3169 56498 : check_rotation_conditions(logger *lg)
3170 : {
3171 56498 : if (LOG_DISABLED(lg))
3172 : return false;
3173 :
3174 56495 : if (lg->current->next)
3175 : return false; /* do not rotate if there is already a prepared next current */
3176 56495 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3177 : return true;
3178 56495 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3179 :
3180 56495 : if (current_file_size == -1)
3181 : return false;
3182 :
3183 56495 : assert(current_file_size >= 0);
3184 :
3185 56495 : if (current_file_size == 2)
3186 : return false;
3187 :
3188 1466 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3189 55950 : current_file_size > lg->max_file_size ||
3190 49888 : (GDKusec() - lg->file_age) > lg->max_file_age;
3191 :
3192 54492 : return res;
3193 : }
3194 :
3195 : gdk_return
3196 61324 : log_tend(logger *lg)
3197 : {
3198 61324 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3199 :
3200 61324 : if (LOG_DISABLED(lg))
3201 : return GDK_SUCCEED;
3202 :
3203 56495 : gdk_return result;
3204 56495 : logformat l;
3205 56495 : l.flag = LOG_END;
3206 56495 : l.id = lg->tid;
3207 :
3208 56495 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3209 56495 : ATOMIC_INC(&lg->nr_flushers);
3210 : return result;
3211 : }
3212 :
3213 : #define flush_lock(lg) MT_lock_set(&(lg)->flush_lock)
3214 : #define flush_unlock(lg) MT_lock_unset(&(lg)->flush_lock)
3215 :
3216 : static inline gdk_return
3217 56154 : do_flush(logged_range *range)
3218 : {
3219 : /* assumes flush lock */
3220 56154 : stream *output_log = range->output_log;
3221 56154 : ulng ts = ATOMIC_GET(&range->last_ts);
3222 :
3223 56154 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3224 56154 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3225 0 : return GDK_FAIL;
3226 56154 : ATOMIC_SET(&range->flushed_ts, ts);
3227 56154 : return GDK_SUCCEED;
3228 : }
3229 :
3230 : static inline void
3231 61321 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3232 : {
3233 61321 : (void) lg;
3234 61321 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3235 :
3236 61321 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3237 60980 : ATOMIC_SET(&range->last_ts, commit_ts);
3238 61321 : }
3239 :
3240 : gdk_return
3241 61324 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
3242 : {
3243 61324 : if (lg->flushnow) {
3244 4826 : rotation_lock(lg);
3245 4826 : logged_range *p = lg->current;
3246 4826 : assert(lg->flush_ranges == lg->current);
3247 4826 : assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
3248 4826 : log_tdone(lg, lg->current, commit_ts);
3249 4826 : ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
3250 4826 : lg->id++;
3251 4826 : lg->flushnow = false;
3252 4826 : if (log_open_output(lg) != GDK_SUCCEED)
3253 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3254 4826 : do_rotate(lg);
3255 4826 : (void) do_flush_range_cleanup(lg);
3256 4826 : assert(lg->flush_ranges == lg->current);
3257 4826 : rotation_unlock(lg);
3258 4826 : return log_commit(lg, p, NULL, 0);
3259 : }
3260 :
3261 56498 : if (LOG_DISABLED(lg))
3262 : return GDK_SUCCEED;
3263 :
3264 56495 : rotation_lock(lg);
3265 56495 : logged_range *frange = do_flush_range_cleanup(lg);
3266 :
3267 56547 : while (frange->next && frange->id < file_id) {
3268 : assert(frange->next);
3269 : frange = frange->next;
3270 : }
3271 :
3272 56495 : log_tdone(lg, frange, commit_ts);
3273 :
3274 56495 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
3275 : /* delay needed ? */
3276 :
3277 56154 : flush_lock(lg);
3278 : /* check it one more time */
3279 56154 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
3280 56154 : do_flush(frange);
3281 56154 : flush_unlock(lg);
3282 : }
3283 : /* else somebody else has flushed our log file */
3284 :
3285 56495 : if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
3286 55320 : if (frange != lg->current && frange->output_log) {
3287 103 : close_stream(frange->output_log);
3288 103 : frange->output_log = NULL;
3289 : }
3290 : }
3291 :
3292 56495 : if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
3293 : /* I am the last flusher
3294 : * if present,
3295 : * wake up the exclusive flusher in log_tstart */
3296 : /* rotation_lock is still being held */
3297 55961 : MT_cond_signal(&lg->excl_flush_cv);
3298 : }
3299 56495 : rotation_unlock(lg);
3300 :
3301 56495 : return GDK_SUCCEED;
3302 : }
3303 :
3304 : static gdk_return
3305 6433 : log_tsequence_(logger *lg, int seq, lng val)
3306 : {
3307 6433 : logformat l;
3308 :
3309 6433 : if (LOG_DISABLED(lg))
3310 : return GDK_SUCCEED;
3311 2597 : l.flag = LOG_SEQ;
3312 2597 : l.id = seq;
3313 :
3314 2597 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3315 :
3316 2597 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3317 5194 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3318 5194 : log_write_format(lg, &l) != GDK_SUCCEED ||
3319 2597 : !mnstr_writeLng(lg->current->output_log, val)) {
3320 0 : TRC_CRITICAL(GDK, "write failed\n");
3321 0 : ATOMIC_DEC(&lg->current->refcount);
3322 0 : return GDK_FAIL;
3323 : }
3324 : return GDK_SUCCEED;
3325 : }
3326 :
3327 : /* a transaction in it self */
3328 : gdk_return
3329 6433 : log_tsequence(logger *lg, int seq, lng val)
3330 : {
3331 6433 : BUN p;
3332 :
3333 6433 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3334 :
3335 6433 : log_lock(lg);
3336 6433 : MT_lock_set(&lg->seqs_id->theaplock);
3337 6433 : BUN inserted = lg->seqs_id->batInserted;
3338 6433 : MT_lock_unset(&lg->seqs_id->theaplock);
3339 6433 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
3340 1582 : assert(lg->seqs_val->hseqbase == 0);
3341 1582 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
3342 0 : log_unlock(lg);
3343 0 : return GDK_FAIL;
3344 : }
3345 : } else {
3346 4494 : if (p != BUN_NONE) {
3347 4494 : oid pos = p;
3348 4494 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
3349 0 : log_unlock(lg);
3350 0 : return GDK_FAIL;
3351 : }
3352 : }
3353 9702 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
3354 4851 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
3355 0 : log_unlock(lg);
3356 0 : return GDK_FAIL;
3357 : }
3358 : }
3359 6433 : gdk_return r = log_tsequence_(lg, seq, val);
3360 6433 : log_unlock(lg);
3361 6433 : return r;
3362 : }
3363 :
3364 : static gdk_return
3365 10500 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3366 : {
3367 10500 : log_lock(lg);
3368 10500 : BAT *b = lg->catalog_bid;
3369 10500 : const log_bid *bids;
3370 :
3371 10500 : bids = (log_bid *) Tloc(b, 0);
3372 243822 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3373 233322 : log_bid bid = bids[p];
3374 233322 : BAT *lb = BBP_desc(bid);
3375 :
3376 233322 : assert(bid);
3377 233322 : if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
3378 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3379 0 : log_unlock(lg);
3380 0 : return GDK_FAIL;
3381 : }
3382 :
3383 233322 : assert(lb->batRestricted != BAT_WRITE);
3384 :
3385 233322 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3386 : }
3387 : /* bm_subcommit releases the lock */
3388 10500 : return bm_subcommit(lg, pending, updated, maxupdated);
3389 : }
3390 :
3391 : static gdk_return
3392 233678 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3393 : {
3394 233678 : log_bid bid = internal_find_bat(lg, id, tid);
3395 233678 : lng cnt = 0;
3396 233678 : lng lid = lng_nil;
3397 :
3398 233678 : assert(b->batRestricted != BAT_WRITE);
3399 233678 : assert(b->batRole == PERSISTENT);
3400 233678 : if (bid < 0)
3401 : return GDK_FAIL;
3402 233678 : if (bid) {
3403 155725 : if (bid != b->batCacheid) {
3404 155722 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3405 : return GDK_FAIL;
3406 : } else {
3407 : return GDK_SUCCEED;
3408 : }
3409 : }
3410 233675 : bid = b->batCacheid;
3411 233675 : TRC_DEBUG(WAL, "create %d\n", id);
3412 233675 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3413 467350 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3414 467350 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3415 467350 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3416 233675 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3417 0 : return GDK_FAIL;
3418 233675 : if (lg->current)
3419 233393 : lg->current->cnt++;
3420 233675 : BBPretain(bid);
3421 233675 : return GDK_SUCCEED;
3422 : }
3423 :
3424 : static gdk_return
3425 176685 : log_del_bat(logger *lg, log_bid bid)
3426 : {
3427 176685 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3428 176685 : lng lid = lg->tid;
3429 :
3430 176685 : assert(p != BUN_NONE);
3431 176685 : if (p == BUN_NONE) {
3432 : GDKerror("cannot find BAT\n");
3433 : return GDK_FAIL;
3434 : }
3435 :
3436 176685 : assert(lg->catalog_lid->hseqbase == 0);
3437 176685 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3438 : }
3439 :
3440 : /* returns -1 on failure, 0 when not found, > 0 when found */
3441 : log_bid
3442 30324 : log_find_bat(logger *lg, log_id id)
3443 : {
3444 30324 : log_lock(lg);
3445 30324 : log_bid bid = internal_find_bat(lg, id, -1);
3446 30324 : log_unlock(lg);
3447 30324 : if (!bid) {
3448 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3449 0 : return GDK_FAIL;
3450 : }
3451 : return bid;
3452 : }
3453 :
3454 :
3455 :
3456 : gdk_return
3457 61324 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
3458 : {
3459 61324 : rotation_lock(lg);
3460 61324 : if (flushnow) {
3461 : /* I am now the exclusive flusher */
3462 4826 : if (ATOMIC_GET(&lg->nr_flushers)) {
3463 : /* I am waiting until all existing flushers are done */
3464 0 : MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
3465 : }
3466 4826 : assert(ATOMIC_GET(&lg->nr_flushers) == 0);
3467 :
3468 4826 : if (ATOMIC_GET(&lg->current->last_ts)) {
3469 1809 : lg->id++;
3470 1809 : if (log_open_output(lg) != GDK_SUCCEED)
3471 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3472 : }
3473 4826 : do_rotate(lg);
3474 4826 : (void) do_flush_range_cleanup(lg);
3475 4826 : rotation_unlock(lg);
3476 :
3477 4826 : if (lg->saved_id + 1 < lg->id)
3478 4475 : log_flush(lg, (1ULL << 63));
3479 4826 : lg->flushnow = flushnow;
3480 : } else {
3481 56498 : if (check_rotation_conditions(lg)) {
3482 4604 : lg->id++;
3483 4604 : if (log_open_output(lg) != GDK_SUCCEED)
3484 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3485 : }
3486 56498 : do_rotate(lg);
3487 56498 : rotation_unlock(lg);
3488 : }
3489 :
3490 61324 : if (LOG_DISABLED(lg))
3491 : return GDK_SUCCEED;
3492 :
3493 56495 : ATOMIC_INC(&lg->current->refcount);
3494 56495 : *file_id = lg->current->id;
3495 56495 : logformat l;
3496 56495 : l.flag = LOG_START;
3497 56495 : l.id = ++lg->tid;
3498 :
3499 56495 : TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
3500 56495 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3501 0 : ATOMIC_DEC(&lg->current->refcount);
3502 0 : return GDK_FAIL;
3503 : }
3504 :
3505 : return GDK_SUCCEED;
3506 : }
3507 :
3508 : void
3509 114 : log_printinfo(logger *lg)
3510 : {
3511 114 : printf("logger %s:\n", lg->fn);
3512 114 : rotation_lock(lg);
3513 114 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3514 : lg->id, lg->saved_id);
3515 114 : rotation_unlock(lg);
3516 114 : printf("current transaction id %d, saved transaction id %d\n",
3517 : lg->tid, lg->saved_tid);
3518 114 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3519 114 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3520 114 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3521 303 : for (logged_range *p = lg->pending; p; p = p->next) {
3522 189 : char buf[32];
3523 189 : if (p->output_log == NULL ||
3524 114 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3525 75 : buf[0] = 0;
3526 264 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3527 : }
3528 114 : }
|