Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (author) N. J. Nes
15 : *
16 : * In the philosophy of MonetDB, transaction management overhead
17 : * should only be paid when necessary. Transaction management is for
18 : * this purpose implemented as a separate module and applications are
19 : * required to obey the transaction policy, e.g. obtaining/releasing
20 : * locks.
21 : *
22 : * This module is designed to support efficient logging of the SQL
23 : * database. Once loaded, the SQL compiler will insert the proper
24 : * calls at transaction commit to include the changes in the log file.
25 : *
26 : * The logger uses a directory to store its log files. One master log
27 : * file stores information about the version of the logger and the
28 : * transaction log files. This file is a simple ascii file with the
29 : * following format:
30 : * {6DIGIT-VERSION\n[log file number \n]*]*}
31 : * The transaction log files have a binary format, which stores fixed
32 : * size logformat headers (flag,nr,tid), where the flag is the type of
33 : * update logged. The nr field indicates how many changes there were
34 : * (in case of inserts/deletes). The tid stores the transaction identifier.
35 : *
36 : * The key decision to be made by the user is the location of the log
37 : * file. Ideally, it should be stored in fail-safe environment, or at
38 : * least the log and databases should be on separate disk columns.
39 : *
40 : * This file system may reside on the same hardware as the database
41 : * server and therefore the writes are done to the same disk, but
42 : * could also reside on another system and then the changes are
43 : * flushed through the network. The logger works under the assumption
44 : * that it is called to safeguard updates on the database when it has
45 : * an exclusive lock on the latest version. This lock should be
46 : * guaranteed by the calling transaction manager first.
47 : *
48 : * Finding the updates applied to a BAT is relatively easy, because
49 : * each BAT contains a delta structure. On commit these changes are
50 : * written to the log file and the delta management is reset. Since
51 : * each commit is written to the same log file, the beginning and end
52 : * are marked by a log identifier.
53 : *
54 : * A server restart should only (re)process blocks which are
55 : * completely written to disk. A log replay therefore ends in a commit
56 : * or abort on the changed bats. Once all logs have been read, the
57 : * changes to the bats are made persistent, i.e. a bbp sub-commit is
58 : * done.
59 : */
60 : #include "monetdb_config.h"
61 : #include "gdk.h"
62 : #include "gdk_private.h"
63 : #include "gdk_logger.h"
64 : #include "gdk_logger_internals.h"
65 : #include "mutils.h"
66 : #include <string.h>
67 :
/*
 * The log record encoding is geared at reduced storage space, but at
 * the expense of readability. A user can not easily inspect the log a
 * posteriori to check what has happened.
 *
 * Record type flags.  Value 4 is not defined here (the log_commands[]
 * table still names it "LOG_DELETE").  Keep log_commands[] in sync when
 * adding or renumbering flags.
 */
#define LOG_START	1
#define LOG_END		2
#define LOG_INSERT	3
#define LOG_UPDATE	5
#define LOG_CREATE	6
#define LOG_DESTROY	7
#define LOG_USE		8
#define LOG_CLEAR	9
#define LOG_SEQ		10
#define LOG_INSERT_ID	11
#define LOG_UPDATE_ID	12
#define LOG_CREATE_ID	13
#define LOG_DESTROY_ID	14
#define LOG_USE_ID	15
#define LOG_CLEAR_ID	16
#define LOG_UPDATE_PAX	17

/* current position in a FILE *, using the widest available API */
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
#else
#ifdef HAVE_FSEEKO
#define getfilepos ftello
#else
#define getfilepos ftell
#endif
#endif

/* initial capacity passed to COLnew() when la_bat_create() makes a BAT */
#define BATSIZE 0

/*
 * Printable name of a catalog object for debug/error messages.  Only
 * `name` is used: when it is NULL the fixed placeholder "tpe id" is
 * produced (tpe and id are not formatted).  The arguments are
 * parenthesized so that any expression can be passed safely.
 */
#define NAME(name,tpe,id) ((name) ? (name) : "tpe id")

/* true when bit 128 of the wrapped logger's debug mask is set;
 * logger_close() then does not close the log stream */
#define LOG_DISABLED(lg) ((lg)->lg->debug&128)
106 :
107 : static gdk_return logger_cleanup(old_logger *lg);
108 : static gdk_return logger_add_bat(old_logger *lg, BAT *b, const char *name, char tpe, oid id);
109 : static gdk_return logger_del_bat(old_logger *lg, log_bid bid);
110 :
/*
 * Printable names of the log record flags, indexed by flag value (used
 * for debug output).  Designated initializers pin every name to its
 * numeric LOG_* value.  The old positional list contained a stale
 * "LOG_DELETE_ID" entry that shifted every name from LOG_UPDATE_ID (12)
 * onward by one.  Index 0 is unused (NULL).  Index 4 has no LOG_*
 * define but keeps its historical name so that printing it never hands
 * NULL to "%s".
 */
static const char *const log_commands[] = {
	[0] = NULL,
	[1] = "LOG_START",		/* LOG_START */
	[2] = "LOG_END",		/* LOG_END */
	[3] = "LOG_INSERT",		/* LOG_INSERT */
	[4] = "LOG_DELETE",		/* no longer defined */
	[5] = "LOG_UPDATE",		/* LOG_UPDATE */
	[6] = "LOG_CREATE",		/* LOG_CREATE */
	[7] = "LOG_DESTROY",		/* LOG_DESTROY */
	[8] = "LOG_USE",		/* LOG_USE */
	[9] = "LOG_CLEAR",		/* LOG_CLEAR */
	[10] = "LOG_SEQ",		/* LOG_SEQ */
	[11] = "LOG_INSERT_ID",		/* LOG_INSERT_ID */
	[12] = "LOG_UPDATE_ID",		/* LOG_UPDATE_ID */
	[13] = "LOG_CREATE_ID",		/* LOG_CREATE_ID */
	[14] = "LOG_DESTROY_ID",	/* LOG_DESTROY_ID */
	[15] = "LOG_USE_ID",		/* LOG_USE_ID */
	[16] = "LOG_CLEAR_ID",		/* LOG_CLEAR_ID */
	[17] = "LOG_UPDATE_PAX",	/* LOG_UPDATE_PAX */
};
132 :
133 : typedef struct logaction {
134 : int type; /* type of change */
135 : lng nr;
136 : int ht; /* vid(-1),void etc */
137 : int tt;
138 : lng id;
139 : char *name; /* optional */
140 : char tpe; /* tpe of column */
141 : oid cid; /* id of object */
142 : BAT *b; /* temporary bat with changes */
143 : BAT *uid; /* temporary bat with bun positions to update */
144 : } logaction;
145 :
146 : /* during the recover process a number of transactions could be active */
147 : typedef struct trans {
148 : int tid; /* transaction id */
149 : int sz; /* sz of the changes array */
150 : int nr; /* nr of changes */
151 :
152 : logaction *changes;
153 :
154 : struct trans *tr;
155 : } trans;
156 :
157 : typedef struct logformat_t {
158 : char flag;
159 : int tid;
160 : lng nr;
161 : } logformat;
162 :
163 : typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;
164 :
165 : static gdk_return tr_grow(trans *tr);
166 :
167 : static BUN
168 0 : log_find(BAT *b, BAT *d, int val)
169 : {
170 0 : BATiter cni = bat_iterator_nolock(b);
171 0 : BUN p;
172 :
173 0 : assert(b->ttype == TYPE_int);
174 0 : assert(d->ttype == TYPE_oid);
175 0 : if (BAThash(b) == GDK_SUCCEED) {
176 0 : MT_rwlock_rdlock(&cni.b->thashlock);
177 0 : HASHloop_int(cni, cni.b->thash, p, &val) {
178 0 : oid pos = p;
179 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
180 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
181 0 : return p;
182 : }
183 : }
184 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
185 : } else { /* unlikely: BAThash failed */
186 0 : BUN q;
187 0 : int *t = (int *) Tloc(b, 0);
188 :
189 0 : for (p = 0, q = BATcount(b); p < q; p++) {
190 0 : if (t[p] == val) {
191 0 : oid pos = p;
192 0 : if (BUNfnd(d, &pos) == BUN_NONE) {
193 0 : return p;
194 : }
195 : }
196 : }
197 : }
198 : return BUN_NONE;
199 : }
200 :
201 : static inline void
202 0 : logbat_destroy(BAT *b)
203 : {
204 0 : BBPreclaim(b);
205 0 : }
206 :
207 : static BAT *
208 0 : logbat_new(int tt, BUN size, role_t role)
209 : {
210 0 : BAT *nb = COLnew(0, tt, size, role);
211 :
212 0 : if (nb) {
213 0 : if (role == PERSISTENT)
214 0 : BATmode(nb, false);
215 : } else {
216 0 : TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
217 : }
218 0 : return nb;
219 : }
220 :
221 : static int
222 0 : log_read_format(old_logger *l, logformat *data)
223 : {
224 0 : return mnstr_read(l->log, &data->flag, 1, 1) == 1 &&
225 0 : mnstr_readLng(l->log, &data->nr) == 1 &&
226 0 : mnstr_readInt(l->log, &data->tid) == 1;
227 : }
228 :
229 : static char *
230 0 : log_read_string(old_logger *l)
231 : {
232 0 : int len;
233 0 : ssize_t nr;
234 0 : char *buf;
235 :
236 0 : if (mnstr_readInt(l->log, &len) != 1) {
237 0 : TRC_CRITICAL(GDK, "read failed\n");
238 : //MK This leads to non-repeatable log structure?
239 0 : return NULL;
240 : }
241 0 : if (len == 0)
242 : return NULL;
243 0 : buf = GDKmalloc(len);
244 0 : if (buf == NULL) {
245 0 : TRC_CRITICAL(GDK, "malloc failed\n");
246 : /* this is bad */
247 0 : return (char *) -1;
248 : }
249 :
250 0 : if ((nr = mnstr_read(l->log, buf, 1, len)) != (ssize_t) len) {
251 0 : buf[len - 1] = 0;
252 0 : TRC_CRITICAL(GDK, "couldn't read name (%s) %zd\n", buf, nr);
253 0 : GDKfree(buf);
254 0 : return NULL;
255 : }
256 0 : buf[len - 1] = 0;
257 0 : return buf;
258 : }
259 :
260 : static log_return
261 0 : log_read_clear(old_logger *lg, trans *tr, char *name, char tpe, oid id)
262 : {
263 0 : if (lg->lg->debug & 1)
264 0 : fprintf(stderr, "#logger found log_read_clear %s\n", NAME(name, tpe, id));
265 0 : if (tr_grow(tr) != GDK_SUCCEED)
266 : return LOG_ERR;
267 0 : tr->changes[tr->nr].type = LOG_CLEAR;
268 0 : tr->changes[tr->nr].tpe = tpe;
269 0 : tr->changes[tr->nr].cid = id;
270 0 : if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
271 : return LOG_ERR;
272 0 : tr->nr++;
273 0 : return LOG_OK;
274 : }
275 :
276 : static int
277 0 : avoid_snapshot(old_logger *lg, log_bid bid)
278 : {
279 0 : if (BATcount(lg->snapshots_bid)-BATcount(lg->dsnapshots)) {
280 0 : BUN p = log_find(lg->snapshots_bid, lg->dsnapshots, bid);
281 :
282 0 : if (p != BUN_NONE) {
283 0 : int tid = *(int *) Tloc(lg->snapshots_tid, p);
284 :
285 0 : if (lg->tid <= tid)
286 : return 1;
287 : }
288 : }
289 : return 0;
290 : }
291 :
292 : log_bid
293 0 : old_logger_find_bat(old_logger *lg, const char *name, char tpe, oid id)
294 : {
295 0 : if (!tpe || !lg->with_ids) {
296 0 : BATiter cni = bat_iterator_nolock(lg->catalog_nme);
297 0 : BUN p;
298 :
299 0 : assert(name != NULL);
300 0 : if (BAThash(lg->catalog_nme) == GDK_SUCCEED) {
301 0 : MT_rwlock_rdlock(&cni.b->thashlock);
302 0 : HASHloop_str(cni, cni.b->thash, p, name) {
303 0 : oid pos = p;
304 0 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
305 0 : oid lid = *(oid*) Tloc(lg->catalog_oid, p);
306 0 : if (!lid) {
307 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
308 0 : return *(log_bid *) Tloc(lg->catalog_bid, p);
309 : }
310 : }
311 : }
312 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
313 0 : return 0; /* not found */
314 : }
315 : } else {
316 0 : BATiter cni = bat_iterator_nolock(lg->catalog_oid);
317 0 : BUN p;
318 :
319 0 : if (BAThash(lg->catalog_oid) == GDK_SUCCEED) {
320 0 : lng lid = (lng) id;
321 0 : MT_rwlock_rdlock(&cni.b->thashlock);
322 0 : HASHloop_lng(cni, cni.b->thash, p, &lid) {
323 0 : oid pos = p;
324 0 : if (*(char*)Tloc(lg->catalog_tpe, p) == tpe) {
325 0 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
326 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
327 0 : return *(log_bid *) Tloc(lg->catalog_bid, p);
328 : }
329 : }
330 : }
331 0 : MT_rwlock_rdunlock(&cni.b->thashlock);
332 0 : return 0; /* not found */
333 : }
334 : }
335 : return -1; /* BAThash failed */
336 : }
337 :
338 : static gdk_return
339 0 : la_bat_clear(old_logger *lg, logaction *la)
340 : {
341 0 : log_bid bid = old_logger_find_bat(lg, la->name, la->tpe, la->cid);
342 0 : BAT *b;
343 :
344 0 : if (bid < 0)
345 : return GDK_FAIL;
346 :
347 0 : if (lg->lg->debug & 1)
348 0 : fprintf(stderr, "#la_bat_clear %s\n", NAME(la->name, la->tpe, la->cid));
349 :
350 : /* do we need to skip these old updates */
351 0 : if (avoid_snapshot(lg, bid))
352 : return GDK_SUCCEED;
353 :
354 0 : b = BATdescriptor(bid);
355 0 : if (b) {
356 0 : restrict_t access = b->batRestricted;
357 0 : b->batRestricted = BAT_WRITE;
358 0 : BATclear(b, true);
359 0 : b->batRestricted = access;
360 0 : logbat_destroy(b);
361 : }
362 : return GDK_SUCCEED;
363 : }
364 :
365 : static log_return
366 0 : log_read_seq(old_logger *lg, logformat *l)
367 : {
368 0 : int seq = (int) l->nr;
369 0 : lng val;
370 0 : BUN p;
371 :
372 0 : assert(l->nr <= (lng) INT_MAX);
373 0 : if (mnstr_readLng(lg->log, &val) != 1) {
374 0 : TRC_CRITICAL(GDK, "read failed\n");
375 0 : return LOG_EOF;
376 : }
377 :
378 0 : if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
379 0 : p >= lg->seqs_id->batInserted) {
380 0 : assert(lg->seqs_val->hseqbase == 0);
381 0 : if (BUNreplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED)
382 : return LOG_ERR;
383 : } else {
384 0 : if (p != BUN_NONE) {
385 0 : oid pos = p;
386 0 : if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED)
387 0 : return LOG_ERR;
388 : }
389 0 : if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
390 0 : BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED)
391 0 : return LOG_ERR;
392 : }
393 : return LOG_OK;
394 : }
395 :
396 : static log_return
397 0 : log_read_id(old_logger *lg, char *tpe, oid *id)
398 : {
399 0 : lng lid;
400 :
401 0 : if (mnstr_readChr(lg->log, tpe) != 1 ||
402 0 : mnstr_readLng(lg->log, &lid) != 1) {
403 0 : TRC_CRITICAL(GDK, "read failed\n");
404 0 : return LOG_EOF;
405 : }
406 0 : *id = (oid)lid;
407 0 : return LOG_OK;
408 : }
409 :
410 : static log_return
411 0 : log_read_updates(old_logger *lg, trans *tr, logformat *l, char *name, int tpe, oid id, int pax)
412 : {
413 0 : log_bid bid = old_logger_find_bat(lg, name, tpe, id);
414 0 : if (bid < 0)
415 : return LOG_ERR;
416 0 : BAT *b = BATdescriptor(bid);
417 0 : log_return res = LOG_OK;
418 0 : int ht = -1, tt = -1, tseq = 0;
419 :
420 0 : if (lg->lg->debug & 1) {
421 0 : if (name)
422 0 : fprintf(stderr, "#logger found log_read_updates %s %s " LLFMT "\n", name, l->flag == LOG_INSERT ? "insert" : "update", l->nr);
423 : else
424 0 : fprintf(stderr, "#logger found log_read_updates " OIDFMT " %s " LLFMT "\n", id, l->flag == LOG_INSERT ? "insert" : "update", l->nr);
425 : }
426 :
427 0 : if (b) {
428 0 : ht = TYPE_void;
429 0 : tt = b->ttype;
430 0 : if (tt == TYPE_void && BATtdense(b))
431 0 : tseq = 1;
432 : } else { /* search trans action for create statement */
433 : int i;
434 :
435 0 : for (i = 0; i < tr->nr; i++) {
436 0 : if (tr->changes[i].type == LOG_CREATE &&
437 0 : (tpe == 0 && name != NULL
438 0 : ? strcmp(tr->changes[i].name, name) == 0
439 0 : : tr->changes[i].tpe == tpe && tr->changes[i].cid == id)) {
440 0 : ht = tr->changes[i].ht;
441 0 : if (ht < 0) {
442 : ht = TYPE_void;
443 : }
444 0 : tt = tr->changes[i].tt;
445 0 : if (tt < 0) {
446 0 : tseq = 1;
447 0 : tt = TYPE_void;
448 : }
449 : break;
450 0 : } else if (tr->changes[i].type == LOG_USE &&
451 0 : (tpe == 0 && name != NULL
452 0 : ? strcmp(tr->changes[i].name, name) == 0
453 0 : : tr->changes[i].tpe == tpe && tr->changes[i].cid == id)) {
454 0 : log_bid bid = (log_bid) tr->changes[i].nr;
455 0 : BAT *b = BATdescriptor(bid);
456 :
457 0 : if (b) {
458 0 : ht = TYPE_void;
459 0 : tt = b->ttype;
460 : }
461 : break;
462 : }
463 : }
464 0 : assert(i < tr->nr); /* found one */
465 : }
466 0 : assert((ht == TYPE_void && l->flag == LOG_INSERT) ||
467 : ((ht == TYPE_oid || !ht) && l->flag == LOG_UPDATE));
468 0 : if ((ht != TYPE_void && l->flag == LOG_INSERT) ||
469 0 : ((ht != TYPE_void && ht != TYPE_oid) && l->flag == LOG_UPDATE))
470 : return LOG_ERR;
471 0 : if (ht >= 0 && tt >= 0) {
472 0 : BAT *uid = NULL;
473 0 : BAT *r;
474 0 : void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tt].atomRead;
475 :
476 0 : assert(l->nr <= (lng) BUN_MAX);
477 0 : if (l->flag == LOG_UPDATE) {
478 0 : uid = COLnew(0, ht, (BUN) l->nr, PERSISTENT);
479 0 : if (uid == NULL) {
480 0 : logbat_destroy(b);
481 0 : return LOG_ERR;
482 : }
483 : } else {
484 0 : assert(ht == TYPE_void);
485 : }
486 0 : r = COLnew(0, tt, (BUN) l->nr, PERSISTENT);
487 0 : if (r == NULL) {
488 0 : BBPreclaim(uid);
489 0 : logbat_destroy(b);
490 0 : return LOG_ERR;
491 : }
492 :
493 0 : if (tseq)
494 0 : BATtseqbase(r, 0);
495 :
496 0 : if (ht == TYPE_void && l->flag == LOG_INSERT) {
497 0 : lng nr = l->nr;
498 0 : for (; res == LOG_OK && nr > 0; nr--) {
499 0 : size_t tlen = lg->lg->rbufsize;
500 0 : void *t = rt(lg->lg->rbuf, &tlen, lg->log, 1);
501 :
502 0 : if (t == NULL) {
503 : /* see if failure was due to
504 : * malloc or something less
505 : * serious (in the current
506 : * context) */
507 0 : if (strstr(GDKerrbuf, "alloc") == NULL)
508 : res = LOG_EOF;
509 : else
510 0 : res = LOG_ERR;
511 0 : break;
512 : } else {
513 0 : lg->lg->rbuf = t;
514 0 : lg->lg->rbufsize = tlen;
515 : }
516 0 : if (BUNappend(r, t, true) != GDK_SUCCEED)
517 0 : res = LOG_ERR;
518 : }
519 : } else {
520 0 : void *(*rh) (ptr, size_t *, stream *, size_t) = ht == TYPE_void ? BATatoms[TYPE_oid].atomRead : BATatoms[ht].atomRead;
521 0 : void *hv = ATOMnil(ht);
522 0 : size_t hlen = ATOMsize(ht);
523 :
524 0 : if (hv == NULL)
525 0 : res = LOG_ERR;
526 :
527 0 : if (!pax) {
528 0 : lng nr = l->nr;
529 0 : for (; res == LOG_OK && nr > 0; nr--) {
530 0 : size_t tlen = lg->lg->rbufsize;
531 0 : void *h = rh(hv, &hlen, lg->log, 1);
532 0 : void *t = rt(lg->lg->rbuf, &tlen, lg->log, 1);
533 :
534 0 : if (t != NULL) {
535 0 : lg->lg->rbuf = t;
536 0 : lg->lg->rbufsize = tlen;
537 : }
538 0 : if (h == NULL)
539 : res = LOG_EOF;
540 0 : else if (t == NULL) {
541 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
542 : res = LOG_EOF;
543 : else
544 0 : res = LOG_ERR;
545 0 : } else if (BUNappend(uid, h, true) != GDK_SUCCEED ||
546 0 : BUNappend(r, t, true) != GDK_SUCCEED)
547 : res = LOG_ERR;
548 : }
549 : } else {
550 0 : char compressed = 0;
551 0 : lng nr = l->nr;
552 :
553 0 : if (mnstr_read(lg->log, &compressed, 1, 1) != 1)
554 0 : return LOG_ERR;
555 :
556 0 : if (compressed) {
557 0 : void *h = rh(hv, &hlen, lg->log, 1);
558 :
559 0 : assert(uid->ttype == TYPE_void);
560 0 : if (h == NULL)
561 : res = LOG_EOF;
562 : else {
563 0 : BATtseqbase(uid, *(oid*)h);
564 0 : BATsetcount(uid, (BUN) l->nr);
565 : }
566 : } else {
567 0 : for (; res == LOG_OK && nr > 0; nr--) {
568 0 : void *h = rh(hv, &hlen, lg->log, 1);
569 :
570 0 : if (h == NULL)
571 : res = LOG_EOF;
572 0 : else if (BUNappend(uid, h, true) != GDK_SUCCEED)
573 0 : res = LOG_ERR;
574 : }
575 : }
576 0 : nr = l->nr;
577 0 : for (; res == LOG_OK && nr > 0; nr--) {
578 0 : size_t tlen = lg->lg->rbufsize;
579 0 : void *t = rt(lg->lg->rbuf, &tlen, lg->log, 1);
580 :
581 0 : if (t == NULL) {
582 0 : if (strstr(GDKerrbuf, "malloc") == NULL)
583 : res = LOG_EOF;
584 : else
585 0 : res = LOG_ERR;
586 : } else {
587 0 : lg->lg->rbuf = t;
588 0 : lg->lg->rbufsize = tlen;
589 0 : if (BUNappend(r, t, true) != GDK_SUCCEED)
590 0 : res = LOG_ERR;
591 : }
592 : }
593 : }
594 0 : GDKfree(hv);
595 : }
596 :
597 0 : if (res == LOG_OK) {
598 0 : if (tr_grow(tr) == GDK_SUCCEED) {
599 0 : tr->changes[tr->nr].type = l->flag;
600 0 : tr->changes[tr->nr].nr = l->nr;
601 0 : tr->changes[tr->nr].ht = ht;
602 0 : tr->changes[tr->nr].tt = tt;
603 0 : tr->changes[tr->nr].tpe = tpe;
604 0 : tr->changes[tr->nr].cid = id;
605 0 : if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL) {
606 0 : logbat_destroy(b);
607 0 : BBPreclaim(uid);
608 0 : BBPreclaim(r);
609 0 : return LOG_ERR;
610 : }
611 0 : tr->changes[tr->nr].b = r;
612 0 : tr->changes[tr->nr].uid = uid;
613 0 : tr->nr++;
614 : } else {
615 : res = LOG_ERR;
616 : }
617 : }
618 : } else {
619 : /* bat missing ERROR or ignore ? currently error. */
620 : res = LOG_ERR;
621 : }
622 0 : logbat_destroy(b);
623 : return res;
624 : }
625 :
626 : static gdk_return
627 0 : la_bat_updates(old_logger *lg, logaction *la)
628 : {
629 0 : log_bid bid = old_logger_find_bat(lg, la->name, la->tpe, la->cid);
630 0 : BAT *b;
631 :
632 0 : if (bid < 0)
633 : return GDK_FAIL;
634 0 : if (bid == 0)
635 : return GDK_SUCCEED; /* ignore bats no longer in the catalog */
636 :
637 : /* do we need to skip these old updates */
638 0 : if (avoid_snapshot(lg, bid))
639 : return GDK_SUCCEED;
640 :
641 0 : b = BATdescriptor(bid);
642 0 : if (b == NULL)
643 : return GDK_FAIL;
644 0 : if (la->type == LOG_INSERT) {
645 0 : if (BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
646 0 : logbat_destroy(b);
647 0 : return GDK_FAIL;
648 : }
649 0 : } else if (la->type == LOG_UPDATE) {
650 0 : BATiter vi = bat_iterator(la->b);
651 0 : BUN p, q;
652 :
653 0 : BATloop(la->b, p, q) {
654 0 : oid h = BUNtoid(la->uid, p);
655 0 : const void *t = BUNtail(vi, p);
656 :
657 0 : if (h < b->hseqbase || h >= b->hseqbase + BATcount(b)) {
658 : /* if value doesn't exist, insert it;
659 : * if b void headed, maintain that by
660 : * inserting nils */
661 0 : if (b->batCount == 0 && !is_oid_nil(h))
662 0 : b->hseqbase = h;
663 0 : if (!is_oid_nil(b->hseqbase) && !is_oid_nil(h)) {
664 0 : const void *tv = ATOMnilptr(b->ttype);
665 :
666 0 : while (b->hseqbase + b->batCount < h) {
667 0 : if (BUNappend(b, tv, true) != GDK_SUCCEED) {
668 0 : logbat_destroy(b);
669 0 : bat_iterator_end(&vi);
670 0 : return GDK_FAIL;
671 : }
672 : }
673 : }
674 0 : if (BUNappend(b, t, true) != GDK_SUCCEED) {
675 0 : logbat_destroy(b);
676 0 : bat_iterator_end(&vi);
677 0 : return GDK_FAIL;
678 : }
679 : } else {
680 0 : if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
681 0 : logbat_destroy(b);
682 0 : bat_iterator_end(&vi);
683 0 : return GDK_FAIL;
684 : }
685 : }
686 : }
687 0 : bat_iterator_end(&vi);
688 : }
689 0 : logbat_destroy(b);
690 0 : return GDK_SUCCEED;
691 : }
692 :
693 : static log_return
694 0 : log_read_destroy(old_logger *lg, trans *tr, char *name, char tpe, oid id)
695 : {
696 0 : (void) lg;
697 0 : if (tr_grow(tr) == GDK_SUCCEED) {
698 0 : tr->changes[tr->nr].type = LOG_DESTROY;
699 0 : tr->changes[tr->nr].tpe = tpe;
700 0 : tr->changes[tr->nr].cid = id;
701 0 : if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
702 : return LOG_ERR;
703 0 : tr->nr++;
704 : }
705 : return LOG_OK;
706 : }
707 :
708 : static gdk_return
709 0 : la_bat_destroy(old_logger *lg, logaction *la)
710 : {
711 0 : log_bid bid = old_logger_find_bat(lg, la->name, la->tpe, la->cid);
712 :
713 0 : if (bid < 0)
714 : return GDK_FAIL;
715 0 : if (bid) {
716 0 : BUN p;
717 :
718 0 : if (logger_del_bat(lg, bid) != GDK_SUCCEED)
719 : return GDK_FAIL;
720 :
721 0 : if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
722 0 : oid pos = (oid) p;
723 : #ifndef NDEBUG
724 0 : BAT *b = BBP_desc(bid);
725 0 : assert(b->batRole == PERSISTENT);
726 0 : assert(0 <= b->theap->farmid && b->theap->farmid < MAXFARMS);
727 0 : assert(BBPfarms[b->theap->farmid].roles & (1 << PERSISTENT));
728 0 : if (b->tvheap) {
729 0 : assert(0 <= b->tvheap->farmid && b->tvheap->farmid < MAXFARMS);
730 0 : assert(BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
731 : }
732 : #endif
733 0 : if (BUNappend(lg->dsnapshots, &pos, true) != GDK_SUCCEED)
734 0 : return GDK_FAIL;
735 : }
736 : }
737 : return GDK_SUCCEED;
738 : }
739 :
740 : static log_return
741 0 : log_read_create(old_logger *lg, trans *tr, char *name, char tpe, oid id)
742 : {
743 0 : char *buf = log_read_string(lg);
744 0 : int ht, tt;
745 0 : char *ha, *ta;
746 :
747 0 : if (lg->lg->debug & 1)
748 0 : fprintf(stderr, "#log_read_create %s\n", name);
749 :
750 0 : if (buf == NULL)
751 : return LOG_EOF;
752 0 : if (buf == (char *) -1)
753 : return LOG_ERR;
754 0 : ha = buf;
755 0 : ta = strchr(buf, ',');
756 0 : if (ta == NULL) {
757 0 : TRC_CRITICAL(GDK, "inconsistent data read\n");
758 0 : return LOG_ERR;
759 : }
760 0 : *ta++ = 0; /* skip over , */
761 0 : if (strcmp(ha, "vid") == 0) {
762 : ht = -1;
763 : } else {
764 0 : ht = ATOMindex(ha);
765 : }
766 0 : if (strcmp(ta, "vid") == 0) {
767 : tt = -1;
768 : } else {
769 0 : tt = ATOMindex(ta);
770 : }
771 0 : GDKfree(buf);
772 0 : if (tr_grow(tr) == GDK_SUCCEED) {
773 0 : tr->changes[tr->nr].type = LOG_CREATE;
774 0 : tr->changes[tr->nr].ht = ht;
775 0 : tr->changes[tr->nr].tt = tt;
776 0 : tr->changes[tr->nr].tpe = tpe;
777 0 : tr->changes[tr->nr].cid = id;
778 0 : if ((tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
779 : return LOG_ERR;
780 0 : tr->changes[tr->nr].b = NULL;
781 0 : tr->nr++;
782 : }
783 :
784 : return LOG_OK;
785 : }
786 :
787 : static gdk_return
788 0 : la_bat_create(old_logger *lg, logaction *la)
789 : {
790 0 : int tt = (la->tt < 0) ? TYPE_void : la->tt;
791 0 : BAT *b;
792 :
793 : /* formerly head column type, should be void */
794 0 : assert(((la->ht < 0) ? TYPE_void : la->ht) == TYPE_void);
795 0 : if ((b = COLnew(0, tt, BATSIZE, PERSISTENT)) == NULL)
796 : return GDK_FAIL;
797 :
798 0 : if (la->tt < 0)
799 0 : BATtseqbase(b, 0);
800 :
801 0 : if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
802 0 : logger_add_bat(lg, b, la->name, la->tpe, la->cid) != GDK_SUCCEED) {
803 0 : logbat_destroy(b);
804 0 : return GDK_FAIL;
805 : }
806 0 : logbat_destroy(b);
807 0 : return GDK_SUCCEED;
808 : }
809 :
810 : static log_return
811 0 : log_read_use(old_logger *lg, trans *tr, logformat *l, char *name, char tpe, oid id)
812 : {
813 0 : (void) lg;
814 :
815 0 : if (tr_grow(tr) != GDK_SUCCEED)
816 : return LOG_ERR;
817 0 : tr->changes[tr->nr].type = LOG_USE;
818 0 : tr->changes[tr->nr].nr = l->nr;
819 0 : tr->changes[tr->nr].tpe = tpe;
820 0 : tr->changes[tr->nr].cid = id;
821 0 : if ((tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
822 : return LOG_ERR;
823 0 : tr->changes[tr->nr].b = NULL;
824 0 : tr->nr++;
825 0 : return LOG_OK;
826 : }
827 :
828 : static gdk_return
829 0 : la_bat_use(old_logger *lg, logaction *la)
830 : {
831 0 : log_bid bid = (log_bid) la->nr;
832 0 : BAT *b = BATdescriptor(bid);
833 0 : BUN p;
834 :
835 0 : assert(la->nr <= (lng) INT_MAX);
836 0 : if (b == NULL) {
837 0 : GDKerror("logger: could not use bat (%d) for %s\n", (int) bid, NAME(la->name, la->tpe, la->cid));
838 0 : return GDK_FAIL;
839 : }
840 0 : if (logger_add_bat(lg, b, la->name, la->tpe, la->cid) != GDK_SUCCEED)
841 0 : goto bailout;
842 : #ifndef NDEBUG
843 0 : assert(b->batRole == PERSISTENT);
844 0 : assert(0 <= b->theap->farmid && b->theap->farmid < MAXFARMS);
845 0 : assert(BBPfarms[b->theap->farmid].roles & (1 << PERSISTENT));
846 0 : if (b->tvheap) {
847 0 : assert(0 <= b->tvheap->farmid && b->tvheap->farmid < MAXFARMS);
848 0 : assert(BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
849 : }
850 : #endif
851 0 : if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, b->batCacheid)) != BUN_NONE &&
852 0 : p >= lg->snapshots_bid->batInserted) {
853 0 : assert(lg->snapshots_tid->hseqbase == 0);
854 0 : if (BUNreplace(lg->snapshots_tid, p, &lg->tid, true) != GDK_SUCCEED)
855 0 : goto bailout;
856 : } else {
857 0 : if (p != BUN_NONE) {
858 0 : oid pos = p;
859 0 : if (BUNappend(lg->dsnapshots, &pos, true) != GDK_SUCCEED)
860 0 : goto bailout;
861 : }
862 : /* move to the dirty new part of the snapshots list,
863 : * new snapshots will get flushed to disk */
864 0 : if (BUNappend(lg->snapshots_bid, &b->batCacheid, true) != GDK_SUCCEED ||
865 0 : BUNappend(lg->snapshots_tid, &lg->tid, true) != GDK_SUCCEED)
866 0 : goto bailout;
867 : }
868 0 : logbat_destroy(b);
869 0 : return GDK_SUCCEED;
870 :
871 0 : bailout:
872 0 : logbat_destroy(b);
873 0 : return GDK_FAIL;
874 : }
875 :
876 :
877 : #define TR_SIZE 1024
878 :
879 : static trans *
880 0 : tr_create(trans *tr, int tid)
881 : {
882 0 : trans *ntr = GDKmalloc(sizeof(trans));
883 :
884 0 : if (ntr == NULL)
885 : return NULL;
886 0 : ntr->tid = tid;
887 0 : ntr->sz = TR_SIZE;
888 0 : ntr->nr = 0;
889 0 : ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
890 0 : if (ntr->changes == NULL) {
891 0 : GDKfree(ntr);
892 0 : return NULL;
893 : }
894 0 : ntr->tr = tr;
895 0 : return ntr;
896 : }
897 :
898 : static trans *
899 0 : tr_find(trans *tr, int tid)
900 : /* finds the tid and reorders the chain list, puts trans with tid first */
901 : {
902 0 : trans *t = tr, *p = NULL;
903 :
904 0 : while (t && t->tid != tid) {
905 0 : p = t;
906 0 : t = t->tr;
907 : }
908 0 : if (t == NULL)
909 : return NULL; /* BAD missing transaction */
910 0 : if (t == tr)
911 : return tr;
912 0 : if (t->tr) /* get this tid out of the list */
913 0 : p->tr = t->tr;
914 0 : t->tr = tr; /* and move it to the front */
915 0 : return t;
916 : }
917 :
918 : static gdk_return
919 0 : la_apply(old_logger *lg, logaction *c)
920 : {
921 0 : gdk_return ret = GDK_FAIL;
922 :
923 0 : switch (c->type) {
924 0 : case LOG_INSERT:
925 : case LOG_UPDATE:
926 0 : ret = la_bat_updates(lg, c);
927 0 : break;
928 0 : case LOG_CREATE:
929 0 : ret = la_bat_create(lg, c);
930 0 : break;
931 0 : case LOG_USE:
932 0 : ret = la_bat_use(lg, c);
933 0 : break;
934 0 : case LOG_DESTROY:
935 0 : ret = la_bat_destroy(lg, c);
936 0 : break;
937 0 : case LOG_CLEAR:
938 0 : ret = la_bat_clear(lg, c);
939 0 : break;
940 : default:
941 0 : MT_UNREACHABLE();
942 : }
943 0 : lg->changes += (ret == GDK_SUCCEED);
944 0 : return ret;
945 : }
946 :
947 : static void
948 0 : la_destroy(logaction *c)
949 : {
950 0 : if (c->name)
951 0 : GDKfree(c->name);
952 0 : if (c->b)
953 0 : logbat_destroy(c->b);
954 0 : }
955 :
956 : static gdk_return
957 0 : tr_grow(trans *tr)
958 : {
959 0 : if (tr->nr == tr->sz) {
960 0 : logaction *changes;
961 0 : tr->sz <<= 1;
962 0 : changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
963 0 : if (changes == NULL)
964 : return GDK_FAIL;
965 0 : tr->changes = changes;
966 : }
967 : /* cleanup the next */
968 0 : tr->changes[tr->nr].name = NULL;
969 0 : tr->changes[tr->nr].b = NULL;
970 0 : return GDK_SUCCEED;
971 : }
972 :
973 : static trans *
974 0 : tr_destroy(trans *tr)
975 : {
976 0 : trans *r = tr->tr;
977 :
978 0 : GDKfree(tr->changes);
979 0 : GDKfree(tr);
980 0 : return r;
981 : }
982 :
983 : static trans *
984 0 : tr_abort(old_logger *lg, trans *tr)
985 : {
986 0 : int i;
987 :
988 0 : if (lg->lg->debug & 1)
989 0 : fprintf(stderr, "#tr_abort\n");
990 :
991 0 : for (i = 0; i < tr->nr; i++)
992 0 : la_destroy(&tr->changes[i]);
993 0 : return tr_destroy(tr);
994 : }
995 :
996 : static trans *
997 0 : tr_commit(old_logger *lg, trans *tr)
998 : {
999 0 : int i;
1000 :
1001 0 : if (lg->lg->debug & 1)
1002 0 : fprintf(stderr, "#tr_commit\n");
1003 :
1004 0 : for (i = 0; i < tr->nr; i++) {
1005 0 : if (la_apply(lg, &tr->changes[i]) != GDK_SUCCEED) {
1006 0 : do {
1007 0 : tr = tr_abort(lg, tr);
1008 0 : } while (tr != NULL);
1009 : return (trans *) -1;
1010 : }
1011 0 : la_destroy(&tr->changes[i]);
1012 : }
1013 0 : return tr_destroy(tr);
1014 : }
1015 :
1016 : static inline void
1017 0 : logger_close(old_logger *lg)
1018 : {
1019 0 : if (!LOG_DISABLED(lg))
1020 0 : close_stream(lg->log);
1021 0 : lg->log = NULL;
1022 0 : }
1023 :
1024 : static gdk_return
1025 0 : logger_readlog(old_logger *lg, char *filename, bool *filemissing)
1026 : {
1027 0 : trans *tr = NULL;
1028 0 : logformat l;
1029 0 : log_return err = LOG_OK;
1030 0 : time_t t0, t1;
1031 0 : struct stat sb;
1032 0 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1033 0 : int fd;
1034 :
1035 0 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1036 :
1037 0 : if (lg->lg->debug & 1) {
1038 0 : fprintf(stderr, "#logger_readlog opening %s\n", filename);
1039 : }
1040 :
1041 0 : lg->log = open_rstream(filename);
1042 :
1043 : /* if the file doesn't exist, there is nothing to be read back */
1044 0 : if (lg->log == NULL || mnstr_errnr(lg->log) != MNSTR_NO__ERROR) {
1045 0 : logger_close(lg);
1046 0 : ATOMIC_SET(&GDKdebug, dbg);
1047 0 : *filemissing = true;
1048 0 : return GDK_SUCCEED;
1049 : }
1050 0 : short byteorder;
1051 0 : switch (mnstr_read(lg->log, &byteorder, sizeof(byteorder), 1)) {
1052 0 : case -1:
1053 0 : logger_close(lg);
1054 0 : ATOMIC_SET(&GDKdebug, dbg);
1055 0 : return GDK_FAIL;
1056 0 : case 0:
1057 : /* empty file is ok */
1058 0 : logger_close(lg);
1059 0 : ATOMIC_SET(&GDKdebug, dbg);
1060 0 : return GDK_SUCCEED;
1061 0 : case 1:
1062 : /* if not empty, must start with correct byte order mark */
1063 0 : if (byteorder != 1234) {
1064 0 : TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
1065 0 : logger_close(lg);
1066 0 : ATOMIC_SET(&GDKdebug, dbg);
1067 0 : return GDK_FAIL;
1068 : }
1069 : break;
1070 : }
1071 0 : if ((fd = getFileNo(lg->log)) < 0 || fstat(fd, &sb) < 0) {
1072 0 : TRC_CRITICAL(GDK, "fstat on opened file %s failed\n", filename);
1073 0 : logger_close(lg);
1074 0 : ATOMIC_SET(&GDKdebug, dbg);
1075 : /* If the file could be opened, but fstat fails,
1076 : * something weird is going on */
1077 0 : return GDK_FAIL;
1078 : }
1079 0 : t0 = time(NULL);
1080 0 : if (lg->lg->debug & 1) {
1081 0 : printf("# Start reading the write-ahead log '%s'\n", filename);
1082 0 : fflush(stdout);
1083 : }
1084 0 : while (err == LOG_OK && log_read_format(lg, &l)) {
1085 0 : char *name = NULL;
1086 0 : char tpe;
1087 0 : oid id;
1088 :
1089 0 : if (l.flag == 0) {
1090 : /* end of useful content */
1091 0 : assert(l.tid == 0);
1092 0 : assert(l.nr == 0);
1093 0 : break;
1094 : }
1095 0 : t1 = time(NULL);
1096 0 : if (t1 - t0 > 10) {
1097 0 : lng fpos;
1098 0 : t0 = t1;
1099 : /* not more than once every 10 seconds */
1100 0 : fpos = (lng) getfilepos(getFile(lg->log));
1101 0 : if (fpos >= 0) {
1102 0 : printf("# still reading write-ahead log \"%s\" (%d%% done)\n", filename, (int) ((fpos * 100 + 50) / sb.st_size));
1103 0 : fflush(stdout);
1104 : }
1105 : }
1106 0 : if ((l.flag >= LOG_INSERT && l.flag <= LOG_CLEAR) || l.flag == LOG_CREATE_ID || l.flag == LOG_USE_ID) {
1107 0 : name = log_read_string(lg);
1108 :
1109 0 : if (name == NULL) {
1110 : err = LOG_EOF;
1111 : break;
1112 : }
1113 0 : if (name == (char *) -1) {
1114 : err = LOG_ERR;
1115 : break;
1116 : }
1117 : }
1118 0 : if (lg->lg->debug & 1) {
1119 0 : fprintf(stderr, "#logger_readlog: ");
1120 0 : if (l.flag > 0 &&
1121 : l.flag < (char) (sizeof(log_commands) / sizeof(log_commands[0])))
1122 0 : fprintf(stderr, "%s", log_commands[(int) l.flag]);
1123 : else
1124 0 : fprintf(stderr, "%d", l.flag);
1125 0 : fprintf(stderr, " %d " LLFMT, l.tid, l.nr);
1126 0 : if (name)
1127 0 : fprintf(stderr, " %s", name);
1128 0 : fprintf(stderr, "\n");
1129 : }
1130 : /* find proper transaction record */
1131 0 : if (l.flag != LOG_START)
1132 0 : tr = tr_find(tr, l.tid);
1133 : /* the functions we call here can succeed (LOG_OK),
1134 : * but they can also fail for two different reasons:
1135 : * they can run out of input (LOG_EOF -- this is not
1136 : * serious, we just abort the remaining transactions),
1137 : * or some malloc or BAT update fails (LOG_ERR -- this
1138 : * is serious, we must abort the complete process);
1139 : * the latter failure causes the current function to
1140 : * return GDK_FAIL */
1141 0 : switch (l.flag) {
1142 0 : case LOG_START:
1143 0 : assert(l.nr <= (lng) INT_MAX);
1144 0 : if (l.nr > lg->tid)
1145 0 : lg->tid = (int)l.nr;
1146 0 : if ((tr = tr_create(tr, (int)l.nr)) == NULL) {
1147 : err = LOG_ERR;
1148 : break;
1149 : }
1150 0 : if (lg->lg->debug & 1)
1151 0 : fprintf(stderr, "#logger tstart %d\n", tr->tid);
1152 : break;
1153 0 : case LOG_END:
1154 0 : if (tr == NULL)
1155 : err = LOG_EOF;
1156 0 : else if (l.tid != l.nr) /* abort record */
1157 0 : tr = tr_abort(lg, tr);
1158 : else
1159 0 : tr = tr_commit(lg, tr);
1160 : break;
1161 0 : case LOG_SEQ:
1162 0 : err = log_read_seq(lg, &l);
1163 0 : break;
1164 0 : case LOG_INSERT:
1165 : case LOG_UPDATE:
1166 0 : if (name == NULL || tr == NULL)
1167 : err = LOG_EOF;
1168 : else
1169 0 : err = log_read_updates(lg, tr, &l, name, 0, 0, 0);
1170 : break;
1171 0 : case LOG_INSERT_ID:
1172 : case LOG_UPDATE_ID:
1173 : case LOG_UPDATE_PAX: {
1174 0 : int pax = (l.flag == LOG_UPDATE_PAX);
1175 0 : l.flag = (l.flag == LOG_INSERT_ID)?LOG_INSERT:LOG_UPDATE;
1176 0 : if (log_read_id(lg, &tpe, &id) != LOG_OK)
1177 : err = LOG_ERR;
1178 : else
1179 0 : err = log_read_updates(lg, tr, &l, name, tpe, id, pax);
1180 : break;
1181 : }
1182 0 : case LOG_CREATE:
1183 0 : if (name == NULL || tr == NULL)
1184 : err = LOG_EOF;
1185 : else
1186 0 : err = log_read_create(lg, tr, name, 0, 0);
1187 : break;
1188 0 : case LOG_CREATE_ID:
1189 0 : l.flag = LOG_CREATE;
1190 0 : if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1191 : err = LOG_EOF;
1192 : else
1193 0 : err = log_read_create(lg, tr, name, tpe, id);
1194 : break;
1195 0 : case LOG_USE:
1196 0 : if (name == NULL || tr == NULL)
1197 : err = LOG_EOF;
1198 : else
1199 0 : err = log_read_use(lg, tr, &l, name, 0, 0);
1200 : break;
1201 0 : case LOG_USE_ID:
1202 0 : l.flag = LOG_USE;
1203 0 : if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1204 : err = LOG_EOF;
1205 : else
1206 0 : err = log_read_use(lg, tr, &l, name, tpe, id);
1207 : break;
1208 0 : case LOG_DESTROY:
1209 0 : if (name == NULL || tr == NULL)
1210 : err = LOG_EOF;
1211 : else
1212 0 : err = log_read_destroy(lg, tr, name, 0, 0);
1213 : break;
1214 0 : case LOG_DESTROY_ID:
1215 0 : l.flag = LOG_DESTROY;
1216 0 : if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1217 : err = LOG_EOF;
1218 : else
1219 0 : err = log_read_destroy(lg, tr, name, tpe, id);
1220 : break;
1221 0 : case LOG_CLEAR:
1222 0 : if (name == NULL || tr == NULL)
1223 : err = LOG_EOF;
1224 : else
1225 0 : err = log_read_clear(lg, tr, name, 0, 0);
1226 : break;
1227 0 : case LOG_CLEAR_ID:
1228 0 : l.flag = LOG_CLEAR;
1229 0 : if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
1230 : err = LOG_EOF;
1231 : else
1232 0 : err = log_read_clear(lg, tr, name, tpe, id);
1233 : break;
1234 : default:
1235 : err = LOG_ERR;
1236 : }
1237 0 : if (name)
1238 0 : GDKfree(name);
1239 0 : if (tr == (trans *) -1) {
1240 : err = LOG_ERR;
1241 : tr = NULL;
1242 : break;
1243 : }
1244 : }
1245 0 : logger_close(lg);
1246 :
1247 : /* remaining transactions are not committed, ie abort */
1248 0 : while (tr)
1249 0 : tr = tr_abort(lg, tr);
1250 0 : if (lg->lg->debug & 1) {
1251 0 : printf("# Finished reading the write-ahead log '%s'\n", filename);
1252 0 : fflush(stdout);
1253 : }
1254 0 : ATOMIC_SET(&GDKdebug, dbg);
1255 : /* we cannot distinguish errors from incomplete transactions
1256 : * (even if we would log aborts in the logs). So we simply
1257 : * abort and move to the next log file */
1258 0 : return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
1259 : }
1260 :
1261 : /*
1262 : * The log files are incrementally numbered, starting from 2. They are
1263 : * processed in the same sequence.
1264 : */
1265 : static gdk_return
1266 0 : logger_readlogs(old_logger *lg, FILE *fp, char *filename)
1267 : {
1268 0 : gdk_return res = GDK_SUCCEED;
1269 0 : char id[BUFSIZ];
1270 :
1271 0 : if (lg->lg->debug & 1) {
1272 0 : fprintf(stderr, "#logger_readlogs logger id is " LLFMT "\n", lg->id);
1273 : }
1274 :
1275 0 : if (fgets(id, sizeof(id), fp) != NULL) {
1276 0 : char log_filename[FILENAME_MAX];
1277 0 : lng lid = strtoll(id, NULL, 10);
1278 :
1279 0 : if (lg->lg->debug & 1) {
1280 0 : fprintf(stderr, "#logger_readlogs last logger id written in %s is " LLFMT "\n", filename, lid);
1281 : }
1282 :
1283 0 : if (lid >= lg->id) {
1284 0 : bool filemissing = false;
1285 :
1286 0 : lg->id = lid;
1287 0 : while (res == GDK_SUCCEED && !filemissing) {
1288 0 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1289 0 : GDKerror("Logger filename path is too large\n");
1290 0 : return GDK_FAIL;
1291 : }
1292 0 : res = logger_readlog(lg, log_filename, &filemissing);
1293 0 : if (!filemissing)
1294 0 : lg->id++;
1295 : }
1296 : } else {
1297 0 : bool filemissing = false;
1298 0 : while (lid >= lg->id && res == GDK_SUCCEED) {
1299 : if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
1300 : GDKerror("Logger filename path is too large\n");
1301 : return GDK_FAIL;
1302 : }
1303 : res = logger_readlog(lg, log_filename, &filemissing);
1304 : /* Increment the id only at the end,
1305 : * since we want to re-read the last
1306 : * file. That is because last time we
1307 : * read it, it was empty, since the
1308 : * logger creates empty files and
1309 : * fills them in later. */
1310 : lg->id++;
1311 : }
1312 0 : if (lid < lg->id) {
1313 0 : lg->id = lid;
1314 : }
1315 : }
1316 : }
1317 : return res;
1318 : }
1319 :
1320 : static gdk_return
1321 0 : check_version(old_logger *lg, FILE *fp, int version)
1322 : {
1323 : /* if these were equal we wouldn't have gotten here */
1324 0 : assert(version != lg->lg->version);
1325 :
1326 0 : if (lg->lg->prefuncp == NULL ||
1327 0 : (*lg->lg->prefuncp)(lg->lg->funcdata, version, lg->lg->version) != GDK_SUCCEED) {
1328 0 : GDKerror("Incompatible database version %06d, "
1329 : "this server supports version %06d.\n%s",
1330 : version, lg->lg->version,
1331 : version < lg->lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
1332 0 : return GDK_FAIL;
1333 : }
1334 :
1335 0 : if (fgetc(fp) != '\n' || /* skip \n */
1336 0 : fgetc(fp) != '\n') { /* skip \n */
1337 0 : GDKerror("Badly formatted log file");
1338 0 : return GDK_FAIL;
1339 : }
1340 : return GDK_SUCCEED;
1341 : }
1342 :
1343 : /* Load data from the logger logdir
1344 : * Initialize new directories and catalog files if none are present,
1345 : * unless running in read-only mode
1346 : * Load data and persist it in the BATs */
1347 : static gdk_return
1348 0 : logger_load(const char *fn, char filename[FILENAME_MAX], old_logger *lg, FILE *fp, int version)
1349 : {
1350 0 : size_t len;
1351 0 : char bak[FILENAME_MAX];
1352 0 : str filenamestr = NULL;
1353 0 : log_bid snapshots_bid = 0;
1354 0 : bat catalog_bid, catalog_nme, catalog_tpe, catalog_oid, dcatalog;
1355 0 : ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
1356 :
1357 0 : assert(!LOG_DISABLED(lg));
1358 :
1359 0 : if ((filenamestr = GDKfilepath(0, lg->lg->dir, LOGFILE, NULL)) == NULL)
1360 0 : goto error;
1361 0 : len = strcpy_len(filename, filenamestr, FILENAME_MAX);
1362 0 : GDKfree(filenamestr);
1363 0 : if (len >= FILENAME_MAX) {
1364 0 : GDKerror("Logger filename path is too large\n");
1365 0 : goto error;
1366 : }
1367 :
1368 0 : strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
1369 0 : catalog_bid = BBPindex(bak);
1370 :
1371 0 : assert(catalog_bid != 0); /* has been checked by new logger */
1372 :
1373 : /* find the persistent catalog. As non persistent bats
1374 : * require a logical reference we also add a logical
1375 : * reference for the persistent bats */
1376 0 : size_t i;
1377 0 : BUN p, q;
1378 0 : BAT *b, *n, *t, *o, *d;
1379 :
1380 0 : b = BATdescriptor(catalog_bid);
1381 0 : if (b == NULL) {
1382 0 : GDKerror("inconsistent database, catalog does not exist");
1383 0 : goto error;
1384 : }
1385 :
1386 0 : strconcat_len(bak, sizeof(bak), fn, "_catalog_nme", NULL);
1387 0 : catalog_nme = BBPindex(bak);
1388 0 : n = BATdescriptor(catalog_nme);
1389 0 : if (n == NULL) {
1390 0 : BBPunfix(b->batCacheid);
1391 0 : GDKerror("inconsistent database, catalog_nme does not exist");
1392 0 : goto error;
1393 : }
1394 :
1395 0 : strconcat_len(bak, sizeof(bak), fn, "_catalog_tpe", NULL);
1396 0 : catalog_tpe = BBPindex(bak);
1397 0 : t = BATdescriptor(catalog_tpe);
1398 0 : if (t == NULL) {
1399 0 : t = logbat_new(TYPE_bte, BATSIZE, SYSTRANS);
1400 0 : if (t == NULL
1401 0 : ||BBPrename(t, bak) < 0) {
1402 0 : BBPunfix(b->batCacheid);
1403 0 : BBPunfix(n->batCacheid);
1404 0 : BBPreclaim(t);
1405 0 : GDKerror("inconsistent database, catalog_tpe does not exist");
1406 0 : goto error;
1407 : }
1408 0 : for(i=0;i<BATcount(n); i++) {
1409 0 : char zero = 0;
1410 0 : if (BUNappend(t, &zero, false) != GDK_SUCCEED)
1411 0 : goto error;
1412 : }
1413 0 : lg->with_ids = false;
1414 : } else {
1415 0 : if (BUNappend(lg->del, &t->batCacheid, false) != GDK_SUCCEED)
1416 0 : goto error;
1417 0 : BBPretain(t->batCacheid);
1418 : }
1419 :
1420 0 : strconcat_len(bak, sizeof(bak), fn, "_catalog_oid", NULL);
1421 0 : catalog_oid = BBPindex(bak);
1422 0 : o = BATdescriptor(catalog_oid);
1423 0 : if (o == NULL) {
1424 0 : o = logbat_new(TYPE_lng, BATSIZE, SYSTRANS);
1425 0 : if (o == NULL
1426 0 : || BBPrename(o, bak) < 0) {
1427 0 : BBPunfix(b->batCacheid);
1428 0 : BBPunfix(n->batCacheid);
1429 0 : BBPunfix(t->batCacheid);
1430 0 : BBPreclaim(o);
1431 0 : GDKerror("inconsistent database, catalog_oid does not exist");
1432 0 : goto error;
1433 : }
1434 0 : for(i=0;i<BATcount(n); i++) {
1435 0 : lng zero = 0;
1436 0 : if (BUNappend(o, &zero, false) != GDK_SUCCEED)
1437 0 : goto error;
1438 : }
1439 0 : lg->with_ids = false;
1440 : } else {
1441 0 : if (BUNappend(lg->del, &o->batCacheid, false) != GDK_SUCCEED)
1442 0 : goto error;
1443 0 : BBPretain(o->batCacheid);
1444 : }
1445 :
1446 0 : strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
1447 0 : dcatalog = BBPindex(bak);
1448 0 : d = BATdescriptor(dcatalog);
1449 0 : if (d == NULL) {
1450 : /* older database: create dcatalog and convert
1451 : * catalog_bid and catalog_nme to
1452 : * dense-headed */
1453 0 : d = logbat_new(TYPE_oid, BATSIZE, SYSTRANS);
1454 0 : if (d == NULL) {
1455 0 : GDKerror("Logger_new: cannot create dcatalog bat");
1456 0 : BBPunfix(b->batCacheid);
1457 0 : BBPunfix(n->batCacheid);
1458 0 : BBPunfix(t->batCacheid);
1459 0 : BBPunfix(o->batCacheid);
1460 0 : goto error;
1461 : }
1462 0 : if (BBPrename(d, bak) < 0) {
1463 0 : BBPunfix(b->batCacheid);
1464 0 : BBPunfix(n->batCacheid);
1465 0 : BBPunfix(t->batCacheid);
1466 0 : BBPunfix(o->batCacheid);
1467 0 : goto error;
1468 : }
1469 : } else {
1470 0 : if (BUNappend(lg->del, &d->batCacheid, false) != GDK_SUCCEED)
1471 0 : goto error;
1472 0 : BBPretain(d->batCacheid);
1473 : }
1474 :
1475 0 : if ((lg->catalog_bid = BATsetaccess(b, BAT_READ)) == NULL ||
1476 0 : (lg->catalog_nme = BATsetaccess(n, BAT_READ)) == NULL ||
1477 0 : (lg->catalog_tpe = BATsetaccess(t, BAT_READ)) == NULL ||
1478 0 : (lg->catalog_oid = BATsetaccess(o, BAT_READ)) == NULL ||
1479 0 : (lg->dcatalog = BATsetaccess(d, BAT_READ)) == NULL)
1480 0 : goto error;
1481 0 : if (BUNappend(lg->del, &b->batCacheid, false) != GDK_SUCCEED)
1482 0 : goto error;
1483 0 : BBPretain(b->batCacheid);
1484 0 : if (BUNappend(lg->del, &n->batCacheid, false) != GDK_SUCCEED)
1485 0 : goto error;
1486 0 : BBPretain(n->batCacheid);
1487 :
1488 0 : const log_bid *bids;
1489 0 : bids = (const log_bid *) Tloc(b, 0);
1490 0 : BATloop(b, p, q) {
1491 0 : bat bid = bids[p];
1492 0 : oid pos = p;
1493 :
1494 0 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
1495 0 : BBPretain(bid) == 0 &&
1496 0 : BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
1497 0 : goto error;
1498 : }
1499 :
1500 0 : lg->freed = logbat_new(TYPE_int, 1, SYSTRANS);
1501 0 : if (lg->freed == NULL) {
1502 0 : GDKerror("Logger_new: failed to create freed bat");
1503 0 : goto error;
1504 : }
1505 0 : snapshots_bid = old_logger_find_bat(lg, "snapshots_bid", 0, 0);
1506 0 : if (snapshots_bid < 0)
1507 0 : goto error;
1508 0 : if (snapshots_bid == 0) {
1509 0 : lg->snapshots_bid = logbat_new(TYPE_int, 1, SYSTRANS);
1510 0 : lg->snapshots_tid = logbat_new(TYPE_int, 1, SYSTRANS);
1511 0 : lg->dsnapshots = logbat_new(TYPE_oid, 1, SYSTRANS);
1512 0 : if (lg->snapshots_bid == NULL ||
1513 0 : lg->snapshots_tid == NULL ||
1514 : lg->dsnapshots == NULL) {
1515 0 : GDKerror("Logger_new: failed to create snapshots bats");
1516 0 : goto error;
1517 : }
1518 : } else {
1519 0 : bat snapshots_tid = old_logger_find_bat(lg, "snapshots_tid", 0, 0);
1520 0 : bat dsnapshots = old_logger_find_bat(lg, "dsnapshots", 0, 0);
1521 :
1522 0 : if (snapshots_tid < 0 || dsnapshots < 0)
1523 0 : goto error;
1524 0 : ATOMIC_AND(&GDKdebug, ~CHECKMASK);
1525 0 : lg->snapshots_bid = BATdescriptor(snapshots_bid);
1526 0 : if (lg->snapshots_bid == NULL) {
1527 0 : GDKerror("inconsistent database, snapshots_bid does not exist");
1528 0 : goto error;
1529 : }
1530 0 : lg->snapshots_tid = BATdescriptor(snapshots_tid);
1531 0 : if (lg->snapshots_tid == NULL) {
1532 0 : GDKerror("inconsistent database, snapshots_tid does not exist");
1533 0 : goto error;
1534 : }
1535 0 : ATOMIC_SET(&GDKdebug, dbg);
1536 :
1537 0 : if (dsnapshots) {
1538 0 : lg->dsnapshots = BATdescriptor(dsnapshots);
1539 0 : if (lg->dsnapshots == NULL) {
1540 0 : GDKerror("Logger_new: inconsistent database, snapshots_tid does not exist");
1541 0 : goto error;
1542 : }
1543 0 : if (BUNappend(lg->del, &dsnapshots, false) != GDK_SUCCEED)
1544 0 : goto error;
1545 0 : BBPretain(dsnapshots);
1546 : } else {
1547 0 : lg->dsnapshots = logbat_new(TYPE_oid, 1, SYSTRANS);
1548 0 : if (lg->dsnapshots == NULL) {
1549 0 : GDKerror("Logger_new: cannot create dsnapshots bat");
1550 0 : goto error;
1551 : }
1552 : }
1553 0 : if (BUNappend(lg->del, &snapshots_bid, false) != GDK_SUCCEED)
1554 0 : goto error;
1555 0 : BBPretain(snapshots_bid);
1556 0 : if (BUNappend(lg->del, &snapshots_tid, false) != GDK_SUCCEED)
1557 0 : goto error;
1558 0 : BBPretain(snapshots_tid);
1559 : }
1560 0 : if ((lg->snapshots_bid = BATsetaccess(lg->snapshots_bid, BAT_READ)) == NULL ||
1561 0 : (lg->snapshots_tid = BATsetaccess(lg->snapshots_tid, BAT_READ)) == NULL ||
1562 0 : (lg->dsnapshots = BATsetaccess(lg->dsnapshots, BAT_READ)) == NULL) {
1563 0 : goto error;
1564 : }
1565 0 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
1566 0 : if (BBPindex(bak)) {
1567 0 : lg->seqs_id = BATdescriptor(BBPindex(bak));
1568 0 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
1569 0 : lg->seqs_val = BATdescriptor(BBPindex(bak));
1570 0 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
1571 0 : lg->dseqs = BATdescriptor(BBPindex(bak));
1572 : } else {
1573 0 : lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
1574 0 : lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
1575 0 : lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
1576 0 : if (lg->seqs_id == NULL ||
1577 0 : lg->seqs_val == NULL ||
1578 : lg->dseqs == NULL) {
1579 0 : GDKerror("Logger_new: cannot create seqs bats");
1580 0 : goto error;
1581 : }
1582 :
1583 0 : strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
1584 0 : if (BBPrename(lg->seqs_id, bak) < 0) {
1585 0 : goto error;
1586 : }
1587 :
1588 0 : strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
1589 0 : if (BBPrename(lg->seqs_val, bak) < 0) {
1590 0 : goto error;
1591 : }
1592 :
1593 0 : strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
1594 0 : if (BBPrename(lg->dseqs, bak) < 0) {
1595 0 : goto error;
1596 : }
1597 0 : if (BUNappend(lg->add, &lg->seqs_id->batCacheid, false) != GDK_SUCCEED)
1598 0 : goto error;
1599 0 : BBPretain(lg->seqs_id->batCacheid);
1600 0 : if (BUNappend(lg->add, &lg->seqs_val->batCacheid, false) != GDK_SUCCEED)
1601 0 : goto error;
1602 0 : BBPretain(lg->seqs_val->batCacheid);
1603 0 : if (BUNappend(lg->add, &lg->dseqs->batCacheid, false) != GDK_SUCCEED)
1604 0 : goto error;
1605 0 : BBPretain(lg->dseqs->batCacheid);
1606 : }
1607 0 : if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
1608 0 : (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
1609 0 : (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
1610 0 : goto error;
1611 : }
1612 :
1613 0 : if (check_version(lg, fp, version) != GDK_SUCCEED) {
1614 0 : goto error;
1615 : }
1616 :
1617 0 : if (logger_readlogs(lg, fp, filename) != GDK_SUCCEED) {
1618 0 : goto error;
1619 : }
1620 0 : fclose(fp);
1621 0 : fp = NULL;
1622 :
1623 0 : if (lg->lg->postfuncp &&
1624 0 : (*lg->lg->postfuncp)(lg->lg->funcdata, lg) != GDK_SUCCEED)
1625 0 : goto error;
1626 0 : lg->lg->postfuncp = NULL; /* not again */
1627 :
1628 0 : if (BUNappend(lg->add, &lg->lg->catalog_bid->batCacheid, false) != GDK_SUCCEED)
1629 0 : goto error;
1630 0 : BBPretain(lg->lg->catalog_bid->batCacheid);
1631 0 : if (BUNappend(lg->add, &lg->lg->catalog_id->batCacheid, false) != GDK_SUCCEED)
1632 0 : goto error;
1633 0 : BBPretain(lg->lg->catalog_id->batCacheid);
1634 0 : if (BUNappend(lg->add, &lg->lg->dcatalog->batCacheid, false) != GDK_SUCCEED)
1635 0 : goto error;
1636 0 : BBPretain(lg->lg->dcatalog->batCacheid);
1637 :
1638 0 : return GDK_SUCCEED;
1639 0 : error:
1640 0 : if (fp)
1641 0 : fclose(fp);
1642 0 : logbat_destroy(lg->catalog_bid);
1643 0 : logbat_destroy(lg->catalog_nme);
1644 0 : logbat_destroy(lg->catalog_tpe);
1645 0 : logbat_destroy(lg->catalog_oid);
1646 0 : logbat_destroy(lg->dcatalog);
1647 0 : logbat_destroy(lg->snapshots_bid);
1648 0 : logbat_destroy(lg->snapshots_tid);
1649 0 : logbat_destroy(lg->dsnapshots);
1650 0 : logbat_destroy(lg->freed);
1651 0 : logbat_destroy(lg->seqs_id);
1652 0 : logbat_destroy(lg->seqs_val);
1653 0 : logbat_destroy(lg->dseqs);
1654 0 : bids = (const log_bid *) Tloc(lg->add, 0);
1655 0 : BATloop(lg->add, p, q) {
1656 0 : BBPrelease(bids[p]);
1657 : }
1658 0 : logbat_destroy(lg->add);
1659 0 : bids = (const log_bid *) Tloc(lg->del, 0);
1660 0 : BATloop(lg->del, p, q) {
1661 0 : BBPrelease(bids[p]);
1662 : }
1663 0 : logbat_destroy(lg->del);
1664 0 : GDKfree(lg);
1665 0 : return GDK_FAIL;
1666 : }
1667 :
1668 : /* Initialize a new logger
1669 : * It will load any data in the logdir and persist it in the BATs*/
1670 : static old_logger *
1671 0 : logger_new(logger *lg, const char *fn, const char *logdir, FILE *fp, int version, const char *logfile)
1672 : {
1673 0 : old_logger *old_lg;
1674 0 : char filename[FILENAME_MAX];
1675 :
1676 0 : assert(!GDKinmemory(0));
1677 0 : assert(lg != NULL);
1678 0 : if (GDKinmemory(0)) {
1679 0 : TRC_CRITICAL(GDK, "old logger can only be used with a disk-based database\n");
1680 0 : fclose(fp);
1681 0 : return NULL;
1682 : }
1683 0 : if (MT_path_absolute(logdir)) {
1684 0 : TRC_CRITICAL(GDK, "logdir must be relative path\n");
1685 0 : fclose(fp);
1686 0 : return NULL;
1687 : }
1688 :
1689 0 : old_lg = GDKmalloc(sizeof(struct old_logger));
1690 0 : if (old_lg == NULL) {
1691 0 : TRC_CRITICAL(GDK, "allocating logger structure failed\n");
1692 0 : fclose(fp);
1693 0 : return NULL;
1694 : }
1695 :
1696 0 : *old_lg = (struct old_logger) {
1697 : .lg = lg,
1698 : .filename = logfile,
1699 : .with_ids = true,
1700 : .id = 1,
1701 : };
1702 :
1703 0 : old_lg->add = COLnew(0, TYPE_int, 0, SYSTRANS);
1704 0 : old_lg->del = COLnew(0, TYPE_int, 0, SYSTRANS);
1705 0 : if (old_lg->add == NULL || old_lg->del == NULL) {
1706 0 : TRC_CRITICAL(GDK, "cannot allocate temporary bats\n");
1707 0 : goto bailout;
1708 : }
1709 :
1710 0 : if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
1711 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1712 0 : goto bailout;
1713 : }
1714 0 : if (old_lg->lg->debug & 1) {
1715 0 : fprintf(stderr, "#logger_new dir set to %s\n", old_lg->lg->dir);
1716 : }
1717 :
1718 0 : if (logger_load(fn, filename, old_lg, fp, version) == GDK_SUCCEED) {
1719 : return old_lg;
1720 : }
1721 : return NULL;
1722 :
1723 0 : bailout:
1724 0 : logbat_destroy(old_lg->add);
1725 0 : logbat_destroy(old_lg->del);
1726 0 : GDKfree(old_lg);
1727 0 : fclose(fp);
1728 0 : return NULL;
1729 : }
1730 :
1731 : static gdk_return
1732 0 : old_logger_destroy(old_logger *lg)
1733 : {
1734 0 : BUN p, q;
1735 0 : BAT *b = NULL;
1736 0 : const log_bid *bids;
1737 0 : gdk_return rc;
1738 :
1739 0 : bat *subcommit = GDKmalloc(sizeof(log_bid) * (BATcount(lg->add) + BATcount(lg->del) + 1));
1740 0 : if (subcommit == NULL) {
1741 0 : TRC_CRITICAL(GDK, "logger_destroy failed\n");
1742 0 : return GDK_FAIL;
1743 : }
1744 0 : int i = 0;
1745 0 : subcommit[i++] = 0;
1746 :
1747 0 : bids = (const log_bid *) Tloc(lg->add, 0);
1748 0 : BATloop(lg->add, p, q) {
1749 0 : b = BATdescriptor(bids[p]);
1750 0 : if (b) {
1751 0 : b = BATsetaccess(b, BAT_READ);
1752 0 : BATmode(b, false);
1753 0 : BBPunfix(bids[p]);
1754 : }
1755 0 : subcommit[i++] = bids[p];
1756 : }
1757 0 : bids = (const log_bid *) Tloc(lg->del, 0);
1758 0 : BATloop(lg->del, p, q) {
1759 0 : b = BATdescriptor(bids[p]);
1760 0 : if (b) {
1761 0 : BATmode(b, true);
1762 0 : BBPunfix(bids[p]);
1763 : }
1764 0 : subcommit[i++] = bids[p];
1765 : }
1766 : /* give the catalog bats names so we can find them
1767 : * next time */
1768 0 : char bak[IDLENGTH];
1769 0 : if (BBPrename(lg->catalog_bid, NULL) < 0 ||
1770 0 : BBPrename(lg->catalog_nme, NULL) < 0 ||
1771 0 : BBPrename(lg->catalog_tpe, NULL) < 0 ||
1772 0 : BBPrename(lg->catalog_oid, NULL) < 0 ||
1773 0 : BBPrename(lg->dcatalog, NULL) < 0 ||
1774 0 : BBPrename(lg->snapshots_bid, NULL) < 0 ||
1775 0 : BBPrename(lg->snapshots_tid, NULL) < 0 ||
1776 0 : BBPrename(lg->dsnapshots, NULL) < 0 ||
1777 0 : strconcat_len(bak, sizeof(bak), lg->lg->fn, "_catalog_bid", NULL) >= sizeof(bak) ||
1778 0 : BBPrename(lg->lg->catalog_bid, bak) < 0 ||
1779 0 : strconcat_len(bak, sizeof(bak), lg->lg->fn, "_catalog_id", NULL) >= sizeof(bak) ||
1780 0 : BBPrename(lg->lg->catalog_id, bak) < 0 ||
1781 0 : strconcat_len(bak, sizeof(bak), lg->lg->fn, "_dcatalog", NULL) >= sizeof(bak) ||
1782 0 : BBPrename(lg->lg->dcatalog, bak) < 0) {
1783 0 : GDKfree(subcommit);
1784 0 : return GDK_FAIL;
1785 : }
1786 0 : if ((rc = GDKmove(0, lg->lg->dir, LOGFILE, NULL, lg->lg->dir, LOGFILE, "bak", true)) != GDK_SUCCEED) {
1787 0 : TRC_CRITICAL(GDK, "logger_destroy failed\n");
1788 0 : GDKfree(subcommit);
1789 0 : return rc;
1790 : }
1791 0 : if ((rc = log_create_types_file(lg->lg, lg->filename, true)) != GDK_SUCCEED) {
1792 0 : TRC_CRITICAL(GDK, "logger_destroy failed\n");
1793 0 : GDKfree(subcommit);
1794 0 : return rc;
1795 : }
1796 0 : lg->lg->id = (ulng) lg->id;
1797 0 : lg->lg->saved_id = lg->lg->id;
1798 0 : rc = TMsubcommit_list(subcommit, NULL, i, lg->lg->saved_id, lg->lg->saved_tid);
1799 0 : GDKfree(subcommit);
1800 0 : if (rc != GDK_SUCCEED) {
1801 0 : TRC_CRITICAL(GDK, "logger_destroy failed\n");
1802 0 : return rc;
1803 : }
1804 0 : snprintf(bak, sizeof(bak), "bak-" LLFMT, lg->id);
1805 0 : if ((rc = GDKmove(0, lg->lg->dir, LOGFILE, "bak", lg->lg->dir, LOGFILE, bak, true)) != GDK_SUCCEED) {
1806 0 : TRC_CRITICAL(GDK, "logger_destroy failed\n");
1807 0 : return rc;
1808 : }
1809 :
1810 0 : if (logger_cleanup(lg) != GDK_SUCCEED)
1811 0 : TRC_CRITICAL(GDK, "logger_cleanup failed\n");
1812 :
1813 : /* free resources */
1814 0 : bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
1815 0 : BATloop(lg->catalog_bid, p, q) {
1816 0 : bat bid = bids[p];
1817 0 : oid pos = p;
1818 :
1819 0 : if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE)
1820 0 : BBPrelease(bid);
1821 : }
1822 0 : bids = (const log_bid *) Tloc(lg->add, 0);
1823 0 : BATloop(lg->add, p, q) {
1824 0 : BBPrelease(bids[p]);
1825 : }
1826 0 : logbat_destroy(lg->add);
1827 0 : bids = (const log_bid *) Tloc(lg->del, 0);
1828 0 : BATloop(lg->del, p, q) {
1829 0 : BBPrelease(bids[p]);
1830 : }
1831 0 : logbat_destroy(lg->del);
1832 :
1833 0 : logbat_destroy(lg->catalog_bid);
1834 0 : logbat_destroy(lg->catalog_nme);
1835 0 : logbat_destroy(lg->catalog_tpe);
1836 0 : logbat_destroy(lg->catalog_oid);
1837 0 : logbat_destroy(lg->dcatalog);
1838 0 : logbat_destroy(lg->freed);
1839 0 : logbat_destroy(lg->seqs_id);
1840 0 : logbat_destroy(lg->seqs_val);
1841 0 : logbat_destroy(lg->dseqs);
1842 0 : logbat_destroy(lg->snapshots_bid);
1843 0 : logbat_destroy(lg->snapshots_tid);
1844 0 : logbat_destroy(lg->dsnapshots);
1845 :
1846 0 : GDKfree(lg);
1847 0 : return GDK_SUCCEED;
1848 : }
1849 :
1850 : /* Create a new logger */
1851 : gdk_return
1852 0 : old_logger_load(logger *lg, const char *fn, const char *logdir, FILE *fp, int version, const char *filename)
1853 : {
1854 0 : old_logger *old_lg;
1855 0 : old_lg = logger_new(lg, fn, logdir, fp, version, filename);
1856 0 : if (old_lg == NULL)
1857 : return GDK_FAIL;
1858 0 : old_logger_destroy(old_lg);
1859 0 : return GDK_SUCCEED;
1860 : }
1861 :
1862 : /* Clean-up write-ahead log files already persisted in the BATs.
1863 : * Update the LOGFILE and delete all bak- files as well.
1864 : */
1865 : static gdk_return
1866 0 : logger_cleanup(old_logger *lg)
1867 : {
1868 0 : char buf[BUFSIZ];
1869 0 : FILE *fp = NULL;
1870 :
1871 0 : if (LOG_DISABLED(lg))
1872 : return GDK_SUCCEED;
1873 :
1874 0 : if (snprintf(buf, sizeof(buf), "%s%s.bak-" LLFMT, lg->lg->dir, LOGFILE, lg->id) >= (int) sizeof(buf)) {
1875 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1876 0 : return GDK_FAIL;
1877 : }
1878 :
1879 0 : if (lg->lg->debug & 1) {
1880 0 : fprintf(stderr, "#logger_cleanup %s\n", buf);
1881 : }
1882 :
1883 0 : lng lid = lg->id;
1884 : // remove the last persisted WAL files as well to reduce the
1885 : // work for the logger_cleanup_old()
1886 0 : if ((fp = GDKfileopen(0, NULL, buf, NULL, "r")) == NULL) {
1887 0 : GDKsyserror("cannot open file %s\n", buf);
1888 0 : return GDK_FAIL;
1889 : }
1890 :
1891 0 : while (lid-- > 0) {
1892 0 : char log_id[FILENAME_MAX];
1893 :
1894 0 : if (snprintf(log_id, sizeof(log_id), LLFMT, lid) >= (int) sizeof(log_id)) {
1895 0 : TRC_CRITICAL(GDK, "log_id filename is too large\n");
1896 0 : fclose(fp);
1897 0 : return GDK_FAIL;
1898 : }
1899 0 : if (GDKunlink(0, lg->lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
1900 : /* not a disaster (yet?) if unlink fails */
1901 0 : TRC_ERROR(GDK, "failed to remove old WAL %s.%s\n", LOGFILE, buf);
1902 0 : GDKclrerr();
1903 : }
1904 : }
1905 0 : fclose(fp);
1906 :
1907 0 : if (snprintf(buf, sizeof(buf), "bak-" LLFMT, lg->id) >= (int) sizeof(buf)) {
1908 0 : TRC_CRITICAL(GDK, "filename is too large\n");
1909 0 : GDKclrerr();
1910 : }
1911 :
1912 0 : if (GDKunlink(0, lg->lg->dir, LOGFILE, buf) != GDK_SUCCEED) {
1913 : /* not a disaster (yet?) if unlink fails */
1914 0 : TRC_ERROR(GDK, "failed to remove old WAL %s.%s\n", LOGFILE, buf);
1915 0 : GDKclrerr();
1916 : }
1917 :
1918 : return GDK_SUCCEED;
1919 : }
1920 :
1921 : static gdk_return
1922 0 : logger_add_bat(old_logger *lg, BAT *b, const char *name, char tpe, oid id)
1923 : {
1924 0 : log_bid bid = old_logger_find_bat(lg, name, tpe, id);
1925 0 : lng lid = tpe ? (lng) id : 0;
1926 :
1927 0 : assert(b->batRestricted != BAT_WRITE);
1928 0 : assert(b->batRole == PERSISTENT);
1929 0 : if (bid < 0)
1930 : return GDK_FAIL;
1931 0 : if (bid) {
1932 0 : if (bid != b->batCacheid) {
1933 0 : if (logger_del_bat(lg, bid) != GDK_SUCCEED)
1934 : return GDK_FAIL;
1935 : } else {
1936 : return GDK_SUCCEED;
1937 : }
1938 : }
1939 0 : bid = b->batCacheid;
1940 0 : if (lg->lg->debug & 1)
1941 0 : fprintf(stderr, "#create %s\n", NAME(name, tpe, id));
1942 0 : assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
1943 0 : lg->changes += BATcount(b) + 1000;
1944 0 : if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
1945 0 : BUNappend(lg->catalog_nme, name, true) != GDK_SUCCEED ||
1946 0 : BUNappend(lg->catalog_tpe, &tpe, true) != GDK_SUCCEED ||
1947 0 : BUNappend(lg->catalog_oid, &lid, true) != GDK_SUCCEED)
1948 0 : return GDK_FAIL;
1949 0 : BBPretain(bid);
1950 0 : return GDK_SUCCEED;
1951 : }
1952 :
1953 :
1954 : static gdk_return
1955 0 : logger_del_bat(old_logger *lg, log_bid bid)
1956 : {
1957 0 : BAT *b = BATdescriptor(bid);
1958 0 : BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid), q;
1959 0 : oid pos;
1960 :
1961 0 : assert(p != BUN_NONE);
1962 0 : if (p == BUN_NONE) {
1963 : logbat_destroy(b);
1964 : GDKerror("cannot find BAT\n");
1965 : return GDK_FAIL;
1966 : }
1967 :
1968 : /* if this is a not logger commited snapshot bat, make it
1969 : * transient */
1970 0 : if (p >= lg->catalog_bid->batInserted &&
1971 0 : (q = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
1972 0 : pos = (oid) q;
1973 0 : if (BUNappend(lg->dsnapshots, &pos, true) != GDK_SUCCEED) {
1974 0 : logbat_destroy(b);
1975 0 : return GDK_FAIL;
1976 : }
1977 0 : if (lg->lg->debug & 1)
1978 0 : fprintf(stderr,
1979 : "#logger_del_bat release snapshot %d (%d)\n",
1980 0 : bid, BBP_lrefs(bid));
1981 0 : if (BUNappend(lg->freed, &bid, false) != GDK_SUCCEED) {
1982 0 : logbat_destroy(b);
1983 0 : return GDK_FAIL;
1984 : }
1985 0 : } else if (p >= lg->catalog_bid->batInserted) {
1986 0 : BBPrelease(bid);
1987 : } else {
1988 0 : if (BUNappend(lg->freed, &bid, false) != GDK_SUCCEED) {
1989 0 : logbat_destroy(b);
1990 0 : return GDK_FAIL;
1991 : }
1992 : }
1993 0 : if (b) {
1994 0 : lg->changes += BATcount(b) + 1;
1995 0 : BBPunfix(b->batCacheid);
1996 : }
1997 0 : pos = (oid) p;
1998 0 : return BUNappend(lg->dcatalog, &pos, true);
1999 : /*assert(BBP_lrefs(bid) == 0);*/
2000 : }
|