Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_logger.h"
17 : #include "gdk_logger_internals.h"
18 : #include "mutils.h"
19 : #include <string.h>
20 :
/* Forward declarations of catalog helpers defined later in this file. */
static gdk_return log_add_bat(logger *lg, BAT *b, log_id id, int tid);
static gdk_return log_del_bat(logger *lg, log_bid bid);
/*
 * The logger uses a directory to store its log files. One master log
 * file stores information about the version of the logger and the
 * type mapping it uses. This file is a simple ASCII file with the
 * following format (the version line is followed by an empty line):
 * {6DIGIT-VERSION\n\n[id,type_name\n]*}
 * The transaction log files have a binary format.
 */

/* Record types: the one-byte "flag" that starts every record in a
 * transaction log file. */
#define LOG_START 0
#define LOG_END 1
#define LOG_UPDATE_CONST 2
#define LOG_UPDATE_BULK 3
#define LOG_UPDATE 4
#define LOG_CREATE 5
#define LOG_DESTROY 6
#define LOG_SEQ 7
#define LOG_CLEAR 8 /* DEPRECATED */
#define LOG_BAT_GROUP 9

/* Query the current position of a FILE * stream, using the widest
 * variant available on the platform (_ftelli64, ftello or ftell). */
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
#else
#ifdef HAVE_FSEEKO
#define getfilepos ftello
#else
#define getfilepos ftell
#endif
#endif

/* Initial capacity passed to COLnew when a BAT is (re)created during replay. */
#define BATSIZE 0

/* True when no log file output should be produced: debug bit 128 is set,
 * the logger is in-memory only, or lg->flushnow is set. */
#define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
56 :
/* Printable names of the record types, listed in LOG_* value order so the
 * position of a name equals its record type value.  The slot of the
 * deprecated LOG_CLEAR is kept (as an empty string) to preserve that
 * alignment. */
static const char *log_commands[] = {
	"LOG_START",
	"LOG_END",
	"LOG_UPDATE_CONST",
	"LOG_UPDATE_BULK",
	"LOG_UPDATE",
	"LOG_CREATE",
	"LOG_DESTROY",
	"LOG_SEQ",
	"", /* LOG_CLEAR IS DEPRECATED */
	"LOG_BAT_GROUP",
};
69 :
/* One change read from a log file during replay; queued in a trans and
 * applied by tr_commit. */
typedef struct logaction {
	int type; /* type of change (LOG_UPDATE, LOG_UPDATE_BULK, LOG_CREATE, LOG_DESTROY) */
	lng nr;	/* number of values covered by the change */
	int tt;	/* GDK atom type of the values (from find_type_nr) */
	lng id;	/* NOTE(review): not assigned by the readers in this section; verify use elsewhere */
	lng offset;	/* first position affected by a bulk update */
	log_id cid; /* id of object */
	BAT *b; /* temporary bat with changes */
	BAT *uid; /* temporary bat with bun positions to update */
} logaction;

/* during the recover process a number of transactions could be active */
typedef struct trans {
	int tid; /* transaction id */
	int sz; /* sz of the changes array */
	int nr; /* nr of changes */

	logaction *changes;	/* queued changes, applied in order by tr_commit */

	struct trans *tr;	/* next transaction in the chain (returned by tr_destroy) */
} trans;

/* Header of every log record: a one-byte record type (a LOG_* value)
 * followed by an int id; see log_read_format/log_write_format. */
typedef struct logformat_t {
	bte flag;
	int id;
} logformat;

/* Outcome of reading a record: LOG_OK on success, LOG_EOF when the stream
 * could not be read (end of input or a short read), LOG_ERR otherwise. */
typedef enum { LOG_OK, LOG_EOF, LOG_ERR } log_return;
98 :
/* Forward declarations of functions defined later in this file. */
static gdk_return bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated);
static gdk_return tr_grow(trans *tr);

/* Acquire/release the logger's main lock (lg->lock). */
#define log_lock(lg) MT_lock_set(&(lg)->lock)
#define log_unlock(lg) MT_lock_unset(&(lg)->lock)
104 :
105 : static inline bte
106 910009 : find_type(logger *lg, int tpe)
107 : {
108 910009 : assert(tpe >= 0 && tpe < MAXATOMS);
109 910009 : return lg->type_id[tpe];
110 : }
111 :
112 : static inline int
113 545189 : find_type_nr(logger *lg, bte tpe)
114 : {
115 545189 : int nr = lg->type_nr[tpe < 0 ? 256 + tpe : tpe];
116 545189 : if (nr == 255)
117 0 : return -1;
118 : return nr;
119 : }
120 :
121 : static BUN
122 418235 : log_find(BAT *b, BAT *d, int val)
123 : {
124 418235 : BUN p;
125 :
126 418235 : assert(b->ttype == TYPE_int);
127 418235 : assert(d->ttype == TYPE_oid);
128 418235 : BATiter bi = bat_iterator(b);
129 418235 : if (BAThash(b) == GDK_SUCCEED) {
130 418235 : MT_rwlock_rdlock(&b->thashlock);
131 661984 : HASHloop_int(bi, b->thash, p, &val) {
132 183615 : oid pos = p;
133 183615 : if (BUNfnd(d, &pos) == BUN_NONE) {
134 183615 : MT_rwlock_rdunlock(&b->thashlock);
135 183615 : bat_iterator_end(&bi);
136 183615 : return p;
137 : }
138 : }
139 234620 : MT_rwlock_rdunlock(&b->thashlock);
140 : } else { /* unlikely: BAThash failed */
141 0 : int *t = (int *) bi.base;
142 :
143 0 : for (p = 0; p < bi.count; p++) {
144 0 : if (t[p] == val) {
145 0 : oid pos = p;
146 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
147 0 : bat_iterator_end(&bi);
148 0 : return p;
149 : }
150 : }
151 : }
152 : }
153 234620 : bat_iterator_end(&bi);
154 234620 : return BUN_NONE;
155 : }
156 :
157 : static log_bid
158 645401 : internal_find_bat(logger *lg, log_id id, int tid)
159 : {
160 645401 : BUN p;
161 :
162 645401 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
163 645401 : BATiter cni = bat_iterator(lg->catalog_id);
164 645401 : MT_rwlock_rdlock(&cni.b->thashlock);
165 645401 : if (tid < 0) {
166 448470 : HASHloop_int(cni, cni.b->thash, p, &id) {
167 205444 : oid pos = p;
168 205444 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
169 205444 : MT_rwlock_rdunlock(&cni.b->thashlock);
170 205444 : bat_iterator_end(&cni);
171 205444 : return *(log_bid *) Tloc(lg->catalog_bid, p);
172 : }
173 : }
174 : } else {
175 361561 : BUN cp = BUN_NONE;
176 1651307 : HASHloop_int(cni, cni.b->thash, p, &id) {
177 1077829 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
178 1077829 : if (lid != lng_nil && lid <= tid) {
179 : break;
180 : }
181 : cp = p;
182 : }
183 361561 : if (cp != BUN_NONE) {
184 361299 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 361299 : bat_iterator_end(&cni);
186 361299 : return *(log_bid *) Tloc(lg->catalog_bid, cp);
187 : }
188 : }
189 78658 : MT_rwlock_rdunlock(&cni.b->thashlock);
190 78658 : bat_iterator_end(&cni);
191 78658 : return 0; /* not found */
192 : }
193 : return -1; /* error creating hash */
194 : }
195 :
/* Release a BAT reference held by the logger; a thin wrapper around
 * BBPreclaim.  Callers in this file also pass BATs obtained through
 * BATdescriptor. */
static inline void
logbat_destroy(BAT *b)
{
	BBPreclaim(b);
}
201 :
202 : static BAT *
203 14294 : logbat_new(int tt, BUN size, role_t role)
204 : {
205 14294 : BAT *nb = COLnew(0, tt, size, role);
206 :
207 14294 : if (nb) {
208 14294 : BBP_pid(nb->batCacheid) = 0;
209 14294 : if (role == PERSISTENT)
210 8978 : BATmode(nb, false);
211 : } else {
212 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
213 : }
214 14294 : return nb;
215 : }
216 :
217 : static bool
218 755954 : log_read_format(logger *lg, logformat *data)
219 : {
220 755954 : assert(!lg->inmemory);
221 755954 : if (mnstr_read(lg->input_log, &data->flag, 1, 1) == 1) {
222 744204 : if (mnstr_readInt(lg->input_log, &data->id) == 1)
223 : return true;
224 : /* could only read part, so complain */
225 0 : TRC_CRITICAL(GDK, "read failed\n");
226 : }
227 : return false;
228 : }
229 :
230 : static gdk_return
231 752417 : log_write_format(logger *lg, logformat *data)
232 : {
233 752417 : assert(data->id || data->flag);
234 752417 : assert(!lg->inmemory);
235 752417 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
236 1504834 : if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
237 1504834 : mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
238 752417 : mnstr_writeInt(lg->current->output_log, data->id))
239 : return GDK_SUCCEED;
240 0 : TRC_CRITICAL(GDK, "write failed\n");
241 0 : return GDK_FAIL;
242 : }
243 :
244 : static log_return
245 2683 : log_read_seq(logger *lg, logformat *l)
246 : {
247 2683 : int seq = l->id;
248 2683 : lng val;
249 2683 : BUN p;
250 :
251 2683 : assert(!lg->inmemory);
252 2683 : if (mnstr_readLng(lg->input_log, &val) != 1) {
253 0 : TRC_CRITICAL(GDK, "read failed\n");
254 0 : return LOG_EOF;
255 : }
256 2683 : if (lg->flushing)
257 : return LOG_OK;
258 :
259 50 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
260 49 : p >= lg->seqs_id->batInserted) {
261 19 : assert(lg->seqs_val->hseqbase == 0);
262 19 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
263 0 : TRC_CRITICAL(GDK, "replace of %s_seqs_val failed\n", lg->fn);
264 0 : return LOG_ERR;
265 : }
266 : } else {
267 30 : if (p != BUN_NONE) {
268 30 : oid pos = p;
269 30 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
270 0 : TRC_CRITICAL(GDK, "append to %s_dseqs failed\n", lg->fn);
271 0 : return LOG_ERR;
272 : }
273 : }
274 62 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
275 31 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
276 0 : TRC_CRITICAL(GDK, "append to %s_seqs_val/id failed\n", lg->fn);
277 0 : return LOG_ERR;
278 : }
279 : }
280 : return LOG_OK;
281 : }
282 :
/* Disabled with #if 0 (not compiled): helpers that write/read a bare int
 * id on the log streams. */
#if 0
static gdk_return
log_write_id(logger *lg, int id)
{
	assert(!lg->inmemory);
	assert(id >= 0);
	assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
	if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
	    mnstr_writeInt(lg->current->output_log, id))
		return GDK_SUCCEED;
	TRC_CRITICAL(GDK, "write failed\n");
	return GDK_FAIL;
}

static log_return
log_read_id(logger *lg, log_id *id)
{
	assert(!lg->inmemory);
	if (mnstr_readInt(lg->input_log, id) != 1) {
		TRC_CRITICAL(GDK, "read failed\n");
		return LOG_EOF;
	}
	return LOG_OK;
}
#endif
308 :
309 : static log_return
310 18006 : string_reader(logger *lg, BAT *b, lng nr)
311 : {
312 18006 : size_t sz = 0;
313 18006 : lng SZ = 0;
314 18006 : log_return res = LOG_OK;
315 :
316 36253 : while (nr && res == LOG_OK) {
317 18247 : if (mnstr_readLng(lg->input_log, &SZ) != 1) {
318 0 : TRC_CRITICAL(GDK, "read failed\n");
319 0 : return LOG_EOF;
320 : }
321 18247 : sz = (size_t) SZ;
322 18247 : char *buf = lg->rbuf;
323 18247 : if (lg->rbufsize < sz) {
324 2 : if (!(buf = GDKrealloc(lg->rbuf, sz))) {
325 0 : TRC_CRITICAL(GDK, "couldn't grow string buffer\n");
326 0 : return LOG_ERR;
327 : }
328 2 : lg->rbuf = buf;
329 2 : lg->rbufsize = sz;
330 : }
331 :
332 18247 : if (mnstr_read(lg->input_log, buf, sz, 1) != 1) {
333 0 : TRC_CRITICAL(GDK, "read failed\n");
334 0 : return LOG_EOF;
335 : }
336 : /* handle strings */
337 : char *t = buf;
338 : /* chunked */
339 : #define CHUNK_SIZE 1024
340 : char *strings[CHUNK_SIZE];
341 : int cur = 0;
342 :
343 99189 : for (; nr > 0 && res == LOG_OK && t < (buf + sz); nr--) {
344 80942 : strings[cur++] = t;
345 80942 : if (cur == CHUNK_SIZE &&
346 6 : b &&
347 6 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
348 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
349 0 : res = LOG_ERR;
350 : }
351 80942 : if (cur == CHUNK_SIZE)
352 35 : cur = 0;
353 : /* find next */
354 5804390 : while (*t)
355 5723448 : t++;
356 80942 : t++;
357 : }
358 18247 : if (cur &&
359 489 : b &&
360 489 : BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) {
361 0 : TRC_CRITICAL(GDK, "append to string bat failed\n");
362 0 : res = LOG_ERR;
363 : }
364 : }
365 : return res;
366 : }
367 :
368 :
/* Describes how a run of values in a BAT stored in the log file maps onto
 * the destination BAT in the database.
 * NOTE(review): no user of this struct in the reviewed section; verify use elsewhere. */
struct offset {
	lng os; /* offset within source BAT in logfile */
	lng nr; /* number of values to be copied */
	lng od; /* offset within destination BAT in database */
};
374 :
375 : static log_return
376 388656 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT **cands)
377 : {
378 388656 : log_return res = LOG_OK;
379 388656 : lng nr, pnr;
380 388656 : bte type_id = -1;
381 388656 : int tpe;
382 :
383 388656 : assert(!lg->inmemory);
384 388656 : TRC_DEBUG(WAL, "found %d %s", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
385 :
386 777312 : if (mnstr_readLng(lg->input_log, &nr) != 1 ||
387 388656 : mnstr_read(lg->input_log, &type_id, 1, 1) != 1) {
388 0 : TRC_CRITICAL(GDK, "read failed\n");
389 0 : return LOG_EOF;
390 : }
391 :
392 388656 : pnr = nr;
393 388656 : tpe = find_type_nr(lg, type_id);
394 388656 : if (tpe >= 0) {
395 388656 : BAT *uid = NULL;
396 388656 : BAT *r = NULL;
397 388656 : void *(*rt)(ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
398 388656 : lng offset;
399 :
400 388656 : assert(nr <= (lng) BUN_MAX);
401 388656 : if (!lg->flushing && l->flag == LOG_UPDATE) {
402 0 : uid = COLnew(0, TYPE_oid, (BUN) nr, PERSISTENT);
403 0 : if (uid == NULL) {
404 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
405 27445 : return LOG_ERR;
406 : }
407 : }
408 :
409 388656 : if (l->flag == LOG_UPDATE_CONST) {
410 123950 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
411 0 : TRC_CRITICAL(GDK, "read failed\n");
412 0 : return LOG_EOF;
413 : }
414 123950 : if (cands) {
415 : /* This const range actually represents a segment of candidates corresponding to updated bat entries */
416 :
417 27445 : if (BATcount(*cands) == 0 || lg->flushing) {
418 : /* when flushing, we only need the offset and count of the last segment of inserts. */
419 27394 : assert((*cands)->ttype == TYPE_void);
420 27394 : BATtseqbase(*cands, (oid) offset);
421 27394 : BATsetcount(*cands, (BUN) nr);
422 51 : } else if (!lg->flushing) {
423 51 : assert(BATcount(*cands) > 0);
424 51 : BAT *dense = BATdense(0, (oid) offset, (BUN) nr);
425 51 : BAT *newcands = NULL;
426 51 : if (!dense) {
427 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
428 0 : res = LOG_ERR;
429 51 : } else if ((*cands)->ttype == TYPE_void) {
430 11 : if ((newcands = BATmergecand(*cands, dense))) {
431 11 : BBPreclaim(*cands);
432 11 : *cands = newcands;
433 : } else {
434 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
435 0 : res = LOG_ERR;
436 : }
437 : } else {
438 40 : assert((*cands)->ttype == TYPE_oid);
439 40 : assert(BATcount(*cands) > 0);
440 40 : if (BATappend(*cands, dense, NULL, true) != GDK_SUCCEED) {
441 0 : TRC_CRITICAL(GDK, "appending to bat failed\n");
442 0 : res = LOG_ERR;
443 : }
444 : }
445 51 : BBPreclaim(dense);
446 : }
447 :
448 : /* We have to read the value to update the read cursor */
449 27445 : size_t tlen = lg->rbufsize;
450 27445 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
451 27445 : if (t == NULL) {
452 0 : TRC_CRITICAL(GDK, "read failed\n");
453 0 : res = LOG_EOF;
454 : }
455 27445 : return res;
456 : }
457 : }
458 :
459 361211 : if (!lg->flushing) {
460 2250 : r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
461 2250 : if (r == NULL) {
462 0 : if (uid)
463 0 : BBPreclaim(uid);
464 0 : return LOG_ERR;
465 : }
466 : }
467 :
468 361211 : if (l->flag == LOG_UPDATE_CONST) {
469 96505 : size_t tlen = lg->rbufsize;
470 96505 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
471 96505 : if (t == NULL) {
472 0 : TRC_CRITICAL(GDK, "read failed\n");
473 0 : res = LOG_EOF;
474 : } else {
475 96505 : lg->rbuf = t;
476 96505 : lg->rbufsize = tlen;
477 96505 : if (r) {
478 13768 : for (BUN p = 0; p < (BUN) nr; p++) {
479 12756 : if (BUNappend(r, t, true) != GDK_SUCCEED) {
480 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
481 0 : res = LOG_ERR;
482 : }
483 : }
484 : }
485 : }
486 264706 : } else if (l->flag == LOG_UPDATE_BULK) {
487 264688 : if (mnstr_readLng(lg->input_log, &offset) != 1) {
488 0 : if (r)
489 0 : BBPreclaim(r);
490 0 : TRC_CRITICAL(GDK, "read failed\n");
491 0 : return LOG_EOF;
492 : }
493 264688 : if (tpe == TYPE_msk) {
494 0 : if (r) {
495 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
496 0 : BATsetcount(r, (BUN) nr);
497 : else {
498 0 : TRC_CRITICAL(GDK, "read failed\n");
499 0 : res = LOG_EOF;
500 : }
501 : } else {
502 0 : size_t tlen = lg->rbufsize / sizeof(int);
503 0 : size_t cnt = 0, snr = (size_t) nr;
504 0 : snr = (snr + 31) / 32;
505 0 : assert(tlen);
506 0 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
507 0 : cnt = snr > tlen ? tlen : snr;
508 0 : if (!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt)) {
509 0 : TRC_CRITICAL(GDK, "read failed\n");
510 0 : res = LOG_EOF;
511 : }
512 : }
513 : }
514 : } else {
515 264688 : if (!ATOMvarsized(tpe)) {
516 246106 : size_t cnt = 0, snr = (size_t) nr;
517 246106 : size_t tlen = lg->rbufsize / ATOMsize(tpe), ntlen = lg->rbufsize;
518 246106 : assert(tlen);
519 : /* read in chunks of max
520 : * BUFSIZE/width rows */
521 504883 : for (; res == LOG_OK && snr > 0; snr -= cnt) {
522 258777 : cnt = snr > tlen ? tlen : snr;
523 258777 : void *t = rt(lg->rbuf, &ntlen, lg->input_log, cnt);
524 :
525 258777 : if (t == NULL) {
526 : res = LOG_EOF;
527 : break;
528 : }
529 258777 : assert(t == lg->rbuf);
530 258777 : if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) {
531 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
532 0 : res = LOG_ERR;
533 : }
534 : }
535 18582 : } else if (tpe == TYPE_str) {
536 : /* efficient string */
537 18000 : res = string_reader(lg, r, nr);
538 : } else {
539 1206 : for (; res == LOG_OK && nr > 0; nr--) {
540 624 : size_t tlen = lg->rbufsize;
541 624 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
542 :
543 624 : if (t == NULL) {
544 : /* see if failure was due to
545 : * malloc or something less
546 : * serious (in the current
547 : * context) */
548 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
549 : res = LOG_EOF;
550 : else
551 0 : res = LOG_ERR;
552 0 : TRC_CRITICAL(GDK, "read failed\n");
553 : } else {
554 624 : lg->rbuf = t;
555 624 : lg->rbufsize = tlen;
556 624 : if (r && BUNappend(r, t, true) != GDK_SUCCEED) {
557 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
558 0 : res = LOG_ERR;
559 : }
560 : }
561 : }
562 : }
563 : }
564 : } else {
565 18 : void *(*rh)(ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
566 18 : void *hv = ATOMnil(TYPE_oid);
567 18 : offset = 0;
568 :
569 18 : if (hv == NULL) {
570 0 : TRC_CRITICAL(GDK, "read failed\n");
571 0 : res = LOG_EOF;
572 : }
573 1071 : for (; res == LOG_OK && nr > 0; nr--) {
574 1053 : size_t hlen = sizeof(oid);
575 1053 : void *h = rh(hv, &hlen, lg->input_log, 1);
576 1053 : assert(hlen == sizeof(oid));
577 1053 : assert(h == hv);
578 1053 : if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED)) {
579 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
580 0 : res = LOG_ERR;
581 : }
582 : }
583 18 : nr = pnr;
584 18 : if (tpe == TYPE_msk) {
585 0 : if (r) {
586 0 : if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
587 0 : BATsetcount(r, (BUN) nr);
588 : else {
589 0 : TRC_CRITICAL(GDK, "read failed\n");
590 0 : res = LOG_EOF;
591 : }
592 : } else {
593 0 : for (lng i = 0; i < nr; i += 32) {
594 0 : int v;
595 0 : switch (mnstr_readInt(lg->input_log, &v)) {
596 0 : case 1:
597 0 : continue;
598 : case 0:
599 : res = LOG_EOF;
600 : break;
601 : default:
602 : res = LOG_ERR;
603 : break;
604 : }
605 0 : TRC_CRITICAL(GDK, "read failed\n");
606 0 : break;
607 : }
608 : }
609 18 : } else if (tpe == TYPE_str) {
610 : /* efficient string */
611 6 : res = string_reader(lg, r, nr);
612 : } else {
613 58 : for (; res == LOG_OK && nr > 0; nr--) {
614 46 : size_t tlen = lg->rbufsize;
615 46 : void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
616 :
617 46 : if (t == NULL) {
618 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
619 : res = LOG_EOF;
620 : else
621 0 : res = LOG_ERR;
622 0 : TRC_CRITICAL(GDK, "read failed\n");
623 : } else {
624 46 : lg->rbuf = t;
625 46 : lg->rbufsize = tlen;
626 46 : if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) {
627 0 : TRC_CRITICAL(GDK, "append to bat failed\n");
628 0 : res = LOG_ERR;
629 : }
630 : }
631 : }
632 : }
633 18 : GDKfree(hv);
634 : }
635 :
636 361211 : if (res == LOG_OK) {
637 361211 : if (tr_grow(tr) == GDK_SUCCEED) {
638 361211 : tr->changes[tr->nr].type = l->flag;
639 361211 : if (l->flag == LOG_UPDATE_BULK && offset == -1) {
640 136057 : assert(cands); /* bat r is part of a group of bats logged together. */
641 136057 : struct canditer ci;
642 136057 : canditer_init(&ci, NULL, *cands);
643 136057 : const oid first = canditer_peek(&ci);
644 136057 : const oid last = canditer_last(&ci);
645 136057 : offset = (lng) first;
646 136057 : pnr = (lng) (last - first) + 1;
647 136057 : if (!lg->flushing) {
648 1131 : assert(uid == NULL);
649 1131 : uid = *cands;
650 1131 : BBPfix((*cands)->batCacheid);
651 1131 : tr->changes[tr->nr].type = LOG_UPDATE;
652 : }
653 : }
654 361211 : if (l->flag == LOG_UPDATE_CONST) {
655 96505 : assert(!cands); /* TODO: This might change in the future. */
656 96505 : tr->changes[tr->nr].type = LOG_UPDATE_BULK;
657 : }
658 361211 : tr->changes[tr->nr].nr = pnr;
659 361211 : tr->changes[tr->nr].tt = tpe;
660 361211 : tr->changes[tr->nr].cid = id;
661 361211 : tr->changes[tr->nr].offset = offset;
662 361211 : tr->changes[tr->nr].b = r;
663 361211 : tr->changes[tr->nr].uid = uid;
664 361211 : tr->nr++;
665 : } else {
666 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
667 0 : res = LOG_ERR;
668 : }
669 : }
670 361211 : if (res != LOG_OK) {
671 0 : if (r)
672 0 : BBPreclaim(r);
673 0 : if (cands && uid)
674 0 : BBPunfix((*cands)->batCacheid);
675 0 : else if (uid)
676 0 : BBPreclaim(uid);
677 : }
678 : } else {
679 : /* bat missing ERROR or ignore ? currently error. */
680 0 : TRC_CRITICAL(GDK, "unknown type\n");
681 0 : res = LOG_ERR;
682 : }
683 : return res;
684 : }
685 :
686 :
687 : static gdk_return
688 558077 : la_bat_update_count(logger *lg, log_id id, lng cnt, int tid)
689 : {
690 558077 : BATiter cni = bat_iterator_nolock(lg->catalog_id);
691 :
692 558077 : if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
693 558077 : MT_rwlock_rdlock(&cni.b->thashlock);
694 558077 : BUN p, cp = BUN_NONE;
695 :
696 2153274 : HASHloop_int(cni, cni.b->thash, p, &id) {
697 1277099 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
698 :
699 1277099 : if (lid != lng_nil && lid <= tid)
700 : break;
701 : cp = p;
702 : }
703 558077 : if (cp != BUN_NONE) {
704 557879 : lng ocnt = *(lng *) Tloc(lg->catalog_cnt, cp);
705 557879 : assert(lg->catalog_cnt->hseqbase == 0);
706 557879 : if (ocnt < cnt && BUNreplace(lg->catalog_cnt, cp, &cnt, false) != GDK_SUCCEED) {
707 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
708 0 : return GDK_FAIL;
709 : }
710 : }
711 558077 : MT_rwlock_rdunlock(&cni.b->thashlock);
712 558077 : return GDK_SUCCEED;
713 : }
714 : return GDK_FAIL;
715 : }
716 :
717 : static gdk_return
718 361211 : la_bat_updates(logger *lg, logaction *la, int tid)
719 : {
720 361211 : log_bid bid = internal_find_bat(lg, la->cid, tid);
721 361211 : BAT *b = NULL;
722 :
723 361211 : if (bid < 0)
724 : return GDK_FAIL;
725 361211 : if (!bid) {
726 : /* object already gone, nothing needed */
727 : return GDK_SUCCEED;
728 : }
729 :
730 361206 : if (!lg->flushing) {
731 2250 : b = BATdescriptor(bid);
732 2250 : if (b == NULL)
733 : return GDK_FAIL;
734 : }
735 361206 : BUN cnt = 0;
736 361206 : if (la->type == LOG_UPDATE_BULK) {
737 360057 : if (!lg->flushing) {
738 1119 : cnt = BATcount(b);
739 1119 : int is_msk = (b->ttype == TYPE_msk);
740 : /* handle offset 0 ie clear */
741 1119 : if ( /* DISABLES CODE */ (0) && la->offset == 0 && cnt)
742 : BATclear(b, true);
743 : /* handle offset */
744 1119 : if (cnt <= (BUN) la->offset) {
745 249 : msk t = 1;
746 249 : if (cnt < (BUN) la->offset) { /* insert nils */
747 0 : const void *tv = (is_msk) ? &t : ATOMnilptr(b->ttype);
748 0 : lng i, d = la->offset - BATcount(b);
749 0 : for (i = 0; i < d; i++) {
750 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
751 0 : logbat_destroy(b);
752 0 : return GDK_FAIL;
753 : }
754 : }
755 : }
756 249 : if (BATcount(b) == (BUN) la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
757 0 : logbat_destroy(b);
758 0 : return GDK_FAIL;
759 : }
760 : } else {
761 870 : BATiter vi = bat_iterator(la->b);
762 870 : BUN p, q;
763 :
764 4845 : for (p = 0, q = (BUN) la->offset; p < (BUN) la->nr; p++, q++) {
765 5816 : const void *t = BUNtail(vi, p);
766 :
767 3975 : if (q < cnt) {
768 3974 : if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
769 0 : logbat_destroy(b);
770 0 : bat_iterator_end(&vi);
771 0 : return GDK_FAIL;
772 : }
773 : } else {
774 1 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
775 0 : logbat_destroy(b);
776 0 : bat_iterator_end(&vi);
777 0 : return GDK_FAIL;
778 : }
779 : }
780 : }
781 870 : bat_iterator_end(&vi);
782 : }
783 : }
784 1149 : } else if (la->type == LOG_UPDATE) {
785 1149 : if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != GDK_SUCCEED) {
786 : return GDK_FAIL;
787 : }
788 : }
789 361206 : cnt = (BUN) (la->offset + la->nr);
790 361206 : if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
791 0 : if (b)
792 0 : logbat_destroy(b);
793 0 : return GDK_FAIL;
794 : }
795 361206 : if (b)
796 2250 : logbat_destroy(b);
797 : return GDK_SUCCEED;
798 : }
799 :
800 : static log_return
801 11276 : log_read_destroy(logger *lg, trans *tr, log_id id)
802 : {
803 11276 : (void) lg;
804 11276 : assert(!lg->inmemory);
805 11276 : if (tr_grow(tr) == GDK_SUCCEED) {
806 11276 : tr->changes[tr->nr].type = LOG_DESTROY;
807 11276 : tr->changes[tr->nr].cid = id;
808 11276 : tr->nr++;
809 11276 : return LOG_OK;
810 : }
811 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
812 0 : return LOG_ERR;
813 : }
814 :
815 : static gdk_return
816 73 : la_bat_destroy(logger *lg, logaction *la, int tid)
817 : {
818 73 : log_bid bid = internal_find_bat(lg, la->cid, tid);
819 :
820 73 : if (bid < 0)
821 : return GDK_FAIL;
822 73 : if (!bid) {
823 : #ifndef NDEBUG
824 0 : GDKwarning("failed to find bid for object %d\n", la->cid);
825 : #endif
826 0 : return GDK_SUCCEED;
827 : }
828 73 : if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
829 : return GDK_FAIL;
830 : return GDK_SUCCEED;
831 : }
832 :
833 : static log_return
834 156533 : log_read_create(logger *lg, trans *tr, log_id id)
835 : {
836 156533 : bte tt;
837 156533 : int tpe;
838 :
839 156533 : assert(!lg->inmemory);
840 156533 : TRC_DEBUG(WAL, "create %d", id);
841 :
842 156533 : if (mnstr_read(lg->input_log, &tt, 1, 1) != 1) {
843 0 : TRC_CRITICAL(GDK, "read failed\n");
844 0 : return LOG_EOF;
845 : }
846 :
847 156533 : tpe = find_type_nr(lg, tt);
848 : /* read create */
849 156533 : if (tr_grow(tr) == GDK_SUCCEED) {
850 156533 : tr->changes[tr->nr].type = LOG_CREATE;
851 156533 : tr->changes[tr->nr].tt = tpe;
852 156533 : tr->changes[tr->nr].cid = id;
853 156533 : tr->nr++;
854 156533 : return LOG_OK;
855 : }
856 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
857 0 : return LOG_ERR;
858 : }
859 :
860 : static gdk_return
861 277 : la_bat_create(logger *lg, logaction *la, int tid)
862 : {
863 277 : BAT *b;
864 :
865 : /* formerly head column type, should be void */
866 277 : if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
867 : return GDK_FAIL;
868 :
869 277 : if (la->tt < 0)
870 0 : BATtseqbase(b, 0);
871 :
872 554 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
873 277 : log_add_bat(lg, b, la->cid, tid) != GDK_SUCCEED) {
874 0 : logbat_destroy(b);
875 0 : return GDK_FAIL;
876 : }
877 277 : logbat_destroy(b);
878 277 : return GDK_SUCCEED;
879 : }
880 :
881 : static gdk_return
882 223 : log_write_new_types(logger *lg, FILE *fp, bool append)
883 : {
884 223 : bte id = 0;
885 :
886 : /* write types and insert into bats */
887 : /* first the fixed sized types */
888 7133 : for (int i = 0; i < GDKatomcnt; i++) {
889 6910 : if (ATOMvarsized(i))
890 1782 : continue;
891 5128 : if (append) {
892 5128 : lg->type_id[i] = id;
893 5128 : lg->type_nr[id] = i;
894 : }
895 5128 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
896 : return GDK_FAIL;
897 5128 : id++;
898 : }
899 : /* second the var sized types */
900 : id = -127; /* start after nil */
901 7133 : for (int i = 0; i < GDKatomcnt; i++) {
902 6910 : if (!ATOMvarsized(i))
903 5128 : continue;
904 1782 : if (append) {
905 1782 : lg->type_id[i] = id;
906 1782 : lg->type_nr[256 + id] = i;
907 : }
908 1782 : if (fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0)
909 : return GDK_FAIL;
910 1782 : id++;
911 : }
912 : return GDK_SUCCEED;
913 : }
914 :
915 : #define TR_SIZE 1024
916 :
917 : static trans *
918 55780 : tr_create(trans *tr, int tid)
919 : {
920 55780 : trans *ntr = GDKmalloc(sizeof(trans));
921 :
922 55780 : if (ntr == NULL)
923 : return NULL;
924 55780 : ntr->tid = tid;
925 55780 : ntr->sz = TR_SIZE;
926 55780 : ntr->nr = 0;
927 55780 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
928 55780 : if (ntr->changes == NULL) {
929 0 : GDKfree(ntr);
930 0 : return NULL;
931 : }
932 55780 : ntr->tr = tr;
933 55780 : return ntr;
934 : }
935 :
936 : static gdk_return
937 529020 : la_apply(logger *lg, logaction *c, int tid)
938 : {
939 529020 : gdk_return ret = GDK_SUCCEED;
940 :
941 529020 : switch (c->type) {
942 361211 : case LOG_UPDATE_BULK:
943 : case LOG_UPDATE:
944 361211 : ret = la_bat_updates(lg, c, tid);
945 361211 : break;
946 156533 : case LOG_CREATE:
947 156533 : if (!lg->flushing)
948 277 : ret = la_bat_create(lg, c, tid);
949 : break;
950 11276 : case LOG_DESTROY:
951 11276 : if (!lg->flushing)
952 73 : ret = la_bat_destroy(lg, c, tid);
953 : break;
954 : default:
955 0 : MT_UNREACHABLE();
956 : }
957 529020 : return ret;
958 : }
959 :
960 : static void
961 529020 : la_destroy(logaction *c)
962 : {
963 529020 : if ((c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK) && c->b)
964 2250 : logbat_destroy(c->b);
965 529020 : if (c->type == LOG_UPDATE && c->uid)
966 1131 : logbat_destroy(c->uid);
967 529020 : }
968 :
969 : static gdk_return
970 529020 : tr_grow(trans *tr)
971 : {
972 529020 : if (tr->nr == tr->sz) {
973 9 : logaction *changes;
974 9 : tr->sz <<= 1;
975 9 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
976 9 : if (changes == NULL)
977 : return GDK_FAIL;
978 9 : tr->changes = changes;
979 : }
980 : /* cleanup the next */
981 529020 : tr->changes[tr->nr].b = NULL;
982 529020 : return GDK_SUCCEED;
983 : }
984 :
985 : static trans *
986 55780 : tr_destroy(trans *tr)
987 : {
988 55780 : trans *r = tr->tr;
989 :
990 55780 : GDKfree(tr->changes);
991 55780 : GDKfree(tr);
992 55780 : return r;
993 : }
994 :
995 : static trans *
996 0 : tr_abort_(logger *lg, trans *tr, int s)
997 : {
998 0 : int i;
999 :
1000 0 : (void) lg;
1001 :
1002 0 : TRC_DEBUG(WAL, "abort");
1003 :
1004 0 : for (i = s; i < tr->nr; i++)
1005 0 : la_destroy(&tr->changes[i]);
1006 0 : return tr_destroy(tr);
1007 : }
1008 :
1009 : static trans *
1010 0 : tr_abort(logger *lg, trans *tr)
1011 : {
1012 0 : return tr_abort_(lg, tr, 0);
1013 : }
1014 :
1015 : static trans *
1016 55780 : tr_commit(logger *lg, trans *tr)
1017 : {
1018 55780 : int i;
1019 :
1020 55780 : TRC_DEBUG(WAL, "commit");
1021 :
1022 584800 : for (i = 0; i < tr->nr; i++) {
1023 529020 : if (la_apply(lg, &tr->changes[i], tr->tid) != GDK_SUCCEED) {
1024 0 : TRC_CRITICAL(GDK, "aborting transaction\n");
1025 0 : do {
1026 0 : tr = tr_abort_(lg, tr, i);
1027 0 : } while (tr != NULL);
1028 : return (trans *) -1;
1029 : }
1030 529020 : la_destroy(&tr->changes[i]);
1031 : }
1032 55780 : lg->saved_tid = tr->tid;
1033 55780 : return tr_destroy(tr);
1034 : }
1035 :
1036 : static gdk_return
1037 112 : log_read_types_file(logger *lg, FILE *fp)
1038 : {
1039 112 : int id = 0;
1040 112 : char atom_name[IDLENGTH];
1041 :
1042 : /* scanf should use IDLENGTH somehow */
1043 3540 : while (fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
1044 3428 : int i = ATOMindex(atom_name);
1045 :
1046 3428 : if (id < -127 || id > 127 || i < 0) {
1047 0 : GDKerror("unknown type in log file '%s'\n", atom_name);
1048 0 : return GDK_FAIL;
1049 : }
1050 3428 : lg->type_id[i] = (int8_t) id;
1051 3428 : lg->type_nr[id < 0 ? 256 + id : id] = i;
1052 : }
1053 : return GDK_SUCCEED;
1054 : }
1055 :
1056 :
1057 : gdk_return
1058 223 : log_create_types_file(logger *lg, const char *filename, bool append)
1059 : {
1060 223 : FILE *fp;
1061 :
1062 223 : if ((fp = MT_fopen(filename, "w")) == NULL) {
1063 0 : GDKerror("cannot create log file %s\n", filename);
1064 0 : return GDK_FAIL;
1065 : }
1066 223 : if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
1067 0 : fclose(fp);
1068 0 : GDKerror("writing log file %s failed", filename);
1069 0 : if (MT_remove(filename) < 0)
1070 0 : GDKsyserror("remove %s failed\n", filename);
1071 0 : return GDK_FAIL;
1072 : }
1073 :
1074 223 : if (log_write_new_types(lg, fp, append) != GDK_SUCCEED) {
1075 0 : fclose(fp);
1076 0 : GDKerror("writing log file %s failed", filename);
1077 0 : if (MT_remove(filename) < 0)
1078 0 : GDKsyserror("remove %s failed\n", filename);
1079 0 : return GDK_FAIL;
1080 : }
1081 223 : if (fflush(fp) < 0 || (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)
1082 : #if defined(_MSC_VER)
1083 : && _commit(_fileno(fp)) < 0
1084 : #elif defined(HAVE_FDATASYNC)
1085 0 : && fdatasync(fileno(fp)) < 0
1086 : #elif defined(HAVE_FSYNC)
1087 : && fsync(fileno(fp)) < 0
1088 : #endif
1089 : )) {
1090 0 : GDKsyserror("flushing log file %s failed", filename);
1091 0 : fclose(fp);
1092 0 : if (MT_remove(filename) < 0)
1093 0 : GDKsyserror("remove %s failed\n", filename);
1094 0 : return GDK_FAIL;
1095 : }
1096 223 : if (fclose(fp) < 0) {
1097 0 : GDKsyserror("closing log file %s failed", filename);
1098 0 : if (MT_remove(filename) < 0)
1099 0 : GDKsyserror("remove %s failed\n", filename);
1100 0 : return GDK_FAIL;
1101 : }
1102 : return GDK_SUCCEED;
1103 : }
1104 :
/* Acquire/release the logger's rotation_lock.  In this part of the file
 * it is held while walking the lg->pending list of logged ranges (see
 * cleanup_and_swap). */
#define rotation_lock(lg) MT_lock_set(&(lg)->rotation_lock)
#define rotation_unlock(lg) MT_lock_unset(&(lg)->rotation_lock)
1107 :
1108 : static gdk_return
1109 11994 : log_open_output(logger *lg)
1110 : {
1111 11994 : logged_range *new_range = (logged_range *) GDKmalloc(sizeof(logged_range));
1112 :
1113 11994 : if (!new_range) {
1114 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1115 0 : return GDK_FAIL;
1116 : }
1117 11994 : if (!LOG_DISABLED(lg)) {
1118 11993 : char id[32];
1119 11993 : char *filename;
1120 :
1121 11993 : if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
1122 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1123 0 : GDKfree(new_range);
1124 0 : return GDK_FAIL;
1125 : }
1126 11993 : if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
1127 0 : TRC_CRITICAL(GDK, "allocation failure\n");
1128 0 : GDKfree(new_range);
1129 0 : return GDK_FAIL;
1130 : }
1131 :
1132 11993 : TRC_INFO(WAL, "opening %s.%s", LOGFILE, id);
1133 11993 : new_range->output_log = open_wstream(filename);
1134 11993 : if (new_range->output_log) {
1135 11993 : short byteorder = 1234;
1136 11993 : mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
1137 : }
1138 :
1139 11993 : if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
1140 0 : TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
1141 0 : close_stream(new_range->output_log);
1142 0 : GDKfree(new_range);
1143 0 : GDKfree(filename);
1144 0 : return GDK_FAIL;
1145 : }
1146 11993 : GDKfree(filename);
1147 : }
1148 11994 : ATOMIC_INIT(&new_range->refcount, 1);
1149 11994 : ATOMIC_INIT(&new_range->last_ts, 0);
1150 11994 : ATOMIC_INIT(&new_range->flushed_ts, 0);
1151 11994 : ATOMIC_INIT(&new_range->drops, 0);
1152 11994 : new_range->id = lg->id;
1153 11994 : new_range->next = NULL;
1154 11994 : logged_range *current = lg->current;
1155 11994 : assert(current && current->next == NULL);
1156 11994 : new_range->cnt = current->cnt;
1157 11994 : current->next = new_range;
1158 11994 : lg->file_age = GDKusec();
1159 11994 : return GDK_SUCCEED;
1160 : }
1161 :
1162 : static inline void
1163 12201 : log_close_input(logger *lg)
1164 : {
1165 12201 : if (!lg->inmemory && lg->input_log) {
1166 11754 : TRC_INFO(WAL, "closing input log %s", mnstr_name(lg->input_log));
1167 11754 : close_stream(lg->input_log);
1168 : }
1169 12201 : lg->input_log = NULL;
1170 12201 : }
1171 :
1172 : static inline void
1173 335 : log_close_output(logger *lg)
1174 : {
1175 335 : if (!LOG_DISABLED(lg) && lg->current->output_log) {
1176 334 : TRC_INFO(WAL, "closing output log %s", mnstr_name(lg->current->output_log));
1177 334 : close_stream(lg->current->output_log);
1178 : }
1179 335 : lg->current->output_log = NULL;
1180 335 : }
1181 :
1182 : static gdk_return
1183 11866 : log_open_input(logger *lg, const char *filename, bool *filemissing)
1184 : {
1185 11866 : TRC_INFO(WAL, "opening input log %s", filename);
1186 11866 : lg->input_log = open_rstream(filename);
1187 :
1188 : /* if the file doesn't exist, there is nothing to be read back */
1189 11866 : if (lg->input_log == NULL || mnstr_errnr(lg->input_log) != MNSTR_NO__ERROR) {
1190 112 : log_close_input(lg);
1191 112 : *filemissing = true;
1192 112 : return GDK_SUCCEED;
1193 : }
1194 11754 : short byteorder;
1195 11754 : switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
1196 0 : case -1:
1197 0 : log_close_input(lg);
1198 0 : TRC_CRITICAL(GDK, "read failed\n");
1199 0 : return GDK_FAIL;
1200 4 : case 0:
1201 : /* empty file is ok */
1202 4 : log_close_input(lg);
1203 4 : return GDK_SUCCEED;
1204 11750 : case 1:
1205 : /* if not empty, must start with correct byte order mark */
1206 11750 : if (byteorder != 1234) {
1207 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1208 0 : log_close_input(lg);
1209 0 : return GDK_FAIL;
1210 : }
1211 : break;
1212 : }
1213 : return GDK_SUCCEED;
1214 : }
1215 :
1216 : static log_return
1217 11750 : log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
1218 : {
1219 11750 : logformat l;
1220 11750 : trans *tr = NULL;
1221 11750 : log_return err = LOG_OK;
1222 11750 : bool ok = true;
1223 11750 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1224 :
1225 11750 : (void) maxupdated; /* only used inside assert() */
1226 :
1227 11750 : if (!lg->flushing)
1228 177 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1229 :
1230 11750 : BAT *cands = NULL; /* used in case of LOG_BAT_GROUP */
1231 :
1232 755954 : while (err == LOG_OK && (ok = log_read_format(lg, &l))) {
1233 744204 : if (l.flag == 0 && l.id == 0) {
1234 11750 : err = LOG_EOF;
1235 : break;
1236 : }
1237 :
1238 744204 : TRC_DEBUG_IF(WAL) {
1239 0 : if (l.flag > 0 && l.flag != LOG_CLEAR &&
1240 : l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
1241 0 : TRC_DEBUG_ENDIF(WAL, "%s %d", log_commands[(int) l.flag], l.id);
1242 : else
1243 0 : TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
1244 : }
1245 744204 : switch (l.flag) {
1246 556465 : case LOG_UPDATE_CONST:
1247 : case LOG_UPDATE_BULK:
1248 : case LOG_UPDATE:
1249 : case LOG_CREATE:
1250 : case LOG_DESTROY:
1251 556465 : if (updated && BAThash(lg->catalog_id) == GDK_SUCCEED) {
1252 553626 : BATiter cni = bat_iterator(lg->catalog_id);
1253 553626 : BUN p;
1254 553626 : BUN posnew = BUN_NONE;
1255 553626 : BUN posold = BUN_NONE;
1256 553626 : MT_rwlock_rdlock(&cni.b->thashlock);
1257 4267704 : HASHloop_int(cni, cni.b->thash, p, &l.id) {
1258 3091731 : lng lid = *(lng *) Tloc(lg->catalog_lid, p);
1259 3091731 : if (lid == lng_nil || lid > tr->tid)
1260 : posnew = p;
1261 1421259 : else if (lid == tr->tid)
1262 3430327 : posold = p;
1263 : }
1264 553626 : MT_rwlock_rdunlock(&cni.b->thashlock);
1265 553626 : bat_iterator_end(&cni);
1266 : /* Normally at this point, posnew is the
1267 : * location of the bat that this
1268 : * transaction is working on, and posold
1269 : * is the location of the previous
1270 : * version of the bat. If LOG_CREATE,
1271 : * both are relevant, since the latter
1272 : * is the new bat, and the former is the
1273 : * to-be-destroyed bat. For
1274 : * LOG_DESTROY, only posnew should be
1275 : * relevant, but for the other types, if
1276 : * the table is destroyed later in the
1277 : * same transaction, we need posold, and
1278 : * else (the normal case) we need
1279 : * posnew. */
1280 553626 : if (posnew != BUN_NONE) {
1281 542416 : assert(posnew < maxupdated);
1282 542416 : updated[posnew / 32] |= 1U << (posnew % 32);
1283 : }
1284 553626 : if ((l.flag == LOG_CREATE || posnew == BUN_NONE) && posold != BUN_NONE) {
1285 163715 : assert(posold < maxupdated);
1286 163715 : updated[posold / 32] |= 1U << (posold % 32);
1287 : }
1288 : }
1289 : break;
1290 : default:
1291 : /* do nothing */
1292 : break;
1293 : }
1294 : /* the functions we call here can succeed (LOG_OK),
1295 : * but they can also fail for two different reasons:
1296 : * they can run out of input (LOG_EOF -- this is not
1297 : * serious, we just abort the remaining transactions),
1298 : * or some malloc or BAT update fails (LOG_ERR -- this
1299 : * is serious, we must abort the complete process);
1300 : * the latter failure causes the current function to
1301 : * return GDK_FAIL */
1302 744204 : switch (l.flag) {
1303 55780 : case LOG_START:
1304 55780 : assert(!lg->flushing || l.id <= lg->tid);
1305 55780 : if (!lg->flushing && l.id > lg->tid)
1306 136 : lg->tid = l.id; /* should only happen during initialization */
1307 55780 : if ((tr = tr_create(tr, l.id)) == NULL) {
1308 0 : TRC_CRITICAL(GDK, "memory allocation failed\n");
1309 0 : err = LOG_ERR;
1310 0 : break;
1311 : }
1312 55780 : TRC_DEBUG(WAL, "tstart %d\n", tr->tid);
1313 : break;
1314 55780 : case LOG_END:
1315 55780 : if (tr == NULL)
1316 : err = LOG_EOF;
1317 55780 : else if (tr->tid != l.id) /* abort record */
1318 0 : tr = tr_abort(lg, tr);
1319 : else
1320 55780 : tr = tr_commit(lg, tr);
1321 : break;
1322 2683 : case LOG_SEQ:
1323 2683 : err = log_read_seq(lg, &l);
1324 2683 : break;
1325 388656 : case LOG_UPDATE_CONST:
1326 : case LOG_UPDATE_BULK:
1327 : case LOG_UPDATE:
1328 388656 : if (tr == NULL)
1329 : err = LOG_EOF;
1330 : else {
1331 613795 : err = log_read_updates(lg, tr, &l, l.id, cands ? &cands : NULL);
1332 : }
1333 : break;
1334 156533 : case LOG_CREATE:
1335 156533 : if (tr == NULL)
1336 : err = LOG_EOF;
1337 : else
1338 156533 : err = log_read_create(lg, tr, l.id);
1339 : break;
1340 11276 : case LOG_DESTROY:
1341 11276 : if (tr == NULL)
1342 : err = LOG_EOF;
1343 : else
1344 11276 : err = log_read_destroy(lg, tr, l.id);
1345 : break;
1346 73496 : case LOG_BAT_GROUP:
1347 73496 : if (tr == NULL)
1348 : err = LOG_EOF;
1349 : else {
1350 73496 : if (l.id > 0) {
1351 : /* START OF LOG_BAT_GROUP */
1352 36748 : cands = COLnew(0, TYPE_void, 0, SYSTRANS);
1353 36748 : if (!cands) {
1354 0 : TRC_CRITICAL(GDK, "creating bat failed\n");
1355 0 : err = LOG_ERR;
1356 : }
1357 36748 : } else if (cands == NULL) {
1358 : /* should have gone through the
1359 : * above option earlier */
1360 0 : TRC_CRITICAL(GDK, "unexpected error\n");
1361 0 : err = LOG_ERR;
1362 : } else {
1363 : /* END OF LOG_BAT_GROUP */
1364 36748 : BBPunfix(cands->batCacheid);
1365 36748 : cands = NULL;
1366 : }
1367 : }
1368 : break;
1369 0 : default:
1370 0 : TRC_CRITICAL(GDK, "unrecognized log entry %d", l.flag);
1371 0 : err = LOG_ERR;
1372 : }
1373 744204 : if (tr == (trans *) -1) {
1374 : /* message already generated by tr_commit */
1375 : err = LOG_ERR;
1376 11750 : tr = NULL;
1377 : break;
1378 : }
1379 : }
1380 11750 : while (tr) {
1381 0 : TRC_WARNING(GDK, "aborting transaction\n");
1382 0 : tr = tr_abort(lg, tr);
1383 : }
1384 11750 : if (!lg->flushing)
1385 177 : ATOMIC_SET(&GDKdebug, dbg);
1386 :
1387 11750 : BBPreclaim(cands);
1388 11750 : if (!ok)
1389 11750 : return LOG_EOF;
1390 : return err;
1391 : }
1392 :
1393 : static gdk_return
1394 293 : log_readlog(logger *lg, const char *filename, bool *filemissing)
1395 : {
1396 293 : log_return err = LOG_OK;
1397 293 : time_t t0, t1;
1398 293 : struct stat sb;
1399 :
1400 293 : assert(!lg->inmemory);
1401 :
1402 293 : TRC_INFO(WAL, "opening %s\n", filename);
1403 :
1404 293 : gdk_return res = log_open_input(lg, filename, filemissing);
1405 293 : if (!lg->input_log || res != GDK_SUCCEED)
1406 : return res;
1407 177 : int fd;
1408 177 : if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
1409 0 : GDKsyserror("fstat on opened file %s failed\n", filename);
1410 0 : log_close_input(lg);
1411 : /* If the file could be opened, but fstat fails,
1412 : * something weird is going on */
1413 0 : return GDK_FAIL;
1414 : }
1415 177 : t0 = time(NULL);
1416 177 : TRC_INFO_IF(WAL) {
1417 0 : TRC_INFO_ENDIF(WAL, "Start reading the write-ahead log '%s'\n", filename);
1418 0 : GDKtracer_flush_buffer();
1419 : }
1420 354 : while (err != LOG_EOF && err != LOG_ERR) {
1421 177 : t1 = time(NULL);
1422 177 : if (t1 - t0 > 10) {
1423 0 : lng fpos;
1424 0 : t0 = t1;
1425 : /* not more than once every 10 seconds */
1426 0 : fpos = (lng) getfilepos(getFile(lg->input_log));
1427 0 : TRC_INFO_IF(WAL) {
1428 0 : if (fpos >= 0) {
1429 0 : TRC_INFO_ENDIF(WAL, "still reading write-ahead log \"%s\" (%d%% done)\n",
1430 : filename, (int) ((fpos * 100 + 50) / sb.st_size));
1431 0 : GDKtracer_flush_buffer();
1432 : }
1433 : }
1434 : }
1435 177 : err = log_read_transaction(lg, NULL, 0);
1436 : }
1437 177 : log_close_input(lg);
1438 177 : lg->input_log = NULL;
1439 :
1440 : /* remaining transactions are not committed, ie abort */
1441 177 : TRC_INFO_IF(WAL) {
1442 0 : TRC_INFO_ENDIF(WAL, "Finished reading the write-ahead log '%s'\n", filename);
1443 0 : GDKtracer_flush_buffer();
1444 : }
1445 : /* we cannot distinguish errors from incomplete transactions
1446 : * (even if we would log aborts in the logs). So we simply
1447 : * abort and move to the next log file */
1448 177 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1449 : }
1450 :
1451 : /*
1452 : * The log files are incrementally numbered, starting from 2. They are
1453 : * processed in the same sequence.
1454 : */
1455 : static gdk_return
1456 112 : log_readlogs(logger *lg, const char *filename)
1457 : {
1458 112 : gdk_return res = GDK_SUCCEED;
1459 :
1460 112 : assert(!lg->inmemory);
1461 112 : TRC_DEBUG(WAL, "logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
1462 :
1463 112 : char log_filename[FILENAME_MAX];
1464 112 : if (lg->saved_id >= lg->id) {
1465 112 : bool filemissing = false;
1466 :
1467 112 : lg->id = lg->saved_id + 1;
1468 405 : while (res == GDK_SUCCEED && !filemissing) {
1469 293 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1470 0 : GDKerror("Logger filename path is too large\n");
1471 0 : return GDK_FAIL;
1472 : }
1473 293 : res = log_readlog(lg, log_filename, &filemissing);
1474 293 : if (!filemissing) {
1475 181 : lg->saved_id++;
1476 181 : lg->id++;
1477 : }
1478 : }
1479 : }
1480 : return res;
1481 : }
1482 :
1483 : static gdk_return
1484 11456 : log_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1485 : {
1486 11456 : TRC_DEBUG(WAL, "commit");
1487 :
1488 11456 : return bm_commit(lg, pending, updated, maxupdated);
1489 : }
1490 :
1491 : static gdk_return
1492 112 : check_version(logger *lg, FILE *fp, const char *fn, const char *logdir, const char *filename, bool *needsnew)
1493 : {
1494 112 : int version = 0;
1495 :
1496 112 : assert(!lg->inmemory);
1497 112 : if (fscanf(fp, "%6d", &version) != 1) {
1498 0 : GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
1499 0 : fclose(fp);
1500 0 : return GDK_FAIL;
1501 : }
1502 112 : if (version < 52300) { /* first CATALOG_VERSION for "new" log format */
1503 0 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
1504 0 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
1505 0 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
1506 0 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
1507 0 : GDKerror("cannot create catalog bats");
1508 0 : fclose(fp);
1509 0 : return GDK_FAIL;
1510 : }
1511 : /* old_logger_load always closes fp */
1512 0 : if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) {
1513 : /*loads drop no longer needed catalog, snapshots bats */
1514 : /*convert catalog_oid -> catalog_id (lng->int) */
1515 0 : GDKerror("Incompatible database version %06d, "
1516 : "this server supports version %06d.\n%s",
1517 : version, lg->version,
1518 : version <
1519 : lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1520 0 : return GDK_FAIL;
1521 : }
1522 0 : *needsnew = false; /* already written a new log file */
1523 0 : return GDK_SUCCEED;
1524 112 : } else if (version != lg->version) {
1525 0 : if (lg->prefuncp == NULL ||
1526 0 : (*lg->prefuncp) (lg->funcdata, version, lg->version) != GDK_SUCCEED) {
1527 0 : GDKerror("Incompatible database version %06d, "
1528 : "this server supports version %06d.\n%s",
1529 : version, lg->version,
1530 : version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1531 0 : fclose(fp);
1532 0 : return GDK_FAIL;
1533 : }
1534 0 : *needsnew = true; /* we need to write a new log file */
1535 : } else {
1536 112 : lg->postfuncp = NULL; /* don't call */
1537 112 : *needsnew = false; /* log file already up-to-date */
1538 : }
1539 224 : if (fgetc(fp) != '\n' || /* skip \n */
1540 112 : fgetc(fp) != '\n') { /* skip \n */
1541 0 : GDKerror("Badly formatted log file");
1542 0 : fclose(fp);
1543 0 : return GDK_FAIL;
1544 : }
1545 112 : if (log_read_types_file(lg, fp) != GDK_SUCCEED) {
1546 0 : fclose(fp);
1547 0 : return GDK_FAIL;
1548 : }
1549 112 : fclose(fp);
1550 112 : return GDK_SUCCEED;
1551 : }
1552 :
1553 : static BAT *
1554 334 : bm_tids(BAT *b, BAT *d)
1555 : {
1556 334 : BUN sz = BATcount(b);
1557 334 : BAT *tids = BATdense(0, 0, sz);
1558 :
1559 334 : if (tids == NULL)
1560 : return NULL;
1561 :
1562 334 : if (BATcount(d)) {
1563 334 : BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
1564 334 : logbat_destroy(tids);
1565 334 : tids = diff;
1566 : }
1567 : return tids;
1568 : }
1569 :
1570 :
1571 : static gdk_return
1572 7634 : log_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
1573 : {
1574 7634 : char bak[IDLENGTH];
1575 :
1576 7634 : if (BATmode(old, true) != GDK_SUCCEED) {
1577 0 : GDKerror("cannot convert old %s to transient", name);
1578 0 : return GDK_FAIL;
1579 : }
1580 7634 : if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
1581 0 : GDKerror("name %s_%s too long\n", fn, name);
1582 0 : return GDK_FAIL;
1583 : }
1584 7634 : if (BBPrename(old, NULL) != 0 || BBPrename(new, bak) != 0) {
1585 0 : GDKerror("rename (%s) failed\n", bak);
1586 0 : return GDK_FAIL;
1587 : }
1588 7634 : BBPretain(new->batCacheid);
1589 7634 : return GDK_SUCCEED;
1590 : }
1591 :
1592 : static gdk_return
1593 336 : bm_get_counts(logger *lg)
1594 : {
1595 336 : BUN p, q;
1596 336 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1597 :
1598 28780 : BATloop(lg->catalog_bid, p, q) {
1599 28444 : oid pos = p;
1600 28444 : lng cnt = 0;
1601 28444 : lng lid = lng_nil;
1602 :
1603 28444 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
1604 28444 : BAT *b = BBPquickdesc(bids[p]);
1605 28444 : assert(b);
1606 28444 : cnt = BATcount(b);
1607 : } else {
1608 0 : lid = BBP_desc(bids[p]) && log_find(lg->catalog_bid, lg->dcatalog, bids[p]) == BUN_NONE ? 1 : -1;
1609 : }
1610 28444 : if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
1611 0 : return GDK_FAIL;
1612 28444 : if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
1613 : return GDK_FAIL;
1614 : }
1615 : return GDK_SUCCEED;
1616 : }
1617 :
1618 : static int
1619 7634 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
1620 : {
1621 7634 : assert(sz <= BBP_desc(bid)->batCount || sz == BUN_NONE);
1622 1602947 : for (int i = 0; i < next; i++) {
1623 1595313 : if (n[i] == bid) {
1624 0 : sizes[i] = sz;
1625 0 : return next;
1626 : }
1627 : }
1628 7634 : n[next] = bid;
1629 7634 : sizes[next++] = sz;
1630 7634 : return next;
1631 : }
1632 :
1633 : static int
1634 2322 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts,
1635 : BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
1636 : {
1637 2322 : BAT *nbids, *noids, *ncnts, *nlids, *ndels;
1638 2322 : BUN p, q;
1639 2322 : int err = 0, rcnt = 0;
1640 :
1641 2322 : BUN ocnt = BATcount(catalog_bid);
1642 2322 : nbids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1643 2322 : noids = logbat_new(TYPE_int, ocnt - cleanup, PERSISTENT);
1644 2322 : ncnts = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1645 2322 : nlids = logbat_new(TYPE_lng, ocnt - cleanup, SYSTRANS);
1646 2322 : ndels = logbat_new(TYPE_oid, BATcount(dcatalog) - cleanup, PERSISTENT);
1647 :
1648 2322 : if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
1649 0 : logbat_destroy(nbids);
1650 0 : logbat_destroy(noids);
1651 0 : logbat_destroy(ncnts);
1652 0 : logbat_destroy(nlids);
1653 0 : logbat_destroy(ndels);
1654 0 : return 0;
1655 : }
1656 :
1657 2322 : oid *poss = Tloc(dcatalog, 0);
1658 178110 : BATloop(dcatalog, p, q) {
1659 175788 : oid pos = poss[p];
1660 :
1661 175788 : if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
1662 0 : continue;
1663 :
1664 175788 : if (lids[pos] >= 0) {
1665 175788 : BAT *lb;
1666 175788 : bat bid = bids[pos];
1667 :
1668 175788 : if ((lb = BBP_desc(bid)) == NULL || BATmode(lb, true /*transient */ ) != GDK_SUCCEED) {
1669 0 : GDKwarning("Failed to set bat(%d) transient\n", bid);
1670 : } else {
1671 175788 : lids[pos] = -1; /* mark as transient */
1672 175788 : r[rcnt++] = bid;
1673 : }
1674 : }
1675 : }
1676 :
1677 2322 : int *oids = (int *) Tloc(catalog_id, 0);
1678 2322 : q = BATcount(catalog_bid);
1679 824610 : for (p = 0; p < q && !err; p++) {
1680 822288 : bat col = bids[p];
1681 822288 : int nid = oids[p];
1682 822288 : lng lid = lids[p];
1683 822288 : lng cnt = cnts[p];
1684 822288 : oid pos = p;
1685 :
1686 : /* only project out the deleted with lid == -1
1687 : * update dcatalog */
1688 822288 : if (lid == -1)
1689 175788 : continue; /* remove */
1690 :
1691 1293000 : if (BUNappend(nbids, &col, false) != GDK_SUCCEED ||
1692 1293000 : BUNappend(noids, &nid, false) != GDK_SUCCEED ||
1693 1293000 : BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
1694 646500 : BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
1695 : err = 1;
1696 646500 : if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
1697 0 : pos = (oid) (BATcount(nbids) - 1);
1698 0 : if (BUNappend(ndels, &pos, false) != GDK_SUCCEED)
1699 646500 : err = 1;
1700 : }
1701 : }
1702 :
1703 2322 : if (err) {
1704 0 : logbat_destroy(nbids);
1705 0 : logbat_destroy(noids);
1706 0 : logbat_destroy(ndels);
1707 0 : logbat_destroy(ncnts);
1708 0 : logbat_destroy(nlids);
1709 0 : return 0;
1710 : }
1711 : /* point of no return */
1712 4644 : if (log_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
1713 4644 : log_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
1714 2322 : log_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED ||
1715 2322 : (nbids = BATsetaccess(nbids, BAT_READ)) == NULL ||
1716 2322 : (noids = BATsetaccess(noids, BAT_READ)) == NULL ||
1717 2322 : (ndels = BATsetaccess(ndels, BAT_READ)) == NULL) {
1718 0 : logbat_destroy(nbids);
1719 0 : logbat_destroy(noids);
1720 0 : logbat_destroy(ndels);
1721 0 : logbat_destroy(ncnts);
1722 0 : logbat_destroy(nlids);
1723 0 : return -1;
1724 : }
1725 2322 : r[rcnt++] = lg->catalog_bid->batCacheid;
1726 2322 : r[rcnt++] = lg->catalog_id->batCacheid;
1727 2322 : r[rcnt++] = lg->dcatalog->batCacheid;
1728 :
1729 2322 : assert(BATcount(lg->dcatalog) - cleanup == BATcount(ndels));
1730 :
1731 2322 : logbat_destroy(lg->catalog_bid);
1732 2322 : logbat_destroy(lg->catalog_id);
1733 2322 : logbat_destroy(lg->dcatalog);
1734 :
1735 2322 : lg->catalog_bid = nbids;
1736 2322 : lg->catalog_id = noids;
1737 2322 : lg->dcatalog = ndels;
1738 :
1739 : /* failing to rename these two bats is not fatal */
1740 2322 : if (BBPrename(lg->catalog_cnt, NULL) != GDK_SUCCEED)
1741 2322 : GDKclrerr();
1742 2322 : if (BBPrename(lg->catalog_lid, NULL) != GDK_SUCCEED)
1743 2322 : GDKclrerr();
1744 2322 : BBPunfix(lg->catalog_cnt->batCacheid);
1745 2322 : BBPunfix(lg->catalog_lid->batCacheid);
1746 :
1747 2322 : lg->catalog_cnt = ncnts;
1748 2322 : lg->catalog_lid = nlids;
1749 2322 : char bak[FILENAME_MAX];
1750 2322 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_cnt", NULL);
1751 2322 : if (BBPrename(lg->catalog_cnt, bak) < 0)
1752 0 : GDKclrerr();
1753 2322 : strconcat_len(bak, sizeof(bak), lg->fn, "_catalog_lid", NULL);
1754 2322 : if (BBPrename(lg->catalog_lid, bak) < 0)
1755 0 : GDKclrerr();
1756 2322 : rotation_lock(lg);
1757 9971 : for (logged_range *p = lg->pending; p; p = p->next) {
1758 7649 : p->cnt -= cleanup;
1759 : }
1760 2322 : rotation_unlock(lg);
1761 2322 : return rcnt;
1762 : }
1763 :
1764 : /* this function is called with log_lock() held; it releases the lock
1765 : * before returning */
1766 : static gdk_return
1767 11904 : bm_subcommit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
1768 : {
1769 11904 : BUN cnt = pending ? pending->cnt : BATcount(lg->catalog_bid);
1770 11904 : BUN dcnt = BATcount(lg->dcatalog);
1771 11904 : BUN p, q;
1772 11904 : BAT *catalog_bid = lg->catalog_bid;
1773 11904 : BAT *catalog_id = lg->catalog_id;
1774 11904 : BAT *dcatalog = lg->dcatalog;
1775 11904 : BUN nn = 13 + cnt;
1776 11904 : bat *n = GDKmalloc(sizeof(bat) * nn);
1777 11904 : bat *r = GDKmalloc(sizeof(bat) * nn);
1778 11904 : BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
1779 11904 : int i = 0, rcnt = 0;
1780 11904 : gdk_return res;
1781 11904 : const log_bid *bids;
1782 11904 : lng *cnts = NULL, *lids = NULL;
1783 11904 : BUN cleanup = 0;
1784 11904 : lng t0 = 0;
1785 :
1786 11904 : if (n == NULL || r == NULL || sizes == NULL) {
1787 0 : GDKfree(n);
1788 0 : GDKfree(r);
1789 0 : GDKfree(sizes);
1790 0 : log_unlock(lg);
1791 0 : return GDK_FAIL;
1792 : }
1793 :
1794 11904 : sizes[i] = 0;
1795 11904 : n[i++] = 0; /* n[0] is not used */
1796 11904 : bids = (const log_bid *) Tloc(catalog_bid, 0);
1797 11904 : if (lg->catalog_cnt)
1798 11680 : cnts = (lng *) Tloc(lg->catalog_cnt, 0);
1799 11904 : if (lg->catalog_lid)
1800 11680 : lids = (lng *) Tloc(lg->catalog_lid, 0);
1801 3447550 : BATloop(catalog_bid, p, q) {
1802 3435646 : if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) {
1803 175788 : cleanup++;
1804 175788 : if (lids[p] == -1)
1805 0 : continue;
1806 175788 : if (BUNappend(dcatalog, &(oid){p}, true) != GDK_SUCCEED) {
1807 0 : while (BATcount(dcatalog) > dcnt) {
1808 0 : if (BUNdelete(dcatalog, BATcount(dcatalog) - 1) != GDK_SUCCEED) {
1809 0 : TRC_CRITICAL(WAL, "delete after failed append failed\n");
1810 0 : break;
1811 : }
1812 : }
1813 0 : GDKfree(n);
1814 0 : GDKfree(r);
1815 0 : GDKfree(sizes);
1816 0 : log_unlock(lg);
1817 0 : return GDK_FAIL;
1818 : }
1819 : }
1820 3435646 : if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) {
1821 1707931 : continue;
1822 : }
1823 1727715 : bat col = bids[p];
1824 :
1825 1727715 : TRC_DEBUG(WAL, "new %s (%d)\n", BBP_logical(col), col);
1826 1727715 : assert(col);
1827 1727715 : sizes[i] = cnts ? (BUN) cnts[p] : 0;
1828 1727715 : n[i++] = col;
1829 : }
1830 : /* now commit catalog, so it's also up to date on disk */
1831 11904 : sizes[i] = cnt;
1832 11904 : n[i++] = catalog_bid->batCacheid;
1833 11904 : sizes[i] = cnt;
1834 11904 : n[i++] = catalog_id->batCacheid;
1835 11904 : sizes[i] = BATcount(dcatalog);
1836 11904 : n[i++] = dcatalog->batCacheid;
1837 :
1838 11904 : if (cleanup) {
1839 2322 : if ((rcnt = cleanup_and_swap(lg, r, bids, lids, cnts,
1840 : catalog_bid, catalog_id, dcatalog,
1841 : cleanup)) < 0) {
1842 0 : GDKfree(n);
1843 0 : GDKfree(r);
1844 0 : GDKfree(sizes);
1845 0 : log_unlock(lg);
1846 0 : return GDK_FAIL;
1847 : }
1848 2322 : cnt -= cleanup;
1849 : }
1850 11904 : if (dcatalog != lg->dcatalog) {
1851 2322 : i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, cnt);
1852 2322 : i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, cnt);
1853 2322 : i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
1854 : }
1855 11904 : if (lg->seqs_id) {
1856 11680 : sizes[i] = BATcount(lg->seqs_id);
1857 11680 : n[i++] = lg->seqs_id->batCacheid;
1858 11680 : sizes[i] = BATcount(lg->seqs_id);
1859 11680 : n[i++] = lg->seqs_val->batCacheid;
1860 : }
1861 11904 : if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id) / 2) && BATcount(lg->dseqs) > 10) {
1862 334 : BAT *tids, *ids, *vals;
1863 :
1864 334 : tids = bm_tids(lg->seqs_id, lg->dseqs);
1865 334 : if (tids == NULL) {
1866 0 : GDKfree(n);
1867 0 : GDKfree(r);
1868 0 : GDKfree(sizes);
1869 0 : log_unlock(lg);
1870 0 : return GDK_FAIL;
1871 : }
1872 334 : ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
1873 334 : vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
1874 :
1875 334 : if (ids == NULL || vals == NULL) {
1876 0 : logbat_destroy(tids);
1877 0 : logbat_destroy(ids);
1878 0 : logbat_destroy(vals);
1879 0 : GDKfree(n);
1880 0 : GDKfree(r);
1881 0 : GDKfree(sizes);
1882 0 : log_unlock(lg);
1883 0 : return GDK_FAIL;
1884 : }
1885 :
1886 668 : if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
1887 334 : BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
1888 0 : logbat_destroy(tids);
1889 0 : logbat_destroy(ids);
1890 0 : logbat_destroy(vals);
1891 0 : GDKfree(n);
1892 0 : GDKfree(r);
1893 0 : GDKfree(sizes);
1894 0 : log_unlock(lg);
1895 0 : return GDK_FAIL;
1896 : }
1897 334 : logbat_destroy(tids);
1898 334 : BATclear(lg->dseqs, true);
1899 :
1900 668 : if (log_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
1901 334 : log_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED ||
1902 334 : (ids = BATsetaccess(ids, BAT_READ)) == NULL ||
1903 334 : (vals = BATsetaccess(vals, BAT_READ)) == NULL) {
1904 0 : logbat_destroy(ids);
1905 0 : logbat_destroy(vals);
1906 0 : GDKfree(n);
1907 0 : GDKfree(r);
1908 0 : GDKfree(sizes);
1909 0 : log_unlock(lg);
1910 0 : return GDK_FAIL;
1911 : }
1912 334 : i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
1913 334 : i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
1914 :
1915 334 : if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
1916 267 : r[rcnt++] = lg->seqs_id->batCacheid;
1917 334 : if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
1918 267 : r[rcnt++] = lg->seqs_val->batCacheid;
1919 :
1920 334 : logbat_destroy(lg->seqs_id);
1921 334 : logbat_destroy(lg->seqs_val);
1922 :
1923 334 : lg->seqs_id = ids;
1924 334 : lg->seqs_val = vals;
1925 : }
1926 11904 : if (lg->seqs_id) {
1927 11680 : sizes[i] = BATcount(lg->dseqs);
1928 11680 : n[i++] = lg->dseqs->batCacheid;
1929 : }
1930 :
1931 11904 : assert((BUN) i <= nn);
1932 11904 : log_unlock(lg);
1933 11904 : TRC_DEBUG_IF(WAL)
1934 0 : t0 = GDKusec();
1935 12128 : res = TMsubcommit_list(n, cnts ? sizes : NULL, i, lg->saved_id, lg->saved_tid);
1936 11904 : TRC_DEBUG(WAL, "subcommit " LLFMT "usec\n", GDKusec() - t0);
1937 11904 : if (res == GDK_SUCCEED) { /* now cleanup */
1938 195192 : for (i = 0; i < rcnt; i++) {
1939 183288 : TRC_DEBUG_IF(WAL) {
1940 0 : TRC_DEBUG_ENDIF(WAL, "release %d\n", r[i]);
1941 0 : if (BBP_lrefs(r[i]) != 2)
1942 0 : TRC_DEBUG_ENDIF(WAL, "release %d %d\n", r[i], BBP_lrefs(r[i]));
1943 : }
1944 183288 : BBPrelease(r[i]);
1945 : }
1946 : }
1947 11904 : GDKfree(n);
1948 11904 : GDKfree(r);
1949 11904 : GDKfree(sizes);
1950 11904 : if (res != GDK_SUCCEED)
1951 0 : TRC_CRITICAL(GDK, "commit failed\n");
1952 : return res;
1953 : }
1954 :
1955 : static gdk_return
1956 335 : log_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
1957 : {
1958 335 : str filenamestr = NULL;
1959 :
1960 335 : if ((filenamestr = GDKfilepath(0, lg->dir, LOGFILE, NULL)) == NULL)
1961 : return GDK_FAIL;
1962 335 : size_t len = strcpy_len(filename, filenamestr, FILENAME_MAX);
1963 335 : GDKfree(filenamestr);
1964 335 : if (len >= FILENAME_MAX) {
1965 0 : GDKerror("Logger filename path is too large\n");
1966 0 : return GDK_FAIL;
1967 : }
1968 335 : if (bak) {
1969 335 : len = strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL);
1970 335 : if (len >= FILENAME_MAX) {
1971 0 : GDKerror("Logger filename path is too large\n");
1972 0 : return GDK_FAIL;
1973 : }
1974 : }
1975 : return GDK_SUCCEED;
1976 : }
1977 :
1978 : static gdk_return
1979 11754 : log_cleanup(logger *lg, lng id)
1980 : {
1981 11754 : char log_id[FILENAME_MAX];
1982 :
1983 11754 : if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
1984 0 : GDKerror("log_id filename is too large\n");
1985 0 : return GDK_FAIL;
1986 : }
1987 11754 : if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1988 0 : GDKwarning("failed to remove old WAL %s.%s\n", LOGFILE, log_id);
1989 0 : GDKclrerr(); /* clear error from unlink */
1990 : }
1991 : return GDK_SUCCEED;
1992 : }
1993 :
#ifdef GDKLIBRARY_JSON
/* Finish the JSON storage upgrade: remove the "jsonupgradeneeded"
 * signal file from BATDIR (not done when running in memory) and make the
 * json atom read its values with the plain string reader again.
 * Returns GDK_FAIL if the signal file could not be removed. */
static gdk_return
log_json_upgrade_finalize(void)
{
	int json_tpe = ATOMindex("json");

	if (!GDKinmemory(0) &&
	    GDKunlink(0, BATDIR, "jsonupgradeneeded", NULL) == GDK_FAIL) {
		TRC_CRITICAL(GDK, "Failed to remove json upgrade signal file\n");
		return GDK_FAIL;
	}
	/* NOTE(review): ATOMindex is assumed to yield a non-positive value
	 * for an unregistered atom -- confirm; never index BATatoms with
	 * such a value */
	if (json_tpe > 0)
		BATatoms[json_tpe].atomRead = (void *(*)(void *, size_t *, stream *, size_t))strRead;

	return GDK_SUCCEED;
}
#endif
2009 :
2010 : /* Load data from the logger logdir
2011 : * Initialize new directories and catalog files if none are present,
2012 : * unless running in read-only mode
2013 : * Load data and persist it in the BATs */
2014 : static gdk_return
2015 336 : log_load(const char *fn, const char *logdir, logger *lg, char filename[FILENAME_MAX])
2016 : {
2017 336 : FILE *fp = NULL;
2018 336 : char bak[FILENAME_MAX];
2019 336 : bat catalog_bid, catalog_id, dcatalog;
2020 336 : bool needcommit = false;
2021 336 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
2022 336 : bool readlogs = false;
2023 336 : bool needsnew = false; /* need to write new log file? */
2024 :
2025 : /* refactor */
2026 336 : if (!LOG_DISABLED(lg)) {
2027 335 : if (log_filename(lg, bak, filename) != GDK_SUCCEED)
2028 0 : goto error;
2029 : }
2030 :
2031 336 : lg->catalog_bid = NULL;
2032 336 : lg->catalog_id = NULL;
2033 336 : lg->catalog_cnt = NULL;
2034 336 : lg->catalog_lid = NULL;
2035 336 : lg->dcatalog = NULL;
2036 :
2037 336 : lg->seqs_id = NULL;
2038 336 : lg->seqs_val = NULL;
2039 336 : lg->dseqs = NULL;
2040 :
2041 336 : if (!LOG_DISABLED(lg)) {
2042 : /* try to open logfile backup, or failing that, the file
2043 : * itself. we need to know whether this file exists when
2044 : * checking the database consistency later on */
2045 335 : if ((fp = MT_fopen(bak, "r")) != NULL) {
2046 0 : fclose(fp);
2047 0 : fp = NULL;
2048 0 : if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
2049 0 : GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
2050 0 : goto error;
2051 335 : } else if (errno != ENOENT) {
2052 0 : GDKsyserror("open %s failed", bak);
2053 0 : goto error;
2054 : }
2055 335 : fp = MT_fopen(filename, "r");
2056 335 : if (fp == NULL && errno != ENOENT) {
2057 0 : GDKsyserror("open %s failed", filename);
2058 0 : goto error;
2059 : }
2060 : }
2061 :
2062 336 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2063 336 : catalog_bid = BBPindex(bak);
2064 :
2065 : /* initialize arrays for type mapping, to be read from disk */
2066 336 : memset(lg->type_id, -1, sizeof(lg->type_id));
2067 336 : memset(lg->type_nr, 255, sizeof(lg->type_nr));
2068 :
2069 : /* this is intentional - if catalog_bid is 0, force it to find
2070 : * the persistent catalog */
2071 336 : if (catalog_bid == 0) {
2072 : /* catalog does not exist, so the log file also
2073 : * shouldn't exist */
2074 224 : if (fp != NULL) {
2075 0 : GDKerror("there is no logger catalog, "
2076 : "but there is a log file.\n");
2077 0 : goto error;
2078 : }
2079 :
2080 224 : lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2081 224 : lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
2082 224 : lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
2083 :
2084 224 : if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
2085 0 : GDKerror("cannot create catalog bats");
2086 0 : goto error;
2087 : }
2088 224 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2089 224 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2090 224 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2091 0 : goto error;
2092 : }
2093 224 : TRC_INFO(WAL, "create %s catalog\n", fn);
2094 :
2095 : /* give the catalog bats names so we can find them
2096 : * next time */
2097 224 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
2098 224 : if (BBPrename(lg->catalog_bid, bak) < 0) {
2099 0 : goto error;
2100 : }
2101 :
2102 224 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2103 224 : if (BBPrename(lg->catalog_id, bak) < 0) {
2104 0 : goto error;
2105 : }
2106 :
2107 224 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2108 224 : if (BBPrename(lg->dcatalog, bak) < 0) {
2109 0 : goto error;
2110 : }
2111 :
2112 224 : if (!LOG_DISABLED(lg)) {
2113 223 : if (GDKcreatedir(filename) != GDK_SUCCEED) {
2114 0 : GDKerror("cannot create directory for log file %s\n", filename);
2115 0 : goto error;
2116 : }
2117 223 : if (log_create_types_file(lg, filename, true) != GDK_SUCCEED)
2118 0 : goto error;
2119 : }
2120 :
2121 224 : BBPretain(lg->catalog_bid->batCacheid);
2122 224 : BBPretain(lg->catalog_id->batCacheid);
2123 224 : BBPretain(lg->dcatalog->batCacheid);
2124 :
2125 224 : log_lock(lg);
2126 : /* bm_subcommit releases the lock */
2127 224 : if (bm_subcommit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2128 : /* cannot commit catalog, so remove log */
2129 0 : if (MT_remove(filename) < 0)
2130 0 : GDKsyserror("remove %s failed\n", filename);
2131 0 : BBPrelease(lg->catalog_bid->batCacheid);
2132 0 : BBPrelease(lg->catalog_id->batCacheid);
2133 0 : BBPrelease(lg->dcatalog->batCacheid);
2134 0 : goto error;
2135 : }
2136 : } else {
2137 : /* find the persistent catalog. As non persistent bats
2138 : * require a logical reference we also add a logical
2139 : * reference for the persistent bats */
2140 112 : BUN p, q;
2141 112 : BAT *b, *o, *d;
2142 :
2143 112 : assert(!lg->inmemory);
2144 :
2145 : /* the catalog exists, and so should the log file */
2146 112 : if (fp == NULL && !LOG_DISABLED(lg)) {
2147 0 : GDKerror("There is a logger catalog, but no log file.\n");
2148 0 : goto error;
2149 : }
2150 : if (fp != NULL) {
2151 : /* check_version always closes fp */
2152 112 : if (check_version(lg, fp, fn, logdir, filename, &needsnew) != GDK_SUCCEED) {
2153 0 : fp = NULL;
2154 0 : goto error;
2155 : }
2156 : readlogs = true;
2157 : fp = NULL;
2158 : }
2159 :
2160 112 : if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
2161 112 : b = BATdescriptor(catalog_bid);
2162 112 : if (b == NULL) {
2163 0 : GDKerror("inconsistent database, catalog does not exist");
2164 0 : goto error;
2165 : }
2166 :
2167 112 : strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
2168 112 : catalog_id = BBPindex(bak);
2169 112 : o = BATdescriptor(catalog_id);
2170 112 : if (o == NULL) {
2171 0 : BBPunfix(b->batCacheid);
2172 0 : GDKerror("inconsistent database, catalog_id does not exist");
2173 0 : goto error;
2174 : }
2175 :
2176 112 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
2177 112 : dcatalog = BBPindex(bak);
2178 112 : d = BATdescriptor(dcatalog);
2179 112 : if (d == NULL) {
2180 0 : GDKerror("cannot create dcatalog bat");
2181 0 : BBPunfix(b->batCacheid);
2182 0 : BBPunfix(o->batCacheid);
2183 0 : goto error;
2184 : }
2185 :
2186 112 : lg->catalog_bid = b;
2187 112 : lg->catalog_id = o;
2188 112 : lg->dcatalog = d;
2189 112 : const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
2190 28556 : BATloop(lg->catalog_bid, p, q) {
2191 28444 : bat bid = bids[p];
2192 28444 : oid pos = p;
2193 :
2194 28444 : if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
2195 0 : BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
2196 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
2197 0 : goto error;
2198 : }
2199 : }
2200 112 : if ((lg->catalog_bid = BATsetaccess(lg->catalog_bid, BAT_READ)) == NULL ||
2201 112 : (lg->catalog_id = BATsetaccess(lg->catalog_id, BAT_READ)) == NULL ||
2202 112 : (lg->dcatalog = BATsetaccess(lg->dcatalog, BAT_READ)) == NULL) {
2203 0 : goto error;
2204 : }
2205 112 : BBPretain(lg->catalog_bid->batCacheid);
2206 112 : BBPretain(lg->catalog_id->batCacheid);
2207 112 : BBPretain(lg->dcatalog->batCacheid);
2208 : }
2209 : /* failing to rename the catalog_cnt and catalog_lid bats is not
2210 : * fatal */
2211 336 : lg->catalog_cnt = logbat_new(TYPE_lng, 1, SYSTRANS);
2212 336 : if (lg->catalog_cnt == NULL) {
2213 0 : GDKerror("failed to create catalog_cnt bat");
2214 0 : goto error;
2215 : }
2216 336 : strconcat_len(bak, sizeof(bak), fn, "_catalog_cnt", NULL);
2217 336 : if (BBPrename(lg->catalog_cnt, bak) < 0)
2218 0 : GDKclrerr();
2219 336 : lg->catalog_lid = logbat_new(TYPE_lng, 1, SYSTRANS);
2220 336 : if (lg->catalog_lid == NULL) {
2221 0 : GDKerror("failed to create catalog_lid bat");
2222 0 : goto error;
2223 : }
2224 336 : strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
2225 336 : if (BBPrename(lg->catalog_lid, bak) < 0)
2226 0 : GDKclrerr();
2227 336 : if (bm_get_counts(lg) != GDK_SUCCEED)
2228 0 : goto error;
2229 :
2230 336 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2231 336 : if (BBPindex(bak)) {
2232 112 : lg->seqs_id = BATdescriptor(BBPindex(bak));
2233 112 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2234 112 : lg->seqs_val = BATdescriptor(BBPindex(bak));
2235 112 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2236 112 : lg->dseqs = BATdescriptor(BBPindex(bak));
2237 112 : if (lg->seqs_id == NULL ||
2238 112 : lg->seqs_val == NULL ||
2239 : lg->dseqs == NULL) {
2240 0 : GDKerror("Logger_new: cannot load seqs bats");
2241 0 : goto error;
2242 : }
2243 : } else {
2244 224 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
2245 224 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
2246 224 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
2247 224 : if (lg->seqs_id == NULL ||
2248 224 : lg->seqs_val == NULL ||
2249 : lg->dseqs == NULL) {
2250 0 : GDKerror("Logger_new: cannot create seqs bats");
2251 0 : goto error;
2252 : }
2253 :
2254 224 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
2255 224 : if (BBPrename(lg->seqs_id, bak) < 0) {
2256 0 : goto error;
2257 : }
2258 :
2259 224 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
2260 224 : if (BBPrename(lg->seqs_val, bak) < 0) {
2261 0 : goto error;
2262 : }
2263 :
2264 224 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
2265 224 : if (BBPrename(lg->dseqs, bak) < 0) {
2266 0 : goto error;
2267 : }
2268 : needcommit = true;
2269 : }
2270 336 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
2271 336 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
2272 336 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
2273 0 : goto error;
2274 : }
2275 336 : dbg = ATOMIC_GET(&GDKdebug);
2276 336 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2277 336 : if (needcommit && bm_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2278 0 : GDKerror("Logger_new: commit failed");
2279 0 : goto error;
2280 : }
2281 336 : ATOMIC_SET(&GDKdebug, dbg);
2282 :
2283 336 : if (readlogs) {
2284 112 : ulng log_id = lg->saved_id + 1;
2285 112 : if (log_readlogs(lg, filename) != GDK_SUCCEED) {
2286 0 : goto error;
2287 : }
2288 112 : if (lg->postfuncp && (*lg->postfuncp) (lg->funcdata, lg) != GDK_SUCCEED)
2289 0 : goto error;
2290 112 : if (needsnew) {
2291 0 : if (GDKmove(0, lg->dir, LOGFILE, NULL, lg->dir, LOGFILE, "bak", true) != GDK_SUCCEED) {
2292 0 : TRC_CRITICAL(GDK, "couldn't move log to log.bak\n");
2293 0 : return GDK_FAIL;
2294 : }
2295 0 : if (log_create_types_file(lg, filename, false) != GDK_SUCCEED) {
2296 0 : TRC_CRITICAL(GDK, "couldn't write new log\n");
2297 0 : return GDK_FAIL;
2298 : }
2299 : }
2300 112 : dbg = ATOMIC_GET(&GDKdebug);
2301 112 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
2302 112 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED) {
2303 0 : goto error;
2304 : }
2305 112 : ATOMIC_SET(&GDKdebug, dbg);
2306 293 : for (; log_id <= lg->saved_id; log_id++)
2307 181 : (void) log_cleanup(lg, log_id); /* ignore error of removing file */
2308 112 : if (needsnew &&
2309 0 : GDKunlink(0, lg->dir, LOGFILE, "bak") != GDK_SUCCEED) {
2310 0 : TRC_CRITICAL(GDK, "couldn't remove old log.bak file\n");
2311 0 : return GDK_FAIL;
2312 : }
2313 : } else {
2314 224 : lg->id = lg->saved_id + 1;
2315 : }
2316 : #ifdef GDKLIBRARY_JSON
2317 336 : if (log_json_upgrade_finalize() == GDK_FAIL)
2318 0 : goto error;
2319 : #endif
2320 : return GDK_SUCCEED;
2321 0 : error:
2322 0 : if (fp)
2323 0 : fclose(fp);
2324 0 : logbat_destroy(lg->catalog_bid);
2325 0 : logbat_destroy(lg->catalog_id);
2326 0 : logbat_destroy(lg->dcatalog);
2327 0 : logbat_destroy(lg->seqs_id);
2328 0 : logbat_destroy(lg->seqs_val);
2329 0 : logbat_destroy(lg->dseqs);
2330 0 : ATOMIC_DESTROY(&lg->current->refcount);
2331 0 : ATOMIC_DESTROY(&lg->nr_flushers);
2332 0 : MT_lock_destroy(&lg->lock);
2333 0 : MT_lock_destroy(&lg->rotation_lock);
2334 0 : GDKfree(lg->fn);
2335 0 : GDKfree(lg->dir);
2336 0 : GDKfree(lg->rbuf);
2337 0 : GDKfree(lg->wbuf);
2338 0 : GDKfree(lg);
2339 0 : ATOMIC_SET(&GDKdebug, dbg);
2340 : /* We do not call log_json_upgrade_finalize here because we want
2341 : * the upgrade to run again next time we try, so we do not want
2342 : * to remove the signal file just yet.
2343 : */
2344 0 : return GDK_FAIL;
2345 : }
2346 :
2347 : /* Initialize a new logger
2348 : * It will load any data in the logdir and persist it in the BATs*/
2349 : static logger *
2350 336 : log_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp,
2351 : postversionfix_fptr postfuncp, void *funcdata)
2352 : {
2353 336 : logger *lg;
2354 336 : char filename[FILENAME_MAX];
2355 :
2356 336 : lng max_dropped = GDKgetenv_int("wal_max_dropped", 100000);
2357 336 : lng max_file_age = GDKgetenv_int("wal_max_file_age", 600);
2358 336 : lng max_file_size = 0;
2359 :
2360 336 : if (GDKdebug & FORCEMITOMASK) {
2361 : max_file_size = 2048; /* 2 KiB */
2362 : } else {
2363 12 : const char *max_file_size_str = GDKgetenv("wal_max_file_size");
2364 12 : max_file_size = max_file_size_str ? strtoul(max_file_size_str, NULL, 10) : 2147483648;
2365 : }
2366 :
2367 336 : if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
2368 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
2369 0 : return NULL;
2370 : }
2371 :
2372 336 : lg = GDKmalloc(sizeof(struct logger));
2373 336 : if (lg == NULL) {
2374 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
2375 0 : return NULL;
2376 : }
2377 :
2378 672 : *lg = (logger) {
2379 336 : .inmemory = GDKinmemory(0),
2380 : .debug = debug,
2381 : .version = version,
2382 : .prefuncp = prefuncp,
2383 : .postfuncp = postfuncp,
2384 : .funcdata = funcdata,
2385 :
2386 336 : .max_dropped = max_dropped >= 0 ? max_dropped : 100000,
2387 : .file_age = 0,
2388 336 : .max_file_age = max_file_age >= 0 ? max_file_age * 1000000 : 600000000,
2389 336 : .max_file_size = max_file_size >= 0 ? max_file_size : 2147483648,
2390 :
2391 : .id = 0,
2392 336 : .saved_id = getBBPlogno(), /* get saved log numer from bbp */
2393 336 : .saved_tid = (int) getBBPtransid(), /* get saved transaction id from bbp */
2394 : };
2395 :
2396 : /* probably open file and check version first, then call call old logger code */
2397 336 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
2398 0 : TRC_CRITICAL(GDK, "filename is too large\n");
2399 0 : GDKfree(lg);
2400 0 : return NULL;
2401 : }
2402 336 : lg->fn = GDKstrdup(fn);
2403 336 : lg->dir = GDKstrdup(filename);
2404 336 : lg->rbufsize = 64 * 1024;
2405 336 : lg->rbuf = GDKmalloc(lg->rbufsize);
2406 336 : lg->wbufsize = 64 * 1024;
2407 336 : lg->wbuf = GDKmalloc(lg->wbufsize);
2408 336 : if (lg->fn == NULL ||
2409 336 : lg->dir == NULL ||
2410 336 : lg->rbuf == NULL ||
2411 : lg->wbuf == NULL) {
2412 0 : TRC_CRITICAL(GDK, "allocating for logger structure failed\n");
2413 0 : GDKfree(lg->fn);
2414 0 : GDKfree(lg->dir);
2415 0 : GDKfree(lg->rbuf);
2416 0 : GDKfree(lg->wbuf);
2417 0 : GDKfree(lg);
2418 0 : return NULL;
2419 : }
2420 336 : TRC_DEBUG(WAL, "dir set to %s\n", lg->dir);
2421 :
2422 336 : MT_lock_init(&lg->lock, fn);
2423 336 : MT_lock_init(&lg->rotation_lock, "rotation_lock");
2424 336 : MT_lock_init(&lg->flush_lock, "flush_lock");
2425 336 : MT_cond_init(&lg->excl_flush_cv);
2426 336 : ATOMIC_INIT(&lg->nr_flushers, 0);
2427 :
2428 336 : if (log_load(fn, logdir, lg, filename) == GDK_SUCCEED) {
2429 : return lg;
2430 : }
2431 : return NULL;
2432 : }
2433 :
2434 : static logged_range *
2435 66833 : do_flush_range_cleanup(logger *lg)
2436 : {
2437 66833 : logged_range *frange = lg->flush_ranges;
2438 66833 : logged_range *first = frange;
2439 :
2440 78491 : while (frange->next) {
2441 12141 : if (ATOMIC_GET(&frange->refcount) > 1)
2442 : break;
2443 11658 : frange = frange->next;
2444 : }
2445 66833 : if (first == frange) {
2446 : return first;
2447 : }
2448 :
2449 11658 : logged_range *flast = frange;
2450 :
2451 11658 : lg->flush_ranges = flast;
2452 :
2453 23316 : for (frange = first; frange && frange != flast; frange = frange->next) {
2454 11658 : ATOMIC_DEC(&frange->refcount);
2455 11658 : if (!LOG_DISABLED(lg) && frange->output_log) {
2456 0 : TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log));
2457 0 : close_stream(frange->output_log);
2458 0 : frange->output_log = NULL;
2459 : }
2460 : }
2461 : return flast;
2462 : }
2463 :
2464 : void
2465 335 : log_destroy(logger *lg)
2466 : {
2467 335 : log_close_input(lg);
2468 335 : logged_range *last = do_flush_range_cleanup(lg);
2469 335 : (void) last;
2470 335 : assert(last == lg->current && last == lg->flush_ranges);
2471 335 : log_close_output(lg);
2472 755 : for (logged_range * p = lg->pending; p; p = lg->pending) {
2473 420 : lg->pending = p->next;
2474 420 : ATOMIC_DESTROY(&p->refcount);
2475 420 : ATOMIC_DESTROY(&p->last_ts);
2476 420 : ATOMIC_DESTROY(&p->flushed_ts);
2477 420 : ATOMIC_DESTROY(&p->drops);
2478 420 : GDKfree(p);
2479 : }
2480 335 : if (LOG_DISABLED(lg)) {
2481 1 : lg->saved_id = lg->id;
2482 1 : lg->saved_tid = lg->tid;
2483 1 : log_commit(lg, NULL, NULL, 0);
2484 : }
2485 335 : if (lg->catalog_bid) {
2486 335 : log_lock(lg);
2487 335 : BUN p, q;
2488 335 : BAT *b = lg->catalog_bid;
2489 :
2490 : /* free resources */
2491 335 : const log_bid *bids = (const log_bid *) Tloc(b, 0);
2492 86927 : BATloop(b, p, q) {
2493 86592 : bat bid = bids[p];
2494 :
2495 86592 : BBPrelease(bid);
2496 : }
2497 :
2498 335 : BBPrelease(lg->catalog_bid->batCacheid);
2499 335 : BBPrelease(lg->catalog_id->batCacheid);
2500 335 : BBPrelease(lg->dcatalog->batCacheid);
2501 335 : logbat_destroy(lg->catalog_bid);
2502 335 : logbat_destroy(lg->catalog_id);
2503 335 : logbat_destroy(lg->dcatalog);
2504 :
2505 335 : logbat_destroy(lg->catalog_cnt);
2506 335 : logbat_destroy(lg->catalog_lid);
2507 335 : log_unlock(lg);
2508 : }
2509 335 : MT_lock_destroy(&lg->lock);
2510 335 : MT_lock_destroy(&lg->rotation_lock);
2511 335 : MT_lock_destroy(&lg->flush_lock);
2512 335 : ATOMIC_DESTROY(&lg->nr_flushers);
2513 335 : GDKfree(lg->fn);
2514 335 : GDKfree(lg->dir);
2515 335 : GDKfree(lg->rbuf);
2516 335 : GDKfree(lg->wbuf);
2517 335 : GDKfree(lg);
2518 335 : }
2519 :
2520 : /* Create a new logger */
2521 : logger *
2522 336 : log_create(int debug, const char *fn, const char *logdir, int version,
2523 : preversionfix_fptr prefuncp, postversionfix_fptr postfuncp,
2524 : void *funcdata)
2525 : {
2526 336 : logger *lg;
2527 336 : TRC_INFO_IF(WAL) {
2528 0 : TRC_INFO_ENDIF(WAL, "Started processing logs %s/%s version %d\n", fn, logdir, version);
2529 0 : GDKtracer_flush_buffer();
2530 : }
2531 336 : lg = log_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
2532 336 : if (lg == NULL)
2533 : return NULL;
2534 336 : TRC_INFO_IF(WAL) {
2535 0 : TRC_INFO_ENDIF(WAL, "Finished processing logs %s/%s\n", fn, logdir);
2536 0 : GDKtracer_flush_buffer();
2537 : }
2538 336 : if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
2539 0 : log_destroy(lg);
2540 0 : return NULL;
2541 : }
2542 336 : assert(lg->current == NULL);
2543 336 : logged_range dummy = {
2544 336 : .cnt = BATcount(lg->catalog_bid),
2545 : };
2546 336 : lg->current = &dummy;
2547 336 : if (log_open_output(lg) != GDK_SUCCEED) {
2548 0 : log_destroy(lg);
2549 0 : return NULL;
2550 : }
2551 336 : lg->current = lg->current->next;
2552 336 : assert(lg->pending == NULL && lg->flush_ranges == NULL);
2553 336 : lg->pending = lg->current;
2554 336 : lg->flush_ranges = lg->current;
2555 336 : return lg;
2556 : }
2557 :
2558 : static logged_range *
2559 7258 : log_next_logfile(logger *lg, ulng ts)
2560 : {
2561 7258 : int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) ? 1000 : 100;
2562 7258 : if (!lg->pending || !lg->pending->next)
2563 : return NULL;
2564 7258 : rotation_lock(lg);
2565 7258 : if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges &&
2566 7258 : (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) &&
2567 7258 : (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
2568 6233 : rotation_unlock(lg);
2569 6233 : logged_range *p = lg->pending;
2570 6233 : for (int i = 1;
2571 11573 : i < m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current &&
2572 5358 : p->next != lg->flush_ranges && (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
2573 16930 : && (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
2574 5340 : p = p->next;
2575 6233 : return p;
2576 : }
2577 1025 : rotation_unlock(lg);
2578 1025 : return NULL;
2579 : }
2580 :
2581 : static void
2582 6233 : log_cleanup_range(logger *lg, ulng id)
2583 : {
2584 6233 : rotation_lock(lg);
2585 17806 : while (lg->pending && lg->pending->id <= id) {
2586 11573 : logged_range *p;
2587 11573 : p = lg->pending;
2588 11573 : if (p)
2589 11573 : lg->pending = p->next;
2590 11573 : GDKfree(p);
2591 : }
2592 6233 : rotation_unlock(lg);
2593 6233 : }
2594 :
2595 : static void
2596 66501 : do_rotate(logger *lg)
2597 : {
2598 66501 : logged_range *cur = lg->current;
2599 66501 : logged_range *next = cur->next;
2600 66501 : if (next) {
2601 11658 : assert(ATOMIC_GET(&next->refcount) == 1);
2602 11658 : lg->current = next;
2603 11658 : if (!LOG_DISABLED(lg) && ATOMIC_GET(&cur->refcount) == 1 && cur->output_log) {
2604 11463 : close_stream(cur->output_log);
2605 11463 : cur->output_log = NULL;
2606 : }
2607 : }
2608 66501 : }
2609 :
2610 : gdk_return
2611 10991 : log_activate(logger *lg)
2612 : {
2613 10991 : bool flush_cleanup = false;
2614 10991 : gdk_return res = GDK_SUCCEED;
2615 :
2616 10991 : rotation_lock(lg);
2617 10991 : const lng current_file_size = LOG_DISABLED(lg) ? 0 : (lng) getfilepos(getFile(lg->current->output_log));
2618 :
2619 10991 : if (current_file_size == -1) {
2620 0 : rotation_unlock(lg);
2621 0 : return GDK_FAIL;
2622 : }
2623 : /* file size of 2 means only endian indicator present
2624 : * (i.e. effectively empty) */
2625 10991 : if (current_file_size <= 2) {
2626 6862 : rotation_unlock(lg);
2627 6862 : return GDK_SUCCEED;
2628 : }
2629 :
2630 4129 : if (!lg->flushnow &&
2631 4129 : !lg->current->next &&
2632 4129 : current_file_size > 2 &&
2633 4129 : (ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped ||
2634 4120 : current_file_size > lg->max_file_size ||
2635 4074 : (GDKusec() - lg->file_age) > lg->max_file_age) &&
2636 55 : (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
2637 55 : lg->saved_id + 1 == lg->id &&
2638 55 : ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */ ) {
2639 51 : lg->id++;
2640 : /* start new file */
2641 51 : res = log_open_output(lg);
2642 51 : flush_cleanup = true;
2643 51 : do_rotate(lg);
2644 : }
2645 51 : if (flush_cleanup)
2646 51 : (void) do_flush_range_cleanup(lg);
2647 4129 : rotation_unlock(lg);
2648 4129 : return res;
2649 : }
2650 :
2651 : gdk_return
2652 7258 : log_flush(logger *lg, ulng ts)
2653 : {
2654 7258 : logged_range *pending = log_next_logfile(lg, ts);
2655 7258 : ulng lid = pending ? pending->id : 0, olid = lg->saved_id;
2656 7258 : if (LOG_DISABLED(lg)) {
2657 0 : lg->saved_id = lid;
2658 0 : lg->saved_tid = lg->tid;
2659 0 : if (lid)
2660 0 : log_cleanup_range(lg, lg->saved_id);
2661 0 : if (log_commit(lg, NULL, NULL, 0) != GDK_SUCCEED)
2662 0 : TRC_ERROR(GDK, "failed to commit");
2663 0 : return GDK_SUCCEED;
2664 : }
2665 7258 : if (lg->saved_id >= lid)
2666 : return GDK_SUCCEED;
2667 6233 : rotation_lock(lg);
2668 6233 : ulng lgid = lg->id;
2669 6233 : rotation_unlock(lg);
2670 6233 : if (lg->saved_id + 1 >= lgid) /* logger should first release the file */
2671 : return GDK_SUCCEED;
2672 6233 : log_return res = LOG_OK;
2673 6233 : ulng cid = olid;
2674 6233 : assert(lid <= lgid);
2675 : uint32_t *updated = NULL;
2676 : BUN nupdated = 0;
2677 : size_t allocated = 0;
2678 17806 : while (cid < lid && res == LOG_OK) {
2679 11573 : if (!lg->input_log) {
2680 11573 : char *filename;
2681 11573 : char id[32];
2682 11573 : if (snprintf(id, sizeof(id), LLFMT, cid + 1) >= (int) sizeof(id)) {
2683 0 : GDKfree(updated);
2684 0 : TRC_CRITICAL(GDK, "log_id filename is too large\n");
2685 0 : return GDK_FAIL;
2686 : }
2687 11573 : if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) {
2688 0 : GDKfree(updated);
2689 0 : return GDK_FAIL;
2690 : }
2691 11573 : if (strlen(filename) >= FILENAME_MAX) {
2692 0 : GDKfree(updated);
2693 0 : TRC_CRITICAL(GDK, "Logger filename path is too large\n");
2694 0 : GDKfree(filename);
2695 0 : return GDK_FAIL;
2696 : }
2697 :
2698 11573 : bool filemissing = false;
2699 11573 : if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
2700 0 : GDKfree(updated);
2701 0 : GDKfree(filename);
2702 0 : return GDK_FAIL;
2703 : }
2704 11573 : GDKfree(filename);
2705 : }
2706 : /* we read the full file because skipping is impossible with current log format */
2707 11573 : log_lock(lg);
2708 11573 : if (updated == NULL) {
2709 6233 : nupdated = BATcount(lg->catalog_id);
2710 6233 : allocated = ((nupdated + 31) & ~31) / 8;
2711 6233 : if (allocated == 0)
2712 0 : allocated = 4;
2713 6233 : updated = GDKzalloc(allocated);
2714 6233 : if (updated == NULL) {
2715 0 : log_unlock(lg);
2716 0 : return GDK_FAIL;
2717 : }
2718 5340 : } else if (nupdated < BATcount(lg->catalog_id)) {
2719 91 : BUN n = BATcount(lg->catalog_id);
2720 91 : size_t a = ((n + 31) & ~31) / 8;
2721 91 : if (a > allocated) {
2722 13 : uint32_t *p = GDKrealloc(updated, a);
2723 13 : if (p == NULL) {
2724 0 : GDKfree(updated);
2725 0 : log_unlock(lg);
2726 0 : return GDK_FAIL;
2727 : }
2728 13 : updated = p;
2729 13 : memset(updated + allocated / 4, 0, a - allocated);
2730 13 : allocated = a;
2731 : }
2732 : nupdated = n;
2733 : }
2734 11573 : lg->flushing = true;
2735 11573 : res = log_read_transaction(lg, updated, nupdated);
2736 11573 : lg->flushing = false;
2737 11573 : log_unlock(lg);
2738 11573 : if (res == LOG_EOF) {
2739 11573 : log_close_input(lg);
2740 11573 : res = LOG_OK;
2741 : }
2742 11573 : cid++;
2743 : }
2744 6233 : if (lid > olid && res == LOG_OK) {
2745 6233 : rotation_lock(lg); /* protect against concurrent log_tflush rotate check */
2746 6233 : lg->saved_id = lid;
2747 6233 : rotation_unlock(lg);
2748 6233 : if (log_commit(lg, pending, updated, nupdated) != GDK_SUCCEED) {
2749 0 : TRC_ERROR(GDK, "failed to commit");
2750 0 : res = LOG_ERR;
2751 0 : rotation_lock(lg);
2752 0 : lg->saved_id = olid; /* reset !! */
2753 0 : rotation_unlock(lg);
2754 : }
2755 0 : if (res != LOG_ERR) {
2756 17806 : while (olid < lid) {
2757 : /* Try to cleanup, remove old log file, continue on failure! */
2758 11573 : olid++;
2759 11573 : (void) log_cleanup(lg, olid);
2760 : }
2761 : }
2762 6233 : if (res == LOG_OK)
2763 6233 : log_cleanup_range(lg, lg->saved_id);
2764 : }
2765 6233 : GDKfree(updated);
2766 6233 : return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
2767 : }
2768 :
2769 : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
2770 : * Only the bak- files are deleted for the preserved WAL files.
2771 : */
2772 : lng
2773 29746 : log_changes(logger *lg)
2774 : {
2775 29746 : if (LOG_DISABLED(lg))
2776 : return 0;
2777 29746 : rotation_lock(lg);
2778 29746 : lng changes = lg->id - lg->saved_id - 1;
2779 29746 : rotation_unlock(lg);
2780 29746 : return changes;
2781 : }
2782 :
2783 : int
2784 463 : log_sequence(logger *lg, int seq, lng *id)
2785 : {
2786 463 : log_lock(lg);
2787 463 : BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
2788 :
2789 463 : if (p != BUN_NONE) {
2790 348 : *id = *(lng *) Tloc(lg->seqs_val, p);
2791 :
2792 348 : log_unlock(lg);
2793 348 : return 1;
2794 : }
2795 115 : log_unlock(lg);
2796 115 : return 0;
2797 : }
2798 :
/* Write a LOG_UPDATE_CONST record for column id to the current output
 * log.  Besides the record header (flag and id) it contains:
 * - the count cnt;
 * - the 1-byte logger type number of atom type type;
 * - the start offset;
 * - the single value val, written with the atom's atomWrite function.
 *
 * When logging is disabled, or cnt is 0, nothing is written; for a
 * non-zero cnt only la_bat_update_count is called (with offset + cnt).
 * Returns GDK_FAIL if the stream is in error or any write fails; write
 * failures are also traced as critical. */
gdk_return
log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt)
{
	bte tpe = find_type(lg, type);
	gdk_return ok = GDK_SUCCEED;
	logformat l;
	lng nr;
	l.flag = LOG_UPDATE_CONST;
	l.id = id;
	nr = cnt;

	if (LOG_DISABLED(lg) || !nr) {
		/* logging is switched off, or there is nothing to log:
		 * only the row count is updated (if there are rows) */
		if (nr) {
			log_lock(lg);
			ok = la_bat_update_count(lg, id, offset + cnt, lg->tid);
			log_unlock(lg);
		}
		return ok;
	}

	gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;

	assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
	/* record header: format, count, type number, offset */
	if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
	    log_write_format(lg, &l) != GDK_SUCCEED ||
	    !mnstr_writeLng(lg->current->output_log, nr) ||
	    mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
	    !mnstr_writeLng(lg->current->output_log, offset)) {
		/* NOTE(review): the current range's refcount is dropped
		 * here but not when wt() below fails, and internal_log_bat
		 * does not drop it on its header failure; confirm which
		 * behaviour the callers expect */
		ATOMIC_DEC(&lg->current->refcount);
		ok = GDK_FAIL;
		goto bailout;
	}

	/* the constant value itself, written once */
	ok = wt(val, lg->current->output_log, 1);

	TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);

  bailout:
	if (ok != GDK_SUCCEED) {
		const char *err = mnstr_peek_error(lg->current->output_log);
		TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
	}
	return ok;
}
2844 :
2845 : static gdk_return
2846 18761 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
2847 : {
2848 18761 : size_t bufsz = lg->wbufsize, resize = 0;
2849 18761 : BUN end = (BUN) (offset + nr);
2850 18761 : char *buf = lg->wbuf;
2851 18761 : gdk_return res = GDK_SUCCEED;
2852 :
2853 18761 : if (!buf)
2854 : return GDK_FAIL;
2855 18761 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2856 18761 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
2857 : return GDK_FAIL;
2858 18761 : BATiter bi = bat_iterator(b);
2859 18761 : BUN p = (BUN) offset;
2860 37574 : for (; p < end;) {
2861 18813 : size_t sz = 0;
2862 18813 : if (resize) {
2863 2 : if ((buf = GDKrealloc(lg->wbuf, resize)) == NULL) {
2864 : res = GDK_FAIL;
2865 : break;
2866 : }
2867 2 : lg->wbuf = buf;
2868 2 : lg->wbufsize = bufsz = resize;
2869 2 : resize = 0;
2870 : }
2871 18813 : char *dst = buf;
2872 102873 : for (; p < end && sz < bufsz; p++) {
2873 84112 : const char *s = BUNtvar(bi, p);
2874 84112 : size_t len = strlen(s) + 1;
2875 84112 : if ((sz + len) > bufsz) {
2876 52 : if (len > bufsz)
2877 2 : resize = len + bufsz;
2878 : break;
2879 : } else {
2880 84060 : memcpy(dst, s, len);
2881 84060 : dst += len;
2882 84060 : sz += len;
2883 : }
2884 : }
2885 37624 : if (sz &&
2886 37622 : (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
2887 18811 : mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
2888 : res = GDK_FAIL;
2889 : break;
2890 : }
2891 : }
2892 18761 : bat_iterator_end(&bi);
2893 18761 : return res;
2894 : }
2895 :
2896 : static gdk_return
2897 490664 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced, lng total_cnt)
2898 : {
2899 490664 : bte tpe = find_type(lg, b->ttype);
2900 490664 : gdk_return ok = GDK_SUCCEED;
2901 490664 : logformat l;
2902 490664 : BUN p;
2903 490664 : lng nr;
2904 490664 : l.flag = LOG_UPDATE_BULK;
2905 490664 : l.id = id;
2906 490664 : nr = cnt;
2907 :
2908 490664 : if (LOG_DISABLED(lg) || !nr) {
2909 : /* logging is switched off */
2910 194106 : if (nr)
2911 141173 : return la_bat_update_count(lg, id, offset + cnt, lg->tid);
2912 : return GDK_SUCCEED;
2913 : }
2914 :
2915 267134 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
2916 :
2917 267134 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
2918 267134 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
2919 0 : ok = GDK_FAIL;
2920 0 : goto bailout;
2921 : }
2922 :
2923 267134 : if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */
2924 532220 : if (log_write_format(lg, &l) != GDK_SUCCEED ||
2925 660893 : !mnstr_writeLng(lg->current->output_log, total_cnt ? total_cnt : cnt) ||
2926 532220 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
2927 403547 : !mnstr_writeLng(lg->current->output_log, total_cnt ? -1 : offset)) { /* offset = -1 indicates bat was logged in parts */
2928 0 : ok = GDK_FAIL;
2929 0 : goto bailout;
2930 : }
2931 267134 : if (!total_cnt)
2932 128673 : total_cnt = cnt;
2933 267134 : lg->total_cnt += cnt;
2934 :
2935 267134 : if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */
2936 266110 : lg->total_cnt = 0;
2937 :
2938 : /* if offset is just for the log, but BAT is already sliced, reset offset */
2939 267134 : if (sliced)
2940 1478 : offset = 0;
2941 267134 : if (b->ttype == TYPE_msk) {
2942 0 : BATiter bi = bat_iterator(b);
2943 0 : if (offset % 32 == 0) {
2944 0 : if (!mnstr_writeIntArray(lg->current->output_log, (int *) ((char *) bi.base + offset / 32),
2945 0 : (size_t) ((nr + 31) / 32)))
2946 0 : ok = GDK_FAIL;
2947 : } else {
2948 0 : for (lng i = 0; i < nr; i += 32) {
2949 : uint32_t v = 0;
2950 0 : for (int j = 0; j < 32 && i + j < nr; j++)
2951 0 : v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
2952 0 : if (!mnstr_writeInt(lg->current->output_log, (int) v)) {
2953 : ok = GDK_FAIL;
2954 : break;
2955 : }
2956 : }
2957 : }
2958 0 : bat_iterator_end(&bi);
2959 514871 : } else if (b->ttype < TYPE_str && !isVIEW(b)) {
2960 247737 : BATiter bi = bat_iterator(b);
2961 247737 : const void *t = BUNtail(bi, (BUN) offset);
2962 :
2963 247737 : ok = wt(t, lg->current->output_log, (size_t) nr);
2964 247737 : bat_iterator_end(&bi);
2965 19397 : } else if (b->ttype == TYPE_str) {
2966 : /* efficient string writes */
2967 18755 : ok = string_writer(lg, b, offset, nr);
2968 : } else {
2969 642 : BATiter bi = bat_iterator(b);
2970 642 : BUN end = (BUN) (offset + nr);
2971 1642 : for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
2972 1000 : const void *t = BUNtail(bi, p);
2973 :
2974 1000 : ok = wt(t, lg->current->output_log, 1);
2975 : }
2976 642 : bat_iterator_end(&bi);
2977 : }
2978 :
2979 267134 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
2980 :
2981 267134 : bailout:
2982 267134 : if (ok != GDK_SUCCEED) {
2983 0 : ATOMIC_DEC(&lg->current->refcount);
2984 0 : const char *err = mnstr_peek_error(lg->current->output_log);
2985 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
2986 : }
2987 : return ok;
2988 : }
2989 :
2990 : /*
2991 : * Changes made to the BAT descriptor should be stored in the log
2992 : * files. Actually, we need to save the descriptor file, perhaps we
2993 : * should simply introduce a versioning scheme.
2994 : */
2995 : gdk_return
2996 233872 : log_bat_persists(logger *lg, BAT *b, log_id id)
2997 : {
2998 233872 : log_lock(lg);
2999 233872 : bte ta = find_type(lg, b->ttype);
3000 233872 : logformat l;
3001 :
3002 233872 : if (log_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
3003 0 : log_unlock(lg);
3004 0 : if (!LOG_DISABLED(lg))
3005 0 : ATOMIC_DEC(&lg->current->refcount);
3006 0 : return GDK_FAIL;
3007 : }
3008 :
3009 233872 : l.flag = LOG_CREATE;
3010 233872 : l.id = id;
3011 233872 : if (!LOG_DISABLED(lg)) {
3012 156607 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3013 313214 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3014 313214 : log_write_format(lg, &l) != GDK_SUCCEED ||
3015 156607 : mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
3016 0 : log_unlock(lg);
3017 0 : ATOMIC_DEC(&lg->current->refcount);
3018 0 : return GDK_FAIL;
3019 : }
3020 : }
3021 233872 : TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid);
3022 233872 : gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0);
3023 233872 : log_unlock(lg);
3024 233872 : if (r != GDK_SUCCEED)
3025 0 : ATOMIC_DEC(&lg->current->refcount);
3026 : return r;
3027 : }
3028 :
3029 : gdk_return
3030 21352 : log_bat_transient(logger *lg, log_id id)
3031 : {
3032 21352 : log_lock(lg);
3033 21352 : log_bid bid = internal_find_bat(lg, id, -1);
3034 21352 : logformat l;
3035 :
3036 21352 : if (bid < 0) {
3037 0 : log_unlock(lg);
3038 0 : return GDK_FAIL;
3039 : }
3040 21352 : if (!bid) {
3041 0 : GDKerror("log_bat_transient failed to find bid for object %d\n", id);
3042 0 : log_unlock(lg);
3043 0 : return GDK_FAIL;
3044 : }
3045 21352 : l.flag = LOG_DESTROY;
3046 21352 : l.id = id;
3047 :
3048 21352 : if (!LOG_DISABLED(lg)) {
3049 12304 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3050 0 : TRC_CRITICAL(GDK, "write failed\n");
3051 0 : log_unlock(lg);
3052 0 : ATOMIC_DEC(&lg->current->refcount);
3053 0 : return GDK_FAIL;
3054 : }
3055 : }
3056 21352 : TRC_DEBUG(WAL, "Logged destroyed bat (%d) %d\n", id, bid);
3057 21352 : BAT *b = BBPquickdesc(bid);
3058 21352 : assert(b);
3059 21352 : BUN cnt = BATcount(b);
3060 21352 : ATOMIC_ADD(&lg->current->drops, cnt);
3061 21352 : gdk_return r = log_del_bat(lg, bid);
3062 21352 : log_unlock(lg);
3063 21352 : if (r != GDK_SUCCEED)
3064 0 : ATOMIC_DEC(&lg->current->refcount);
3065 : return r;
3066 : }
3067 :
3068 : static gdk_return
3069 112486 : log_bat_group(logger *lg, log_id id)
3070 : {
3071 112486 : if (LOG_DISABLED(lg))
3072 : return GDK_SUCCEED;
3073 :
3074 75086 : logformat l;
3075 75086 : l.flag = LOG_BAT_GROUP;
3076 75086 : l.id = id;
3077 75086 : gdk_return r = log_write_format(lg, &l);
3078 75086 : return r;
3079 : }
3080 :
3081 : gdk_return
3082 56243 : log_bat_group_start(logger *lg, log_id id)
3083 : {
3084 : /*positive table id represent start of logged table */
3085 56243 : return log_bat_group(lg, id);
3086 : }
3087 :
3088 : gdk_return
3089 56243 : log_bat_group_end(logger *lg, log_id id)
3090 : {
3091 : /*negative table id represent end of logged table */
3092 56243 : return log_bat_group(lg, -id);
3093 : }
3094 :
3095 : gdk_return
3096 254113 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt)
3097 : {
3098 254113 : log_lock(lg);
3099 254113 : gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0, total_cnt);
3100 254113 : log_unlock(lg);
3101 254113 : return r;
3102 : }
3103 :
3104 : gdk_return
3105 2699 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
3106 : {
3107 2699 : log_lock(lg);
3108 2699 : bte tpe = find_type(lg, uval->ttype);
3109 2699 : gdk_return ok = GDK_SUCCEED;
3110 2699 : logformat l;
3111 2699 : BUN p;
3112 2699 : lng nr;
3113 :
3114 2699 : if (BATtdense(uid)) {
3115 2679 : ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1, 0);
3116 2679 : log_unlock(lg);
3117 2679 : if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
3118 0 : ATOMIC_DEC(&lg->current->refcount);
3119 2679 : return ok;
3120 : }
3121 :
3122 20 : assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
3123 :
3124 20 : l.flag = LOG_UPDATE;
3125 20 : l.id = id;
3126 20 : nr = (BATcount(uval));
3127 20 : assert(nr);
3128 :
3129 20 : if (LOG_DISABLED(lg)) {
3130 : /* logging is switched off */
3131 1 : log_unlock(lg);
3132 1 : return GDK_SUCCEED;
3133 : }
3134 :
3135 19 : BATiter vi = bat_iterator(uval);
3136 19 : gdk_return(*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
3137 19 : gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
3138 :
3139 19 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3140 38 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3141 38 : log_write_format(lg, &l) != GDK_SUCCEED ||
3142 38 : !mnstr_writeLng(lg->current->output_log, nr) ||
3143 19 : mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
3144 0 : ok = GDK_FAIL;
3145 0 : goto bailout;
3146 : }
3147 1074 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3148 1055 : const oid id = BUNtoid(uid, p);
3149 :
3150 1055 : ok = wh(&id, lg->current->output_log, 1);
3151 : }
3152 19 : if (uval->ttype == TYPE_msk) {
3153 0 : if (!mnstr_writeIntArray(lg->current->output_log, vi.base,
3154 0 : (BATcount(uval) + 31) / 32))
3155 0 : ok = GDK_FAIL;
3156 32 : } else if (uval->ttype < TYPE_str && !isVIEW(uval)) {
3157 13 : const void *t = BUNtail(vi, 0);
3158 :
3159 13 : ok = wt(t, lg->current->output_log, (size_t) nr);
3160 6 : } else if (uval->ttype == TYPE_str) {
3161 : /* efficient string writes */
3162 6 : ok = string_writer(lg, uval, 0, nr);
3163 : } else {
3164 0 : for (p = 0; p < BATcount(uid) && ok == GDK_SUCCEED; p++) {
3165 0 : const void *val = BUNtail(vi, p);
3166 :
3167 0 : ok = wt(val, lg->current->output_log, 1);
3168 : }
3169 : }
3170 :
3171 19 : TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
3172 :
3173 19 : bailout:
3174 19 : bat_iterator_end(&vi);
3175 19 : if (ok != GDK_SUCCEED) {
3176 0 : const char *err = mnstr_peek_error(lg->current->output_log);
3177 0 : TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
3178 0 : ATOMIC_DEC(&lg->current->refcount);
3179 : }
3180 19 : log_unlock(lg);
3181 19 : return ok;
3182 : }
3183 :
3184 : static inline bool
3185 56230 : check_rotation_conditions(logger *lg)
3186 : {
3187 56230 : if (LOG_DISABLED(lg))
3188 : return false;
3189 :
3190 56227 : if (lg->current->next)
3191 : return false; /* do not rotate if there is already a prepared next current */
3192 56227 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
3193 : return true;
3194 56227 : const lng current_file_size = (lng) getfilepos(getFile(lg->current->output_log));
3195 :
3196 56227 : if (current_file_size == -1)
3197 : return false;
3198 :
3199 56227 : assert(current_file_size >= 0);
3200 :
3201 56227 : if (current_file_size == 2)
3202 : return false;
3203 :
3204 3309 : bool res = (lg->saved_id + 1 >= lg->id && ATOMIC_GET(&lg->current->drops) > (ulng)lg->max_dropped) ||
3205 57321 : current_file_size > lg->max_file_size ||
3206 49511 : (GDKusec() - lg->file_age) > lg->max_file_age;
3207 :
3208 54019 : return res;
3209 : }
3210 :
3211 : gdk_return
3212 61340 : log_tend(logger *lg)
3213 : {
3214 61340 : TRC_DEBUG(WAL, "tend %d\n", lg->tid);
3215 :
3216 61340 : if (LOG_DISABLED(lg))
3217 : return GDK_SUCCEED;
3218 :
3219 56227 : gdk_return result;
3220 56227 : logformat l;
3221 56227 : l.flag = LOG_END;
3222 56227 : l.id = lg->tid;
3223 :
3224 56227 : if ((result = log_write_format(lg, &l)) == GDK_SUCCEED)
3225 56227 : ATOMIC_INC(&lg->nr_flushers);
3226 : return result;
3227 : }
3228 :
/* Acquire/release lg->flush_lock, which do_flush() expects its caller to
 * hold; log_tflush() takes it while already holding rotation_lock. */
#define flush_lock(lg) MT_lock_set(&(lg)->flush_lock)
#define flush_unlock(lg) MT_lock_unset(&(lg)->flush_lock)
3231 :
3232 : static inline gdk_return
3233 55380 : do_flush(logged_range *range)
3234 : {
3235 : /* assumes flush lock */
3236 55380 : stream *output_log = range->output_log;
3237 55380 : ulng ts = ATOMIC_GET(&range->last_ts);
3238 :
3239 55380 : if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
3240 55380 : (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) && mnstr_fsync(output_log)))
3241 0 : return GDK_FAIL;
3242 55380 : ATOMIC_SET(&range->flushed_ts, ts);
3243 55380 : return GDK_SUCCEED;
3244 : }
3245 :
3246 : static inline void
3247 61337 : log_tdone(logger *lg, logged_range *range, ulng commit_ts)
3248 : {
3249 61337 : (void) lg;
3250 61337 : TRC_DEBUG(WAL, "tdone " LLFMT "\n", commit_ts);
3251 :
3252 61337 : if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
3253 60490 : ATOMIC_SET(&range->last_ts, commit_ts);
3254 61337 : }
3255 :
3256 : gdk_return
3257 61340 : log_tflush(logger *lg, ulng file_id, ulng commit_ts)
3258 : {
3259 61340 : if (lg->flushnow) {
3260 5110 : rotation_lock(lg);
3261 5110 : logged_range *p = lg->current;
3262 5110 : assert(lg->flush_ranges == lg->current);
3263 5110 : assert(ATOMIC_GET(&lg->current->flushed_ts) == ATOMIC_GET(&lg->current->last_ts));
3264 5110 : log_tdone(lg, lg->current, commit_ts);
3265 5110 : ATOMIC_SET(&lg->current->flushed_ts, commit_ts);
3266 5110 : lg->id++;
3267 5110 : lg->flushnow = false;
3268 5110 : if (log_open_output(lg) != GDK_SUCCEED)
3269 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3270 5110 : do_rotate(lg);
3271 5110 : (void) do_flush_range_cleanup(lg);
3272 5110 : assert(lg->flush_ranges == lg->current);
3273 5110 : rotation_unlock(lg);
3274 5110 : return log_commit(lg, p, NULL, 0);
3275 : }
3276 :
3277 56230 : if (LOG_DISABLED(lg))
3278 : return GDK_SUCCEED;
3279 :
3280 56227 : rotation_lock(lg);
3281 56227 : logged_range *frange = do_flush_range_cleanup(lg);
3282 :
3283 56507 : while (frange->next && frange->id < file_id) {
3284 : assert(frange->next);
3285 : frange = frange->next;
3286 : }
3287 :
3288 56227 : log_tdone(lg, frange, commit_ts);
3289 :
3290 56227 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts) {
3291 : /* delay needed ? */
3292 :
3293 55380 : flush_lock(lg);
3294 : /* check it one more time */
3295 55380 : if ((ulng) ATOMIC_GET(&frange->flushed_ts) < commit_ts)
3296 55380 : do_flush(frange);
3297 55380 : flush_unlock(lg);
3298 : }
3299 : /* else somebody else has flushed our log file */
3300 :
3301 56227 : if (ATOMIC_DEC(&frange->refcount) == 1 && !LOG_DISABLED(lg)) {
3302 54133 : if (frange != lg->current && frange->output_log) {
3303 195 : close_stream(frange->output_log);
3304 195 : frange->output_log = NULL;
3305 : }
3306 : }
3307 :
3308 56227 : if (ATOMIC_DEC(&lg->nr_flushers) == 0) {
3309 : /* I am the last flusher
3310 : * if present,
3311 : * wake up the exclusive flusher in log_tstart */
3312 : /* rotation_lock is still being held */
3313 54442 : MT_cond_signal(&lg->excl_flush_cv);
3314 : }
3315 56227 : rotation_unlock(lg);
3316 :
3317 56227 : return GDK_SUCCEED;
3318 : }
3319 :
3320 : static gdk_return
3321 6658 : log_tsequence_(logger *lg, int seq, lng val)
3322 : {
3323 6658 : logformat l;
3324 :
3325 6658 : if (LOG_DISABLED(lg))
3326 : return GDK_SUCCEED;
3327 2761 : l.flag = LOG_SEQ;
3328 2761 : l.id = seq;
3329 :
3330 2761 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3331 :
3332 2761 : assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
3333 5522 : if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
3334 5522 : log_write_format(lg, &l) != GDK_SUCCEED ||
3335 2761 : !mnstr_writeLng(lg->current->output_log, val)) {
3336 0 : TRC_CRITICAL(GDK, "write failed\n");
3337 0 : ATOMIC_DEC(&lg->current->refcount);
3338 0 : return GDK_FAIL;
3339 : }
3340 : return GDK_SUCCEED;
3341 : }
3342 :
3343 : /* a transaction in it self */
3344 : gdk_return
3345 6658 : log_tsequence(logger *lg, int seq, lng val)
3346 : {
3347 6658 : BUN p;
3348 :
3349 6658 : TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
3350 :
3351 6658 : log_lock(lg);
3352 6658 : MT_lock_set(&lg->seqs_id->theaplock);
3353 6658 : BUN inserted = lg->seqs_id->batInserted;
3354 6658 : MT_lock_unset(&lg->seqs_id->theaplock);
3355 6658 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE && p >= inserted) {
3356 1607 : assert(lg->seqs_val->hseqbase == 0);
3357 1607 : if (BUNreplace(lg->seqs_val, p, &val, true) != GDK_SUCCEED) {
3358 0 : log_unlock(lg);
3359 0 : return GDK_FAIL;
3360 : }
3361 : } else {
3362 4693 : if (p != BUN_NONE) {
3363 4693 : oid pos = p;
3364 4693 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED) {
3365 0 : log_unlock(lg);
3366 0 : return GDK_FAIL;
3367 : }
3368 : }
3369 10102 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
3370 5051 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED) {
3371 0 : log_unlock(lg);
3372 0 : return GDK_FAIL;
3373 : }
3374 : }
3375 6658 : gdk_return r = log_tsequence_(lg, seq, val);
3376 6658 : log_unlock(lg);
3377 6658 : return r;
3378 : }
3379 :
3380 : static gdk_return
3381 11680 : bm_commit(logger *lg, logged_range *pending, uint32_t *updated, BUN maxupdated)
3382 : {
3383 11680 : log_lock(lg);
3384 11680 : BAT *b = lg->catalog_bid;
3385 11680 : const log_bid *bids;
3386 :
3387 11680 : bids = (log_bid *) Tloc(b, 0);
3388 245476 : for (BUN p = b->batInserted, cnt = pending ? pending->cnt : BATcount(b); p < cnt; p++) {
3389 233796 : log_bid bid = bids[p];
3390 233796 : BAT *lb;
3391 :
3392 233796 : assert(bid);
3393 233796 : if ((lb = BBP_desc(bid)) == NULL || BATmode(lb, false) != GDK_SUCCEED) {
3394 0 : GDKwarning("Failed to set bat (%d%s) persistent\n", bid, !lb ? " gone" : "");
3395 0 : log_unlock(lg);
3396 0 : return GDK_FAIL;
3397 : }
3398 :
3399 233796 : assert(lb->batRestricted != BAT_WRITE);
3400 :
3401 233796 : TRC_DEBUG(WAL, "create %d (%d)\n", bid, BBP_lrefs(bid));
3402 : }
3403 : /* bm_subcommit releases the lock */
3404 11680 : return bm_subcommit(lg, pending, updated, maxupdated);
3405 : }
3406 :
3407 : static gdk_return
3408 234149 : log_add_bat(logger *lg, BAT *b, log_id id, int tid)
3409 : {
3410 234149 : log_bid bid = internal_find_bat(lg, id, tid);
3411 234149 : lng cnt = 0;
3412 234149 : lng lid = lng_nil;
3413 :
3414 234149 : assert(b->batRestricted != BAT_WRITE);
3415 234149 : assert(b->batRole == PERSISTENT);
3416 234149 : if (bid < 0)
3417 : return GDK_FAIL;
3418 234149 : if (bid) {
3419 155496 : if (bid != b->batCacheid) {
3420 155493 : if (log_del_bat(lg, bid) != GDK_SUCCEED)
3421 : return GDK_FAIL;
3422 : } else {
3423 : return GDK_SUCCEED;
3424 : }
3425 : }
3426 234146 : bid = b->batCacheid;
3427 234146 : TRC_DEBUG(WAL, "create %d\n", id);
3428 234146 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
3429 468292 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
3430 468292 : BUNappend(lg->catalog_id, &id, true) != GDK_SUCCEED ||
3431 468292 : BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
3432 234146 : BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
3433 0 : return GDK_FAIL;
3434 234146 : if (lg->current)
3435 233869 : lg->current->cnt++;
3436 234146 : BBPretain(bid);
3437 234146 : return GDK_SUCCEED;
3438 : }
3439 :
3440 : static gdk_return
3441 176918 : log_del_bat(logger *lg, log_bid bid)
3442 : {
3443 176918 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
3444 176918 : lng lid = lg->tid;
3445 :
3446 176918 : assert(p != BUN_NONE);
3447 176918 : if (p == BUN_NONE) {
3448 : GDKerror("cannot find BAT\n");
3449 : return GDK_FAIL;
3450 : }
3451 :
3452 176918 : assert(lg->catalog_lid->hseqbase == 0);
3453 176918 : return BUNreplace(lg->catalog_lid, p, &lid, false);
3454 : }
3455 :
3456 : /* returns -1 on failure, 0 when not found, > 0 when found */
3457 : log_bid
3458 28616 : log_find_bat(logger *lg, log_id id)
3459 : {
3460 28616 : log_lock(lg);
3461 28616 : log_bid bid = internal_find_bat(lg, id, -1);
3462 28616 : log_unlock(lg);
3463 28616 : if (!bid) {
3464 0 : GDKerror("logger_find_bat failed to find bid for object %d\n", id);
3465 0 : return GDK_FAIL;
3466 : }
3467 : return bid;
3468 : }
3469 :
3470 :
3471 :
3472 : gdk_return
3473 61340 : log_tstart(logger *lg, bool flushnow, ulng *file_id)
3474 : {
3475 61340 : rotation_lock(lg);
3476 61340 : if (flushnow) {
3477 : /* I am now the exclusive flusher */
3478 5110 : if (ATOMIC_GET(&lg->nr_flushers)) {
3479 : /* I am waiting until all existing flushers are done */
3480 0 : MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
3481 : }
3482 5110 : assert(ATOMIC_GET(&lg->nr_flushers) == 0);
3483 :
3484 5110 : if (ATOMIC_GET(&lg->current->last_ts)) {
3485 1989 : lg->id++;
3486 1989 : if (log_open_output(lg) != GDK_SUCCEED)
3487 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3488 : }
3489 5110 : do_rotate(lg);
3490 5110 : (void) do_flush_range_cleanup(lg);
3491 5110 : rotation_unlock(lg);
3492 :
3493 5110 : if (lg->saved_id + 1 < lg->id)
3494 4705 : log_flush(lg, (1ULL << 63));
3495 5110 : lg->flushnow = flushnow;
3496 : } else {
3497 56230 : if (check_rotation_conditions(lg)) {
3498 4508 : lg->id++;
3499 4508 : if (log_open_output(lg) != GDK_SUCCEED)
3500 0 : GDKfatal("Could not create new log file\n"); /* TODO: does not have to be fatal (yet) */
3501 : }
3502 56230 : do_rotate(lg);
3503 56230 : rotation_unlock(lg);
3504 : }
3505 :
3506 61340 : if (LOG_DISABLED(lg))
3507 : return GDK_SUCCEED;
3508 :
3509 56227 : ATOMIC_INC(&lg->current->refcount);
3510 56227 : *file_id = lg->current->id;
3511 56227 : logformat l;
3512 56227 : l.flag = LOG_START;
3513 56227 : l.id = ++lg->tid;
3514 :
3515 56227 : TRC_DEBUG(WAL, "tstart %d\n", lg->tid);
3516 56227 : if (log_write_format(lg, &l) != GDK_SUCCEED) {
3517 0 : ATOMIC_DEC(&lg->current->refcount);
3518 0 : return GDK_FAIL;
3519 : }
3520 :
3521 : return GDK_SUCCEED;
3522 : }
3523 :
3524 : void
3525 114 : log_printinfo(logger *lg)
3526 : {
3527 114 : printf("logger %s:\n", lg->fn);
3528 114 : rotation_lock(lg);
3529 114 : printf("current log file "ULLFMT", last handled log file "ULLFMT"\n",
3530 : lg->id, lg->saved_id);
3531 114 : rotation_unlock(lg);
3532 114 : printf("current transaction id %d, saved transaction id %d\n",
3533 : lg->tid, lg->saved_tid);
3534 114 : printf("number of flushers: %d\n", (int) ATOMIC_GET(&lg->nr_flushers));
3535 114 : printf("number of catalog entries "BUNFMT", of which "BUNFMT" deleted\n",
3536 114 : lg->catalog_bid->batCount, lg->dcatalog->batCount);
3537 269 : for (logged_range *p = lg->pending; p; p = p->next) {
3538 155 : char buf[32];
3539 155 : if (p->output_log == NULL ||
3540 114 : snprintf(buf, sizeof(buf), ", file size %"PRIu64, (uint64_t) getfilepos(getFile(lg->current->output_log))) >= (int) sizeof(buf))
3541 41 : buf[0] = 0;
3542 196 : printf("pending range "ULLFMT": drops %"PRIu64", last_ts %"PRIu64", flushed_ts %"PRIu64", refcount %"PRIu64"%s%s\n", p->id, (uint64_t) ATOMIC_GET(&p->drops), (uint64_t) ATOMIC_GET(&p->last_ts), (uint64_t) ATOMIC_GET(&p->flushed_ts), (uint64_t) ATOMIC_GET(&p->refcount), buf, p == lg->current ? " (current)" : "");
3543 : }
3544 114 : }
|