Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
/* Forward declarations: la_bat_create() and la_bat_destroy() below call
 * these before their definitions appear in this file. */
static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
static gdk_return log_del_bat(logger *lg, log_bid bid);
23 : /*
24 : * The logger uses a directory to store its log files. One master log
25 : * file stores information about the version of the logger and the
26 : * type mapping it uses. This file is a simple ascii file with the
27 : * following format:
28 : * {6DIGIT-VERSION\n[id,type_name\n]*}
29 : * The transaction log files have a binary format.
30 : */
31 :
/* Record type tags.  log_read_updates() compares logformat.flag against
 * these values and la_apply() dispatches on logaction.type using them. */
#define LOG_START 0
#define LOG_END 1
#define LOG_UPDATE_CONST 2
#define LOG_UPDATE_BULK 3
#define LOG_UPDATE 4
#define LOG_CREATE 5
#define LOG_DESTROY 6
#define LOG_SEQ 7
#define LOG_CLEAR 8 /* DEPRECATED */
#define LOG_BAT_GROUP 9

/* getfilepos: the file-position query function selected for this
 * platform/configuration. */
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
#else
#ifdef HAVE_FSEEKO
#define getfilepos ftello
#else
#define getfilepos ftell
#endif
#endif

/* Initial capacity passed to COLnew() by la_bat_create(). */
#define BATSIZE 0

/* True when bit 128 of lg->debug is set, or lg->inmemory is set, or
 * lg->flushnow is set. */
#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
/* Printable names of the record types, listed in the same order as the
 * LOG_* tag values (index 8, the deprecated LOG_CLEAR, is an empty
 * string). */
static const char *log_commands[] = {
	"LOG_START",
	"LOG_END",
	"LOG_UPDATE_CONST",
	"LOG_UPDATE_BULK",
	"LOG_UPDATE",
	"LOG_CREATE",
	"LOG_DESTROY",
	"LOG_SEQ",
	"", /* LOG_CLEAR IS DEPRECATED */
	"LOG_BAT_GROUP",
};
69 :
/* One change recorded while reading a transaction from the log; it is
 * applied by la_apply() and released by la_destroy(). */
typedef struct logaction {
	int type; /* type of change */
	lng nr; /* number of values covered (set from the record's count in log_read_updates) */
	int tt; /* atom type number, as returned by find_type_nr() */
	lng id; /* NOTE(review): not referenced in the visible part of this file */
	lng offset; /* start position in the target bat for bulk updates */
	log_id cid; /* id of object */
	BAT *b; /* temporary bat with changes */
	BAT *uid; /* temporary bat with bun positions to update */
} logaction;
80 :
/* during the recover process a number of transactions could be active */
typedef struct trans {
	int tid; /* transaction id */
	int sz; /* sz of the changes array */
	int nr; /* nr of changes */

	logaction *changes; /* array of sz entries, of which the first nr are in use */

	struct trans *tr; /* the transaction handed to tr_create(); returned by tr_destroy() */
} trans;
91 :
/* Header of a log record as read by log_read_format() and written by
 * log_write_format(): one byte followed by one int. */
typedef struct logformat_t {
	bte flag; /* record type tag (compared against LOG_* values) */
	int id; /* record id; log_read_seq() uses it as the sequence id */
} logformat;
96 :
/* Result of the log_read_* helpers: LOG_OK on success, LOG_EOF when a
 * read from the input log fails, LOG_ERR for other failures (e.g.
 * allocation or bat-append errors). */
typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
98 :
/* Forward declarations of helpers used before their definition. */
static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
static gdk_return tr_grow(trans *tr);

/* Acquire / release the logger's own lock (lg->lock). */
#define log_lock(lg) MT_lock_set(&(lg)->lock)
#define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 964909 : find_type(logger *lg, int tpe)
107 : {
108 964909 : assert(tpe >= 0 && tpe < MAXATOMS);
109 964909 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 550147 : find_type_nr(logger *lg, bte tpe)
114 : {
115 550147 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 550147 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
121 : static BUN
122 424050 : log_find(BAT *b, BAT *d, int val)
123 : {
124 424050 : BUN p;
125 :
126 424050 : assert(b->ttype == TYPE_int);
127 424050 : assert(d->ttype == TYPE_oid);
128 424050 : BATiter bi = bat_iterator(b);
129 424050 : if (BAThash(b) == GDK_SUCCEED) {
130 424050 : MT_rwlock_rdlock(&b->thashlock);
131 702069 : HASHloop_int(bi, b->thash, p, &val) {
132 185008 : oid pos = p;
133 185008 : if (BUNfnd(d, &pos) == BUN_NONE) {
134 185008 : MT_rwlock_rdunlock(&b->thashlock);
135 185008 : bat_iterator_end(&bi);
136 185008 : return p;
137 : }
138 : }
139 239042 : MT_rwlock_rdunlock(&b->thashlock);
140 : } else { /* unlikely: BAThash failed */
141 0 : int *t = (int *) bi.base;
142 :
143 0 : for (p = 0; p < bi.count; p++) {
144 0 : if (t[p] == val) {
145 0 : oid pos = p;
146 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
147 0 : bat_iterator_end(&bi);
148 0 : return p;
149 : }
150 : }
151 : }
152 : }
153 239042 : bat_iterator_end(&bi);
154 239042 : return BUN_NONE;
155 : }
156 :
157 : static log_bid
158 661852 : internal_find_bat(logger *lg, log_id id, int tid)
159 : {
160 661852 : BUN p;
161 :
162 661852 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
163 661852 : BATiter cni = bat_iterator(lg->catalog_id);
164 661852 : MT_rwlock_rdlock(&cni.b->thashlock);
165 661852 : if (tid < 0) {
166 432316 : HASHloop_int(cni, cni.b->thash, p, &id) {
167 213111 : oid pos = p;
168 213111 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
169 213111 : MT_rwlock_rdunlock(&cni.b->thashlock);
170 213111 : bat_iterator_end(&cni);
171 213111 : return *(log_bid *) Tloc(lg->catalog_bid, p);
172 : }
173 : }
174 : } else {
175 365874 : BUN cp = BUN_NONE;
176 1703128 : HASHloop_int(cni, cni.b->thash, p, &id) {
177 1152727 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
178 1152727 : if (lid != lng_nil && lid <= tid) {
179 : break;
180 : }
181 : cp = p;
182 : }
183 365874 : if (cp != BUN_NONE) {
184 365609 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 365609 : bat_iterator_end(&cni);
186 365609 : return *(log_bid *) Tloc(lg->catalog_bid, cp);
187 : }
188 : }
189 83132 : MT_rwlock_rdunlock(&cni.b->thashlock);
190 83132 : bat_iterator_end(&cni);
191 83132 : return 0; /* not found */
192 : }
193 : return -1; /* error creating hash */
194 : }
195 :
/* Release a bat used by the logger; this simply hands b to
 * BBPreclaim(). */
static inline void
logbat_destroy(BAT *b)
{
	BBPreclaim(b);
}
201 :
202 : static BAT *
203 15250 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 15250 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 15250 : if (nb) {
208 15250 : BBP_pid(nb->batCacheid) = 0;
209 15250 : if (role == PERSISTENT) {
210 9558 : BATmode(nb, false);
211 9558 : nb = BATsetaccess(nb, BAT_READ);
212 : }
213 : } else {
214 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
215 : }
216 15250 : return nb;
217 : }
218 :
219 : static bool
220 763834 : log_read_format(logger *lg, logformat *data)
221 : {
222 763834 : assert(!lg->inmemory);
223 763834 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
224 751201 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
225 : return true;
226 : /* could only read part, so complain */
227 0 : TRC_CRITICAL(GDK, "read failed\n");
228 : }
229 : return false;
230 : }
231 :
232 : static gdk_return
233 755892 : log_write_format(logger *lg, logformat *data)
234 : {
235 755892 : assert(data->id || data->flag);
236 755892 : assert(!lg->inmemory);
237 755892 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
238 1511784 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
239 1511784 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
240 755892 : mnstr_writeInt(lg->current->output_log, data->id))
241 : return GDK_SUCCEED;
242 0 : TRC_CRITICAL(GDK, "write failed\n");
243 0 : return GDK_FAIL;
244 : }
245 :
246 : static log_return
247 2795 : log_read_seq(logger *lg, logformat *l)
248 : {
249 2795 : int seq = l->id;
250 2795 : lng val;
251 2795 : BUN p;
252 :
253 2795 : assert(!lg->inmemory);
254 2795 : if (mnstr_readLng(lg->input_log, &val) != 1) {
255 0 : TRC_CRITICAL(GDK, "read failed\n");
256 0 : return LOG_EOF;
257 : }
258 2795 : if (lg->flushing)
259 : return LOG_OK;
260 :
261 65 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
262 64 : p >= lg->seqs_id->batInserted) {
263 40 : assert(lg->seqs_val->hseqbase == 0);
264 40 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
265 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
266 0 : return LOG_ERR;
267 : }
268 : } else {
269 24 : if (p != BUN_NONE) {
270 24 : oid pos = p;
271 24 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
272 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
273 0 : return LOG_ERR;
274 : }
275 : }
276 50 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
277 25 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
278 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
279 0 : return LOG_ERR;
280 : }
281 : }
282 : return LOG_OK;
283 : }
284 :
#if 0
/* Disabled code: everything up to the matching #endif is removed by the
 * preprocessor, so neither helper below is compiled. */

/* Would write a single non-negative id to the current output log. */
static gdk_return
log_write_id(logger *lg, int id)
{
	assert(!lg->inmemory);
	assert(id >= 0);
	assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
	if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
	    mnstr_writeInt(lg->current->output_log, id))
		return GDK_SUCCEED;
	TRC_CRITICAL(GDK, "write failed\n");
	return GDK_FAIL;
}

/* Would read a single id from the input log; LOG_EOF on read failure. */
static log_return
log_read_id(logger *lg, log_id *id)
{
	assert(!lg->inmemory);
	if (mnstr_readInt(lg->input_log, id) != 1) {
		TRC_CRITICAL(GDK, "read failed\n");
		return LOG_EOF;
	}
	return LOG_OK;
}
#endif
310 :
311 : static log_return
312 19266 : string_reader(logger *lg, BAT *b, lng nr)
313 : {
314 19266 : size_t sz = 0;
315 19266 : lng SZ = 0;
316 19266 : log_return res = LOG_OK;
317 :
318 38755 : while (nr && res == LOG_OK) {
319 19489 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
320 0 : TRC_CRITICAL(GDK, "read failed\n");
321 0 : return LOG_EOF;
322 : }
323 19489 : sz = (size_t) SZ;
324 19489 : char *buf = lg->rbuf;
325 19489 : if (lg->rbufsize < sz) {
326 2 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
327 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
328 0 : return LOG_ERR;
329 : }
330 2 : lg->rbuf = buf;
331 2 : lg->rbufsize = sz;
332 : }
333 :
334 19489 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
335 0 : TRC_CRITICAL(GDK, "read failed\n");
336 0 : return LOG_EOF;
337 : }
338 : /* handle strings */
339 : char *t = buf;
340 : /* chunked */
341 : #define CHUNK_SIZE 1024
342 : char *strings[CHUNK_SIZE];
343 : int cur = 0;
344 :
345 91098 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
346 71609 : strings[cur++] = t;
347 71609 : if (cur == CHUNK_SIZE &&
348 4 : b &&
349 4 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
350 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
351 0 : res = LOG_ERR;
352 : }
353 71609 : if (cur == CHUNK_SIZE)
354 33 : cur = 0;
355 : /* find next */
356 5561807 : while (*t)
357 5490198 : t++;
358 71609 : t++;
359 : }
360 19489 : if (cur &&
361 332 : b &&
362 332 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
363 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
364 0 : res = LOG_ERR;
365 : }
366 : }
367 : return res;
368 : }
369 :
370 :
/* Describes a range of values to copy from a bat stored in the log file
 * to a bat in the database. */
struct offset {
	lng os; /* offset within source BAT in logfile */
	lng nr; /* number of values to be copied */
	lng od; /* offset within destination BAT in database */
};
376 :
377 : static log_return
378 393803 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
379 : {
380 393803 : log_return res = LOG_OK;
381 393803 : lng nr, pnr;
382 393803 : bte type_id = -1;
383 393803 : int tpe;
384 :
385 393803 : assert(!lg->inmemory);
386 393803 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
387 :
388 787606 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
389 393803 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
390 0 : TRC_CRITICAL(GDK, "read failed\n");
391 0 : return LOG_EOF;
392 : }
393 :
394 393803 : pnr = nr;
395 393803 : tpe = find_type_nr(lg, type_id);
396 393803 : if (tpe >= 0) {
397 393803 : BAT *uid = NULL;
398 393803 : BAT *r = NULL;
399 393803 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
400 393803 : lng offset;
401 :
402 393803 : assert(nr <= (lng) BUN_MAX);
403 393803 : if (!lg->flushing && l->flag == LOG_UPDATE) {
404 0 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
405 0 : if (uid == NULL) {
406 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
407 28244 : return LOG_ERR;
408 : }
409 : }
410 :
411 393803 : if (l->flag == LOG_UPDATE_CONST) {
412 123277 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
413 0 : TRC_CRITICAL(GDK, "read failed\n");
414 0 : return LOG_EOF;
415 : }
416 123277 : if (cands) {
417 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
418 :
419 28244 : if (BATcount(*cands) == 0 || lg->flushing) {
420 : /* when flushing, we only need the offset and count of the last segment of inserts. */
421 28244 : assert((*cands)->ttype == TYPE_void);
422 28244 : BATtseqbase(*cands, (oid) offset);
423 28244 : BATsetcount(*cands, (BUN) nr);
424 0 : } else if (!lg->flushing) {
425 0 : assert(BATcount(*cands) > 0);
426 0 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
427 0 : BAT *newcands = NULL;
428 0 : if (!dense) {
429 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
430 0 : res = LOG_ERR;
431 0 : } else if ((*cands)->ttype == TYPE_void) {
432 0 : if ((newcands = BATmergecand(*cands, dense))) {
433 0 : BBPreclaim(*cands);
434 0 : *cands = newcands;
435 : } else {
436 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
437 0 : res = LOG_ERR;
438 : }
439 : } else {
440 0 : assert((*cands)->ttype == TYPE_oid);
441 0 : assert(BATcount(*cands) > 0);
442 0 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
443 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
444 0 : res = LOG_ERR;
445 : }
446 : }
447 0 : BBPreclaim(dense);
448 : }
449 :
450 : /* We have to read the value to update the read cursor */
451 28244 : size_t tlen = lg->rbufsize;
452 28244 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
453 28244 : if (t == NULL) {
454 0 : TRC_CRITICAL(GDK, "read failed\n");
455 0 : res = LOG_EOF;
456 : }
457 28244 : return res;
458 : }
459 : }
460 :
461 365559 : if (!lg->flushing) {
462 1476 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
463 1476 : if (r == NULL) {
464 0 : if (uid)
465 0 : BBPreclaim(uid);
466 0 : return LOG_ERR;
467 : }
468 : }
469 :
470 365559 : if (l->flag == LOG_UPDATE_CONST) {
471 95033 : size_t tlen = lg->rbufsize;
472 95033 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
473 95033 : if (t == NULL) {
474 0 : TRC_CRITICAL(GDK, "read failed\n");
475 0 : res = LOG_EOF;
476 : } else {
477 95033 : lg->rbuf = t;
478 95033 : lg->rbufsize = tlen;
479 95033 : if (r) {
480 8208 : for (BUN p = 0; p < (BUN) nr; p++) {
481 7937 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
482 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
483 0 : res = LOG_ERR;
484 : }
485 : }
486 : }
487 : }
488 270526 : } else if (l->flag == LOG_UPDATE_BULK) {
489 270507 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
490 0 : if (r)
491 0 : BBPreclaim(r);
492 0 : TRC_CRITICAL(GDK, "read failed\n");
493 0 : return LOG_EOF;
494 : }
495 270507 : if (tpe == TYPE_msk) {
496 0 : if (r) {
497 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
498 0 : BATsetcount(r, (BUN) nr);
499 : else {
500 0 : TRC_CRITICAL(GDK, "read failed\n");
501 0 : res = LOG_EOF;
502 : }
503 : } else {
504 0 : size_t tlen = lg->rbufsize / sizeof(int);
505 0 : size_t cnt = 0, snr = (size_t) nr;
506 0 : snr = (snr + 31) / 32;
507 0 : assert(tlen);
508 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
509 0 : cnt = snr > tlen ? tlen : snr;
510 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
511 0 : TRC_CRITICAL(GDK, "read failed\n");
512 0 : res = LOG_EOF;
513 : }
514 : }
515 : }
516 : } else {
517 270507 : if (!ATOMvarsized(tpe)) {
518 250664 : size_t cnt = 0, snr = (size_t) nr;
519 250664 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
520 250664 : assert(tlen);
521 : /* read in chunks of max
522 : * BUFSIZE/width rows */
523 501332 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
524 250668 : cnt = snr > tlen ? tlen : snr;
525 250668 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
526 :
527 250668 : if (t == NULL) {
528 : res = LOG_EOF;
529 : break;
530 : }
531 250668 : assert(t == lg->rbuf);
532 250668 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
533 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
534 0 : res = LOG_ERR;
535 : }
536 : }
537 19843 : } else if (tpe == TYPE_str) {
538 : /* efficient string */
539 19260 : res = string_reader(lg, r, nr);
540 : } else {
541 1208 : for (; res == LOG_OK && nr > 0; nr--) {
542 625 : size_t tlen = lg->rbufsize;
543 625 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
544 :
545 625 : if (t == NULL) {
546 : /* see if failure was due to
547 : * malloc or something less
548 : * serious (in the current
549 : * context) */
550 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
551 : res = LOG_EOF;
552 : else
553 0 : res = LOG_ERR;
554 0 : TRC_CRITICAL(GDK, "read failed\n");
555 : } else {
556 625 : lg->rbuf = t;
557 625 : lg->rbufsize = tlen;
558 625 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
559 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
560 0 : res = LOG_ERR;
561 : }
562 : }
563 : }
564 : }
565 : }
566 : } else {
567 19 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
568 19 : void *hv = ATOMnil(TYPE_oid);
569 19 : offset = 0;
570 :
571 19 : if (hv == NULL) {
572 0 : TRC_CRITICAL(GDK, "read failed\n");
573 0 : res = LOG_EOF;
574 : }
575 1074 : for (; res == LOG_OK && nr > 0; nr--) {
576 1055 : size_t hlen = sizeof(oid);
577 1055 : void *h = rh(hv, &hlen, lg->input_log, 1);
578 1055 : assert(hlen == sizeof(oid));
579 1055 : assert(h == hv);
580 1055 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
581 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
582 0 : res = LOG_ERR;
583 : }
584 : }
585 19 : nr = pnr;
586 19 : if (tpe == TYPE_msk) {
587 0 : if (r) {
588 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
589 0 : BATsetcount(r, (BUN) nr);
590 : else {
591 0 : TRC_CRITICAL(GDK, "read failed\n");
592 0 : res = LOG_EOF;
593 : }
594 : } else {
595 0 : for (lng i = 0; i < nr; i += 32) {
596 0 : int v;
597 0 : switch (mnstr_readInt(lg->input_log, &v)) {
598 0 : case 1:
599 0 : continue;
600 : case 0:
601 : res = LOG_EOF;
602 : break;
603 : default:
604 : res = LOG_ERR;
605 : break;
606 : }
607 0 : TRC_CRITICAL(GDK, "read failed\n");
608 0 : break;
609 : }
610 : }
611 19 : } else if (tpe == TYPE_str) {
612 : /* efficient string */
613 6 : res = string_reader(lg, r, nr);
614 : } else {
615 61 : for (; res == LOG_OK && nr > 0; nr--) {
616 48 : size_t tlen = lg->rbufsize;
617 48 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
618 :
619 48 : if (t == NULL) {
620 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
621 : res = LOG_EOF;
622 : else
623 0 : res = LOG_ERR;
624 0 : TRC_CRITICAL(GDK, "read failed\n");
625 : } else {
626 48 : lg->rbuf = t;
627 48 : lg->rbufsize = tlen;
628 48 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
629 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
630 0 : res = LOG_ERR;
631 : }
632 : }
633 : }
634 : }
635 19 : GDKfree(hv);
636 : }
637 :
638 365559 : if (res == LOG_OK) {
639 365559 : if (tr_grow(tr) == GDK_SUCCEED) {
640 365559 : tr->changes[tr->nr].type = l->flag;
641 365559 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
642 141712 : assert(cands); /* bat r is part of a group of bats logged together. */
643 141712 : struct canditer ci;
644 141712 : canditer_init(&ci, NULL, *cands);
645 141712 : const oid first = canditer_peek(&ci);
646 141712 : const oid last = canditer_last(&ci);
647 141712 : offset = (lng) first;
648 141712 : pnr = (lng) (last - first) + 1;
649 141712 : if (!lg->flushing) {
650 1098 : assert(uid == NULL);
651 1098 : uid = *cands;
652 1098 : BBPfix((*cands)->batCacheid);
653 1098 : tr->changes[tr->nr].type = LOG_UPDATE;
654 : }
655 : }
656 365559 : if (l->flag == LOG_UPDATE_CONST) {
657 95033 : assert(!cands); /* TODO: This might change in the future. */
658 95033 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
659 : }
660 365559 : tr->changes[tr->nr].nr = pnr;
661 365559 : tr->changes[tr->nr].tt = tpe;
662 365559 : tr->changes[tr->nr].cid = id;
663 365559 : tr->changes[tr->nr].offset = offset;
664 365559 : tr->changes[tr->nr].b = r;
665 365559 : tr->changes[tr->nr].uid = uid;
666 365559 : tr->nr++;
667 : } else {
668 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
669 0 : res = LOG_ERR;
670 : }
671 : }
672 365559 : if (res != LOG_OK) {
673 0 : if (r)
674 0 : BBPreclaim(r);
675 0 : if (cands && uid)
676 0 : BBPunfix((*cands)->batCacheid);
677 0 : else if (uid)
678 0 : BBPreclaim(uid);
679 : }
680 : } else {
681 : /* bat missing ERROR or ignore ? currently error. */
682 0 : TRC_CRITICAL(GDK, "unknown type\n");
683 0 : res = LOG_ERR;
684 : }
685 : return res;
686 : }
687 :
688 :
689 : static gdk_return
690 608109 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
691 : {
692 608109 : BATiter cni = bat_iterator_nolock(lg->catalog_id);
693 :
694 608109 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
695 608109 : MT_rwlock_rdlock(&cni.b->thashlock);
696 608109 : BUN p, cp = BUN_NONE;
697 :
698 2394612 : HASHloop_int(cni, cni.b->thash, p, &id) {
699 1397352 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
700 :
701 1397352 : if (lid != lng_nil && lid <= tid)
702 : break;
703 : cp = p;
704 : }
705 608109 : if (cp != BUN_NONE) {
706 607901 : lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
707 607901 : assert(lg->catalog_cnt->hseqbase == 0);
708 607901 : if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
709 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
710 0 : return GDK_FAIL;
711 : }
712 : }
713 608109 : MT_rwlock_rdunlock(&cni.b->thashlock);
714 608109 : return GDK_SUCCEED;
715 : }
716 : return GDK_FAIL;
717 : }
718 :
719 : static gdk_return
720 365559 : la_bat_updates(logger *lg, logaction *la, int tid)
721 : {
722 365559 : log_bid bid = internal_find_bat(lg, la->cid, tid);
723 365559 : BAT *b = NULL;
724 :
725 365559 : if (bid < 0)
726 : return GDK_FAIL;
727 365559 : if (!bid) {
728 : /* object already gone, nothing needed */
729 : return GDK_SUCCEED;
730 : }
731 :
732 365553 : if (!lg->flushing) {
733 1476 : b = BATdescriptor(bid);
734 1476 : if (b == NULL)
735 : return GDK_FAIL;
736 : }
737 365553 : BUN cnt = 0;
738 365553 : if (la->type == LOG_UPDATE_BULK) {
739 364436 : if (!lg->flushing) {
740 378 : cnt = BATcount(b);
741 378 : int is_msk = (b->ttype == TYPE_msk);
742 : /* handle offset 0 ie clear */
743 378 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
744 : BATclear(b, true);
745 : /* handle offset */
746 378 : if (cnt <= (BUN) la->offset) {
747 237 : msk t = 1;
748 237 : if (cnt < (BUN) la->offset) { /* insert nils */
749 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
750 0 : lng i, d = la->offset - BATcount(b);
751 0 : for (i = 0; i < d; i++) {
752 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
753 0 : logbat_destroy(b);
754 0 : return GDK_FAIL;
755 : }
756 : }
757 : }
758 237 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
759 0 : logbat_destroy(b);
760 0 : return GDK_FAIL;
761 : }
762 : } else {
763 141 : BATiter vi = bat_iterator(la->b);
764 141 : BUN p, q;
765 :
766 2206 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
767 2065 : const void *t = BUNtail(vi, p);
768 :
769 2065 : if (q < cnt) {
770 2064 : if (b->tnosorted == q)
771 7 : b->tnosorted = 0;
772 2064 : if (b->tnorevsorted == q)
773 7 : b->tnorevsorted = 0;
774 2064 : if (b->tnokey[0] == q ||
775 2057 : b->tnokey[1] == q) {
776 7 : b->tnokey[0] = 0;
777 7 : b->tnokey[1] = 0;
778 : }
779 2064 : if (b->tminpos == q)
780 5 : b->tminpos = BUN_NONE;
781 2064 : if (b->tmaxpos == q)
782 3 : b->tmaxpos = BUN_NONE;
783 2064 : b->tkey = false;
784 2064 : b->tsorted = false;
785 2064 : b->tkey = false;
786 2064 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
787 0 : logbat_destroy(b);
788 0 : bat_iterator_end(&vi);
789 0 : return GDK_FAIL;
790 : }
791 : } else {
792 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
793 0 : logbat_destroy(b);
794 0 : bat_iterator_end(&vi);
795 0 : return GDK_FAIL;
796 : }
797 : }
798 : }
799 141 : bat_iterator_end(&vi);
800 : }
801 : }
802 1117 : } else if (la->type == LOG_UPDATE) {
803 1117 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
804 : return GDK_FAIL;
805 : }
806 : }
807 365553 : cnt = (BUN) (la->offset + la->nr);
808 365553 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
809 0 : if (b)
810 0 : logbat_destroy(b);
811 0 : return GDK_FAIL;
812 : }
813 365553 : if (b)
814 1476 : logbat_destroy(b);
815 : return GDK_SUCCEED;
816 : }
817 :
818 : static log_return
819 9477 : log_read_destroy(logger *lg, trans *tr, log_id id)
820 : {
821 9477 : (void) lg;
822 9477 : assert(!lg->inmemory);
823 9477 : if (tr_grow(tr) == GDK_SUCCEED) {
824 9477 : tr->changes[tr->nr].type = LOG_DESTROY;
825 9477 : tr->changes[tr->nr].cid = id;
826 9477 : tr->nr++;
827 9477 : return LOG_OK;
828 : }
829 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
830 0 : return LOG_ERR;
831 : }
832 :
833 : static gdk_return
834 48 : la_bat_destroy(logger *lg, logaction *la, int tid)
835 : {
836 48 : log_bid bid = internal_find_bat(lg, la->cid, tid);
837 :
838 48 : if (bid < 0)
839 : return GDK_FAIL;
840 48 : if (!bid) {
841 : #ifndef NDEBUG
842 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
843 : #endif
844 0 : return GDK_SUCCEED;
845 : }
846 48 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
847 : return GDK_FAIL;
848 : return GDK_SUCCEED;
849 : }
850 :
851 : static log_return
852 156344 : log_read_create(logger *lg, trans *tr, log_id id)
853 : {
854 156344 : bte tt;
855 156344 : int tpe;
856 :
857 156344 : assert(!lg->inmemory);
858 156344 : TRC_DEBUG(WAL, "create %d", id);
859 :
860 156344 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
861 0 : TRC_CRITICAL(GDK, "read failed\n");
862 0 : return LOG_EOF;
863 : }
864 :
865 156344 : tpe = find_type_nr(lg, tt);
866 : /* read create */
867 156344 : if (tr_grow(tr) == GDK_SUCCEED) {
868 156344 : tr->changes[tr->nr].type = LOG_CREATE;
869 156344 : tr->changes[tr->nr].tt = tpe;
870 156344 : tr->changes[tr->nr].cid = id;
871 156344 : tr->nr++;
872 156344 : return LOG_OK;
873 : }
874 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
875 0 : return LOG_ERR;
876 : }
877 :
878 : static gdk_return
879 267 : la_bat_create(logger *lg, logaction *la, int tid)
880 : {
881 267 : BAT *b;
882 :
883 : /* formerly head column type, should be void */
884 267 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
885 : return GDK_FAIL;
886 :
887 267 : if (la->tt < 0)
888 0 : BATtseqbase(b, 0);
889 :
890 534 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
891 267 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
892 0 : logbat_destroy(b);
893 0 : return GDK_FAIL;
894 : }
895 267 : logbat_destroy(b);
896 267 : return GDK_SUCCEED;
897 : }
898 :
899 : static gdk_return
900 245 : log_write_new_types(logger *lg, FILE *fp)
901 : {
902 245 : bte id = 0;
903 :
904 : /* write types and insert into bats */
905 245 : memset(lg->type_id, -1, sizeof(lg->type_id));
906 245 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
907 : /* first the fixed sized types */
908 6858 : for (int i = 0; i < GDKatomcnt; i++) {
909 6613 : if (ATOMvarsized(i))
910 1469 : continue;
911 5144 : lg->type_id[i] = id;
912 5144 : lg->type_nr[id] = i;
913 5144 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
914 : return GDK_FAIL;
915 5144 : id++;
916 : }
917 : /* second the var sized types */
918 : id = -127; /* start after nil */
919 6858 : for (int i = 0; i < GDKatomcnt; i++) {
920 6613 : if (!ATOMvarsized(i))
921 5144 : continue;
922 1469 : lg->type_id[i] = id;
923 1469 : lg->type_nr[256 + id] = i;
924 1469 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
925 : return GDK_FAIL;
926 1469 : id++;
927 : }
928 : return GDK_SUCCEED;
929 : }
930 :
931 : #define TR_SIZE 1024
932 :
933 : static trans *
934 56562 : tr_create(trans *tr, int tid)
935 : {
936 56562 : trans *ntr = GDKmalloc(sizeof(trans));
937 :
938 56562 : if (ntr == NULL)
939 : return NULL;
940 56562 : ntr->tid = tid;
941 56562 : ntr->sz = TR_SIZE;
942 56562 : ntr->nr = 0;
943 56562 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
944 56562 : if (ntr->changes == NULL) {
945 0 : GDKfree(ntr);
946 0 : return NULL;
947 : }
948 56562 : ntr->tr = tr;
949 56562 : return ntr;
950 : }
951 :
952 : static gdk_return
953 531380 : la_apply(logger *lg, logaction *c, int tid)
954 : {
955 531380 : gdk_return ret = GDK_SUCCEED;
956 :
957 531380 : switch (c->type) {
958 365559 : case LOG_UPDATE_BULK:
959 : case LOG_UPDATE:
960 365559 : ret = la_bat_updates(lg, c, tid);
961 365559 : break;
962 156344 : case LOG_CREATE:
963 156344 : if (!lg->flushing)
964 267 : ret = la_bat_create(lg, c, tid);
965 : break;
966 9477 : case LOG_DESTROY:
967 9477 : if (!lg->flushing)
968 48 : ret = la_bat_destroy(lg, c, tid);
969 : break;
970 : default:
971 0 : MT_UNREACHABLE();
972 : }
973 531380 : return ret;
974 : }
975 :
976 : static void
977 531380 : la_destroy(logaction *c)
978 : {
979 531380 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
980 1476 : logbat_destroy(c->b);
981 531380 : if (c->type == LOG_UPDATE && c->uid)
982 1098 : logbat_destroy(c->uid);
983 531380 : }
984 :
985 : static gdk_return
986 531380 : tr_grow(trans *tr)
987 : {
988 531380 : if (tr->nr == tr->sz) {
989 7 : logaction *changes;
990 7 : tr->sz <<= 1;
991 7 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
992 7 : if (changes == NULL)
993 : return GDK_FAIL;
994 7 : tr->changes = changes;
995 : }
996 : /* cleanup the next */
997 531380 : tr->changes[tr->nr].b = NULL;
998 531380 : return GDK_SUCCEED;
999 : }
1000 :
1001 : static trans *
1002 56562 : tr_destroy(trans *tr)
1003 : {
1004 56562 : trans *r = tr->tr;
1005 :
1006 56562 : GDKfree(tr->changes);
1007 56562 : GDKfree(tr);
1008 56562 : return r;
1009 : }
1010 :
1011 : static trans *
1012 0 : tr_abort_(logger *lg, trans *tr, int s)
1013 : {
1014 0 : int i;
1015 :
1016 0 : (void) lg;
1017 :
1018 0 : TRC_DEBUG(WAL, "abort");
1019 :
1020 0 : for (i = s; i < tr->nr; i++)
1021 0 : la_destroy(&tr->changes[i]);
1022 0 : return tr_destroy(tr);
1023 : }
1024 :
/* Abort a transaction, releasing all of its changes (tr_abort_()
 * starting at index 0).  Returns what tr_abort_() returns. */
static trans *
tr_abort(logger *lg, trans *tr)
{
	return tr_abort_(lg, tr, 0);
}
1030 :
1031 : static trans *
1032 56562 : tr_commit(logger *lg, trans *tr)
1033 : {
1034 56562 : int i;
1035 :
1036 56562 : TRC_DEBUG(WAL, "commit");
1037 :
1038 587942 : for (i = 0; i < tr->nr; i++) {
1039 531380 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1040 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1041 0 : do {
1042 0 : tr = tr_abort_(lg, tr, i);
1043 0 : } while (tr != NULL);
1044 : return (trans *) -1;
1045 : }
1046 531380 : la_destroy(&tr->changes[i]);
1047 : }
1048 56562 : lg->saved_tid = tr->tid;
1049 56562 : return tr_destroy(tr);
1050 : }
1051 :
1052 : static gdk_return
1053 128 : log_read_types_file(logger *lg, FILE *fp, int version, bool *needsnew)
1054 : {
1055 128 : int id = 0;
1056 128 : char atom_name[IDLENGTH];
1057 128 : bool seen_geom = false;
1058 :
1059 : /* scanf should use IDLENGTH somehow */
1060 3612 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1061 3484 : if (version < 52303 && strcmp(atom_name, "BAT") == 0) {
1062 8 : *needsnew = true;
1063 8 : continue;
1064 : }
1065 3476 : if (version < 52304 && strcmp(atom_name, "color") == 0) {
1066 16 : *needsnew = true;
1067 16 : continue;
1068 : }
1069 456 : if (version < 52304 && strcmp(atom_name, "identifier") == 0) {
1070 16 : *needsnew = true;
1071 16 : continue;
1072 : }
1073 3444 : if (version < 52304 && strcmp(atom_name, "wkba") == 0) {
1074 16 : *needsnew = true;
1075 16 : continue;
1076 : }
1077 3428 : int i = ATOMindex(atom_name);
1078 :
1079 3428 : if (id < -127 || id > 127 || i < 0) {
1080 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1081 0 : return GDK_FAIL;
1082 : }
1083 3428 : seen_geom |= strcmp(atom_name, "mbr") == 0 || strcmp(atom_name, "wkb") == 0;
1084 3428 : lg->type_id[i] = (int8_t) id;
1085 3428 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1086 : }
1087 : #ifdef HAVE_GEOM
1088 128 : if (!seen_geom && ATOMindex("mbr") > 0) {
1089 0 : GDKerror("incompatible database: server supports GEOM, but database does not\n");
1090 0 : return GDK_FAIL;
1091 : }
1092 : #endif
1093 : (void) seen_geom;
1094 : return GDK_SUCCEED;
1095 : }
1096 :
1097 :
1098 : static gdk_return
1099 245 : log_create_types_file(logger *lg, const char *filename)
1100 : {
1101 245 : FILE *fp;
1102 :
1103 245 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1104 0 : GDKerror("cannot create log file %s\n", filename);
1105 0 : return GDK_FAIL;
1106 : }
1107 245 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1108 0 : fclose(fp);
1109 0 : GDKerror("writing log file %s failed", filename);
1110 0 : if (MT_remove(filename) < 0)
1111 0 : GDKsyserror("remove %s failed\n", filename);
1112 0 : return GDK_FAIL;
1113 : }
1114 :
1115 245 : if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
1116 0 : fclose(fp);
1117 0 : GDKerror("writing log file %s failed", filename);
1118 0 : if (MT_remove(filename) < 0)
1119 0 : GDKsyserror("remove %s failed\n", filename);
1120 0 : return GDK_FAIL;
1121 : }
1122 245 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1123 : #if defined(_MSC_VER)
1124 : && _commit(_fileno(fp)) < 0
1125 : #elif defined(HAVE_FDATASYNC)
1126 0 : && fdatasync(fileno(fp)) < 0
1127 : #elif defined(HAVE_FSYNC)
1128 : && fsync(fileno(fp)) < 0
1129 : #endif
1130 : )) {
1131 0 : GDKsyserror("flushing log file %s failed", filename);
1132 0 : fclose(fp);
1133 0 : if (MT_remove(filename) < 0)
1134 0 : GDKsyserror("remove %s failed\n", filename);
1135 0 : return GDK_FAIL;
1136 : }
1137 245 : if (fclose(fp) < 0) {
1138 0 : GDKsyserror("closing log file %s failed", filename);
1139 0 : if (MT_remove(filename) < 0)
1140 0 : GDKsyserror("remove %s failed\n", filename);
1141 0 : return GDK_FAIL;
1142 : }
1143 : return GDK_SUCCEED;
1144 : }
1145 :
/* Shorthands for operating on the logger's rotation_lock:
 * rotation_lock() and rotation_unlock() set and unset it, and
 * rotation_trylock() forwards to MT_lock_trytime() with the given ms
 * argument (presumably a timeout in milliseconds -- confirm against
 * MT_lock_trytime). */
#define rotation_lock(lg) MT_lock_set(&(lg)->rotation_lock)
#define rotation_unlock(lg) MT_lock_unset(&(lg)->rotation_lock)
#define rotation_trylock(lg, ms) MT_lock_trytime(&(lg)->rotation_lock, ms)
1149 :
1150 : static gdk_return
1151 12883 : log_open_output(logger *lg)
1152 : {
1153 12883 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1154 :
1155 12883 : if (!new_range) {
1156 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1157 0 : return GDK_FAIL;
1158 : }
1159 25765 : if (!LOG_DISABLED(lg)) {
1160 12882 : char id[32];
1161 12882 : char filename[MAXPATH];
1162 :
1163 12882 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1164 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1165 0 : GDKfree(new_range);
1166 0 : return GDK_FAIL;
1167 : }
1168 12882 : if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
1169 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1170 0 : GDKfree(new_range);
1171 0 : return GDK_FAIL;
1172 : }
1173 :
1174 12882 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1175 12882 : new_range->output_log = open_wstream(filename);
1176 12882 : if (new_range->output_log) {
1177 12882 : short byteorder = 1234;
1178 12882 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1179 : }
1180 :
1181 12882 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1182 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1183 0 : close_stream(new_range->output_log);
1184 0 : GDKfree(new_range);
1185 0 : return GDK_FAIL;
1186 : }
1187 : } else {
1188 1 : new_range->output_log = NULL;
1189 : }
1190 12883 : ATOMIC_INIT(&new_range->refcount, 1);
1191 12883 : ATOMIC_INIT(&new_range->last_ts, 0);
1192 12883 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1193 12883 : ATOMIC_INIT(&new_range->drops, 0);
1194 12883 : new_range->id = lg->id;
1195 12883 : new_range->next = NULL;
1196 12883 : logged_range *current = lg->current;
1197 12883 : assert(current && current->next == NULL);
1198 12883 : new_range->cnt = current->cnt;
1199 12883 : current->next = new_range;
1200 12883 : lg->file_age = GDKusec();
1201 12883 : return GDK_SUCCEED;
1202 : }
1203 :
1204 : static inline void
1205 13124 : log_close_input(logger *lg)
1206 : {
1207 13124 : if (!lg->inmemory && lg->input_log) {
1208 12639 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1209 12639 : close_stream(lg->input_log);
1210 : }
1211 13124 : lg->input_log = NULL;
1212 13124 : }
1213 :
1214 : static inline void
1215 357 : log_close_output(logger *lg)
1216 : {
1217 357 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1218 356 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1219 356 : close_stream(lg->current->output_log);
1220 : }
1221 357 : lg->current->output_log = NULL;
1222 357 : }
1223 :
1224 : static gdk_return
1225 12767 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1226 : {
1227 12767 : TRC_INFO(WAL, "opening input log %s", filename);
1228 12767 : lg->input_log = open_rstream(filename);
1229 :
1230 : /* if the file doesn't exist, there is nothing to be read back */
1231 12767 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1232 128 : log_close_input(lg);
1233 128 : *filemissing = true;
1234 128 : return GDK_SUCCEED;
1235 : }
1236 12639 : short byteorder;
1237 12639 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1238 0 : case -1:
1239 0 : log_close_input(lg);
1240 0 : TRC_CRITICAL(GDK, "read failed\n");
1241 0 : return GDK_FAIL;
1242 6 : case 0:
1243 : /* empty file is ok */
1244 6 : log_close_input(lg);
1245 6 : return GDK_SUCCEED;
1246 12633 : case 1:
1247 : /* if not empty, must start with correct byte order mark */
1248 12633 : if (byteorder != 1234) {
1249 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1250 0 : log_close_input(lg);
1251 0 : return GDK_FAIL;
1252 : }
1253 : break;
1254 : }
1255 : return GDK_SUCCEED;
1256 : }
1257 :
/* Read log records from lg->input_log with log_read_format() and
 * process them until the end marker (flag 0, id 0) is seen, reading
 * fails, or an error occurs.
 *
 * LOG_START opens a transaction (tr_create), LOG_END commits it when
 * the ids match and aborts it otherwise; the update, create, destroy
 * and sequence records are handed to their log_read_* helpers.
 * Transactions that are still open when the loop ends are aborted.
 *
 * If `updated` is not NULL it is used as a bitmap (32 bits per word):
 * for every update/create/destroy record inside a transaction, the
 * bits of the matching catalog positions are set.  `maxupdated` is
 * only used to assert that those positions are in range.
 *
 * While not flushing, CHECKMASK is temporarily cleared in GDKdebug
 * and the saved value is restored before returning.
 *
 * Returns LOG_EOF if log_read_format() reported failure or the end
 * marker was read, otherwise the last status (LOG_ERR on serious
 * failures). */
static log_return
log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
{
	logformat l;
	trans *tr = NULL;
	log_return err = LOG_OK;
	bool ok = true;
	ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);

	(void) maxupdated;	/* only used inside assert() */

	if (!lg->flushing)
		ATOMIC_AND(&GDKdebug, ~CHECKMASK);

	BAT *cands = NULL;	/* used in case of LOG_BAT_GROUP */

	while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
		/* an all-zero record marks the end of the log */
		if (l.flag == 0 && l.id == 0) {
			err = LOG_EOF;
			break;
		}

		TRC_DEBUG_IF(WAL) {
			if (l.flag > 0 && l.flag != LOG_CLEAR &&
			    l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
				TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
			else
				TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
		}
		/* first pass over the record: maintain the `updated`
		 * bitmap; the record itself is processed in the second
		 * switch below */
		switch (l.flag) {
		case LOG_UPDATE_CONST:
		case LOG_UPDATE_BULK:
		case LOG_UPDATE:
		case LOG_CREATE:
		case LOG_DESTROY:
			if (tr != NULL && updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
				BATiter cni = bat_iterator(lg->catalog_id);
				BUN p;
				BUN posnew = BUN_NONE;
				BUN posold = BUN_NONE;
				MT_rwlock_rdlock(&cni.b->thashlock);
				/* visit all catalog entries with id l.id */
				HASHloop_int(cni, cni.b->thash, p, &l.id) {
					lng lid = *(lng *) Tloc(lg->catalog_lid, p);
					if (lid == lng_nil || lid > tr->tid)
						posnew = p;
					else if (lid == tr->tid)
						posold = p;
				}
				MT_rwlock_rdunlock(&cni.b->thashlock);
				bat_iterator_end(&cni);
				/* Normally at this point, posnew is the
				 * location of the bat that this
				 * transaction is working on, and posold
				 * is the location of the previous
				 * version of the bat.  If LOG_CREATE,
				 * both are relevant, since the latter
				 * is the new bat, and the former is the
				 * to-be-destroyed bat.  For
				 * LOG_DESTROY, only posnew should be
				 * relevant, but for the other types, if
				 * the table is destroyed later in the
				 * same transaction, we need posold, and
				 * else (the normal case) we need
				 * posnew. */
				if (posnew != BUN_NONE) {
					assert(posnew < maxupdated);
					updated[posnew / 32] |= 1U << (posnew % 32);
				}
				if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
					assert(posold < maxupdated);
					updated[posold / 32] |= 1U << (posold % 32);
				}
			}
			break;
		default:
			/* do nothing */
			break;
		}
		/* the functions we call here can succeed (LOG_OK),
		 * but they can also fail for two different reasons:
		 * they can run out of input (LOG_EOF -- this is not
		 * serious, we just abort the remaining transactions),
		 * or some malloc or BAT update fails (LOG_ERR -- this
		 * is serious, we must abort the complete process);
		 * the latter failure causes the current function to
		 * return LOG_ERR */
		switch (l.flag) {
		case LOG_START:
			assert(!lg->flushing || l.id <= lg->tid);
			if (!lg->flushing && l.id > lg->tid)
				lg->tid = l.id;	/* should only happen during initialization */
			if ((tr = tr_create(tr, l.id)) == NULL) {
				TRC_CRITICAL(GDK, "memory allocation failed\n");
				err = LOG_ERR;
				break;
			}
			TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
			break;
		case LOG_END:
			if (tr == NULL)
				err = LOG_EOF;
			else if (tr->tid != l.id)	/* abort record */
				tr = tr_abort(lg, tr);
			else
				tr = tr_commit(lg, tr);
			break;
		case LOG_SEQ:
			err = log_read_seq(lg, &l);
			break;
		case LOG_UPDATE_CONST:
		case LOG_UPDATE_BULK:
		case LOG_UPDATE:
			if (tr == NULL)
				err = LOG_EOF;
			else {
				/* inside a LOG_BAT_GROUP the candidate
				 * bat is passed along by reference */
				err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
			}
			break;
		case LOG_CREATE:
			if (tr == NULL)
				err = LOG_EOF;
			else
				err = log_read_create(lg, tr, l.id);
			break;
		case LOG_DESTROY:
			if (tr == NULL)
				err = LOG_EOF;
			else
				err = log_read_destroy(lg, tr, l.id);
			break;
		case LOG_BAT_GROUP:
			if (tr == NULL)
				err = LOG_EOF;
			else {
				if (l.id > 0) {
					/* START OF LOG_BAT_GROUP */
					cands = COLnew(0, TYPE_void, 0, SYSTRANS);
					if (!cands) {
						TRC_CRITICAL(GDK, "creating bat failed\n");
						err = LOG_ERR;
					}
				} else if (cands == NULL) {
					/* should have gone through the
					 * above option earlier */
					TRC_CRITICAL(GDK, "unexpected error\n");
					err = LOG_ERR;
				} else {
					/* END OF LOG_BAT_GROUP */
					BBPunfix(cands->batCacheid);
					cands = NULL;
				}
			}
			break;
		default:
			TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
			err = LOG_ERR;
		}
		/* tr_commit signals a failed commit with (trans *) -1 */
		if (tr == (trans *) -1) {
			/* message already generated by tr_commit */
			err = LOG_ERR;
			tr = NULL;
			break;
		}
	}
	/* whatever is still open was never committed */
	while (tr) {
		TRC_WARNING(GDK, "aborting transaction\n");
		tr = tr_abort(lg, tr);
	}
	/* restore the debug flags saved on entry */
	if (!lg->flushing)
		ATOMIC_SET(&GDKdebug, dbg);

	BBPreclaim(cands);
	if (!ok)
		return LOG_EOF;
	return err;
}
1434 :
1435 : static gdk_return
1436 347 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1437 : {
1438 347 : log_return err = LOG_OK;
1439 347 : time_t t0, t1;
1440 347 : struct stat sb;
1441 :
1442 347 : assert(!lg->inmemory);
1443 :
1444 347 : TRC_INFO(WAL, "opening %s\n", filename);
1445 :
1446 347 : gdk_return res = log_open_input(lg, filename, filemissing);
1447 347 : if (!lg->input_log || res != GDK_SUCCEED)
1448 : return res;
1449 213 : int fd;
1450 213 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1451 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1452 0 : log_close_input(lg);
1453 : /* If the file could be opened, but fstat fails,
1454 : * something weird is going on */
1455 0 : return GDK_FAIL;
1456 : }
1457 213 : t0 = time(NULL);
1458 213 : TRC_INFO_IF(WAL) {
1459 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1460 0 : GDKtracer_flush_buffer();
1461 : }
1462 426 : while (err != LOG_EOF && err != LOG_ERR) {
1463 213 : t1 = time(NULL);
1464 213 : if (t1 - t0 > 10) {
1465 0 : lng fpos;
1466 0 : t0 = t1;
1467 : /* not more than once every 10 seconds */
1468 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1469 0 : TRC_INFO_IF(WAL) {
1470 0 : if (fpos >= 0) {
1471 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1472 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1473 0 : GDKtracer_flush_buffer();
1474 : }
1475 : }
1476 : }
1477 213 : err = log_read_transaction(lg, NULL, 0);
1478 : }
1479 213 : log_close_input(lg);
1480 213 : lg->input_log = NULL;
1481 :
1482 : /* remaining transactions are not committed, ie abort */
1483 213 : TRC_INFO_IF(WAL) {
1484 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1485 0 : GDKtracer_flush_buffer();
1486 : }
1487 : /* we cannot distinguish errors from incomplete transactions
1488 : * (even if we would log aborts in the logs). So we simply
1489 : * abort and move to the next log file */
1490 213 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1491 : }
1492 :
1493 : /*
1494 : * The log files are incrementally numbered, starting from 2. They are
1495 : * processed in the same sequence.
1496 : */
1497 : static gdk_return
1498 128 : log_readlogs(logger *lg, const char *filename)
1499 : {
1500 128 : gdk_return res = GDK_SUCCEED;
1501 :
1502 128 : assert(!lg->inmemory);
1503 128 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1504 :
1505 128 : char log_filename[FILENAME_MAX];
1506 128 : if (lg->saved_id >= lg->id) {
1507 128 : bool filemissing = false;
1508 :
1509 128 : lg->id = lg->saved_id + 1;
1510 475 : while (res == GDK_SUCCEED && !filemissing) {
1511 347 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1512 0 : GDKerror("Logger filename path is too large\n");
1513 0 : return GDK_FAIL;
1514 : }
1515 347 : res = log_readlog(lg, log_filename, &filemissing);
1516 347 : if (!filemissing) {
1517 219 : lg->saved_id++;
1518 219 : lg->id++;
1519 : }
1520 : }
1521 : }
1522 : return res;
1523 : }
1524 :
1525 : static gdk_return
1526 12182 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1527 : {
1528 12182 : TRC_DEBUG(WAL, "commit");
1529 :
1530 12182 : return bm_commit(lg, pending, updated, maxupdated);
1531 : }
1532 :
1533 : static gdk_return
1534 128 : check_version(logger *lg, FILE *fp, bool *needsnew)
1535 : {
1536 128 : int version = 0;
1537 :
1538 128 : assert(!lg->inmemory);
1539 128 : if (fscanf(fp, "%6d", &version) != 1) {
1540 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1541 0 : fclose(fp);
1542 0 : return GDK_FAIL;
1543 : }
1544 128 : if (version != lg->version) {
1545 32 : if (lg->prefuncp == NULL ||
1546 16 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1547 0 : GDKerror("Incompatible database version %06d, "
1548 : "this server supports version %06d.\n%s",
1549 : version, lg->version,
1550 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1551 0 : fclose(fp);
1552 0 : return GDK_FAIL;
1553 : }
1554 16 : *needsnew = true; /* we need to write a new log file */
1555 : } else {
1556 112 : lg->postfuncp = NULL; /* don't call */
1557 112 : *needsnew = false; /* log file already up-to-date */
1558 : }
1559 256 : if (fgetc(fp) != '\n' || /* skip \n */
1560 128 : fgetc(fp) != '\n') { /* skip \n */
1561 0 : GDKerror("Badly formatted log file");
1562 0 : fclose(fp);
1563 0 : return GDK_FAIL;
1564 : }
1565 128 : if (log_read_types_file(lg, fp, version, needsnew) != GDK_SUCCEED) {
1566 0 : fclose(fp);
1567 0 : return GDK_FAIL;
1568 : }
1569 128 : fclose(fp);
1570 128 : return GDK_SUCCEED;
1571 : }
1572 :
1573 : static BAT *
1574 357 : bm_tids(BAT *b, BAT *d)
1575 : {
1576 357 : BUN sz = BATcount(b);
1577 357 : BAT *tids = BATdense(0, 0, sz);
1578 :
1579 357 : if (tids == NULL)
1580 : return NULL;
1581 :
1582 357 : if (BATcount(d)) {
1583 357 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1584 357 : logbat_destroy(tids);
1585 357 : tids = diff;
1586 : }
1587 : return tids;
1588 : }
1589 :
1590 :
1591 : static gdk_return
1592 8178 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1593 : {
1594 8178 : char bak[IDLENGTH];
1595 :
1596 8178 : if (BATmode(old, true) != GDK_SUCCEED) {
1597 0 : GDKerror("cannot convert old %s to transient", name);
1598 0 : return GDK_FAIL;
1599 : }
1600 8178 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1601 0 : GDKerror("name %s_%s too long\n", fn, name);
1602 0 : return GDK_FAIL;
1603 : }
1604 8178 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1605 0 : GDKerror("rename (%s) failed\n", bak);
1606 0 : return GDK_FAIL;
1607 : }
1608 8178 : BBPretain(new->batCacheid);
1609 8178 : return GDK_SUCCEED;
1610 : }
1611 :
1612 : static gdk_return
1613 358 : bm_get_counts(logger *lg)
1614 : {
1615 358 : BUN p, q;
1616 358 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1617 :
1618 33790 : BATloop(lg->catalog_bid, p, q) {
1619 33432 : oid pos = p;
1620 33432 : lng cnt = 0;
1621 33432 : lng lid = lng_nil;
1622 :
1623 33432 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1624 33432 : BAT *b = BBPquickdesc(bids[p]);
1625 33432 : assert(b);
1626 33432 : cnt = BATcount(b);
1627 : } else {
1628 0 : lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1629 : }
1630 33432 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1631 0 : return GDK_FAIL;
1632 33432 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1633 : return GDK_FAIL;
1634 : }
1635 : return GDK_SUCCEED;
1636 : }
1637 :
1638 : static int
1639 8178 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1640 : {
1641 8178 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1642 1902354 : for (int i = 0; i < next; i++) {
1643 1894176 : if (n[i] == bid) {
1644 0 : sizes[i] = sz;
1645 0 : return next;
1646 : }
1647 : }
1648 8178 : n[next] = bid;
1649 8178 : sizes[next++] = sz;
1650 8178 : return next;
1651 : }
1652 :
/* Build compacted copies of the catalog bats without the entries that
 * can be cleaned up, and swap them in for the current ones.
 *
 * r        receives the ids of the bats that the caller has to release
 *          later: the bats made transient here plus the three old
 *          persistent catalog bats
 * bids, lids, cnts
 *          the tail arrays of catalog_bid, lg->catalog_lid and
 *          lg->catalog_cnt, indexed by catalog position
 * cleanup  the number of entries that are going to be removed
 *
 * Return value: the number of entries stored in r on success; 0 when
 * allocating or filling the new bats failed (the catalog is not
 * swapped in that case); -1 when switching the bats failed.
 *
 * NOTE(review): on the "return 0" path taken after an append failure,
 * entries of lids may already have been set to -1 and bats may already
 * have been made transient by the first loop -- confirm that callers
 * cope with that. */
static int
cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
		 BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
{
	BAT *nbids, *noids, *ncnts, *nlids, *ndels;
	BUN p, q;
	int err = 0, rcnt = 0;

	/* the new bats are sized for what remains after the cleanup */
	BUN ocnt = BATcount(catalog_bid);
	nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
	noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
	ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
	nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
	ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);

	if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
		logbat_destroy(nbids);
		logbat_destroy(noids);
		logbat_destroy(ncnts);
		logbat_destroy(nlids);
		logbat_destroy(ndels);
		return 0;
	}

	/* first pass: walk the deleted positions and make the bats whose
	 * lid is at most lg->saved_tid transient */
	oid *poss = Tloc(dcatalog, 0);
	BATloop(dcatalog, p, q) {
		oid pos = poss[p];

		if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
			continue;

		if (lids[pos] >= 0) {
			bat bid = bids[pos];
			BAT *lb = BBP_desc(bid);

			if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
				GDKwarning("Failed to set bat(%d) transient\n", bid);
			} else {
				lids[pos] = -1;	/* mark as transient */
				r[rcnt++] = bid;
			}
		}
	}

	/* second pass: copy every entry that is not marked with lid -1
	 * into the new bats */
	int *oids = (int *) Tloc(catalog_id, 0);
	q = BATcount(catalog_bid);
	for (p = 0; p < q && !err; p++) {
		bat col = bids[p];
		int nid = oids[p];
		lng lid = lids[p];
		lng cnt = cnts[p];
		oid pos = p;

		/* only project out the deleted with lid == -1
		 * update dcatalog */
		if (lid == -1)
			continue;	/* remove */

		if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
		    BUNappend(noids, &nid, true) != GDK_SUCCEED ||
		    BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
		    BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
			err = 1;
		/* an entry that is still listed as deleted is carried
		 * over under its position in the new bats */
		if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
			pos = (oid) (BATcount(nbids) - 1);
			if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
				err = 1;
		}
	}

	if (err) {
		logbat_destroy(nbids);
		logbat_destroy(noids);
		logbat_destroy(ndels);
		logbat_destroy(ncnts);
		logbat_destroy(nlids);
		return 0;
	}
	/* point of no return */
	if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
	    log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
	    log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
		logbat_destroy(nbids);
		logbat_destroy(noids);
		logbat_destroy(ndels);
		logbat_destroy(ncnts);
		logbat_destroy(nlids);
		return -1;
	}
	/* the old catalog bats are put on the list in r as well */
	r[rcnt++] = lg->catalog_bid->batCacheid;
	r[rcnt++] = lg->catalog_id->batCacheid;
	r[rcnt++] = lg->dcatalog->batCacheid;

	assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));

	logbat_destroy(lg->catalog_bid);
	logbat_destroy(lg->catalog_id);
	logbat_destroy(lg->dcatalog);

	lg->catalog_bid = nbids;
	lg->catalog_id = noids;
	lg->dcatalog = ndels;

	/* failing to rename these two bats is not fatal */
	if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
		GDKclrerr();
	if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
		GDKclrerr();
	BBPunfix(lg->catalog_cnt->batCacheid);
	BBPunfix(lg->catalog_lid->batCacheid);

	lg->catalog_cnt = ncnts;
	lg->catalog_lid = nlids;
	/* the results of strconcat_len are not checked here; a failing
	 * rename is ignored as well */
	char bak[FILENAME_MAX];
	strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
	if (BBPrename(lg->catalog_cnt, bak) < 0)
		GDKclrerr();
	strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
	if (BBPrename(lg->catalog_lid, bak) < 0)
		GDKclrerr();
	/* the catalog shrank by `cleanup` entries: adjust the counts
	 * stored in the pending ranges accordingly */
	rotation_lock(lg);
	for (logged_range *p = lg->pending; p; p = p->next) {
		p->cnt -= cleanup;
	}
	rotation_unlock(lg);
	return rcnt;
}
1780 :
/* Commit the catalog bats, the sequence bats and the (updated) bats
 * listed in the catalog with TMsubcommit_list().
 *
 * this function is called with log_lock() held; it releases the lock
 * before returning (on the error paths as well as, on the normal path,
 * just before the call to TMsubcommit_list()).
 *
 * pending     if not NULL, its cnt is used as the size of the catalog
 *             bats in the commit; otherwise the current count is used
 * updated     if not NULL, a bitmap (32 bits per word) over catalog
 *             positions; positions below maxupdated whose bit is not
 *             set are left out of the commit
 *
 * Along the way, catalog entries whose lid is not nil and at most
 * lg->saved_tid are cleaned up (see cleanup_and_swap), and, when no
 * such cleanup happened, the sequence bats are compacted once more
 * than 10 and more than half of their entries are deleted. */
static gdk_return
bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
{
	BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
	BUN dcnt = BATcount(lg->dcatalog);
	BUN p, q;
	BAT *catalog_bid = lg->catalog_bid;
	BAT *catalog_id = lg->catalog_id;
	BAT *dcatalog = lg->dcatalog;
	BUN nn = 13 + cnt;
	/* n/sizes: bats to commit and their sizes; r: bats to release
	 * after a successful commit */
	bat *n = GDKmalloc(sizeof(bat) * nn);
	bat *r = GDKmalloc(sizeof(bat) * nn);
	BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
	int i = 0, rcnt = 0;
	gdk_return res;
	const log_bid *bids;
	lng *cnts = NULL, *lids = NULL;
	BUN cleanup = 0;
	lng t0 = 0;

	if (n == NULL || r == NULL || sizes == NULL) {
		GDKfree(n);
		GDKfree(r);
		GDKfree(sizes);
		log_unlock(lg);
		return GDK_FAIL;
	}

	sizes[i] = 0;
	n[i++] = 0;		/* n[0] is not used */
	bids = (const log_bid *) Tloc(catalog_bid, 0);
	if (lg->catalog_cnt)
		cnts = (lng *) Tloc(lg->catalog_cnt, 0);
	if (lg->catalog_lid)
		lids = (lng *) Tloc(lg->catalog_lid, 0);
	BATloop(catalog_bid, p, q) {
		/* entries with a lid that is not nil and at most
		 * saved_tid are candidates for cleanup */
		if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
			cleanup++;
			if (lids[p] == -1)
				continue;
			if (BUNfnd(dcatalog, &(oid){p}) == BUN_NONE &&
			    BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
				/* undo the appends done so far by
				 * shrinking dcatalog back to its
				 * original count */
				while (BATcount(dcatalog) > dcnt) {
					if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
						TRC_CRITICAL(WAL, "delete after failed append failed\n");
						break;
					}
				}
				GDKfree(n);
				GDKfree(r);
				GDKfree(sizes);
				log_unlock(lg);
				return GDK_FAIL;
			}
		}
		/* skip bats that are known not to have been updated */
		if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
			continue;
		}
		bat col = bids[p];

		TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
		assert(col);
		sizes[i] = cnts ? (BUN) cnts[p] : 0;
		n[i++] = col;
	}
	/* now commit catalog, so it's also up to date on disk */
	sizes[i] = cnt;
	n[i++] = catalog_bid->batCacheid;
	sizes[i] = cnt;
	n[i++] = catalog_id->batCacheid;
	sizes[i] = BATcount(dcatalog);
	n[i++] = dcatalog->batCacheid;

	if (cleanup) {
		if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
					     catalog_bid, catalog_id, dcatalog,
					     cleanup)) < 0) {
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		cnt -= cleanup;
	}
	/* if the catalog bats were swapped, the new ones have to be
	 * committed as well */
	if (dcatalog != lg->dcatalog) {
		i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
		i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
		i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
	}
	if (lg->seqs_id) {
		sizes[i] = BATcount(lg->seqs_id);
		n[i++] = lg->seqs_id->batCacheid;
		/* the count of seqs_id is used for seqs_val as well */
		sizes[i] = BATcount(lg->seqs_id);
		n[i++] = lg->seqs_val->batCacheid;
	}
	/* compact the sequence bats when more than half (and more than
	 * 10) of the entries are deleted */
	if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
		BAT *tids, *ids, *vals;

		tids = bm_tids(lg->seqs_id, lg->dseqs);
		if (tids == NULL) {
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
		vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);

		if (ids == NULL || vals == NULL) {
			logbat_destroy(tids);
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}

		if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
		    BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
			logbat_destroy(tids);
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		logbat_destroy(tids);
		BATclear(lg->dseqs, true);

		if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
		    log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
			logbat_destroy(ids);
			logbat_destroy(vals);
			GDKfree(n);
			GDKfree(r);
			GDKfree(sizes);
			log_unlock(lg);
			return GDK_FAIL;
		}
		i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
		i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));

		/* the old sequence bats are released after the commit,
		 * but only if they have logical references */
		if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
			r[rcnt++] = lg->seqs_id->batCacheid;
		if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
			r[rcnt++] = lg->seqs_val->batCacheid;

		logbat_destroy(lg->seqs_id);
		logbat_destroy(lg->seqs_val);

		lg->seqs_id = ids;
		lg->seqs_val = vals;
	}
	if (lg->seqs_id) {
		sizes[i] = BATcount(lg->dseqs);
		n[i++] = lg->dseqs->batCacheid;
	}

	assert((BUN) i <= nn);
	/* the lock is released before the commit itself */
	log_unlock(lg);
	TRC_DEBUG_IF(WAL)
		t0 = GDKusec();
	/* sizes are only passed on when the catalog counts are known */
	res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
	TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
	if (res == GDK_SUCCEED) {	/* now cleanup */
		for (i = 0; i < rcnt; i++) {
			TRC_DEBUG_IF(WAL) {
				TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
				if (BBP_lrefs(r[i]) != 2)
					TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
			}
			BBPrelease(r[i]);
		}
	}
	GDKfree(n);
	GDKfree(r);
	GDKfree(sizes);
	if (res != GDK_SUCCEED)
		TRC_CRITICAL(GDK, "commit failed\n");
	return res;
}
1970 :
1971 : static gdk_return
1972 357 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1973 : {
1974 357 : if (GDKfilepath(filename, FILENAME_MAX, 0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED) {
1975 0 : GDKerror("Logger filename path is too large\n");
1976 0 : return GDK_FAIL;
1977 : }
1978 357 : if (bak) {
1979 357 : if (strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL) >= FILENAME_MAX) {
1980 0 : GDKerror("Logger filename path is too large\n");
1981 0 : return GDK_FAIL;
1982 : }
1983 : }
1984 : return GDK_SUCCEED;
1985 : }
1986 :
1987 : static gdk_return
1988 12639 : log_cleanup(logger *lg, lng id)
1989 : {
1990 12639 : char log_id[FILENAME_MAX];
1991 :
1992 12639 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1993 0 : GDKerror("log_id filename is too large\n");
1994 0 : return GDK_FAIL;
1995 : }
1996 12639 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1997 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
1998 0 : GDKclrerr(); /* clear error from unlink */
1999 : }
2000 : return GDK_SUCCEED;
2001 : }
2002 :
2003 : #ifdef GDKLIBRARY_JSON
2004 : static gdk_return
2005 358 : log_json_upgrade_finalize(void)
2006 : {
2007 358 : int json_tpe = ATOMindex("json");
2008 715 : if (!GDKinmemory(0) &&
2009 357 : GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
2010 0 : TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
2011 0 : return GDK_FAIL;
2012 : }
2013 358 : BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;
2014 :
2015 358 : return GDK_SUCCEED;
2016 : }
2017 : #endif
2018 :
2019 : /* clean up old junk left over from old upgrades: bats that are
2020 : * persistent but not in the SQL catalog and that have no name, and bats
2021 : * that do have a name that starts with "stat_opt_" (from the statistics
2022 : * optimizer that was removed in 2017) are removed here
2023 : *
2024 : * this function ignores any errors */
2025 : static void
2026 8 : clean_bbp(logger *lg)
2027 : {
2028 8 : BAT *b = COLnew(0, TYPE_int, 256, TRANSIENT);
2029 8 : if (b == NULL)
2030 : return;
2031 8 : if (BUNappend(b, &(int){0}, false) != GDK_SUCCEED) {
2032 0 : BBPreclaim(b);
2033 0 : return;
2034 : }
2035 : /* mark persistent bats that have no name or have a name
2036 : * starting with "stat_opt_" */
2037 16926 : for (bat bid = 1, bsz = getBBPsize(); bid < bsz; bid++)
2038 16918 : if (BBP_status(bid) & BBPEXISTING &&
2039 2600 : (BBP_logical(bid) == NULL ||
2040 2600 : strncmp(BBP_logical(bid), "tmp_", 4) == 0 ||
2041 80 : strncmp(BBP_logical(bid), "stat_opt_", 9) == 0))
2042 2552 : BBP_status_on(bid, 1U << 31);
2043 : /* remove mark from bats that are in the SQL catalog */
2044 2240 : for (BUN i = 0, n = BATcount(lg->catalog_bid); i < n; i++)
2045 2232 : BBP_status_off(((int *) lg->catalog_bid->theap->base)[i], 1U << 31);
2046 : /* what's left over are junk bats */
2047 16926 : for (bat bid = 1, bsz = getBBPsize(); bid < bsz; bid++)
2048 16918 : if (BBP_status(bid) & (1U << 31)) {
2049 320 : BBP_status_off(bid, 1U << 31);
2050 640 : if (BATmode(BBP_desc(bid), true) != GDK_SUCCEED ||
2051 320 : BUNappend(b, &bid, false) != GDK_SUCCEED) {
2052 0 : BBPreclaim(b);
2053 0 : return;
2054 : }
2055 320 : printf("# removing bat %d (tmp_%o)\n", bid, bid);
2056 : }
2057 : /* if there were any junk bats, commit their removal */
2058 16 : if (b->batCount > 1 &&
2059 8 : TMsubcommit_list(Tloc(b, 0), NULL, (int) b->batCount, -1) != GDK_SUCCEED)
2060 0 : printf("clean_bbp transaction failed\n");
2061 8 : BBPreclaim(b);
2062 : }
2063 :
2064 : /* Load data from the logger logdir
2065 : * Initialize new directories and catalog files if none are present,
2066 : * unless running in read-only mode
2067 : * Load data and persist it in the BATs */
2068 : static gdk_return
2069 358 : log_load(const char *fn, logger *lg, char filename[FILENAME_MAX])
2070 : {
2071 358 : FILE *fp = NULL;
2072 358 : char bak[FILENAME_MAX];
2073 358 : bat catalog_bid, catalog_id, dcatalog;
2074 358 : bool needcommit = false;
2075 358 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2076 358 : bool readlogs = false;
2077 358 : bool needsnew = false; /* need to write new log file? */
2078 :
2079 : /* refactor */
2080 358 : if (!LOG_DISABLED(lg)) {
2081 357 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2082 0 : goto error;
2083 : }
2084 :
2085 358 : lg->catalog_bid = NULL;
2086 358 : lg->catalog_id = NULL;
2087 358 : lg->catalog_cnt = NULL;
2088 358 : lg->catalog_lid = NULL;
2089 358 : lg->dcatalog = NULL;
2090 :
2091 358 : lg->seqs_id = NULL;
2092 358 : lg->seqs_val = NULL;
2093 358 : lg->dseqs = NULL;
2094 :
2095 358 : if (!LOG_DISABLED(lg)) {
2096 : /* try to open logfile backup, or failing that, the file
2097 : * itself. we need to know whether this file exists when
2098 : * checking the database consistency later on */
2099 357 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2100 0 : fclose(fp);
2101 0 : fp = NULL;
2102 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2103 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2104 0 : goto error;
2105 357 : } else if (errno != ENOENT) {
2106 0 : GDKsyserror("open %s failed", bak);
2107 0 : goto error;
2108 : }
2109 357 : fp = MT_fopen(filename, "r");
2110 357 : if (fp == NULL && errno != ENOENT) {
2111 0 : GDKsyserror("open %s failed", filename);
2112 0 : goto error;
2113 : }
2114 : }
2115 :
2116 358 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2117 358 : catalog_bid = BBPindex(bak);
2118 :
2119 : /* initialize arrays for type mapping, to be read from disk */
2120 358 : memset(lg->type_id, -1, sizeof(lg->type_id));
2121 358 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2122 :
2123 : /* this is intentional - if catalog_bid is 0, force it to find
2124 : * the persistent catalog */
2125 358 : if (catalog_bid == 0) {
2126 : /* catalog does not exist, so the log file also
2127 : * shouldn't exist */
2128 230 : if (fp != NULL) {
2129 0 : GDKerror("there is no logger catalog, "
2130 : "but there is a log file.\n");
2131 0 : goto error;
2132 : }
2133 :
2134 230 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2135 230 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2136 230 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2137 :
2138 230 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2139 0 : GDKerror("cannot create catalog bats");
2140 0 : goto error;
2141 : }
2142 230 : TRC_INFO(WAL, "create %s catalog\n", fn);
2143 :
2144 : /* give the catalog bats names so we can find them
2145 : * next time */
2146 230 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2147 230 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2148 0 : goto error;
2149 : }
2150 :
2151 230 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2152 230 : if (BBPrename(lg->catalog_id, bak) < 0) {
2153 0 : goto error;
2154 : }
2155 :
2156 230 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2157 230 : if (BBPrename(lg->dcatalog, bak) < 0) {
2158 0 : goto error;
2159 : }
2160 :
2161 230 : if (!LOG_DISABLED(lg)) {
2162 229 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2163 0 : GDKerror("cannot create directory for log file %s\n", filename);
2164 0 : goto error;
2165 : }
2166 229 : if (log_create_types_file(lg, filename) != GDK_SUCCEED)
2167 0 : goto error;
2168 : }
2169 :
2170 230 : BBPretain(lg->catalog_bid->batCacheid);
2171 230 : BBPretain(lg->catalog_id->batCacheid);
2172 230 : BBPretain(lg->dcatalog->batCacheid);
2173 :
2174 230 : log_lock(lg);
2175 : /* bm_subcommit releases the lock */
2176 230 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2177 : /* cannot commit catalog, so remove log */
2178 0 : if (MT_remove(filename) < 0)
2179 0 : GDKsyserror("remove %s failed\n", filename);
2180 0 : BBPrelease(lg->catalog_bid->batCacheid);
2181 0 : BBPrelease(lg->catalog_id->batCacheid);
2182 0 : BBPrelease(lg->dcatalog->batCacheid);
2183 0 : goto error;
2184 : }
2185 : } else {
2186 : /* find the persistent catalog. As non persistent bats
2187 : * require a logical reference we also add a logical
2188 : * reference for the persistent bats */
2189 128 : BUN p, q;
2190 128 : BAT *b, *o, *d;
2191 :
2192 128 : assert(!lg->inmemory);
2193 :
2194 : /* the catalog exists, and so should the log file */
2195 128 : if (fp == NULL && !LOG_DISABLED(lg)) {
2196 0 : GDKerror("There is a logger catalog, but no log file.\n");
2197 0 : goto error;
2198 : }
2199 : if (fp != NULL) {
2200 : /* check_version always closes fp */
2201 128 : if (check_version(lg, fp, &needsnew) != GDK_SUCCEED) {
2202 0 : fp = NULL;
2203 0 : goto error;
2204 : }
2205 : readlogs = true;
2206 : fp = NULL;
2207 : }
2208 :
2209 128 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2210 128 : b = BATdescriptor(catalog_bid);
2211 128 : if (b == NULL) {
2212 0 : GDKerror("inconsistent database, catalog does not exist");
2213 0 : goto error;
2214 : }
2215 :
2216 128 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2217 128 : catalog_id = BBPindex(bak);
2218 128 : o = BATdescriptor(catalog_id);
2219 128 : if (o == NULL) {
2220 0 : BBPunfix(b->batCacheid);
2221 0 : GDKerror("inconsistent database, catalog_id does not exist");
2222 0 : goto error;
2223 : }
2224 :
2225 128 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2226 128 : dcatalog = BBPindex(bak);
2227 128 : d = BATdescriptor(dcatalog);
2228 128 : if (d == NULL) {
2229 0 : GDKerror("cannot create dcatalog bat");
2230 0 : BBPunfix(b->batCacheid);
2231 0 : BBPunfix(o->batCacheid);
2232 0 : goto error;
2233 : }
2234 :
2235 128 : lg->catalog_bid = b;
2236 128 : lg->catalog_id = o;
2237 128 : lg->dcatalog = d;
2238 128 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2239 33560 : BATloop(lg->catalog_bid, p, q) {
2240 33432 : bat bid = bids[p];
2241 33432 : oid pos = p;
2242 :
2243 33432 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2244 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2245 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2246 0 : goto error;
2247 : }
2248 : }
2249 128 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2250 128 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2251 128 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2252 0 : goto error;
2253 : }
2254 128 : BBPretain(lg->catalog_bid->batCacheid);
2255 128 : BBPretain(lg->catalog_id->batCacheid);
2256 128 : BBPretain(lg->dcatalog->batCacheid);
2257 : }
2258 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2259 : * fatal */
2260 358 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2261 358 : if (lg->catalog_cnt == NULL) {
2262 0 : GDKerror("failed to create catalog_cnt bat");
2263 0 : goto error;
2264 : }
2265 358 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2266 358 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2267 0 : GDKclrerr();
2268 358 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2269 358 : if (lg->catalog_lid == NULL) {
2270 0 : GDKerror("failed to create catalog_lid bat");
2271 0 : goto error;
2272 : }
2273 358 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2274 358 : if (BBPrename(lg->catalog_lid, bak) < 0)
2275 0 : GDKclrerr();
2276 358 : if (bm_get_counts(lg) != GDK_SUCCEED)
2277 0 : goto error;
2278 :
2279 358 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2280 358 : if (BBPindex(bak)) {
2281 128 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2282 128 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2283 128 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2284 128 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2285 128 : lg->dseqs = BATdescriptor(BBPindex(bak));
2286 128 : if (lg->seqs_id == NULL ||
2287 128 : lg->seqs_val == NULL ||
2288 : lg->dseqs == NULL) {
2289 0 : GDKerror("Logger_new: cannot load seqs bats");
2290 0 : goto error;
2291 : }
2292 128 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2293 128 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2294 128 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2295 0 : goto error;
2296 : }
2297 : } else {
2298 230 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2299 230 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2300 230 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2301 230 : if (lg->seqs_id == NULL ||
2302 230 : lg->seqs_val == NULL ||
2303 : lg->dseqs == NULL) {
2304 0 : GDKerror("Logger_new: cannot create seqs bats");
2305 0 : goto error;
2306 : }
2307 :
2308 230 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2309 230 : if (BBPrename(lg->seqs_id, bak) < 0) {
2310 0 : goto error;
2311 : }
2312 :
2313 230 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2314 230 : if (BBPrename(lg->seqs_val, bak) < 0) {
2315 0 : goto error;
2316 : }
2317 :
2318 230 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2319 230 : if (BBPrename(lg->dseqs, bak) < 0) {
2320 0 : goto error;
2321 : }
2322 : needcommit = true;
2323 : }
2324 358 : dbg = ATOMIC_GET(&GDKdebug);
2325 358 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2326 358 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2327 0 : GDKerror("Logger_new: commit failed");
2328 0 : goto error;
2329 : }
2330 358 : ATOMIC_SET(&GDKdebug, dbg);
2331 :
2332 358 : if (readlogs) {
2333 128 : ulng log_id = lg->saved_id + 1;
2334 128 : bool earlyexit = GDKgetenv_isyes("process-wal-and-exit");
2335 128 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2336 0 : goto error;
2337 : }
2338 128 : if (!earlyexit) {
2339 128 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2340 0 : goto error;
2341 128 : if (needsnew) {
2342 16 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2343 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2344 0 : return GDK_FAIL;
2345 : }
2346 16 : if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
2347 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2348 0 : return GDK_FAIL;
2349 : }
2350 : }
2351 : }
2352 128 : dbg = ATOMIC_GET(&GDKdebug);
2353 128 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2354 128 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2355 0 : goto error;
2356 : }
2357 128 : ATOMIC_SET(&GDKdebug, dbg);
2358 347 : for (; log_id <= lg->saved_id; log_id++)
2359 219 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2360 128 : if (earlyexit) {
2361 0 : printf("# mserver5 exiting\n");
2362 0 : exit(0);
2363 : }
2364 144 : if (needsnew &&
2365 16 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2366 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2367 0 : return GDK_FAIL;
2368 : }
2369 : } else {
2370 230 : lg->id = lg->saved_id + 1;
2371 230 : if (GDKgetenv_isyes("process-wal-and-exit")) {
2372 0 : printf("# mserver5 exiting\n");
2373 0 : exit(0);
2374 : }
2375 : }
2376 : #ifdef GDKLIBRARY_JSON
2377 358 : if (log_json_upgrade_finalize() == GDK_FAIL)
2378 0 : goto error;
2379 : #endif
2380 358 : if (GDKgetenv_isyes("clean-BBP"))
2381 8 : clean_bbp(lg);
2382 : return GDK_SUCCEED;
2383 0 : error:
2384 0 : if (fp)
2385 0 : fclose(fp);
2386 0 : logbat_destroy(lg->catalog_bid);
2387 0 : logbat_destroy(lg->catalog_id);
2388 0 : logbat_destroy(lg->dcatalog);
2389 0 : logbat_destroy(lg->seqs_id);
2390 0 : logbat_destroy(lg->seqs_val);
2391 0 : logbat_destroy(lg->dseqs);
2392 0 : MT_lock_destroy(&lg->lock);
2393 0 : MT_lock_destroy(&lg->rotation_lock);
2394 0 : GDKfree(lg->fn);
2395 0 : GDKfree(lg->dir);
2396 0 : GDKfree(lg->rbuf);
2397 0 : GDKfree(lg->wbuf);
2398 0 : GDKfree(lg);
2399 0 : ATOMIC_SET(&GDKdebug, dbg);
2400 : /* We do not call log_json_upgrade_finalize here because we want
2401 : * the upgrade to run again next time we try, so we do not want
2402 : * to remove the signal file just yet.
2403 : */
2404 0 : return GDK_FAIL;
2405 : }
2406 :
2407 : /* Initialize a new logger
2408 : * It will load any data in the logdir and persist it in the BATs*/
2409 : static logger *
2410 358 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2411 : postversionfix_fptr postfuncp, void *funcdata)
2412 : {
2413 358 : logger *lg;
2414 358 : char filename[FILENAME_MAX];
2415 :
2416 358 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2417 358 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2418 358 : lng max_file_size = 0;
2419 :
2420 358 : if (GDKdebug & TESTINGMASK) {
2421 : max_file_size = 2048; /* 2 KiB */
2422 : } else {
2423 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2424 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2425 : }
2426 :
2427 358 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2428 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2429 0 : return NULL;
2430 : }
2431 :
2432 358 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2433 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2434 0 : return NULL;
2435 : }
2436 :
2437 358 : lg = GDKmalloc(sizeof(struct logger));
2438 358 : if (lg == NULL) {
2439 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2440 0 : return NULL;
2441 : }
2442 :
2443 716 : *lg = (logger) {
2444 358 : .inmemory = GDKinmemory(0),
2445 : .debug = debug,
2446 : .version = version,
2447 : .prefuncp = prefuncp,
2448 : .postfuncp = postfuncp,
2449 : .funcdata = funcdata,
2450 :
2451 358 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2452 : .file_age = 0,
2453 358 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2454 358 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2455 :
2456 : .id = 0,
2457 358 : .saved_id = getBBPlogno(), /* get saved log number from bbp */
2458 : .nr_flushers = ATOMIC_VAR_INIT(0),
2459 358 : .fn = GDKstrdup(fn),
2460 358 : .dir = GDKstrdup(filename),
2461 : .rbufsize = 64 * 1024,
2462 358 : .rbuf = GDKmalloc(64 * 1024),
2463 : .wbufsize = 64 * 1024,
2464 358 : .wbuf = GDKmalloc(64 * 1024),
2465 : };
2466 :
2467 : /* probably open file and check version first, then call call old logger code */
2468 358 : if (lg->fn == NULL ||
2469 358 : lg->dir == NULL ||
2470 358 : lg->rbuf == NULL ||
2471 : lg->wbuf == NULL) {
2472 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2473 0 : GDKfree(lg->fn);
2474 0 : GDKfree(lg->dir);
2475 0 : GDKfree(lg->rbuf);
2476 0 : GDKfree(lg->wbuf);
2477 0 : GDKfree(lg);
2478 0 : return NULL;
2479 : }
2480 358 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2481 :
2482 358 : MT_lock_init(&lg->lock, fn);
2483 358 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2484 358 : MT_lock_init(&lg->flush_lock, "flush_lock");
2485 358 : MT_cond_init(&lg->excl_flush_cv, "flush_cond");
2486 :
2487 358 : if (log_load(fn, lg, filename) == GDK_SUCCEED) {
2488 : return lg;
2489 : }
2490 : return NULL;
2491 : }
2492 :
2493 : static logged_range *
2494 68761 : do_flush_range_cleanup(logger *lg)
2495 : {
2496 68761 : logged_range *frange = lg->flush_ranges;
2497 68761 : logged_range *first = frange;
2498 :
2499 68761 : if (frange == NULL)
2500 : return NULL;
2501 81286 : while (frange->next) {
2502 12685 : if (ATOMIC_GET(&frange->refcount) > 1)
2503 : break;
2504 12525 : frange = frange->next;
2505 : }
2506 68761 : if (first == frange) {
2507 : return first;
2508 : }
2509 :
2510 12525 : logged_range *flast = frange;
2511 :
2512 12525 : lg->flush_ranges = flast;
2513 :
2514 25050 : for (frange = first; frange && frange != flast; frange = frange->next) {
2515 12525 : ATOMIC_DEC(&frange->refcount);
2516 12525 : if (!LOG_DISABLED(lg) && frange->output_log) {
2517 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2518 0 : close_stream(frange->output_log);
2519 0 : frange->output_log = NULL;
2520 : }
2521 : }
2522 : return flast;
2523 : }
2524 :
2525 : void
2526 357 : log_destroy(logger *lg)
2527 : {
2528 357 : log_close_input(lg);
2529 357 : logged_range *last = do_flush_range_cleanup(lg);
2530 357 : (void) last;
2531 357 : assert(last == lg->current && last == lg->flush_ranges);
2532 357 : log_close_output(lg);
2533 819 : for (logged_range * p = lg->pending; p; p = lg->pending) {
2534 462 : lg->pending = p->next;
2535 462 : GDKfree(p);
2536 : }
2537 357 : if (LOG_DISABLED(lg)) {
2538 1 : lg->saved_id = lg->id;
2539 1 : lg->saved_tid = lg->tid;
2540 1 : log_commit(lg, NULL, NULL, 0);
2541 : }
2542 357 : if (lg->catalog_bid) {
2543 357 : log_lock(lg);
2544 357 : BUN p, q;
2545 357 : BAT *b = lg->catalog_bid;
2546 :
2547 : /* free resources */
2548 357 : const log_bid *bids = (const log_bid *) Tloc(b, 0);
2549 94634 : BATloop(b, p, q) {
2550 94277 : bat bid = bids[p];
2551 :
2552 94277 : BBPrelease(bid);
2553 : }
2554 :
2555 357 : BBPrelease(lg->catalog_bid->batCacheid);
2556 357 : BBPrelease(lg->catalog_id->batCacheid);
2557 357 : BBPrelease(lg->dcatalog->batCacheid);
2558 357 : logbat_destroy(lg->catalog_bid);
2559 357 : logbat_destroy(lg->catalog_id);
2560 357 : logbat_destroy(lg->dcatalog);
2561 :
2562 357 : logbat_destroy(lg->catalog_cnt);
2563 357 : logbat_destroy(lg->catalog_lid);
2564 357 : log_unlock(lg);
2565 : }
2566 357 : MT_lock_destroy(&lg->lock);
2567 357 : MT_lock_destroy(&lg->rotation_lock);
2568 357 : MT_lock_destroy(&lg->flush_lock);
2569 357 : GDKfree(lg->fn);
2570 357 : GDKfree(lg->dir);
2571 357 : GDKfree(lg->rbuf);
2572 357 : GDKfree(lg->wbuf);
2573 357 : GDKfree(lg);
2574 357 : }
2575 :
2576 : /* Create a new logger */
2577 : logger *
2578 358 : log_create(int debug, const char *fn, const char *logdir, int version,
2579 : preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
2580 : void *funcdata)
2581 : {
2582 358 : logger *lg;
2583 358 : TRC_INFO_IF(WAL) {
2584 0 : TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
2585 0 : GDKtracer_flush_buffer();
2586 : }
2587 358 : lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
2588 358 : if (lg == NULL)
2589 : return NULL;
2590 358 : TRC_INFO_IF(WAL) {
2591 0 : TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
2592 0 : GDKtracer_flush_buffer();
2593 : }
2594 358 : if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2595 0 : log_destroy(lg);
2596 0 : return NULL;
2597 : }
2598 358 : assert(lg->current == NULL);
2599 358 : logged_range dummy = {
2600 358 : .cnt = BATcount(lg->catalog_bid),
2601 : };
2602 358 : lg->current = &dummy;
2603 358 : if (log_open_output(lg) != GDK_SUCCEED) {
2604 0 : lg->current = NULL;
2605 0 : log_destroy(lg);
2606 0 : return NULL;
2607 : }
2608 358 : lg->current = lg->current->next;
2609 358 : assert(lg->pending == NULL && lg->flush_ranges == NULL);
2610 358 : lg->pending = lg->current;
2611 358 : lg->flush_ranges = lg->current;
2612 358 : return lg;
2613 : }
2614 :
2615 : static logged_range *
2616 6919 : log_next_logfile(logger *lg, ulng ts)
2617 : {
2618 6919 : int m = (ATOMIC_GET(&GDKdebug) & TESTINGMASK) ? 1000 : 100;
2619 6919 : if (!lg->pending || !lg->pending->next)
2620 : return NULL;
2621 6919 : rotation_lock(lg);
2622 6919 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2623 6919 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2624 6919 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2625 6319 : logged_range *p = lg->pending;
2626 6319 : for (int i = 1;
2627 12420 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2628 6102 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2629 18522 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2630 6101 : p = p->next;
2631 6319 : rotation_unlock(lg);
2632 6319 : return p;
2633 : }
2634 600 : rotation_unlock(lg);
2635 600 : return NULL;
2636 : }
2637 :
2638 : static void
2639 6319 : log_cleanup_range(logger *lg, ulng id)
2640 : {
2641 6319 : rotation_lock(lg);
2642 18739 : while (lg->pending && lg->pending->id <= id) {
2643 12420 : logged_range *p;
2644 12420 : p = lg->pending;
2645 12420 : if (p)
2646 12420 : lg->pending = p->next;
2647 12420 : GDKfree(p);
2648 : }
2649 6319 : rotation_unlock(lg);
2650 6319 : }
2651 :
2652 : static void
2653 68407 : do_rotate(logger *lg)
2654 : {
2655 68407 : logged_range *cur = lg->current;
2656 68407 : logged_range *next = cur->next;
2657 68407 : if (next) {
2658 12525 : assert(ATOMIC_GET(&next->refcount) == 1);
2659 12525 : lg->current = next;
2660 12525 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2661 12416 : close_stream(cur->output_log);
2662 12416 : cur->output_log = NULL;
2663 : }
2664 : }
2665 68407 : }
2666 :
2667 : gdk_return
2668 7515 : log_activate(logger *lg)
2669 : {
2670 7515 : bool flush_cleanup = false;
2671 7515 : gdk_return res = GDK_SUCCEED;
2672 :
2673 7515 : rotation_lock(lg);
2674 7515 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2675 :
2676 7515 : if (current_file_size == -1) {
2677 0 : rotation_unlock(lg);
2678 0 : return GDK_FAIL;
2679 : }
2680 : /* file size of 2 means only endian indicator present
2681 : * (i.e. effectively empty) */
2682 7515 : if (current_file_size <= 2) {
2683 5562 : rotation_unlock(lg);
2684 5562 : return GDK_SUCCEED;
2685 : }
2686 :
2687 1953 : if (!lg->flushnow &&
2688 1953 : !lg->current->next &&
2689 1953 : current_file_size > 2 &&
2690 1953 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2691 1946 : current_file_size > lg->max_file_size ||
2692 1929 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2693 24 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2694 24 : lg->saved_id + 1 == lg->id &&
2695 23 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2696 23 : lg->id++;
2697 : /* start new file */
2698 23 : res = log_open_output(lg);
2699 23 : flush_cleanup = true;
2700 23 : do_rotate(lg);
2701 : }
2702 23 : if (flush_cleanup)
2703 23 : (void) do_flush_range_cleanup(lg);
2704 1953 : rotation_unlock(lg);
2705 1953 : return res;
2706 : }
2707 :
/* Apply log files that are fully written to the persistent state.
 *
 * log_next_logfile(lg, ts) selects the last pending range to process;
 * its id (lid) is the target, 0 if there is none.  Every log file
 * LOGFILE.<n> for saved_id < n <= lid is read completely with
 * log_read_transaction, after which saved_id is advanced to lid and
 * the result is committed with log_commit.  After a successful commit
 * the processed log files are removed (log_cleanup) and the matching
 * pending ranges freed (log_cleanup_range).  If the commit fails,
 * saved_id is reset to its old value and GDK_FAIL is returned.
 *
 * When logging is disabled only the bookkeeping and the commit are
 * done; a failing commit is traced but GDK_SUCCEED is still returned.
 *
 * "updated" is a bitmap with room for one bit per entry of
 * lg->catalog_id (rounded up to a multiple of 32 bits); it is handed
 * to log_read_transaction and log_commit.
 * NOTE(review): presumably it flags the catalog entries touched while
 * reading - confirm against log_read_transaction. */
2708 : gdk_return
2709 6919 : log_flush(logger *lg, ulng ts)
2710 : {
2711 6919 : logged_range *pending = log_next_logfile(lg, ts);
2712 6919 : ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
2713 6919 : if (LOG_DISABLED(lg)) {
2714 0 : lg->saved_id = lid;
2715 0 : lg->saved_tid = lg->tid;
2716 0 : if (lid)
2717 0 : log_cleanup_range(lg, lg->saved_id);
2718 0 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
2719 0 : TRC_ERROR(GDK, "failed to commit");
2720 0 : return GDK_SUCCEED;
2721 : }
/* nothing to do when the target range is already saved */
2722 6919 : if (lg->saved_id >= lid)
2723 : return GDK_SUCCEED;
/* lg->id is read under the rotation lock */
2724 6319 : rotation_lock(lg);
2725 6319 : ulng lgid = lg->id;
2726 6319 : rotation_unlock(lg);
2727 6319 : if (lg->saved_id + 1 >= lgid) /* logger should first release the file */
2728 : return GDK_SUCCEED;
2729 6319 : log_return res = LOG_OK;
2730 6319 : ulng cid = olid;
2731 6319 : assert(lid <= lgid);
2732 : uint32_t *updated = NULL;
2733 : BUN nupdated = 0;
2734 : size_t allocated = 0;
/* process the files olid + 1 .. lid, one per iteration */
2735 18739 : while (cid < lid && res == LOG_OK) {
2736 12420 : if (!lg->input_log) {
2737 12420 : char filename[MAXPATH];
2738 12420 : char id[32];
2739 12420 : if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
2740 0 : GDKfree(updated);
2741 0 : TRC_CRITICAL(GDK, "log_id filename is too large\n");
2742 0 : return GDK_FAIL;
2743 : }
2744 12420 : if (GDKfilepath(filename, sizeof(filename), BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
2745 0 : GDKfree(updated);
2746 0 : return GDK_FAIL;
2747 : }
2748 12420 : if (strlen(filename) >= FILENAME_MAX) {
2749 : GDKfree(updated);
2750 : TRC_CRITICAL(GDK, "Logger filename path is too large\n");
2751 : return GDK_FAIL;
2752 : }
2753 :
2754 12420 : bool filemissing = false;
2755 12420 : if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
2756 0 : GDKfree(updated);
2757 0 : return GDK_FAIL;
2758 : }
2759 : }
2760 : /* we read the full file because skipping is impossible with current log format */
2761 12420 : log_lock(lg);
/* allocate the bitmap on first use (zero-filled, at least 4 bytes),
 * or grow it, zeroing the new tail, when the catalog has grown */
2762 12420 : if (updated == NULL) {
2763 6319 : nupdated = BATcount(lg->catalog_id);
2764 6319 : allocated = ((nupdated + 31) & ~31) / 8;
2765 6319 : if (allocated == 0)
2766 0 : allocated = 4;
2767 6319 : updated = GDKzalloc(allocated);
2768 6319 : if (updated == NULL) {
2769 0 : log_unlock(lg);
2770 0 : return GDK_FAIL;
2771 : }
2772 6101 : } else if (nupdated < BATcount(lg->catalog_id)) {
2773 118 : BUN n = BATcount(lg->catalog_id);
2774 118 : size_t a = ((n + 31) & ~31) / 8;
2775 118 : if (a > allocated) {
2776 13 : uint32_t *p = GDKrealloc(updated, a);
2777 13 : if (p == NULL) {
2778 0 : GDKfree(updated);
2779 0 : log_unlock(lg);
2780 0 : return GDK_FAIL;
2781 : }
2782 13 : updated = p;
2783 13 : memset(updated + allocated / 4, 0, a - allocated);
2784 13 : allocated = a;
2785 : }
2786 : nupdated = n;
2787 : }
2788 12420 : lg->flushing = true;
2789 12420 : res = log_read_transaction(lg, updated, nupdated);
2790 12420 : lg->flushing = false;
2791 12420 : log_unlock(lg);
/* end of file is the normal way for one file to finish */
2792 12420 : if (res == LOG_EOF) {
2793 12420 : log_close_input(lg);
2794 12420 : res = LOG_OK;
2795 : }
2796 12420 : cid++;
2797 : }
2798 6319 : if (lid > olid && res == LOG_OK) {
2799 6319 : rotation_lock(lg); /* protect against concurrent log_tflush rotate check */
2800 6319 : lg->saved_id = lid;
2801 6319 : rotation_unlock(lg);
2802 6319 : if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
2803 0 : TRC_ERROR(GDK, "failed to commit");
2804 0 : res = LOG_ERR;
2805 0 : rotation_lock(lg);
2806 0 : lg->saved_id = olid; /* reset !! */
2807 0 : rotation_unlock(lg);
2808 : }
2809 0 : if (res != LOG_ERR) {
2810 18739 : while (olid < lid) {
2811 : /* Try to cleanup, remove old log file, continue on failure! */
2812 12420 : olid++;
2813 12420 : (void) log_cleanup(lg, olid);
2814 : }
2815 : }
2816 6319 : if (res == LOG_OK)
2817 6319 : log_cleanup_range(lg, lg->saved_id);
2818 : }
2819 6319 : GDKfree(updated);
2820 6319 : return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
2821 : }
2822 :
2823 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2824 : * Only the bak- files are deleted for the preserved WAL files.
2825 : */
2826 : lng
2827 17924 : log_changes(logger *lg)
2828 : {
2829 17924 : if (LOG_DISABLED(lg))
2830 : return 0;
2831 17924 : rotation_lock(lg);
2832 17924 : lng changes = lg->id - lg->saved_id - 1;
2833 17924 : rotation_unlock(lg);
2834 17924 : return changes;
2835 : }
2836 :
2837 : int
2838 505 : log_sequence(logger *lg, int seq, lng *id)
2839 : {
2840 505 : log_lock(lg);
2841 505 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2842 :
2843 505 : if (p != BUN_NONE) {
2844 370 : *id = *(lng *) Tloc(lg->seqs_val, p);
2845 :
2846 370 : log_unlock(lg);
2847 370 : return 1;
2848 : }
2849 135 : log_unlock(lg);
2850 135 : return 0;
2851 : }
2852 :
2853 : gdk_return
2854 199880 : log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
2855 : {
2856 199880 : bte tpe = find_type(lg, type);
2857 199880 : gdk_return ok = GDK_SUCCEED;
2858 199880 : logformat l;
2859 199880 : lng nr;
2860 199880 : l.flag = LOG_UPDATE_CONST;
2861 199880 : l.id = id;
2862 199880 : nr = cnt;
2863 :
2864 199880 : if (LOG_DISABLED(lg) || !nr) {
2865 : /* logging is switched off */
2866 75441 : if (nr) {
2867 75441 : log_lock(lg);
2868 75441 : ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
2869 75441 : log_unlock(lg);
2870 : }
2871 75441 : return ok;
2872 : }
2873 :
2874 124439 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
2875 :
2876 124439 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2877 248878 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2878 248878 : log_write_format(lg, &l) != GDK_SUCCEED ||
2879 248878 : !mnstr_writeLng(lg->current->output_log, nr) ||
2880 248878 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2881 124439 : !mnstr_writeLng(lg->current->output_log, offset)) {
2882 0 : ATOMIC_DEC(&lg->current->refcount);
2883 0 : ok = GDK_FAIL;
2884 0 : goto bailout;
2885 : }
2886 :
2887 124439 : ok = wt(val, lg->current->output_log, 1);
2888 :
2889 124439 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2890 :
2891 124439 : bailout:
2892 124439 : if (ok != GDK_SUCCEED) {
2893 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2894 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2895 : }
2896 : return ok;
2897 : }
2898 :
2899 : static gdk_return
2900 19762 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2901 : {
2902 19762 : size_t bufsz = lg->wbufsize, resize = 0;
2903 19762 : BUN end = (BUN) (offset + nr);
2904 19762 : char *buf = lg->wbuf;
2905 19762 : gdk_return res = GDK_SUCCEED;
2906 :
2907 19762 : if (!buf)
2908 : return GDK_FAIL;
2909 19762 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2910 19762 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2911 : return GDK_FAIL;
2912 19762 : BATiter bi = bat_iterator(b);
2913 19762 : BUN p = (BUN) offset;
2914 39576 : for (; p < end;) {
2915 19814 : size_t sz = 0;
2916 19814 : if (resize) {
2917 2 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2918 : res = GDK_FAIL;
2919 : break;
2920 : }
2921 2 : lg->wbuf = buf;
2922 2 : lg->wbufsize = bufsz = resize;
2923 2 : resize = 0;
2924 : }
2925 19814 : char *dst = buf;
2926 91789 : for (; p < end && sz < bufsz; p++) {
2927 72027 : const char *s = BUNtvar(bi, p);
2928 72027 : size_t len = strlen(s) + 1;
2929 72027 : if ((sz + len) > bufsz) {
2930 52 : if (len > bufsz)
2931 2 : resize = len + bufsz;
2932 : break;
2933 : } else {
2934 71975 : memcpy(dst, s, len);
2935 71975 : dst += len;
2936 71975 : sz += len;
2937 : }
2938 : }
2939 39626 : if (sz &&
2940 39624 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2941 19812 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2942 : res = GDK_FAIL;
2943 : break;
2944 : }
2945 : }
2946 19762 : bat_iterator_end(&bi);
2947 19762 : return res;
2948 : }
2949 :
2950 : static gdk_return
2951 523952 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2952 : {
2953 523952 : bte tpe = find_type(lg, b->ttype);
2954 523952 : gdk_return ok = GDK_SUCCEED;
2955 523952 : logformat l;
2956 523952 : BUN p;
2957 523952 : lng nr;
2958 523952 : l.flag = LOG_UPDATE_BULK;
2959 523952 : l.id = id;
2960 523952 : nr = cnt;
2961 :
2962 523952 : if (LOG_DISABLED(lg) || !nr) {
2963 : /* logging is switched off */
2964 222692 : if (nr)
2965 167115 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2966 : return GDK_SUCCEED;
2967 : }
2968 :
2969 272164 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2970 :
2971 272164 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2972 272164 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2973 0 : ok = GDK_FAIL;
2974 0 : goto bailout;
2975 : }
2976 :
2977 272164 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2978 543232 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2979 672055 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2980 543232 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2981 414409 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2982 0 : ok = GDK_FAIL;
2983 0 : goto bailout;
2984 : }
2985 272164 : if (!total_cnt)
2986 128823 : total_cnt = cnt;
2987 272164 : lg->total_cnt += cnt;
2988 :
2989 272164 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2990 271616 : lg->total_cnt = 0;
2991 :
2992 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2993 272164 : if (sliced)
2994 1512 : offset = 0;
2995 272164 : BATiter bi = bat_iterator(b);
2996 272164 : if (b->ttype == TYPE_msk) {
2997 0 : if (offset % 32 == 0) {
2998 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
2999 0 : (size_t) ((nr + 31) / 32)))
3000 0 : ok = GDK_FAIL;
3001 : } else {
3002 0 : for (lng i = 0; i < nr; i += 32) {
3003 : uint32_t v = 0;
3004 0 : for (int j = 0; j < 32 && i + j < nr; j++)
3005 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
3006 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
3007 : ok = GDK_FAIL;
3008 : break;
3009 : }
3010 : }
3011 : }
3012 523929 : } else if (b->ttype < TYPE_str && bi.h->parentid == b->batCacheid) {
3013 251765 : const void *t = BUNtail(bi, (BUN) offset);
3014 :
3015 251765 : ok = wt(t, lg->current->output_log, (size_t) nr);
3016 20399 : } else if (b->ttype == TYPE_str) {
3017 : /* efficient string writes */
3018 19756 : ok = string_writer(lg, b, offset, nr);
3019 : } else {
3020 643 : BUN end = (BUN) (offset + nr);
3021 1644 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
3022 1001 : const void *t = BUNtail(bi, p);
3023 :
3024 1001 : ok = wt(t, lg->current->output_log, 1);
3025 : }
3026 : }
3027 272164 : bat_iterator_end(&bi);
3028 :
3029 272164 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3030 :
3031 272164 : bailout:
3032 272164 : if (ok != GDK_SUCCEED) {
3033 0 : ATOMIC_DEC(&lg->current->refcount);
3034 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3035 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3036 : }
3037 : return ok;
3038 : }
3039 :
3040 : /*
3041 : * Changes made to the BAT descriptor should be stored in the log
3042 : * files. Actually, we need to save the descriptor file, perhaps we
3043 : * should simply introduce a versioning scheme.
3044 : */
3045 : gdk_return
3046 238258 : log_bat_persists(logger *lg, BAT *b, log_id id)
3047 : {
3048 238258 : log_lock(lg);
3049 238258 : bte ta = find_type(lg, b->ttype);
3050 238258 : logformat l;
3051 :
3052 238258 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
3053 0 : log_unlock(lg);
3054 0 : if (!LOG_DISABLED(lg))
3055 0 : ATOMIC_DEC(&lg->current->refcount);
3056 0 : return GDK_FAIL;
3057 : }
3058 :
3059 238258 : l.flag = LOG_CREATE;
3060 238258 : l.id = id;
3061 238258 : if (!LOG_DISABLED(lg)) {
3062 156395 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3063 312790 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3064 312790 : log_write_format(lg, &l) != GDK_SUCCEED ||
3065 156395 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3066 0 : log_unlock(lg);
3067 0 : ATOMIC_DEC(&lg->current->refcount);
3068 0 : return GDK_FAIL;
3069 : }
3070 : }
3071 238258 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3072 238258 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3073 238258 : log_unlock(lg);
3074 238258 : if (r != GDK_SUCCEED)
3075 0 : ATOMIC_DEC(&lg->current->refcount);
3076 : return r;
3077 : }
3078 :
3079 : gdk_return
3080 22461 : log_bat_transient(logger *lg, log_id id)
3081 : {
3082 22461 : log_lock(lg);
3083 22461 : log_bid bid = internal_find_bat(lg, id, -1);
3084 22461 : logformat l;
3085 :
3086 22461 : if (bid < 0) {
3087 0 : log_unlock(lg);
3088 0 : return GDK_FAIL;
3089 : }
3090 22461 : if (!bid) {
3091 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3092 0 : log_unlock(lg);
3093 0 : return GDK_FAIL;
3094 : }
3095 22461 : l.flag = LOG_DESTROY;
3096 22461 : l.id = id;
3097 :
3098 22461 : if (!LOG_DISABLED(lg)) {
3099 9792 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3100 0 : TRC_CRITICAL(GDK, "write failed\n");
3101 0 : log_unlock(lg);
3102 0 : ATOMIC_DEC(&lg->current->refcount);
3103 0 : return GDK_FAIL;
3104 : }
3105 : }
3106 22461 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3107 22461 : BAT *b = BBPquickdesc(bid);
3108 22461 : assert(b);
3109 22461 : BUN cnt = BATcount(b);
3110 22461 : ATOMIC_ADD(&lg->current->drops, cnt);
3111 22461 : gdk_return r = log_del_bat(lg, bid);
3112 22461 : log_unlock(lg);
3113 22461 : if (r != GDK_SUCCEED)
3114 0 : ATOMIC_DEC(&lg->current->refcount);
3115 : return r;
3116 : }
3117 :
3118 : static gdk_return
3119 118734 : log_bat_group(logger *lg, log_id id)
3120 : {
3121 118734 : if (LOG_DISABLED(lg))
3122 : return GDK_SUCCEED;
3123 :
3124 76944 : logformat l;
3125 76944 : l.flag = LOG_BAT_GROUP;
3126 76944 : l.id = id;
3127 76944 : gdk_return r = log_write_format(lg, &l);
3128 76944 : return r;
3129 : }
3130 :
3131 : gdk_return
3132 59367 : log_bat_group_start(logger *lg, log_id id)
3133 : {
3134 : /*positive table id represent start of logged table */
3135 59367 : return log_bat_group(lg, id);
3136 : }
3137 :
3138 : gdk_return
3139 59367 : log_bat_group_end(logger *lg, log_id id)
3140 : {
3141 : /*negative table id represent end of logged table */
3142 59367 : return log_bat_group(lg, -id);
3143 : }
3144 :
3145 : gdk_return
3146 282959 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3147 : {
3148 282959 : log_lock(lg);
3149 282959 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3150 282959 : log_unlock(lg);
3151 282959 : return r;
3152 : }
3153 :
3154 : gdk_return
3155 2819 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3156 : {
3157 2819 : log_lock(lg);
3158 2819 : bte tpe = find_type(lg, uval->ttype);
3159 2819 : gdk_return ok = GDK_SUCCEED;
3160 2819 : logformat l;
3161 2819 : BUN p;
3162 2819 : lng nr;
3163 :
3164 2819 : if (BATtdense(uid)) {
3165 2735 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3166 2735 : log_unlock(lg);
3167 2735 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3168 0 : ATOMIC_DEC(&lg->current->refcount);
3169 2735 : return ok;
3170 : }
3171 :
3172 84 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3173 :
3174 84 : l.flag = LOG_UPDATE;
3175 84 : l.id = id;
3176 84 : nr = (BATcount(uval));
3177 84 : assert(nr);
3178 :
3179 84 : if (LOG_DISABLED(lg)) {
3180 : /* logging is switched off */
3181 65 : log_unlock(lg);
3182 65 : return GDK_SUCCEED;
3183 : }
3184 :
3185 19 : BATiter vi = bat_iterator(uval);
3186 19 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3187 19 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3188 :
3189 19 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3190 38 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3191 38 : log_write_format(lg, &l) != GDK_SUCCEED ||
3192 38 : !mnstr_writeLng(lg->current->output_log, nr) ||
3193 19 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3194 0 : ok = GDK_FAIL;
3195 0 : goto bailout;
3196 : }
3197 1074 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3198 1055 : const oid id = BUNtoid(uid, p);
3199 :
3200 1055 : ok = wh(&id, lg->current->output_log, 1);
3201 : }
3202 19 : if (uval->ttype == TYPE_msk) {
3203 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3204 0 : (BATcount(uval) + 31) / 32))
3205 0 : ok = GDK_FAIL;
3206 32 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3207 13 : const void *t = BUNtail(vi, 0);
3208 :
3209 13 : ok = wt(t, lg->current->output_log, (size_t) nr);
3210 6 : } else if (uval->ttype == TYPE_str) {
3211 : /* efficient string writes */
3212 6 : ok = string_writer(lg, uval, 0, nr);
3213 : } else {
3214 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3215 0 : const void *val = BUNtail(vi, p);
3216 :
3217 0 : ok = wt(val, lg->current->output_log, 1);
3218 : }
3219 : }
3220 :
3221 19 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3222 :
3223 19 : bailout:
3224 19 : bat_iterator_end(&vi);
3225 19 : if (ok != GDK_SUCCEED) {
3226 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3227 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3228 0 : ATOMIC_DEC(&lg->current->refcount);
3229 : }
3230 19 : log_unlock(lg);
3231 19 : return ok;
3232 : }
3233 :
3234 : static inline bool
3235 56916 : check_rotation_conditions(logger *lg)
3236 : {
3237 56916 : if (LOG_DISABLED(lg))
3238 : return false;
3239 :
3240 56913 : if (lg->current->next)
3241 : return false; /* do not rotate if there is already a prepared next current */
3242 56913 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3243 : return true;
3244 56913 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3245 :
3246 56913 : if (current_file_size == -1)
3247 : return false;
3248 :
3249 56913 : assert(current_file_size >= 0);
3250 :
3251 56913 : if (current_file_size == 2)
3252 : return false;
3253 :
3254 1632 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3255 56110 : current_file_size > lg->max_file_size ||
3256 49962 : (GDKusec() - lg->file_age) > lg->max_file_age;
3257 :
3258 54486 : return res;
3259 : }
3260 :
3261 : gdk_return
3262 62650 : log_tend(logger *lg)
3263 : {
3264 62650 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3265 :
3266 62650 : if (LOG_DISABLED(lg))
3267 : return GDK_SUCCEED;
3268 :
3269 56913 : gdk_return result;
3270 56913 : logformat l;
3271 56913 : l.flag = LOG_END;
3272 56913 : l.id = lg->tid;
3273 :
3274 56913 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3275 56913 : ATOMIC_INC(&lg->nr_flushers);
3276 : return result;
3277 : }
3278 :
3279 : #define flush_lock(lg) MT_lock_set(&(lg)->flush_lock)
3280 : #define flush_unlock(lg) MT_lock_unset(&(lg)->flush_lock)
3281 :
3282 : static inline gdk_return
3283 56651 : do_flush(logged_range *range)
3284 : {
3285 : /* assumes flush lock */
3286 56651 : stream *output_log = range->output_log;
3287 56651 : ulng ts = ATOMIC_GET(&range->last_ts);
3288 :
3289 56651 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3290 56651 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3291 0 : return GDK_FAIL;
3292 56651 : ATOMIC_SET(&range->flushed_ts, ts);
3293 56651 : return GDK_SUCCEED;
3294 : }
3295 :
3296 : static inline void
3297 62647 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3298 : {
3299 62647 : (void) lg;
3300 62647 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3301 :
3302 62647 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3303 62385 : ATOMIC_SET(&range->last_ts, commit_ts);
3304 62647 : }
3305 :
3306 : gdk_return
3307 62650 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
3308 : {
3309 62650 : rotation_lock(lg);
3310 62650 : if (lg->flushnow) {
3311 5734 : logged_range *p = lg->current;
3312 5734 : assert(lg->flush_ranges == lg->current);
3313 5734 : assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
3314 5734 : log_tdone(lg, lg->current, commit_ts);
3315 5734 : ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
3316 5734 : lg->id++;
3317 5734 : lg->flushnow = false;
3318 5734 : if (log_open_output(lg) != GDK_SUCCEED)
3319 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3320 5734 : do_rotate(lg);
3321 5734 : (void) do_flush_range_cleanup(lg);
3322 5734 : assert(lg->flush_ranges == lg->current);
3323 5734 : rotation_unlock(lg);
3324 5734 : return log_commit(lg, p, NULL, 0);
3325 : }
3326 :
3327 56916 : if (LOG_DISABLED(lg)) {
3328 3 : rotation_unlock(lg);
3329 3 : return GDK_SUCCEED;
3330 : }
3331 :
3332 56913 : logged_range *frange = do_flush_range_cleanup(lg);
3333 :
3334 56963 : while (frange->next && frange->id < file_id) {
3335 : assert(frange->next);
3336 : frange = frange->next;
3337 : }
3338 :
3339 56913 : log_tdone(lg, frange, commit_ts);
3340 :
3341 56913 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
3342 : /* delay needed ? */
3343 :
3344 56651 : flush_lock(lg);
3345 : /* check it one more time */
3346 56651 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
3347 56651 : do_flush(frange);
3348 56651 : flush_unlock(lg);
3349 : }
3350 : /* else somebody else has flushed our log file */
3351 :
3352 56913 : if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
3353 55928 : if (frange != lg->current && frange->output_log) {
3354 109 : close_stream(frange->output_log);
3355 109 : frange->output_log = NULL;
3356 : }
3357 : }
3358 :
3359 56913 : if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
3360 : /* I am the last flusher
3361 : * if present,
3362 : * wake up the exclusive flusher in log_tstart */
3363 : /* rotation_lock is still being held */
3364 56498 : MT_cond_signal(&lg->excl_flush_cv);
3365 : }
3366 56913 : rotation_unlock(lg);
3367 :
3368 56913 : return GDK_SUCCEED;
3369 : }
3370 :
3371 : static gdk_return
3372 7053 : log_tsequence_(logger *lg, int seq, lng val)
3373 : {
3374 7053 : logformat l;
3375 :
3376 7053 : if (LOG_DISABLED(lg))
3377 : return GDK_SUCCEED;
3378 2861 : l.flag = LOG_SEQ;
3379 2861 : l.id = seq;
3380 :
3381 2861 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3382 :
3383 2861 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3384 5722 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3385 5722 : log_write_format(lg, &l) != GDK_SUCCEED ||
3386 2861 : !mnstr_writeLng(lg->current->output_log, val)) {
3387 0 : TRC_CRITICAL(GDK, "write failed\n");
3388 0 : ATOMIC_DEC(&lg->current->refcount);
3389 0 : return GDK_FAIL;
3390 : }
3391 : return GDK_SUCCEED;
3392 : }
3393 :
3394 : /* a transaction in it self */
3395 : gdk_return
3396 7053 : log_tsequence(logger *lg, int seq, lng val)
3397 : {
3398 7053 : BUN p;
3399 :
3400 7053 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3401 :
3402 7053 : log_lock(lg);
3403 7053 : MT_lock_set(&lg->seqs_id->theaplock);
3404 7053 : BUN inserted = lg->seqs_id->batInserted;
3405 7053 : MT_lock_unset(&lg->seqs_id->theaplock);
3406 7053 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
3407 1680 : assert(lg->seqs_val->hseqbase == 0);
3408 1680 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
3409 0 : log_unlock(lg);
3410 0 : return GDK_FAIL;
3411 : }
3412 : } else {
3413 4989 : if (p != BUN_NONE) {
3414 4989 : oid pos = p;
3415 4989 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
3416 0 : log_unlock(lg);
3417 0 : return GDK_FAIL;
3418 : }
3419 : }
3420 10746 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
3421 5373 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
3422 0 : log_unlock(lg);
3423 0 : return GDK_FAIL;
3424 : }
3425 : }
3426 7053 : gdk_return r = log_tsequence_(lg, seq, val);
3427 7053 : log_unlock(lg);
3428 7053 : return r;
3429 : }
3430 :
3431 : static gdk_return
3432 12412 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3433 : {
3434 12412 : log_lock(lg);
3435 12412 : BAT *b = lg->catalog_bid;
3436 12412 : const log_bid *bids;
3437 :
3438 12412 : bids = (log_bid *) Tloc(b, 0);
3439 250721 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3440 238309 : log_bid bid = bids[p];
3441 238309 : BAT *lb = BBP_desc(bid);
3442 :
3443 238309 : assert(bid);
3444 238309 : if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
3445 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3446 0 : log_unlock(lg);
3447 0 : return GDK_FAIL;
3448 : }
3449 :
3450 238309 : assert(lb->batRestricted != BAT_WRITE);
3451 :
3452 238309 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3453 : }
3454 : /* bm_subcommit releases the lock */
3455 12412 : return bm_subcommit(lg, pending, updated, maxupdated);
3456 : }
3457 :
3458 : static gdk_return
3459 238525 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3460 : {
3461 238525 : log_bid bid = internal_find_bat(lg, id, tid);
3462 238525 : lng cnt = 0;
3463 238525 : lng lid = lng_nil;
3464 :
3465 238525 : assert(b->batRestricted != BAT_WRITE);
3466 238525 : assert(b->batRole == PERSISTENT);
3467 238525 : if (bid < 0)
3468 : return GDK_FAIL;
3469 238525 : if (bid) {
3470 155399 : if (bid != b->batCacheid) {
3471 155396 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3472 : return GDK_FAIL;
3473 : } else {
3474 : return GDK_SUCCEED;
3475 : }
3476 : }
3477 238522 : bid = b->batCacheid;
3478 238522 : TRC_DEBUG(WAL, "create %d\n", id);
3479 238522 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3480 477044 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3481 477044 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3482 477044 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3483 238522 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3484 0 : return GDK_FAIL;
3485 238522 : if (lg->current)
3486 238255 : lg->current->cnt++;
3487 238522 : BBPretain(bid);
3488 238522 : return GDK_SUCCEED;
3489 : }
3490 :
3491 : static gdk_return
3492 177905 : log_del_bat(logger *lg, log_bid bid)
3493 : {
3494 177905 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3495 177905 : lng lid = lg->tid;
3496 :
3497 177905 : assert(p != BUN_NONE);
3498 177905 : if (p == BUN_NONE) {
3499 : GDKerror("cannot find BAT\n");
3500 : return GDK_FAIL;
3501 : }
3502 :
3503 177905 : assert(lg->catalog_lid->hseqbase == 0);
3504 177905 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3505 : }
3506 :
3507 : /* returns -1 on failure, 0 when not found, > 0 when found */
3508 : log_bid
3509 35259 : log_find_bat(logger *lg, log_id id)
3510 : {
3511 35259 : log_lock(lg);
3512 35259 : log_bid bid = internal_find_bat(lg, id, -1);
3513 35259 : log_unlock(lg);
3514 35259 : if (!bid) {
3515 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3516 0 : return GDK_FAIL;
3517 : }
3518 : return bid;
3519 : }
3520 :
3521 :
3522 :
3523 : gdk_return
3524 62651 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
3525 : {
3526 62651 : rotation_lock(lg);
3527 62651 : if (flushnow) {
3528 5735 : if (file_id == NULL) {
3529 : /* special case: ask store_manager to rotate log file */
3530 1 : lg->file_age = 0;
3531 1 : rotation_unlock(lg);
3532 1 : return GDK_SUCCEED;
3533 : }
3534 : /* I am now the exclusive flusher */
3535 5734 : while (ATOMIC_GET(&lg->nr_flushers)) {
3536 : /* I am waiting until all existing flushers are done */
3537 0 : MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
3538 : }
3539 5734 : assert(ATOMIC_GET(&lg->nr_flushers) == 0);
3540 :
3541 5734 : if (ATOMIC_GET(&lg->current->last_ts)) {
3542 2243 : lg->id++;
3543 2243 : if (log_open_output(lg) != GDK_SUCCEED)
3544 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3545 : }
3546 5734 : do_rotate(lg);
3547 5734 : (void) do_flush_range_cleanup(lg);
3548 5734 : rotation_unlock(lg);
3549 :
3550 5734 : if (lg->saved_id + 1 < lg->id)
3551 5307 : log_flush(lg, (1ULL << 63));
3552 5734 : lg->flushnow = flushnow;
3553 : } else {
3554 56916 : if (check_rotation_conditions(lg)) {
3555 4525 : lg->id++;
3556 4525 : if (log_open_output(lg) != GDK_SUCCEED)
3557 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3558 : }
3559 56916 : do_rotate(lg);
3560 56916 : rotation_unlock(lg);
3561 : }
3562 :
3563 62650 : if (LOG_DISABLED(lg))
3564 : return GDK_SUCCEED;
3565 :
3566 56913 : ATOMIC_INC(&lg->current->refcount);
3567 56913 : *file_id = lg->current->id;
3568 56913 : logformat l;
3569 56913 : l.flag = LOG_START;
3570 56913 : l.id = ++lg->tid;
3571 :
3572 56913 : TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
3573 56913 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3574 0 : ATOMIC_DEC(&lg->current->refcount);
3575 0 : return GDK_FAIL;
3576 : }
3577 :
3578 : return GDK_SUCCEED;
3579 : }
3580 :
3581 : void
3582 119 : log_printinfo(logger *lg)
3583 : {
3584 119 : if (!rotation_trylock(lg, 1000)) {
3585 0 : printf("Logger is currently locked, so no logger information\n");
3586 0 : return;
3587 : }
3588 119 : printf("logger %s:\n", lg->fn);
3589 119 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3590 : lg->id, lg->saved_id);
3591 119 : printf("current transaction id %d, saved transaction id %d\n",
3592 : lg->tid, lg->saved_tid);
3593 119 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3594 119 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3595 119 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3596 336 : for (logged_range *p = lg->pending; p; p = p->next) {
3597 217 : char buf[32];
3598 217 : if ((lg->debug & 128 || lg->inmemory) ||
3599 217 : p->output_log == NULL ||
3600 119 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3601 98 : buf[0] = 0;
3602 315 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3603 : }
3604 119 : rotation_unlock(lg);
3605 : }
|