Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
21 : static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
22 : static gdk_return log_del_bat(logger *lg, log_bid bid);
23 : /*
24 : * The logger uses a directory to store its log files. One master log
25 : * file stores information about the version of the logger and the
26 : * type mapping it uses. This file is a simple ascii file with the
27 : * following format:
28 : * {6DIGIT-VERSION\n[id,type_name\n]*}
29 : * The transaction log files have a binary format.
30 : */
31 :
32 : #define LOG_START 0
33 : #define LOG_END 1
34 : #define LOG_UPDATE_CONST 2
35 : #define LOG_UPDATE_BULK 3
36 : #define LOG_UPDATE 4
37 : #define LOG_CREATE 5
38 : #define LOG_DESTROY 6
39 : #define LOG_SEQ 7
40 : #define LOG_CLEAR 8 /* DEPRECATED */
41 : #define LOG_BAT_GROUP 9
42 :
43 : #ifdef NATIVE_WIN32
44 : #define getfilepos _ftelli64
45 : #else
46 : #ifdef HAVE_FSEEKO
47 : #define getfilepos ftello
48 : #else
49 : #define getfilepos ftell
50 : #endif
51 : #endif
52 :
53 : #define BATSIZE 0
54 :
55 : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
57 : static const char *log_commands[] = {
58 : "LOG_START",
59 : "LOG_END",
60 : "LOG_UPDATE_CONST",
61 : "LOG_UPDATE_BULK",
62 : "LOG_UPDATE",
63 : "LOG_CREATE",
64 : "LOG_DESTROY",
65 : "LOG_SEQ",
66 : "", /* LOG_CLEAR IS DEPRECATED */
67 : "LOG_BAT_GROUP",
68 : };
69 :
70 : typedef struct logaction {
71 : int type; /* type of change */
72 : lng nr;
73 : int tt;
74 : lng id;
75 : lng offset;
76 : log_id cid; /* id of object */
77 : BAT *b; /* temporary bat with changes */
78 : BAT *uid; /* temporary bat with bun positions to update */
79 : } logaction;
80 :
81 : /* during the recover process a number of transactions could be active */
82 : typedef struct trans {
83 : int tid; /* transaction id */
84 : int sz; /* sz of the changes array */
85 : int nr; /* nr of changes */
86 :
87 : logaction *changes;
88 :
89 : struct trans *tr;
90 : } trans;
91 :
92 : typedef struct logformat_t {
93 : bte flag;
94 : int id;
95 : } logformat;
96 :
97 : typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
98 :
99 : static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
100 : static gdk_return tr_grow(trans *tr);
101 :
102 : #define log_lock(lg) MT_lock_set(&(lg)->lock)
103 : #define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 934022 : find_type(logger *lg, int tpe)
107 : {
108 934022 : assert(tpe >= 0 && tpe < MAXATOMS);
109 934022 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 548127 : find_type_nr(logger *lg, bte tpe)
114 : {
115 548127 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 548127 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
121 : static BUN
122 421384 : log_find(BAT *b, BAT *d, int val)
123 : {
124 421384 : BUN p;
125 :
126 421384 : assert(b->ttype == TYPE_int);
127 421384 : assert(d->ttype == TYPE_oid);
128 421384 : BATiter bi = bat_iterator(b);
129 421384 : if (BAThash(b) == GDK_SUCCEED) {
130 421384 : MT_rwlock_rdlock(&b->thashlock);
131 699763 : HASHloop_int(bi, b->thash, p, &val) {
132 184534 : oid pos = p;
133 184534 : if (BUNfnd(d, &pos) == BUN_NONE) {
134 184534 : MT_rwlock_rdunlock(&b->thashlock);
135 184534 : bat_iterator_end(&bi);
136 184534 : return p;
137 : }
138 : }
139 236850 : MT_rwlock_rdunlock(&b->thashlock);
140 : } else { /* unlikely: BAThash failed */
141 0 : int *t = (int *) bi.base;
142 :
143 0 : for (p = 0; p < bi.count; p++) {
144 0 : if (t[p] == val) {
145 0 : oid pos = p;
146 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
147 0 : bat_iterator_end(&bi);
148 0 : return p;
149 : }
150 : }
151 : }
152 : }
153 236850 : bat_iterator_end(&bi);
154 236850 : return BUN_NONE;
155 : }
156 :
157 : static log_bid
158 649065 : internal_find_bat(logger *lg, log_id id, int tid)
159 : {
160 649065 : BUN p;
161 :
162 649065 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
163 649065 : BATiter cni = bat_iterator(lg->catalog_id);
164 649065 : MT_rwlock_rdlock(&cni.b->thashlock);
165 649065 : if (tid < 0) {
166 395942 : HASHloop_int(cni, cni.b->thash, p, &id) {
167 205205 : oid pos = p;
168 205205 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
169 205205 : MT_rwlock_rdunlock(&cni.b->thashlock);
170 205205 : bat_iterator_end(&cni);
171 205205 : return *(log_bid *) Tloc(lg->catalog_bid, p);
172 : }
173 : }
174 : } else {
175 363163 : BUN cp = BUN_NONE;
176 1682918 : HASHloop_int(cni, cni.b->thash, p, &id) {
177 1279073 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
178 1279073 : if (lid != lng_nil && lid <= tid) {
179 : break;
180 : }
181 : cp = p;
182 : }
183 363163 : if (cp != BUN_NONE) {
184 362918 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 362918 : bat_iterator_end(&cni);
186 362918 : return *(log_bid *) Tloc(lg->catalog_bid, cp);
187 : }
188 : }
189 80942 : MT_rwlock_rdunlock(&cni.b->thashlock);
190 80942 : bat_iterator_end(&cni);
191 80942 : return 0; /* not found */
192 : }
193 : return -1; /* error creating hash */
194 : }
195 :
196 : static inline void
197 14166 : logbat_destroy(BAT *b)
198 : {
199 17676 : BBPreclaim(b);
200 3427 : }
201 :
202 : static BAT *
203 15102 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 15102 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 15102 : if (nb) {
208 15102 : BBP_pid(nb->batCacheid) = 0;
209 15102 : if (role == PERSISTENT) {
210 9488 : BATmode(nb, false);
211 9488 : nb = BATsetaccess(nb, BAT_READ);
212 : }
213 : } else {
214 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
215 : }
216 15102 : return nb;
217 : }
218 :
219 : static bool
220 760599 : log_read_format(logger *lg, logformat *data)
221 : {
222 760599 : assert(!lg->inmemory);
223 760599 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
224 748225 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
225 : return true;
226 : /* could only read part, so complain */
227 0 : TRC_CRITICAL(GDK, "read failed\n");
228 : }
229 : return false;
230 : }
231 :
232 : static gdk_return
233 755007 : log_write_format(logger *lg, logformat *data)
234 : {
235 755007 : assert(data->id || data->flag);
236 755007 : assert(!lg->inmemory);
237 755007 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
238 1510014 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
239 1510014 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
240 755007 : mnstr_writeInt(lg->current->output_log, data->id))
241 : return GDK_SUCCEED;
242 0 : TRC_CRITICAL(GDK, "write failed\n");
243 0 : return GDK_FAIL;
244 : }
245 :
246 : static log_return
247 2760 : log_read_seq(logger *lg, logformat *l)
248 : {
249 2760 : int seq = l->id;
250 2760 : lng val;
251 2760 : BUN p;
252 :
253 2760 : assert(!lg->inmemory);
254 2760 : if (mnstr_readLng(lg->input_log, &val) != 1) {
255 0 : TRC_CRITICAL(GDK, "read failed\n");
256 0 : return LOG_EOF;
257 : }
258 2760 : if (lg->flushing)
259 : return LOG_OK;
260 :
261 60 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
262 59 : p >= lg->seqs_id->batInserted) {
263 39 : assert(lg->seqs_val->hseqbase == 0);
264 39 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
265 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
266 0 : return LOG_ERR;
267 : }
268 : } else {
269 20 : if (p != BUN_NONE) {
270 20 : oid pos = p;
271 20 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
272 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
273 0 : return LOG_ERR;
274 : }
275 : }
276 42 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
277 21 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
278 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
279 0 : return LOG_ERR;
280 : }
281 : }
282 : return LOG_OK;
283 : }
284 :
285 : #if 0
286 : static gdk_return
287 : log_write_id(logger *lg, int id)
288 : {
289 : assert(!lg->inmemory);
290 : assert(id >= 0);
291 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
292 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
293 : mnstr_writeInt(lg->current->output_log, id))
294 : return GDK_SUCCEED;
295 : TRC_CRITICAL(GDK, "write failed\n");
296 : return GDK_FAIL;
297 : }
298 :
299 : static log_return
300 : log_read_id(logger *lg, log_id *id)
301 : {
302 : assert(!lg->inmemory);
303 : if (mnstr_readInt(lg->input_log, id) != 1) {
304 : TRC_CRITICAL(GDK, "read failed\n");
305 : return LOG_EOF;
306 : }
307 : return LOG_OK;
308 : }
309 : #endif
310 :
311 : static log_return
312 19165 : string_reader(logger *lg, BAT *b, lng nr)
313 : {
314 19165 : size_t sz = 0;
315 19165 : lng SZ = 0;
316 19165 : log_return res = LOG_OK;
317 :
318 38445 : while (nr && res == LOG_OK) {
319 19280 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
320 0 : TRC_CRITICAL(GDK, "read failed\n");
321 0 : return LOG_EOF;
322 : }
323 19280 : sz = (size_t) SZ;
324 19280 : char *buf = lg->rbuf;
325 19280 : if (lg->rbufsize < sz) {
326 2 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
327 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
328 0 : return LOG_ERR;
329 : }
330 2 : lg->rbuf = buf;
331 2 : lg->rbufsize = sz;
332 : }
333 :
334 19280 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
335 0 : TRC_CRITICAL(GDK, "read failed\n");
336 0 : return LOG_EOF;
337 : }
338 : /* handle strings */
339 : char *t = buf;
340 : /* chunked */
341 : #define CHUNK_SIZE 1024
342 : char *strings[CHUNK_SIZE];
343 : int cur = 0;
344 :
345 73053 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
346 53773 : strings[cur++] = t;
347 53773 : if (cur == CHUNK_SIZE &&
348 4 : b &&
349 4 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
350 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
351 0 : res = LOG_ERR;
352 : }
353 53773 : if (cur == CHUNK_SIZE)
354 14 : cur = 0;
355 : /* find next */
356 5557801 : while (*t)
357 5504028 : t++;
358 53773 : t++;
359 : }
360 19280 : if (cur &&
361 294 : b &&
362 294 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
363 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
364 0 : res = LOG_ERR;
365 : }
366 : }
367 : return res;
368 : }
369 :
370 :
371 : struct offset {
372 : lng os; /* offset within source BAT in logfile */
373 : lng nr; /* number of values to be copied */
374 : lng od; /* offset within destination BAT in database */
375 : };
376 :
377 : static log_return
378 391129 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
379 : {
380 391129 : log_return res = LOG_OK;
381 391129 : lng nr, pnr;
382 391129 : bte type_id = -1;
383 391129 : int tpe;
384 :
385 391129 : assert(!lg->inmemory);
386 391129 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
387 :
388 782258 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
389 391129 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
390 0 : TRC_CRITICAL(GDK, "read failed\n");
391 0 : return LOG_EOF;
392 : }
393 :
394 391129 : pnr = nr;
395 391129 : tpe = find_type_nr(lg, type_id);
396 391129 : if (tpe >= 0) {
397 391129 : BAT *uid = NULL;
398 391129 : BAT *r = NULL;
399 391129 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
400 391129 : lng offset;
401 :
402 391129 : assert(nr <= (lng) BUN_MAX);
403 391129 : if (!lg->flushing && l->flag == LOG_UPDATE) {
404 0 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
405 0 : if (uid == NULL) {
406 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
407 28259 : return LOG_ERR;
408 : }
409 : }
410 :
411 391129 : if (l->flag == LOG_UPDATE_CONST) {
412 122922 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
413 0 : TRC_CRITICAL(GDK, "read failed\n");
414 0 : return LOG_EOF;
415 : }
416 122922 : if (cands) {
417 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
418 :
419 28259 : if (BATcount(*cands) == 0 || lg->flushing) {
420 : /* when flushing, we only need the offset and count of the last segment of inserts. */
421 28259 : assert((*cands)->ttype == TYPE_void);
422 28259 : BATtseqbase(*cands, (oid) offset);
423 28259 : BATsetcount(*cands, (BUN) nr);
424 0 : } else if (!lg->flushing) {
425 0 : assert(BATcount(*cands) > 0);
426 0 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
427 0 : BAT *newcands = NULL;
428 0 : if (!dense) {
429 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
430 0 : res = LOG_ERR;
431 0 : } else if ((*cands)->ttype == TYPE_void) {
432 0 : if ((newcands = BATmergecand(*cands, dense))) {
433 0 : BBPreclaim(*cands);
434 0 : *cands = newcands;
435 : } else {
436 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
437 0 : res = LOG_ERR;
438 : }
439 : } else {
440 0 : assert((*cands)->ttype == TYPE_oid);
441 0 : assert(BATcount(*cands) > 0);
442 0 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
443 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
444 0 : res = LOG_ERR;
445 : }
446 : }
447 0 : BBPreclaim(dense);
448 : }
449 :
450 : /* We have to read the value to update the read cursor */
451 28259 : size_t tlen = lg->rbufsize;
452 28259 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
453 28259 : if (t == NULL) {
454 0 : TRC_CRITICAL(GDK, "read failed\n");
455 0 : res = LOG_EOF;
456 : }
457 28259 : return res;
458 : }
459 : }
460 :
461 362870 : if (!lg->flushing) {
462 1258 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
463 1258 : if (r == NULL) {
464 0 : if (uid)
465 0 : BBPreclaim(uid);
466 0 : return LOG_ERR;
467 : }
468 : }
469 :
470 362870 : if (l->flag == LOG_UPDATE_CONST) {
471 94663 : size_t tlen = lg->rbufsize;
472 94663 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
473 94663 : if (t == NULL) {
474 0 : TRC_CRITICAL(GDK, "read failed\n");
475 0 : res = LOG_EOF;
476 : } else {
477 94663 : lg->rbuf = t;
478 94663 : lg->rbufsize = tlen;
479 94663 : if (r) {
480 8130 : for (BUN p = 0; p < (BUN) nr; p++) {
481 7886 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
482 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
483 0 : res = LOG_ERR;
484 : }
485 : }
486 : }
487 : }
488 268207 : } else if (l->flag == LOG_UPDATE_BULK) {
489 268189 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
490 0 : if (r)
491 0 : BBPreclaim(r);
492 0 : TRC_CRITICAL(GDK, "read failed\n");
493 0 : return LOG_EOF;
494 : }
495 268189 : if (tpe == TYPE_msk) {
496 0 : if (r) {
497 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
498 0 : BATsetcount(r, (BUN) nr);
499 : else {
500 0 : TRC_CRITICAL(GDK, "read failed\n");
501 0 : res = LOG_EOF;
502 : }
503 : } else {
504 0 : size_t tlen = lg->rbufsize / sizeof(int);
505 0 : size_t cnt = 0, snr = (size_t) nr;
506 0 : snr = (snr + 31) / 32;
507 0 : assert(tlen);
508 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
509 0 : cnt = snr > tlen ? tlen : snr;
510 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
511 0 : TRC_CRITICAL(GDK, "read failed\n");
512 0 : res = LOG_EOF;
513 : }
514 : }
515 : }
516 : } else {
517 268189 : if (!ATOMvarsized(tpe)) {
518 248448 : size_t cnt = 0, snr = (size_t) nr;
519 248448 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
520 248448 : assert(tlen);
521 : /* read in chunks of max
522 : * BUFSIZE/width rows */
523 509564 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
524 261116 : cnt = snr > tlen ? tlen : snr;
525 261116 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
526 :
527 261116 : if (t == NULL) {
528 : res = LOG_EOF;
529 : break;
530 : }
531 261116 : assert(t == lg->rbuf);
532 261116 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
533 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
534 0 : res = LOG_ERR;
535 : }
536 : }
537 19741 : } else if (tpe == TYPE_str) {
538 : /* efficient string */
539 19159 : res = string_reader(lg, r, nr);
540 : } else {
541 1206 : for (; res == LOG_OK && nr > 0; nr--) {
542 624 : size_t tlen = lg->rbufsize;
543 624 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
544 :
545 624 : if (t == NULL) {
546 : /* see if failure was due to
547 : * malloc or something less
548 : * serious (in the current
549 : * context) */
550 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
551 : res = LOG_EOF;
552 : else
553 0 : res = LOG_ERR;
554 0 : TRC_CRITICAL(GDK, "read failed\n");
555 : } else {
556 624 : lg->rbuf = t;
557 624 : lg->rbufsize = tlen;
558 624 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
559 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
560 0 : res = LOG_ERR;
561 : }
562 : }
563 : }
564 : }
565 : }
566 : } else {
567 18 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
568 18 : void *hv = ATOMnil(TYPE_oid);
569 18 : offset = 0;
570 :
571 18 : if (hv == NULL) {
572 0 : TRC_CRITICAL(GDK, "read failed\n");
573 0 : res = LOG_EOF;
574 : }
575 1071 : for (; res == LOG_OK && nr > 0; nr--) {
576 1053 : size_t hlen = sizeof(oid);
577 1053 : void *h = rh(hv, &hlen, lg->input_log, 1);
578 1053 : assert(hlen == sizeof(oid));
579 1053 : assert(h == hv);
580 1053 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
581 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
582 0 : res = LOG_ERR;
583 : }
584 : }
585 18 : nr = pnr;
586 18 : if (tpe == TYPE_msk) {
587 0 : if (r) {
588 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
589 0 : BATsetcount(r, (BUN) nr);
590 : else {
591 0 : TRC_CRITICAL(GDK, "read failed\n");
592 0 : res = LOG_EOF;
593 : }
594 : } else {
595 0 : for (lng i = 0; i < nr; i += 32) {
596 0 : int v;
597 0 : switch (mnstr_readInt(lg->input_log, &v)) {
598 0 : case 1:
599 0 : continue;
600 : case 0:
601 : res = LOG_EOF;
602 : break;
603 : default:
604 : res = LOG_ERR;
605 : break;
606 : }
607 0 : TRC_CRITICAL(GDK, "read failed\n");
608 0 : break;
609 : }
610 : }
611 18 : } else if (tpe == TYPE_str) {
612 : /* efficient string */
613 6 : res = string_reader(lg, r, nr);
614 : } else {
615 58 : for (; res == LOG_OK && nr > 0; nr--) {
616 46 : size_t tlen = lg->rbufsize;
617 46 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
618 :
619 46 : if (t == NULL) {
620 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
621 : res = LOG_EOF;
622 : else
623 0 : res = LOG_ERR;
624 0 : TRC_CRITICAL(GDK, "read failed\n");
625 : } else {
626 46 : lg->rbuf = t;
627 46 : lg->rbufsize = tlen;
628 46 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
629 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
630 0 : res = LOG_ERR;
631 : }
632 : }
633 : }
634 : }
635 18 : GDKfree(hv);
636 : }
637 :
638 362870 : if (res == LOG_OK) {
639 362870 : if (tr_grow(tr) == GDK_SUCCEED) {
640 362870 : tr->changes[tr->nr].type = l->flag;
641 362870 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
642 139236 : assert(cands); /* bat r is part of a group of bats logged together. */
643 139236 : struct canditer ci;
644 139236 : canditer_init(&ci, NULL, *cands);
645 139236 : const oid first = canditer_peek(&ci);
646 139236 : const oid last = canditer_last(&ci);
647 139236 : offset = (lng) first;
648 139236 : pnr = (lng) (last - first) + 1;
649 139236 : if (!lg->flushing) {
650 911 : assert(uid == NULL);
651 911 : uid = *cands;
652 911 : BBPfix((*cands)->batCacheid);
653 911 : tr->changes[tr->nr].type = LOG_UPDATE;
654 : }
655 : }
656 362870 : if (l->flag == LOG_UPDATE_CONST) {
657 94663 : assert(!cands); /* TODO: This might change in the future. */
658 94663 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
659 : }
660 362870 : tr->changes[tr->nr].nr = pnr;
661 362870 : tr->changes[tr->nr].tt = tpe;
662 362870 : tr->changes[tr->nr].cid = id;
663 362870 : tr->changes[tr->nr].offset = offset;
664 362870 : tr->changes[tr->nr].b = r;
665 362870 : tr->changes[tr->nr].uid = uid;
666 362870 : tr->nr++;
667 : } else {
668 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
669 0 : res = LOG_ERR;
670 : }
671 : }
672 362870 : if (res != LOG_OK) {
673 0 : if (r)
674 0 : BBPreclaim(r);
675 0 : if (cands && uid)
676 0 : BBPunfix((*cands)->batCacheid);
677 0 : else if (uid)
678 0 : BBPreclaim(uid);
679 : }
680 : } else {
681 : /* bat missing ERROR or ignore ? currently error. */
682 0 : TRC_CRITICAL(GDK, "unknown type\n");
683 0 : res = LOG_ERR;
684 : }
685 : return res;
686 : }
687 :
688 :
689 : static gdk_return
690 579270 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
691 : {
692 579270 : BATiter cni = bat_iterator_nolock(lg->catalog_id);
693 :
694 579270 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
695 579270 : MT_rwlock_rdlock(&cni.b->thashlock);
696 579270 : BUN p, cp = BUN_NONE;
697 :
698 2287293 : HASHloop_int(cni, cni.b->thash, p, &id) {
699 1497510 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
700 :
701 1497510 : if (lid != lng_nil && lid <= tid)
702 : break;
703 : cp = p;
704 : }
705 579270 : if (cp != BUN_NONE) {
706 579070 : lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
707 579070 : assert(lg->catalog_cnt->hseqbase == 0);
708 579070 : if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
709 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
710 0 : return GDK_FAIL;
711 : }
712 : }
713 579270 : MT_rwlock_rdunlock(&cni.b->thashlock);
714 579270 : return GDK_SUCCEED;
715 : }
716 : return GDK_FAIL;
717 : }
718 :
719 : static gdk_return
720 362870 : la_bat_updates(logger *lg, logaction *la, int tid)
721 : {
722 362870 : log_bid bid = internal_find_bat(lg, la->cid, tid);
723 362870 : BAT *b = NULL;
724 :
725 362870 : if (bid < 0)
726 : return GDK_FAIL;
727 362870 : if (!bid) {
728 : /* object already gone, nothing needed */
729 : return GDK_SUCCEED;
730 : }
731 :
732 362870 : if (!lg->flushing) {
733 1258 : b = BATdescriptor(bid);
734 1258 : if (b == NULL)
735 : return GDK_FAIL;
736 : }
737 362870 : BUN cnt = 0;
738 362870 : if (la->type == LOG_UPDATE_BULK) {
739 361941 : if (!lg->flushing) {
740 347 : cnt = BATcount(b);
741 347 : int is_msk = (b->ttype == TYPE_msk);
742 : /* handle offset 0 ie clear */
743 347 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
744 : BATclear(b, true);
745 : /* handle offset */
746 347 : if (cnt <= (BUN) la->offset) {
747 222 : msk t = 1;
748 222 : if (cnt < (BUN) la->offset) { /* insert nils */
749 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
750 0 : lng i, d = la->offset - BATcount(b);
751 0 : for (i = 0; i < d; i++) {
752 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
753 0 : logbat_destroy(b);
754 0 : return GDK_FAIL;
755 : }
756 : }
757 : }
758 222 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
759 0 : logbat_destroy(b);
760 0 : return GDK_FAIL;
761 : }
762 : } else {
763 125 : BATiter vi = bat_iterator(la->b);
764 125 : BUN p, q;
765 :
766 2167 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
767 2042 : const void *t = BUNtail(vi, p);
768 :
769 2042 : if (q < cnt) {
770 2041 : if (b->tnosorted == q)
771 7 : b->tnosorted = 0;
772 2041 : if (b->tnorevsorted == q)
773 7 : b->tnorevsorted = 0;
774 2041 : if (b->tnokey[0] == q ||
775 2034 : b->tnokey[1] == q) {
776 7 : b->tnokey[0] = 0;
777 7 : b->tnokey[1] = 0;
778 : }
779 2041 : b->tkey = false;
780 2041 : b->tsorted = false;
781 2041 : b->tkey = false;
782 2041 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
783 0 : logbat_destroy(b);
784 0 : bat_iterator_end(&vi);
785 0 : return GDK_FAIL;
786 : }
787 : } else {
788 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
789 0 : logbat_destroy(b);
790 0 : bat_iterator_end(&vi);
791 0 : return GDK_FAIL;
792 : }
793 : }
794 : }
795 125 : bat_iterator_end(&vi);
796 : }
797 : }
798 929 : } else if (la->type == LOG_UPDATE) {
799 929 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
800 : return GDK_FAIL;
801 : }
802 : }
803 362870 : cnt = (BUN) (la->offset + la->nr);
804 362870 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
805 0 : if (b)
806 0 : logbat_destroy(b);
807 0 : return GDK_FAIL;
808 : }
809 362870 : if (b)
810 1258 : logbat_destroy(b);
811 : return GDK_SUCCEED;
812 : }
813 :
814 : static log_return
815 9320 : log_read_destroy(logger *lg, trans *tr, log_id id)
816 : {
817 9320 : (void) lg;
818 9320 : assert(!lg->inmemory);
819 9320 : if (tr_grow(tr) == GDK_SUCCEED) {
820 9320 : tr->changes[tr->nr].type = LOG_DESTROY;
821 9320 : tr->changes[tr->nr].cid = id;
822 9320 : tr->nr++;
823 9320 : return LOG_OK;
824 : }
825 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
826 0 : return LOG_ERR;
827 : }
828 :
829 : static gdk_return
830 40 : la_bat_destroy(logger *lg, logaction *la, int tid)
831 : {
832 40 : log_bid bid = internal_find_bat(lg, la->cid, tid);
833 :
834 40 : if (bid < 0)
835 : return GDK_FAIL;
836 40 : if (!bid) {
837 : #ifndef NDEBUG
838 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
839 : #endif
840 0 : return GDK_SUCCEED;
841 : }
842 40 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
843 : return GDK_FAIL;
844 : return GDK_SUCCEED;
845 : }
846 :
847 : static log_return
848 156998 : log_read_create(logger *lg, trans *tr, log_id id)
849 : {
850 156998 : bte tt;
851 156998 : int tpe;
852 :
853 156998 : assert(!lg->inmemory);
854 156998 : TRC_DEBUG(WAL, "create %d", id);
855 :
856 156998 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
857 0 : TRC_CRITICAL(GDK, "read failed\n");
858 0 : return LOG_EOF;
859 : }
860 :
861 156998 : tpe = find_type_nr(lg, tt);
862 : /* read create */
863 156998 : if (tr_grow(tr) == GDK_SUCCEED) {
864 156998 : tr->changes[tr->nr].type = LOG_CREATE;
865 156998 : tr->changes[tr->nr].tt = tpe;
866 156998 : tr->changes[tr->nr].cid = id;
867 156998 : tr->nr++;
868 156998 : return LOG_OK;
869 : }
870 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
871 0 : return LOG_ERR;
872 : }
873 :
874 : static gdk_return
875 253 : la_bat_create(logger *lg, logaction *la, int tid)
876 : {
877 253 : BAT *b;
878 :
879 : /* formerly head column type, should be void */
880 253 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
881 : return GDK_FAIL;
882 :
883 253 : if (la->tt < 0)
884 0 : BATtseqbase(b, 0);
885 :
886 506 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
887 253 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
888 0 : logbat_destroy(b);
889 0 : return GDK_FAIL;
890 : }
891 253 : logbat_destroy(b);
892 253 : return GDK_SUCCEED;
893 : }
894 :
895 : static gdk_return
896 233 : log_write_new_types(logger *lg, FILE *fp)
897 : {
898 233 : bte id = 0;
899 :
900 : /* write types and insert into bats */
901 233 : memset(lg->type_id, -1, sizeof(lg->type_id));
902 233 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
903 : /* first the fixed sized types */
904 7220 : for (int i = 0; i < GDKatomcnt; i++) {
905 6987 : if (ATOMvarsized(i))
906 1862 : continue;
907 5125 : lg->type_id[i] = id;
908 5125 : lg->type_nr[id] = i;
909 5125 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
910 : return GDK_FAIL;
911 5125 : id++;
912 : }
913 : /* second the var sized types */
914 : id = -127; /* start after nil */
915 7220 : for (int i = 0; i < GDKatomcnt; i++) {
916 6987 : if (!ATOMvarsized(i))
917 5125 : continue;
918 1862 : lg->type_id[i] = id;
919 1862 : lg->type_nr[256 + id] = i;
920 1862 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
921 : return GDK_FAIL;
922 1862 : id++;
923 : }
924 : return GDK_SUCCEED;
925 : }
926 :
927 : #define TR_SIZE 1024
928 :
929 : static trans *
930 56502 : tr_create(trans *tr, int tid)
931 : {
932 56502 : trans *ntr = GDKmalloc(sizeof(trans));
933 :
934 56502 : if (ntr == NULL)
935 : return NULL;
936 56502 : ntr->tid = tid;
937 56502 : ntr->sz = TR_SIZE;
938 56502 : ntr->nr = 0;
939 56502 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
940 56502 : if (ntr->changes == NULL) {
941 0 : GDKfree(ntr);
942 0 : return NULL;
943 : }
944 56502 : ntr->tr = tr;
945 56502 : return ntr;
946 : }
947 :
948 : static gdk_return
949 529188 : la_apply(logger *lg, logaction *c, int tid)
950 : {
951 529188 : gdk_return ret = GDK_SUCCEED;
952 :
953 529188 : switch (c->type) {
954 362870 : case LOG_UPDATE_BULK:
955 : case LOG_UPDATE:
956 362870 : ret = la_bat_updates(lg, c, tid);
957 362870 : break;
958 156998 : case LOG_CREATE:
959 156998 : if (!lg->flushing)
960 253 : ret = la_bat_create(lg, c, tid);
961 : break;
962 9320 : case LOG_DESTROY:
963 9320 : if (!lg->flushing)
964 40 : ret = la_bat_destroy(lg, c, tid);
965 : break;
966 : default:
967 0 : MT_UNREACHABLE();
968 : }
969 529188 : return ret;
970 : }
971 :
972 : static void
973 529188 : la_destroy(logaction *c)
974 : {
975 529188 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
976 1258 : logbat_destroy(c->b);
977 529188 : if (c->type == LOG_UPDATE && c->uid)
978 911 : logbat_destroy(c->uid);
979 529188 : }
980 :
981 : static gdk_return
982 529188 : tr_grow(trans *tr)
983 : {
984 529188 : if (tr->nr == tr->sz) {
985 8 : logaction *changes;
986 8 : tr->sz <<= 1;
987 8 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
988 8 : if (changes == NULL)
989 : return GDK_FAIL;
990 8 : tr->changes = changes;
991 : }
992 : /* cleanup the next */
993 529188 : tr->changes[tr->nr].b = NULL;
994 529188 : return GDK_SUCCEED;
995 : }
996 :
997 : static trans *
998 56502 : tr_destroy(trans *tr)
999 : {
1000 56502 : trans *r = tr->tr;
1001 :
1002 56502 : GDKfree(tr->changes);
1003 56502 : GDKfree(tr);
1004 56502 : return r;
1005 : }
1006 :
1007 : static trans *
1008 0 : tr_abort_(logger *lg, trans *tr, int s)
1009 : {
1010 0 : int i;
1011 :
1012 0 : (void) lg;
1013 :
1014 0 : TRC_DEBUG(WAL, "abort");
1015 :
1016 0 : for (i = s; i < tr->nr; i++)
1017 0 : la_destroy(&tr->changes[i]);
1018 0 : return tr_destroy(tr);
1019 : }
1020 :
1021 : static trans *
1022 0 : tr_abort(logger *lg, trans *tr)
1023 : {
1024 0 : return tr_abort_(lg, tr, 0);
1025 : }
1026 :
1027 : static trans *
1028 56502 : tr_commit(logger *lg, trans *tr)
1029 : {
1030 56502 : int i;
1031 :
1032 56502 : TRC_DEBUG(WAL, "commit");
1033 :
1034 585690 : for (i = 0; i < tr->nr; i++) {
1035 529188 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1036 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1037 0 : do {
1038 0 : tr = tr_abort_(lg, tr, i);
1039 0 : } while (tr != NULL);
1040 : return (trans *) -1;
1041 : }
1042 529188 : la_destroy(&tr->changes[i]);
1043 : }
1044 56502 : lg->saved_tid = tr->tid;
1045 56502 : return tr_destroy(tr);
1046 : }
1047 :
1048 : static gdk_return
1049 105 : log_read_types_file(logger *lg, FILE *fp, int version)
1050 : {
1051 105 : int id = 0;
1052 105 : char atom_name[IDLENGTH];
1053 105 : bool seen_geom = false;
1054 :
1055 : /* scanf should use IDLENGTH somehow */
1056 3221 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1057 3116 : if (version < 52303 && strcmp(atom_name, "BAT") == 0)
1058 8 : continue;
1059 3108 : int i = ATOMindex(atom_name);
1060 :
1061 3108 : if (id < -127 || id > 127 || i < 0) {
1062 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1063 0 : return GDK_FAIL;
1064 : }
1065 3108 : seen_geom |= strcmp(atom_name, "mbr") == 0 || strcmp(atom_name, "wkb") == 0;
1066 3108 : lg->type_id[i] = (int8_t) id;
1067 3108 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1068 : }
1069 : #ifdef HAVE_GEOM
1070 105 : if (!seen_geom && ATOMindex("mbr") > 0) {
1071 0 : GDKerror("incompatible database: server supports GEOM, but database does not\n");
1072 0 : return GDK_FAIL;
1073 : }
1074 : #endif
1075 : (void) seen_geom;
1076 : return GDK_SUCCEED;
1077 : }
1078 :
1079 :
1080 : gdk_return
1081 233 : log_create_types_file(logger *lg, const char *filename)
1082 : {
1083 233 : FILE *fp;
1084 :
1085 233 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1086 0 : GDKerror("cannot create log file %s\n", filename);
1087 0 : return GDK_FAIL;
1088 : }
1089 233 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1090 0 : fclose(fp);
1091 0 : GDKerror("writing log file %s failed", filename);
1092 0 : if (MT_remove(filename) < 0)
1093 0 : GDKsyserror("remove %s failed\n", filename);
1094 0 : return GDK_FAIL;
1095 : }
1096 :
1097 233 : if (log_write_new_types(lg, fp) != GDK_SUCCEED) {
1098 0 : fclose(fp);
1099 0 : GDKerror("writing log file %s failed", filename);
1100 0 : if (MT_remove(filename) < 0)
1101 0 : GDKsyserror("remove %s failed\n", filename);
1102 0 : return GDK_FAIL;
1103 : }
1104 233 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1105 : #if defined(_MSC_VER)
1106 : && _commit(_fileno(fp)) < 0
1107 : #elif defined(HAVE_FDATASYNC)
1108 0 : && fdatasync(fileno(fp)) < 0
1109 : #elif defined(HAVE_FSYNC)
1110 : && fsync(fileno(fp)) < 0
1111 : #endif
1112 : )) {
1113 0 : GDKsyserror("flushing log file %s failed", filename);
1114 0 : fclose(fp);
1115 0 : if (MT_remove(filename) < 0)
1116 0 : GDKsyserror("remove %s failed\n", filename);
1117 0 : return GDK_FAIL;
1118 : }
1119 233 : if (fclose(fp) < 0) {
1120 0 : GDKsyserror("closing log file %s failed", filename);
1121 0 : if (MT_remove(filename) < 0)
1122 0 : GDKsyserror("remove %s failed\n", filename);
1123 0 : return GDK_FAIL;
1124 : }
1125 : return GDK_SUCCEED;
1126 : }
1127 :
1128 : #define rotation_lock(lg) MT_lock_set(&(lg)->rotation_lock)
1129 : #define rotation_unlock(lg) MT_lock_unset(&(lg)->rotation_lock)
1130 : #define rotation_trylock(lg, ms) MT_lock_trytime(&(lg)->rotation_lock, ms)
1131 :
1132 : static gdk_return
1133 12621 : log_open_output(logger *lg)
1134 : {
1135 12621 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1136 :
1137 12621 : if (!new_range) {
1138 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1139 0 : return GDK_FAIL;
1140 : }
1141 25241 : if (!LOG_DISABLED(lg)) {
1142 12620 : char id[32];
1143 12620 : char *filename;
1144 :
1145 12620 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1146 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1147 0 : GDKfree(new_range);
1148 0 : return GDK_FAIL;
1149 : }
1150 12620 : if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
1151 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1152 0 : GDKfree(new_range);
1153 0 : return GDK_FAIL;
1154 : }
1155 :
1156 12620 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1157 12620 : new_range->output_log = open_wstream(filename);
1158 12620 : if (new_range->output_log) {
1159 12620 : short byteorder = 1234;
1160 12620 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1161 : }
1162 :
1163 12620 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1164 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1165 0 : close_stream(new_range->output_log);
1166 0 : GDKfree(new_range);
1167 0 : GDKfree(filename);
1168 0 : return GDK_FAIL;
1169 : }
1170 12620 : GDKfree(filename);
1171 : } else {
1172 1 : new_range->output_log = NULL;
1173 : }
1174 12621 : ATOMIC_INIT(&new_range->refcount, 1);
1175 12621 : ATOMIC_INIT(&new_range->last_ts, 0);
1176 12621 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1177 12621 : ATOMIC_INIT(&new_range->drops, 0);
1178 12621 : new_range->id = lg->id;
1179 12621 : new_range->next = NULL;
1180 12621 : logged_range *current = lg->current;
1181 12621 : assert(current && current->next == NULL);
1182 12621 : new_range->cnt = current->cnt;
1183 12621 : current->next = new_range;
1184 12621 : lg->file_age = GDKusec();
1185 12621 : return GDK_SUCCEED;
1186 : }
1187 :
1188 : static inline void
1189 12814 : log_close_input(logger *lg)
1190 : {
1191 12814 : if (!lg->inmemory && lg->input_log) {
1192 12379 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1193 12379 : close_stream(lg->input_log);
1194 : }
1195 12814 : lg->input_log = NULL;
1196 12814 : }
1197 :
1198 : static inline void
1199 330 : log_close_output(logger *lg)
1200 : {
1201 330 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1202 329 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1203 329 : close_stream(lg->current->output_log);
1204 : }
1205 330 : lg->current->output_log = NULL;
1206 330 : }
1207 :
1208 : static gdk_return
1209 12484 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1210 : {
1211 12484 : TRC_INFO(WAL, "opening input log %s", filename);
1212 12484 : lg->input_log = open_rstream(filename);
1213 :
1214 : /* if the file doesn't exist, there is nothing to be read back */
1215 12484 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1216 105 : log_close_input(lg);
1217 105 : *filemissing = true;
1218 105 : return GDK_SUCCEED;
1219 : }
1220 12379 : short byteorder;
1221 12379 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1222 0 : case -1:
1223 0 : log_close_input(lg);
1224 0 : TRC_CRITICAL(GDK, "read failed\n");
1225 0 : return GDK_FAIL;
1226 5 : case 0:
1227 : /* empty file is ok */
1228 5 : log_close_input(lg);
1229 5 : return GDK_SUCCEED;
1230 12374 : case 1:
1231 : /* if not empty, must start with correct byte order mark */
1232 12374 : if (byteorder != 1234) {
1233 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1234 0 : log_close_input(lg);
1235 0 : return GDK_FAIL;
1236 : }
1237 : break;
1238 : }
1239 : return GDK_SUCCEED;
1240 : }
1241 :
1242 : static log_return
1243 12374 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
1244 : {
1245 12374 : logformat l;
1246 12374 : trans *tr = NULL;
1247 12374 : log_return err = LOG_OK;
1248 12374 : bool ok = true;
1249 12374 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1250 :
1251 12374 : (void) maxupdated; /* only used inside assert() */
1252 :
1253 12374 : if (!lg->flushing)
1254 167 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1255 :
1256 12374 : BAT *cands = NULL; /* used in case of LOG_BAT_GROUP */
1257 :
1258 760599 : while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
1259 748225 : if (l.flag == 0 && l.id == 0) {
1260 12374 : err = LOG_EOF;
1261 : break;
1262 : }
1263 :
1264 748225 : TRC_DEBUG_IF(WAL) {
1265 0 : if (l.flag > 0 && l.flag != LOG_CLEAR &&
1266 : l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
1267 0 : TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
1268 : else
1269 0 : TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
1270 : }
1271 748225 : switch (l.flag) {
1272 557447 : case LOG_UPDATE_CONST:
1273 : case LOG_UPDATE_BULK:
1274 : case LOG_UPDATE:
1275 : case LOG_CREATE:
1276 : case LOG_DESTROY:
1277 557447 : if (tr != NULL && updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
1278 555739 : BATiter cni = bat_iterator(lg->catalog_id);
1279 555739 : BUN p;
1280 555739 : BUN posnew = BUN_NONE;
1281 555739 : BUN posold = BUN_NONE;
1282 555739 : MT_rwlock_rdlock(&cni.b->thashlock);
1283 4798006 : HASHloop_int(cni, cni.b->thash, p, &l.id) {
1284 3873181 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
1285 3873181 : if (lid == lng_nil || lid > tr->tid)
1286 : posnew = p;
1287 1800171 : else if (lid == tr->tid)
1288 4161883 : posold = p;
1289 : }
1290 555739 : MT_rwlock_rdunlock(&cni.b->thashlock);
1291 555739 : bat_iterator_end(&cni);
1292 : /* Normally at this point, posnew is the
1293 : * location of the bat that this
1294 : * transaction is working on, and posold
1295 : * is the location of the previous
1296 : * version of the bat. If LOG_CREATE,
1297 : * both are relevant, since the latter
1298 : * is the new bat, and the former is the
1299 : * to-be-destroyed bat. For
1300 : * LOG_DESTROY, only posnew should be
1301 : * relevant, but for the other types, if
1302 : * the table is destroyed later in the
1303 : * same transaction, we need posold, and
1304 : * else (the normal case) we need
1305 : * posnew. */
1306 555739 : if (posnew != BUN_NONE) {
1307 546459 : assert(posnew < maxupdated);
1308 546459 : updated[posnew / 32] |= 1U << (posnew % 32);
1309 : }
1310 555739 : if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
1311 162162 : assert(posold < maxupdated);
1312 162162 : updated[posold / 32] |= 1U << (posold % 32);
1313 : }
1314 : }
1315 : break;
1316 : default:
1317 : /* do nothing */
1318 : break;
1319 : }
1320 : /* the functions we call here can succeed (LOG_OK),
1321 : * but they can also fail for two different reasons:
1322 : * they can run out of input (LOG_EOF -- this is not
1323 : * serious, we just abort the remaining transactions),
1324 : * or some malloc or BAT update fails (LOG_ERR -- this
1325 : * is serious, we must abort the complete process);
1326 : * the latter failure causes the current function to
1327 : * return GDK_FAIL */
1328 748225 : switch (l.flag) {
1329 56502 : case LOG_START:
1330 56502 : assert(!lg->flushing || l.id <= lg->tid);
1331 56502 : if (!lg->flushing && l.id > lg->tid)
1332 109 : lg->tid = l.id; /* should only happen during initialization */
1333 56502 : if ((tr = tr_create(tr, l.id)) == NULL) {
1334 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
1335 0 : err = LOG_ERR;
1336 0 : break;
1337 : }
1338 56502 : TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
1339 : break;
1340 56502 : case LOG_END:
1341 56502 : if (tr == NULL)
1342 : err = LOG_EOF;
1343 56502 : else if (tr->tid != l.id) /* abort record */
1344 0 : tr = tr_abort(lg, tr);
1345 : else
1346 56502 : tr = tr_commit(lg, tr);
1347 : break;
1348 2760 : case LOG_SEQ:
1349 2760 : err = log_read_seq(lg, &l);
1350 2760 : break;
1351 391129 : case LOG_UPDATE_CONST:
1352 : case LOG_UPDATE_BULK:
1353 : case LOG_UPDATE:
1354 391129 : if (tr == NULL)
1355 : err = LOG_EOF;
1356 : else {
1357 614748 : err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
1358 : }
1359 : break;
1360 156998 : case LOG_CREATE:
1361 156998 : if (tr == NULL)
1362 : err = LOG_EOF;
1363 : else
1364 156998 : err = log_read_create(lg, tr, l.id);
1365 : break;
1366 9320 : case LOG_DESTROY:
1367 9320 : if (tr == NULL)
1368 : err = LOG_EOF;
1369 : else
1370 9320 : err = log_read_destroy(lg, tr, l.id);
1371 : break;
1372 75014 : case LOG_BAT_GROUP:
1373 75014 : if (tr == NULL)
1374 : err = LOG_EOF;
1375 : else {
1376 75014 : if (l.id > 0) {
1377 : /* START OF LOG_BAT_GROUP */
1378 37507 : cands = COLnew(0, TYPE_void, 0, SYSTRANS);
1379 37507 : if (!cands) {
1380 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
1381 0 : err = LOG_ERR;
1382 : }
1383 37507 : } else if (cands == NULL) {
1384 : /* should have gone through the
1385 : * above option earlier */
1386 0 : TRC_CRITICAL(GDK, "unexpected error\n");
1387 0 : err = LOG_ERR;
1388 : } else {
1389 : /* END OF LOG_BAT_GROUP */
1390 37507 : BBPunfix(cands->batCacheid);
1391 37507 : cands = NULL;
1392 : }
1393 : }
1394 : break;
1395 0 : default:
1396 0 : TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
1397 0 : err = LOG_ERR;
1398 : }
1399 748225 : if (tr == (trans *) -1) {
1400 : /* message already generated by tr_commit */
1401 : err = LOG_ERR;
1402 12374 : tr = NULL;
1403 : break;
1404 : }
1405 : }
1406 12374 : while (tr) {
1407 0 : TRC_WARNING(GDK, "aborting transaction\n");
1408 0 : tr = tr_abort(lg, tr);
1409 : }
1410 12374 : if (!lg->flushing)
1411 167 : ATOMIC_SET(&GDKdebug, dbg);
1412 :
1413 12374 : BBPreclaim(cands);
1414 12374 : if (!ok)
1415 12374 : return LOG_EOF;
1416 : return err;
1417 : }
1418 :
1419 : static gdk_return
1420 277 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1421 : {
1422 277 : log_return err = LOG_OK;
1423 277 : time_t t0, t1;
1424 277 : struct stat sb;
1425 :
1426 277 : assert(!lg->inmemory);
1427 :
1428 277 : TRC_INFO(WAL, "opening %s\n", filename);
1429 :
1430 277 : gdk_return res = log_open_input(lg, filename, filemissing);
1431 277 : if (!lg->input_log || res != GDK_SUCCEED)
1432 : return res;
1433 167 : int fd;
1434 167 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1435 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1436 0 : log_close_input(lg);
1437 : /* If the file could be opened, but fstat fails,
1438 : * something weird is going on */
1439 0 : return GDK_FAIL;
1440 : }
1441 167 : t0 = time(NULL);
1442 167 : TRC_INFO_IF(WAL) {
1443 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1444 0 : GDKtracer_flush_buffer();
1445 : }
1446 334 : while (err != LOG_EOF && err != LOG_ERR) {
1447 167 : t1 = time(NULL);
1448 167 : if (t1 - t0 > 10) {
1449 0 : lng fpos;
1450 0 : t0 = t1;
1451 : /* not more than once every 10 seconds */
1452 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1453 0 : TRC_INFO_IF(WAL) {
1454 0 : if (fpos >= 0) {
1455 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1456 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1457 0 : GDKtracer_flush_buffer();
1458 : }
1459 : }
1460 : }
1461 167 : err = log_read_transaction(lg, NULL, 0);
1462 : }
1463 167 : log_close_input(lg);
1464 167 : lg->input_log = NULL;
1465 :
1466 : /* remaining transactions are not committed, ie abort */
1467 167 : TRC_INFO_IF(WAL) {
1468 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1469 0 : GDKtracer_flush_buffer();
1470 : }
1471 : /* we cannot distinguish errors from incomplete transactions
1472 : * (even if we would log aborts in the logs). So we simply
1473 : * abort and move to the next log file */
1474 167 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1475 : }
1476 :
1477 : /*
1478 : * The log files are incrementally numbered, starting from 2. They are
1479 : * processed in the same sequence.
1480 : */
1481 : static gdk_return
1482 105 : log_readlogs(logger *lg, const char *filename)
1483 : {
1484 105 : gdk_return res = GDK_SUCCEED;
1485 :
1486 105 : assert(!lg->inmemory);
1487 105 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1488 :
1489 105 : char log_filename[FILENAME_MAX];
1490 105 : if (lg->saved_id >= lg->id) {
1491 105 : bool filemissing = false;
1492 :
1493 105 : lg->id = lg->saved_id + 1;
1494 382 : while (res == GDK_SUCCEED && !filemissing) {
1495 277 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1496 0 : GDKerror("Logger filename path is too large\n");
1497 0 : return GDK_FAIL;
1498 : }
1499 277 : res = log_readlog(lg, log_filename, &filemissing);
1500 277 : if (!filemissing) {
1501 172 : lg->saved_id++;
1502 172 : lg->id++;
1503 : }
1504 : }
1505 : }
1506 : return res;
1507 : }
1508 :
1509 : static gdk_return
1510 12305 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1511 : {
1512 12305 : TRC_DEBUG(WAL, "commit");
1513 :
1514 12305 : return bm_commit(lg, pending, updated, maxupdated);
1515 : }
1516 :
1517 : static gdk_return
1518 105 : check_version(logger *lg, FILE *fp, bool *needsnew)
1519 : {
1520 105 : int version = 0;
1521 :
1522 105 : assert(!lg->inmemory);
1523 105 : if (fscanf(fp, "%6d", &version) != 1) {
1524 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1525 0 : fclose(fp);
1526 0 : return GDK_FAIL;
1527 : }
1528 105 : if (version != lg->version) {
1529 16 : if (lg->prefuncp == NULL ||
1530 8 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1531 0 : GDKerror("Incompatible database version %06d, "
1532 : "this server supports version %06d.\n%s",
1533 : version, lg->version,
1534 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1535 0 : fclose(fp);
1536 0 : return GDK_FAIL;
1537 : }
1538 8 : *needsnew = true; /* we need to write a new log file */
1539 : } else {
1540 97 : lg->postfuncp = NULL; /* don't call */
1541 97 : *needsnew = false; /* log file already up-to-date */
1542 : }
1543 210 : if (fgetc(fp) != '\n' || /* skip \n */
1544 105 : fgetc(fp) != '\n') { /* skip \n */
1545 0 : GDKerror("Badly formatted log file");
1546 0 : fclose(fp);
1547 0 : return GDK_FAIL;
1548 : }
1549 105 : if (log_read_types_file(lg, fp, version) != GDK_SUCCEED) {
1550 0 : fclose(fp);
1551 0 : return GDK_FAIL;
1552 : }
1553 105 : fclose(fp);
1554 105 : return GDK_SUCCEED;
1555 : }
1556 :
1557 : static BAT *
1558 352 : bm_tids(BAT *b, BAT *d)
1559 : {
1560 352 : BUN sz = BATcount(b);
1561 352 : BAT *tids = BATdense(0, 0, sz);
1562 :
1563 352 : if (tids == NULL)
1564 : return NULL;
1565 :
1566 352 : if (BATcount(d)) {
1567 352 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1568 352 : logbat_destroy(tids);
1569 352 : tids = diff;
1570 : }
1571 : return tids;
1572 : }
1573 :
1574 :
1575 : static gdk_return
1576 8132 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1577 : {
1578 8132 : char bak[IDLENGTH];
1579 :
1580 8132 : if (BATmode(old, true) != GDK_SUCCEED) {
1581 0 : GDKerror("cannot convert old %s to transient", name);
1582 0 : return GDK_FAIL;
1583 : }
1584 8132 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1585 0 : GDKerror("name %s_%s too long\n", fn, name);
1586 0 : return GDK_FAIL;
1587 : }
1588 8132 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1589 0 : GDKerror("rename (%s) failed\n", bak);
1590 0 : return GDK_FAIL;
1591 : }
1592 8132 : BBPretain(new->batCacheid);
1593 8132 : return GDK_SUCCEED;
1594 : }
1595 :
1596 : static gdk_return
1597 331 : bm_get_counts(logger *lg)
1598 : {
1599 331 : BUN p, q;
1600 331 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1601 :
1602 27602 : BATloop(lg->catalog_bid, p, q) {
1603 27271 : oid pos = p;
1604 27271 : lng cnt = 0;
1605 27271 : lng lid = lng_nil;
1606 :
1607 27271 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1608 27271 : BAT *b = BBPquickdesc(bids[p]);
1609 27271 : assert(b);
1610 27271 : cnt = BATcount(b);
1611 : } else {
1612 0 : lid = BBP_desc(bids[p])->batCacheid != 0 && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1613 : }
1614 27271 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1615 0 : return GDK_FAIL;
1616 27271 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1617 : return GDK_FAIL;
1618 : }
1619 : return GDK_SUCCEED;
1620 : }
1621 :
1622 : static int
1623 8132 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1624 : {
1625 8132 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1626 1835897 : for (int i = 0; i < next; i++) {
1627 1827765 : if (n[i] == bid) {
1628 0 : sizes[i] = sz;
1629 0 : return next;
1630 : }
1631 : }
1632 8132 : n[next] = bid;
1633 8132 : sizes[next++] = sz;
1634 8132 : return next;
1635 : }
1636 :
1637 : static int
1638 2476 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
1639 : BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
1640 : {
1641 2476 : BAT *nbids, *noids, *ncnts, *nlids, *ndels;
1642 2476 : BUN p, q;
1643 2476 : int err = 0, rcnt = 0;
1644 :
1645 2476 : BUN ocnt = BATcount(catalog_bid);
1646 2476 : nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1647 2476 : noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1648 2476 : ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1649 2476 : nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1650 2476 : ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
1651 :
1652 2476 : if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
1653 0 : logbat_destroy(nbids);
1654 0 : logbat_destroy(noids);
1655 0 : logbat_destroy(ncnts);
1656 0 : logbat_destroy(nlids);
1657 0 : logbat_destroy(ndels);
1658 0 : return 0;
1659 : }
1660 :
1661 2476 : oid *poss = Tloc(dcatalog, 0);
1662 179114 : BATloop(dcatalog, p, q) {
1663 176638 : oid pos = poss[p];
1664 :
1665 176638 : if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
1666 0 : continue;
1667 :
1668 176638 : if (lids[pos] >= 0) {
1669 176638 : bat bid = bids[pos];
1670 176638 : BAT *lb = BBP_desc(bid);
1671 :
1672 176638 : if (lb->batCacheid == 0 || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
1673 0 : GDKwarning("Failed to set bat(%d) transient\n", bid);
1674 : } else {
1675 176638 : lids[pos] = -1; /* mark as transient */
1676 176638 : r[rcnt++] = bid;
1677 : }
1678 : }
1679 : }
1680 :
1681 2476 : int *oids = (int *) Tloc(catalog_id, 0);
1682 2476 : q = BATcount(catalog_bid);
1683 868957 : for (p = 0; p < q && !err; p++) {
1684 866481 : bat col = bids[p];
1685 866481 : int nid = oids[p];
1686 866481 : lng lid = lids[p];
1687 866481 : lng cnt = cnts[p];
1688 866481 : oid pos = p;
1689 :
1690 : /* only project out the deleted with lid == -1
1691 : * update dcatalog */
1692 866481 : if (lid == -1)
1693 176638 : continue; /* remove */
1694 :
1695 1379686 : if (BUNappend(nbids, &col, true) != GDK_SUCCEED ||
1696 1379686 : BUNappend(noids, &nid, true) != GDK_SUCCEED ||
1697 1379686 : BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
1698 689843 : BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
1699 : err = 1;
1700 689843 : if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
1701 0 : pos = (oid) (BATcount(nbids) - 1);
1702 0 : if (BUNappend(ndels, &pos, true) != GDK_SUCCEED)
1703 689843 : err = 1;
1704 : }
1705 : }
1706 :
1707 2476 : if (err) {
1708 0 : logbat_destroy(nbids);
1709 0 : logbat_destroy(noids);
1710 0 : logbat_destroy(ndels);
1711 0 : logbat_destroy(ncnts);
1712 0 : logbat_destroy(nlids);
1713 0 : return 0;
1714 : }
1715 : /* point of no return */
1716 4952 : if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
1717 4952 : log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
1718 2476 : log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
1719 0 : logbat_destroy(nbids);
1720 0 : logbat_destroy(noids);
1721 0 : logbat_destroy(ndels);
1722 0 : logbat_destroy(ncnts);
1723 0 : logbat_destroy(nlids);
1724 0 : return -1;
1725 : }
1726 2476 : r[rcnt++] = lg->catalog_bid->batCacheid;
1727 2476 : r[rcnt++] = lg->catalog_id->batCacheid;
1728 2476 : r[rcnt++] = lg->dcatalog->batCacheid;
1729 :
1730 2476 : assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
1731 :
1732 2476 : logbat_destroy(lg->catalog_bid);
1733 2476 : logbat_destroy(lg->catalog_id);
1734 2476 : logbat_destroy(lg->dcatalog);
1735 :
1736 2476 : lg->catalog_bid = nbids;
1737 2476 : lg->catalog_id = noids;
1738 2476 : lg->dcatalog = ndels;
1739 :
1740 : /* failing to rename these two bats is not fatal */
1741 2476 : if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
1742 2476 : GDKclrerr();
1743 2476 : if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
1744 2476 : GDKclrerr();
1745 2476 : BBPunfix(lg->catalog_cnt->batCacheid);
1746 2476 : BBPunfix(lg->catalog_lid->batCacheid);
1747 :
1748 2476 : lg->catalog_cnt = ncnts;
1749 2476 : lg->catalog_lid = nlids;
1750 2476 : char bak[FILENAME_MAX];
1751 2476 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
1752 2476 : if (BBPrename(lg->catalog_cnt, bak) < 0)
1753 0 : GDKclrerr();
1754 2476 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
1755 2476 : if (BBPrename(lg->catalog_lid, bak) < 0)
1756 0 : GDKclrerr();
1757 2476 : rotation_lock(lg);
1758 10409 : for (logged_range *p = lg->pending; p; p = p->next) {
1759 7933 : p->cnt -= cleanup;
1760 : }
1761 2476 : rotation_unlock(lg);
1762 2476 : return rcnt;
1763 : }
1764 :
1765 : /* this function is called with log_lock() held; it releases the lock
1766 : * before returning */
1767 : static gdk_return
1768 12757 : bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1769 : {
1770 12757 : BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
1771 12757 : BUN dcnt = BATcount(lg->dcatalog);
1772 12757 : BUN p, q;
1773 12757 : BAT *catalog_bid = lg->catalog_bid;
1774 12757 : BAT *catalog_id = lg->catalog_id;
1775 12757 : BAT *dcatalog = lg->dcatalog;
1776 12757 : BUN nn = 13 + cnt;
1777 12757 : bat *n = GDKmalloc(sizeof(bat) * nn);
1778 12757 : bat *r = GDKmalloc(sizeof(bat) * nn);
1779 12757 : BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
1780 12757 : int i = 0, rcnt = 0;
1781 12757 : gdk_return res;
1782 12757 : const log_bid *bids;
1783 12757 : lng *cnts = NULL, *lids = NULL;
1784 12757 : BUN cleanup = 0;
1785 12757 : lng t0 = 0;
1786 :
1787 12757 : if (n == NULL || r == NULL || sizes == NULL) {
1788 0 : GDKfree(n);
1789 0 : GDKfree(r);
1790 0 : GDKfree(sizes);
1791 0 : log_unlock(lg);
1792 0 : return GDK_FAIL;
1793 : }
1794 :
1795 12757 : sizes[i] = 0;
1796 12757 : n[i++] = 0; /* n[0] is not used */
1797 12757 : bids = (const log_bid *) Tloc(catalog_bid, 0);
1798 12757 : if (lg->catalog_cnt)
1799 12531 : cnts = (lng *) Tloc(lg->catalog_cnt, 0);
1800 12757 : if (lg->catalog_lid)
1801 12531 : lids = (lng *) Tloc(lg->catalog_lid, 0);
1802 3715719 : BATloop(catalog_bid, p, q) {
1803 3702962 : if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
1804 176638 : cleanup++;
1805 176638 : if (lids[p] == -1)
1806 0 : continue;
1807 353276 : if (BUNfnd(dcatalog, &(oid){p}) == BUN_NONE &&
1808 176638 : BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
1809 0 : while (BATcount(dcatalog) > dcnt) {
1810 0 : if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
1811 0 : TRC_CRITICAL(WAL, "delete after failed append failed\n");
1812 0 : break;
1813 : }
1814 : }
1815 0 : GDKfree(n);
1816 0 : GDKfree(r);
1817 0 : GDKfree(sizes);
1818 0 : log_unlock(lg);
1819 0 : return GDK_FAIL;
1820 : }
1821 : }
1822 3702962 : if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
1823 1833200 : continue;
1824 : }
1825 1869762 : bat col = bids[p];
1826 :
1827 1869762 : TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
1828 1869762 : assert(col);
1829 1869762 : sizes[i] = cnts ? (BUN) cnts[p] : 0;
1830 1869762 : n[i++] = col;
1831 : }
1832 : /* now commit catalog, so it's also up to date on disk */
1833 12757 : sizes[i] = cnt;
1834 12757 : n[i++] = catalog_bid->batCacheid;
1835 12757 : sizes[i] = cnt;
1836 12757 : n[i++] = catalog_id->batCacheid;
1837 12757 : sizes[i] = BATcount(dcatalog);
1838 12757 : n[i++] = dcatalog->batCacheid;
1839 :
1840 12757 : if (cleanup) {
1841 2476 : if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
1842 : catalog_bid, catalog_id, dcatalog,
1843 : cleanup)) < 0) {
1844 0 : GDKfree(n);
1845 0 : GDKfree(r);
1846 0 : GDKfree(sizes);
1847 0 : log_unlock(lg);
1848 0 : return GDK_FAIL;
1849 : }
1850 2476 : cnt -= cleanup;
1851 : }
1852 12757 : if (dcatalog != lg->dcatalog) {
1853 2476 : i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
1854 2476 : i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
1855 2476 : i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
1856 : }
1857 12757 : if (lg->seqs_id) {
1858 12531 : sizes[i] = BATcount(lg->seqs_id);
1859 12531 : n[i++] = lg->seqs_id->batCacheid;
1860 12531 : sizes[i] = BATcount(lg->seqs_id);
1861 12531 : n[i++] = lg->seqs_val->batCacheid;
1862 : }
1863 12757 : if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
1864 352 : BAT *tids, *ids, *vals;
1865 :
1866 352 : tids = bm_tids(lg->seqs_id, lg->dseqs);
1867 352 : if (tids == NULL) {
1868 0 : GDKfree(n);
1869 0 : GDKfree(r);
1870 0 : GDKfree(sizes);
1871 0 : log_unlock(lg);
1872 0 : return GDK_FAIL;
1873 : }
1874 352 : ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
1875 352 : vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
1876 :
1877 352 : if (ids == NULL || vals == NULL) {
1878 0 : logbat_destroy(tids);
1879 0 : logbat_destroy(ids);
1880 0 : logbat_destroy(vals);
1881 0 : GDKfree(n);
1882 0 : GDKfree(r);
1883 0 : GDKfree(sizes);
1884 0 : log_unlock(lg);
1885 0 : return GDK_FAIL;
1886 : }
1887 :
1888 704 : if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
1889 352 : BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
1890 0 : logbat_destroy(tids);
1891 0 : logbat_destroy(ids);
1892 0 : logbat_destroy(vals);
1893 0 : GDKfree(n);
1894 0 : GDKfree(r);
1895 0 : GDKfree(sizes);
1896 0 : log_unlock(lg);
1897 0 : return GDK_FAIL;
1898 : }
1899 352 : logbat_destroy(tids);
1900 352 : BATclear(lg->dseqs, true);
1901 :
1902 704 : if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
1903 352 : log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
1904 0 : logbat_destroy(ids);
1905 0 : logbat_destroy(vals);
1906 0 : GDKfree(n);
1907 0 : GDKfree(r);
1908 0 : GDKfree(sizes);
1909 0 : log_unlock(lg);
1910 0 : return GDK_FAIL;
1911 : }
1912 352 : i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
1913 352 : i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
1914 :
1915 352 : if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
1916 281 : r[rcnt++] = lg->seqs_id->batCacheid;
1917 352 : if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
1918 281 : r[rcnt++] = lg->seqs_val->batCacheid;
1919 :
1920 352 : logbat_destroy(lg->seqs_id);
1921 352 : logbat_destroy(lg->seqs_val);
1922 :
1923 352 : lg->seqs_id = ids;
1924 352 : lg->seqs_val = vals;
1925 : }
1926 12757 : if (lg->seqs_id) {
1927 12531 : sizes[i] = BATcount(lg->dseqs);
1928 12531 : n[i++] = lg->dseqs->batCacheid;
1929 : }
1930 :
1931 12757 : assert((BUN) i <= nn);
1932 12757 : log_unlock(lg);
1933 12757 : TRC_DEBUG_IF(WAL)
1934 0 : t0 = GDKusec();
1935 12983 : res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id);
1936 12757 : TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
1937 12757 : if (res == GDK_SUCCEED) { /* now cleanup */
1938 197385 : for (i = 0; i < rcnt; i++) {
1939 184628 : TRC_DEBUG_IF(WAL) {
1940 0 : TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
1941 0 : if (BBP_lrefs(r[i]) != 2)
1942 0 : TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
1943 : }
1944 184628 : BBPrelease(r[i]);
1945 : }
1946 : }
1947 12757 : GDKfree(n);
1948 12757 : GDKfree(r);
1949 12757 : GDKfree(sizes);
1950 12757 : if (res != GDK_SUCCEED)
1951 0 : TRC_CRITICAL(GDK, "commit failed\n");
1952 : return res;
1953 : }
1954 :
1955 : static gdk_return
1956 330 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1957 : {
1958 330 : str filenamestr = NULL;
1959 :
1960 330 : if ((filenamestr = GDKfilepath(0, lg->dir, LOGFILE, NULL)) == NULL)
1961 : return GDK_FAIL;
1962 330 : size_t len = strcpy_len(filename, filenamestr, FILENAME_MAX);
1963 330 : GDKfree(filenamestr);
1964 330 : if (len >= FILENAME_MAX) {
1965 0 : GDKerror("Logger filename path is too large\n");
1966 0 : return GDK_FAIL;
1967 : }
1968 330 : if (bak) {
1969 330 : len = strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL);
1970 330 : if (len >= FILENAME_MAX) {
1971 0 : GDKerror("Logger filename path is too large\n");
1972 0 : return GDK_FAIL;
1973 : }
1974 : }
1975 : return GDK_SUCCEED;
1976 : }
1977 :
1978 : static gdk_return
1979 12379 : log_cleanup(logger *lg, lng id)
1980 : {
1981 12379 : char log_id[FILENAME_MAX];
1982 :
1983 12379 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1984 0 : GDKerror("log_id filename is too large\n");
1985 0 : return GDK_FAIL;
1986 : }
1987 12379 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1988 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
1989 0 : GDKclrerr(); /* clear error from unlink */
1990 : }
1991 : return GDK_SUCCEED;
1992 : }
1993 :
1994 : #ifdef GDKLIBRARY_JSON
1995 : static gdk_return
1996 331 : log_json_upgrade_finalize(void)
1997 : {
1998 331 : int json_tpe = ATOMindex("json");
1999 661 : if (!GDKinmemory(0) &&
2000 330 : GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
2001 0 : TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file");
2002 0 : return GDK_FAIL;
2003 : }
2004 331 : BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;
2005 :
2006 331 : return GDK_SUCCEED;
2007 : }
2008 : #endif
2009 :
2010 : /* Load data from the logger logdir
2011 : * Initialize new directories and catalog files if none are present,
2012 : * unless running in read-only mode
2013 : * Load data and persist it in the BATs */
2014 : static gdk_return
2015 331 : log_load(const char *fn, logger *lg, char filename[FILENAME_MAX])
2016 : {
2017 331 : FILE *fp = NULL;
2018 331 : char bak[FILENAME_MAX];
2019 331 : bat catalog_bid, catalog_id, dcatalog;
2020 331 : bool needcommit = false;
2021 331 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2022 331 : bool readlogs = false;
2023 331 : bool needsnew = false; /* need to write new log file? */
2024 :
2025 : /* refactor */
2026 331 : if (!LOG_DISABLED(lg)) {
2027 330 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2028 0 : goto error;
2029 : }
2030 :
2031 331 : lg->catalog_bid = NULL;
2032 331 : lg->catalog_id = NULL;
2033 331 : lg->catalog_cnt = NULL;
2034 331 : lg->catalog_lid = NULL;
2035 331 : lg->dcatalog = NULL;
2036 :
2037 331 : lg->seqs_id = NULL;
2038 331 : lg->seqs_val = NULL;
2039 331 : lg->dseqs = NULL;
2040 :
2041 331 : if (!LOG_DISABLED(lg)) {
2042 : /* try to open logfile backup, or failing that, the file
2043 : * itself. we need to know whether this file exists when
2044 : * checking the database consistency later on */
2045 330 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2046 0 : fclose(fp);
2047 0 : fp = NULL;
2048 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2049 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2050 0 : goto error;
2051 330 : } else if (errno != ENOENT) {
2052 0 : GDKsyserror("open %s failed", bak);
2053 0 : goto error;
2054 : }
2055 330 : fp = MT_fopen(filename, "r");
2056 330 : if (fp == NULL && errno != ENOENT) {
2057 0 : GDKsyserror("open %s failed", filename);
2058 0 : goto error;
2059 : }
2060 : }
2061 :
2062 331 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2063 331 : catalog_bid = BBPindex(bak);
2064 :
2065 : /* initialize arrays for type mapping, to be read from disk */
2066 331 : memset(lg->type_id, -1, sizeof(lg->type_id));
2067 331 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2068 :
2069 : /* this is intentional - if catalog_bid is 0, force it to find
2070 : * the persistent catalog */
2071 331 : if (catalog_bid == 0) {
2072 : /* catalog does not exist, so the log file also
2073 : * shouldn't exist */
2074 226 : if (fp != NULL) {
2075 0 : GDKerror("there is no logger catalog, "
2076 : "but there is a log file.\n");
2077 0 : goto error;
2078 : }
2079 :
2080 226 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2081 226 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2082 226 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2083 :
2084 226 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2085 0 : GDKerror("cannot create catalog bats");
2086 0 : goto error;
2087 : }
2088 226 : TRC_INFO(WAL, "create %s catalog\n", fn);
2089 :
2090 : /* give the catalog bats names so we can find them
2091 : * next time */
2092 226 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2093 226 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2094 0 : goto error;
2095 : }
2096 :
2097 226 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2098 226 : if (BBPrename(lg->catalog_id, bak) < 0) {
2099 0 : goto error;
2100 : }
2101 :
2102 226 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2103 226 : if (BBPrename(lg->dcatalog, bak) < 0) {
2104 0 : goto error;
2105 : }
2106 :
2107 226 : if (!LOG_DISABLED(lg)) {
2108 225 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2109 0 : GDKerror("cannot create directory for log file %s\n", filename);
2110 0 : goto error;
2111 : }
2112 225 : if (log_create_types_file(lg, filename) != GDK_SUCCEED)
2113 0 : goto error;
2114 : }
2115 :
2116 226 : BBPretain(lg->catalog_bid->batCacheid);
2117 226 : BBPretain(lg->catalog_id->batCacheid);
2118 226 : BBPretain(lg->dcatalog->batCacheid);
2119 :
2120 226 : log_lock(lg);
2121 : /* bm_subcommit releases the lock */
2122 226 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2123 : /* cannot commit catalog, so remove log */
2124 0 : if (MT_remove(filename) < 0)
2125 0 : GDKsyserror("remove %s failed\n", filename);
2126 0 : BBPrelease(lg->catalog_bid->batCacheid);
2127 0 : BBPrelease(lg->catalog_id->batCacheid);
2128 0 : BBPrelease(lg->dcatalog->batCacheid);
2129 0 : goto error;
2130 : }
2131 : } else {
2132 : /* find the persistent catalog. As non persistent bats
2133 : * require a logical reference we also add a logical
2134 : * reference for the persistent bats */
2135 105 : BUN p, q;
2136 105 : BAT *b, *o, *d;
2137 :
2138 105 : assert(!lg->inmemory);
2139 :
2140 : /* the catalog exists, and so should the log file */
2141 105 : if (fp == NULL && !LOG_DISABLED(lg)) {
2142 0 : GDKerror("There is a logger catalog, but no log file.\n");
2143 0 : goto error;
2144 : }
2145 : if (fp != NULL) {
2146 : /* check_version always closes fp */
2147 105 : if (check_version(lg, fp, &needsnew) != GDK_SUCCEED) {
2148 0 : fp = NULL;
2149 0 : goto error;
2150 : }
2151 : readlogs = true;
2152 : fp = NULL;
2153 : }
2154 :
2155 105 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2156 105 : b = BATdescriptor(catalog_bid);
2157 105 : if (b == NULL) {
2158 0 : GDKerror("inconsistent database, catalog does not exist");
2159 0 : goto error;
2160 : }
2161 :
2162 105 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2163 105 : catalog_id = BBPindex(bak);
2164 105 : o = BATdescriptor(catalog_id);
2165 105 : if (o == NULL) {
2166 0 : BBPunfix(b->batCacheid);
2167 0 : GDKerror("inconsistent database, catalog_id does not exist");
2168 0 : goto error;
2169 : }
2170 :
2171 105 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2172 105 : dcatalog = BBPindex(bak);
2173 105 : d = BATdescriptor(dcatalog);
2174 105 : if (d == NULL) {
2175 0 : GDKerror("cannot create dcatalog bat");
2176 0 : BBPunfix(b->batCacheid);
2177 0 : BBPunfix(o->batCacheid);
2178 0 : goto error;
2179 : }
2180 :
2181 105 : lg->catalog_bid = b;
2182 105 : lg->catalog_id = o;
2183 105 : lg->dcatalog = d;
2184 105 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2185 27376 : BATloop(lg->catalog_bid, p, q) {
2186 27271 : bat bid = bids[p];
2187 27271 : oid pos = p;
2188 :
2189 27271 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2190 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2191 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2192 0 : goto error;
2193 : }
2194 : }
2195 105 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2196 105 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2197 105 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2198 0 : goto error;
2199 : }
2200 105 : BBPretain(lg->catalog_bid->batCacheid);
2201 105 : BBPretain(lg->catalog_id->batCacheid);
2202 105 : BBPretain(lg->dcatalog->batCacheid);
2203 : }
2204 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2205 : * fatal */
2206 331 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2207 331 : if (lg->catalog_cnt == NULL) {
2208 0 : GDKerror("failed to create catalog_cnt bat");
2209 0 : goto error;
2210 : }
2211 331 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2212 331 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2213 0 : GDKclrerr();
2214 331 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2215 331 : if (lg->catalog_lid == NULL) {
2216 0 : GDKerror("failed to create catalog_lid bat");
2217 0 : goto error;
2218 : }
2219 331 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2220 331 : if (BBPrename(lg->catalog_lid, bak) < 0)
2221 0 : GDKclrerr();
2222 331 : if (bm_get_counts(lg) != GDK_SUCCEED)
2223 0 : goto error;
2224 :
2225 331 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2226 331 : if (BBPindex(bak)) {
2227 105 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2228 105 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2229 105 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2230 105 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2231 105 : lg->dseqs = BATdescriptor(BBPindex(bak));
2232 105 : if (lg->seqs_id == NULL ||
2233 105 : lg->seqs_val == NULL ||
2234 : lg->dseqs == NULL) {
2235 0 : GDKerror("Logger_new: cannot load seqs bats");
2236 0 : goto error;
2237 : }
2238 105 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2239 105 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2240 105 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2241 0 : goto error;
2242 : }
2243 : } else {
2244 226 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2245 226 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2246 226 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2247 226 : if (lg->seqs_id == NULL ||
2248 226 : lg->seqs_val == NULL ||
2249 : lg->dseqs == NULL) {
2250 0 : GDKerror("Logger_new: cannot create seqs bats");
2251 0 : goto error;
2252 : }
2253 :
2254 226 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2255 226 : if (BBPrename(lg->seqs_id, bak) < 0) {
2256 0 : goto error;
2257 : }
2258 :
2259 226 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2260 226 : if (BBPrename(lg->seqs_val, bak) < 0) {
2261 0 : goto error;
2262 : }
2263 :
2264 226 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2265 226 : if (BBPrename(lg->dseqs, bak) < 0) {
2266 0 : goto error;
2267 : }
2268 : needcommit = true;
2269 : }
2270 331 : dbg = ATOMIC_GET(&GDKdebug);
2271 331 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2272 331 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2273 0 : GDKerror("Logger_new: commit failed");
2274 0 : goto error;
2275 : }
2276 331 : ATOMIC_SET(&GDKdebug, dbg);
2277 :
2278 331 : if (readlogs) {
2279 105 : ulng log_id = lg->saved_id + 1;
2280 105 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2281 0 : goto error;
2282 : }
2283 105 : if (GDKgetenv_isyes("process-wal-and-exit")) {
2284 0 : printf("# mserver5 exiting\n");
2285 0 : exit(0);
2286 : }
2287 105 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2288 0 : goto error;
2289 105 : if (needsnew) {
2290 8 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2291 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2292 0 : return GDK_FAIL;
2293 : }
2294 8 : if (log_create_types_file(lg, filename) != GDK_SUCCEED) {
2295 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2296 0 : return GDK_FAIL;
2297 : }
2298 : }
2299 105 : dbg = ATOMIC_GET(&GDKdebug);
2300 105 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2301 105 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2302 0 : goto error;
2303 : }
2304 105 : ATOMIC_SET(&GDKdebug, dbg);
2305 277 : for (; log_id <= lg->saved_id; log_id++)
2306 172 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2307 113 : if (needsnew &&
2308 8 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2309 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2310 0 : return GDK_FAIL;
2311 : }
2312 : } else {
2313 226 : lg->id = lg->saved_id + 1;
2314 226 : if (GDKgetenv_isyes("process-wal-and-exit")) {
2315 0 : printf("# mserver5 exiting\n");
2316 0 : exit(0);
2317 : }
2318 : }
2319 : #ifdef GDKLIBRARY_JSON
2320 331 : if (log_json_upgrade_finalize() == GDK_FAIL)
2321 0 : goto error;
2322 : #endif
2323 : return GDK_SUCCEED;
2324 0 : error:
2325 0 : if (fp)
2326 0 : fclose(fp);
2327 0 : logbat_destroy(lg->catalog_bid);
2328 0 : logbat_destroy(lg->catalog_id);
2329 0 : logbat_destroy(lg->dcatalog);
2330 0 : logbat_destroy(lg->seqs_id);
2331 0 : logbat_destroy(lg->seqs_val);
2332 0 : logbat_destroy(lg->dseqs);
2333 0 : MT_lock_destroy(&lg->lock);
2334 0 : MT_lock_destroy(&lg->rotation_lock);
2335 0 : GDKfree(lg->fn);
2336 0 : GDKfree(lg->dir);
2337 0 : GDKfree(lg->rbuf);
2338 0 : GDKfree(lg->wbuf);
2339 0 : GDKfree(lg);
2340 0 : ATOMIC_SET(&GDKdebug, dbg);
2341 : /* We do not call log_json_upgrade_finalize here because we want
2342 : * the upgrade to run again next time we try, so we do not want
2343 : * to remove the signal file just yet.
2344 : */
2345 0 : return GDK_FAIL;
2346 : }
2347 :
2348 : /* Initialize a new logger
2349 : * It will load any data in the logdir and persist it in the BATs*/
2350 : static logger *
2351 331 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2352 : postversionfix_fptr postfuncp, void *funcdata)
2353 : {
2354 331 : logger *lg;
2355 331 : char filename[FILENAME_MAX];
2356 :
2357 331 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2358 331 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2359 331 : lng max_file_size = 0;
2360 :
2361 331 : if (GDKdebug & TESTINGMASK) {
2362 : max_file_size = 2048; /* 2 KiB */
2363 : } else {
2364 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2365 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2366 : }
2367 :
2368 331 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2369 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2370 0 : return NULL;
2371 : }
2372 :
2373 331 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2374 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2375 0 : return NULL;
2376 : }
2377 :
2378 331 : lg = GDKmalloc(sizeof(struct logger));
2379 331 : if (lg == NULL) {
2380 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2381 0 : return NULL;
2382 : }
2383 :
2384 662 : *lg = (logger) {
2385 331 : .inmemory = GDKinmemory(0),
2386 : .debug = debug,
2387 : .version = version,
2388 : .prefuncp = prefuncp,
2389 : .postfuncp = postfuncp,
2390 : .funcdata = funcdata,
2391 :
2392 331 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2393 : .file_age = 0,
2394 331 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2395 331 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2396 :
2397 : .id = 0,
2398 331 : .saved_id = getBBPlogno(), /* get saved log number from bbp */
2399 : .nr_flushers = ATOMIC_VAR_INIT(0),
2400 331 : .fn = GDKstrdup(fn),
2401 331 : .dir = GDKstrdup(filename),
2402 : .rbufsize = 64 * 1024,
2403 331 : .rbuf = GDKmalloc(64 * 1024),
2404 : .wbufsize = 64 * 1024,
2405 331 : .wbuf = GDKmalloc(64 * 1024),
2406 : };
2407 :
2408 : /* probably open file and check version first, then call call old logger code */
2409 331 : if (lg->fn == NULL ||
2410 331 : lg->dir == NULL ||
2411 331 : lg->rbuf == NULL ||
2412 : lg->wbuf == NULL) {
2413 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2414 0 : GDKfree(lg->fn);
2415 0 : GDKfree(lg->dir);
2416 0 : GDKfree(lg->rbuf);
2417 0 : GDKfree(lg->wbuf);
2418 0 : GDKfree(lg);
2419 0 : return NULL;
2420 : }
2421 331 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2422 :
2423 331 : MT_lock_init(&lg->lock, fn);
2424 331 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2425 331 : MT_lock_init(&lg->flush_lock, "flush_lock");
2426 331 : MT_cond_init(&lg->excl_flush_cv);
2427 :
2428 331 : if (log_load(fn, lg, filename) == GDK_SUCCEED) {
2429 : return lg;
2430 : }
2431 : return NULL;
2432 : }
2433 :
2434 : static logged_range *
2435 68418 : do_flush_range_cleanup(logger *lg)
2436 : {
2437 68418 : logged_range *frange = lg->flush_ranges;
2438 68418 : logged_range *first = frange;
2439 :
2440 68418 : if (frange == NULL)
2441 : return NULL;
2442 80708 : while (frange->next) {
2443 12830 : if (ATOMIC_GET(&frange->refcount) > 1)
2444 : break;
2445 12290 : frange = frange->next;
2446 : }
2447 68418 : if (first == frange) {
2448 : return first;
2449 : }
2450 :
2451 12290 : logged_range *flast = frange;
2452 :
2453 12290 : lg->flush_ranges = flast;
2454 :
2455 24580 : for (frange = first; frange && frange != flast; frange = frange->next) {
2456 12290 : ATOMIC_DEC(&frange->refcount);
2457 12290 : if (!LOG_DISABLED(lg) && frange->output_log) {
2458 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2459 0 : close_stream(frange->output_log);
2460 0 : frange->output_log = NULL;
2461 : }
2462 : }
2463 : return flast;
2464 : }
2465 :
2466 : void
2467 330 : log_destroy(logger *lg)
2468 : {
2469 330 : log_close_input(lg);
2470 330 : logged_range *last = do_flush_range_cleanup(lg);
2471 330 : (void) last;
2472 330 : assert(last == lg->current && last == lg->flush_ranges);
2473 330 : log_close_output(lg);
2474 743 : for (logged_range * p = lg->pending; p; p = lg->pending) {
2475 413 : lg->pending = p->next;
2476 413 : GDKfree(p);
2477 : }
2478 330 : if (LOG_DISABLED(lg)) {
2479 1 : lg->saved_id = lg->id;
2480 1 : lg->saved_tid = lg->tid;
2481 1 : log_commit(lg, NULL, NULL, 0);
2482 : }
2483 330 : if (lg->catalog_bid) {
2484 330 : log_lock(lg);
2485 330 : BUN p, q;
2486 330 : BAT *b = lg->catalog_bid;
2487 :
2488 : /* free resources */
2489 330 : const log_bid *bids = (const log_bid *) Tloc(b, 0);
2490 87125 : BATloop(b, p, q) {
2491 86795 : bat bid = bids[p];
2492 :
2493 86795 : BBPrelease(bid);
2494 : }
2495 :
2496 330 : BBPrelease(lg->catalog_bid->batCacheid);
2497 330 : BBPrelease(lg->catalog_id->batCacheid);
2498 330 : BBPrelease(lg->dcatalog->batCacheid);
2499 330 : logbat_destroy(lg->catalog_bid);
2500 330 : logbat_destroy(lg->catalog_id);
2501 330 : logbat_destroy(lg->dcatalog);
2502 :
2503 330 : logbat_destroy(lg->catalog_cnt);
2504 330 : logbat_destroy(lg->catalog_lid);
2505 330 : log_unlock(lg);
2506 : }
2507 330 : MT_lock_destroy(&lg->lock);
2508 330 : MT_lock_destroy(&lg->rotation_lock);
2509 330 : MT_lock_destroy(&lg->flush_lock);
2510 330 : GDKfree(lg->fn);
2511 330 : GDKfree(lg->dir);
2512 330 : GDKfree(lg->rbuf);
2513 330 : GDKfree(lg->wbuf);
2514 330 : GDKfree(lg);
2515 330 : }
2516 :
2517 : /* Create a new logger */
2518 : logger *
2519 331 : log_create(int debug, const char *fn, const char *logdir, int version,
2520 : preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
2521 : void *funcdata)
2522 : {
2523 331 : logger *lg;
2524 331 : TRC_INFO_IF(WAL) {
2525 0 : TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
2526 0 : GDKtracer_flush_buffer();
2527 : }
2528 331 : lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
2529 331 : if (lg == NULL)
2530 : return NULL;
2531 331 : TRC_INFO_IF(WAL) {
2532 0 : TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
2533 0 : GDKtracer_flush_buffer();
2534 : }
2535 331 : if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2536 0 : log_destroy(lg);
2537 0 : return NULL;
2538 : }
2539 331 : assert(lg->current == NULL);
2540 331 : logged_range dummy = {
2541 331 : .cnt = BATcount(lg->catalog_bid),
2542 : };
2543 331 : lg->current = &dummy;
2544 331 : if (log_open_output(lg) != GDK_SUCCEED) {
2545 0 : lg->current = NULL;
2546 0 : log_destroy(lg);
2547 0 : return NULL;
2548 : }
2549 331 : lg->current = lg->current->next;
2550 331 : assert(lg->pending == NULL && lg->flush_ranges == NULL);
2551 331 : lg->pending = lg->current;
2552 331 : lg->flush_ranges = lg->current;
2553 331 : return lg;
2554 : }
2555 :
2556 : static logged_range *
2557 7896 : log_next_logfile(logger *lg, ulng ts)
2558 : {
2559 7896 : int m = (ATOMIC_GET(&GDKdebug) & TESTINGMASK) ? 1000 : 100;
2560 7896 : if (!lg->pending || !lg->pending->next)
2561 : return NULL;
2562 7896 : rotation_lock(lg);
2563 7896 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2564 7896 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2565 7896 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2566 6623 : rotation_unlock(lg);
2567 6623 : logged_range *p = lg->pending;
2568 6623 : for (int i = 1;
2569 12207 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2570 5610 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2571 17814 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2572 5584 : p = p->next;
2573 6623 : return p;
2574 : }
2575 1273 : rotation_unlock(lg);
2576 1273 : return NULL;
2577 : }
2578 :
2579 : static void
2580 6623 : log_cleanup_range(logger *lg, ulng id)
2581 : {
2582 6623 : rotation_lock(lg);
2583 18830 : while (lg->pending && lg->pending->id <= id) {
2584 12207 : logged_range *p;
2585 12207 : p = lg->pending;
2586 12207 : if (p)
2587 12207 : lg->pending = p->next;
2588 12207 : GDKfree(p);
2589 : }
2590 6623 : rotation_unlock(lg);
2591 6623 : }
2592 :
2593 : static void
2594 68091 : do_rotate(logger *lg)
2595 : {
2596 68091 : logged_range *cur = lg->current;
2597 68091 : logged_range *next = cur->next;
2598 68091 : if (next) {
2599 12290 : assert(ATOMIC_GET(&next->refcount) == 1);
2600 12290 : lg->current = next;
2601 12290 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2602 12081 : close_stream(cur->output_log);
2603 12081 : cur->output_log = NULL;
2604 : }
2605 : }
2606 68091 : }
2607 :
2608 : gdk_return
2609 10329 : log_activate(logger *lg)
2610 : {
2611 10329 : bool flush_cleanup = false;
2612 10329 : gdk_return res = GDK_SUCCEED;
2613 :
2614 10329 : rotation_lock(lg);
2615 10329 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2616 :
2617 10329 : if (current_file_size == -1) {
2618 0 : rotation_unlock(lg);
2619 0 : return GDK_FAIL;
2620 : }
2621 : /* file size of 2 means only endian indicator present
2622 : * (i.e. effectively empty) */
2623 10329 : if (current_file_size <= 2) {
2624 6975 : rotation_unlock(lg);
2625 6975 : return GDK_SUCCEED;
2626 : }
2627 :
2628 3354 : if (!lg->flushnow &&
2629 3354 : !lg->current->next &&
2630 3354 : current_file_size > 2 &&
2631 3354 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2632 3346 : current_file_size > lg->max_file_size ||
2633 3316 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2634 38 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2635 38 : lg->saved_id + 1 == lg->id &&
2636 38 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2637 37 : lg->id++;
2638 : /* start new file */
2639 37 : res = log_open_output(lg);
2640 37 : flush_cleanup = true;
2641 37 : do_rotate(lg);
2642 : }
2643 37 : if (flush_cleanup)
2644 37 : (void) do_flush_range_cleanup(lg);
2645 3354 : rotation_unlock(lg);
2646 3354 : return res;
2647 : }
2648 :
2649 : gdk_return
2650 7896 : log_flush(logger *lg, ulng ts)
2651 : {
2652 7896 : logged_range *pending = log_next_logfile(lg, ts);
2653 7896 : ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
2654 7896 : if (LOG_DISABLED(lg)) {
2655 0 : lg->saved_id = lid;
2656 0 : lg->saved_tid = lg->tid;
2657 0 : if (lid)
2658 0 : log_cleanup_range(lg, lg->saved_id);
2659 0 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
2660 0 : TRC_ERROR(GDK, "failed to commit");
2661 0 : return GDK_SUCCEED;
2662 : }
2663 7896 : if (lg->saved_id >= lid)
2664 : return GDK_SUCCEED;
2665 6623 : rotation_lock(lg);
2666 6623 : ulng lgid = lg->id;
2667 6623 : rotation_unlock(lg);
2668 6623 : if (lg->saved_id + 1 >= lgid) /* logger should first release the file */
2669 : return GDK_SUCCEED;
2670 6623 : log_return res = LOG_OK;
2671 6623 : ulng cid = olid;
2672 6623 : assert(lid <= lgid);
2673 : uint32_t *updated = NULL;
2674 : BUN nupdated = 0;
2675 : size_t allocated = 0;
2676 18830 : while (cid < lid && res == LOG_OK) {
2677 12207 : if (!lg->input_log) {
2678 12207 : char *filename;
2679 12207 : char id[32];
2680 12207 : if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
2681 0 : GDKfree(updated);
2682 0 : TRC_CRITICAL(GDK, "log_id filename is too large\n");
2683 0 : return GDK_FAIL;
2684 : }
2685 12207 : if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
2686 0 : GDKfree(updated);
2687 0 : return GDK_FAIL;
2688 : }
2689 12207 : if (strlen(filename) >= FILENAME_MAX) {
2690 0 : GDKfree(updated);
2691 0 : TRC_CRITICAL(GDK, "Logger filename path is too large\n");
2692 0 : GDKfree(filename);
2693 0 : return GDK_FAIL;
2694 : }
2695 :
2696 12207 : bool filemissing = false;
2697 12207 : if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
2698 0 : GDKfree(updated);
2699 0 : GDKfree(filename);
2700 0 : return GDK_FAIL;
2701 : }
2702 12207 : GDKfree(filename);
2703 : }
2704 : /* we read the full file because skipping is impossible with current log format */
2705 12207 : log_lock(lg);
2706 12207 : if (updated == NULL) {
2707 6623 : nupdated = BATcount(lg->catalog_id);
2708 6623 : allocated = ((nupdated + 31) & ~31) / 8;
2709 6623 : if (allocated == 0)
2710 0 : allocated = 4;
2711 6623 : updated = GDKzalloc(allocated);
2712 6623 : if (updated == NULL) {
2713 0 : log_unlock(lg);
2714 0 : return GDK_FAIL;
2715 : }
2716 5584 : } else if (nupdated < BATcount(lg->catalog_id)) {
2717 107 : BUN n = BATcount(lg->catalog_id);
2718 107 : size_t a = ((n + 31) & ~31) / 8;
2719 107 : if (a > allocated) {
2720 24 : uint32_t *p = GDKrealloc(updated, a);
2721 24 : if (p == NULL) {
2722 0 : GDKfree(updated);
2723 0 : log_unlock(lg);
2724 0 : return GDK_FAIL;
2725 : }
2726 24 : updated = p;
2727 24 : memset(updated + allocated / 4, 0, a - allocated);
2728 24 : allocated = a;
2729 : }
2730 : nupdated = n;
2731 : }
2732 12207 : lg->flushing = true;
2733 12207 : res = log_read_transaction(lg, updated, nupdated);
2734 12207 : lg->flushing = false;
2735 12207 : log_unlock(lg);
2736 12207 : if (res == LOG_EOF) {
2737 12207 : log_close_input(lg);
2738 12207 : res = LOG_OK;
2739 : }
2740 12207 : cid++;
2741 : }
2742 6623 : if (lid > olid && res == LOG_OK) {
2743 6623 : rotation_lock(lg); /* protect against concurrent log_tflush rotate check */
2744 6623 : lg->saved_id = lid;
2745 6623 : rotation_unlock(lg);
2746 6623 : if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
2747 0 : TRC_ERROR(GDK, "failed to commit");
2748 0 : res = LOG_ERR;
2749 0 : rotation_lock(lg);
2750 0 : lg->saved_id = olid; /* reset !! */
2751 0 : rotation_unlock(lg);
2752 : }
2753 0 : if (res != LOG_ERR) {
2754 18830 : while (olid < lid) {
2755 : /* Try to cleanup, remove old log file, continue on failure! */
2756 12207 : olid++;
2757 12207 : (void) log_cleanup(lg, olid);
2758 : }
2759 : }
2760 6623 : if (res == LOG_OK)
2761 6623 : log_cleanup_range(lg, lg->saved_id);
2762 : }
2763 6623 : GDKfree(updated);
2764 6623 : return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
2765 : }
2766 :
2767 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2768 : * Only the bak- files are deleted for the preserved WAL files.
2769 : */
2770 : lng
2771 28644 : log_changes(logger *lg)
2772 : {
2773 28644 : if (LOG_DISABLED(lg))
2774 : return 0;
2775 28644 : rotation_lock(lg);
2776 28644 : lng changes = lg->id - lg->saved_id - 1;
2777 28644 : rotation_unlock(lg);
2778 28644 : return changes;
2779 : }
2780 :
2781 : int
2782 462 : log_sequence(logger *lg, int seq, lng *id)
2783 : {
2784 462 : log_lock(lg);
2785 462 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2786 :
2787 462 : if (p != BUN_NONE) {
2788 343 : *id = *(lng *) Tloc(lg->seqs_val, p);
2789 :
2790 343 : log_unlock(lg);
2791 343 : return 1;
2792 : }
2793 119 : log_unlock(lg);
2794 119 : return 0;
2795 : }
2796 :
2797 : gdk_return
2798 192044 : log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
2799 : {
2800 192044 : bte tpe = find_type(lg, type);
2801 192044 : gdk_return ok = GDK_SUCCEED;
2802 192044 : logformat l;
2803 192044 : lng nr;
2804 192044 : l.flag = LOG_UPDATE_CONST;
2805 192044 : l.id = id;
2806 192044 : nr = cnt;
2807 :
2808 192044 : if (LOG_DISABLED(lg) || !nr) {
2809 : /* logging is switched off */
2810 66607 : if (nr) {
2811 66607 : log_lock(lg);
2812 66607 : ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
2813 66607 : log_unlock(lg);
2814 : }
2815 66607 : return ok;
2816 : }
2817 :
2818 125437 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
2819 :
2820 125437 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2821 250874 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
2822 250874 : log_write_format(lg, &l) != GDK_SUCCEED ||
2823 250874 : !mnstr_writeLng(lg->current->output_log, nr) ||
2824 250874 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2825 125437 : !mnstr_writeLng(lg->current->output_log, offset)) {
2826 0 : ATOMIC_DEC(&lg->current->refcount);
2827 0 : ok = GDK_FAIL;
2828 0 : goto bailout;
2829 : }
2830 :
2831 125437 : ok = wt(val, lg->current->output_log, 1);
2832 :
2833 125437 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2834 :
2835 125437 : bailout:
2836 125437 : if (ok != GDK_SUCCEED) {
2837 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2838 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2839 : }
2840 : return ok;
2841 : }
2842 :
2843 : static gdk_return
2844 19587 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2845 : {
2846 19587 : size_t bufsz = lg->wbufsize, resize = 0;
2847 19587 : BUN end = (BUN) (offset + nr);
2848 19587 : char *buf = lg->wbuf;
2849 19587 : gdk_return res = GDK_SUCCEED;
2850 :
2851 19587 : if (!buf)
2852 : return GDK_FAIL;
2853 19587 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2854 19587 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2855 : return GDK_FAIL;
2856 19587 : BATiter bi = bat_iterator(b);
2857 19587 : BUN p = (BUN) offset;
2858 39226 : for (; p < end;) {
2859 19639 : size_t sz = 0;
2860 19639 : if (resize) {
2861 2 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2862 : res = GDK_FAIL;
2863 : break;
2864 : }
2865 2 : lg->wbuf = buf;
2866 2 : lg->wbufsize = bufsz = resize;
2867 2 : resize = 0;
2868 : }
2869 19639 : char *dst = buf;
2870 73815 : for (; p < end && sz < bufsz; p++) {
2871 54228 : const char *s = BUNtvar(bi, p);
2872 54228 : size_t len = strlen(s) + 1;
2873 54228 : if ((sz + len) > bufsz) {
2874 52 : if (len > bufsz)
2875 2 : resize = len + bufsz;
2876 : break;
2877 : } else {
2878 54176 : memcpy(dst, s, len);
2879 54176 : dst += len;
2880 54176 : sz += len;
2881 : }
2882 : }
2883 39276 : if (sz &&
2884 39274 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2885 19637 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2886 : res = GDK_FAIL;
2887 : break;
2888 : }
2889 : }
2890 19587 : bat_iterator_end(&bi);
2891 19587 : return res;
2892 : }
2893 :
2894 : static gdk_return
2895 503089 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2896 : {
2897 503089 : bte tpe = find_type(lg, b->ttype);
2898 503089 : gdk_return ok = GDK_SUCCEED;
2899 503089 : logformat l;
2900 503089 : BUN p;
2901 503089 : lng nr;
2902 503089 : l.flag = LOG_UPDATE_BULK;
2903 503089 : l.id = id;
2904 503089 : nr = cnt;
2905 :
2906 503089 : if (LOG_DISABLED(lg) || !nr) {
2907 : /* logging is switched off */
2908 204024 : if (nr)
2909 149793 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2910 : return GDK_SUCCEED;
2911 : }
2912 :
2913 269488 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2914 :
2915 269488 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2916 269488 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2917 0 : ok = GDK_FAIL;
2918 0 : goto bailout;
2919 : }
2920 :
2921 269488 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2922 538608 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2923 667626 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2924 538608 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2925 409590 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2926 0 : ok = GDK_FAIL;
2927 0 : goto bailout;
2928 : }
2929 269488 : if (!total_cnt)
2930 129018 : total_cnt = cnt;
2931 269488 : lg->total_cnt += cnt;
2932 :
2933 269488 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2934 269304 : lg->total_cnt = 0;
2935 :
2936 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2937 269488 : if (sliced)
2938 1512 : offset = 0;
2939 269488 : if (b->ttype == TYPE_msk) {
2940 0 : BATiter bi = bat_iterator(b);
2941 0 : if (offset % 32 == 0) {
2942 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
2943 0 : (size_t) ((nr + 31) / 32)))
2944 0 : ok = GDK_FAIL;
2945 : } else {
2946 0 : for (lng i = 0; i < nr; i += 32) {
2947 : uint32_t v = 0;
2948 0 : for (int j = 0; j < 32 && i + j < nr; j++)
2949 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
2950 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
2951 : ok = GDK_FAIL;
2952 : break;
2953 : }
2954 : }
2955 : }
2956 0 : bat_iterator_end(&bi);
2957 518753 : } else if (b->ttype < TYPE_str && !isVIEW(b)) {
2958 249265 : BATiter bi = bat_iterator(b);
2959 249265 : const void *t = BUNtail(bi, (BUN) offset);
2960 :
2961 249265 : ok = wt(t, lg->current->output_log, (size_t) nr);
2962 249265 : bat_iterator_end(&bi);
2963 20223 : } else if (b->ttype == TYPE_str) {
2964 : /* efficient string writes */
2965 19581 : ok = string_writer(lg, b, offset, nr);
2966 : } else {
2967 642 : BATiter bi = bat_iterator(b);
2968 642 : BUN end = (BUN) (offset + nr);
2969 1642 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
2970 1000 : const void *t = BUNtail(bi, p);
2971 :
2972 1000 : ok = wt(t, lg->current->output_log, 1);
2973 : }
2974 642 : bat_iterator_end(&bi);
2975 : }
2976 :
2977 269488 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2978 :
2979 269488 : bailout:
2980 269488 : if (ok != GDK_SUCCEED) {
2981 0 : ATOMIC_DEC(&lg->current->refcount);
2982 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2983 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2984 : }
2985 : return ok;
2986 : }
2987 :
2988 : /*
2989 : * Changes made to the BAT descriptor should be stored in the log
2990 : * files. Actually, we need to save the descriptor file, perhaps we
2991 : * should simply introduce a versioning scheme.
2992 : */
2993 : gdk_return
2994 236118 : log_bat_persists(logger *lg, BAT *b, log_id id)
2995 : {
2996 236118 : log_lock(lg);
2997 236118 : bte ta = find_type(lg, b->ttype);
2998 236118 : logformat l;
2999 :
3000 236118 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
3001 0 : log_unlock(lg);
3002 0 : if (!LOG_DISABLED(lg))
3003 0 : ATOMIC_DEC(&lg->current->refcount);
3004 0 : return GDK_FAIL;
3005 : }
3006 :
3007 236118 : l.flag = LOG_CREATE;
3008 236118 : l.id = id;
3009 236118 : if (!LOG_DISABLED(lg)) {
3010 157071 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3011 314142 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3012 314142 : log_write_format(lg, &l) != GDK_SUCCEED ||
3013 157071 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3014 0 : log_unlock(lg);
3015 0 : ATOMIC_DEC(&lg->current->refcount);
3016 0 : return GDK_FAIL;
3017 : }
3018 : }
3019 236118 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3020 236118 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3021 236118 : log_unlock(lg);
3022 236118 : if (r != GDK_SUCCEED)
3023 0 : ATOMIC_DEC(&lg->current->refcount);
3024 : return r;
3025 : }
3026 :
3027 : gdk_return
3028 22132 : log_bat_transient(logger *lg, log_id id)
3029 : {
3030 22132 : log_lock(lg);
3031 22132 : log_bid bid = internal_find_bat(lg, id, -1);
3032 22132 : logformat l;
3033 :
3034 22132 : if (bid < 0) {
3035 0 : log_unlock(lg);
3036 0 : return GDK_FAIL;
3037 : }
3038 22132 : if (!bid) {
3039 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3040 0 : log_unlock(lg);
3041 0 : return GDK_FAIL;
3042 : }
3043 22132 : l.flag = LOG_DESTROY;
3044 22132 : l.id = id;
3045 :
3046 22132 : if (!LOG_DISABLED(lg)) {
3047 10217 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3048 0 : TRC_CRITICAL(GDK, "write failed\n");
3049 0 : log_unlock(lg);
3050 0 : ATOMIC_DEC(&lg->current->refcount);
3051 0 : return GDK_FAIL;
3052 : }
3053 : }
3054 22132 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3055 22132 : BAT *b = BBPquickdesc(bid);
3056 22132 : assert(b);
3057 22132 : BUN cnt = BATcount(b);
3058 22132 : ATOMIC_ADD(&lg->current->drops, cnt);
3059 22132 : gdk_return r = log_del_bat(lg, bid);
3060 22132 : log_unlock(lg);
3061 22132 : if (r != GDK_SUCCEED)
3062 0 : ATOMIC_DEC(&lg->current->refcount);
3063 : return r;
3064 : }
3065 :
3066 : static gdk_return
3067 117096 : log_bat_group(logger *lg, log_id id)
3068 : {
3069 117096 : if (LOG_DISABLED(lg))
3070 : return GDK_SUCCEED;
3071 :
3072 76334 : logformat l;
3073 76334 : l.flag = LOG_BAT_GROUP;
3074 76334 : l.id = id;
3075 76334 : gdk_return r = log_write_format(lg, &l);
3076 76334 : return r;
3077 : }
3078 :
3079 : gdk_return
3080 58548 : log_bat_group_start(logger *lg, log_id id)
3081 : {
3082 : /*positive table id represent start of logged table */
3083 58548 : return log_bat_group(lg, id);
3084 : }
3085 :
3086 : gdk_return
3087 58548 : log_bat_group_end(logger *lg, log_id id)
3088 : {
3089 : /*negative table id represent end of logged table */
3090 58548 : return log_bat_group(lg, -id);
3091 : }
3092 :
3093 : gdk_return
3094 264260 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3095 : {
3096 264260 : log_lock(lg);
3097 264260 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3098 264260 : log_unlock(lg);
3099 264260 : return r;
3100 : }
3101 :
3102 : gdk_return
3103 2771 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3104 : {
3105 2771 : log_lock(lg);
3106 2771 : bte tpe = find_type(lg, uval->ttype);
3107 2771 : gdk_return ok = GDK_SUCCEED;
3108 2771 : logformat l;
3109 2771 : BUN p;
3110 2771 : lng nr;
3111 :
3112 2771 : if (BATtdense(uid)) {
3113 2711 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3114 2711 : log_unlock(lg);
3115 2711 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3116 0 : ATOMIC_DEC(&lg->current->refcount);
3117 2711 : return ok;
3118 : }
3119 :
3120 60 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3121 :
3122 60 : l.flag = LOG_UPDATE;
3123 60 : l.id = id;
3124 60 : nr = (BATcount(uval));
3125 60 : assert(nr);
3126 :
3127 60 : if (LOG_DISABLED(lg)) {
3128 : /* logging is switched off */
3129 41 : log_unlock(lg);
3130 41 : return GDK_SUCCEED;
3131 : }
3132 :
3133 19 : BATiter vi = bat_iterator(uval);
3134 19 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3135 19 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3136 :
3137 19 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3138 38 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3139 38 : log_write_format(lg, &l) != GDK_SUCCEED ||
3140 38 : !mnstr_writeLng(lg->current->output_log, nr) ||
3141 19 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3142 0 : ok = GDK_FAIL;
3143 0 : goto bailout;
3144 : }
3145 1074 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3146 1055 : const oid id = BUNtoid(uid, p);
3147 :
3148 1055 : ok = wh(&id, lg->current->output_log, 1);
3149 : }
3150 19 : if (uval->ttype == TYPE_msk) {
3151 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3152 0 : (BATcount(uval) + 31) / 32))
3153 0 : ok = GDK_FAIL;
3154 32 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3155 13 : const void *t = BUNtail(vi, 0);
3156 :
3157 13 : ok = wt(t, lg->current->output_log, (size_t) nr);
3158 6 : } else if (uval->ttype == TYPE_str) {
3159 : /* efficient string writes */
3160 6 : ok = string_writer(lg, uval, 0, nr);
3161 : } else {
3162 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3163 0 : const void *val = BUNtail(vi, p);
3164 :
3165 0 : ok = wt(val, lg->current->output_log, 1);
3166 : }
3167 : }
3168 :
3169 19 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3170 :
3171 19 : bailout:
3172 19 : bat_iterator_end(&vi);
3173 19 : if (ok != GDK_SUCCEED) {
3174 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3175 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3176 0 : ATOMIC_DEC(&lg->current->refcount);
3177 : }
3178 19 : log_unlock(lg);
3179 19 : return ok;
3180 : }
3181 :
3182 : static inline bool
3183 56902 : check_rotation_conditions(logger *lg)
3184 : {
3185 56902 : if (LOG_DISABLED(lg))
3186 : return false;
3187 :
3188 56899 : if (lg->current->next)
3189 : return false; /* do not rotate if there is already a prepared next current */
3190 56899 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3191 : return true;
3192 56899 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3193 :
3194 56899 : if (current_file_size == -1)
3195 : return false;
3196 :
3197 56899 : assert(current_file_size >= 0);
3198 :
3199 56899 : if (current_file_size == 2)
3200 : return false;
3201 :
3202 2750 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3203 57282 : current_file_size > lg->max_file_size ||
3204 50035 : (GDKusec() - lg->file_age) > lg->max_file_age;
3205 :
3206 54537 : return res;
3207 : }
3208 :
3209 : gdk_return
3210 62478 : log_tend(logger *lg)
3211 : {
3212 62478 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3213 :
3214 62478 : if (LOG_DISABLED(lg))
3215 : return GDK_SUCCEED;
3216 :
3217 56899 : gdk_return result;
3218 56899 : logformat l;
3219 56899 : l.flag = LOG_END;
3220 56899 : l.id = lg->tid;
3221 :
3222 56899 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3223 56899 : ATOMIC_INC(&lg->nr_flushers);
3224 : return result;
3225 : }
3226 :
3227 : #define flush_lock(lg) MT_lock_set(&(lg)->flush_lock)
3228 : #define flush_unlock(lg) MT_lock_unset(&(lg)->flush_lock)
3229 :
3230 : static inline gdk_return
3231 56091 : do_flush(logged_range *range)
3232 : {
3233 : /* assumes flush lock */
3234 56091 : stream *output_log = range->output_log;
3235 56091 : ulng ts = ATOMIC_GET(&range->last_ts);
3236 :
3237 56091 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3238 56091 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3239 0 : return GDK_FAIL;
3240 56091 : ATOMIC_SET(&range->flushed_ts, ts);
3241 56091 : return GDK_SUCCEED;
3242 : }
3243 :
3244 : static inline void
3245 62475 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3246 : {
3247 62475 : (void) lg;
3248 62475 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3249 :
3250 62475 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3251 61667 : ATOMIC_SET(&range->last_ts, commit_ts);
3252 62475 : }
3253 :
3254 : gdk_return
3255 62478 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
3256 : {
3257 62478 : rotation_lock(lg);
3258 62478 : if (lg->flushnow) {
3259 5576 : logged_range *p = lg->current;
3260 5576 : assert(lg->flush_ranges == lg->current);
3261 5576 : assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
3262 5576 : log_tdone(lg, lg->current, commit_ts);
3263 5576 : ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
3264 5576 : lg->id++;
3265 5576 : lg->flushnow = false;
3266 5576 : if (log_open_output(lg) != GDK_SUCCEED)
3267 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3268 5576 : do_rotate(lg);
3269 5576 : (void) do_flush_range_cleanup(lg);
3270 5576 : assert(lg->flush_ranges == lg->current);
3271 5576 : rotation_unlock(lg);
3272 5576 : return log_commit(lg, p, NULL, 0);
3273 : }
3274 :
3275 56902 : if (LOG_DISABLED(lg)) {
3276 3 : rotation_unlock(lg);
3277 3 : return GDK_SUCCEED;
3278 : }
3279 :
3280 56899 : logged_range *frange = do_flush_range_cleanup(lg);
3281 :
3282 57220 : while (frange->next && frange->id < file_id) {
3283 : assert(frange->next);
3284 : frange = frange->next;
3285 : }
3286 :
3287 56899 : log_tdone(lg, frange, commit_ts);
3288 :
3289 56899 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
3290 : /* delay needed ? */
3291 :
3292 56091 : flush_lock(lg);
3293 : /* check it one more time */
3294 56091 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
3295 56091 : do_flush(frange);
3296 56091 : flush_unlock(lg);
3297 : }
3298 : /* else somebody else has flushed our log file */
3299 :
3300 56899 : if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
3301 54808 : if (frange != lg->current && frange->output_log) {
3302 209 : close_stream(frange->output_log);
3303 209 : frange->output_log = NULL;
3304 : }
3305 : }
3306 :
3307 56899 : if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
3308 : /* I am the last flusher
3309 : * if present,
3310 : * wake up the exclusive flusher in log_tstart */
3311 : /* rotation_lock is still being held */
3312 55129 : MT_cond_signal(&lg->excl_flush_cv);
3313 : }
3314 56899 : rotation_unlock(lg);
3315 :
3316 56899 : return GDK_SUCCEED;
3317 : }
3318 :
3319 : static gdk_return
3320 6896 : log_tsequence_(logger *lg, int seq, lng val)
3321 : {
3322 6896 : logformat l;
3323 :
3324 6896 : if (LOG_DISABLED(lg))
3325 : return GDK_SUCCEED;
3326 2827 : l.flag = LOG_SEQ;
3327 2827 : l.id = seq;
3328 :
3329 2827 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3330 :
3331 2827 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3332 5654 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3333 5654 : log_write_format(lg, &l) != GDK_SUCCEED ||
3334 2827 : !mnstr_writeLng(lg->current->output_log, val)) {
3335 0 : TRC_CRITICAL(GDK, "write failed\n");
3336 0 : ATOMIC_DEC(&lg->current->refcount);
3337 0 : return GDK_FAIL;
3338 : }
3339 : return GDK_SUCCEED;
3340 : }
3341 :
3342 : /* a transaction in it self */
3343 : gdk_return
3344 6896 : log_tsequence(logger *lg, int seq, lng val)
3345 : {
3346 6896 : BUN p;
3347 :
3348 6896 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3349 :
3350 6896 : log_lock(lg);
3351 6896 : MT_lock_set(&lg->seqs_id->theaplock);
3352 6896 : BUN inserted = lg->seqs_id->batInserted;
3353 6896 : MT_lock_unset(&lg->seqs_id->theaplock);
3354 6896 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
3355 1620 : assert(lg->seqs_val->hseqbase == 0);
3356 1620 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
3357 0 : log_unlock(lg);
3358 0 : return GDK_FAIL;
3359 : }
3360 : } else {
3361 4914 : if (p != BUN_NONE) {
3362 4914 : oid pos = p;
3363 4914 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
3364 0 : log_unlock(lg);
3365 0 : return GDK_FAIL;
3366 : }
3367 : }
3368 10552 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
3369 5276 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
3370 0 : log_unlock(lg);
3371 0 : return GDK_FAIL;
3372 : }
3373 : }
3374 6896 : gdk_return r = log_tsequence_(lg, seq, val);
3375 6896 : log_unlock(lg);
3376 6896 : return r;
3377 : }
3378 :
3379 : static gdk_return
3380 12531 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3381 : {
3382 12531 : log_lock(lg);
3383 12531 : BAT *b = lg->catalog_bid;
3384 12531 : const log_bid *bids;
3385 :
3386 12531 : bids = (log_bid *) Tloc(b, 0);
3387 248582 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3388 236051 : log_bid bid = bids[p];
3389 236051 : BAT *lb = BBP_desc(bid);
3390 :
3391 236051 : assert(bid);
3392 236051 : if (lb->batCacheid == 0 || BATmode(lb, false) != GDK_SUCCEED) {
3393 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3394 0 : log_unlock(lg);
3395 0 : return GDK_FAIL;
3396 : }
3397 :
3398 236051 : assert(lb->batRestricted != BAT_WRITE);
3399 :
3400 236051 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3401 : }
3402 : /* bm_subcommit releases the lock */
3403 12531 : return bm_subcommit(lg, pending, updated, maxupdated);
3404 : }
3405 :
3406 : static gdk_return
3407 236371 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3408 : {
3409 236371 : log_bid bid = internal_find_bat(lg, id, tid);
3410 236371 : lng cnt = 0;
3411 236371 : lng lid = lng_nil;
3412 :
3413 236371 : assert(b->batRestricted != BAT_WRITE);
3414 236371 : assert(b->batRole == PERSISTENT);
3415 236371 : if (bid < 0)
3416 : return GDK_FAIL;
3417 236371 : if (bid) {
3418 155429 : if (bid != b->batCacheid) {
3419 155426 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3420 : return GDK_FAIL;
3421 : } else {
3422 : return GDK_SUCCEED;
3423 : }
3424 : }
3425 236368 : bid = b->batCacheid;
3426 236368 : TRC_DEBUG(WAL, "create %d\n", id);
3427 236368 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3428 472736 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3429 472736 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3430 472736 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3431 236368 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3432 0 : return GDK_FAIL;
3433 236368 : if (lg->current)
3434 236115 : lg->current->cnt++;
3435 236368 : BBPretain(bid);
3436 236368 : return GDK_SUCCEED;
3437 : }
3438 :
3439 : static gdk_return
3440 177598 : log_del_bat(logger *lg, log_bid bid)
3441 : {
3442 177598 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3443 177598 : lng lid = lg->tid;
3444 :
3445 177598 : assert(p != BUN_NONE);
3446 177598 : if (p == BUN_NONE) {
3447 : GDKerror("cannot find BAT\n");
3448 : return GDK_FAIL;
3449 : }
3450 :
3451 177598 : assert(lg->catalog_lid->hseqbase == 0);
3452 177598 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3453 : }
3454 :
3455 : /* returns -1 on failure, 0 when not found, > 0 when found */
3456 : log_bid
3457 27652 : log_find_bat(logger *lg, log_id id)
3458 : {
3459 27652 : log_lock(lg);
3460 27652 : log_bid bid = internal_find_bat(lg, id, -1);
3461 27652 : log_unlock(lg);
3462 27652 : if (!bid) {
3463 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3464 0 : return GDK_FAIL;
3465 : }
3466 : return bid;
3467 : }
3468 :
3469 :
3470 :
3471 : gdk_return
3472 62479 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
3473 : {
3474 62479 : rotation_lock(lg);
3475 62479 : if (flushnow) {
3476 5577 : if (file_id == NULL) {
3477 : /* special case: ask store_manager to rotate log file */
3478 1 : lg->file_age = 0;
3479 1 : rotation_unlock(lg);
3480 1 : return GDK_SUCCEED;
3481 : }
3482 : /* I am now the exclusive flusher */
3483 5577 : while (ATOMIC_GET(&lg->nr_flushers)) {
3484 : /* I am waiting until all existing flushers are done */
3485 1 : MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
3486 : }
3487 5576 : assert(ATOMIC_GET(&lg->nr_flushers) == 0);
3488 :
3489 5576 : if (ATOMIC_GET(&lg->current->last_ts)) {
3490 2174 : lg->id++;
3491 2174 : if (log_open_output(lg) != GDK_SUCCEED)
3492 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3493 : }
3494 5576 : do_rotate(lg);
3495 5576 : (void) do_flush_range_cleanup(lg);
3496 5576 : rotation_unlock(lg);
3497 :
3498 5576 : if (lg->saved_id + 1 < lg->id)
3499 5111 : log_flush(lg, (1ULL << 63));
3500 5576 : lg->flushnow = flushnow;
3501 : } else {
3502 56902 : if (check_rotation_conditions(lg)) {
3503 4503 : lg->id++;
3504 4503 : if (log_open_output(lg) != GDK_SUCCEED)
3505 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3506 : }
3507 56902 : do_rotate(lg);
3508 56902 : rotation_unlock(lg);
3509 : }
3510 :
3511 62478 : if (LOG_DISABLED(lg))
3512 : return GDK_SUCCEED;
3513 :
3514 56899 : ATOMIC_INC(&lg->current->refcount);
3515 56899 : *file_id = lg->current->id;
3516 56899 : logformat l;
3517 56899 : l.flag = LOG_START;
3518 56899 : l.id = ++lg->tid;
3519 :
3520 56899 : TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
3521 56899 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3522 0 : ATOMIC_DEC(&lg->current->refcount);
3523 0 : return GDK_FAIL;
3524 : }
3525 :
3526 : return GDK_SUCCEED;
3527 : }
3528 :
3529 : void
3530 116 : log_printinfo(logger *lg)
3531 : {
3532 116 : if (!rotation_trylock(lg, 1000)) {
3533 0 : printf("Logger is currently locked, so no logger information\n");
3534 0 : return;
3535 : }
3536 116 : printf("logger %s:\n", lg->fn);
3537 116 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3538 : lg->id, lg->saved_id);
3539 116 : printf("current transaction id %d, saved transaction id %d\n",
3540 : lg->tid, lg->saved_tid);
3541 116 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3542 116 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3543 116 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3544 301 : for (logged_range *p = lg->pending; p; p = p->next) {
3545 185 : char buf[32];
3546 185 : if ((lg->debug & 128 || lg->inmemory) ||
3547 185 : p->output_log == NULL ||
3548 116 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3549 69 : buf[0] = 0;
3550 254 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3551 : }
3552 116 : rotation_unlock(lg);
3553 : }
|