Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
21 : static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
22 : static gdk_return log_del_bat(logger *lg, log_bid bid);
23 : /*
24 : * The logger uses a directory to store its log files. One master log
25 : * file stores information about the version of the logger and the
26 : * type mapping it uses. This file is a simple ascii file with the
27 : * following format:
28 : * {6DIGIT-VERSION\n[id,type_name\n]*}
29 : * The transaction log files have a binary format.
30 : */
31 :
32 : #define LOG_START 0
33 : #define LOG_END 1
34 : #define LOG_UPDATE_CONST 2
35 : #define LOG_UPDATE_BULK 3
36 : #define LOG_UPDATE 4
37 : #define LOG_CREATE 5
38 : #define LOG_DESTROY 6
39 : #define LOG_SEQ 7
40 : #define LOG_CLEAR 8 /* DEPRECATED */
41 : #define LOG_BAT_GROUP 9
42 :
43 : #ifdef NATIVE_WIN32
44 : #define getfilepos _ftelli64
45 : #else
46 : #ifdef HAVE_FSEEKO
47 : #define getfilepos ftello
48 : #else
49 : #define getfilepos ftell
50 : #endif
51 : #endif
52 :
53 : #define BATSIZE 0
54 :
55 : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
57 : static const char *log_commands[] = {
58 : "LOG_START",
59 : "LOG_END",
60 : "LOG_UPDATE_CONST",
61 : "LOG_UPDATE_BULK",
62 : "LOG_UPDATE",
63 : "LOG_CREATE",
64 : "LOG_DESTROY",
65 : "LOG_SEQ",
66 : "", /* LOG_CLEAR IS DEPRECATED */
67 : "LOG_BAT_GROUP",
68 : };
69 :
/* One change read from the log and queued in a transaction until the
 * transaction is committed (la_apply) and then released (la_destroy). */
70 : typedef struct logaction {
71 : int type; /* type of change */
72 : lng nr; /* number of values covered by the change */
73 : int tt; /* atom type number of the logged values */
74 : lng id; /* NOTE(review): not assigned anywhere in the visible code */
75 : lng offset; /* first position in the destination bat */
76 : log_id cid; /* id of object */
77 : BAT *b; /* temporary bat with changes */
78 : BAT *uid; /* temporary bat with bun positions to update */
79 : } logaction;
80 :
81 : /* during the recover process a number of transactions could be active */
82 : typedef struct trans {
83 : int tid; /* transaction id */
84 : int sz; /* sz of the changes array */
85 : int nr; /* nr of changes */
86 :
87 : logaction *changes;
88 :
89 : struct trans *tr;
90 : } trans;
91 :
92 : typedef struct logformat_t {
93 : bte flag;
94 : int id;
95 : } logformat;
96 :
97 : typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
98 :
99 : static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
100 : static gdk_return tr_grow(trans *tr);
101 :
102 : #define log_lock(lg) MT_lock_set(&(lg)->lock)
103 : #define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 947712 : find_type(logger *lg, int tpe)
107 : {
108 947712 : assert(tpe >= 0 && tpe < MAXATOMS);
109 947712 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 549616 : find_type_nr(logger *lg, bte tpe)
114 : {
115 549616 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 549616 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
121 : static BUN
122 422017 : log_find(BAT *b, BAT *d, int val)
123 : {
124 422017 : BUN p;
125 :
126 422017 : assert(b->ttype == TYPE_int);
127 422017 : assert(d->ttype == TYPE_oid);
128 422017 : BATiter bi = bat_iterator(b);
129 422017 : if (BAThash(b) == GDK_SUCCEED) {
130 422017 : MT_rwlock_rdlock(&b->thashlock);
131 699159 : HASHloop_int(bi, b->thash, p, &val) {
132 184643 : oid pos = p;
133 184643 : if (BUNfnd(d, &pos) == BUN_NONE) {
134 184643 : MT_rwlock_rdunlock(&b->thashlock);
135 184643 : bat_iterator_end(&bi);
136 184643 : return p;
137 : }
138 : }
139 237374 : MT_rwlock_rdunlock(&b->thashlock);
140 : } else { /* unlikely: BAThash failed */
141 0 : int *t = (int *) bi.base;
142 :
143 0 : for (p = 0; p < bi.count; p++) {
144 0 : if (t[p] == val) {
145 0 : oid pos = p;
146 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
147 0 : bat_iterator_end(&bi);
148 0 : return p;
149 : }
150 : }
151 : }
152 : }
153 237374 : bat_iterator_end(&bi);
154 237374 : return BUN_NONE;
155 : }
156 :
157 : static log_bid
158 656411 : internal_find_bat(logger *lg, log_id id, int tid)
159 : {
160 656411 : BUN p;
161 :
162 656411 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
163 656411 : BATiter cni = bat_iterator(lg->catalog_id);
164 656411 : MT_rwlock_rdlock(&cni.b->thashlock);
165 656411 : if (tid < 0) {
166 400533 : HASHloop_int(cni, cni.b->thash, p, &id) {
167 210603 : oid pos = p;
168 210603 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
169 210603 : MT_rwlock_rdunlock(&cni.b->thashlock);
170 210603 : bat_iterator_end(&cni);
171 210603 : return *(log_bid *) Tloc(lg->catalog_bid, p);
172 : }
173 : }
174 : } else {
175 364569 : BUN cp = BUN_NONE;
176 2720735 : HASHloop_int(cni, cni.b->thash, p, &id) {
177 2115357 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
178 2115357 : if (lid != lng_nil && lid <= tid) {
179 : break;
180 : }
181 : cp = p;
182 : }
183 364569 : if (cp != BUN_NONE) {
184 364313 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 364313 : bat_iterator_end(&cni);
186 364313 : return *(log_bid *) Tloc(lg->catalog_bid, cp);
187 : }
188 : }
189 81495 : MT_rwlock_rdunlock(&cni.b->thashlock);
190 81495 : bat_iterator_end(&cni);
191 81495 : return 0; /* not found */
192 : }
193 : return -1; /* error creating hash */
194 : }
195 :
196 : static inline void
197 13792 : logbat_destroy(BAT *b)
198 : {
199 17030 : BBPreclaim(b);
200 3816 : }
201 :
202 : static BAT *
203 13685 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 13685 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 13685 : if (nb) {
208 13685 : BBP_pid(nb->batCacheid) = 0;
209 13685 : if (role == PERSISTENT) {
210 8615 : BATmode(nb, false);
211 8615 : nb = BATsetaccess(nb, BAT_READ);
212 : }
213 : } else {
214 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
215 : }
216 13685 : return nb;
217 : }
218 :
219 : static bool
220 762754 : log_read_format(logger *lg, logformat *data)
221 : {
222 762754 : assert(!lg->inmemory);
223 762754 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
224 750461 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
225 : return true;
226 : /* could only read part, so complain */
227 0 : TRC_CRITICAL(GDK, "read failed\n");
228 : }
229 : return false;
230 : }
231 :
232 : static gdk_return
233 755473 : log_write_format(logger *lg, logformat *data)
234 : {
235 755473 : assert(data->id || data->flag);
236 755473 : assert(!lg->inmemory);
237 755473 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
238 1510946 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
239 1510946 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
240 755473 : mnstr_writeInt(lg->current->output_log, data->id))
241 : return GDK_SUCCEED;
242 0 : TRC_CRITICAL(GDK, "write failed\n");
243 0 : return GDK_FAIL;
244 : }
245 :
246 : static log_return
247 2771 : log_read_seq(logger *lg, logformat *l)
248 : {
249 2771 : int seq = l->id;
250 2771 : lng val;
251 2771 : BUN p;
252 :
253 2771 : assert(!lg->inmemory);
254 2771 : if (mnstr_readLng(lg->input_log, &val) != 1) {
255 0 : TRC_CRITICAL(GDK, "read failed\n");
256 0 : return LOG_EOF;
257 : }
258 2771 : if (lg->flushing)
259 : return LOG_OK;
260 :
261 65 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
262 64 : p >= lg->seqs_id->batInserted) {
263 40 : assert(lg->seqs_val->hseqbase == 0);
264 40 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
265 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
266 0 : return LOG_ERR;
267 : }
268 : } else {
269 24 : if (p != BUN_NONE) {
270 24 : oid pos = p;
271 24 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
272 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
273 0 : return LOG_ERR;
274 : }
275 : }
276 50 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
277 25 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
278 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
279 0 : return LOG_ERR;
280 : }
281 : }
282 : return LOG_OK;
283 : }
284 :
285 : #if 0
286 : static gdk_return
287 : log_write_id(logger *lg, int id)
288 : {
289 : assert(!lg->inmemory);
290 : assert(id >= 0);
291 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
292 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
293 : mnstr_writeInt(lg->current->output_log, id))
294 : return GDK_SUCCEED;
295 : TRC_CRITICAL(GDK, "write failed\n");
296 : return GDK_FAIL;
297 : }
298 :
299 : static log_return
300 : log_read_id(logger *lg, log_id *id)
301 : {
302 : assert(!lg->inmemory);
303 : if (mnstr_readInt(lg->input_log, id) != 1) {
304 : TRC_CRITICAL(GDK, "read failed\n");
305 : return LOG_EOF;
306 : }
307 : return LOG_OK;
308 : }
309 : #endif
310 :
311 : static log_return
312 19080 : string_reader(logger *lg, BAT *b, lng nr)
313 : {
314 19080 : size_t sz = 0;
315 19080 : lng SZ = 0;
316 19080 : log_return res = LOG_OK;
317 :
318 38243 : while (nr && res == LOG_OK) {
319 19163 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
320 0 : TRC_CRITICAL(GDK, "read failed\n");
321 0 : return LOG_EOF;
322 : }
323 19163 : sz = (size_t) SZ;
324 19163 : char *buf = lg->rbuf;
325 19163 : if (lg->rbufsize < sz) {
326 2 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
327 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
328 0 : return LOG_ERR;
329 : }
330 2 : lg->rbuf = buf;
331 2 : lg->rbufsize = sz;
332 : }
333 :
334 19163 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
335 0 : TRC_CRITICAL(GDK, "read failed\n");
336 0 : return LOG_EOF;
337 : }
338 : /* handle strings */
339 : char *t = buf;
340 : /* chunked */
341 : #define CHUNK_SIZE 1024
342 : char *strings[CHUNK_SIZE];
343 : int cur = 0;
344 :
345 70532 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
346 51369 : strings[cur++] = t;
347 51369 : if (cur == CHUNK_SIZE &&
348 4 : b &&
349 4 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
350 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
351 0 : res = LOG_ERR;
352 : }
353 51369 : if (cur == CHUNK_SIZE)
354 14 : cur = 0;
355 : /* find next */
356 5537468 : while (*t)
357 5486099 : t++;
358 51369 : t++;
359 : }
360 19163 : if (cur &&
361 332 : b &&
362 332 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
363 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
364 0 : res = LOG_ERR;
365 : }
366 : }
367 : return res;
368 : }
369 :
370 :
371 : struct offset {
372 : lng os; /* offset within source BAT in logfile */
373 : lng nr; /* number of values to be copied */
374 : lng od; /* offset within destination BAT in database */
375 : };
376 :
377 : static log_return
378 392395 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
379 : {
380 392395 : log_return res = LOG_OK;
381 392395 : lng nr, pnr;
382 392395 : bte type_id = -1;
383 392395 : int tpe;
384 :
385 392395 : assert(!lg->inmemory);
386 392395 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
387 :
388 784790 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
389 392395 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
390 0 : TRC_CRITICAL(GDK, "read failed\n");
391 0 : return LOG_EOF;
392 : }
393 :
394 392395 : pnr = nr;
395 392395 : tpe = find_type_nr(lg, type_id);
396 392395 : if (tpe >= 0) {
397 392395 : BAT *uid = NULL;
398 392395 : BAT *r = NULL;
399 392395 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
400 392395 : lng offset;
401 :
402 392395 : assert(nr <= (lng) BUN_MAX);
403 392395 : if (!lg->flushing && l->flag == LOG_UPDATE) {
404 0 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
405 0 : if (uid == NULL) {
406 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
407 28138 : return LOG_ERR;
408 : }
409 : }
410 :
411 392395 : if (l->flag == LOG_UPDATE_CONST) {
412 124192 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
413 0 : TRC_CRITICAL(GDK, "read failed\n");
414 0 : return LOG_EOF;
415 : }
416 124192 : if (cands) {
417 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
418 :
419 28138 : if (BATcount(*cands) == 0 || lg->flushing) {
420 : /* when flushing, we only need the offset and count of the last segment of inserts. */
421 28138 : assert((*cands)->ttype == TYPE_void);
422 28138 : BATtseqbase(*cands, (oid) offset);
423 28138 : BATsetcount(*cands, (BUN) nr);
424 0 : } else if (!lg->flushing) {
425 0 : assert(BATcount(*cands) > 0);
426 0 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
427 0 : BAT *newcands = NULL;
428 0 : if (!dense) {
429 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
430 0 : res = LOG_ERR;
431 0 : } else if ((*cands)->ttype == TYPE_void) {
432 0 : if ((newcands = BATmergecand(*cands, dense))) {
433 0 : BBPreclaim(*cands);
434 0 : *cands = newcands;
435 : } else {
436 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
437 0 : res = LOG_ERR;
438 : }
439 : } else {
440 0 : assert((*cands)->ttype == TYPE_oid);
441 0 : assert(BATcount(*cands) > 0);
442 0 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
443 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
444 0 : res = LOG_ERR;
445 : }
446 : }
447 0 : BBPreclaim(dense);
448 : }
449 :
450 : /* We have to read the value to update the read cursor */
451 28138 : size_t tlen = lg->rbufsize;
452 28138 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
453 28138 : if (t == NULL) {
454 0 : TRC_CRITICAL(GDK, "read failed\n");
455 0 : res = LOG_EOF;
456 : }
457 28138 : return res;
458 : }
459 : }
460 :
461 364257 : if (!lg->flushing) {
462 1397 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
463 1397 : if (r == NULL) {
464 0 : if (uid)
465 0 : BBPreclaim(uid);
466 0 : return LOG_ERR;
467 : }
468 : }
469 :
470 364257 : if (l->flag == LOG_UPDATE_CONST) {
471 96054 : size_t tlen = lg->rbufsize;
472 96054 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
473 96054 : if (t == NULL) {
474 0 : TRC_CRITICAL(GDK, "read failed\n");
475 0 : res = LOG_EOF;
476 : } else {
477 96054 : lg->rbuf = t;
478 96054 : lg->rbufsize = tlen;
479 96054 : if (r) {
480 8183 : for (BUN p = 0; p < (BUN) nr; p++) {
481 7912 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
482 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
483 0 : res = LOG_ERR;
484 : }
485 : }
486 : }
487 : }
488 268203 : } else if (l->flag == LOG_UPDATE_BULK) {
489 268185 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
490 0 : if (r)
491 0 : BBPreclaim(r);
492 0 : TRC_CRITICAL(GDK, "read failed\n");
493 0 : return LOG_EOF;
494 : }
495 268185 : if (tpe == TYPE_msk) {
496 0 : if (r) {
497 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
498 0 : BATsetcount(r, (BUN) nr);
499 : else {
500 0 : TRC_CRITICAL(GDK, "read failed\n");
501 0 : res = LOG_EOF;
502 : }
503 : } else {
504 0 : size_t tlen = lg->rbufsize / sizeof(int);
505 0 : size_t cnt = 0, snr = (size_t) nr;
506 0 : snr = (snr + 31) / 32;
507 0 : assert(tlen);
508 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
509 0 : cnt = snr > tlen ? tlen : snr;
510 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
511 0 : TRC_CRITICAL(GDK, "read failed\n");
512 0 : res = LOG_EOF;
513 : }
514 : }
515 : }
516 : } else {
517 268185 : if (!ATOMvarsized(tpe)) {
518 248523 : size_t cnt = 0, snr = (size_t) nr;
519 248523 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
520 248523 : assert(tlen);
521 : /* read in chunks of max
522 : * BUFSIZE/width rows */
523 497049 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
524 248526 : cnt = snr > tlen ? tlen : snr;
525 248526 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
526 :
527 248526 : if (t == NULL) {
528 : res = LOG_EOF;
529 : break;
530 : }
531 248526 : assert(t == lg->rbuf);
532 248526 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
533 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
534 0 : res = LOG_ERR;
535 : }
536 : }
537 19662 : } else if (tpe == TYPE_str) {
538 : /* efficient string */
539 19074 : res = string_reader(lg, r, nr);
540 : } else {
541 1218 : for (; res == LOG_OK && nr > 0; nr--) {
542 630 : size_t tlen = lg->rbufsize;
543 630 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
544 :
545 630 : if (t == NULL) {
546 : /* see if failure was due to
547 : * malloc or something less
548 : * serious (in the current
549 : * context) */
550 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
551 : res = LOG_EOF;
552 : else
553 0 : res = LOG_ERR;
554 0 : TRC_CRITICAL(GDK, "read failed\n");
555 : } else {
556 630 : lg->rbuf = t;
557 630 : lg->rbufsize = tlen;
558 630 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
559 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
560 0 : res = LOG_ERR;
561 : }
562 : }
563 : }
564 : }
565 : }
566 : } else {
567 18 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
568 18 : void *hv = ATOMnil(TYPE_oid);
569 18 : offset = 0;
570 :
571 18 : if (hv == NULL) {
572 0 : TRC_CRITICAL(GDK, "read failed\n");
573 0 : res = LOG_EOF;
574 : }
575 1071 : for (; res == LOG_OK && nr > 0; nr--) {
576 1053 : size_t hlen = sizeof(oid);
577 1053 : void *h = rh(hv, &hlen, lg->input_log, 1);
578 1053 : assert(hlen == sizeof(oid));
579 1053 : assert(h == hv);
580 1053 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
581 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
582 0 : res = LOG_ERR;
583 : }
584 : }
585 18 : nr = pnr;
586 18 : if (tpe == TYPE_msk) {
587 0 : if (r) {
588 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
589 0 : BATsetcount(r, (BUN) nr);
590 : else {
591 0 : TRC_CRITICAL(GDK, "read failed\n");
592 0 : res = LOG_EOF;
593 : }
594 : } else {
595 0 : for (lng i = 0; i < nr; i += 32) {
596 0 : int v;
597 0 : switch (mnstr_readInt(lg->input_log, &v)) {
598 0 : case 1:
599 0 : continue;
600 : case 0:
601 : res = LOG_EOF;
602 : break;
603 : default:
604 : res = LOG_ERR;
605 : break;
606 : }
607 0 : TRC_CRITICAL(GDK, "read failed\n");
608 0 : break;
609 : }
610 : }
611 18 : } else if (tpe == TYPE_str) {
612 : /* efficient string */
613 6 : res = string_reader(lg, r, nr);
614 : } else {
615 58 : for (; res == LOG_OK && nr > 0; nr--) {
616 46 : size_t tlen = lg->rbufsize;
617 46 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
618 :
619 46 : if (t == NULL) {
620 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
621 : res = LOG_EOF;
622 : else
623 0 : res = LOG_ERR;
624 0 : TRC_CRITICAL(GDK, "read failed\n");
625 : } else {
626 46 : lg->rbuf = t;
627 46 : lg->rbufsize = tlen;
628 46 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
629 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
630 0 : res = LOG_ERR;
631 : }
632 : }
633 : }
634 : }
635 18 : GDKfree(hv);
636 : }
637 :
638 364257 : if (res == LOG_OK) {
639 364257 : if (tr_grow(tr) == GDK_SUCCEED) {
640 364257 : tr->changes[tr->nr].type = l->flag;
641 364257 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
642 138757 : assert(cands); /* bat r is part of a group of bats logged together. */
643 138757 : struct canditer ci;
644 138757 : canditer_init(&ci, NULL, *cands);
645 138757 : const oid first = canditer_peek(&ci);
646 138757 : const oid last = canditer_last(&ci);
647 138757 : offset = (lng) first;
648 138757 : pnr = (lng) (last - first) + 1;
649 138757 : if (!lg->flushing) {
650 1022 : assert(uid == NULL);
651 1022 : uid = *cands;
652 1022 : BBPfix((*cands)->batCacheid);
653 1022 : tr->changes[tr->nr].type = LOG_UPDATE;
654 : }
655 : }
656 364257 : if (l->flag == LOG_UPDATE_CONST) {
657 96054 : assert(!cands); /* TODO: This might change in the future. */
658 96054 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
659 : }
660 364257 : tr->changes[tr->nr].nr = pnr;
661 364257 : tr->changes[tr->nr].tt = tpe;
662 364257 : tr->changes[tr->nr].cid = id;
663 364257 : tr->changes[tr->nr].offset = offset;
664 364257 : tr->changes[tr->nr].b = r;
665 364257 : tr->changes[tr->nr].uid = uid;
666 364257 : tr->nr++;
667 : } else {
668 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
669 0 : res = LOG_ERR;
670 : }
671 : }
672 364257 : if (res != LOG_OK) {
673 0 : if (r)
674 0 : BBPreclaim(r);
675 0 : if (cands && uid)
676 0 : BBPunfix((*cands)->batCacheid);
677 0 : else if (uid)
678 0 : BBPreclaim(uid);
679 : }
680 : } else {
681 : /* bat missing ERROR or ignore ? currently error. */
682 0 : TRC_CRITICAL(GDK, "unknown type\n");
683 0 : res = LOG_ERR;
684 : }
685 : return res;
686 : }
687 :
688 :
689 : static gdk_return
690 593699 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
691 : {
692 593699 : BATiter cni = bat_iterator_nolock(lg->catalog_id);
693 :
694 593699 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
695 593699 : MT_rwlock_rdlock(&cni.b->thashlock);
696 593699 : BUN p, cp = BUN_NONE;
697 :
698 3357279 : HASHloop_int(cni, cni.b->thash, p, &id) {
699 2346016 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
700 :
701 2346016 : if (lid != lng_nil && lid <= tid)
702 : break;
703 : cp = p;
704 : }
705 593699 : if (cp != BUN_NONE) {
706 593497 : lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
707 593497 : assert(lg->catalog_cnt->hseqbase == 0);
708 593497 : if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
709 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
710 0 : return GDK_FAIL;
711 : }
712 : }
713 593699 : MT_rwlock_rdunlock(&cni.b->thashlock);
714 593699 : return GDK_SUCCEED;
715 : }
716 : return GDK_FAIL;
717 : }
718 :
719 : static gdk_return
720 364257 : la_bat_updates(logger *lg, logaction *la, int tid)
721 : {
722 364257 : log_bid bid = internal_find_bat(lg, la->cid, tid);
723 364257 : BAT *b = NULL;
724 :
725 364257 : if (bid < 0)
726 : return GDK_FAIL;
727 364257 : if (!bid) {
728 : /* object already gone, nothing needed */
729 : return GDK_SUCCEED;
730 : }
731 :
732 364257 : if (!lg->flushing) {
733 1397 : b = BATdescriptor(bid);
734 1397 : if (b == NULL)
735 : return GDK_FAIL;
736 : }
737 364257 : BUN cnt = 0;
738 364257 : if (la->type == LOG_UPDATE_BULK) {
739 363217 : if (!lg->flushing) {
740 375 : cnt = BATcount(b);
741 375 : int is_msk = (b->ttype == TYPE_msk);
742 : /* handle offset 0 ie clear */
743 375 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
744 : BATclear(b, true);
745 : /* handle offset */
746 375 : if (cnt <= (BUN) la->offset) {
747 234 : msk t = 1;
748 234 : if (cnt < (BUN) la->offset) { /* insert nils */
749 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
750 0 : lng i, d = la->offset - BATcount(b);
751 0 : for (i = 0; i < d; i++) {
752 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
753 0 : logbat_destroy(b);
754 0 : return GDK_FAIL;
755 : }
756 : }
757 : }
758 234 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
759 0 : logbat_destroy(b);
760 0 : return GDK_FAIL;
761 : }
762 : } else {
763 141 : BATiter vi = bat_iterator(la->b);
764 141 : BUN p, q;
765 :
766 2201 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
767 2060 : const void *t = BUNtail(vi, p);
768 :
769 2060 : if (q < cnt) {
770 2059 : if (b->tnosorted == q)
771 7 : b->tnosorted = 0;
772 2059 : if (b->tnorevsorted == q)
773 7 : b->tnorevsorted = 0;
774 2059 : if (b->tnokey[0] == q ||
775 2052 : b->tnokey[1] == q) {
776 7 : b->tnokey[0] = 0;
777 7 : b->tnokey[1] = 0;
778 : }
779 2059 : b->tkey = false;
780 2059 : b->tsorted = false;
781 2059 : b->tkey = false;
782 2059 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
783 0 : logbat_destroy(b);
784 0 : bat_iterator_end(&vi);
785 0 : return GDK_FAIL;
786 : }
787 : } else {
788 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
789 0 : logbat_destroy(b);
790 0 : bat_iterator_end(&vi);
791 0 : return GDK_FAIL;
792 : }
793 : }
794 : }
795 141 : bat_iterator_end(&vi);
796 : }
797 : }
798 1040 : } else if (la->type == LOG_UPDATE) {
799 1040 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
800 : return GDK_FAIL;
801 : }
802 : }
803 364257 : cnt = (BUN) (la->offset + la->nr);
804 364257 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
805 0 : if (b)
806 0 : logbat_destroy(b);
807 0 : return GDK_FAIL;
808 : }
809 364257 : if (b)
810 1397 : logbat_destroy(b);
811 : return GDK_SUCCEED;
812 : }
813 :
814 : static log_return
815 9928 : log_read_destroy(logger *lg, trans *tr, log_id id)
816 : {
817 9928 : (void) lg;
818 9928 : assert(!lg->inmemory);
819 9928 : if (tr_grow(tr) == GDK_SUCCEED) {
820 9928 : tr->changes[tr->nr].type = LOG_DESTROY;
821 9928 : tr->changes[tr->nr].cid = id;
822 9928 : tr->nr++;
823 9928 : return LOG_OK;
824 : }
825 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
826 0 : return LOG_ERR;
827 : }
828 :
829 : static gdk_return
830 48 : la_bat_destroy(logger *lg, logaction *la, int tid)
831 : {
832 48 : log_bid bid = internal_find_bat(lg, la->cid, tid);
833 :
834 48 : if (bid < 0)
835 : return GDK_FAIL;
836 48 : if (!bid) {
837 : #ifndef NDEBUG
838 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
839 : #endif
840 0 : return GDK_SUCCEED;
841 : }
842 48 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
843 : return GDK_FAIL;
844 : return GDK_SUCCEED;
845 : }
846 :
847 : static log_return
848 157221 : log_read_create(logger *lg, trans *tr, log_id id)
849 : {
850 157221 : bte tt;
851 157221 : int tpe;
852 :
853 157221 : assert(!lg->inmemory);
854 157221 : TRC_DEBUG(WAL, "create %d", id);
855 :
856 157221 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
857 0 : TRC_CRITICAL(GDK, "read failed\n");
858 0 : return LOG_EOF;
859 : }
860 :
861 157221 : tpe = find_type_nr(lg, tt);
862 : /* read create */
863 157221 : if (tr_grow(tr) == GDK_SUCCEED) {
864 157221 : tr->changes[tr->nr].type = LOG_CREATE;
865 157221 : tr->changes[tr->nr].tt = tpe;
866 157221 : tr->changes[tr->nr].cid = id;
867 157221 : tr->nr++;
868 157221 : return LOG_OK;
869 : }
870 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
871 0 : return LOG_ERR;
872 : }
873 :
874 : static gdk_return
875 264 : la_bat_create(logger *lg, logaction *la, int tid)
876 : {
877 264 : BAT *b;
878 :
879 : /* formerly head column type, should be void */
880 264 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
881 : return GDK_FAIL;
882 :
883 264 : if (la->tt < 0)
884 0 : BATtseqbase(b, 0);
885 :
886 528 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
887 264 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
888 0 : logbat_destroy(b);
889 0 : return GDK_FAIL;
890 : }
891 264 : logbat_destroy(b);
892 264 : return GDK_SUCCEED;
893 : }
894 :
895 : static gdk_return
896 242 : log_write_new_types(logger *lg, FILE *fp)
897 : {
898 242 : bte id = 0;
899 :
900 : /* write types and insert into bats */
901 242 : memset(lg->type_id, -1, sizeof(lg->type_id));
902 242 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
903 : /* first the fixed sized types */
904 6774 : for (int i = 0; i < GDKatomcnt; i++) {
905 6532 : if (ATOMvarsized(i))
906 1451 : continue;
907 5081 : lg->type_id[i] = id;
908 5081 : lg->type_nr[id] = i;
909 5081 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
910 : return GDK_FAIL;
911 5081 : id++;
912 : }
913 : /* second the var sized types */
914 : id = -127; /* start after nil */
915 6774 : for (int i = 0; i < GDKatomcnt; i++) {
916 6532 : if (!ATOMvarsized(i))
917 5081 : continue;
918 1451 : lg->type_id[i] = id;
919 1451 : lg->type_nr[256 + id] = i;
920 1451 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
921 : return GDK_FAIL;
922 1451 : id++;
923 : }
924 : return GDK_SUCCEED;
925 : }
926 :
927 : #define TR_SIZE 1024
928 :
929 : static trans *
930 56487 : tr_create(trans *tr, int tid)
931 : {
932 56487 : trans *ntr = GDKmalloc(sizeof(trans));
933 :
934 56487 : if (ntr == NULL)
935 : return NULL;
936 56487 : ntr->tid = tid;
937 56487 : ntr->sz = TR_SIZE;
938 56487 : ntr->nr = 0;
939 56487 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
940 56487 : if (ntr->changes == NULL) {
941 0 : GDKfree(ntr);
942 0 : return NULL;
943 : }
944 56487 : ntr->tr = tr;
945 56487 : return ntr;
946 : }
947 :
948 : static gdk_return
949 531406 : la_apply(logger *lg, logaction *c, int tid)
950 : {
951 531406 : gdk_return ret = GDK_SUCCEED;
952 :
953 531406 : switch (c->type) {
954 364257 : case LOG_UPDATE_BULK:
955 : case LOG_UPDATE:
956 364257 : ret = la_bat_updates(lg, c, tid);
957 364257 : break;
958 157221 : case LOG_CREATE:
959 157221 : if (!lg->flushing)
960 264 : ret = la_bat_create(lg, c, tid);
961 : break;
962 9928 : case LOG_DESTROY:
963 9928 : if (!lg->flushing)
964 48 : ret = la_bat_destroy(lg, c, tid);
965 : break;
966 : default:
967 0 : MT_UNREACHABLE();
968 : }
969 531406 : return ret;
970 : }
971 :
972 : static void
973 531406 : la_destroy(logaction *c)
974 : {
975 531406 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
976 1397 : logbat_destroy(c->b);
977 531406 : if (c->type == LOG_UPDATE && c->uid)
978 1022 : logbat_destroy(c->uid);
979 531406 : }
980 :
981 : static gdk_return
982 531406 : tr_grow(trans *tr)
983 : {
984 531406 : if (tr->nr == tr->sz) {
985 8 : logaction *changes;
986 8 : tr->sz <<= 1;
987 8 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
988 8 : if (changes == NULL)
989 : return GDK_FAIL;
990 8 : tr->changes = changes;
991 : }
992 : /* cleanup the next */
993 531406 : tr->changes[tr->nr].b = NULL;
994 531406 : return GDK_SUCCEED;
995 : }
996 :
997 : static trans *
998 56487 : tr_destroy(trans *tr)
999 : {
1000 56487 : trans *r = tr->tr;
1001 :
1002 56487 : GDKfree(tr->changes);
1003 56487 : GDKfree(tr);
1004 56487 : return r;
1005 : }
1006 :
1007 : static trans *
1008 0 : tr_abort_(logger *lg, trans *tr, int s)
1009 : {
1010 0 : int i;
1011 :
1012 0 : (void) lg;
1013 :
1014 0 : TRC_DEBUG(WAL, "abort");
1015 :
1016 0 : for (i = s; i < tr->nr; i++)
1017 0 : la_destroy(&tr->changes[i]);
1018 0 : return tr_destroy(tr);
1019 : }
1020 :
1021 : static trans *
1022 0 : tr_abort(logger *lg, trans *tr)
1023 : {
1024 0 : return tr_abort_(lg, tr, 0);
1025 : }
1026 :
1027 : static trans *
1028 56487 : tr_commit(logger *lg, trans *tr)
1029 : {
1030 56487 : int i;
1031 :
1032 56487 : TRC_DEBUG(WAL, "commit");
1033 :
1034 587893 : for (i = 0; i < tr->nr; i++) {
1035 531406 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1036 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1037 0 : do {
1038 0 : tr = tr_abort_(lg, tr, i);
1039 0 : } while (tr != NULL);
1040 : return (trans *) -1;
1041 : }
1042 531406 : la_destroy(&tr->changes[i]);
1043 : }
1044 56487 : lg->saved_tid = tr->tid;
1045 56487 : return tr_destroy(tr);
1046 : }
1047 :
1048 : static gdk_return
1049 125 : log_read_types_file(logger *lg, FILE *fp, int version, bool *needsnew)
1050 : {
1051 125 : int id = 0;
1052 125 : char atom_name[IDLENGTH];
1053 125 : bool seen_geom = false;
1054 :
1055 : /* scanf should use IDLENGTH somehow */
1056 3528 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1057 3403 : if (version < 52303 && strcmp(atom_name, "BAT") == 0) {
1058 8 : *needsnew = true;
1059 8 : continue;
1060 : }
1061 3395 : if (version < 52304 && strcmp(atom_name, "color") == 0) {
1062 16 : *needsnew = true;
1063 16 : continue;
1064 : }
1065 456 : if (version < 52304 && strcmp(atom_name, "identifier") == 0) {
1066 16 : *needsnew = true;
1067 16 : continue;
1068 : }
1069 3363 : if (version < 52304 && strcmp(atom_name, "wkba") == 0) {
1070 16 : *needsnew = true;
1071 16 : continue;
1072 : }
1073 3347 : int i = ATOMindex(atom_name);
1074 :
1075 3347 : if (id < -127 || id > 127 || i < 0) {
1076 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1077 0 : return GDK_FAIL;
1078 : }
1079 3347 : seen_geom |= strcmp(atom_name, "mbr") == 0 || strcmp(atom_name, "wkb") == 0;
1080 3347 : lg->type_id[i] = (int8_t) id;
1081 3347 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1082 : }
1083 : #ifdef HAVE_GEOM
1084 125 : if (!seen_geom && ATOMindex("mbr") > 0) {
1085 0 : GDKerror("incompatible database: server supports GEOM, but database does not\n");
1086 0 : return GDK_FAIL;
1087 : }
1088 : #endif
1089 : (void) seen_geom;
1090 : return GDK_SUCCEED;
1091 : }
1092 :
1093 :
1094 : static gdk_return
1095 242 : log_create_types_file(logger *lg, const char *filename)
1096 : {
1097 242 : FILE *fp;
1098 :
1099 242 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1100 0 : GDKerror("cannot create log file %s\n", filename);
1101 0 : return GDK_FAIL;
1102 : }
1103 242 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1104 0 : fclose(fp);
1105 0 : GDKerror("writing log file %s failed", filename);
1106 0 : if (MT_remove(filename) < 0)
1107 0 : GDKsyserror("remove %s failed\n", filename);
1108 0 : return GDK_FAIL;
1109 : }
1110 :
1111 242 : if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
1112 0 : fclose(fp);
1113 0 : GDKerror("writing log file %s failed", filename);
1114 0 : if (MT_remove(filename) < 0)
1115 0 : GDKsyserror("remove %s failed\n", filename);
1116 0 : return GDK_FAIL;
1117 : }
1118 242 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1119 : #if defined(_MSC_VER)
1120 : && _commit(_fileno(fp)) < 0
1121 : #elif defined(HAVE_FDATASYNC)
1122 0 : && fdatasync(fileno(fp)) < 0
1123 : #elif defined(HAVE_FSYNC)
1124 : && fsync(fileno(fp)) < 0
1125 : #endif
1126 : )) {
1127 0 : GDKsyserror("flushing log file %s failed", filename);
1128 0 : fclose(fp);
1129 0 : if (MT_remove(filename) < 0)
1130 0 : GDKsyserror("remove %s failed\n", filename);
1131 0 : return GDK_FAIL;
1132 : }
1133 242 : if (fclose(fp) < 0) {
1134 0 : GDKsyserror("closing log file %s failed", filename);
1135 0 : if (MT_remove(filename) < 0)
1136 0 : GDKsyserror("remove %s failed\n", filename);
1137 0 : return GDK_FAIL;
1138 : }
1139 : return GDK_SUCCEED;
1140 : }
1141 :
/* Shorthands for taking, releasing and (via MT_lock_trytime, with the
 * extra argument ms) trying to take the logger's rotation_lock. */
1142 : #define rotation_lock(lg) MT_lock_set(&(lg)->rotation_lock)
1143 : #define rotation_unlock(lg) MT_lock_unset(&(lg)->rotation_lock)
1144 : #define rotation_trylock(lg, ms) MT_lock_trytime(&(lg)->rotation_lock, ms)
1145 :
1146 : static gdk_return
1147 12547 : log_open_output(logger *lg)
1148 : {
1149 12547 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1150 :
1151 12547 : if (!new_range) {
1152 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1153 0 : return GDK_FAIL;
1154 : }
1155 25093 : if (!LOG_DISABLED(lg)) {
1156 12546 : char id[32];
1157 12546 : char filename[MAXPATH];
1158 :
1159 12546 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1160 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1161 0 : GDKfree(new_range);
1162 0 : return GDK_FAIL;
1163 : }
1164 12546 : if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
1165 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1166 0 : GDKfree(new_range);
1167 0 : return GDK_FAIL;
1168 : }
1169 :
1170 12546 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1171 12546 : new_range->output_log = open_wstream(filename);
1172 12546 : if (new_range->output_log) {
1173 12546 : short byteorder = 1234;
1174 12546 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1175 : }
1176 :
1177 12546 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1178 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1179 0 : close_stream(new_range->output_log);
1180 0 : GDKfree(new_range);
1181 0 : return GDK_FAIL;
1182 : }
1183 : } else {
1184 1 : new_range->output_log = NULL;
1185 : }
1186 12547 : ATOMIC_INIT(&new_range->refcount, 1);
1187 12547 : ATOMIC_INIT(&new_range->last_ts, 0);
1188 12547 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1189 12547 : ATOMIC_INIT(&new_range->drops, 0);
1190 12547 : new_range->id = lg->id;
1191 12547 : new_range->next = NULL;
1192 12547 : logged_range *current = lg->current;
1193 12547 : assert(current && current->next == NULL);
1194 12547 : new_range->cnt = current->cnt;
1195 12547 : current->next = new_range;
1196 12547 : lg->file_age = GDKusec();
1197 12547 : return GDK_SUCCEED;
1198 : }
1199 :
1200 : static inline void
1201 12774 : log_close_input(logger *lg)
1202 : {
1203 12774 : if (!lg->inmemory && lg->input_log) {
1204 12298 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1205 12298 : close_stream(lg->input_log);
1206 : }
1207 12774 : lg->input_log = NULL;
1208 12774 : }
1209 :
1210 : static inline void
1211 351 : log_close_output(logger *lg)
1212 : {
1213 351 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1214 350 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1215 350 : close_stream(lg->current->output_log);
1216 : }
1217 351 : lg->current->output_log = NULL;
1218 351 : }
1219 :
1220 : static gdk_return
1221 12423 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1222 : {
1223 12423 : TRC_INFO(WAL, "opening input log %s", filename);
1224 12423 : lg->input_log = open_rstream(filename);
1225 :
1226 : /* if the file doesn't exist, there is nothing to be read back */
1227 12423 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1228 125 : log_close_input(lg);
1229 125 : *filemissing = true;
1230 125 : return GDK_SUCCEED;
1231 : }
1232 12298 : short byteorder;
1233 12298 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1234 0 : case -1:
1235 0 : log_close_input(lg);
1236 0 : TRC_CRITICAL(GDK, "read failed\n");
1237 0 : return GDK_FAIL;
1238 5 : case 0:
1239 : /* empty file is ok */
1240 5 : log_close_input(lg);
1241 5 : return GDK_SUCCEED;
1242 12293 : case 1:
1243 : /* if not empty, must start with correct byte order mark */
1244 12293 : if (byteorder != 1234) {
1245 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1246 0 : log_close_input(lg);
1247 0 : return GDK_FAIL;
1248 : }
1249 : break;
1250 : }
1251 : return GDK_SUCCEED;
1252 : }
1253 :
1254 : static log_return
1255 12293 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
1256 : {
1257 12293 : logformat l;
1258 12293 : trans *tr = NULL;
1259 12293 : log_return err = LOG_OK;
1260 12293 : bool ok = true;
1261 12293 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1262 :
1263 12293 : (void) maxupdated; /* only used inside assert() */
1264 :
1265 12293 : if (!lg->flushing)
1266 214 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1267 :
1268 12293 : BAT *cands = NULL; /* used in case of LOG_BAT_GROUP */
1269 :
1270 762754 : while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
1271 750461 : if (l.flag == 0 && l.id == 0) {
1272 12293 : err = LOG_EOF;
1273 : break;
1274 : }
1275 :
1276 750461 : TRC_DEBUG_IF(WAL) {
1277 0 : if (l.flag > 0 && l.flag != LOG_CLEAR &&
1278 : l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
1279 0 : TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
1280 : else
1281 0 : TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
1282 : }
1283 750461 : switch (l.flag) {
1284 559544 : case LOG_UPDATE_CONST:
1285 : case LOG_UPDATE_BULK:
1286 : case LOG_UPDATE:
1287 : case LOG_CREATE:
1288 : case LOG_DESTROY:
1289 559544 : if (tr != NULL && updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
1290 557659 : BATiter cni = bat_iterator(lg->catalog_id);
1291 557659 : BUN p;
1292 557659 : BUN posnew = BUN_NONE;
1293 557659 : BUN posold = BUN_NONE;
1294 557659 : MT_rwlock_rdlock(&cni.b->thashlock);
1295 8316413 : HASHloop_int(cni, cni.b->thash, p, &l.id) {
1296 7183912 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
1297 7183912 : if (lid == lng_nil || lid > tr->tid)
1298 : posnew = p;
1299 3441399 : else if (lid == tr->tid)
1300 7606010 : posold = p;
1301 : }
1302 557659 : MT_rwlock_rdunlock(&cni.b->thashlock);
1303 557659 : bat_iterator_end(&cni);
1304 : /* Normally at this point, posnew is the
1305 : * location of the bat that this
1306 : * transaction is working on, and posold
1307 : * is the location of the previous
1308 : * version of the bat. If LOG_CREATE,
1309 : * both are relevant, since the latter
1310 : * is the new bat, and the former is the
1311 : * to-be-destroyed bat. For
1312 : * LOG_DESTROY, only posnew should be
1313 : * relevant, but for the other types, if
1314 : * the table is destroyed later in the
1315 : * same transaction, we need posold, and
1316 : * else (the normal case) we need
1317 : * posnew. */
1318 557659 : if (posnew != BUN_NONE) {
1319 547779 : assert(posnew < maxupdated);
1320 547779 : updated[posnew / 32] |= 1U << (posnew % 32);
1321 : }
1322 557659 : if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
1323 163554 : assert(posold < maxupdated);
1324 163554 : updated[posold / 32] |= 1U << (posold % 32);
1325 : }
1326 : }
1327 : break;
1328 : default:
1329 : /* do nothing */
1330 : break;
1331 : }
1332 : /* the functions we call here can succeed (LOG_OK),
1333 : * but they can also fail for two different reasons:
1334 : * they can run out of input (LOG_EOF -- this is not
1335 : * serious, we just abort the remaining transactions),
1336 : * or some malloc or BAT update fails (LOG_ERR -- this
1337 : * is serious, we must abort the complete process);
1338 : * the latter failure causes the current function to
1339 : * return GDK_FAIL */
1340 750461 : switch (l.flag) {
1341 56487 : case LOG_START:
1342 56487 : assert(!lg->flushing || l.id <= lg->tid);
1343 56487 : if (!lg->flushing && l.id > lg->tid)
1344 127 : lg->tid = l.id; /* should only happen during initialization */
1345 56487 : if ((tr = tr_create(tr, l.id)) == NULL) {
1346 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
1347 0 : err = LOG_ERR;
1348 0 : break;
1349 : }
1350 56487 : TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
1351 : break;
1352 56487 : case LOG_END:
1353 56487 : if (tr == NULL)
1354 : err = LOG_EOF;
1355 56487 : else if (tr->tid != l.id) /* abort record */
1356 0 : tr = tr_abort(lg, tr);
1357 : else
1358 56487 : tr = tr_commit(lg, tr);
1359 : break;
1360 2771 : case LOG_SEQ:
1361 2771 : err = log_read_seq(lg, &l);
1362 2771 : break;
1363 392395 : case LOG_UPDATE_CONST:
1364 : case LOG_UPDATE_BULK:
1365 : case LOG_UPDATE:
1366 392395 : if (tr == NULL)
1367 : err = LOG_EOF;
1368 : else {
1369 617880 : err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
1370 : }
1371 : break;
1372 157221 : case LOG_CREATE:
1373 157221 : if (tr == NULL)
1374 : err = LOG_EOF;
1375 : else
1376 157221 : err = log_read_create(lg, tr, l.id);
1377 : break;
1378 9928 : case LOG_DESTROY:
1379 9928 : if (tr == NULL)
1380 : err = LOG_EOF;
1381 : else
1382 9928 : err = log_read_destroy(lg, tr, l.id);
1383 : break;
1384 75172 : case LOG_BAT_GROUP:
1385 75172 : if (tr == NULL)
1386 : err = LOG_EOF;
1387 : else {
1388 75172 : if (l.id > 0) {
1389 : /* START OF LOG_BAT_GROUP */
1390 37586 : cands = COLnew(0, TYPE_void, 0, SYSTRANS);
1391 37586 : if (!cands) {
1392 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
1393 0 : err = LOG_ERR;
1394 : }
1395 37586 : } else if (cands == NULL) {
1396 : /* should have gone through the
1397 : * above option earlier */
1398 0 : TRC_CRITICAL(GDK, "unexpected error\n");
1399 0 : err = LOG_ERR;
1400 : } else {
1401 : /* END OF LOG_BAT_GROUP */
1402 37586 : BBPunfix(cands->batCacheid);
1403 37586 : cands = NULL;
1404 : }
1405 : }
1406 : break;
1407 0 : default:
1408 0 : TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
1409 0 : err = LOG_ERR;
1410 : }
1411 750461 : if (tr == (trans *) -1) {
1412 : /* message already generated by tr_commit */
1413 : err = LOG_ERR;
1414 12293 : tr = NULL;
1415 : break;
1416 : }
1417 : }
1418 12293 : while (tr) {
1419 0 : TRC_WARNING(GDK, "aborting transaction\n");
1420 0 : tr = tr_abort(lg, tr);
1421 : }
1422 12293 : if (!lg->flushing)
1423 214 : ATOMIC_SET(&GDKdebug, dbg);
1424 :
1425 12293 : BBPreclaim(cands);
1426 12293 : if (!ok)
1427 12293 : return LOG_EOF;
1428 : return err;
1429 : }
1430 :
1431 : static gdk_return
1432 344 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1433 : {
1434 344 : log_return err = LOG_OK;
1435 344 : time_t t0, t1;
1436 344 : struct stat sb;
1437 :
1438 344 : assert(!lg->inmemory);
1439 :
1440 344 : TRC_INFO(WAL, "opening %s\n", filename);
1441 :
1442 344 : gdk_return res = log_open_input(lg, filename, filemissing);
1443 344 : if (!lg->input_log || res != GDK_SUCCEED)
1444 : return res;
1445 214 : int fd;
1446 214 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1447 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1448 0 : log_close_input(lg);
1449 : /* If the file could be opened, but fstat fails,
1450 : * something weird is going on */
1451 0 : return GDK_FAIL;
1452 : }
1453 214 : t0 = time(NULL);
1454 214 : TRC_INFO_IF(WAL) {
1455 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1456 0 : GDKtracer_flush_buffer();
1457 : }
1458 428 : while (err != LOG_EOF && err != LOG_ERR) {
1459 214 : t1 = time(NULL);
1460 214 : if (t1 - t0 > 10) {
1461 0 : lng fpos;
1462 0 : t0 = t1;
1463 : /* not more than once every 10 seconds */
1464 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1465 0 : TRC_INFO_IF(WAL) {
1466 0 : if (fpos >= 0) {
1467 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1468 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1469 0 : GDKtracer_flush_buffer();
1470 : }
1471 : }
1472 : }
1473 214 : err = log_read_transaction(lg, NULL, 0);
1474 : }
1475 214 : log_close_input(lg);
1476 214 : lg->input_log = NULL;
1477 :
1478 : /* remaining transactions are not committed, ie abort */
1479 214 : TRC_INFO_IF(WAL) {
1480 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1481 0 : GDKtracer_flush_buffer();
1482 : }
1483 : /* we cannot distinguish errors from incomplete transactions
1484 : * (even if we would log aborts in the logs). So we simply
1485 : * abort and move to the next log file */
1486 214 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1487 : }
1488 :
1489 : /*
1490 : * The log files are incrementally numbered, starting from 2. They are
1491 : * processed in the same sequence.
1492 : */
1493 : static gdk_return
1494 125 : log_readlogs(logger *lg, const char *filename)
1495 : {
1496 125 : gdk_return res = GDK_SUCCEED;
1497 :
1498 125 : assert(!lg->inmemory);
1499 125 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1500 :
1501 125 : char log_filename[FILENAME_MAX];
1502 125 : if (lg->saved_id >= lg->id) {
1503 125 : bool filemissing = false;
1504 :
1505 125 : lg->id = lg->saved_id + 1;
1506 469 : while (res == GDK_SUCCEED && !filemissing) {
1507 344 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1508 0 : GDKerror("Logger filename path is too large\n");
1509 0 : return GDK_FAIL;
1510 : }
1511 344 : res = log_readlog(lg, log_filename, &filemissing);
1512 344 : if (!filemissing) {
1513 219 : lg->saved_id++;
1514 219 : lg->id++;
1515 : }
1516 : }
1517 : }
1518 : return res;
1519 : }
1520 :
1521 : static gdk_return
1522 11639 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1523 : {
1524 11639 : TRC_DEBUG(WAL, "commit");
1525 :
1526 11639 : return bm_commit(lg, pending, updated, maxupdated);
1527 : }
1528 :
1529 : static gdk_return
1530 125 : check_version(logger *lg, FILE *fp, bool *needsnew)
1531 : {
1532 125 : int version = 0;
1533 :
1534 125 : assert(!lg->inmemory);
1535 125 : if (fscanf(fp, "%6d", &version) != 1) {
1536 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1537 0 : fclose(fp);
1538 0 : return GDK_FAIL;
1539 : }
1540 125 : if (version != lg->version) {
1541 32 : if (lg->prefuncp == NULL ||
1542 16 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1543 0 : GDKerror("Incompatible database version %06d, "
1544 : "this server supports version %06d.\n%s",
1545 : version, lg->version,
1546 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1547 0 : fclose(fp);
1548 0 : return GDK_FAIL;
1549 : }
1550 16 : *needsnew = true; /* we need to write a new log file */
1551 : } else {
1552 109 : lg->postfuncp = NULL; /* don't call */
1553 109 : *needsnew = false; /* log file already up-to-date */
1554 : }
1555 250 : if (fgetc(fp) != '\n' || /* skip \n */
1556 125 : fgetc(fp) != '\n') { /* skip \n */
1557 0 : GDKerror("Badly formatted log file");
1558 0 : fclose(fp);
1559 0 : return GDK_FAIL;
1560 : }
1561 125 : if (log_read_types_file(lg, fp, version, needsnew) != GDK_SUCCEED) {
1562 0 : fclose(fp);
1563 0 : return GDK_FAIL;
1564 : }
1565 125 : fclose(fp);
1566 125 : return GDK_SUCCEED;
1567 : }
1568 :
1569 : static BAT *
1570 352 : bm_tids(BAT *b, BAT *d)
1571 : {
1572 352 : BUN sz = BATcount(b);
1573 352 : BAT *tids = BATdense(0, 0, sz);
1574 :
1575 352 : if (tids == NULL)
1576 : return NULL;
1577 :
1578 352 : if (BATcount(d)) {
1579 352 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1580 352 : logbat_destroy(tids);
1581 352 : tids = diff;
1582 : }
1583 : return tids;
1584 : }
1585 :
1586 :
1587 : static gdk_return
1588 7253 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1589 : {
1590 7253 : char bak[IDLENGTH];
1591 :
1592 7253 : if (BATmode(old, true) != GDK_SUCCEED) {
1593 0 : GDKerror("cannot convert old %s to transient", name);
1594 0 : return GDK_FAIL;
1595 : }
1596 7253 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1597 0 : GDKerror("name %s_%s too long\n", fn, name);
1598 0 : return GDK_FAIL;
1599 : }
1600 7253 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1601 0 : GDKerror("rename (%s) failed\n", bak);
1602 0 : return GDK_FAIL;
1603 : }
1604 7253 : BBPretain(new->batCacheid);
1605 7253 : return GDK_SUCCEED;
1606 : }
1607 :
1608 : static gdk_return
1609 352 : bm_get_counts(logger *lg)
1610 : {
1611 352 : BUN p, q;
1612 352 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1613 :
1614 32666 : BATloop(lg->catalog_bid, p, q) {
1615 32314 : oid pos = p;
1616 32314 : lng cnt = 0;
1617 32314 : lng lid = lng_nil;
1618 :
1619 32314 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1620 32314 : BAT *b = BBPquickdesc(bids[p]);
1621 32314 : assert(b);
1622 32314 : cnt = BATcount(b);
1623 : } else {
1624 0 : lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1625 : }
1626 32314 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1627 0 : return GDK_FAIL;
1628 32314 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1629 : return GDK_FAIL;
1630 : }
1631 : return GDK_SUCCEED;
1632 : }
1633 :
1634 : static int
1635 7253 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1636 : {
1637 7253 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1638 1678327 : for (int i = 0; i < next; i++) {
1639 1671074 : if (n[i] == bid) {
1640 0 : sizes[i] = sz;
1641 0 : return next;
1642 : }
1643 : }
1644 7253 : n[next] = bid;
1645 7253 : sizes[next++] = sz;
1646 7253 : return next;
1647 : }
1648 :
1649 : static int
1650 2183 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
1651 : BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
1652 : {
1653 2183 : BAT *nbids, *noids, *ncnts, *nlids, *ndels;
1654 2183 : BUN p, q;
1655 2183 : int err = 0, rcnt = 0;
1656 :
1657 2183 : BUN ocnt = BATcount(catalog_bid);
1658 2183 : nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1659 2183 : noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1660 2183 : ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1661 2183 : nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1662 2183 : ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
1663 :
1664 2183 : if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
1665 0 : logbat_destroy(nbids);
1666 0 : logbat_destroy(noids);
1667 0 : logbat_destroy(ncnts);
1668 0 : logbat_destroy(nlids);
1669 0 : logbat_destroy(ndels);
1670 0 : return 0;
1671 : }
1672 :
1673 2183 : oid *poss = Tloc(dcatalog, 0);
1674 179482 : BATloop(dcatalog, p, q) {
1675 177299 : oid pos = poss[p];
1676 :
1677 177299 : if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
1678 0 : continue;
1679 :
1680 177299 : if (lids[pos] >= 0) {
1681 177299 : bat bid = bids[pos];
1682 177299 : BAT *lb = BBP_desc(bid);
1683 :
1684 177299 : if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
1685 0 : GDKwarning("Failed to set bat(%d) transient\n", bid);
1686 : } else {
1687 177299 : lids[pos] = -1; /* mark as transient */
1688 177299 : r[rcnt++] = bid;
1689 : }
1690 : }
1691 : }
1692 :
1693 2183 : int *oids = (int *) Tloc(catalog_id, 0);
1694 2183 : q = BATcount(catalog_bid);
1695 771882 : for (p = 0; p < q && !err; p++) {
1696 769699 : bat col = bids[p];
1697 769699 : int nid = oids[p];
1698 769699 : lng lid = lids[p];
1699 769699 : lng cnt = cnts[p];
1700 769699 : oid pos = p;
1701 :
1702 : /* only project out the deleted with lid == -1
1703 : * update dcatalog */
1704 769699 : if (lid == -1)
1705 177299 : continue; /* remove */
1706 :
1707 1184800 : if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
1708 1184800 : BUNappend(noids, &nid, true) != GDK_SUCCEED ||
1709 1184800 : BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
1710 592400 : BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
1711 : err = 1;
1712 592400 : if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
1713 0 : pos = (oid) (BATcount(nbids) - 1);
1714 0 : if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
1715 592400 : err = 1;
1716 : }
1717 : }
1718 :
1719 2183 : if (err) {
1720 0 : logbat_destroy(nbids);
1721 0 : logbat_destroy(noids);
1722 0 : logbat_destroy(ndels);
1723 0 : logbat_destroy(ncnts);
1724 0 : logbat_destroy(nlids);
1725 0 : return 0;
1726 : }
1727 : /* point of no return */
1728 4366 : if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
1729 4366 : log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
1730 2183 : log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
1731 0 : logbat_destroy(nbids);
1732 0 : logbat_destroy(noids);
1733 0 : logbat_destroy(ndels);
1734 0 : logbat_destroy(ncnts);
1735 0 : logbat_destroy(nlids);
1736 0 : return -1;
1737 : }
1738 2183 : r[rcnt++] = lg->catalog_bid->batCacheid;
1739 2183 : r[rcnt++] = lg->catalog_id->batCacheid;
1740 2183 : r[rcnt++] = lg->dcatalog->batCacheid;
1741 :
1742 2183 : assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
1743 :
1744 2183 : logbat_destroy(lg->catalog_bid);
1745 2183 : logbat_destroy(lg->catalog_id);
1746 2183 : logbat_destroy(lg->dcatalog);
1747 :
1748 2183 : lg->catalog_bid = nbids;
1749 2183 : lg->catalog_id = noids;
1750 2183 : lg->dcatalog = ndels;
1751 :
1752 : /* failing to rename these two bats is not fatal */
1753 2183 : if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
1754 2183 : GDKclrerr();
1755 2183 : if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
1756 2183 : GDKclrerr();
1757 2183 : BBPunfix(lg->catalog_cnt->batCacheid);
1758 2183 : BBPunfix(lg->catalog_lid->batCacheid);
1759 :
1760 2183 : lg->catalog_cnt = ncnts;
1761 2183 : lg->catalog_lid = nlids;
1762 2183 : char bak[FILENAME_MAX];
1763 2183 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
1764 2183 : if (BBPrename(lg->catalog_cnt, bak) < 0)
1765 0 : GDKclrerr();
1766 2183 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
1767 2183 : if (BBPrename(lg->catalog_lid, bak) < 0)
1768 0 : GDKclrerr();
1769 2183 : rotation_lock(lg);
1770 9788 : for (logged_range *p = lg->pending; p; p = p->next) {
1771 7605 : p->cnt -= cleanup;
1772 : }
1773 2183 : rotation_unlock(lg);
1774 2183 : return rcnt;
1775 : }
1776 :
1777 : /* this function is called with log_lock() held; it releases the lock
1778 : * before returning */
1779 : static gdk_return
1780 12093 : bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1781 : {
1782 12093 : BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
1783 12093 : BUN dcnt = BATcount(lg->dcatalog);
1784 12093 : BUN p, q;
1785 12093 : BAT *catalog_bid = lg->catalog_bid;
1786 12093 : BAT *catalog_id = lg->catalog_id;
1787 12093 : BAT *dcatalog = lg->dcatalog;
1788 12093 : BUN nn = 13 + cnt;
1789 12093 : bat *n = GDKmalloc(sizeof(bat) * nn);
1790 12093 : bat *r = GDKmalloc(sizeof(bat) * nn);
1791 12093 : BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
1792 12093 : int i = 0, rcnt = 0;
1793 12093 : gdk_return res;
1794 12093 : const log_bid *bids;
1795 12093 : lng *cnts = NULL, *lids = NULL;
1796 12093 : BUN cleanup = 0;
1797 12093 : lng t0 = 0;
1798 :
1799 12093 : if (n == NULL || r == NULL || sizes == NULL) {
1800 0 : GDKfree(n);
1801 0 : GDKfree(r);
1802 0 : GDKfree(sizes);
1803 0 : log_unlock(lg);
1804 0 : return GDK_FAIL;
1805 : }
1806 :
1807 12093 : sizes[i] = 0;
1808 12093 : n[i++] = 0; /* n[0] is not used */
1809 12093 : bids = (const log_bid *) Tloc(catalog_bid, 0);
1810 12093 : if (lg->catalog_cnt)
1811 11866 : cnts = (lng *) Tloc(lg->catalog_cnt, 0);
1812 12093 : if (lg->catalog_lid)
1813 11866 : lids = (lng *) Tloc(lg->catalog_lid, 0);
1814 3533182 : BATloop(catalog_bid, p, q) {
1815 3521089 : if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
1816 177299 : cleanup++;
1817 177299 : if (lids[p] == -1)
1818 0 : continue;
1819 354566 : if (BUNfnd(dcatalog, &(oid){p}) == BUN_NONE &&
1820 177267 : BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
1821 0 : while (BATcount(dcatalog) > dcnt) {
1822 0 : if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
1823 0 : TRC_CRITICAL(WAL, "delete after failed append failed\n");
1824 0 : break;
1825 : }
1826 : }
1827 0 : GDKfree(n);
1828 0 : GDKfree(r);
1829 0 : GDKfree(sizes);
1830 0 : log_unlock(lg);
1831 0 : return GDK_FAIL;
1832 : }
1833 : }
1834 3521089 : if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
1835 1685881 : continue;
1836 : }
1837 1835208 : bat col = bids[p];
1838 :
1839 1835208 : TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
1840 1835208 : assert(col);
1841 1835208 : sizes[i] = cnts ? (BUN) cnts[p] : 0;
1842 1835208 : n[i++] = col;
1843 : }
1844 : /* now commit catalog, so it's also up to date on disk */
1845 12093 : sizes[i] = cnt;
1846 12093 : n[i++] = catalog_bid->batCacheid;
1847 12093 : sizes[i] = cnt;
1848 12093 : n[i++] = catalog_id->batCacheid;
1849 12093 : sizes[i] = BATcount(dcatalog);
1850 12093 : n[i++] = dcatalog->batCacheid;
1851 :
1852 12093 : if (cleanup) {
1853 2183 : if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
1854 : catalog_bid, catalog_id, dcatalog,
1855 : cleanup)) < 0) {
1856 0 : GDKfree(n);
1857 0 : GDKfree(r);
1858 0 : GDKfree(sizes);
1859 0 : log_unlock(lg);
1860 0 : return GDK_FAIL;
1861 : }
1862 2183 : cnt -= cleanup;
1863 : }
1864 12093 : if (dcatalog != lg->dcatalog) {
1865 2183 : i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
1866 2183 : i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
1867 2183 : i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
1868 : }
1869 12093 : if (lg->seqs_id) {
1870 11866 : sizes[i] = BATcount(lg->seqs_id);
1871 11866 : n[i++] = lg->seqs_id->batCacheid;
1872 11866 : sizes[i] = BATcount(lg->seqs_id);
1873 11866 : n[i++] = lg->seqs_val->batCacheid;
1874 : }
1875 12093 : if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
1876 352 : BAT *tids, *ids, *vals;
1877 :
1878 352 : tids = bm_tids(lg->seqs_id, lg->dseqs);
1879 352 : if (tids == NULL) {
1880 0 : GDKfree(n);
1881 0 : GDKfree(r);
1882 0 : GDKfree(sizes);
1883 0 : log_unlock(lg);
1884 0 : return GDK_FAIL;
1885 : }
1886 352 : ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
1887 352 : vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
1888 :
1889 352 : if (ids == NULL || vals == NULL) {
1890 0 : logbat_destroy(tids);
1891 0 : logbat_destroy(ids);
1892 0 : logbat_destroy(vals);
1893 0 : GDKfree(n);
1894 0 : GDKfree(r);
1895 0 : GDKfree(sizes);
1896 0 : log_unlock(lg);
1897 0 : return GDK_FAIL;
1898 : }
1899 :
1900 704 : if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
1901 352 : BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
1902 0 : logbat_destroy(tids);
1903 0 : logbat_destroy(ids);
1904 0 : logbat_destroy(vals);
1905 0 : GDKfree(n);
1906 0 : GDKfree(r);
1907 0 : GDKfree(sizes);
1908 0 : log_unlock(lg);
1909 0 : return GDK_FAIL;
1910 : }
1911 352 : logbat_destroy(tids);
1912 352 : BATclear(lg->dseqs, true);
1913 :
1914 704 : if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
1915 352 : log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
1916 0 : logbat_destroy(ids);
1917 0 : logbat_destroy(vals);
1918 0 : GDKfree(n);
1919 0 : GDKfree(r);
1920 0 : GDKfree(sizes);
1921 0 : log_unlock(lg);
1922 0 : return GDK_FAIL;
1923 : }
1924 352 : i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
1925 352 : i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
1926 :
1927 352 : if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
1928 278 : r[rcnt++] = lg->seqs_id->batCacheid;
1929 352 : if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
1930 278 : r[rcnt++] = lg->seqs_val->batCacheid;
1931 :
1932 352 : logbat_destroy(lg->seqs_id);
1933 352 : logbat_destroy(lg->seqs_val);
1934 :
1935 352 : lg->seqs_id = ids;
1936 352 : lg->seqs_val = vals;
1937 : }
1938 12093 : if (lg->seqs_id) {
1939 11866 : sizes[i] = BATcount(lg->dseqs);
1940 11866 : n[i++] = lg->dseqs->batCacheid;
1941 : }
1942 :
1943 12093 : assert((BUN) i <= nn);
1944 12093 : log_unlock(lg);
1945 12093 : TRC_DEBUG_IF(WAL)
1946 0 : t0 = GDKusec();
1947 12320 : res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
1948 12093 : TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
1949 12093 : if (res == GDK_SUCCEED) { /* now cleanup */
1950 196497 : for (i = 0; i < rcnt; i++) {
1951 184404 : TRC_DEBUG_IF(WAL) {
1952 0 : TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
1953 0 : if (BBP_lrefs(r[i]) != 2)
1954 0 : TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
1955 : }
1956 184404 : BBPrelease(r[i]);
1957 : }
1958 : }
1959 12093 : GDKfree(n);
1960 12093 : GDKfree(r);
1961 12093 : GDKfree(sizes);
1962 12093 : if (res != GDK_SUCCEED)
1963 0 : TRC_CRITICAL(GDK, "commit failed\n");
1964 : return res;
1965 : }
1966 :
1967 : static gdk_return
1968 351 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1969 : {
1970 351 : if (GDKfilepath(filename, FILENAME_MAX, 0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED) {
1971 0 : GDKerror("Logger filename path is too large\n");
1972 0 : return GDK_FAIL;
1973 : }
1974 351 : if (bak) {
1975 351 : if (strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL) >= FILENAME_MAX) {
1976 0 : GDKerror("Logger filename path is too large\n");
1977 0 : return GDK_FAIL;
1978 : }
1979 : }
1980 : return GDK_SUCCEED;
1981 : }
1982 :
1983 : static gdk_return
1984 12298 : log_cleanup(logger *lg, lng id)
1985 : {
1986 12298 : char log_id[FILENAME_MAX];
1987 :
1988 12298 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1989 0 : GDKerror("log_id filename is too large\n");
1990 0 : return GDK_FAIL;
1991 : }
1992 12298 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1993 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
1994 0 : GDKclrerr(); /* clear error from unlink */
1995 : }
1996 : return GDK_SUCCEED;
1997 : }
1998 :
#ifdef GDKLIBRARY_JSON
/* Finish the json upgrade: unless running in memory, remove the
 * "jsonupgradeneeded" signal file from BATDIR, then make the "json" atom
 * use strRead as its read function.
 * Returns GDK_FAIL when the signal file cannot be removed. */
static gdk_return
log_json_upgrade_finalize(void)
{
	const int json_atom = ATOMindex("json");
	const bool on_disk = !GDKinmemory(0);

	if (on_disk) {
		if (GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
			TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
			return GDK_FAIL;
		}
	}

	BATatoms[json_atom].atomRead = (void *(*)(void *, size_t *, stream *, size_t)) strRead;
	return GDK_SUCCEED;
}
#endif
2014 :
2015 : /* Load data from the logger logdir
2016 : * Initialize new directories and catalog files if none are present,
2017 : * unless running in read-only mode
2018 : * Load data and persist it in the BATs */
2019 : static gdk_return
2020 352 : log_load(const char *fn, logger *lg, char filename[FILENAME_MAX])
2021 : {
2022 352 : FILE *fp = NULL;
2023 352 : char bak[FILENAME_MAX];
2024 352 : bat catalog_bid, catalog_id, dcatalog;
2025 352 : bool needcommit = false;
2026 352 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2027 352 : bool readlogs = false;
2028 352 : bool needsnew = false; /* need to write new log file? */
2029 :
2030 : /* refactor */
2031 352 : if (!LOG_DISABLED(lg)) {
2032 351 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2033 0 : goto error;
2034 : }
2035 :
2036 352 : lg->catalog_bid = NULL;
2037 352 : lg->catalog_id = NULL;
2038 352 : lg->catalog_cnt = NULL;
2039 352 : lg->catalog_lid = NULL;
2040 352 : lg->dcatalog = NULL;
2041 :
2042 352 : lg->seqs_id = NULL;
2043 352 : lg->seqs_val = NULL;
2044 352 : lg->dseqs = NULL;
2045 :
2046 352 : if (!LOG_DISABLED(lg)) {
2047 : /* try to open logfile backup, or failing that, the file
2048 : * itself. we need to know whether this file exists when
2049 : * checking the database consistency later on */
2050 351 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2051 0 : fclose(fp);
2052 0 : fp = NULL;
2053 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2054 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2055 0 : goto error;
2056 351 : } else if (errno != ENOENT) {
2057 0 : GDKsyserror("open %s failed", bak);
2058 0 : goto error;
2059 : }
2060 351 : fp = MT_fopen(filename, "r");
2061 351 : if (fp == NULL && errno != ENOENT) {
2062 0 : GDKsyserror("open %s failed", filename);
2063 0 : goto error;
2064 : }
2065 : }
2066 :
2067 352 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2068 352 : catalog_bid = BBPindex(bak);
2069 :
2070 : /* initialize arrays for type mapping, to be read from disk */
2071 352 : memset(lg->type_id, -1, sizeof(lg->type_id));
2072 352 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2073 :
2074 : /* this is intentional - if catalog_bid is 0, force it to find
2075 : * the persistent catalog */
2076 352 : if (catalog_bid == 0) {
2077 : /* catalog does not exist, so the log file also
2078 : * shouldn't exist */
2079 227 : if (fp != NULL) {
2080 0 : GDKerror("there is no logger catalog, "
2081 : "but there is a log file.\n");
2082 0 : goto error;
2083 : }
2084 :
2085 227 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2086 227 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2087 227 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2088 :
2089 227 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2090 0 : GDKerror("cannot create catalog bats");
2091 0 : goto error;
2092 : }
2093 227 : TRC_INFO(WAL, "create %s catalog\n", fn);
2094 :
2095 : /* give the catalog bats names so we can find them
2096 : * next time */
2097 227 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2098 227 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2099 0 : goto error;
2100 : }
2101 :
2102 227 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2103 227 : if (BBPrename(lg->catalog_id, bak) < 0) {
2104 0 : goto error;
2105 : }
2106 :
2107 227 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2108 227 : if (BBPrename(lg->dcatalog, bak) < 0) {
2109 0 : goto error;
2110 : }
2111 :
2112 227 : if (!LOG_DISABLED(lg)) {
2113 226 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2114 0 : GDKerror("cannot create directory for log file %s\n", filename);
2115 0 : goto error;
2116 : }
2117 226 : if (log_create_types_file(lg, filename) != GDK_SUCCEED)
2118 0 : goto error;
2119 : }
2120 :
2121 227 : BBPretain(lg->catalog_bid->batCacheid);
2122 227 : BBPretain(lg->catalog_id->batCacheid);
2123 227 : BBPretain(lg->dcatalog->batCacheid);
2124 :
2125 227 : log_lock(lg);
2126 : /* bm_subcommit releases the lock */
2127 227 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2128 : /* cannot commit catalog, so remove log */
2129 0 : if (MT_remove(filename) < 0)
2130 0 : GDKsyserror("remove %s failed\n", filename);
2131 0 : BBPrelease(lg->catalog_bid->batCacheid);
2132 0 : BBPrelease(lg->catalog_id->batCacheid);
2133 0 : BBPrelease(lg->dcatalog->batCacheid);
2134 0 : goto error;
2135 : }
2136 : } else {
2137 : /* find the persistent catalog. As non persistent bats
2138 : * require a logical reference we also add a logical
2139 : * reference for the persistent bats */
2140 125 : BUN p, q;
2141 125 : BAT *b, *o, *d;
2142 :
2143 125 : assert(!lg->inmemory);
2144 :
2145 : /* the catalog exists, and so should the log file */
2146 125 : if (fp == NULL && !LOG_DISABLED(lg)) {
2147 0 : GDKerror("There is a logger catalog, but no log file.\n");
2148 0 : goto error;
2149 : }
2150 : if (fp != NULL) {
2151 : /* check_version always closes fp */
2152 125 : if (check_version(lg, fp, &needsnew) != GDK_SUCCEED) {
2153 0 : fp = NULL;
2154 0 : goto error;
2155 : }
2156 : readlogs = true;
2157 : fp = NULL;
2158 : }
2159 :
2160 125 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2161 125 : b = BATdescriptor(catalog_bid);
2162 125 : if (b == NULL) {
2163 0 : GDKerror("inconsistent database, catalog does not exist");
2164 0 : goto error;
2165 : }
2166 :
2167 125 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2168 125 : catalog_id = BBPindex(bak);
2169 125 : o = BATdescriptor(catalog_id);
2170 125 : if (o == NULL) {
2171 0 : BBPunfix(b->batCacheid);
2172 0 : GDKerror("inconsistent database, catalog_id does not exist");
2173 0 : goto error;
2174 : }
2175 :
2176 125 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2177 125 : dcatalog = BBPindex(bak);
2178 125 : d = BATdescriptor(dcatalog);
2179 125 : if (d == NULL) {
2180 0 : GDKerror("cannot create dcatalog bat");
2181 0 : BBPunfix(b->batCacheid);
2182 0 : BBPunfix(o->batCacheid);
2183 0 : goto error;
2184 : }
2185 :
2186 125 : lg->catalog_bid = b;
2187 125 : lg->catalog_id = o;
2188 125 : lg->dcatalog = d;
2189 125 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2190 32439 : BATloop(lg->catalog_bid, p, q) {
2191 32314 : bat bid = bids[p];
2192 32314 : oid pos = p;
2193 :
2194 32314 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2195 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2196 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2197 0 : goto error;
2198 : }
2199 : }
2200 125 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2201 125 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2202 125 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2203 0 : goto error;
2204 : }
2205 125 : BBPretain(lg->catalog_bid->batCacheid);
2206 125 : BBPretain(lg->catalog_id->batCacheid);
2207 125 : BBPretain(lg->dcatalog->batCacheid);
2208 : }
2209 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2210 : * fatal */
2211 352 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2212 352 : if (lg->catalog_cnt == NULL) {
2213 0 : GDKerror("failed to create catalog_cnt bat");
2214 0 : goto error;
2215 : }
2216 352 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2217 352 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2218 0 : GDKclrerr();
2219 352 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2220 352 : if (lg->catalog_lid == NULL) {
2221 0 : GDKerror("failed to create catalog_lid bat");
2222 0 : goto error;
2223 : }
2224 352 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2225 352 : if (BBPrename(lg->catalog_lid, bak) < 0)
2226 0 : GDKclrerr();
2227 352 : if (bm_get_counts(lg) != GDK_SUCCEED)
2228 0 : goto error;
2229 :
2230 352 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2231 352 : if (BBPindex(bak)) {
2232 125 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2233 125 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2234 125 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2235 125 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2236 125 : lg->dseqs = BATdescriptor(BBPindex(bak));
2237 125 : if (lg->seqs_id == NULL ||
2238 125 : lg->seqs_val == NULL ||
2239 : lg->dseqs == NULL) {
2240 0 : GDKerror("Logger_new: cannot load seqs bats");
2241 0 : goto error;
2242 : }
2243 125 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2244 125 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2245 125 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2246 0 : goto error;
2247 : }
2248 : } else {
2249 227 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2250 227 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2251 227 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2252 227 : if (lg->seqs_id == NULL ||
2253 227 : lg->seqs_val == NULL ||
2254 : lg->dseqs == NULL) {
2255 0 : GDKerror("Logger_new: cannot create seqs bats");
2256 0 : goto error;
2257 : }
2258 :
2259 227 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2260 227 : if (BBPrename(lg->seqs_id, bak) < 0) {
2261 0 : goto error;
2262 : }
2263 :
2264 227 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2265 227 : if (BBPrename(lg->seqs_val, bak) < 0) {
2266 0 : goto error;
2267 : }
2268 :
2269 227 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2270 227 : if (BBPrename(lg->dseqs, bak) < 0) {
2271 0 : goto error;
2272 : }
2273 : needcommit = true;
2274 : }
2275 352 : dbg = ATOMIC_GET(&GDKdebug);
2276 352 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2277 352 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2278 0 : GDKerror("Logger_new: commit failed");
2279 0 : goto error;
2280 : }
2281 352 : ATOMIC_SET(&GDKdebug, dbg);
2282 :
2283 352 : if (readlogs) {
2284 125 : ulng log_id = lg->saved_id + 1;
2285 125 : bool earlyexit = GDKgetenv_isyes("process-wal-and-exit");
2286 125 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2287 0 : goto error;
2288 : }
2289 125 : if (!earlyexit) {
2290 125 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2291 0 : goto error;
2292 125 : if (needsnew) {
2293 16 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2294 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2295 0 : return GDK_FAIL;
2296 : }
2297 16 : if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
2298 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2299 0 : return GDK_FAIL;
2300 : }
2301 : }
2302 : }
2303 125 : dbg = ATOMIC_GET(&GDKdebug);
2304 125 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2305 125 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2306 0 : goto error;
2307 : }
2308 125 : ATOMIC_SET(&GDKdebug, dbg);
2309 344 : for (; log_id <= lg->saved_id; log_id++)
2310 219 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2311 125 : if (earlyexit) {
2312 0 : printf("# mserver5 exiting\n");
2313 0 : exit(0);
2314 : }
2315 141 : if (needsnew &&
2316 16 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2317 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2318 0 : return GDK_FAIL;
2319 : }
2320 : } else {
2321 227 : lg->id = lg->saved_id + 1;
2322 227 : if (GDKgetenv_isyes("process-wal-and-exit")) {
2323 0 : printf("# mserver5 exiting\n");
2324 0 : exit(0);
2325 : }
2326 : }
2327 : #ifdef GDKLIBRARY_JSON
2328 352 : if (log_json_upgrade_finalize() == GDK_FAIL)
2329 0 : goto error;
2330 : #endif
2331 : return GDK_SUCCEED;
2332 0 : error:
2333 0 : if (fp)
2334 0 : fclose(fp);
2335 0 : logbat_destroy(lg->catalog_bid);
2336 0 : logbat_destroy(lg->catalog_id);
2337 0 : logbat_destroy(lg->dcatalog);
2338 0 : logbat_destroy(lg->seqs_id);
2339 0 : logbat_destroy(lg->seqs_val);
2340 0 : logbat_destroy(lg->dseqs);
2341 0 : MT_lock_destroy(&lg->lock);
2342 0 : MT_lock_destroy(&lg->rotation_lock);
2343 0 : GDKfree(lg->fn);
2344 0 : GDKfree(lg->dir);
2345 0 : GDKfree(lg->rbuf);
2346 0 : GDKfree(lg->wbuf);
2347 0 : GDKfree(lg);
2348 0 : ATOMIC_SET(&GDKdebug, dbg);
2349 : /* We do not call log_json_upgrade_finalize here because we want
2350 : * the upgrade to run again next time we try, so we do not want
2351 : * to remove the signal file just yet.
2352 : */
2353 0 : return GDK_FAIL;
2354 : }
2355 :
2356 : /* Initialize a new logger
2357 : * It will load any data in the logdir and persist it in the BATs*/
2358 : static logger *
2359 352 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2360 : postversionfix_fptr postfuncp, void *funcdata)
2361 : {
2362 352 : logger *lg;
2363 352 : char filename[FILENAME_MAX];
2364 :
2365 352 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2366 352 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2367 352 : lng max_file_size = 0;
2368 :
2369 352 : if (GDKdebug & TESTINGMASK) {
2370 : max_file_size = 2048; /* 2 KiB */
2371 : } else {
2372 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2373 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2374 : }
2375 :
2376 352 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2377 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2378 0 : return NULL;
2379 : }
2380 :
2381 352 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2382 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2383 0 : return NULL;
2384 : }
2385 :
2386 352 : lg = GDKmalloc(sizeof(struct logger));
2387 352 : if (lg == NULL) {
2388 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2389 0 : return NULL;
2390 : }
2391 :
2392 704 : *lg = (logger) {
2393 352 : .inmemory = GDKinmemory(0),
2394 : .debug = debug,
2395 : .version = version,
2396 : .prefuncp = prefuncp,
2397 : .postfuncp = postfuncp,
2398 : .funcdata = funcdata,
2399 :
2400 352 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2401 : .file_age = 0,
2402 352 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2403 352 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2404 :
2405 : .id = 0,
2406 352 : .saved_id = getBBPlogno(), /* get saved log number from bbp */
2407 : .nr_flushers = ATOMIC_VAR_INIT(0),
2408 352 : .fn = GDKstrdup(fn),
2409 352 : .dir = GDKstrdup(filename),
2410 : .rbufsize = 64 * 1024,
2411 352 : .rbuf = GDKmalloc(64 * 1024),
2412 : .wbufsize = 64 * 1024,
2413 352 : .wbuf = GDKmalloc(64 * 1024),
2414 : };
2415 :
2416 : /* probably open file and check version first, then call call old logger code */
2417 352 : if (lg->fn == NULL ||
2418 352 : lg->dir == NULL ||
2419 352 : lg->rbuf == NULL ||
2420 : lg->wbuf == NULL) {
2421 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2422 0 : GDKfree(lg->fn);
2423 0 : GDKfree(lg->dir);
2424 0 : GDKfree(lg->rbuf);
2425 0 : GDKfree(lg->wbuf);
2426 0 : GDKfree(lg);
2427 0 : return NULL;
2428 : }
2429 352 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2430 :
2431 352 : MT_lock_init(&lg->lock, fn);
2432 352 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2433 352 : MT_lock_init(&lg->flush_lock, "flush_lock");
2434 352 : MT_cond_init(&lg->excl_flush_cv);
2435 :
2436 352 : if (log_load(fn, lg, filename) == GDK_SUCCEED) {
2437 : return lg;
2438 : }
2439 : return NULL;
2440 : }
2441 :
2442 : static logged_range *
2443 68327 : do_flush_range_cleanup(logger *lg)
2444 : {
2445 68327 : logged_range *frange = lg->flush_ranges;
2446 68327 : logged_range *first = frange;
2447 :
2448 68327 : if (frange == NULL)
2449 : return NULL;
2450 80522 : while (frange->next) {
2451 12375 : if (ATOMIC_GET(&frange->refcount) > 1)
2452 : break;
2453 12195 : frange = frange->next;
2454 : }
2455 68327 : if (first == frange) {
2456 : return first;
2457 : }
2458 :
2459 12195 : logged_range *flast = frange;
2460 :
2461 12195 : lg->flush_ranges = flast;
2462 :
2463 24390 : for (frange = first; frange && frange != flast; frange = frange->next) {
2464 12195 : ATOMIC_DEC(&frange->refcount);
2465 12195 : if (!LOG_DISABLED(lg) && frange->output_log) {
2466 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2467 0 : close_stream(frange->output_log);
2468 0 : frange->output_log = NULL;
2469 : }
2470 : }
2471 : return flast;
2472 : }
2473 :
2474 : void
2475 351 : log_destroy(logger *lg)
2476 : {
2477 351 : log_close_input(lg);
2478 351 : logged_range *last = do_flush_range_cleanup(lg);
2479 351 : (void) last;
2480 351 : assert(last == lg->current && last == lg->flush_ranges);
2481 351 : log_close_output(lg);
2482 818 : for (logged_range * p = lg->pending; p; p = lg->pending) {
2483 467 : lg->pending = p->next;
2484 467 : GDKfree(p);
2485 : }
2486 351 : if (LOG_DISABLED(lg)) {
2487 1 : lg->saved_id = lg->id;
2488 1 : lg->saved_tid = lg->tid;
2489 1 : log_commit(lg, NULL, NULL, 0);
2490 : }
2491 351 : if (lg->catalog_bid) {
2492 351 : log_lock(lg);
2493 351 : BUN p, q;
2494 351 : BAT *b = lg->catalog_bid;
2495 :
2496 : /* free resources */
2497 351 : const log_bid *bids = (const log_bid *) Tloc(b, 0);
2498 92098 : BATloop(b, p, q) {
2499 91747 : bat bid = bids[p];
2500 :
2501 91747 : BBPrelease(bid);
2502 : }
2503 :
2504 351 : BBPrelease(lg->catalog_bid->batCacheid);
2505 351 : BBPrelease(lg->catalog_id->batCacheid);
2506 351 : BBPrelease(lg->dcatalog->batCacheid);
2507 351 : logbat_destroy(lg->catalog_bid);
2508 351 : logbat_destroy(lg->catalog_id);
2509 351 : logbat_destroy(lg->dcatalog);
2510 :
2511 351 : logbat_destroy(lg->catalog_cnt);
2512 351 : logbat_destroy(lg->catalog_lid);
2513 351 : log_unlock(lg);
2514 : }
2515 351 : MT_lock_destroy(&lg->lock);
2516 351 : MT_lock_destroy(&lg->rotation_lock);
2517 351 : MT_lock_destroy(&lg->flush_lock);
2518 351 : GDKfree(lg->fn);
2519 351 : GDKfree(lg->dir);
2520 351 : GDKfree(lg->rbuf);
2521 351 : GDKfree(lg->wbuf);
2522 351 : GDKfree(lg);
2523 351 : }
2524 :
2525 : /* Create a new logger */
2526 : logger *
2527 352 : log_create(int debug, const char *fn, const char *logdir, int version,
2528 : preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
2529 : void *funcdata)
2530 : {
2531 352 : logger *lg;
2532 352 : TRC_INFO_IF(WAL) {
2533 0 : TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
2534 0 : GDKtracer_flush_buffer();
2535 : }
2536 352 : lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
2537 352 : if (lg == NULL)
2538 : return NULL;
2539 352 : TRC_INFO_IF(WAL) {
2540 0 : TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
2541 0 : GDKtracer_flush_buffer();
2542 : }
2543 352 : if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2544 0 : log_destroy(lg);
2545 0 : return NULL;
2546 : }
2547 352 : assert(lg->current == NULL);
2548 352 : logged_range dummy = {
2549 352 : .cnt = BATcount(lg->catalog_bid),
2550 : };
2551 352 : lg->current = &dummy;
2552 352 : if (log_open_output(lg) != GDK_SUCCEED) {
2553 0 : lg->current = NULL;
2554 0 : log_destroy(lg);
2555 0 : return NULL;
2556 : }
2557 352 : lg->current = lg->current->next;
2558 352 : assert(lg->pending == NULL && lg->flush_ranges == NULL);
2559 352 : lg->pending = lg->current;
2560 352 : lg->flush_ranges = lg->current;
2561 352 : return lg;
2562 : }
2563 :
2564 : static logged_range *
2565 6560 : log_next_logfile(logger *lg, ulng ts)
2566 : {
2567 6560 : int m = (ATOMIC_GET(&GDKdebug) & TESTINGMASK) ? 1000 : 100;
2568 6560 : if (!lg->pending || !lg->pending->next)
2569 : return NULL;
2570 6560 : rotation_lock(lg);
2571 6560 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2572 6560 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2573 6560 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2574 6018 : rotation_unlock(lg);
2575 6018 : logged_range *p = lg->pending;
2576 6018 : for (int i = 1;
2577 12079 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2578 6064 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2579 18143 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2580 6061 : p = p->next;
2581 6018 : return p;
2582 : }
2583 542 : rotation_unlock(lg);
2584 542 : return NULL;
2585 : }
2586 :
2587 : static void
2588 6018 : log_cleanup_range(logger *lg, ulng id)
2589 : {
2590 6018 : rotation_lock(lg);
2591 18097 : while (lg->pending && lg->pending->id <= id) {
2592 12079 : logged_range *p;
2593 12079 : p = lg->pending;
2594 12079 : if (p)
2595 12079 : lg->pending = p->next;
2596 12079 : GDKfree(p);
2597 : }
2598 6018 : rotation_unlock(lg);
2599 6018 : }
2600 :
2601 : static void
2602 67979 : do_rotate(logger *lg)
2603 : {
2604 67979 : logged_range *cur = lg->current;
2605 67979 : logged_range *next = cur->next;
2606 67979 : if (next) {
2607 12195 : assert(ATOMIC_GET(&next->refcount) == 1);
2608 12195 : lg->current = next;
2609 12195 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2610 12083 : close_stream(cur->output_log);
2611 12083 : cur->output_log = NULL;
2612 : }
2613 : }
2614 67979 : }
2615 :
2616 : gdk_return
2617 7215 : log_activate(logger *lg)
2618 : {
2619 7215 : bool flush_cleanup = false;
2620 7215 : gdk_return res = GDK_SUCCEED;
2621 :
2622 7215 : rotation_lock(lg);
2623 7215 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2624 :
2625 7215 : if (current_file_size == -1) {
2626 0 : rotation_unlock(lg);
2627 0 : return GDK_FAIL;
2628 : }
2629 : /* file size of 2 means only endian indicator present
2630 : * (i.e. effectively empty) */
2631 7215 : if (current_file_size <= 2) {
2632 5292 : rotation_unlock(lg);
2633 5292 : return GDK_SUCCEED;
2634 : }
2635 :
2636 1923 : if (!lg->flushnow &&
2637 1923 : !lg->current->next &&
2638 1923 : current_file_size > 2 &&
2639 1923 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2640 1915 : current_file_size > lg->max_file_size ||
2641 1893 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2642 30 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2643 30 : lg->saved_id + 1 == lg->id &&
2644 28 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2645 28 : lg->id++;
2646 : /* start new file */
2647 28 : res = log_open_output(lg);
2648 28 : flush_cleanup = true;
2649 28 : do_rotate(lg);
2650 : }
2651 28 : if (flush_cleanup)
2652 28 : (void) do_flush_range_cleanup(lg);
2653 1923 : rotation_unlock(lg);
2654 1923 : return res;
2655 : }
2656 :
/* Apply the log files that are ready (as selected by log_next_logfile for
 * timestamp `ts`) to the BATs, commit, and remove the files afterwards.
 *
 * When logging is disabled only the saved id/tid are updated and a commit
 * is attempted; a failing commit is reported but GDK_SUCCEED is returned.
 * Otherwise GDK_FAIL is returned when a file name cannot be built, a file
 * cannot be opened, memory runs out, reading fails with LOG_ERR or the
 * commit fails; in the last case saved_id is put back to its old value. */
gdk_return
log_flush(logger *lg, ulng ts)
{
	logged_range *pending = log_next_logfile(lg, ts);
	/* lid: id of the last file to process (0 if none); olid: id saved so far */
	ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
	if (LOG_DISABLED(lg)) {
		lg->saved_id = lid;
		lg->saved_tid = lg->tid;
		if (lid)
			log_cleanup_range(lg, lg->saved_id);
		if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
			TRC_ERROR(GDK, "failed to commit");
		return GDK_SUCCEED;
	}
	/* nothing newer than what is already saved */
	if (lg->saved_id >= lid)
		return GDK_SUCCEED;
	/* lg->id is read under the rotation lock */
	rotation_lock(lg);
	ulng lgid = lg->id;
	rotation_unlock(lg);
	if (lg->saved_id + 1 >= lgid)	/* logger should first release the file */
		return GDK_SUCCEED;
	log_return res = LOG_OK;
	ulng cid = olid;
	assert(lid <= lgid);
	/* bitmap with one bit per catalog entry, handed to
	 * log_read_transaction and log_commit; allocated on first use */
	uint32_t *updated = NULL;
	BUN nupdated = 0;
	size_t allocated = 0;
	/* process files olid+1 .. lid, one per iteration */
	while (cid < lid && res == LOG_OK) {
		if (!lg->input_log) {
			char filename[MAXPATH];
			char id[32];
			if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
				GDKfree(updated);
				TRC_CRITICAL(GDK, "log_id filename is too large\n");
				return GDK_FAIL;
			}
			if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
				GDKfree(updated);
				return GDK_FAIL;
			}
			if (strlen(filename) >= FILENAME_MAX) {
				GDKfree(updated);
				TRC_CRITICAL(GDK, "Logger filename path is too large\n");
				return GDK_FAIL;
			}

			bool filemissing = false;
			if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
				GDKfree(updated);
				return GDK_FAIL;
			}
		}
		/* we read the full file because skipping is impossible with current log format */
		log_lock(lg);
		if (updated == NULL) {
			/* size in bytes: bit count rounded up to a multiple
			 * of 32, with a minimum of one 32-bit word */
			nupdated = BATcount(lg->catalog_id);
			allocated = ((nupdated + 31) & ~31) / 8;
			if (allocated == 0)
				allocated = 4;
			updated = GDKzalloc(allocated);
			if (updated == NULL) {
				log_unlock(lg);
				return GDK_FAIL;
			}
		} else if (nupdated < BATcount(lg->catalog_id)) {
			/* the catalog grew since the bitmap was sized */
			BUN n = BATcount(lg->catalog_id);
			size_t a = ((n + 31) & ~31) / 8;
			if (a > allocated) {
				uint32_t *p = GDKrealloc(updated, a);
				if (p == NULL) {
					GDKfree(updated);
					log_unlock(lg);
					return GDK_FAIL;
				}
				updated = p;
				/* clear only the newly added words */
				memset(updated + allocated / 4, 0, a - allocated);
				allocated = a;
			}
			nupdated = n;
		}
		lg->flushing = true;
		res = log_read_transaction(lg, updated, nupdated);
		lg->flushing = false;
		log_unlock(lg);
		/* end of file is the normal way for a file to finish */
		if (res == LOG_EOF) {
			log_close_input(lg);
			res = LOG_OK;
		}
		cid++;
	}
	if (lid > olid && res == LOG_OK) {
		rotation_lock(lg);	/* protect against concurrent log_tflush rotate check */
		lg->saved_id = lid;
		rotation_unlock(lg);
		if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
			TRC_ERROR(GDK, "failed to commit");
			res = LOG_ERR;
			rotation_lock(lg);
			lg->saved_id = olid;	/* reset !! */
			rotation_unlock(lg);
		}
		if (res != LOG_ERR) {
			while (olid < lid) {
				/* Try to cleanup, remove old log file, continue on failure! */
				olid++;
				(void) log_cleanup(lg, olid);
			}
		}
		if (res == LOG_OK)
			log_cleanup_range(lg, lg->saved_id);
	}
	GDKfree(updated);
	return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
}
2771 :
2772 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2773 : * Only the bak- files are deleted for the preserved WAL files.
2774 : */
2775 : lng
2776 17028 : log_changes(logger *lg)
2777 : {
2778 17028 : if (LOG_DISABLED(lg))
2779 : return 0;
2780 17028 : rotation_lock(lg);
2781 17028 : lng changes = lg->id - lg->saved_id - 1;
2782 17028 : rotation_unlock(lg);
2783 17028 : return changes;
2784 : }
2785 :
2786 : int
2787 483 : log_sequence(logger *lg, int seq, lng *id)
2788 : {
2789 483 : log_lock(lg);
2790 483 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2791 :
2792 483 : if (p != BUN_NONE) {
2793 364 : *id = *(lng *) Tloc(lg->seqs_val, p);
2794 :
2795 364 : log_unlock(lg);
2796 364 : return 1;
2797 : }
2798 119 : log_unlock(lg);
2799 119 : return 0;
2800 : }
2801 :
2802 : gdk_return
2803 198417 : log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
2804 : {
2805 198417 : bte tpe = find_type(lg, type);
2806 198417 : gdk_return ok = GDK_SUCCEED;
2807 198417 : logformat l;
2808 198417 : lng nr;
2809 198417 : l.flag = LOG_UPDATE_CONST;
2810 198417 : l.id = id;
2811 198417 : nr = cnt;
2812 :
2813 198417 : if (LOG_DISABLED(lg) || !nr) {
2814 : /* logging is switched off */
2815 73067 : if (nr) {
2816 73067 : log_lock(lg);
2817 73067 : ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
2818 73067 : log_unlock(lg);
2819 : }
2820 73067 : return ok;
2821 : }
2822 :
2823 125350 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
2824 :
2825 125350 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2826 250700 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2827 250700 : log_write_format(lg, &l) != GDK_SUCCEED ||
2828 250700 : !mnstr_writeLng(lg->current->output_log, nr) ||
2829 250700 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2830 125350 : !mnstr_writeLng(lg->current->output_log, offset)) {
2831 0 : ATOMIC_DEC(&lg->current->refcount);
2832 0 : ok = GDK_FAIL;
2833 0 : goto bailout;
2834 : }
2835 :
2836 125350 : ok = wt(val, lg->current->output_log, 1);
2837 :
2838 125350 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2839 :
2840 125350 : bailout:
2841 125350 : if (ok != GDK_SUCCEED) {
2842 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2843 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2844 : }
2845 : return ok;
2846 : }
2847 :
2848 : static gdk_return
2849 19551 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2850 : {
2851 19551 : size_t bufsz = lg->wbufsize, resize = 0;
2852 19551 : BUN end = (BUN) (offset + nr);
2853 19551 : char *buf = lg->wbuf;
2854 19551 : gdk_return res = GDK_SUCCEED;
2855 :
2856 19551 : if (!buf)
2857 : return GDK_FAIL;
2858 19551 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2859 19551 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2860 : return GDK_FAIL;
2861 19551 : BATiter bi = bat_iterator(b);
2862 19551 : BUN p = (BUN) offset;
2863 39154 : for (; p < end;) {
2864 19603 : size_t sz = 0;
2865 19603 : if (resize) {
2866 2 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2867 : res = GDK_FAIL;
2868 : break;
2869 : }
2870 2 : lg->wbuf = buf;
2871 2 : lg->wbufsize = bufsz = resize;
2872 2 : resize = 0;
2873 : }
2874 19603 : char *dst = buf;
2875 71453 : for (; p < end && sz < bufsz; p++) {
2876 51902 : const char *s = BUNtvar(bi, p);
2877 51902 : size_t len = strlen(s) + 1;
2878 51902 : if ((sz + len) > bufsz) {
2879 52 : if (len > bufsz)
2880 2 : resize = len + bufsz;
2881 : break;
2882 : } else {
2883 51850 : memcpy(dst, s, len);
2884 51850 : dst += len;
2885 51850 : sz += len;
2886 : }
2887 : }
2888 39204 : if (sz &&
2889 39202 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2890 19601 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2891 : res = GDK_FAIL;
2892 : break;
2893 : }
2894 : }
2895 19551 : bat_iterator_end(&bi);
2896 19551 : return res;
2897 : }
2898 :
2899 : static gdk_return
2900 509865 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2901 : {
2902 509865 : bte tpe = find_type(lg, b->ttype);
2903 509865 : gdk_return ok = GDK_SUCCEED;
2904 509865 : logformat l;
2905 509865 : BUN p;
2906 509865 : lng nr;
2907 509865 : l.flag = LOG_UPDATE_BULK;
2908 509865 : l.id = id;
2909 509865 : nr = cnt;
2910 :
2911 509865 : if (LOG_DISABLED(lg) || !nr) {
2912 : /* logging is switched off */
2913 211155 : if (nr)
2914 156375 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2915 : return GDK_SUCCEED;
2916 : }
2917 :
2918 269473 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2919 :
2920 269473 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2921 269473 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2922 0 : ok = GDK_FAIL;
2923 0 : goto bailout;
2924 : }
2925 :
2926 269473 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2927 538738 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2928 668302 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2929 538738 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2930 409174 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2931 0 : ok = GDK_FAIL;
2932 0 : goto bailout;
2933 : }
2934 269473 : if (!total_cnt)
2935 129564 : total_cnt = cnt;
2936 269473 : lg->total_cnt += cnt;
2937 :
2938 269473 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2939 269369 : lg->total_cnt = 0;
2940 :
2941 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2942 269473 : if (sliced)
2943 1512 : offset = 0;
2944 269473 : if (b->ttype == TYPE_msk) {
2945 0 : BATiter bi = bat_iterator(b);
2946 0 : if (offset % 32 == 0) {
2947 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
2948 0 : (size_t) ((nr + 31) / 32)))
2949 0 : ok = GDK_FAIL;
2950 : } else {
2951 0 : for (lng i = 0; i < nr; i += 32) {
2952 : uint32_t v = 0;
2953 0 : for (int j = 0; j < 32 && i + j < nr; j++)
2954 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
2955 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
2956 : ok = GDK_FAIL;
2957 : break;
2958 : }
2959 : }
2960 : }
2961 0 : bat_iterator_end(&bi);
2962 518751 : } else if (b->ttype < TYPE_str && !isVIEW(b)) {
2963 249278 : BATiter bi = bat_iterator(b);
2964 249278 : const void *t = BUNtail(bi, (BUN) offset);
2965 :
2966 249278 : ok = wt(t, lg->current->output_log, (size_t) nr);
2967 249278 : bat_iterator_end(&bi);
2968 20195 : } else if (b->ttype == TYPE_str) {
2969 : /* efficient string writes */
2970 19545 : ok = string_writer(lg, b, offset, nr);
2971 : } else {
2972 650 : BATiter bi = bat_iterator(b);
2973 650 : BUN end = (BUN) (offset + nr);
2974 1658 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
2975 1008 : const void *t = BUNtail(bi, p);
2976 :
2977 1008 : ok = wt(t, lg->current->output_log, 1);
2978 : }
2979 650 : bat_iterator_end(&bi);
2980 : }
2981 :
2982 269473 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2983 :
2984 269473 : bailout:
2985 269473 : if (ok != GDK_SUCCEED) {
2986 0 : ATOMIC_DEC(&lg->current->refcount);
2987 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2988 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2989 : }
2990 : return ok;
2991 : }
2992 :
2993 : /*
2994 : * Changes made to the BAT descriptor should be stored in the log
2995 : * files. Actually, we need to save the descriptor file, perhaps we
2996 : * should simply introduce a versioning scheme.
2997 : */
2998 : gdk_return
2999 236630 : log_bat_persists(logger *lg, BAT *b, log_id id)
3000 : {
3001 236630 : log_lock(lg);
3002 236630 : bte ta = find_type(lg, b->ttype);
3003 236630 : logformat l;
3004 :
3005 236630 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
3006 0 : log_unlock(lg);
3007 0 : if (!LOG_DISABLED(lg))
3008 0 : ATOMIC_DEC(&lg->current->refcount);
3009 0 : return GDK_FAIL;
3010 : }
3011 :
3012 236630 : l.flag = LOG_CREATE;
3013 236630 : l.id = id;
3014 236630 : if (!LOG_DISABLED(lg)) {
3015 157277 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3016 314554 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3017 314554 : log_write_format(lg, &l) != GDK_SUCCEED ||
3018 157277 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3019 0 : log_unlock(lg);
3020 0 : ATOMIC_DEC(&lg->current->refcount);
3021 0 : return GDK_FAIL;
3022 : }
3023 : }
3024 236630 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3025 236630 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3026 236630 : log_unlock(lg);
3027 236630 : if (r != GDK_SUCCEED)
3028 0 : ATOMIC_DEC(&lg->current->refcount);
3029 : return r;
3030 : }
3031 :
3032 : gdk_return
3033 22194 : log_bat_transient(logger *lg, log_id id)
3034 : {
3035 22194 : log_lock(lg);
3036 22194 : log_bid bid = internal_find_bat(lg, id, -1);
3037 22194 : logformat l;
3038 :
3039 22194 : if (bid < 0) {
3040 0 : log_unlock(lg);
3041 0 : return GDK_FAIL;
3042 : }
3043 22194 : if (!bid) {
3044 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3045 0 : log_unlock(lg);
3046 0 : return GDK_FAIL;
3047 : }
3048 22194 : l.flag = LOG_DESTROY;
3049 22194 : l.id = id;
3050 :
3051 22194 : if (!LOG_DISABLED(lg)) {
3052 10240 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3053 0 : TRC_CRITICAL(GDK, "write failed\n");
3054 0 : log_unlock(lg);
3055 0 : ATOMIC_DEC(&lg->current->refcount);
3056 0 : return GDK_FAIL;
3057 : }
3058 : }
3059 22194 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3060 22194 : BAT *b = BBPquickdesc(bid);
3061 22194 : assert(b);
3062 22194 : BUN cnt = BATcount(b);
3063 22194 : ATOMIC_ADD(&lg->current->drops, cnt);
3064 22194 : gdk_return r = log_del_bat(lg, bid);
3065 22194 : log_unlock(lg);
3066 22194 : if (r != GDK_SUCCEED)
3067 0 : ATOMIC_DEC(&lg->current->refcount);
3068 : return r;
3069 : }
3070 :
3071 : static gdk_return
3072 117538 : log_bat_group(logger *lg, log_id id)
3073 : {
3074 117538 : if (LOG_DISABLED(lg))
3075 : return GDK_SUCCEED;
3076 :
3077 76464 : logformat l;
3078 76464 : l.flag = LOG_BAT_GROUP;
3079 76464 : l.id = id;
3080 76464 : gdk_return r = log_write_format(lg, &l);
3081 76464 : return r;
3082 : }
3083 :
3084 : gdk_return
3085 58769 : log_bat_group_start(logger *lg, log_id id)
3086 : {
3087 : /*positive table id represent start of logged table */
3088 58769 : return log_bat_group(lg, id);
3089 : }
3090 :
3091 : gdk_return
3092 58769 : log_bat_group_end(logger *lg, log_id id)
3093 : {
3094 : /*negative table id represent end of logged table */
3095 58769 : return log_bat_group(lg, -id);
3096 : }
3097 :
3098 : gdk_return
3099 270519 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3100 : {
3101 270519 : log_lock(lg);
3102 270519 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3103 270519 : log_unlock(lg);
3104 270519 : return r;
3105 : }
3106 :
3107 : gdk_return
3108 2800 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3109 : {
3110 2800 : log_lock(lg);
3111 2800 : bte tpe = find_type(lg, uval->ttype);
3112 2800 : gdk_return ok = GDK_SUCCEED;
3113 2800 : logformat l;
3114 2800 : BUN p;
3115 2800 : lng nr;
3116 :
3117 2800 : if (BATtdense(uid)) {
3118 2716 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3119 2716 : log_unlock(lg);
3120 2716 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3121 0 : ATOMIC_DEC(&lg->current->refcount);
3122 2716 : return ok;
3123 : }
3124 :
3125 84 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3126 :
3127 84 : l.flag = LOG_UPDATE;
3128 84 : l.id = id;
3129 84 : nr = (BATcount(uval));
3130 84 : assert(nr);
3131 :
3132 84 : if (LOG_DISABLED(lg)) {
3133 : /* logging is switched off */
3134 65 : log_unlock(lg);
3135 65 : return GDK_SUCCEED;
3136 : }
3137 :
3138 19 : BATiter vi = bat_iterator(uval);
3139 19 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3140 19 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3141 :
3142 19 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3143 38 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3144 38 : log_write_format(lg, &l) != GDK_SUCCEED ||
3145 38 : !mnstr_writeLng(lg->current->output_log, nr) ||
3146 19 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3147 0 : ok = GDK_FAIL;
3148 0 : goto bailout;
3149 : }
3150 1074 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3151 1055 : const oid id = BUNtoid(uid, p);
3152 :
3153 1055 : ok = wh(&id, lg->current->output_log, 1);
3154 : }
3155 19 : if (uval->ttype == TYPE_msk) {
3156 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3157 0 : (BATcount(uval) + 31) / 32))
3158 0 : ok = GDK_FAIL;
3159 32 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3160 13 : const void *t = BUNtail(vi, 0);
3161 :
3162 13 : ok = wt(t, lg->current->output_log, (size_t) nr);
3163 6 : } else if (uval->ttype == TYPE_str) {
3164 : /* efficient string writes */
3165 6 : ok = string_writer(lg, uval, 0, nr);
3166 : } else {
3167 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3168 0 : const void *val = BUNtail(vi, p);
3169 :
3170 0 : ok = wt(val, lg->current->output_log, 1);
3171 : }
3172 : }
3173 :
3174 19 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3175 :
3176 19 : bailout:
3177 19 : bat_iterator_end(&vi);
3178 19 : if (ok != GDK_SUCCEED) {
3179 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3180 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3181 0 : ATOMIC_DEC(&lg->current->refcount);
3182 : }
3183 19 : log_unlock(lg);
3184 19 : return ok;
3185 : }
3186 :
3187 : static inline bool
3188 56961 : check_rotation_conditions(logger *lg)
3189 : {
3190 56961 : if (LOG_DISABLED(lg))
3191 : return false;
3192 :
3193 56958 : if (lg->current->next)
3194 : return false; /* do not rotate if there is already a prepared next current */
3195 56958 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3196 : return true;
3197 56958 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3198 :
3199 56958 : if (current_file_size == -1)
3200 : return false;
3201 :
3202 56958 : assert(current_file_size >= 0);
3203 :
3204 56958 : if (current_file_size == 2)
3205 : return false;
3206 :
3207 1384 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3208 56057 : current_file_size > lg->max_file_size ||
3209 50099 : (GDKusec() - lg->file_age) > lg->max_file_age;
3210 :
3211 54680 : return res;
3212 : }
3213 :
3214 : gdk_return
3215 62456 : log_tend(logger *lg)
3216 : {
3217 62456 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3218 :
3219 62456 : if (LOG_DISABLED(lg))
3220 : return GDK_SUCCEED;
3221 :
3222 56958 : gdk_return result;
3223 56958 : logformat l;
3224 56958 : l.flag = LOG_END;
3225 56958 : l.id = lg->tid;
3226 :
3227 56958 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3228 56958 : ATOMIC_INC(&lg->nr_flushers);
3229 : return result;
3230 : }
3231 :
3232 : #define flush_lock(lg) MT_lock_set(&(lg)->flush_lock)
3233 : #define flush_unlock(lg) MT_lock_unset(&(lg)->flush_lock)
3234 :
3235 : static inline gdk_return
3236 56627 : do_flush(logged_range *range)
3237 : {
3238 : /* assumes flush lock */
3239 56627 : stream *output_log = range->output_log;
3240 56627 : ulng ts = ATOMIC_GET(&range->last_ts);
3241 :
3242 56627 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3243 56627 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3244 0 : return GDK_FAIL;
3245 56627 : ATOMIC_SET(&range->flushed_ts, ts);
3246 56627 : return GDK_SUCCEED;
3247 : }
3248 :
3249 : static inline void
3250 62453 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3251 : {
3252 62453 : (void) lg;
3253 62453 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3254 :
3255 62453 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3256 62122 : ATOMIC_SET(&range->last_ts, commit_ts);
3257 62453 : }
3258 :
3259 : gdk_return
3260 62456 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
3261 : {
3262 62456 : rotation_lock(lg);
3263 62456 : if (lg->flushnow) {
3264 5495 : logged_range *p = lg->current;
3265 5495 : assert(lg->flush_ranges == lg->current);
3266 5495 : assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
3267 5495 : log_tdone(lg, lg->current, commit_ts);
3268 5495 : ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
3269 5495 : lg->id++;
3270 5495 : lg->flushnow = false;
3271 5495 : if (log_open_output(lg) != GDK_SUCCEED)
3272 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3273 5495 : do_rotate(lg);
3274 5495 : (void) do_flush_range_cleanup(lg);
3275 5495 : assert(lg->flush_ranges == lg->current);
3276 5495 : rotation_unlock(lg);
3277 5495 : return log_commit(lg, p, NULL, 0);
3278 : }
3279 :
3280 56961 : if (LOG_DISABLED(lg)) {
3281 3 : rotation_unlock(lg);
3282 3 : return GDK_SUCCEED;
3283 : }
3284 :
3285 56958 : logged_range *frange = do_flush_range_cleanup(lg);
3286 :
3287 57023 : while (frange->next && frange->id < file_id) {
3288 : assert(frange->next);
3289 : frange = frange->next;
3290 : }
3291 :
3292 56958 : log_tdone(lg, frange, commit_ts);
3293 :
3294 56958 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
3295 : /* delay needed ? */
3296 :
3297 56627 : flush_lock(lg);
3298 : /* check it one more time */
3299 56627 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
3300 56627 : do_flush(frange);
3301 56627 : flush_unlock(lg);
3302 : }
3303 : /* else somebody else has flushed our log file */
3304 :
3305 56958 : if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
3306 55752 : if (frange != lg->current && frange->output_log) {
3307 112 : close_stream(frange->output_log);
3308 112 : frange->output_log = NULL;
3309 : }
3310 : }
3311 :
3312 56958 : if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
3313 : /* I am the last flusher
3314 : * if present,
3315 : * wake up the exclusive flusher in log_tstart */
3316 : /* rotation_lock is still being held */
3317 56431 : MT_cond_signal(&lg->excl_flush_cv);
3318 : }
3319 56958 : rotation_unlock(lg);
3320 :
3321 56958 : return GDK_SUCCEED;
3322 : }
3323 :
3324 : static gdk_return
3325 6940 : log_tsequence_(logger *lg, int seq, lng val)
3326 : {
3327 6940 : logformat l;
3328 :
3329 6940 : if (LOG_DISABLED(lg))
3330 : return GDK_SUCCEED;
3331 2838 : l.flag = LOG_SEQ;
3332 2838 : l.id = seq;
3333 :
3334 2838 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3335 :
3336 2838 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3337 5676 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3338 5676 : log_write_format(lg, &l) != GDK_SUCCEED ||
3339 2838 : !mnstr_writeLng(lg->current->output_log, val)) {
3340 0 : TRC_CRITICAL(GDK, "write failed\n");
3341 0 : ATOMIC_DEC(&lg->current->refcount);
3342 0 : return GDK_FAIL;
3343 : }
3344 : return GDK_SUCCEED;
3345 : }
3346 :
3347 : /* a transaction in it self */
3348 : gdk_return
3349 6940 : log_tsequence(logger *lg, int seq, lng val)
3350 : {
3351 6940 : BUN p;
3352 :
3353 6940 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3354 :
3355 6940 : log_lock(lg);
3356 6940 : MT_lock_set(&lg->seqs_id->theaplock);
3357 6940 : BUN inserted = lg->seqs_id->batInserted;
3358 6940 : MT_lock_unset(&lg->seqs_id->theaplock);
3359 6940 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
3360 1677 : assert(lg->seqs_val->hseqbase == 0);
3361 1677 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
3362 0 : log_unlock(lg);
3363 0 : return GDK_FAIL;
3364 : }
3365 : } else {
3366 4900 : if (p != BUN_NONE) {
3367 4900 : oid pos = p;
3368 4900 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
3369 0 : log_unlock(lg);
3370 0 : return GDK_FAIL;
3371 : }
3372 : }
3373 10526 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
3374 5263 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
3375 0 : log_unlock(lg);
3376 0 : return GDK_FAIL;
3377 : }
3378 : }
3379 6940 : gdk_return r = log_tsequence_(lg, seq, val);
3380 6940 : log_unlock(lg);
3381 6940 : return r;
3382 : }
3383 :
3384 : static gdk_return
3385 11866 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3386 : {
3387 11866 : log_lock(lg);
3388 11866 : BAT *b = lg->catalog_bid;
3389 11866 : const log_bid *bids;
3390 :
3391 11866 : bids = (log_bid *) Tloc(b, 0);
3392 248494 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3393 236628 : log_bid bid = bids[p];
3394 236628 : BAT *lb = BBP_desc(bid);
3395 :
3396 236628 : assert(bid);
3397 236628 : if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
3398 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3399 0 : log_unlock(lg);
3400 0 : return GDK_FAIL;
3401 : }
3402 :
3403 236628 : assert(lb->batRestricted != BAT_WRITE);
3404 :
3405 236628 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3406 : }
3407 : /* bm_subcommit releases the lock */
3408 11866 : return bm_subcommit(lg, pending, updated, maxupdated);
3409 : }
3410 :
3411 : static gdk_return
3412 236894 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3413 : {
3414 236894 : log_bid bid = internal_find_bat(lg, id, tid);
3415 236894 : lng cnt = 0;
3416 236894 : lng lid = lng_nil;
3417 :
3418 236894 : assert(b->batRestricted != BAT_WRITE);
3419 236894 : assert(b->batRole == PERSISTENT);
3420 236894 : if (bid < 0)
3421 : return GDK_FAIL;
3422 236894 : if (bid) {
3423 155399 : if (bid != b->batCacheid) {
3424 155396 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3425 : return GDK_FAIL;
3426 : } else {
3427 : return GDK_SUCCEED;
3428 : }
3429 : }
3430 236891 : bid = b->batCacheid;
3431 236891 : TRC_DEBUG(WAL, "create %d\n", id);
3432 236891 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3433 473782 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3434 473782 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3435 473782 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3436 236891 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3437 0 : return GDK_FAIL;
3438 236891 : if (lg->current)
3439 236627 : lg->current->cnt++;
3440 236891 : BBPretain(bid);
3441 236891 : return GDK_SUCCEED;
3442 : }
3443 :
3444 : static gdk_return
3445 177638 : log_del_bat(logger *lg, log_bid bid)
3446 : {
3447 177638 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3448 177638 : lng lid = lg->tid;
3449 :
3450 177638 : assert(p != BUN_NONE);
3451 177638 : if (p == BUN_NONE) {
3452 : GDKerror("cannot find BAT\n");
3453 : return GDK_FAIL;
3454 : }
3455 :
3456 177638 : assert(lg->catalog_lid->hseqbase == 0);
3457 177638 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3458 : }
3459 :
3460 : /* returns -1 on failure, 0 when not found, > 0 when found */
3461 : log_bid
3462 33018 : log_find_bat(logger *lg, log_id id)
3463 : {
3464 33018 : log_lock(lg);
3465 33018 : log_bid bid = internal_find_bat(lg, id, -1);
3466 33018 : log_unlock(lg);
3467 33018 : if (!bid) {
3468 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3469 0 : return GDK_FAIL;
3470 : }
3471 : return bid;
3472 : }
3473 :
3474 :
3475 :
3476 : gdk_return
3477 62457 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
3478 : {
3479 62457 : rotation_lock(lg);
3480 62457 : if (flushnow) {
3481 5496 : if (file_id == NULL) {
3482 : /* special case: ask store_manager to rotate log file */
3483 1 : lg->file_age = 0;
3484 1 : rotation_unlock(lg);
3485 1 : return GDK_SUCCEED;
3486 : }
3487 : /* I am now the exclusive flusher */
3488 5495 : while (ATOMIC_GET(&lg->nr_flushers)) {
3489 : /* I am waiting until all existing flushers are done */
3490 0 : MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
3491 : }
3492 5495 : assert(ATOMIC_GET(&lg->nr_flushers) == 0);
3493 :
3494 5495 : if (ATOMIC_GET(&lg->current->last_ts)) {
3495 2090 : lg->id++;
3496 2090 : if (log_open_output(lg) != GDK_SUCCEED)
3497 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3498 : }
3499 5495 : do_rotate(lg);
3500 5495 : (void) do_flush_range_cleanup(lg);
3501 5495 : rotation_unlock(lg);
3502 :
3503 5495 : if (lg->saved_id + 1 < lg->id)
3504 5077 : log_flush(lg, (1ULL << 63));
3505 5495 : lg->flushnow = flushnow;
3506 : } else {
3507 56961 : if (check_rotation_conditions(lg)) {
3508 4582 : lg->id++;
3509 4582 : if (log_open_output(lg) != GDK_SUCCEED)
3510 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3511 : }
3512 56961 : do_rotate(lg);
3513 56961 : rotation_unlock(lg);
3514 : }
3515 :
3516 62456 : if (LOG_DISABLED(lg))
3517 : return GDK_SUCCEED;
3518 :
3519 56958 : ATOMIC_INC(&lg->current->refcount);
3520 56958 : *file_id = lg->current->id;
3521 56958 : logformat l;
3522 56958 : l.flag = LOG_START;
3523 56958 : l.id = ++lg->tid;
3524 :
3525 56958 : TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
3526 56958 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3527 0 : ATOMIC_DEC(&lg->current->refcount);
3528 0 : return GDK_FAIL;
3529 : }
3530 :
3531 : return GDK_SUCCEED;
3532 : }
3533 :
3534 : void
3535 116 : log_printinfo(logger *lg)
3536 : {
3537 116 : if (!rotation_trylock(lg, 1000)) {
3538 0 : printf("Logger is currently locked, so no logger information\n");
3539 0 : return;
3540 : }
3541 116 : printf("logger %s:\n", lg->fn);
3542 116 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3543 : lg->id, lg->saved_id);
3544 116 : printf("current transaction id %d, saved transaction id %d\n",
3545 : lg->tid, lg->saved_tid);
3546 116 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3547 116 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3548 116 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3549 330 : for (logged_range *p = lg->pending; p; p = p->next) {
3550 214 : char buf[32];
3551 214 : if ((lg->debug & 128 || lg->inmemory) ||
3552 214 : p->output_log == NULL ||
3553 116 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3554 98 : buf[0] = 0;
3555 312 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3556 : }
3557 116 : rotation_unlock(lg);
3558 : }
|