Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
/* Forward declarations of static catalog helpers: log_add_bat() is used
 * by la_bat_create() and log_del_bat() by la_bat_destroy() below. */
static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
static gdk_return log_del_bat(logger *lg, log_bid bid);
/*
 * The logger uses a directory to store its log files. One master log
 * file stores information about the version of the logger and the
 * type mapping it uses. This file is a simple ASCII file with the
 * following format:
 * {6DIGIT-VERSION\n[id,type_name\n]*}
 * (the writer emits an extra empty line after the version line).
 * The transaction log files have a binary format.
 */
31 :
/* Record types: the value of the one-byte "flag" field that starts
 * every record in a binary transaction log file. */
#define LOG_START 0
#define LOG_END 1
#define LOG_UPDATE_CONST 2
#define LOG_UPDATE_BULK 3
#define LOG_UPDATE 4
#define LOG_CREATE 5
#define LOG_DESTROY 6
#define LOG_SEQ 7
#define LOG_CLEAR 8 /* DEPRECATED */
#define LOG_BAT_GROUP 9

/* Platform-specific function returning the current position of a
 * FILE stream (the wide-offset variant where one is available). */
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
#else
#ifdef HAVE_FSEEKO
#define getfilepos ftello
#else
#define getfilepos ftell
#endif
#endif

/* Initial capacity of BATs created when replaying LOG_CREATE records. */
#define BATSIZE 0

/* True when debug bit 128 is set, the logger is in-memory, or
 * flushnow is set on the logger. */
#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
57 : static const char *log_commands[] = {
58 : "LOG_START",
59 : "LOG_END",
60 : "LOG_UPDATE_CONST",
61 : "LOG_UPDATE_BULK",
62 : "LOG_UPDATE",
63 : "LOG_CREATE",
64 : "LOG_DESTROY",
65 : "LOG_SEQ",
66 : "", /* LOG_CLEAR IS DEPRECATED */
67 : "LOG_BAT_GROUP",
68 : };
69 :
/* One change read from the log during recovery; it is queued in a
 * trans and applied by tr_commit() via la_apply(). */
typedef struct logaction {
	int type;		/* type of change (LOG_UPDATE, LOG_CREATE, ...) */
	lng nr;			/* number of values covered by the change */
	int tt;			/* atom type of the values / of the BAT to create */
	lng id;			/* NOTE(review): not set by the readers in this section */
	lng offset;		/* first destination position for update changes */
	log_id cid;		/* id of object */
	BAT *b;			/* temporary bat with changes */
	BAT *uid;		/* temporary bat with bun positions to update */
} logaction;
80 :
/* during the recover process a number of transactions could be active */
typedef struct trans {
	int tid;		/* transaction id */
	int sz;			/* sz of the changes array */
	int nr;			/* nr of changes */

	logaction *changes;	/* queued changes, applied in order on commit */

	struct trans *tr;	/* transaction passed to tr_create(); returned by tr_destroy() */
} trans;
91 :
/* Header of a record in a transaction log file: a one-byte record type
 * (LOG_* value) followed by an int id; see log_read_format() and
 * log_write_format(). */
typedef struct logformat_t {
	bte flag;
	int id;
} logformat;

/* Result of the log readers: LOG_OK on success, LOG_EOF when a read
 * came up short, LOG_ERR on other failures (allocation, BAT updates). */
typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;

static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
static gdk_return tr_grow(trans *tr);

/* Acquire/release the logger's own mutex. */
#define log_lock(lg) MT_lock_set(&(lg)->lock)
#define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 951131 : find_type(logger *lg, int tpe)
107 : {
108 951131 : assert(tpe >= 0 && tpe < MAXATOMS);
109 951131 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 543758 : find_type_nr(logger *lg, bte tpe)
114 : {
115 543758 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 543758 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
/*
 * Find a position in the int BAT b whose value equals val and which is
 * not listed (as an oid) in d, e.g. log_read_seq() passes lg->dseqs as
 * d.  Returns that position, or BUN_NONE when there is none.
 * Uses b's hash when it can be built; otherwise falls back to a
 * linear scan over b.
 */
static BUN
log_find(BAT *b, BAT *d, int val)
{
	BUN p;

	assert(b->ttype == TYPE_int);
	assert(d->ttype == TYPE_oid);
	BATiter bi = bat_iterator(b);
	if (BAThash(b) == GDK_SUCCEED) {
		MT_rwlock_rdlock(&b->thashlock);
		HASHloop_int(bi, b->thash, p, &val) {
			oid pos = p;
			/* only accept positions not listed in d */
			if (BUNfnd(d, &pos) == BUN_NONE) {
				MT_rwlock_rdunlock(&b->thashlock);
				bat_iterator_end(&bi);
				return p;
			}
		}
		MT_rwlock_rdunlock(&b->thashlock);
	} else {		/* unlikely: BAThash failed */
		int *t = (int *) bi.base;

		for (p = 0; p < bi.count; p++) {
			if (t[p] == val) {
				oid pos = p;
				if (BUNfnd(d, &pos) == BUN_NONE) {
					bat_iterator_end(&bi);
					return p;
				}
			}
		}
	}
	bat_iterator_end(&bi);
	return BUN_NONE;
}
156 :
/*
 * Look up the BAT id recorded in the catalog for object id.
 *
 * tid < 0:  return the catalog_bid of a catalog_id entry equal to id
 *           whose position is not listed in lg->dcatalog.
 * tid >= 0: walk the hash chain for id and remember the last entry
 *           seen before the first one whose catalog_lid is non-nil and
 *           <= tid; return that entry's catalog_bid.
 *
 * Returns the bid, 0 when nothing matches, or -1 when the hash on
 * lg->catalog_id cannot be built.
 */
static log_bid
internal_find_bat(logger *lg, log_id id, int tid)
{
	BUN p;

	if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
		BATiter cni = bat_iterator(lg->catalog_id);
		MT_rwlock_rdlock(&cni.b->thashlock);
		if (tid < 0) {
			HASHloop_int(cni, cni.b->thash, p, &id) {
				oid pos = p;
				if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
					MT_rwlock_rdunlock(&cni.b->thashlock);
					bat_iterator_end(&cni);
					return *(log_bid *) Tloc(lg->catalog_bid, p);
				}
			}
		} else {
			BUN cp = BUN_NONE;
			HASHloop_int(cni, cni.b->thash, p, &id) {
				lng lid = *(lng *) Tloc(lg->catalog_lid, p);
				/* stop at the first entry with a lid at or before tid */
				if (lid != lng_nil && lid <= tid) {
					break;
				}
				cp = p;
			}
			if (cp != BUN_NONE) {
				MT_rwlock_rdunlock(&cni.b->thashlock);
				bat_iterator_end(&cni);
				return *(log_bid *) Tloc(lg->catalog_bid, cp);
			}
		}
		MT_rwlock_rdunlock(&cni.b->thashlock);
		bat_iterator_end(&cni);
		return 0;	/* not found */
	}
	return -1;		/* error creating hash */
}
195 :
196 : static inline void
197 16612 : logbat_destroy(BAT *b)
198 : {
199 20790 : BBPreclaim(b);
200 3816 : }
201 :
202 : static BAT *
203 18348 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 18348 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 18348 : if (nb) {
208 18348 : BBP_pid(nb->batCacheid) = 0;
209 18348 : if (role == PERSISTENT) {
210 11418 : BATmode(nb, false);
211 11418 : nb = BATsetaccess(nb, BAT_READ);
212 : }
213 : } else {
214 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
215 : }
216 18348 : return nb;
217 : }
218 :
219 : static bool
220 756669 : log_read_format(logger *lg, logformat *data)
221 : {
222 756669 : assert(!lg->inmemory);
223 756669 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
224 743784 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
225 : return true;
226 : /* could only read part, so complain */
227 0 : TRC_CRITICAL(GDK, "read failed\n");
228 : }
229 : return false;
230 : }
231 :
232 : static gdk_return
233 748466 : log_write_format(logger *lg, logformat *data)
234 : {
235 748466 : assert(data->id || data->flag);
236 748466 : assert(!lg->inmemory);
237 748466 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
238 1496932 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
239 1496932 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
240 748466 : mnstr_writeInt(lg->current->output_log, data->id))
241 : return GDK_SUCCEED;
242 0 : TRC_CRITICAL(GDK, "write failed\n");
243 0 : return GDK_FAIL;
244 : }
245 :
246 : static log_return
247 2773 : log_read_seq(logger *lg, logformat *l)
248 : {
249 2773 : int seq = l->id;
250 2773 : lng val;
251 2773 : BUN p;
252 :
253 2773 : assert(!lg->inmemory);
254 2773 : if (mnstr_readLng(lg->input_log, &val) != 1) {
255 0 : TRC_CRITICAL(GDK, "read failed\n");
256 0 : return LOG_EOF;
257 : }
258 2773 : if (lg->flushing)
259 : return LOG_OK;
260 :
261 65 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
262 64 : p >= lg->seqs_id->batInserted) {
263 40 : assert(lg->seqs_val->hseqbase == 0);
264 40 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
265 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
266 0 : return LOG_ERR;
267 : }
268 : } else {
269 24 : if (p != BUN_NONE) {
270 24 : oid pos = p;
271 24 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
272 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
273 0 : return LOG_ERR;
274 : }
275 : }
276 50 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
277 25 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
278 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
279 0 : return LOG_ERR;
280 : }
281 : }
282 : return LOG_OK;
283 : }
284 :
/* Compiled-out helpers for writing/reading a bare int id to/from the
 * log; they are excluded from the build by the #if 0 below. */
#if 0
static gdk_return
log_write_id(logger *lg, int id)
{
	assert(!lg->inmemory);
	assert(id >= 0);
	assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
	if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
	    mnstr_writeInt(lg->current->output_log, id))
		return GDK_SUCCEED;
	TRC_CRITICAL(GDK, "write failed\n");
	return GDK_FAIL;
}

static log_return
log_read_id(logger *lg, log_id *id)
{
	assert(!lg->inmemory);
	if (mnstr_readInt(lg->input_log, id) != 1) {
		TRC_CRITICAL(GDK, "read failed\n");
		return LOG_EOF;
	}
	return LOG_OK;
}
#endif
310 :
311 : static log_return
312 19225 : string_reader(logger *lg, BAT *b, lng nr)
313 : {
314 19225 : size_t sz = 0;
315 19225 : lng SZ = 0;
316 19225 : log_return res = LOG_OK;
317 :
318 38621 : while (nr && res == LOG_OK) {
319 19396 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
320 0 : TRC_CRITICAL(GDK, "read failed\n");
321 0 : return LOG_EOF;
322 : }
323 19396 : sz = (size_t) SZ;
324 19396 : char *buf = lg->rbuf;
325 19396 : if (lg->rbufsize < sz) {
326 2 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
327 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
328 0 : return LOG_ERR;
329 : }
330 2 : lg->rbuf = buf;
331 2 : lg->rbufsize = sz;
332 : }
333 :
334 19396 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
335 0 : TRC_CRITICAL(GDK, "read failed\n");
336 0 : return LOG_EOF;
337 : }
338 : /* handle strings */
339 : char *t = buf;
340 : /* chunked */
341 : #define CHUNK_SIZE 1024
342 : char *strings[CHUNK_SIZE];
343 : int cur = 0;
344 :
345 70823 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
346 51427 : strings[cur++] = t;
347 51427 : if (cur == CHUNK_SIZE &&
348 4 : b &&
349 4 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
350 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
351 0 : res = LOG_ERR;
352 : }
353 51427 : if (cur == CHUNK_SIZE)
354 14 : cur = 0;
355 : /* find next */
356 5540972 : while (*t)
357 5489545 : t++;
358 51427 : t++;
359 : }
360 19396 : if (cur &&
361 332 : b &&
362 332 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
363 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
364 0 : res = LOG_ERR;
365 : }
366 : }
367 : return res;
368 : }
369 :
370 :
/* Describes copying nr values from position os of a source BAT in the
 * log file to position od of a destination BAT in the database.
 * NOTE(review): not referenced in this section of the file. */
struct offset {
	lng os;			/* offset within source BAT in logfile */
	lng nr;			/* number of values to be copied */
	lng od;			/* offset within destination BAT in database */
};
376 :
377 : static log_return
378 389096 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
379 : {
380 389096 : log_return res = LOG_OK;
381 389096 : lng nr, pnr;
382 389096 : bte type_id = -1;
383 389096 : int tpe;
384 :
385 389096 : assert(!lg->inmemory);
386 389096 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
387 :
388 778192 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
389 389096 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
390 0 : TRC_CRITICAL(GDK, "read failed\n");
391 0 : return LOG_EOF;
392 : }
393 :
394 389096 : pnr = nr;
395 389096 : tpe = find_type_nr(lg, type_id);
396 389096 : if (tpe >= 0) {
397 389096 : BAT *uid = NULL;
398 389096 : BAT *r = NULL;
399 389096 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
400 389096 : lng offset;
401 :
402 389096 : assert(nr <= (lng) BUN_MAX);
403 389096 : if (!lg->flushing && l->flag == LOG_UPDATE) {
404 0 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
405 0 : if (uid == NULL) {
406 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
407 28209 : return LOG_ERR;
408 : }
409 : }
410 :
411 389096 : if (l->flag == LOG_UPDATE_CONST) {
412 122703 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
413 0 : TRC_CRITICAL(GDK, "read failed\n");
414 0 : return LOG_EOF;
415 : }
416 122703 : if (cands) {
417 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
418 :
419 28209 : if (BATcount(*cands) == 0 || lg->flushing) {
420 : /* when flushing, we only need the offset and count of the last segment of inserts. */
421 28209 : assert((*cands)->ttype == TYPE_void);
422 28209 : BATtseqbase(*cands, (oid) offset);
423 28209 : BATsetcount(*cands, (BUN) nr);
424 0 : } else if (!lg->flushing) {
425 0 : assert(BATcount(*cands) > 0);
426 0 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
427 0 : BAT *newcands = NULL;
428 0 : if (!dense) {
429 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
430 0 : res = LOG_ERR;
431 0 : } else if ((*cands)->ttype == TYPE_void) {
432 0 : if ((newcands = BATmergecand(*cands, dense))) {
433 0 : BBPreclaim(*cands);
434 0 : *cands = newcands;
435 : } else {
436 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
437 0 : res = LOG_ERR;
438 : }
439 : } else {
440 0 : assert((*cands)->ttype == TYPE_oid);
441 0 : assert(BATcount(*cands) > 0);
442 0 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
443 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
444 0 : res = LOG_ERR;
445 : }
446 : }
447 0 : BBPreclaim(dense);
448 : }
449 :
450 : /* We have to read the value to update the read cursor */
451 28209 : size_t tlen = lg->rbufsize;
452 28209 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
453 28209 : if (t == NULL) {
454 0 : TRC_CRITICAL(GDK, "read failed\n");
455 0 : res = LOG_EOF;
456 : }
457 28209 : return res;
458 : }
459 : }
460 :
461 360887 : if (!lg->flushing) {
462 1397 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
463 1397 : if (r == NULL) {
464 0 : if (uid)
465 0 : BBPreclaim(uid);
466 0 : return LOG_ERR;
467 : }
468 : }
469 :
470 360887 : if (l->flag == LOG_UPDATE_CONST) {
471 94494 : size_t tlen = lg->rbufsize;
472 94494 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
473 94494 : if (t == NULL) {
474 0 : TRC_CRITICAL(GDK, "read failed\n");
475 0 : res = LOG_EOF;
476 : } else {
477 94494 : lg->rbuf = t;
478 94494 : lg->rbufsize = tlen;
479 94494 : if (r) {
480 8189 : for (BUN p = 0; p < (BUN) nr; p++) {
481 7918 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
482 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
483 0 : res = LOG_ERR;
484 : }
485 : }
486 : }
487 : }
488 266393 : } else if (l->flag == LOG_UPDATE_BULK) {
489 266375 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
490 0 : if (r)
491 0 : BBPreclaim(r);
492 0 : TRC_CRITICAL(GDK, "read failed\n");
493 0 : return LOG_EOF;
494 : }
495 266375 : if (tpe == TYPE_msk) {
496 0 : if (r) {
497 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
498 0 : BATsetcount(r, (BUN) nr);
499 : else {
500 0 : TRC_CRITICAL(GDK, "read failed\n");
501 0 : res = LOG_EOF;
502 : }
503 : } else {
504 0 : size_t tlen = lg->rbufsize / sizeof(int);
505 0 : size_t cnt = 0, snr = (size_t) nr;
506 0 : snr = (snr + 31) / 32;
507 0 : assert(tlen);
508 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
509 0 : cnt = snr > tlen ? tlen : snr;
510 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
511 0 : TRC_CRITICAL(GDK, "read failed\n");
512 0 : res = LOG_EOF;
513 : }
514 : }
515 : }
516 : } else {
517 266375 : if (!ATOMvarsized(tpe)) {
518 246573 : size_t cnt = 0, snr = (size_t) nr;
519 246573 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
520 246573 : assert(tlen);
521 : /* read in chunks of max
522 : * BUFSIZE/width rows */
523 493149 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
524 246576 : cnt = snr > tlen ? tlen : snr;
525 246576 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
526 :
527 246576 : if (t == NULL) {
528 : res = LOG_EOF;
529 : break;
530 : }
531 246576 : assert(t == lg->rbuf);
532 246576 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
533 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
534 0 : res = LOG_ERR;
535 : }
536 : }
537 19802 : } else if (tpe == TYPE_str) {
538 : /* efficient string */
539 19219 : res = string_reader(lg, r, nr);
540 : } else {
541 1208 : for (; res == LOG_OK && nr > 0; nr--) {
542 625 : size_t tlen = lg->rbufsize;
543 625 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
544 :
545 625 : if (t == NULL) {
546 : /* see if failure was due to
547 : * malloc or something less
548 : * serious (in the current
549 : * context) */
550 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
551 : res = LOG_EOF;
552 : else
553 0 : res = LOG_ERR;
554 0 : TRC_CRITICAL(GDK, "read failed\n");
555 : } else {
556 625 : lg->rbuf = t;
557 625 : lg->rbufsize = tlen;
558 625 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
559 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
560 0 : res = LOG_ERR;
561 : }
562 : }
563 : }
564 : }
565 : }
566 : } else {
567 18 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
568 18 : void *hv = ATOMnil(TYPE_oid);
569 18 : offset = 0;
570 :
571 18 : if (hv == NULL) {
572 0 : TRC_CRITICAL(GDK, "read failed\n");
573 0 : res = LOG_EOF;
574 : }
575 1071 : for (; res == LOG_OK && nr > 0; nr--) {
576 1053 : size_t hlen = sizeof(oid);
577 1053 : void *h = rh(hv, &hlen, lg->input_log, 1);
578 1053 : assert(hlen == sizeof(oid));
579 1053 : assert(h == hv);
580 1053 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
581 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
582 0 : res = LOG_ERR;
583 : }
584 : }
585 18 : nr = pnr;
586 18 : if (tpe == TYPE_msk) {
587 0 : if (r) {
588 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
589 0 : BATsetcount(r, (BUN) nr);
590 : else {
591 0 : TRC_CRITICAL(GDK, "read failed\n");
592 0 : res = LOG_EOF;
593 : }
594 : } else {
595 0 : for (lng i = 0; i < nr; i += 32) {
596 0 : int v;
597 0 : switch (mnstr_readInt(lg->input_log, &v)) {
598 0 : case 1:
599 0 : continue;
600 : case 0:
601 : res = LOG_EOF;
602 : break;
603 : default:
604 : res = LOG_ERR;
605 : break;
606 : }
607 0 : TRC_CRITICAL(GDK, "read failed\n");
608 0 : break;
609 : }
610 : }
611 18 : } else if (tpe == TYPE_str) {
612 : /* efficient string */
613 6 : res = string_reader(lg, r, nr);
614 : } else {
615 58 : for (; res == LOG_OK && nr > 0; nr--) {
616 46 : size_t tlen = lg->rbufsize;
617 46 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
618 :
619 46 : if (t == NULL) {
620 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
621 : res = LOG_EOF;
622 : else
623 0 : res = LOG_ERR;
624 0 : TRC_CRITICAL(GDK, "read failed\n");
625 : } else {
626 46 : lg->rbuf = t;
627 46 : lg->rbufsize = tlen;
628 46 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
629 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
630 0 : res = LOG_ERR;
631 : }
632 : }
633 : }
634 : }
635 18 : GDKfree(hv);
636 : }
637 :
638 360887 : if (res == LOG_OK) {
639 360887 : if (tr_grow(tr) == GDK_SUCCEED) {
640 360887 : tr->changes[tr->nr].type = l->flag;
641 360887 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
642 138983 : assert(cands); /* bat r is part of a group of bats logged together. */
643 138983 : struct canditer ci;
644 138983 : canditer_init(&ci, NULL, *cands);
645 138983 : const oid first = canditer_peek(&ci);
646 138983 : const oid last = canditer_last(&ci);
647 138983 : offset = (lng) first;
648 138983 : pnr = (lng) (last - first) + 1;
649 138983 : if (!lg->flushing) {
650 1022 : assert(uid == NULL);
651 1022 : uid = *cands;
652 1022 : BBPfix((*cands)->batCacheid);
653 1022 : tr->changes[tr->nr].type = LOG_UPDATE;
654 : }
655 : }
656 360887 : if (l->flag == LOG_UPDATE_CONST) {
657 94494 : assert(!cands); /* TODO: This might change in the future. */
658 94494 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
659 : }
660 360887 : tr->changes[tr->nr].nr = pnr;
661 360887 : tr->changes[tr->nr].tt = tpe;
662 360887 : tr->changes[tr->nr].cid = id;
663 360887 : tr->changes[tr->nr].offset = offset;
664 360887 : tr->changes[tr->nr].b = r;
665 360887 : tr->changes[tr->nr].uid = uid;
666 360887 : tr->nr++;
667 : } else {
668 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
669 0 : res = LOG_ERR;
670 : }
671 : }
672 360887 : if (res != LOG_OK) {
673 0 : if (r)
674 0 : BBPreclaim(r);
675 0 : if (cands && uid)
676 0 : BBPunfix((*cands)->batCacheid);
677 0 : else if (uid)
678 0 : BBPreclaim(uid);
679 : }
680 : } else {
681 : /* bat missing ERROR or ignore ? currently error. */
682 0 : TRC_CRITICAL(GDK, "unknown type\n");
683 0 : res = LOG_ERR;
684 : }
685 : return res;
686 : }
687 :
688 :
/*
 * Raise the count recorded in lg->catalog_cnt for object id to cnt,
 * but only if cnt is larger than the stored value.  The catalog entry
 * is chosen like internal_find_bat() does for tid >= 0: the last hash
 * chain entry seen before the first one whose catalog_lid is non-nil
 * and <= tid.  Returns GDK_SUCCEED when nothing matches or the update
 * succeeded, and GDK_FAIL when the hash cannot be built or the replace
 * fails.
 */
static gdk_return
la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
{
	BATiter cni = bat_iterator_nolock(lg->catalog_id);

	if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
		MT_rwlock_rdlock(&cni.b->thashlock);
		BUN p, cp = BUN_NONE;

		HASHloop_int(cni, cni.b->thash, p, &id) {
			lng lid = *(lng *) Tloc(lg->catalog_lid, p);

			if (lid != lng_nil && lid <= tid)
				break;
			cp = p;
		}
		if (cp != BUN_NONE) {
			lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
			assert(lg->catalog_cnt->hseqbase == 0);
			/* counts only grow */
			if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
				MT_rwlock_rdunlock(&cni.b->thashlock);
				return GDK_FAIL;
			}
		}
		MT_rwlock_rdunlock(&cni.b->thashlock);
		return GDK_SUCCEED;
	}
	return GDK_FAIL;
}
718 :
719 : static gdk_return
720 360887 : la_bat_updates(logger *lg, logaction *la, int tid)
721 : {
722 360887 : log_bid bid = internal_find_bat(lg, la->cid, tid);
723 360887 : BAT *b = NULL;
724 :
725 360887 : if (bid < 0)
726 : return GDK_FAIL;
727 360887 : if (!bid) {
728 : /* object already gone, nothing needed */
729 : return GDK_SUCCEED;
730 : }
731 :
732 360887 : if (!lg->flushing) {
733 1397 : b = BATdescriptor(bid);
734 1397 : if (b == NULL)
735 : return GDK_FAIL;
736 : }
737 360887 : BUN cnt = 0;
738 360887 : if (la->type == LOG_UPDATE_BULK) {
739 359847 : if (!lg->flushing) {
740 375 : cnt = BATcount(b);
741 375 : int is_msk = (b->ttype == TYPE_msk);
742 : /* handle offset 0 ie clear */
743 375 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
744 : BATclear(b, true);
745 : /* handle offset */
746 375 : if (cnt <= (BUN) la->offset) {
747 234 : msk t = 1;
748 234 : if (cnt < (BUN) la->offset) { /* insert nils */
749 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
750 0 : lng i, d = la->offset - BATcount(b);
751 0 : for (i = 0; i < d; i++) {
752 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
753 0 : logbat_destroy(b);
754 0 : return GDK_FAIL;
755 : }
756 : }
757 : }
758 234 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
759 0 : logbat_destroy(b);
760 0 : return GDK_FAIL;
761 : }
762 : } else {
763 141 : BATiter vi = bat_iterator(la->b);
764 141 : BUN p, q;
765 :
766 2202 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
767 2061 : const void *t = BUNtail(vi, p);
768 :
769 2061 : if (q < cnt) {
770 2060 : if (b->tnosorted == q ||
771 2053 : b->tnosorted == q + 1)
772 7 : b->tnosorted = 0;
773 2060 : if (b->tnorevsorted == q ||
774 2053 : b->tnorevsorted == q + 1)
775 7 : b->tnorevsorted = 0;
776 2060 : if (b->tnokey[0] == q ||
777 2053 : b->tnokey[1] == q) {
778 7 : b->tnokey[0] = 0;
779 7 : b->tnokey[1] = 0;
780 : }
781 2060 : if (b->tminpos == q)
782 5 : b->tminpos = BUN_NONE;
783 2060 : if (b->tmaxpos == q)
784 3 : b->tmaxpos = BUN_NONE;
785 2060 : b->tkey = false;
786 2060 : b->tsorted = false;
787 2060 : b->tkey = false;
788 2060 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
789 0 : logbat_destroy(b);
790 0 : bat_iterator_end(&vi);
791 0 : return GDK_FAIL;
792 : }
793 : } else {
794 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
795 0 : logbat_destroy(b);
796 0 : bat_iterator_end(&vi);
797 0 : return GDK_FAIL;
798 : }
799 : }
800 : }
801 141 : bat_iterator_end(&vi);
802 : }
803 : }
804 1040 : } else if (la->type == LOG_UPDATE) {
805 1040 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
806 : return GDK_FAIL;
807 : }
808 : }
809 360887 : cnt = (BUN) (la->offset + la->nr);
810 360887 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
811 0 : if (b)
812 0 : logbat_destroy(b);
813 0 : return GDK_FAIL;
814 : }
815 360887 : if (b)
816 1397 : logbat_destroy(b);
817 : return GDK_SUCCEED;
818 : }
819 :
820 : static log_return
821 9407 : log_read_destroy(logger *lg, trans *tr, log_id id)
822 : {
823 9407 : (void) lg;
824 9407 : assert(!lg->inmemory);
825 9407 : if (tr_grow(tr) == GDK_SUCCEED) {
826 9407 : tr->changes[tr->nr].type = LOG_DESTROY;
827 9407 : tr->changes[tr->nr].cid = id;
828 9407 : tr->nr++;
829 9407 : return LOG_OK;
830 : }
831 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
832 0 : return LOG_ERR;
833 : }
834 :
835 : static gdk_return
836 48 : la_bat_destroy(logger *lg, logaction *la, int tid)
837 : {
838 48 : log_bid bid = internal_find_bat(lg, la->cid, tid);
839 :
840 48 : if (bid < 0)
841 : return GDK_FAIL;
842 48 : if (!bid) {
843 : #ifndef NDEBUG
844 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
845 : #endif
846 0 : return GDK_SUCCEED;
847 : }
848 48 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
849 : return GDK_FAIL;
850 : return GDK_SUCCEED;
851 : }
852 :
853 : static log_return
854 154662 : log_read_create(logger *lg, trans *tr, log_id id)
855 : {
856 154662 : bte tt;
857 154662 : int tpe;
858 :
859 154662 : assert(!lg->inmemory);
860 154662 : TRC_DEBUG(WAL, "create %d", id);
861 :
862 154662 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
863 0 : TRC_CRITICAL(GDK, "read failed\n");
864 0 : return LOG_EOF;
865 : }
866 :
867 154662 : tpe = find_type_nr(lg, tt);
868 : /* read create */
869 154662 : if (tr_grow(tr) == GDK_SUCCEED) {
870 154662 : tr->changes[tr->nr].type = LOG_CREATE;
871 154662 : tr->changes[tr->nr].tt = tpe;
872 154662 : tr->changes[tr->nr].cid = id;
873 154662 : tr->nr++;
874 154662 : return LOG_OK;
875 : }
876 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
877 0 : return LOG_ERR;
878 : }
879 :
880 : static gdk_return
881 264 : la_bat_create(logger *lg, logaction *la, int tid)
882 : {
883 264 : BAT *b;
884 :
885 : /* formerly head column type, should be void */
886 264 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
887 : return GDK_FAIL;
888 :
889 264 : if (la->tt < 0)
890 0 : BATtseqbase(b, 0);
891 :
892 528 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
893 264 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
894 0 : logbat_destroy(b);
895 0 : return GDK_FAIL;
896 : }
897 264 : logbat_destroy(b);
898 264 : return GDK_SUCCEED;
899 : }
900 :
901 : static gdk_return
902 245 : log_write_new_types(logger *lg, FILE *fp)
903 : {
904 245 : bte id = 0;
905 :
906 : /* write types and insert into bats */
907 245 : memset(lg->type_id, -1, sizeof(lg->type_id));
908 245 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
909 : /* first the fixed sized types */
910 6858 : for (int i = 0; i < GDKatomcnt; i++) {
911 6613 : if (ATOMvarsized(i))
912 1469 : continue;
913 5144 : lg->type_id[i] = id;
914 5144 : lg->type_nr[id] = i;
915 5144 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
916 : return GDK_FAIL;
917 5144 : id++;
918 : }
919 : /* second the var sized types */
920 : id = -127; /* start after nil */
921 6858 : for (int i = 0; i < GDKatomcnt; i++) {
922 6613 : if (!ATOMvarsized(i))
923 5144 : continue;
924 1469 : lg->type_id[i] = id;
925 1469 : lg->type_nr[256 + id] = i;
926 1469 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
927 : return GDK_FAIL;
928 1469 : id++;
929 : }
930 : return GDK_SUCCEED;
931 : }
932 :
933 : #define TR_SIZE 1024
934 :
935 : static trans *
936 56234 : tr_create(trans *tr, int tid)
937 : {
938 56234 : trans *ntr = GDKmalloc(sizeof(trans));
939 :
940 56234 : if (ntr == NULL)
941 : return NULL;
942 56234 : ntr->tid = tid;
943 56234 : ntr->sz = TR_SIZE;
944 56234 : ntr->nr = 0;
945 56234 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
946 56234 : if (ntr->changes == NULL) {
947 0 : GDKfree(ntr);
948 0 : return NULL;
949 : }
950 56234 : ntr->tr = tr;
951 56234 : return ntr;
952 : }
953 :
954 : static gdk_return
955 524956 : la_apply(logger *lg, logaction *c, int tid)
956 : {
957 524956 : gdk_return ret = GDK_SUCCEED;
958 :
959 524956 : switch (c->type) {
960 360887 : case LOG_UPDATE_BULK:
961 : case LOG_UPDATE:
962 360887 : ret = la_bat_updates(lg, c, tid);
963 360887 : break;
964 154662 : case LOG_CREATE:
965 154662 : if (!lg->flushing)
966 264 : ret = la_bat_create(lg, c, tid);
967 : break;
968 9407 : case LOG_DESTROY:
969 9407 : if (!lg->flushing)
970 48 : ret = la_bat_destroy(lg, c, tid);
971 : break;
972 : default:
973 0 : MT_UNREACHABLE();
974 : }
975 524956 : return ret;
976 : }
977 :
978 : static void
979 524956 : la_destroy(logaction *c)
980 : {
981 524956 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
982 1397 : logbat_destroy(c->b);
983 524956 : if (c->type == LOG_UPDATE && c->uid)
984 1022 : logbat_destroy(c->uid);
985 524956 : }
986 :
987 : static gdk_return
988 524956 : tr_grow(trans *tr)
989 : {
990 524956 : if (tr->nr == tr->sz) {
991 7 : logaction *changes;
992 7 : tr->sz <<= 1;
993 7 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
994 7 : if (changes == NULL)
995 : return GDK_FAIL;
996 7 : tr->changes = changes;
997 : }
998 : /* cleanup the next */
999 524956 : tr->changes[tr->nr].b = NULL;
1000 524956 : return GDK_SUCCEED;
1001 : }
1002 :
1003 : static trans *
1004 56234 : tr_destroy(trans *tr)
1005 : {
1006 56234 : trans *r = tr->tr;
1007 :
1008 56234 : GDKfree(tr->changes);
1009 56234 : GDKfree(tr);
1010 56234 : return r;
1011 : }
1012 :
1013 : static trans *
1014 0 : tr_abort_(logger *lg, trans *tr, int s)
1015 : {
1016 0 : int i;
1017 :
1018 0 : (void) lg;
1019 :
1020 0 : TRC_DEBUG(WAL, "abort");
1021 :
1022 0 : for (i = s; i < tr->nr; i++)
1023 0 : la_destroy(&tr->changes[i]);
1024 0 : return tr_destroy(tr);
1025 : }
1026 :
1027 : static trans *
1028 0 : tr_abort(logger *lg, trans *tr)
1029 : {
1030 0 : return tr_abort_(lg, tr, 0);
1031 : }
1032 :
1033 : static trans *
1034 56234 : tr_commit(logger *lg, trans *tr)
1035 : {
1036 56234 : int i;
1037 :
1038 56234 : TRC_DEBUG(WAL, "commit");
1039 :
1040 581190 : for (i = 0; i < tr->nr; i++) {
1041 524956 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1042 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1043 0 : do {
1044 0 : tr = tr_abort_(lg, tr, i);
1045 0 : } while (tr != NULL);
1046 : return (trans *) -1;
1047 : }
1048 524956 : la_destroy(&tr->changes[i]);
1049 : }
1050 56234 : lg->saved_tid = tr->tid;
1051 56234 : return tr_destroy(tr);
1052 : }
1053 :
1054 : static gdk_return
1055 127 : log_read_types_file(logger *lg, FILE *fp, int version, bool *needsnew)
1056 : {
1057 127 : int id = 0;
1058 127 : char atom_name[IDLENGTH];
1059 127 : bool seen_geom = false;
1060 :
1061 : /* scanf should use IDLENGTH somehow */
1062 3584 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1063 3457 : if (version < 52303 && strcmp(atom_name, "BAT") == 0) {
1064 8 : *needsnew = true;
1065 8 : continue;
1066 : }
1067 3449 : if (version < 52304 && strcmp(atom_name, "color") == 0) {
1068 16 : *needsnew = true;
1069 16 : continue;
1070 : }
1071 456 : if (version < 52304 && strcmp(atom_name, "identifier") == 0) {
1072 16 : *needsnew = true;
1073 16 : continue;
1074 : }
1075 3417 : if (version < 52304 && strcmp(atom_name, "wkba") == 0) {
1076 16 : *needsnew = true;
1077 16 : continue;
1078 : }
1079 3401 : int i = ATOMindex(atom_name);
1080 :
1081 3401 : if (id < -127 || id > 127 || i < 0) {
1082 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1083 0 : return GDK_FAIL;
1084 : }
1085 3401 : seen_geom |= strcmp(atom_name, "mbr") == 0 || strcmp(atom_name, "wkb") == 0;
1086 3401 : lg->type_id[i] = (int8_t) id;
1087 3401 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1088 : }
1089 : #ifdef HAVE_GEOM
1090 127 : if (!seen_geom && ATOMindex("mbr") > 0) {
1091 0 : GDKerror("incompatible database: server supports GEOM, but database does not\n");
1092 0 : return GDK_FAIL;
1093 : }
1094 : #endif
1095 : (void) seen_geom;
1096 : return GDK_SUCCEED;
1097 : }
1098 :
1099 :
1100 : static gdk_return
1101 245 : log_create_types_file(logger *lg, const char *filename)
1102 : {
1103 245 : FILE *fp;
1104 :
1105 245 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1106 0 : GDKerror("cannot create log file %s\n", filename);
1107 0 : return GDK_FAIL;
1108 : }
1109 245 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1110 0 : fclose(fp);
1111 0 : GDKerror("writing log file %s failed", filename);
1112 0 : if (MT_remove(filename) < 0)
1113 0 : GDKsyserror("remove %s failed\n", filename);
1114 0 : return GDK_FAIL;
1115 : }
1116 :
1117 245 : if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
1118 0 : fclose(fp);
1119 0 : GDKerror("writing log file %s failed", filename);
1120 0 : if (MT_remove(filename) < 0)
1121 0 : GDKsyserror("remove %s failed\n", filename);
1122 0 : return GDK_FAIL;
1123 : }
1124 245 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1125 : #if defined(_MSC_VER)
1126 : && _commit(_fileno(fp)) < 0
1127 : #elif defined(HAVE_FDATASYNC)
1128 0 : && fdatasync(fileno(fp)) < 0
1129 : #elif defined(HAVE_FSYNC)
1130 : && fsync(fileno(fp)) < 0
1131 : #endif
1132 : )) {
1133 0 : GDKsyserror("flushing log file %s failed", filename);
1134 0 : fclose(fp);
1135 0 : if (MT_remove(filename) < 0)
1136 0 : GDKsyserror("remove %s failed\n", filename);
1137 0 : return GDK_FAIL;
1138 : }
1139 245 : if (fclose(fp) < 0) {
1140 0 : GDKsyserror("closing log file %s failed", filename);
1141 0 : if (MT_remove(filename) < 0)
1142 0 : GDKsyserror("remove %s failed\n", filename);
1143 0 : return GDK_FAIL;
1144 : }
1145 : return GDK_SUCCEED;
1146 : }
1147 :
/* Helpers around lg->rotation_lock.  For example, cleanup_and_swap()
 * holds it while it walks and adjusts the lg->pending list of logged
 * ranges.  rotation_trylock() forwards MS as the time limit of
 * MT_lock_trytime() (presumably milliseconds -- confirm against
 * MT_lock_trytime). */
#define rotation_lock(lg) MT_lock_set(&(lg)->rotation_lock)
#define rotation_unlock(lg) MT_lock_unset(&(lg)->rotation_lock)
#define rotation_trylock(lg, ms) MT_lock_trytime(&(lg)->rotation_lock, ms)
1151 :
1152 : static gdk_return
1153 13134 : log_open_output(logger *lg)
1154 : {
1155 13134 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1156 :
1157 13134 : if (!new_range) {
1158 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1159 0 : return GDK_FAIL;
1160 : }
1161 26267 : if (!LOG_DISABLED(lg)) {
1162 13133 : char id[32];
1163 13133 : char filename[MAXPATH];
1164 :
1165 13133 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1166 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1167 0 : GDKfree(new_range);
1168 0 : return GDK_FAIL;
1169 : }
1170 13133 : if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
1171 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1172 0 : GDKfree(new_range);
1173 0 : return GDK_FAIL;
1174 : }
1175 :
1176 13133 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1177 13133 : new_range->output_log = open_wstream(filename);
1178 13133 : if (new_range->output_log) {
1179 13133 : short byteorder = 1234;
1180 13133 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1181 : }
1182 :
1183 13133 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1184 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1185 0 : close_stream(new_range->output_log);
1186 0 : GDKfree(new_range);
1187 0 : return GDK_FAIL;
1188 : }
1189 : } else {
1190 1 : new_range->output_log = NULL;
1191 : }
1192 13134 : ATOMIC_INIT(&new_range->refcount, 1);
1193 13134 : ATOMIC_INIT(&new_range->last_ts, 0);
1194 13134 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1195 13134 : ATOMIC_INIT(&new_range->drops, 0);
1196 13134 : new_range->id = lg->id;
1197 13134 : new_range->next = NULL;
1198 13134 : logged_range *current = lg->current;
1199 13134 : assert(current && current->next == NULL);
1200 13134 : new_range->cnt = current->cnt;
1201 13134 : current->next = new_range;
1202 13134 : lg->file_age = GDKusec();
1203 13134 : return GDK_SUCCEED;
1204 : }
1205 :
1206 : static inline void
1207 13373 : log_close_input(logger *lg)
1208 : {
1209 13373 : if (!lg->inmemory && lg->input_log) {
1210 12890 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1211 12890 : close_stream(lg->input_log);
1212 : }
1213 13373 : lg->input_log = NULL;
1214 13373 : }
1215 :
1216 : static inline void
1217 356 : log_close_output(logger *lg)
1218 : {
1219 356 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1220 355 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1221 355 : close_stream(lg->current->output_log);
1222 : }
1223 356 : lg->current->output_log = NULL;
1224 356 : }
1225 :
1226 : static gdk_return
1227 13017 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1228 : {
1229 13017 : TRC_INFO(WAL, "opening input log %s", filename);
1230 13017 : lg->input_log = open_rstream(filename);
1231 :
1232 : /* if the file doesn't exist, there is nothing to be read back */
1233 13017 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1234 127 : log_close_input(lg);
1235 127 : *filemissing = true;
1236 127 : return GDK_SUCCEED;
1237 : }
1238 12890 : short byteorder;
1239 12890 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1240 0 : case -1:
1241 0 : log_close_input(lg);
1242 0 : TRC_CRITICAL(GDK, "read failed\n");
1243 0 : return GDK_FAIL;
1244 5 : case 0:
1245 : /* empty file is ok */
1246 5 : log_close_input(lg);
1247 5 : return GDK_SUCCEED;
1248 12885 : case 1:
1249 : /* if not empty, must start with correct byte order mark */
1250 12885 : if (byteorder != 1234) {
1251 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1252 0 : log_close_input(lg);
1253 0 : return GDK_FAIL;
1254 : }
1255 : break;
1256 : }
1257 : return GDK_SUCCEED;
1258 : }
1259 :
/* Replay log records from lg->input_log, reading them one at a time with
 * log_read_format() and dispatching them to the tr_* and log_read_*
 * helpers.
 *
 * UPDATED / MAXUPDATED: when UPDATED is non-NULL, it is a bitmap indexed
 * by catalog position (positions found through the hash on
 * lg->catalog_id).  For every update/create/destroy record inside a
 * transaction, the bit of the affected catalog position is set.
 * MAXUPDATED is only used in assertions to bound those positions.
 * log_readlog() passes NULL and 0.
 *
 * While lg->flushing is false, CHECKMASK is cleared from GDKdebug for the
 * duration of the call and the original value is restored at the end.
 *
 * Returns LOG_EOF when input runs out (or a record with flag 0 and id 0 is
 * read, which is treated as end of input) and LOG_ERR on a serious
 * failure.  Because the loop only continues while err == LOG_OK, LOG_OK
 * is never returned.  Transactions still open when reading stops are
 * aborted. */
static log_return
log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
{
	logformat l;
	trans *tr = NULL;
	log_return err = LOG_OK;
	bool ok = true;
	ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);

	(void) maxupdated; /* only used inside assert() */

	if (!lg->flushing)
		ATOMIC_AND(&GDKdebug, ~CHECKMASK);

	BAT *cands = NULL; /* used in case of LOG_BAT_GROUP */

	while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
		if (l.flag == 0 && l.id == 0) {
			err = LOG_EOF;
			break;
		}

		TRC_DEBUG_IF(WAL) {
			if (l.flag > 0 && l.flag != LOG_CLEAR &&
			    l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
				TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
			else
				TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
		}
		/* first pass over the record: only bookkeeping of which
		 * catalog positions are touched (if UPDATED was given) */
		switch (l.flag) {
		case LOG_UPDATE_CONST:
		case LOG_UPDATE_BULK:
		case LOG_UPDATE:
		case LOG_CREATE:
		case LOG_DESTROY:
			/* if BAThash() fails, the bitmap is silently not updated */
			if (tr != NULL && updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
				BATiter cni = bat_iterator(lg->catalog_id);
				BUN p;
				BUN posnew = BUN_NONE;
				BUN posold = BUN_NONE;
				MT_rwlock_rdlock(&cni.b->thashlock);
				HASHloop_int(cni, cni.b->thash, p, &l.id) {
					lng lid = *(lng *) Tloc(lg->catalog_lid, p);
					if (lid == lng_nil || lid > tr->tid)
						posnew = p;
					else if (lid == tr->tid)
						posold = p;
				}
				MT_rwlock_rdunlock(&cni.b->thashlock);
				bat_iterator_end(&cni);
				/* Normally at this point, posnew is the
				 * location of the bat that this
				 * transaction is working on, and posold
				 * is the location of the previous
				 * version of the bat. If LOG_CREATE,
				 * both are relevant, since the latter
				 * is the new bat, and the former is the
				 * to-be-destroyed bat. For
				 * LOG_DESTROY, only posnew should be
				 * relevant, but for the other types, if
				 * the table is destroyed later in the
				 * same transaction, we need posold, and
				 * else (the normal case) we need
				 * posnew. */
				if (posnew != BUN_NONE) {
					assert(posnew < maxupdated);
					updated[posnew / 32] |= 1U << (posnew % 32);
				}
				if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
					assert(posold < maxupdated);
					updated[posold / 32] |= 1U << (posold % 32);
				}
			}
			break;
		default:
			/* do nothing */
			break;
		}
		/* the functions we call here can succeed (LOG_OK),
		 * but they can also fail for two different reasons:
		 * they can run out of input (LOG_EOF -- this is not
		 * serious, we just abort the remaining transactions),
		 * or some malloc or BAT update fails (LOG_ERR -- this
		 * is serious, we must abort the complete process);
		 * the latter failure causes the current function to
		 * return GDK_FAIL */
		switch (l.flag) {
		case LOG_START:
			assert(!lg->flushing || l.id <= lg->tid);
			if (!lg->flushing && l.id > lg->tid)
				lg->tid = l.id; /* should only happen during initialization */
			/* the new transaction is chained onto the current one */
			if ((tr = tr_create(tr, l.id)) == NULL) {
				TRC_CRITICAL(GDK, "memory allocation failed\n");
				err = LOG_ERR;
				break;
			}
			TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
			break;
		case LOG_END:
			if (tr == NULL)
				err = LOG_EOF;
			else if (tr->tid != l.id) /* abort record */
				tr = tr_abort(lg, tr);
			else
				tr = tr_commit(lg, tr);
			break;
		case LOG_SEQ:
			err = log_read_seq(lg, &l);
			break;
		case LOG_UPDATE_CONST:
		case LOG_UPDATE_BULK:
		case LOG_UPDATE:
			if (tr == NULL)
				err = LOG_EOF;
			else {
				/* inside a LOG_BAT_GROUP the candidate bat
				 * is shared with log_read_updates() */
				err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
			}
			break;
		case LOG_CREATE:
			if (tr == NULL)
				err = LOG_EOF;
			else
				err = log_read_create(lg, tr, l.id);
			break;
		case LOG_DESTROY:
			if (tr == NULL)
				err = LOG_EOF;
			else
				err = log_read_destroy(lg, tr, l.id);
			break;
		case LOG_BAT_GROUP:
			if (tr == NULL)
				err = LOG_EOF;
			else {
				if (l.id > 0) {
					/* START OF LOG_BAT_GROUP */
					/* NOTE(review): a previous, unterminated
					 * group's cands would be overwritten here
					 * without BBPunfix -- confirm groups never
					 * nest in the WAL format */
					cands = COLnew(0, TYPE_void, 0, SYSTRANS);
					if (!cands) {
						TRC_CRITICAL(GDK, "creating bat failed\n");
						err = LOG_ERR;
					}
				} else if (cands == NULL) {
					/* should have gone through the
					 * above option earlier */
					TRC_CRITICAL(GDK, "unexpected error\n");
					err = LOG_ERR;
				} else {
					/* END OF LOG_BAT_GROUP */
					BBPunfix(cands->batCacheid);
					cands = NULL;
				}
			}
			break;
		default:
			TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
			err = LOG_ERR;
		}
		if (tr == (trans *) -1) {
			/* message already generated by tr_commit */
			err = LOG_ERR;
			tr = NULL;
			break;
		}
	}
	/* whatever is still open was never committed: abort it */
	while (tr) {
		TRC_WARNING(GDK, "aborting transaction\n");
		tr = tr_abort(lg, tr);
	}
	if (!lg->flushing)
		ATOMIC_SET(&GDKdebug, dbg);

	BBPreclaim(cands);
	if (!ok)
		return LOG_EOF;
	return err;
}
1436 :
1437 : static gdk_return
1438 334 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1439 : {
1440 334 : log_return err = LOG_OK;
1441 334 : time_t t0, t1;
1442 334 : struct stat sb;
1443 :
1444 334 : assert(!lg->inmemory);
1445 :
1446 334 : TRC_INFO(WAL, "opening %s\n", filename);
1447 :
1448 334 : gdk_return res = log_open_input(lg, filename, filemissing);
1449 334 : if (!lg->input_log || res != GDK_SUCCEED)
1450 : return res;
1451 202 : int fd;
1452 202 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1453 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1454 0 : log_close_input(lg);
1455 : /* If the file could be opened, but fstat fails,
1456 : * something weird is going on */
1457 0 : return GDK_FAIL;
1458 : }
1459 202 : t0 = time(NULL);
1460 202 : TRC_INFO_IF(WAL) {
1461 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1462 0 : GDKtracer_flush_buffer();
1463 : }
1464 404 : while (err != LOG_EOF && err != LOG_ERR) {
1465 202 : t1 = time(NULL);
1466 202 : if (t1 - t0 > 10) {
1467 0 : lng fpos;
1468 0 : t0 = t1;
1469 : /* not more than once every 10 seconds */
1470 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1471 0 : TRC_INFO_IF(WAL) {
1472 0 : if (fpos >= 0) {
1473 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1474 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1475 0 : GDKtracer_flush_buffer();
1476 : }
1477 : }
1478 : }
1479 202 : err = log_read_transaction(lg, NULL, 0);
1480 : }
1481 202 : log_close_input(lg);
1482 202 : lg->input_log = NULL;
1483 :
1484 : /* remaining transactions are not committed, ie abort */
1485 202 : TRC_INFO_IF(WAL) {
1486 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1487 0 : GDKtracer_flush_buffer();
1488 : }
1489 : /* we cannot distinguish errors from incomplete transactions
1490 : * (even if we would log aborts in the logs). So we simply
1491 : * abort and move to the next log file */
1492 202 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1493 : }
1494 :
1495 : /*
1496 : * The log files are incrementally numbered, starting from 2. They are
1497 : * processed in the same sequence.
1498 : */
1499 : static gdk_return
1500 127 : log_readlogs(logger *lg, const char *filename)
1501 : {
1502 127 : gdk_return res = GDK_SUCCEED;
1503 :
1504 127 : assert(!lg->inmemory);
1505 127 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1506 :
1507 127 : char log_filename[FILENAME_MAX];
1508 127 : if (lg->saved_id >= lg->id) {
1509 127 : bool filemissing = false;
1510 :
1511 127 : lg->id = lg->saved_id + 1;
1512 461 : while (res == GDK_SUCCEED && !filemissing) {
1513 334 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1514 0 : GDKerror("Logger filename path is too large\n");
1515 0 : return GDK_FAIL;
1516 : }
1517 334 : res = log_readlog(lg, log_filename, &filemissing);
1518 334 : if (!filemissing) {
1519 207 : lg->saved_id++;
1520 207 : lg->id++;
1521 : }
1522 : }
1523 : }
1524 : return res;
1525 : }
1526 :
1527 : static gdk_return
1528 13112 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1529 : {
1530 13112 : TRC_DEBUG(WAL, "commit");
1531 :
1532 13112 : return bm_commit(lg, pending, updated, maxupdated);
1533 : }
1534 :
1535 : static gdk_return
1536 127 : check_version(logger *lg, FILE *fp, bool *needsnew)
1537 : {
1538 127 : int version = 0;
1539 :
1540 127 : assert(!lg->inmemory);
1541 127 : if (fscanf(fp, "%6d", &version) != 1) {
1542 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1543 0 : fclose(fp);
1544 0 : return GDK_FAIL;
1545 : }
1546 127 : if (version != lg->version) {
1547 32 : if (lg->prefuncp == NULL ||
1548 16 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1549 0 : GDKerror("Incompatible database version %06d, "
1550 : "this server supports version %06d.\n%s",
1551 : version, lg->version,
1552 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1553 0 : fclose(fp);
1554 0 : return GDK_FAIL;
1555 : }
1556 16 : *needsnew = true; /* we need to write a new log file */
1557 : } else {
1558 111 : lg->postfuncp = NULL; /* don't call */
1559 111 : *needsnew = false; /* log file already up-to-date */
1560 : }
1561 254 : if (fgetc(fp) != '\n' || /* skip \n */
1562 127 : fgetc(fp) != '\n') { /* skip \n */
1563 0 : GDKerror("Badly formatted log file");
1564 0 : fclose(fp);
1565 0 : return GDK_FAIL;
1566 : }
1567 127 : if (log_read_types_file(lg, fp, version, needsnew) != GDK_SUCCEED) {
1568 0 : fclose(fp);
1569 0 : return GDK_FAIL;
1570 : }
1571 127 : fclose(fp);
1572 127 : return GDK_SUCCEED;
1573 : }
1574 :
1575 : static BAT *
1576 357 : bm_tids(BAT *b, BAT *d)
1577 : {
1578 357 : BUN sz = BATcount(b);
1579 357 : BAT *tids = BATdense(0, 0, sz);
1580 :
1581 357 : if (tids == NULL)
1582 : return NULL;
1583 :
1584 357 : if (BATcount(d)) {
1585 357 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1586 357 : logbat_destroy(tids);
1587 357 : tids = diff;
1588 : }
1589 : return tids;
1590 : }
1591 :
1592 :
1593 : static gdk_return
1594 10038 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1595 : {
1596 10038 : char bak[IDLENGTH];
1597 :
1598 10038 : if (BATmode(old, true) != GDK_SUCCEED) {
1599 0 : GDKerror("cannot convert old %s to transient", name);
1600 0 : return GDK_FAIL;
1601 : }
1602 10038 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1603 0 : GDKerror("name %s_%s too long\n", fn, name);
1604 0 : return GDK_FAIL;
1605 : }
1606 10038 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1607 0 : GDKerror("rename (%s) failed\n", bak);
1608 0 : return GDK_FAIL;
1609 : }
1610 10038 : BBPretain(new->batCacheid);
1611 10038 : return GDK_SUCCEED;
1612 : }
1613 :
1614 : static gdk_return
1615 357 : bm_get_counts(logger *lg)
1616 : {
1617 357 : BUN p, q;
1618 357 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1619 :
1620 33173 : BATloop(lg->catalog_bid, p, q) {
1621 32816 : oid pos = p;
1622 32816 : lng cnt = 0;
1623 32816 : lng lid = lng_nil;
1624 :
1625 32816 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1626 32816 : BAT *b = BBPquickdesc(bids[p]);
1627 32816 : assert(b);
1628 32816 : cnt = BATcount(b);
1629 : } else {
1630 0 : lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1631 : }
1632 32816 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1633 0 : return GDK_FAIL;
1634 32816 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1635 : return GDK_FAIL;
1636 : }
1637 : return GDK_SUCCEED;
1638 : }
1639 :
1640 : static int
1641 10038 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1642 : {
1643 10038 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1644 2243970 : for (int i = 0; i < next; i++) {
1645 2233932 : if (n[i] == bid) {
1646 0 : sizes[i] = sz;
1647 0 : return next;
1648 : }
1649 : }
1650 10038 : n[next] = bid;
1651 10038 : sizes[next++] = sz;
1652 10038 : return next;
1653 : }
1654 :
1655 : static int
1656 3108 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
1657 : BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
1658 : {
1659 3108 : BAT *nbids, *noids, *ncnts, *nlids, *ndels;
1660 3108 : BUN p, q;
1661 3108 : int err = 0, rcnt = 0;
1662 :
1663 3108 : BUN ocnt = BATcount(catalog_bid);
1664 3108 : nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1665 3108 : noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1666 3108 : ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1667 3108 : nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1668 3108 : ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
1669 :
1670 3108 : if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
1671 0 : logbat_destroy(nbids);
1672 0 : logbat_destroy(noids);
1673 0 : logbat_destroy(ncnts);
1674 0 : logbat_destroy(nlids);
1675 0 : logbat_destroy(ndels);
1676 0 : return 0;
1677 : }
1678 :
1679 3108 : oid *poss = Tloc(dcatalog, 0);
1680 180389 : BATloop(dcatalog, p, q) {
1681 177281 : oid pos = poss[p];
1682 :
1683 177281 : if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
1684 0 : continue;
1685 :
1686 177281 : if (lids[pos] >= 0) {
1687 177281 : bat bid = bids[pos];
1688 177281 : BAT *lb = BBP_desc(bid);
1689 :
1690 177281 : if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
1691 0 : GDKwarning("Failed to set bat(%d) transient\n", bid);
1692 : } else {
1693 177281 : lids[pos] = -1; /* mark as transient */
1694 177281 : r[rcnt++] = bid;
1695 : }
1696 : }
1697 : }
1698 :
1699 3108 : int *oids = (int *) Tloc(catalog_id, 0);
1700 3108 : q = BATcount(catalog_bid);
1701 1089323 : for (p = 0; p < q && !err; p++) {
1702 1086215 : bat col = bids[p];
1703 1086215 : int nid = oids[p];
1704 1086215 : lng lid = lids[p];
1705 1086215 : lng cnt = cnts[p];
1706 1086215 : oid pos = p;
1707 :
1708 : /* only project out the deleted with lid == -1
1709 : * update dcatalog */
1710 1086215 : if (lid == -1)
1711 177281 : continue; /* remove */
1712 :
1713 1817868 : if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
1714 1817868 : BUNappend(noids, &nid, true) != GDK_SUCCEED ||
1715 1817868 : BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
1716 908934 : BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
1717 : err = 1;
1718 908934 : if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
1719 0 : pos = (oid) (BATcount(nbids) - 1);
1720 0 : if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
1721 908934 : err = 1;
1722 : }
1723 : }
1724 :
1725 3108 : if (err) {
1726 0 : logbat_destroy(nbids);
1727 0 : logbat_destroy(noids);
1728 0 : logbat_destroy(ndels);
1729 0 : logbat_destroy(ncnts);
1730 0 : logbat_destroy(nlids);
1731 0 : return 0;
1732 : }
1733 : /* point of no return */
1734 6216 : if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
1735 6216 : log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
1736 3108 : log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
1737 0 : logbat_destroy(nbids);
1738 0 : logbat_destroy(noids);
1739 0 : logbat_destroy(ndels);
1740 0 : logbat_destroy(ncnts);
1741 0 : logbat_destroy(nlids);
1742 0 : return -1;
1743 : }
1744 3108 : r[rcnt++] = lg->catalog_bid->batCacheid;
1745 3108 : r[rcnt++] = lg->catalog_id->batCacheid;
1746 3108 : r[rcnt++] = lg->dcatalog->batCacheid;
1747 :
1748 3108 : assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
1749 :
1750 3108 : logbat_destroy(lg->catalog_bid);
1751 3108 : logbat_destroy(lg->catalog_id);
1752 3108 : logbat_destroy(lg->dcatalog);
1753 :
1754 3108 : lg->catalog_bid = nbids;
1755 3108 : lg->catalog_id = noids;
1756 3108 : lg->dcatalog = ndels;
1757 :
1758 : /* failing to rename these two bats is not fatal */
1759 3108 : if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
1760 3108 : GDKclrerr();
1761 3108 : if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
1762 3108 : GDKclrerr();
1763 3108 : BBPunfix(lg->catalog_cnt->batCacheid);
1764 3108 : BBPunfix(lg->catalog_lid->batCacheid);
1765 :
1766 3108 : lg->catalog_cnt = ncnts;
1767 3108 : lg->catalog_lid = nlids;
1768 3108 : char bak[FILENAME_MAX];
1769 3108 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
1770 3108 : if (BBPrename(lg->catalog_cnt, bak) < 0)
1771 0 : GDKclrerr();
1772 3108 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
1773 3108 : if (BBPrename(lg->catalog_lid, bak) < 0)
1774 0 : GDKclrerr();
1775 3108 : rotation_lock(lg);
1776 12333 : for (logged_range *p = lg->pending; p; p = p->next) {
1777 9225 : p->cnt -= cleanup;
1778 : }
1779 3108 : rotation_unlock(lg);
1780 3108 : return rcnt;
1781 : }
1782 :
/* Write the logger's bats to disk with TMsubcommit_list().
 *
 * The list handed to TMsubcommit_list() contains, in order:
 * - the column bats of lg->catalog_bid.  When UPDATED is non-NULL, a
 *   position below MAXUPDATED is only included if its bit is set in
 *   UPDATED; positions at or beyond MAXUPDATED are always included.
 *   Entries already marked with lid -1 are skipped.
 * - the catalog bats;
 * - the sequence bats;
 * - lg->dseqs.
 * Catalog entries whose lid is at most lg->saved_tid are counted in
 * CLEANUP and added to dcatalog where missing; cleanup_and_swap() then
 * removes them.  When there is nothing to clean up and lg->dseqs holds
 * more than half of (and more than 10) sequence entries, the sequence
 * bats are compacted and swapped too.
 *
 * PENDING, if non-NULL, supplies the catalog count to commit; otherwise
 * the current count of lg->catalog_bid is used.
 *
 * This function is called with log_lock() held; it releases the lock
 * before returning.  On the normal path, the lock is released before the
 * actual TMsubcommit_list() call.  After a successful subcommit, the bats
 * collected in r[] (by cleanup_and_swap() and the sequence compaction)
 * are released. */
static gdk_return
bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
{
	BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
	BUN dcnt = BATcount(lg->dcatalog);
	BUN p, q;
	BAT *catalog_bid = lg->catalog_bid;
	BAT *catalog_id = lg->catalog_id;
	BAT *dcatalog = lg->dcatalog;
	/* room for n[0], the column bats and a fixed number of catalog and
	 * sequence bats.  NOTE(review): the column loop below runs over all
	 * of catalog_bid, which may be larger than CNT when PENDING is given;
	 * only the assert guards this -- confirm */
	BUN nn = 13 + cnt;
	bat *n = GDKmalloc(sizeof(bat) * nn);
	bat *r = GDKmalloc(sizeof(bat) * nn);
	BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
	int i = 0, rcnt = 0;
	gdk_return res;
	const log_bid *bids;
	lng *cnts = NULL, *lids = NULL;
	BUN cleanup = 0;
	lng t0 = 0;

	if (n == NULL || r == NULL || sizes == NULL) {
		GDKfree(n);
		GDKfree(r);
		GDKfree(sizes);
		log_unlock(lg);
		return GDK_FAIL;
	}

	sizes[i] = 0;
	n[i++] = 0; /* n[0] is not used */
	bids = (const log_bid *) Tloc(catalog_bid, 0);
	if (lg->catalog_cnt)
		cnts = (lng *) Tloc(lg->catalog_cnt, 0);
	if (lg->catalog_lid)
		lids = (lng *) Tloc(lg->catalog_lid, 0);
	BATloop(catalog_bid, p, q) {
		if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
			cleanup++;
			if (lids[p] == -1)
				continue;
			if (BUNfnd(dcatalog, &(oid){p}) == BUN_NONE &&
			    BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
				/* undo the appends done so far in this call */
				while (BATcount(dcatalog) > dcnt) {
					if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
						TRC_CRITICAL(WAL, "delete after failed append failed\n");
						break;
					}
				}
				GDKfree(n);
				GDKfree(r);
				GDKfree(sizes);
				log_unlock(lg);
				return GDK_FAIL;
			}
		}
		if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
			continue;
		}
		bat col = bids[p];

		TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
		assert(col);
		sizes[i] = cnts ? (BUN) cnts[p] : 0;
		n[i++] = col;
	}
	/* now commit catalog, so it's also up to date on disk */
	sizes[i] = cnt;
	n[i++] = catalog_bid->batCacheid;
	sizes[i] = cnt;
	n[i++] = catalog_id->batCacheid;
	sizes[i] = BATcount(dcatalog);
	n[i++] = dcatalog->batCacheid;

	if (cleanup) {
		if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
					     catalog_bid, catalog_id, dcatalog,
					     cleanup)) < 0) {
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		cnt -= cleanup;
	}
	/* if the catalog was swapped, the new catalog bats must be
	 * committed as well */
	if (dcatalog != lg->dcatalog) {
		i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
		i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
		i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
	}
	if (lg->seqs_id) {
		sizes[i] = BATcount(lg->seqs_id);
		n[i++] = lg->seqs_id->batCacheid;
		sizes[i] = BATcount(lg->seqs_id);
		n[i++] = lg->seqs_val->batCacheid;
	}
	/* compact the sequence bats when many of their entries are deleted */
	if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
		BAT *tids, *ids, *vals;

		tids = bm_tids(lg->seqs_id, lg->dseqs);
		if (tids == NULL) {
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
		vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);

		if (ids == NULL || vals == NULL) {
			logbat_destroy(tids);
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}

		if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
		    BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
			logbat_destroy(tids);
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		logbat_destroy(tids);
		BATclear(lg->dseqs, true);

		if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
		    log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
		i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));

		if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
			r[rcnt++] = lg->seqs_id->batCacheid;
		if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
			r[rcnt++] = lg->seqs_val->batCacheid;

		logbat_destroy(lg->seqs_id);
		logbat_destroy(lg->seqs_val);

		lg->seqs_id = ids;
		lg->seqs_val = vals;
	}
	if (lg->seqs_id) {
		sizes[i] = BATcount(lg->dseqs);
		n[i++] = lg->dseqs->batCacheid;
	}

	assert((BUN) i <= nn);
	/* the subcommit itself runs without the log lock */
	log_unlock(lg);
	TRC_DEBUG_IF(WAL)
		t0 = GDKusec();
	res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
	TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
	if (res == GDK_SUCCEED) { /* now cleanup */
		for (i = 0; i < rcnt; i++) {
			TRC_DEBUG_IF(WAL) {
				TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
				if (BBP_lrefs(r[i]) != 2)
					TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
			}
			BBPrelease(r[i]);
		}
	}
	GDKfree(n);
	GDKfree(r);
	GDKfree(sizes);
	if (res != GDK_SUCCEED)
		TRC_CRITICAL(GDK, "commit failed\n");
	return res;
}
1972 :
1973 : static gdk_return
1974 356 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1975 : {
1976 356 : if (GDKfilepath(filename, FILENAME_MAX, 0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED) {
1977 0 : GDKerror("Logger filename path is too large\n");
1978 0 : return GDK_FAIL;
1979 : }
1980 356 : if (bak) {
1981 356 : if (strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL) >= FILENAME_MAX) {
1982 0 : GDKerror("Logger filename path is too large\n");
1983 0 : return GDK_FAIL;
1984 : }
1985 : }
1986 : return GDK_SUCCEED;
1987 : }
1988 :
1989 : static gdk_return
1990 12890 : log_cleanup(logger *lg, lng id)
1991 : {
1992 12890 : char log_id[FILENAME_MAX];
1993 :
1994 12890 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1995 0 : GDKerror("log_id filename is too large\n");
1996 0 : return GDK_FAIL;
1997 : }
1998 12890 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1999 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
2000 0 : GDKclrerr(); /* clear error from unlink */
2001 : }
2002 : return GDK_SUCCEED;
2003 : }
2004 :
#ifdef GDKLIBRARY_JSON
/* Finish the json upgrade.  Unless the database runs in memory, remove
 * the "jsonupgradeneeded" signal file from BATDIR.  Then install strRead
 * as the read function of the "json" atom.
 *
 * Returns GDK_FAIL (and leaves the atom's read function untouched) when
 * the signal file cannot be removed, GDK_SUCCEED otherwise. */
static gdk_return
log_json_upgrade_finalize(void)
{
	int json_tpe = ATOMindex("json");
	if (!GDKinmemory(0) &&
	    GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
		/* newline-terminated, like every other trace message */
		TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file\n");
		return GDK_FAIL;
	}
	BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;

	return GDK_SUCCEED;
}
#endif
2020 :
2021 : /* clean up old junk left over from old upgrades: bats that are
2022 : * persistent but not in the SQL catalog and that have no name, and bats
2023 : * that do have a name that starts with "stat_opt_" (from the statistics
2024 : * optimizer that was removed in 2017) are removed here
2025 : *
2026 : * this function ignores any errors */
2027 : static void
2028 8 : clean_bbp(logger *lg)
2029 : {
2030 8 : BAT *b = COLnew(0, TYPE_int, 256, TRANSIENT);
2031 8 : if (b == NULL)
2032 : return;
2033 8 : if (BUNappend(b, &(int){0}, false) != GDK_SUCCEED) {
2034 0 : BBPreclaim(b);
2035 0 : return;
2036 : }
2037 : /* mark persistent bats that have no name or have a name
2038 : * starting with "stat_opt_" */
2039 16926 : for (bat bid = 1, bsz = getBBPsize(); bid < bsz; bid++)
2040 16918 : if (BBP_status(bid) & BBPEXISTING &&
2041 2576 : (BBP_logical(bid) == NULL ||
2042 2576 : strncmp(BBP_logical(bid), "tmp_", 4) == 0 ||
2043 80 : strncmp(BBP_logical(bid), "stat_opt_", 9) == 0))
2044 2528 : BBP_status_on(bid, 1U << 31);
2045 : /* remove mark from bats that are in the SQL catalog */
2046 2216 : for (BUN i = 0, n = BATcount(lg->catalog_bid); i < n; i++)
2047 2208 : BBP_status_off(((int *) lg->catalog_bid->theap->base)[i], 1U << 31);
2048 : /* what's left over are junk bats */
2049 16926 : for (bat bid = 1, bsz = getBBPsize(); bid < bsz; bid++)
2050 16918 : if (BBP_status(bid) & (1U << 31)) {
2051 320 : BBP_status_off(bid, 1U << 31);
2052 640 : if (BATmode(BBP_desc(bid), true) != GDK_SUCCEED ||
2053 320 : BUNappend(b, &bid, false) != GDK_SUCCEED) {
2054 0 : BBPreclaim(b);
2055 0 : return;
2056 : }
2057 320 : printf("# removing bat %d (tmp_%o)\n", bid, bid);
2058 : }
2059 : /* if there were any junk bats, commit their removal */
2060 16 : if (b->batCount > 1 &&
2061 8 : TMsubcommit_list(Tloc(b, 0), NULL, (int) b->batCount, -1) != GDK_SUCCEED)
2062 0 : printf("clean_bbp transaction failed\n");
2063 8 : BBPreclaim(b);
2064 : }
2065 :
2066 : /* Load data from the logger logdir
2067 : * Initialize new directories and catalog files if none are present,
2068 : * unless running in read-only mode
2069 : * Load data and persist it in the BATs */
2070 : static gdk_return
2071 357 : log_load(const char *fn, logger *lg, char filename[FILENAME_MAX])
2072 : {
2073 357 : FILE *fp = NULL;
2074 357 : char bak[FILENAME_MAX];
2075 357 : bat catalog_bid, catalog_id, dcatalog;
2076 357 : bool needcommit = false;
2077 357 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2078 357 : bool readlogs = false;
2079 357 : bool needsnew = false; /* need to write new log file? */
2080 :
2081 : /* refactor */
2082 357 : if (!LOG_DISABLED(lg)) {
2083 356 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2084 0 : goto error;
2085 : }
2086 :
2087 357 : lg->catalog_bid = NULL;
2088 357 : lg->catalog_id = NULL;
2089 357 : lg->catalog_cnt = NULL;
2090 357 : lg->catalog_lid = NULL;
2091 357 : lg->dcatalog = NULL;
2092 :
2093 357 : lg->seqs_id = NULL;
2094 357 : lg->seqs_val = NULL;
2095 357 : lg->dseqs = NULL;
2096 :
2097 357 : if (!LOG_DISABLED(lg)) {
2098 : /* try to open logfile backup, or failing that, the file
2099 : * itself. we need to know whether this file exists when
2100 : * checking the database consistency later on */
2101 356 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2102 0 : fclose(fp);
2103 0 : fp = NULL;
2104 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2105 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2106 0 : goto error;
2107 356 : } else if (errno != ENOENT) {
2108 0 : GDKsyserror("open %s failed", bak);
2109 0 : goto error;
2110 : }
2111 356 : fp = MT_fopen(filename, "r");
2112 356 : if (fp == NULL && errno != ENOENT) {
2113 0 : GDKsyserror("open %s failed", filename);
2114 0 : goto error;
2115 : }
2116 : }
2117 :
2118 357 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2119 357 : catalog_bid = BBPindex(bak);
2120 :
2121 : /* initialize arrays for type mapping, to be read from disk */
2122 357 : memset(lg->type_id, -1, sizeof(lg->type_id));
2123 357 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2124 :
2125 : /* this is intentional - if catalog_bid is 0, force it to find
2126 : * the persistent catalog */
2127 357 : if (catalog_bid == 0) {
2128 : /* catalog does not exist, so the log file also
2129 : * shouldn't exist */
2130 230 : if (fp != NULL) {
2131 0 : GDKerror("there is no logger catalog, "
2132 : "but there is a log file.\n");
2133 0 : goto error;
2134 : }
2135 :
2136 230 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2137 230 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2138 230 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2139 :
2140 230 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2141 0 : GDKerror("cannot create catalog bats");
2142 0 : goto error;
2143 : }
2144 230 : TRC_INFO(WAL, "create %s catalog\n", fn);
2145 :
2146 : /* give the catalog bats names so we can find them
2147 : * next time */
2148 230 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2149 230 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2150 0 : goto error;
2151 : }
2152 :
2153 230 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2154 230 : if (BBPrename(lg->catalog_id, bak) < 0) {
2155 0 : goto error;
2156 : }
2157 :
2158 230 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2159 230 : if (BBPrename(lg->dcatalog, bak) < 0) {
2160 0 : goto error;
2161 : }
2162 :
2163 230 : if (!LOG_DISABLED(lg)) {
2164 229 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2165 0 : GDKerror("cannot create directory for log file %s\n", filename);
2166 0 : goto error;
2167 : }
2168 229 : if (log_create_types_file(lg, filename) != GDK_SUCCEED)
2169 0 : goto error;
2170 : }
2171 :
2172 230 : BBPretain(lg->catalog_bid->batCacheid);
2173 230 : BBPretain(lg->catalog_id->batCacheid);
2174 230 : BBPretain(lg->dcatalog->batCacheid);
2175 :
2176 230 : log_lock(lg);
2177 : /* bm_subcommit releases the lock */
2178 230 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2179 : /* cannot commit catalog, so remove log */
2180 0 : if (MT_remove(filename) < 0)
2181 0 : GDKsyserror("remove %s failed\n", filename);
2182 0 : BBPrelease(lg->catalog_bid->batCacheid);
2183 0 : BBPrelease(lg->catalog_id->batCacheid);
2184 0 : BBPrelease(lg->dcatalog->batCacheid);
2185 0 : goto error;
2186 : }
2187 : } else {
2188 : /* find the persistent catalog. As non persistent bats
2189 : * require a logical reference we also add a logical
2190 : * reference for the persistent bats */
2191 127 : BUN p, q;
2192 127 : BAT *b, *o, *d;
2193 :
2194 127 : assert(!lg->inmemory);
2195 :
2196 : /* the catalog exists, and so should the log file */
2197 127 : if (fp == NULL && !LOG_DISABLED(lg)) {
2198 0 : GDKerror("There is a logger catalog, but no log file.\n");
2199 0 : goto error;
2200 : }
2201 : if (fp != NULL) {
2202 : /* check_version always closes fp */
2203 127 : if (check_version(lg, fp, &needsnew) != GDK_SUCCEED) {
2204 0 : fp = NULL;
2205 0 : goto error;
2206 : }
2207 : readlogs = true;
2208 : fp = NULL;
2209 : }
2210 :
2211 127 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2212 127 : b = BATdescriptor(catalog_bid);
2213 127 : if (b == NULL) {
2214 0 : GDKerror("inconsistent database, catalog does not exist");
2215 0 : goto error;
2216 : }
2217 :
2218 127 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2219 127 : catalog_id = BBPindex(bak);
2220 127 : o = BATdescriptor(catalog_id);
2221 127 : if (o == NULL) {
2222 0 : BBPunfix(b->batCacheid);
2223 0 : GDKerror("inconsistent database, catalog_id does not exist");
2224 0 : goto error;
2225 : }
2226 :
2227 127 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2228 127 : dcatalog = BBPindex(bak);
2229 127 : d = BATdescriptor(dcatalog);
2230 127 : if (d == NULL) {
2231 0 : GDKerror("cannot create dcatalog bat");
2232 0 : BBPunfix(b->batCacheid);
2233 0 : BBPunfix(o->batCacheid);
2234 0 : goto error;
2235 : }
2236 :
2237 127 : lg->catalog_bid = b;
2238 127 : lg->catalog_id = o;
2239 127 : lg->dcatalog = d;
2240 127 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2241 32943 : BATloop(lg->catalog_bid, p, q) {
2242 32816 : bat bid = bids[p];
2243 32816 : oid pos = p;
2244 :
2245 32816 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2246 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2247 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2248 0 : goto error;
2249 : }
2250 : }
2251 127 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2252 127 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2253 127 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2254 0 : goto error;
2255 : }
2256 127 : BBPretain(lg->catalog_bid->batCacheid);
2257 127 : BBPretain(lg->catalog_id->batCacheid);
2258 127 : BBPretain(lg->dcatalog->batCacheid);
2259 : }
2260 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2261 : * fatal */
2262 357 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2263 357 : if (lg->catalog_cnt == NULL) {
2264 0 : GDKerror("failed to create catalog_cnt bat");
2265 0 : goto error;
2266 : }
2267 357 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2268 357 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2269 0 : GDKclrerr();
2270 357 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2271 357 : if (lg->catalog_lid == NULL) {
2272 0 : GDKerror("failed to create catalog_lid bat");
2273 0 : goto error;
2274 : }
2275 357 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2276 357 : if (BBPrename(lg->catalog_lid, bak) < 0)
2277 0 : GDKclrerr();
2278 357 : if (bm_get_counts(lg) != GDK_SUCCEED)
2279 0 : goto error;
2280 :
2281 357 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2282 357 : if (BBPindex(bak)) {
2283 127 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2284 127 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2285 127 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2286 127 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2287 127 : lg->dseqs = BATdescriptor(BBPindex(bak));
2288 127 : if (lg->seqs_id == NULL ||
2289 127 : lg->seqs_val == NULL ||
2290 : lg->dseqs == NULL) {
2291 0 : GDKerror("Logger_new: cannot load seqs bats");
2292 0 : goto error;
2293 : }
2294 127 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2295 127 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2296 127 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2297 0 : goto error;
2298 : }
2299 : } else {
2300 230 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2301 230 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2302 230 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2303 230 : if (lg->seqs_id == NULL ||
2304 230 : lg->seqs_val == NULL ||
2305 : lg->dseqs == NULL) {
2306 0 : GDKerror("Logger_new: cannot create seqs bats");
2307 0 : goto error;
2308 : }
2309 :
2310 230 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2311 230 : if (BBPrename(lg->seqs_id, bak) < 0) {
2312 0 : goto error;
2313 : }
2314 :
2315 230 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2316 230 : if (BBPrename(lg->seqs_val, bak) < 0) {
2317 0 : goto error;
2318 : }
2319 :
2320 230 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2321 230 : if (BBPrename(lg->dseqs, bak) < 0) {
2322 0 : goto error;
2323 : }
2324 : needcommit = true;
2325 : }
2326 357 : dbg = ATOMIC_GET(&GDKdebug);
2327 357 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2328 357 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2329 0 : GDKerror("Logger_new: commit failed");
2330 0 : goto error;
2331 : }
2332 357 : ATOMIC_SET(&GDKdebug, dbg);
2333 :
2334 357 : if (readlogs) {
2335 127 : ulng log_id = lg->saved_id + 1;
2336 127 : bool earlyexit = GDKgetenv_isyes("process-wal-and-exit");
2337 127 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2338 0 : goto error;
2339 : }
2340 127 : if (!earlyexit) {
2341 127 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2342 0 : goto error;
2343 127 : if (needsnew) {
2344 16 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2345 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2346 0 : return GDK_FAIL;
2347 : }
2348 16 : if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
2349 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2350 0 : return GDK_FAIL;
2351 : }
2352 : }
2353 : }
2354 127 : dbg = ATOMIC_GET(&GDKdebug);
2355 127 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2356 127 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2357 0 : goto error;
2358 : }
2359 127 : ATOMIC_SET(&GDKdebug, dbg);
2360 334 : for (; log_id <= lg->saved_id; log_id++)
2361 207 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2362 127 : if (earlyexit) {
2363 0 : printf("# mserver5 exiting\n");
2364 0 : exit(0);
2365 : }
2366 143 : if (needsnew &&
2367 16 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2368 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2369 0 : return GDK_FAIL;
2370 : }
2371 : } else {
2372 230 : lg->id = lg->saved_id + 1;
2373 230 : if (GDKgetenv_isyes("process-wal-and-exit")) {
2374 0 : printf("# mserver5 exiting\n");
2375 0 : exit(0);
2376 : }
2377 : }
2378 : #ifdef GDKLIBRARY_JSON
2379 357 : if (log_json_upgrade_finalize() == GDK_FAIL)
2380 0 : goto error;
2381 : #endif
2382 357 : if (GDKgetenv_isyes("clean-BBP"))
2383 8 : clean_bbp(lg);
2384 : return GDK_SUCCEED;
2385 0 : error:
2386 0 : if (fp)
2387 0 : fclose(fp);
2388 0 : logbat_destroy(lg->catalog_bid);
2389 0 : logbat_destroy(lg->catalog_id);
2390 0 : logbat_destroy(lg->dcatalog);
2391 0 : logbat_destroy(lg->seqs_id);
2392 0 : logbat_destroy(lg->seqs_val);
2393 0 : logbat_destroy(lg->dseqs);
2394 0 : MT_lock_destroy(&lg->lock);
2395 0 : MT_lock_destroy(&lg->rotation_lock);
2396 0 : GDKfree(lg->fn);
2397 0 : GDKfree(lg->dir);
2398 0 : GDKfree(lg->rbuf);
2399 0 : GDKfree(lg->wbuf);
2400 0 : GDKfree(lg);
2401 0 : ATOMIC_SET(&GDKdebug, dbg);
2402 : /* We do not call log_json_upgrade_finalize here because we want
2403 : * the upgrade to run again next time we try, so we do not want
2404 : * to remove the signal file just yet.
2405 : */
2406 0 : return GDK_FAIL;
2407 : }
2408 :
2409 : /* Initialize a new logger
2410 : * It will load any data in the logdir and persist it in the BATs*/
2411 : static logger *
2412 357 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2413 : postversionfix_fptr postfuncp, void *funcdata)
2414 : {
2415 357 : logger *lg;
2416 357 : char filename[FILENAME_MAX];
2417 :
2418 357 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2419 357 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2420 357 : lng max_file_size = 0;
2421 :
2422 357 : if (GDKdebug & TESTINGMASK) {
2423 : max_file_size = 2048; /* 2 KiB */
2424 : } else {
2425 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2426 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2427 : }
2428 :
2429 357 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2430 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2431 0 : return NULL;
2432 : }
2433 :
2434 357 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2435 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2436 0 : return NULL;
2437 : }
2438 :
2439 357 : lg = GDKmalloc(sizeof(struct logger));
2440 357 : if (lg == NULL) {
2441 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2442 0 : return NULL;
2443 : }
2444 :
2445 714 : *lg = (logger) {
2446 357 : .inmemory = GDKinmemory(0),
2447 : .debug = debug,
2448 : .version = version,
2449 : .prefuncp = prefuncp,
2450 : .postfuncp = postfuncp,
2451 : .funcdata = funcdata,
2452 :
2453 357 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2454 : .file_age = 0,
2455 357 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2456 357 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2457 :
2458 : .id = 0,
2459 357 : .saved_id = getBBPlogno(), /* get saved log number from bbp */
2460 : .nr_flushers = ATOMIC_VAR_INIT(0),
2461 357 : .fn = GDKstrdup(fn),
2462 357 : .dir = GDKstrdup(filename),
2463 : .rbufsize = 64 * 1024,
2464 357 : .rbuf = GDKmalloc(64 * 1024),
2465 : .wbufsize = 64 * 1024,
2466 357 : .wbuf = GDKmalloc(64 * 1024),
2467 : };
2468 :
2469 : /* probably open file and check version first, then call call old logger code */
2470 357 : if (lg->fn == NULL ||
2471 357 : lg->dir == NULL ||
2472 357 : lg->rbuf == NULL ||
2473 : lg->wbuf == NULL) {
2474 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2475 0 : GDKfree(lg->fn);
2476 0 : GDKfree(lg->dir);
2477 0 : GDKfree(lg->rbuf);
2478 0 : GDKfree(lg->wbuf);
2479 0 : GDKfree(lg);
2480 0 : return NULL;
2481 : }
2482 357 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2483 :
2484 357 : MT_lock_init(&lg->lock, fn);
2485 357 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2486 357 : MT_lock_init(&lg->flush_lock, "flush_lock");
2487 357 : MT_cond_init(&lg->excl_flush_cv, "flush_cond");
2488 :
2489 357 : if (log_load(fn, lg, filename) == GDK_SUCCEED) {
2490 : return lg;
2491 : }
2492 : return NULL;
2493 : }
2494 :
2495 : static logged_range *
2496 68869 : do_flush_range_cleanup(logger *lg)
2497 : {
2498 68869 : logged_range *frange = lg->flush_ranges;
2499 68869 : logged_range *first = frange;
2500 :
2501 68869 : if (frange == NULL)
2502 : return NULL;
2503 81646 : while (frange->next) {
2504 13149 : if (ATOMIC_GET(&frange->refcount) > 1)
2505 : break;
2506 12777 : frange = frange->next;
2507 : }
2508 68869 : if (first == frange) {
2509 : return first;
2510 : }
2511 :
2512 12777 : logged_range *flast = frange;
2513 :
2514 12777 : lg->flush_ranges = flast;
2515 :
2516 25554 : for (frange = first; frange && frange != flast; frange = frange->next) {
2517 12777 : ATOMIC_DEC(&frange->refcount);
2518 12777 : if (!LOG_DISABLED(lg) && frange->output_log) {
2519 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2520 0 : close_stream(frange->output_log);
2521 0 : frange->output_log = NULL;
2522 : }
2523 : }
2524 : return flast;
2525 : }
2526 :
/* Tear down a logger created by log_create and free it.
 *
 * Steps, in order:
 *   - close the input log;
 *   - trim the flush-range list (all but the current range are expected
 *     to be gone, see the assert);
 *   - close the output log and free every range on the pending list;
 *   - when logging is disabled, record id/tid as saved and commit;
 *   - under the logger lock, drop the logical reference held for every
 *     bat in catalog_bid, then release and destroy the catalog bats and
 *     the transient catalog_cnt/catalog_lid bats;
 *   - finally destroy the locks and free the buffers and lg itself.
 *
 * NOTE(review): seqs_id/seqs_val/dseqs are not destroyed here; confirm
 * they are released elsewhere. */
void
log_destroy(logger *lg)
{
	log_close_input(lg);
	logged_range *last = do_flush_range_cleanup(lg);
	(void) last;	/* only used by the assert below */
	assert(last == lg->current && last == lg->flush_ranges);
	log_close_output(lg);
	for (logged_range * p = lg->pending; p; p = lg->pending) {
		lg->pending = p->next;
		GDKfree(p);
	}
	if (LOG_DISABLED(lg)) {
		/* nothing was written to disk; mark everything as saved */
		lg->saved_id = lg->id;
		lg->saved_tid = lg->tid;
		log_commit(lg, NULL, NULL, 0);
	}
	if (lg->catalog_bid) {
		log_lock(lg);
		BUN p, q;
		BAT *b = lg->catalog_bid;

		/* free resources */
		/* release the logical ref taken for each catalogued bat */
		const log_bid *bids = (const log_bid *) Tloc(b, 0);
		BATloop(b, p, q) {
			bat bid = bids[p];

			BBPrelease(bid);
		}

		BBPrelease(lg->catalog_bid->batCacheid);
		BBPrelease(lg->catalog_id->batCacheid);
		BBPrelease(lg->dcatalog->batCacheid);
		logbat_destroy(lg->catalog_bid);
		logbat_destroy(lg->catalog_id);
		logbat_destroy(lg->dcatalog);

		logbat_destroy(lg->catalog_cnt);
		logbat_destroy(lg->catalog_lid);
		log_unlock(lg);
	}
	MT_lock_destroy(&lg->lock);
	MT_lock_destroy(&lg->rotation_lock);
	MT_lock_destroy(&lg->flush_lock);
	GDKfree(lg->fn);
	GDKfree(lg->dir);
	GDKfree(lg->rbuf);
	GDKfree(lg->wbuf);
	GDKfree(lg);
}
2577 :
/* Create a new logger.
 *
 * Runs recovery through log_new, marks recovery as finished with
 * GDKsetenv("recovery", "finished"), and opens the first output log.
 *
 * After success, lg->current, lg->pending and lg->flush_ranges all
 * point to the same newly opened range.  Returns NULL on failure; a
 * logger that was already created is destroyed first. */
logger *
log_create(int debug, const char *fn, const char *logdir, int version,
	   preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
	   void *funcdata)
{
	logger *lg;
	TRC_INFO_IF(WAL) {
		TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
		GDKtracer_flush_buffer();
	}
	lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
	if (lg == NULL)
		return NULL;
	TRC_INFO_IF(WAL) {
		TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
		GDKtracer_flush_buffer();
	}
	if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
		log_destroy(lg);
		return NULL;
	}
	assert(lg->current == NULL);
	/* log_open_output is called with a temporary stack range as
	 * lg->current; afterwards lg->current->next is taken as the real
	 * first range.  Presumably log_open_output links the new range
	 * there and copies dummy.cnt into it (TODO confirm). */
	logged_range dummy = {
		.cnt = BATcount(lg->catalog_bid),
	};
	lg->current = &dummy;
	if (log_open_output(lg) != GDK_SUCCEED) {
		lg->current = NULL;
		log_destroy(lg);
		return NULL;
	}
	lg->current = lg->current->next;
	assert(lg->pending == NULL && lg->flush_ranges == NULL);
	lg->pending = lg->current;
	lg->flush_ranges = lg->current;
	return lg;
}
2616 :
2617 : static logged_range *
2618 8312 : log_next_logfile(logger *lg, ulng ts)
2619 : {
2620 8312 : int m = (ATOMIC_GET(&GDKdebug) & TESTINGMASK) ? 1000 : 100;
2621 8312 : if (!lg->pending || !lg->pending->next)
2622 : return NULL;
2623 8312 : rotation_lock(lg);
2624 8312 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2625 8312 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2626 8312 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2627 7044 : logged_range *p = lg->pending;
2628 7044 : for (int i = 1;
2629 12683 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2630 5661 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2631 18344 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2632 5639 : p = p->next;
2633 7044 : rotation_unlock(lg);
2634 7044 : return p;
2635 : }
2636 1268 : rotation_unlock(lg);
2637 1268 : return NULL;
2638 : }
2639 :
2640 : static void
2641 7044 : log_cleanup_range(logger *lg, ulng id)
2642 : {
2643 7044 : rotation_lock(lg);
2644 19727 : while (lg->pending && lg->pending->id <= id) {
2645 12683 : logged_range *p;
2646 12683 : p = lg->pending;
2647 12683 : if (p)
2648 12683 : lg->pending = p->next;
2649 12683 : GDKfree(p);
2650 : }
2651 7044 : rotation_unlock(lg);
2652 7044 : }
2653 :
2654 : static void
2655 68516 : do_rotate(logger *lg)
2656 : {
2657 68516 : logged_range *cur = lg->current;
2658 68516 : logged_range *next = cur->next;
2659 68516 : if (next) {
2660 12777 : assert(ATOMIC_GET(&next->refcount) == 1);
2661 12777 : lg->current = next;
2662 12777 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2663 12624 : close_stream(cur->output_log);
2664 12624 : cur->output_log = NULL;
2665 : }
2666 : }
2667 68516 : }
2668 :
2669 : gdk_return
2670 14011 : log_activate(logger *lg)
2671 : {
2672 14011 : bool flush_cleanup = false;
2673 14011 : gdk_return res = GDK_SUCCEED;
2674 :
2675 14011 : rotation_lock(lg);
2676 14011 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2677 :
2678 14011 : if (current_file_size == -1) {
2679 0 : rotation_unlock(lg);
2680 0 : return GDK_FAIL;
2681 : }
2682 : /* file size of 2 means only endian indicator present
2683 : * (i.e. effectively empty) */
2684 14011 : if (current_file_size <= 2) {
2685 10669 : rotation_unlock(lg);
2686 10669 : return GDK_SUCCEED;
2687 : }
2688 :
2689 3342 : if (!lg->flushnow &&
2690 3342 : !lg->current->next &&
2691 3342 : current_file_size > 2 &&
2692 3342 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2693 3332 : current_file_size > lg->max_file_size ||
2694 3299 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2695 43 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2696 43 : lg->saved_id + 1 == lg->id &&
2697 42 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2698 41 : lg->id++;
2699 : /* start new file */
2700 41 : res = log_open_output(lg);
2701 41 : flush_cleanup = true;
2702 41 : do_rotate(lg);
2703 : }
2704 41 : if (flush_cleanup)
2705 41 : (void) do_flush_range_cleanup(lg);
2706 3342 : rotation_unlock(lg);
2707 3342 : return res;
2708 : }
2709 :
/* Apply finished WAL files to the persistent BATs and remove them.
 *
 * log_next_logfile(lg, ts) picks the last range that may be flushed;
 * its id (lid) is the highest WAL file id to process.
 *
 * When logging is disabled, only saved_id/saved_tid are updated, the
 * pending ranges are cleaned up and a commit is done.
 *
 * Otherwise, every file with id saved_id+1 .. lid is read in full,
 * using log_read_transaction under the logger lock.  A bitmap
 * `updated` records which catalog entries were touched, and is passed
 * to log_commit.  On a successful commit the processed files are
 * unlinked (best effort) and the pending ranges up to saved_id are
 * freed.  On a failed commit saved_id is reset to its old value.
 *
 * Returns GDK_FAIL only on errors (path construction, opening input,
 * allocation, read/commit failure), GDK_SUCCEED otherwise, including
 * when there is nothing to flush. */
gdk_return
log_flush(logger *lg, ulng ts)
{
	logged_range *pending = log_next_logfile(lg, ts);
	ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
	if (LOG_DISABLED(lg)) {
		/* NOTE(review): when pending is NULL, lid is 0 here, so
		 * saved_id is reset to 0 -- confirm this is intended */
		lg->saved_id = lid;
		lg->saved_tid = lg->tid;
		if (lid)
			log_cleanup_range(lg, lg->saved_id);
		if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
			TRC_ERROR(GDK, "failed to commit");
		return GDK_SUCCEED;
	}
	if (lg->saved_id >= lid)
		return GDK_SUCCEED;	/* nothing new to flush */
	rotation_lock(lg);
	ulng lgid = lg->id;
	rotation_unlock(lg);
	if (lg->saved_id + 1 >= lgid) /* logger should first release the file */
		return GDK_SUCCEED;
	log_return res = LOG_OK;
	ulng cid = olid;
	assert(lid <= lgid);
	/* bitmap with one bit per catalog entry, sized in whole 32-bit
	 * words (allocated is in bytes, nupdated is the bit count) */
	uint32_t *updated = NULL;
	BUN nupdated = 0;
	size_t allocated = 0;
	while (cid < lid && res == LOG_OK) {
		if (!lg->input_log) {
			/* open LOGFILE.<cid+1> in lg->dir */
			char filename[MAXPATH];
			char id[32];
			if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
				GDKfree(updated);
				TRC_CRITICAL(GDK, "log_id filename is too large\n");
				return GDK_FAIL;
			}
			if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
				GDKfree(updated);
				return GDK_FAIL;
			}
			if (strlen(filename) >= FILENAME_MAX) {
				GDKfree(updated);
				TRC_CRITICAL(GDK, "Logger filename path is too large\n");
				return GDK_FAIL;
			}

			bool filemissing = false;
			if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
				GDKfree(updated);
				return GDK_FAIL;
			}
		}
		/* we read the full file because skipping is impossible with current log format */
		log_lock(lg);
		if (updated == NULL) {
			nupdated = BATcount(lg->catalog_id);
			allocated = ((nupdated + 31) & ~31) / 8;
			if (allocated == 0)
				allocated = 4;
			updated = GDKzalloc(allocated);
			if (updated == NULL) {
				log_unlock(lg);
				return GDK_FAIL;
			}
		} else if (nupdated < BATcount(lg->catalog_id)) {
			/* the catalog grew: widen the bitmap, zeroing new words */
			BUN n = BATcount(lg->catalog_id);
			size_t a = ((n + 31) & ~31) / 8;
			if (a > allocated) {
				uint32_t *p = GDKrealloc(updated, a);
				if (p == NULL) {
					GDKfree(updated);
					log_unlock(lg);
					return GDK_FAIL;
				}
				updated = p;
				memset(updated + allocated / 4, 0, a - allocated);
				allocated = a;
			}
			nupdated = n;
		}
		lg->flushing = true;
		res = log_read_transaction(lg, updated, nupdated);
		lg->flushing = false;
		log_unlock(lg);
		if (res == LOG_EOF) {
			/* whole file consumed: move on to the next one */
			log_close_input(lg);
			res = LOG_OK;
		}
		cid++;
	}
	if (lid > olid && res == LOG_OK) {
		rotation_lock(lg); /* protect against concurrent log_tflush rotate check */
		lg->saved_id = lid;
		rotation_unlock(lg);
		if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
			TRC_ERROR(GDK, "failed to commit");
			res = LOG_ERR;
			rotation_lock(lg);
			lg->saved_id = olid; /* reset !! */
			rotation_unlock(lg);
		}
		if (res != LOG_ERR) {
			while (olid < lid) {
				/* Try to cleanup, remove old log file, continue on failure! */
				olid++;
				(void) log_cleanup(lg, olid);
			}
		}
		if (res == LOG_OK)
			log_cleanup_range(lg, lg->saved_id);
	}
	GDKfree(updated);
	return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
}
2824 :
2825 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2826 : * Only the bak- files are deleted for the preserved WAL files.
2827 : */
2828 : lng
2829 33430 : log_changes(logger *lg)
2830 : {
2831 33430 : if (LOG_DISABLED(lg))
2832 : return 0;
2833 33430 : rotation_lock(lg);
2834 33430 : lng changes = lg->id - lg->saved_id - 1;
2835 33430 : rotation_unlock(lg);
2836 33430 : return changes;
2837 : }
2838 :
2839 : int
2840 488 : log_sequence(logger *lg, int seq, lng *id)
2841 : {
2842 488 : log_lock(lg);
2843 488 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2844 :
2845 488 : if (p != BUN_NONE) {
2846 369 : *id = *(lng *) Tloc(lg->seqs_val, p);
2847 :
2848 369 : log_unlock(lg);
2849 369 : return 1;
2850 : }
2851 119 : log_unlock(lg);
2852 119 : return 0;
2853 : }
2854 :
2855 : gdk_return
2856 199111 : log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
2857 : {
2858 199111 : bte tpe = find_type(lg, type);
2859 199111 : gdk_return ok = GDK_SUCCEED;
2860 199111 : logformat l;
2861 199111 : lng nr;
2862 199111 : l.flag = LOG_UPDATE_CONST;
2863 199111 : l.id = id;
2864 199111 : nr = cnt;
2865 :
2866 199111 : if (LOG_DISABLED(lg) || !nr) {
2867 : /* logging is switched off */
2868 75248 : if (nr) {
2869 75248 : log_lock(lg);
2870 75248 : ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
2871 75248 : log_unlock(lg);
2872 : }
2873 75248 : return ok;
2874 : }
2875 :
2876 123863 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
2877 :
2878 123863 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2879 247726 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2880 247726 : log_write_format(lg, &l) != GDK_SUCCEED ||
2881 247726 : !mnstr_writeLng(lg->current->output_log, nr) ||
2882 247726 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2883 123863 : !mnstr_writeLng(lg->current->output_log, offset)) {
2884 0 : ATOMIC_DEC(&lg->current->refcount);
2885 0 : ok = GDK_FAIL;
2886 0 : goto bailout;
2887 : }
2888 :
2889 123863 : ok = wt(val, lg->current->output_log, 1);
2890 :
2891 123863 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2892 :
2893 123863 : bailout:
2894 123863 : if (ok != GDK_SUCCEED) {
2895 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2896 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2897 : }
2898 : return ok;
2899 : }
2900 :
2901 : static gdk_return
2902 19677 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2903 : {
2904 19677 : size_t bufsz = lg->wbufsize, resize = 0;
2905 19677 : BUN end = (BUN) (offset + nr);
2906 19677 : char *buf = lg->wbuf;
2907 19677 : gdk_return res = GDK_SUCCEED;
2908 :
2909 19677 : if (!buf)
2910 : return GDK_FAIL;
2911 19677 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2912 19677 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2913 : return GDK_FAIL;
2914 19677 : BATiter bi = bat_iterator(b);
2915 19677 : BUN p = (BUN) offset;
2916 39406 : for (; p < end;) {
2917 19729 : size_t sz = 0;
2918 19729 : if (resize) {
2919 2 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2920 : res = GDK_FAIL;
2921 : break;
2922 : }
2923 2 : lg->wbuf = buf;
2924 2 : lg->wbufsize = bufsz = resize;
2925 2 : resize = 0;
2926 : }
2927 19729 : char *dst = buf;
2928 71532 : for (; p < end && sz < bufsz; p++) {
2929 51855 : const char *s = BUNtvar(bi, p);
2930 51855 : size_t len = strlen(s) + 1;
2931 51855 : if ((sz + len) > bufsz) {
2932 52 : if (len > bufsz)
2933 2 : resize = len + bufsz;
2934 : break;
2935 : } else {
2936 51803 : memcpy(dst, s, len);
2937 51803 : dst += len;
2938 51803 : sz += len;
2939 : }
2940 : }
2941 39456 : if (sz &&
2942 39454 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2943 19727 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2944 : res = GDK_FAIL;
2945 : break;
2946 : }
2947 : }
2948 19677 : bat_iterator_end(&bi);
2949 19677 : return res;
2950 : }
2951 :
2952 : static gdk_return
2953 511868 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2954 : {
2955 511868 : bte tpe = find_type(lg, b->ttype);
2956 511868 : gdk_return ok = GDK_SUCCEED;
2957 511868 : logformat l;
2958 511868 : BUN p;
2959 511868 : lng nr;
2960 511868 : l.flag = LOG_UPDATE_BULK;
2961 511868 : l.id = id;
2962 511868 : nr = cnt;
2963 :
2964 511868 : if (LOG_DISABLED(lg) || !nr) {
2965 : /* logging is switched off */
2966 215269 : if (nr)
2967 159561 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2968 : return GDK_SUCCEED;
2969 : }
2970 :
2971 267777 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2972 :
2973 267777 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2974 267777 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2975 0 : ok = GDK_FAIL;
2976 0 : goto bailout;
2977 : }
2978 :
2979 267777 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2980 534890 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2981 662310 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2982 534890 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2983 407470 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2984 0 : ok = GDK_FAIL;
2985 0 : goto bailout;
2986 : }
2987 267777 : if (!total_cnt)
2988 127420 : total_cnt = cnt;
2989 267777 : lg->total_cnt += cnt;
2990 :
2991 267777 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2992 267445 : lg->total_cnt = 0;
2993 :
2994 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2995 267777 : if (sliced)
2996 1512 : offset = 0;
2997 267777 : BATiter bi = bat_iterator(b);
2998 267777 : if (b->ttype == TYPE_msk) {
2999 0 : if (offset % 32 == 0) {
3000 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
3001 0 : (size_t) ((nr + 31) / 32)))
3002 0 : ok = GDK_FAIL;
3003 : } else {
3004 0 : for (lng i = 0; i < nr; i += 32) {
3005 : uint32_t v = 0;
3006 0 : for (int j = 0; j < 32 && i + j < nr; j++)
3007 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
3008 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
3009 : ok = GDK_FAIL;
3010 : break;
3011 : }
3012 : }
3013 : }
3014 515240 : } else if (b->ttype < TYPE_str && bi.h->parentid == b->batCacheid) {
3015 247463 : const void *t = BUNtail(bi, (BUN) offset);
3016 :
3017 247463 : ok = wt(t, lg->current->output_log, (size_t) nr);
3018 20314 : } else if (b->ttype == TYPE_str) {
3019 : /* efficient string writes */
3020 19671 : ok = string_writer(lg, b, offset, nr);
3021 : } else {
3022 643 : BUN end = (BUN) (offset + nr);
3023 1644 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
3024 1001 : const void *t = BUNtail(bi, p);
3025 :
3026 1001 : ok = wt(t, lg->current->output_log, 1);
3027 : }
3028 : }
3029 267777 : bat_iterator_end(&bi);
3030 :
3031 267777 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3032 :
3033 267777 : bailout:
3034 267777 : if (ok != GDK_SUCCEED) {
3035 0 : ATOMIC_DEC(&lg->current->refcount);
3036 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3037 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3038 : }
3039 : return ok;
3040 : }
3041 :
3042 : /*
3043 : * Changes made to the BAT descriptor should be stored in the log
3044 : * files. Actually, we need to save the descriptor file, perhaps we
3045 : * should simply introduce a versioning scheme.
3046 : */
3047 : gdk_return
3048 237333 : log_bat_persists(logger *lg, BAT *b, log_id id)
3049 : {
3050 237333 : log_lock(lg);
3051 237333 : bte ta = find_type(lg, b->ttype);
3052 237333 : logformat l;
3053 :
3054 237333 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
3055 0 : log_unlock(lg);
3056 0 : if (!LOG_DISABLED(lg))
3057 0 : ATOMIC_DEC(&lg->current->refcount);
3058 0 : return GDK_FAIL;
3059 : }
3060 :
3061 237333 : l.flag = LOG_CREATE;
3062 237333 : l.id = id;
3063 237333 : if (!LOG_DISABLED(lg)) {
3064 154718 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3065 309436 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3066 309436 : log_write_format(lg, &l) != GDK_SUCCEED ||
3067 154718 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3068 0 : log_unlock(lg);
3069 0 : ATOMIC_DEC(&lg->current->refcount);
3070 0 : return GDK_FAIL;
3071 : }
3072 : }
3073 237333 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3074 237333 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3075 237333 : log_unlock(lg);
3076 237333 : if (r != GDK_SUCCEED)
3077 0 : ATOMIC_DEC(&lg->current->refcount);
3078 : return r;
3079 : }
3080 :
3081 : gdk_return
3082 22302 : log_bat_transient(logger *lg, log_id id)
3083 : {
3084 22302 : log_lock(lg);
3085 22302 : log_bid bid = internal_find_bat(lg, id, -1);
3086 22302 : logformat l;
3087 :
3088 22302 : if (bid < 0) {
3089 0 : log_unlock(lg);
3090 0 : return GDK_FAIL;
3091 : }
3092 22302 : if (!bid) {
3093 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3094 0 : log_unlock(lg);
3095 0 : return GDK_FAIL;
3096 : }
3097 22302 : l.flag = LOG_DESTROY;
3098 22302 : l.id = id;
3099 :
3100 22302 : if (!LOG_DISABLED(lg)) {
3101 9725 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3102 0 : TRC_CRITICAL(GDK, "write failed\n");
3103 0 : log_unlock(lg);
3104 0 : ATOMIC_DEC(&lg->current->refcount);
3105 0 : return GDK_FAIL;
3106 : }
3107 : }
3108 22302 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3109 22302 : BAT *b = BBPquickdesc(bid);
3110 22302 : assert(b);
3111 22302 : BUN cnt = BATcount(b);
3112 22302 : ATOMIC_ADD(&lg->current->drops, cnt);
3113 22302 : gdk_return r = log_del_bat(lg, bid);
3114 22302 : log_unlock(lg);
3115 22302 : if (r != GDK_SUCCEED)
3116 0 : ATOMIC_DEC(&lg->current->refcount);
3117 : return r;
3118 : }
3119 :
3120 : static gdk_return
3121 118116 : log_bat_group(logger *lg, log_id id)
3122 : {
3123 118116 : if (LOG_DISABLED(lg))
3124 : return GDK_SUCCEED;
3125 :
3126 76672 : logformat l;
3127 76672 : l.flag = LOG_BAT_GROUP;
3128 76672 : l.id = id;
3129 76672 : gdk_return r = log_write_format(lg, &l);
3130 76672 : return r;
3131 : }
3132 :
3133 : gdk_return
3134 59058 : log_bat_group_start(logger *lg, log_id id)
3135 : {
3136 : /*positive table id represent start of logged table */
3137 59058 : return log_bat_group(lg, id);
3138 : }
3139 :
3140 : gdk_return
3141 59058 : log_bat_group_end(logger *lg, log_id id)
3142 : {
3143 : /*negative table id represent end of logged table */
3144 59058 : return log_bat_group(lg, -id);
3145 : }
3146 :
3147 : gdk_return
3148 271800 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3149 : {
3150 271800 : log_lock(lg);
3151 271800 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3152 271800 : log_unlock(lg);
3153 271800 : return r;
3154 : }
3155 :
3156 : gdk_return
3157 2819 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3158 : {
3159 2819 : log_lock(lg);
3160 2819 : bte tpe = find_type(lg, uval->ttype);
3161 2819 : gdk_return ok = GDK_SUCCEED;
3162 2819 : logformat l;
3163 2819 : BUN p;
3164 2819 : lng nr;
3165 :
3166 2819 : if (BATtdense(uid)) {
3167 2735 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3168 2735 : log_unlock(lg);
3169 2735 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3170 0 : ATOMIC_DEC(&lg->current->refcount);
3171 2735 : return ok;
3172 : }
3173 :
3174 84 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3175 :
3176 84 : l.flag = LOG_UPDATE;
3177 84 : l.id = id;
3178 84 : nr = (BATcount(uval));
3179 84 : assert(nr);
3180 :
3181 84 : if (LOG_DISABLED(lg)) {
3182 : /* logging is switched off */
3183 65 : log_unlock(lg);
3184 65 : return GDK_SUCCEED;
3185 : }
3186 :
3187 19 : BATiter vi = bat_iterator(uval);
3188 19 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3189 19 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3190 :
3191 19 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3192 38 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3193 38 : log_write_format(lg, &l) != GDK_SUCCEED ||
3194 38 : !mnstr_writeLng(lg->current->output_log, nr) ||
3195 19 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3196 0 : ok = GDK_FAIL;
3197 0 : goto bailout;
3198 : }
3199 1074 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3200 1055 : const oid id = BUNtoid(uid, p);
3201 :
3202 1055 : ok = wh(&id, lg->current->output_log, 1);
3203 : }
3204 19 : if (uval->ttype == TYPE_msk) {
3205 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3206 0 : (BATcount(uval) + 31) / 32))
3207 0 : ok = GDK_FAIL;
3208 32 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3209 13 : const void *t = BUNtail(vi, 0);
3210 :
3211 13 : ok = wt(t, lg->current->output_log, (size_t) nr);
3212 6 : } else if (uval->ttype == TYPE_str) {
3213 : /* efficient string writes */
3214 6 : ok = string_writer(lg, uval, 0, nr);
3215 : } else {
3216 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3217 0 : const void *val = BUNtail(vi, p);
3218 :
3219 0 : ok = wt(val, lg->current->output_log, 1);
3220 : }
3221 : }
3222 :
3223 19 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3224 :
3225 19 : bailout:
3226 19 : bat_iterator_end(&vi);
3227 19 : if (ok != GDK_SUCCEED) {
3228 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3229 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3230 0 : ATOMIC_DEC(&lg->current->refcount);
3231 : }
3232 19 : log_unlock(lg);
3233 19 : return ok;
3234 : }
3235 :
3236 : static inline bool
3237 56595 : check_rotation_conditions(logger *lg)
3238 : {
3239 56595 : if (LOG_DISABLED(lg))
3240 : return false;
3241 :
3242 56592 : if (lg->current->next)
3243 : return false; /* do not rotate if there is already a prepared next current */
3244 56592 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3245 : return true;
3246 56592 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3247 :
3248 56592 : if (current_file_size == -1)
3249 : return false;
3250 :
3251 56592 : assert(current_file_size >= 0);
3252 :
3253 56592 : if (current_file_size == 2)
3254 : return false;
3255 :
3256 3173 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3257 57097 : current_file_size > lg->max_file_size ||
3258 49595 : (GDKusec() - lg->file_age) > lg->max_file_age;
3259 :
3260 53932 : return res;
3261 : }
3262 :
3263 : gdk_return
3264 62535 : log_tend(logger *lg)
3265 : {
3266 62535 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3267 :
3268 62535 : if (LOG_DISABLED(lg))
3269 : return GDK_SUCCEED;
3270 :
3271 56592 : gdk_return result;
3272 56592 : logformat l;
3273 56592 : l.flag = LOG_END;
3274 56592 : l.id = lg->tid;
3275 :
3276 56592 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3277 56592 : ATOMIC_INC(&lg->nr_flushers);
3278 : return result;
3279 : }
3280 :
/* Lock/unlock the logger's flush_lock.  log_tflush holds it around
 * do_flush so that only one thread flushes a log range at a time. */
#define flush_lock(lg) MT_lock_set(&(lg)->flush_lock)
#define flush_unlock(lg) MT_lock_unset(&(lg)->flush_lock)
3283 :
3284 : static inline gdk_return
3285 55962 : do_flush(logged_range *range)
3286 : {
3287 : /* assumes flush lock */
3288 55962 : stream *output_log = range->output_log;
3289 55962 : ulng ts = ATOMIC_GET(&range->last_ts);
3290 :
3291 55962 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3292 55962 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3293 0 : return GDK_FAIL;
3294 55962 : ATOMIC_SET(&range->flushed_ts, ts);
3295 55962 : return GDK_SUCCEED;
3296 : }
3297 :
3298 : static inline void
3299 62532 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3300 : {
3301 62532 : (void) lg;
3302 62532 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3303 :
3304 62532 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3305 61902 : ATOMIC_SET(&range->last_ts, commit_ts);
3306 62532 : }
3307 :
/*
 * Finish a transaction with commit timestamp `commit_ts` whose records
 * were written to log file `file_id` (as returned by log_tstart).
 *
 * flushnow mode (set by log_tstart): the current range is marked
 * flushed, a new log file is opened and rotated in, and the old range
 * is handed to log_commit.
 *
 * Normal mode: the range for file_id is located.  Its output is
 * flushed unless another thread already flushed past commit_ts.  The
 * transaction's reference on the range is then dropped, and the range
 * stream is closed when no longer needed.  Finally this caller is
 * removed from nr_flushers, and a waiting exclusive flusher in
 * log_tstart is signalled.
 */
gdk_return
log_tflush(logger *lg, ulng file_id, ulng commit_ts)
{
	rotation_lock(lg);
	if (lg->flushnow) {
		logged_range *p = lg->current;
		assert(lg->flush_ranges == lg->current);
		assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
		log_tdone(lg, lg->current, commit_ts);
		ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
		lg->id++;
		lg->flushnow = false;
		if (log_open_output(lg) != GDK_SUCCEED)
			GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
		do_rotate(lg);
		(void) do_flush_range_cleanup(lg);
		assert(lg->flush_ranges == lg->current);
		rotation_unlock(lg);
		/* commit the range that was current on entry */
		return log_commit(lg, p, NULL, 0);
	}

	if (LOG_DISABLED(lg)) {
		rotation_unlock(lg);
		return GDK_SUCCEED;
	}

	logged_range *frange = do_flush_range_cleanup(lg);

	/* walk to the range holding file_id (or the last range) */
	while (frange->next && frange->id < file_id) {
		assert(frange->next);
		frange = frange->next;
	}

	log_tdone(lg, frange, commit_ts);

	if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
		/* delay needed ? */

		flush_lock(lg);
		/* check it one more time */
		/* NOTE(review): the gdk_return of do_flush is ignored, so a
		 * failing flush/fsync still makes this function report
		 * GDK_SUCCEED -- confirm whether that is intended */
		if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
			do_flush(frange);
		flush_unlock(lg);
	}
	/* else somebody else has flushed our log file */

	/* drop this transaction's reference (taken in log_tstart); a
	 * non-current range whose count drops to 1 gets its stream
	 * closed.  NOTE(review): ATOMIC_DEC presumably yields the value
	 * after the decrement (cf. the == 0 test below) -- confirm */
	if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
		if (frange != lg->current && frange->output_log) {
			close_stream(frange->output_log);
			frange->output_log = NULL;
		}
	}

	if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
		/* I am the last flusher
		 * if present,
		 * wake up the exclusive flusher in log_tstart */
		/* rotation_lock is still being held */
		MT_cond_signal(&lg->excl_flush_cv);
	}
	rotation_unlock(lg);

	return GDK_SUCCEED;
}
3372 :
3373 : static gdk_return
3374 6982 : log_tsequence_(logger *lg, int seq, lng val)
3375 : {
3376 6982 : logformat l;
3377 :
3378 6982 : if (LOG_DISABLED(lg))
3379 : return GDK_SUCCEED;
3380 2840 : l.flag = LOG_SEQ;
3381 2840 : l.id = seq;
3382 :
3383 2840 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3384 :
3385 2840 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3386 5680 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3387 5680 : log_write_format(lg, &l) != GDK_SUCCEED ||
3388 2840 : !mnstr_writeLng(lg->current->output_log, val)) {
3389 0 : TRC_CRITICAL(GDK, "write failed\n");
3390 0 : ATOMIC_DEC(&lg->current->refcount);
3391 0 : return GDK_FAIL;
3392 : }
3393 : return GDK_SUCCEED;
3394 : }
3395 :
/* A transaction in itself: record the new value `val` of sequence `seq`
 * in the in-memory sequence BATs (seqs_id/seqs_val, with deleted
 * positions listed in dseqs) and then write a LOG_SEQ record. */
gdk_return
log_tsequence(logger *lg, int seq, lng val)
{
	BUN p;

	TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);

	log_lock(lg);
	MT_lock_set(&lg->seqs_id->theaplock);
	BUN inserted = lg->seqs_id->batInserted;
	MT_lock_unset(&lg->seqs_id->theaplock);
	/* an entry at a position >= batInserted is updated in place;
	 * NOTE(review): presumably such entries are not yet committed to
	 * disk -- confirm */
	if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
		assert(lg->seqs_val->hseqbase == 0);
		if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
			log_unlock(lg);
			return GDK_FAIL;
		}
	} else {
		/* an older entry is not modified: its position is appended
		 * to dseqs and a fresh (seq, val) pair is appended */
		if (p != BUN_NONE) {
			oid pos = p;
			if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
				log_unlock(lg);
				return GDK_FAIL;
			}
		}
		if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
		    BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
			log_unlock(lg);
			return GDK_FAIL;
		}
	}
	/* NOTE(review): the BUN* failure paths above do not release the
	 * current range reference, unlike a failure inside
	 * log_tsequence_ -- confirm that is intended */
	gdk_return r = log_tsequence_(lg, seq, val);
	log_unlock(lg);
	return r;
}
3432 :
3433 : static gdk_return
3434 13342 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3435 : {
3436 13342 : log_lock(lg);
3437 13342 : BAT *b = lg->catalog_bid;
3438 13342 : const log_bid *bids;
3439 :
3440 13342 : bids = (log_bid *) Tloc(b, 0);
3441 250673 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3442 237331 : log_bid bid = bids[p];
3443 237331 : BAT *lb = BBP_desc(bid);
3444 :
3445 237331 : assert(bid);
3446 237331 : if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
3447 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3448 0 : log_unlock(lg);
3449 0 : return GDK_FAIL;
3450 : }
3451 :
3452 237331 : assert(lb->batRestricted != BAT_WRITE);
3453 :
3454 237331 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3455 : }
3456 : /* bm_subcommit releases the lock */
3457 13342 : return bm_subcommit(lg, pending, updated, maxupdated);
3458 : }
3459 :
3460 : static gdk_return
3461 237597 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3462 : {
3463 237597 : log_bid bid = internal_find_bat(lg, id, tid);
3464 237597 : lng cnt = 0;
3465 237597 : lng lid = lng_nil;
3466 :
3467 237597 : assert(b->batRestricted != BAT_WRITE);
3468 237597 : assert(b->batRole == PERSISTENT);
3469 237597 : if (bid < 0)
3470 : return GDK_FAIL;
3471 237597 : if (bid) {
3472 155279 : if (bid != b->batCacheid) {
3473 155276 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3474 : return GDK_FAIL;
3475 : } else {
3476 : return GDK_SUCCEED;
3477 : }
3478 : }
3479 237594 : bid = b->batCacheid;
3480 237594 : TRC_DEBUG(WAL, "create %d\n", id);
3481 237594 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3482 475188 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3483 475188 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3484 475188 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3485 237594 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3486 0 : return GDK_FAIL;
3487 237594 : if (lg->current)
3488 237330 : lg->current->cnt++;
3489 237594 : BBPretain(bid);
3490 237594 : return GDK_SUCCEED;
3491 : }
3492 :
3493 : static gdk_return
3494 177626 : log_del_bat(logger *lg, log_bid bid)
3495 : {
3496 177626 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3497 177626 : lng lid = lg->tid;
3498 :
3499 177626 : assert(p != BUN_NONE);
3500 177626 : if (p == BUN_NONE) {
3501 : GDKerror("cannot find BAT\n");
3502 : return GDK_FAIL;
3503 : }
3504 :
3505 177626 : assert(lg->catalog_lid->hseqbase == 0);
3506 177626 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3507 : }
3508 :
3509 : /* returns -1 on failure, 0 when not found, > 0 when found */
3510 : log_bid
3511 33520 : log_find_bat(logger *lg, log_id id)
3512 : {
3513 33520 : log_lock(lg);
3514 33520 : log_bid bid = internal_find_bat(lg, id, -1);
3515 33520 : log_unlock(lg);
3516 33520 : if (!bid) {
3517 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3518 0 : return GDK_FAIL;
3519 : }
3520 : return bid;
3521 : }
3522 :
3523 :
3524 :
/*
 * Start a transaction in the log.
 *
 * flushnow with file_id == NULL only resets lg->file_age (a request
 * to rotate the log file).  NOTE(review): this presumably makes the
 * age test in check_rotation_conditions fire on the next start --
 * confirm.
 *
 * flushnow with file_id != NULL makes the caller the exclusive
 * flusher.  It waits until nr_flushers drops to 0, opens a new log
 * file when the current one has seen a commit (last_ts != 0),
 * rotates, possibly calls log_flush, and sets lg->flushnow (which
 * disables normal logging via LOG_DISABLED).
 *
 * Otherwise it rotates to a new file when check_rotation_conditions
 * says so.
 *
 * When logging is enabled afterwards, it takes a reference on the
 * current range, returns the range id in *file_id and writes a
 * LOG_START record for the new transaction id ++lg->tid.
 */
gdk_return
log_tstart(logger *lg, bool flushnow, ulng *file_id)
{
	rotation_lock(lg);
	if (flushnow) {
		if (file_id == NULL) {
			/* special case: ask store_manager to rotate log file */
			lg->file_age = 0;
			rotation_unlock(lg);
			return GDK_SUCCEED;
		}
		/* I am now the exclusive flusher */
		while (ATOMIC_GET(&lg->nr_flushers)) {
			/* I am waiting until all existing flushers are done */
			MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
		}
		assert(ATOMIC_GET(&lg->nr_flushers) == 0);

		if (ATOMIC_GET(&lg->current->last_ts)) {
			lg->id++;
			if (log_open_output(lg) != GDK_SUCCEED)
				GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
		}
		do_rotate(lg);
		(void) do_flush_range_cleanup(lg);
		rotation_unlock(lg);

		/* NOTE(review): saved_id, id and flushnow are read/written
		 * here after rotation_unlock -- confirm this is safe */
		if (lg->saved_id + 1 < lg->id)
			log_flush(lg, (1ULL << 63));
		lg->flushnow = flushnow;
	} else {
		if (check_rotation_conditions(lg)) {
			lg->id++;
			if (log_open_output(lg) != GDK_SUCCEED)
				GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
		}
		do_rotate(lg);
		rotation_unlock(lg);
	}

	if (LOG_DISABLED(lg))
		return GDK_SUCCEED;

	/* this reference is released by log_tflush, or by the failing
	 * log_* call when the transaction cannot be logged */
	ATOMIC_INC(&lg->current->refcount);
	/* NOTE(review): file_id is dereferenced unchecked; callers with
	 * flushnow == false must pass a non-NULL pointer */
	*file_id = lg->current->id;
	logformat l;
	l.flag = LOG_START;
	l.id = ++lg->tid;

	TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
	if (log_write_format(lg, &l) != GDK_SUCCEED) {
		ATOMIC_DEC(&lg->current->refcount);
		return GDK_FAIL;
	}

	return GDK_SUCCEED;
}
3582 :
3583 : void
3584 118 : log_printinfo(logger *lg)
3585 : {
3586 118 : if (!rotation_trylock(lg, 1000)) {
3587 0 : printf("Logger is currently locked, so no logger information\n");
3588 0 : return;
3589 : }
3590 118 : printf("logger %s:\n", lg->fn);
3591 118 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3592 : lg->id, lg->saved_id);
3593 118 : printf("current transaction id %d, saved transaction id %d\n",
3594 : lg->tid, lg->saved_tid);
3595 118 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3596 118 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3597 118 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3598 307 : for (logged_range *p = lg->pending; p; p = p->next) {
3599 189 : char buf[32];
3600 189 : if ((lg->debug & 128 || lg->inmemory) ||
3601 189 : p->output_log == NULL ||
3602 118 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3603 71 : buf[0] = 0;
3604 260 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3605 : }
3606 118 : rotation_unlock(lg);
3607 : }
|