Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
42 : static lng merge_delta( sql_delta *obat);
43 :
/* A segment is valid for transaction tr when either
 *	!deleted && VALID_4_READ(TS, tr)		existing or newly created segment
 *	 deleted && TS > tr->ts && OLDTS < tr->ts	deleted after the current transaction
 *
 * All macro parameters are parenthesized in the expansions so that any
 * expression can be passed safely.  Note that the arguments are still
 * evaluated more than once: do not pass expressions with side effects.
 */

/* TS is readable by tr: it is tr's own tid, tr has a parent for which
 * tr_version_of_parent(tr, TS) holds, or TS is smaller than tr->ts */
#define VALID_4_READ(TS,tr) \
	((TS) == (tr)->tid || ((tr)->parent && tr_version_of_parent((tr), (TS))) || (TS) < (tr)->ts)

/* when changed, check if the old status is still valid */
#define OLD_VALID_4_READ(TS,OLDTS,tr) \
	((OLDTS) && (TS) != (tr)->tid && (TS) > (tr)->ts && (OLDTS) < (tr)->ts)

/* the segment is not deleted and is readable by tr */
#define SEG_VALID_4_DELETE(seg,tr) \
	(!(seg)->deleted && VALID_4_READ((seg)->ts, (tr)))

/* Deleted (in the current transaction or by some other finished transaction),
 * or a re-used segment which used to be deleted */
#define SEG_IS_DELETED(seg,tr) \
	(((seg)->deleted && (VALID_4_READ((seg)->ts, (tr)) || !OLD_VALID_4_READ((seg)->ts, (seg)->oldts, (tr)))) || \
	 (!(seg)->deleted && !VALID_4_READ((seg)->ts, (tr))))

/* The segment is in some way part of the current transaction, or it was
 * deleted by some other transaction but used to be valid */
#define SEG_IS_VALID(seg, tr) \
	((!(seg)->deleted && VALID_4_READ((seg)->ts, (tr))) || \
	 ((seg)->deleted && OLD_VALID_4_READ((seg)->ts, (seg)->oldts, (tr))))
68 :
69 : static inline BAT *
70 5113 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 5113 : MT_lock_set(&b->theaplock);
74 5113 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 17 : MT_lock_unset(&b->theaplock);
76 17 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
77 17 : BBPreclaim(b);
78 17 : return bn;
79 : }
80 5096 : if (b->theap->farmid == TRANSIENT ||
81 14 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
82 4807 : QryCtx *qc = MT_thread_get_qry_ctx();
83 4807 : if (qc) {
84 2513 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
85 2513 : ATOMIC_SUB(&qc->datasize, b->theap->size);
86 2513 : b->theap->farmid = SYSTRANS;
87 2513 : b->batRole = SYSTRANS;
88 : }
89 2513 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
90 1092 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
91 1092 : b->tvheap->farmid = SYSTRANS;
92 : }
93 : }
94 : }
95 5096 : MT_lock_unset(&b->theaplock);
96 5096 : return b;
97 : }
98 :
99 : static void
100 25054390 : lock_table(sqlstore *store, sqlid id)
101 : {
102 25054390 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
103 25057478 : }
104 :
105 : static void
106 25057439 : unlock_table(sqlstore *store, sqlid id)
107 : {
108 25057439 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
109 25058334 : }
110 :
111 : static void
112 20048323 : lock_column(sqlstore *store, sqlid id)
113 : {
114 20048323 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
115 20047706 : }
116 :
117 : static void
118 20048296 : unlock_column(sqlstore *store, sqlid id)
119 : {
120 20048296 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
121 20048003 : }
122 :
123 : static void
124 113917 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
125 : {
126 113917 : assert(cleanup);
127 113917 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
128 113916 : }
129 :
130 : static void
131 132714 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
132 : {
133 132714 : assert(cleanup);
134 132714 : dup_base(&t->base);
135 132711 : trans_add(tr, b, data, cleanup, commit, log);
136 132704 : }
137 :
138 : static int
139 78565 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
140 : {
141 78565 : segment *s = change->data;
142 :
143 78565 : if (s->ts <= oldest) {
144 33345 : while(s) {
145 21168 : segment *n = s->prev;
146 21168 : ATOMIC_PTR_DESTROY(&s->next);
147 21168 : _DELETE(s);
148 21168 : s = n;
149 : }
150 12177 : sqlstore *store = Store;
151 12177 : table_destroy(store, (sql_table*)change->obj);
152 12177 : return 1;
153 : }
154 : return LOG_OK;
155 : }
156 :
157 : static void
158 21168 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
159 : {
160 : /* we can only be accessed by anything older then commit_ts */
161 21168 : if (c->cleanup == &tc_gc_seg)
162 8991 : s->prev = c->data;
163 : else
164 12177 : c->cleanup = &tc_gc_seg;
165 21168 : c->data = s;
166 21168 : s->ts = commit_ts;
167 16745 : }
168 :
169 : static segment *
170 87467 : new_segment(segment *o, sql_trans *tr, size_t cnt)
171 : {
172 87467 : segment *n = (segment*)GDKmalloc(sizeof(segment));
173 :
174 87467 : assert(tr);
175 87467 : if (n) {
176 87467 : *n = (segment) {
177 87467 : .ts = tr->tid,
178 : .oldts = 0,
179 : .deleted = false,
180 : .start = 0,
181 : .end = cnt,
182 : .next = ATOMIC_PTR_VAR_INIT(NULL),
183 : .prev = NULL,
184 : };
185 87467 : if (o) {
186 36174 : n->start += o->end;
187 36174 : n->end += o->end;
188 36174 : ATOMIC_PTR_SET(&o->next, n);
189 : }
190 : }
191 87467 : return n;
192 : }
193 :
194 : static segment *
195 86269 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
196 : {
197 86269 : assert(tr);
198 86269 : if (o->start == start && o->end == start+cnt) {
199 10142 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
200 10142 : o->oldts = o->ts;
201 10142 : o->ts = tr->tid;
202 10142 : o->deleted = deleted;
203 10142 : return o;
204 : }
205 76127 : segment *n = (segment*)GDKmalloc(sizeof(segment));
206 :
207 76127 : if (!n)
208 : return NULL;
209 76127 : n->prev = NULL;
210 :
211 76127 : if (o->ts == tr->tid) {
212 3801 : n->oldts = 0;
213 3801 : n->ts = 1;
214 3801 : n->deleted = true;
215 : } else {
216 72326 : n->oldts = o->ts;
217 72326 : n->ts = tr->tid;
218 72326 : n->deleted = deleted;
219 : }
220 76127 : if (start == o->start) {
221 : /* 2-way split: o remains latter part of segment, new one is
222 : * inserted before */
223 60853 : n->start = o->start;
224 60853 : n->end = n->start + cnt;
225 60853 : ATOMIC_PTR_INIT(&n->next, o);
226 60853 : if (segs->h == o)
227 454 : segs->h = n;
228 60853 : if (p)
229 60399 : ATOMIC_PTR_SET(&p->next, n);
230 60853 : o->start = n->end;
231 15274 : } else if (start+cnt == o->end) {
232 : /* 2-way split: o remains first part of segment, new one is
233 : * added after */
234 5617 : n->start = o->end - cnt;
235 5617 : n->end = o->end;
236 5617 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
237 5617 : ATOMIC_PTR_SET(&o->next, n);
238 5617 : if (segs->t == o)
239 838 : segs->t = n;
240 5617 : o->end = n->start;
241 : } else {
242 : /* 3-way split: o remains first part of segment, two new ones
243 : * are added after */
244 9657 : segment *n2 = GDKmalloc(sizeof(segment));
245 9657 : if (n2 == NULL) {
246 0 : GDKfree(n);
247 0 : return NULL;
248 : }
249 9657 : ATOMIC_PTR_INIT(&n->next, n2);
250 9657 : n->start = start;
251 9657 : n->end = start + cnt;
252 9657 : *n2 = *o;
253 9657 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
254 9657 : n2->start = n->end;
255 9657 : n2->prev = NULL;
256 9657 : if (segs->t == o)
257 3994 : segs->t = n2;
258 9657 : ATOMIC_PTR_SET(&o->next, n);
259 9657 : o->end = start;
260 : }
261 : return n;
262 : }
263 :
264 : static void
265 4209 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
266 : {
267 4209 : segment *cur = segs->h, *seg = NULL;
268 18225 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
269 14016 : if (cur->ts == tr->tid) { /* revert */
270 4706 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
271 4706 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
272 4706 : cur->oldts = 0;
273 : }
274 14016 : if (cur->ts <= oldest) { /* possibly merge range */
275 13139 : if (!seg) { /* skip first */
276 : seg = cur;
277 8930 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
278 : /* merge with previous */
279 4423 : seg->end = cur->end;
280 4423 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
281 4423 : if (cur == segs->t)
282 2859 : segs->t = seg;
283 4423 : mark4destroy(cur, change, store_get_timestamp(tr->store));
284 4423 : cur = seg;
285 : } else {
286 : seg = cur; /* begin of new merge */
287 : }
288 : }
289 : }
290 4209 : }
291 :
292 : static size_t
293 103596 : segs_end_include_deleted( segments *segs, sql_trans *tr)
294 : {
295 103596 : size_t cnt = 0;
296 103596 : segment *s = segs->h, *l = NULL;
297 :
298 504063 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
299 400467 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
300 : l = s;
301 : }
302 103596 : if (l)
303 103589 : cnt = l->end;
304 103596 : return cnt;
305 : }
306 :
307 : static int
308 103596 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
309 : {
310 : /* set bits correctly */
311 103596 : BAT *b = temp_descriptor(cs->bid);
312 :
313 103596 : if (!b)
314 : return LOG_ERR;
315 103596 : segment *s = segs->h;
316 :
317 103596 : size_t nr = segs_end_include_deleted(segs, tr);
318 103596 : size_t rounded_nr = ((nr+31)&~31);
319 103596 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
320 0 : bat_destroy(b);
321 0 : return LOG_ERR;
322 : }
323 :
324 : /* disable all properties here */
325 103596 : MT_lock_set(&b->theaplock);
326 103596 : b->tsorted = false;
327 103596 : b->trevsorted = false;
328 103596 : b->tnosorted = 0;
329 103596 : b->tnorevsorted = 0;
330 103596 : b->tseqbase = oid_nil;
331 103596 : b->tkey = false;
332 103596 : b->tnokey[0] = 0;
333 103596 : b->tnokey[1] = 0;
334 103596 : b->theap->dirty = true;
335 103596 : BUN cnt = BATcount(b);
336 :
337 103596 : uint32_t *restrict dst;
338 433189 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
339 374740 : if (s->start >= nr)
340 : break;
341 329593 : if (s->ts == tr->tid && s->end != s->start) {
342 143984 : if (cnt < s->start) { /* first mark as deleted ! */
343 3839 : size_t lnr = s->start-cnt;
344 3839 : size_t pos = cnt;
345 3839 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
346 3839 : uint32_t cur = 0;
347 :
348 3839 : size_t used = pos&31, end = 32;
349 3839 : if (used) {
350 3720 : if (lnr < (32-used))
351 3496 : end = used + lnr;
352 3720 : assert(end > used);
353 3720 : cur |= ((1U << (end - used)) - 1) << used;
354 3720 : lnr -= end - used;
355 3720 : *dst++ |= cur;
356 3720 : cur = 0;
357 : }
358 3839 : size_t full = lnr/32;
359 3839 : size_t rest = lnr%32;
360 3839 : if (full > 0) {
361 7 : memset(dst, ~0, full * sizeof(*dst));
362 7 : dst += full;
363 7 : lnr -= full * 32;
364 : }
365 3839 : if (rest > 0) {
366 204 : cur |= (1U << rest) - 1;
367 204 : lnr -= rest;
368 204 : *dst |= cur;
369 : }
370 3839 : assert(lnr==0);
371 : }
372 143984 : size_t lnr = s->end-s->start;
373 143984 : size_t pos = s->start;
374 143984 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
375 143984 : uint32_t cur = 0;
376 143984 : size_t used = pos&31, end = 32;
377 143984 : if (used) {
378 106313 : if (lnr < (32-used))
379 101141 : end = used + lnr;
380 106313 : assert(end > used);
381 106313 : cur |= ((1U << (end - used)) - 1) << used;
382 106313 : lnr -= end - used;
383 106313 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
384 106313 : dst++;
385 106313 : cur = 0;
386 : }
387 143984 : size_t full = lnr/32;
388 143984 : size_t rest = lnr%32;
389 143984 : if (full > 0) {
390 3544 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
391 3544 : dst += full;
392 3544 : lnr -= full * 32;
393 : }
394 143984 : if (rest > 0) {
395 40354 : cur |= (1U << rest) - 1;
396 40354 : lnr -= rest;
397 40354 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
398 : }
399 143984 : assert(lnr==0);
400 143984 : if (cnt < s->end)
401 329593 : cnt = s->end;
402 : }
403 : }
404 103596 : if (nr > BATcount(b)) {
405 61003 : BATsetcount(b, nr);
406 : }
407 103596 : MT_lock_unset(&b->theaplock);
408 :
409 103596 : bat_destroy(b);
410 103596 : return LOG_OK;
411 : }
412 :
413 : /* TODO return LOG_OK/ERR */
414 : static void
415 103622 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
416 : {
417 103622 : sqlstore* store = tr->store;
418 103622 : segment *cur = s->segs->h, *seg = NULL;
419 504161 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
420 400539 : if (cur->ts == tr->tid) {
421 158169 : if (!cur->deleted)
422 91545 : cur->oldts = 0;
423 158169 : cur->ts = commit_ts;
424 : }
425 400539 : if (!seg) {
426 : /* first segment */
427 : seg = cur;
428 : }
429 296917 : else if (seg->ts < TRANSACTION_ID_BASE) {
430 : /* possible merge since both deleted flags are equal */
431 268076 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
432 199951 : int merge = 1;
433 199951 : node *n = store->active->h;
434 644823 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
435 548133 : sql_trans* other = ((sql_trans*)n->data);
436 548133 : ulng active = other->ts;
437 548133 : if(other->active == 2)
438 23579 : continue; /* pretend that another recently committed transaction is no longer active */
439 524554 : if (active == tr->ts)
440 133584 : continue; /* pretend that committing transaction has already committed and is no longer active */
441 390970 : if (seg->ts < active && cur->ts < active)
442 : break;
443 378036 : if (seg->ts > active && cur->ts > active)
444 287709 : continue;
445 :
446 90327 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
447 : /* cannot safely merge since there is an active transaction between the segments */
448 : merge = false;
449 : break;
450 : }
451 : /* merge segments */
452 219248 : if (merge) {
453 109624 : seg->end = cur->end;
454 109624 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
455 109624 : if (cur == s->segs->t)
456 26871 : s->segs->t = seg;
457 109624 : if (commit_ts == oldest) {
458 92879 : ATOMIC_PTR_DESTROY(&cur->next);
459 92879 : _DELETE(cur);
460 : } else
461 33490 : mark4destroy(cur, change, commit_ts);
462 109624 : cur = seg;
463 109624 : continue;
464 : }
465 : }
466 : }
467 : seg = cur;
468 : }
469 103622 : }
470 :
471 : static int
472 2167349 : segments_in_transaction(sql_trans *tr, sql_table *t)
473 : {
474 2167349 : storage *s = ATOMIC_PTR_GET(&t->data);
475 2167349 : segment *seg = s->segs->h;
476 :
477 2167349 : if (seg && s->segs->t->ts == tr->tid)
478 : return 1;
479 612206 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
480 507343 : if (seg->ts == tr->tid)
481 : return 1;
482 : }
483 : return 0;
484 : }
485 :
486 : static size_t
487 17292755 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
488 : {
489 17292755 : size_t cnt = 0;
490 :
491 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
492 : * keep all of the parts aligned */
493 17292755 : lock_table(tr->store, table->base.id);
494 17295294 : segment *s = segs->h, *l = NULL;
495 :
496 17295294 : if (segs->t && SEG_IS_VALID(segs->t, tr))
497 14917073 : l = s = segs->t;
498 :
499 210482966 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
500 193187799 : if (SEG_IS_VALID(s, tr))
501 : l = s;
502 : }
503 17295167 : if (l)
504 17276980 : cnt = l->end;
505 17295167 : unlock_table(tr->store, table->base.id);
506 17295575 : return cnt;
507 : }
508 :
509 : static segments *
510 51293 : new_segments(sql_trans *tr, size_t cnt)
511 : {
512 51293 : segments *n = (segments*)GDKmalloc(sizeof(segments));
513 :
514 51293 : if (n) {
515 51293 : n->nr_reused = 0;
516 51293 : n->h = n->t = new_segment(NULL, tr, cnt);
517 51293 : if (!n->h) {
518 0 : GDKfree(n);
519 0 : return NULL;
520 : }
521 51293 : sql_ref_init(&n->r);
522 : }
523 : return n;
524 : }
525 :
526 : static sql_delta *
527 30305203 : timestamp_delta( sql_trans *tr, sql_delta *d)
528 : {
529 30353009 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
530 47806 : d = d->next;
531 30305385 : return d;
532 : }
533 :
534 : static sql_delta *
535 30149729 : col_timestamp_delta( sql_trans *tr, sql_column *c)
536 : {
537 30149729 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
538 : }
539 :
540 : static sql_delta *
541 23609 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
542 : {
543 23609 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
544 : }
545 :
546 : static storage *
547 18336061 : timestamp_storage( sql_trans *tr, storage *d)
548 : {
549 18336061 : if (!d)
550 : return NULL;
551 18399797 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
552 63736 : d = d->next;
553 : return d;
554 : }
555 :
556 : static storage *
557 18310848 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
558 : {
559 18310848 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
560 : }
561 :
562 : static sql_delta*
563 19306 : delta_dup(sql_delta *d)
564 : {
565 19306 : ATOMIC_INC(&d->cs.refcnt);
566 19306 : return d;
567 : }
568 :
569 : static void *
570 17940 : col_dup(sql_column *c)
571 : {
572 17940 : return delta_dup(ATOMIC_PTR_GET(&c->data));
573 : }
574 :
575 : static void *
576 2653 : idx_dup(sql_idx *i)
577 : {
578 2653 : if (!ATOMIC_PTR_GET(&i->data))
579 : return NULL;
580 1366 : return delta_dup(ATOMIC_PTR_GET(&i->data));
581 : }
582 :
583 : static storage*
584 1539 : storage_dup(storage *d)
585 : {
586 1539 : ATOMIC_INC(&d->cs.refcnt);
587 1539 : return d;
588 : }
589 :
590 : static void *
591 1539 : del_dup(sql_table *t)
592 : {
593 1539 : return storage_dup(ATOMIC_PTR_GET(&t->data));
594 : }
595 :
596 : static size_t
597 17 : count_inserts( segment *s, sql_trans *tr)
598 : {
599 17 : size_t cnt = 0;
600 :
601 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
602 55 : if (!s->deleted && s->ts == tr->tid)
603 4 : cnt += s->end - s->start;
604 : }
605 17 : return cnt;
606 : }
607 :
608 : static size_t
609 851469 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
610 : {
611 851469 : size_t cnt = 0;
612 :
613 998295 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
614 : ;
615 :
616 5205603 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
617 4354133 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
618 1437788 : cnt += s->end - s->start;
619 : }
620 851470 : return cnt;
621 : }
622 :
623 : static size_t
624 17 : count_deletes( segment *s, sql_trans *tr)
625 : {
626 17 : size_t cnt = 0;
627 :
628 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
629 55 : if (SEG_IS_DELETED(s, tr))
630 17 : cnt += s->end - s->start;
631 : }
632 17 : return cnt;
633 : }
634 :
635 : #define CNT_ACTIVE 10
636 :
637 : static size_t
638 16808522 : count_col(sql_trans *tr, sql_column *c, int access)
639 : {
640 16808522 : storage *d;
641 16808522 : sql_delta *ds;
642 :
643 16808522 : if (!isTable(c->t))
644 : return 0;
645 16808522 : d = tab_timestamp_storage(tr, c->t);
646 16806687 : ds = col_timestamp_delta(tr, c);
647 16806679 : if (!d ||!ds)
648 : return 0;
649 16806679 : if (access == 2)
650 481186 : return ds?ds->cs.ucnt:0;
651 16325493 : if (access == 1)
652 17 : return count_inserts(d->segs->h, tr);
653 16325476 : if (access == QUICK)
654 545149 : return d->segs->t?d->segs->t->end:0;
655 15780327 : if (access == CNT_ACTIVE) {
656 851448 : size_t cnt = segs_end(d->segs, tr, c->t);
657 851486 : lock_table(tr->store, c->t->base.id);
658 851468 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
659 851469 : unlock_table(tr->store, c->t->base.id);
660 851469 : return cnt;
661 : }
662 14928879 : return segs_end(d->segs, tr, c->t);
663 : }
664 :
665 : static size_t
666 20900 : count_idx(sql_trans *tr, sql_idx *i, int access)
667 : {
668 20900 : storage *d;
669 20900 : sql_delta *ds;
670 :
671 20900 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
672 6719 : return 0;
673 14181 : d = tab_timestamp_storage(tr, i->t);
674 14178 : ds = idx_timestamp_delta(tr, i);
675 14177 : if (!d || !ds)
676 : return 0;
677 14177 : if (access == 2)
678 2871 : return ds?ds->cs.ucnt:0;
679 11306 : if (access == 1)
680 0 : return count_inserts(d->segs->h, tr);
681 11306 : if (access == QUICK)
682 3550 : return d->segs->t?d->segs->t->end:0;
683 7756 : return segs_end(d->segs, tr, i->t);
684 : }
685 :
686 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
687 : static BAT *
688 13114690 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
689 : {
690 13114690 : BAT *b;
691 :
692 13114690 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
693 : /* returns the updates for cs */
694 13114690 : if (cs->uibid && cs->uvbid && cs->ucnt) {
695 12070 : if (access == RD_UPD_ID) {
696 7277 : if (!(b = temp_descriptor(cs->uibid)))
697 : return NULL;
698 7277 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
699 1657 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
700 5620 : oid nil = oid_nil;
701 : /* less then cnt */
702 5620 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
703 5620 : if (!s) {
704 0 : bat_destroy(b);
705 0 : return NULL;
706 : }
707 :
708 5620 : BAT *nb = BATproject(s, b);
709 5620 : bat_destroy(s);
710 5620 : bat_destroy(b);
711 5620 : b = nb;
712 : }
713 : } else {
714 4793 : b = temp_descriptor(cs->uvbid);
715 : }
716 : } else {
717 21837765 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
718 : }
719 : return b;
720 : }
721 :
722 : static BAT *
723 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
724 : {
725 0 : int err = 0;
726 0 : BAT *uv = *UV;
727 0 : BUN cnt = BATcount(ui)+BATcount(oi);
728 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
729 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
730 :
731 0 : if (!ni || (uv && !nv)) {
732 0 : bat_destroy(ni);
733 0 : bat_destroy(nv);
734 0 : bat_destroy(ui);
735 0 : bat_destroy(uv);
736 0 : bat_destroy(oi);
737 0 : bat_destroy(ov);
738 0 : return NULL;
739 : }
740 0 : BATiter uvi;
741 0 : BATiter ovi;
742 :
743 0 : if (uv) {
744 0 : uvi = bat_iterator(uv);
745 0 : ovi = bat_iterator(ov);
746 : }
747 :
748 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
749 0 : BUN uip = 0, uie = BATcount(ui);
750 0 : BUN oip = 0, oie = BATcount(oi);
751 :
752 0 : oid uiseqb = ui->tseqbase;
753 0 : oid oiseqb = oi->tseqbase;
754 0 : oid *uipt = NULL, *oipt = NULL;
755 0 : BATiter uii = bat_iterator(ui);
756 0 : BATiter oii = bat_iterator(oi);
757 0 : if (!BATtdensebi(&uii))
758 0 : uipt = uii.base;
759 0 : if (!BATtdensebi(&oii))
760 0 : oipt = oii.base;
761 0 : while (uip < uie && oip < oie && !err) {
762 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
763 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
764 :
765 0 : if (uiid <= oiid) {
766 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
767 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
768 : err = 1;
769 0 : uip++;
770 0 : if (uiid == oiid)
771 0 : oip++;
772 : } else { /* uiid > oiid */
773 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
774 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
775 : err = 1;
776 0 : oip++;
777 : }
778 : }
779 0 : while (uip < uie && !err) {
780 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
781 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
782 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
783 : err = 1;
784 0 : uip++;
785 : }
786 0 : while (oip < oie && !err) {
787 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
788 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
789 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
790 : err = 1;
791 0 : oip++;
792 : }
793 0 : if (uv) {
794 0 : bat_iterator_end(&uvi);
795 0 : bat_iterator_end(&ovi);
796 : }
797 0 : bat_iterator_end(&uii);
798 0 : bat_iterator_end(&oii);
799 0 : bat_destroy(ui);
800 0 : bat_destroy(uv);
801 0 : bat_destroy(oi);
802 0 : bat_destroy(ov);
803 0 : if (!err) {
804 0 : if (nv)
805 0 : *UV = nv;
806 0 : return ni;
807 : }
808 0 : *UV = NULL;
809 0 : bat_destroy(ni);
810 0 : bat_destroy(nv);
811 0 : return NULL;
812 : }
813 :
814 : static sql_delta *
815 8742204 : older_delta( sql_delta *d, sql_trans *tr)
816 : {
817 8742204 : sql_delta *o = d->next;
818 :
819 8749627 : while (o && !o->cs.merged) {
820 7418 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
821 : break;
822 : else
823 7423 : o = o->next;
824 : }
825 8742209 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
826 0 : return o;
827 : return NULL;
828 : }
829 :
830 : static BAT *
831 8742575 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
832 : {
833 8742575 : assert(tr->active);
834 8742575 : sql_delta *o = NULL;
835 8742575 : BAT *ui = NULL, *uv = NULL;
836 :
837 8742575 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
838 : return NULL;
839 8742128 : if (access == RD_UPD_VAL) {
840 4372518 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
841 0 : bat_destroy(ui);
842 0 : return NULL;
843 : }
844 : }
845 8742198 : while ((o = older_delta(d, tr)) != NULL) {
846 0 : BAT *oui = NULL, *ouv = NULL;
847 0 : if (!oui)
848 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
849 0 : if (access == RD_UPD_VAL)
850 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
851 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
852 0 : bat_destroy(ui);
853 0 : bat_destroy(uv);
854 0 : bat_destroy(oui);
855 0 : bat_destroy(ouv);
856 0 : return NULL;
857 : }
858 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
859 : return NULL;
860 : d = o;
861 : }
862 8742120 : if (uv) {
863 4372579 : bat_destroy(ui);
864 4372579 : return uv;
865 : }
866 : return ui;
867 : }
868 :
869 : static BAT *
870 2312 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
871 : {
872 2312 : lock_column(tr->store, c->base.id);
873 2312 : sql_delta *d = col_timestamp_delta(tr, c);
874 2312 : int type = c->type.type->localtype;
875 :
876 2312 : if (!d) {
877 0 : unlock_column(tr->store, c->base.id);
878 0 : return NULL;
879 : }
880 2312 : if (d->cs.st == ST_DICT) {
881 0 : BAT *b = quick_descriptor(d->cs.bid);
882 :
883 0 : type = b->ttype;
884 : }
885 2312 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
886 2312 : unlock_column(tr->store, c->base.id);
887 2312 : return bn;
888 : }
889 :
890 : static BAT *
891 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
892 : {
893 0 : lock_column(tr->store, i->base.id);
894 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
895 0 : sql_delta *d = idx_timestamp_delta(tr, i);
896 :
897 0 : if (!d) {
898 0 : unlock_column(tr->store, i->base.id);
899 0 : return NULL;
900 : }
901 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
902 0 : unlock_column(tr->store, i->base.id);
903 0 : return bn;
904 : }
905 :
906 : static BAT *
907 8972294 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
908 : {
909 8972294 : BAT *b;
910 :
911 8972294 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
912 8972294 : assert(cs != NULL);
913 8972294 : if (access == QUICK)
914 175987 : return quick_descriptor(cs->bid);
915 8796307 : if (access == RD_EXT)
916 857 : return temp_descriptor(cs->ebid);
917 8795450 : assert(cs->bid);
918 8795450 : b = temp_descriptor(cs->bid);
919 8794666 : if (b == NULL)
920 : return NULL;
921 8794666 : assert(b->batRestricted == BAT_READ);
922 : /* return slice */
923 8794666 : BAT *s = BATslice(b, 0, cnt);
924 8795561 : bat_destroy(b);
925 8795561 : return s;
926 : }
927 :
928 : static int
929 4370207 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
930 : {
931 4370207 : lock_column(tr->store, c->base.id);
932 4369911 : size_t cnt = count_col(tr, c, 0);
933 4370185 : sql_delta *d = col_timestamp_delta(tr, c);
934 4370180 : int type = c->type.type->localtype;
935 :
936 4370180 : if (!d) {
937 0 : unlock_column(tr->store, c->base.id);
938 0 : return LOG_ERR;
939 : }
940 4370180 : if (d->cs.st == ST_DICT) {
941 2 : BAT *b = quick_descriptor(d->cs.bid);
942 :
943 2 : type = b->ttype;
944 : }
945 :
946 4370180 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
947 4370262 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
948 :
949 4370261 : unlock_column(tr->store, c->base.id);
950 :
951 4370222 : if (*ui == NULL || *uv == NULL) {
952 0 : bat_destroy(*ui);
953 0 : bat_destroy(*uv);
954 0 : return LOG_ERR;
955 : }
956 : return LOG_OK;
957 : }
958 :
959 : static int
960 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
961 : {
962 23 : lock_column(tr->store, i->base.id);
963 23 : size_t cnt = count_idx(tr, i, 0);
964 23 : sql_delta *d = idx_timestamp_delta(tr, i);
965 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
966 :
967 23 : if (!d) {
968 0 : unlock_column(tr->store, i->base.id);
969 0 : return LOG_ERR;
970 : }
971 :
972 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
973 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
974 :
975 23 : unlock_column(tr->store, i->base.id);
976 :
977 23 : if (*ui == NULL || *uv == NULL) {
978 0 : bat_destroy(*ui);
979 0 : bat_destroy(*uv);
980 0 : return LOG_ERR;
981 : }
982 : return LOG_OK;
983 : }
984 :
985 : static void * /* BAT * */
986 8965327 : bind_col(sql_trans *tr, sql_column *c, int access)
987 : {
988 8965327 : assert(access == QUICK || tr->active);
989 8965327 : if (!isTable(c->t))
990 : return NULL;
991 8965327 : sql_delta *d = col_timestamp_delta(tr, c);
992 8965031 : if (!d)
993 : return NULL;
994 8965031 : size_t cnt = count_col(tr, c, 0);
995 8964578 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
996 2312 : return bind_ucol(tr, c, access, cnt);
997 8962266 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
998 8962882 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
999 : return b;
1000 : }
1001 :
1002 : static void * /* BAT * */
1003 9407 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1004 : {
1005 9407 : assert(access == QUICK || tr->active);
1006 9407 : if (!isTable(i->t))
1007 : return NULL;
1008 9407 : sql_delta *d = idx_timestamp_delta(tr, i);
1009 9408 : if (!d)
1010 : return NULL;
1011 9408 : size_t cnt = count_idx(tr, i, 0);
1012 9407 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1013 0 : return bind_uidx(tr, i, access, cnt);
1014 9407 : return cs_bind_bat( &d->cs, access, cnt);
1015 : }
1016 :
1017 : static int
1018 2296 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1019 : {
1020 2296 : if (!cs->uibid) {
1021 0 : cs->uibid = e_bat(TYPE_oid);
1022 0 : if (cs->uibid == BID_NIL)
1023 : return LOG_ERR;
1024 : }
1025 2296 : if (!cs->uvbid) {
1026 0 : BAT *cur = quick_descriptor(cs->bid);
1027 0 : if (!cur)
1028 : return LOG_ERR;
1029 0 : int type = cur->ttype;
1030 0 : cs->uvbid = e_bat(type);
1031 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1032 : return LOG_ERR;
1033 : }
1034 2296 : BAT *ui = temp_descriptor(cs->uibid);
1035 2296 : BAT *uv = temp_descriptor(cs->uvbid);
1036 :
1037 2296 : if (ui == NULL || uv == NULL) {
1038 0 : bat_destroy(ui);
1039 0 : bat_destroy(uv);
1040 0 : return LOG_ERR;
1041 : }
1042 2296 : assert(ui && uv);
1043 2296 : if (isEbat(ui)){
1044 392 : temp_destroy(cs->uibid);
1045 392 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1046 392 : bat_destroy(ui);
1047 392 : if (cs->uibid == BID_NIL ||
1048 392 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1049 0 : bat_destroy(uv);
1050 0 : return LOG_ERR;
1051 : }
1052 : }
1053 2296 : if (isEbat(uv)){
1054 392 : temp_destroy(cs->uvbid);
1055 392 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1056 392 : bat_destroy(uv);
1057 392 : if (cs->uvbid == BID_NIL ||
1058 392 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1059 0 : bat_destroy(ui);
1060 0 : return LOG_ERR;
1061 : }
1062 : }
1063 2296 : *Ui = ui;
1064 2296 : *Uv = uv;
1065 2296 : return LOG_OK;
1066 : }
1067 :
1068 : static int
1069 5116 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1070 : {
1071 44691 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1072 44691 : if (s->start <= rid && s->end > rid) {
1073 5116 : if (s->ts == tr->tid && !s->deleted) {
1074 2874 : return 1;
1075 : }
1076 : break;
1077 : }
1078 : }
1079 : return 0;
1080 : }
1081 :
1082 : static int
1083 2242 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1084 : {
1085 38593 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1086 38593 : if (s->start <= rid && s->end > rid) {
1087 2242 : if (s->ts >= tr->ts && s->deleted) {
1088 0 : return 1;
1089 : }
1090 : break;
1091 : }
1092 : }
1093 : return 0;
1094 : }
1095 :
1096 : static sql_delta *
1097 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1098 : {
1099 0 : sql_delta *n = ZNEW(sql_delta);
1100 0 : if (!n)
1101 : return NULL;
1102 0 : *n = *bat;
1103 0 : n->next = NULL;
1104 0 : n->cs.ts = tr->tid;
1105 0 : return n;
1106 : }
1107 :
1108 : static BAT *
1109 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1110 : {
1111 17 : BAT *newoffsets = NULL;
1112 17 : sql_delta *bat = *batp;
1113 17 : column_storage *cs = &bat->cs;
1114 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1115 :
1116 17 : if (!u)
1117 : return NULL;
1118 17 : BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
1119 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1120 0 : bat_destroy(u);
1121 0 : return NULL;
1122 : } else {
1123 17 : int new = 0;
1124 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1125 17 : if (BATcount(u) >= max_cnt) {
1126 1 : if (max_cnt == INT_MAX) { /* decompress */
1127 0 : if (!(b = temp_descriptor(cs->bid))) {
1128 0 : bat_destroy(u);
1129 0 : return NULL;
1130 : }
1131 0 : if (cs->ucnt) {
1132 0 : BAT *ui = NULL, *uv = NULL;
1133 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1134 0 : bat_destroy(b);
1135 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1136 0 : bat_destroy(nb);
1137 0 : bat_destroy(u);
1138 0 : return NULL;
1139 : }
1140 0 : b = nb;
1141 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1142 0 : bat_destroy(ui);
1143 0 : bat_destroy(uv);
1144 0 : bat_destroy(b);
1145 0 : bat_destroy(u);
1146 : }
1147 0 : bat_destroy(ui);
1148 0 : bat_destroy(uv);
1149 : }
1150 0 : n = DICTdecompress_(b, u, PERSISTENT);
1151 0 : bat_destroy(b);
1152 0 : assert(newoffsets == NULL);
1153 0 : if (!n) {
1154 0 : bat_destroy(u);
1155 0 : return NULL;
1156 : }
1157 0 : if (cs->ts != tr->tid) {
1158 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1159 0 : bat_destroy(n);
1160 0 : return NULL;
1161 : }
1162 0 : cs = &(*batp)->cs;
1163 0 : new = 1;
1164 : }
1165 0 : if (cs->bid && !new)
1166 0 : temp_destroy(cs->bid);
1167 0 : n = transfer_to_systrans(n);
1168 0 : if (n == NULL)
1169 : return NULL;
1170 0 : bat_set_access(n, BAT_READ);
1171 0 : cs->bid = temp_create(n);
1172 0 : bat_destroy(n);
1173 0 : if (cs->ebid && !new)
1174 0 : temp_destroy(cs->ebid);
1175 0 : cs->ebid = 0;
1176 0 : cs->ucnt = 0;
1177 0 : if (cs->uibid && !new)
1178 0 : temp_destroy(cs->uibid);
1179 0 : if (cs->uvbid && !new)
1180 0 : temp_destroy(cs->uvbid);
1181 0 : cs->uibid = cs->uvbid = 0;
1182 0 : cs->st = ST_DEFAULT;
1183 0 : cs->cleared = true;
1184 : } else {
1185 1 : if (!(b = temp_descriptor(cs->bid))) {
1186 0 : bat_destroy(newoffsets);
1187 0 : bat_destroy(u);
1188 0 : return NULL;
1189 : }
1190 2 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1191 1 : bat_destroy(b);
1192 1 : if (!n) {
1193 0 : bat_destroy(newoffsets);
1194 0 : bat_destroy(u);
1195 0 : return NULL;
1196 : }
1197 1 : if (cs->ts != tr->tid) {
1198 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1199 0 : bat_destroy(n);
1200 0 : return NULL;
1201 : }
1202 0 : cs = &(*batp)->cs;
1203 0 : new = 1;
1204 0 : temp_dup(cs->ebid);
1205 0 : if (cs->uibid) {
1206 0 : temp_dup(cs->uibid);
1207 0 : temp_dup(cs->uvbid);
1208 : }
1209 : }
1210 1 : if (cs->bid && !new)
1211 1 : temp_destroy(cs->bid);
1212 1 : n = transfer_to_systrans(n);
1213 1 : if (n == NULL)
1214 : return NULL;
1215 1 : bat_set_access(n, BAT_READ);
1216 1 : cs->bid = temp_create(n);
1217 1 : bat_destroy(n);
1218 1 : cs->cleared = true;
1219 1 : i = newoffsets;
1220 : }
1221 : } else { /* append */
1222 16 : i = newoffsets;
1223 : }
1224 : }
1225 17 : bat_destroy(u);
1226 17 : return i;
1227 : }
1228 :
1229 : static BAT *
1230 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1231 : {
1232 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1233 0 : BAT *newoffsets = NULL;
1234 0 : BAT *b = NULL, *n = NULL;
1235 :
1236 0 : if (!(b = temp_descriptor(cs->bid)))
1237 : return NULL;
1238 :
1239 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1240 0 : bat_destroy(b);
1241 0 : return NULL;
1242 : } else {
1243 : /* returns new offset bat if values within min/max, else decompress */
1244 0 : if (!newoffsets) { /* decompress */
1245 0 : if (cs->ucnt) {
1246 0 : BAT *ui = NULL, *uv = NULL;
1247 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1248 0 : bat_destroy(b);
1249 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1250 0 : bat_destroy(nb);
1251 0 : return NULL;
1252 : }
1253 0 : b = nb;
1254 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1255 0 : bat_destroy(ui);
1256 0 : bat_destroy(uv);
1257 0 : bat_destroy(b);
1258 : }
1259 0 : bat_destroy(ui);
1260 0 : bat_destroy(uv);
1261 : }
1262 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1263 0 : bat_destroy(b);
1264 0 : if (!n)
1265 : return NULL;
1266 0 : if (cs->bid)
1267 0 : temp_destroy(cs->bid);
1268 0 : n = transfer_to_systrans(n);
1269 0 : if (n == NULL)
1270 : return NULL;
1271 0 : bat_set_access(n, BAT_READ);
1272 0 : cs->bid = temp_create(n);
1273 0 : cs->ucnt = 0;
1274 0 : if (cs->uibid)
1275 0 : temp_destroy(cs->uibid);
1276 0 : if (cs->uvbid)
1277 0 : temp_destroy(cs->uvbid);
1278 0 : cs->uibid = cs->uvbid = 0;
1279 0 : cs->st = ST_DEFAULT;
1280 0 : cs->cleared = true;
1281 0 : b = n;
1282 : } else { /* append */
1283 : i = newoffsets;
1284 : }
1285 : }
1286 0 : bat_destroy(b);
1287 0 : return i;
1288 : }
1289 :
1290 : /*
1291 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1292 : */
/*
 * Apply a batch of updates to the column storage of *batp: `tids` holds the
 * row ids, `updates` the matching new values.
 *
 * Outline (as implemented below):
 *  - an empty `tids` is a no-op (LOG_OK);
 *  - msk / mask-candidate inputs are expanded with BATunmask(), a plain void
 *    `updates` is copied into an oid column; such private copies are released
 *    before returning, the caller's own BATs (otids/oupdates) never are;
 *  - for ST_DICT storage the values go through dict_append_bat() first, which
 *    may redirect *batp to a different delta;
 *  - with the table lock held, either
 *      a) !is_new && !cs->cleared: rows that sit in a segment written by this
 *         transaction (seg->ts == tr->tid, not deleted) are overwritten in
 *         place with void_inplace() and flagged in a bit mask; a covering
 *         segment that is deleted with seg->ts >= tr->ts gives LOG_CONFLICT;
 *         all remaining rows end up in the update bats (uibid/uvbid), merged
 *         with the updates already stored there when cs->ucnt != 0; or
 *      b) is_new || cs->cleared: the base BAT is written directly
 *         (BATappend when it is empty, BATreplace otherwise).
 */
1293 : static int
1294 2953 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1295 : {
1296 2953 : int res = LOG_OK;
1297 2953 : sql_delta *bat = *batp;
1298 2953 : column_storage *cs = &bat->cs;
/* remember the caller's BATs so that only private copies get destroyed */
1299 2953 : BAT *otids = tids, *oupdates = updates;
1300 :
1301 2953 : if (!BATcount(tids))
1302 : return LOG_OK;
1303 :
/* normalise the inputs: expand masks, materialise a void updates column */
1304 2953 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1305 6 : tids = BATunmask(tids);
1306 6 : if (!tids)
1307 : return LOG_ERR;
1308 : }
1309 2953 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1310 0 : updates = BATunmask(updates);
1311 0 : if (!updates) {
1312 0 : if (otids != tids)
1313 0 : bat_destroy(tids);
1314 0 : return LOG_ERR;
1315 : }
1316 2953 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1317 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1318 42 : if (!updates) {
1319 0 : if (otids != tids)
1320 0 : bat_destroy(tids);
1321 0 : return LOG_ERR;
1322 : }
1323 : }
1324 :
1325 2953 : if (cs->st == ST_DICT) {
1326 : /* possibly a new array is returned */
1327 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
/* dict_append_bat may have redirected *batp, so re-read the storage */
1328 4 : bat = *batp;
1329 4 : cs = &bat->cs;
/* NOTE(review): dict_append_bat can return its input BAT unchanged (decompress case); if `updates` was a private copy it is destroyed here and then still used - confirm */
1330 4 : if (oupdates != updates)
1331 0 : bat_destroy(updates);
1332 4 : updates = nupdates;
1333 4 : if (!updates) {
1334 0 : if (otids != tids)
1335 0 : bat_destroy(tids);
1336 0 : return LOG_ERR;
1337 : }
1338 : }
1339 :
1340 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1341 : /* currently only one update delta is possible */
1342 2953 : lock_table(tr->store, t->base.id);
1343 2953 : storage *s = ATOMIC_PTR_GET(&t->data);
1344 2953 : if (!is_new && !cs->cleared) {
/* the segment walks and the merge below rely on tids being sorted; sort them and reorder updates to match */
1345 2630 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1346 6 : BAT *sorted, *order;
1347 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1348 0 : if (otids != tids)
1349 0 : bat_destroy(tids);
1350 0 : if (oupdates != updates)
1351 0 : bat_destroy(updates);
1352 0 : unlock_table(tr->store, t->base.id);
1353 0 : return LOG_ERR;
1354 : }
1355 6 : if (otids != tids)
1356 0 : bat_destroy(tids);
1357 6 : tids = sorted;
1358 6 : BAT *nupdates = BATproject(order, updates);
1359 6 : bat_destroy(order);
1360 6 : if (oupdates != updates)
1361 0 : bat_destroy(updates);
1362 6 : updates = nupdates;
1363 6 : if (!updates) {
1364 0 : bat_destroy(tids);
1365 0 : unlock_table(tr->store, t->base.id);
1366 0 : return LOG_ERR;
1367 : }
1368 : }
1369 2630 : assert(tids->tsorted);
1370 2630 : BAT *ui = NULL, *uv = NULL;
1371 :
1372 : /* handle updates on just inserted bits */
1373 : /* handle updates on updates (within one transaction) */
1374 2630 : BATiter upi = bat_iterator(updates);
/* cnt counts the rows written in place; ucnt is the total number of incoming updates */
1375 2630 : BUN cnt = 0, ucnt = BATcount(tids);
/* ins/msk: bit mask (one bit per incoming update) marking the rows that were written in place */
1376 2630 : BAT *b, *ins = NULL;
1377 2630 : int *msk = NULL;
1378 :
1379 2630 : if((b = temp_descriptor(cs->bid)) == NULL)
1380 : res = LOG_ERR;
1381 :
/* case 1: tids is a dense range [start, end) */
1382 2630 : if (res == LOG_OK && BATtdense(tids)) {
1383 2497 : oid start = tids->tseqbase, offset = start;
1384 2497 : oid end = start + ucnt;
1385 :
1386 8892 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1387 7002 : if (seg->start <= start && seg->end > start) {
1388 : /* check for delete conflicts */
1389 2497 : if (seg->ts >= tr->ts && seg->deleted) {
1390 0 : res = LOG_CONFLICT;
1391 0 : continue;
1392 : }
1393 :
1394 : /* check for inplace updates */
1395 2497 : BUN lend = end < seg->end?end:seg->end;
1396 2497 : if (seg->ts == tr->tid && !seg->deleted) {
1397 101 : if (!ins) {
1398 101 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1399 101 : if (!ins)
1400 : res = LOG_ERR;
1401 : else {
1402 101 : BATsetcount(ins, ucnt); /* all full updates */
1403 101 : msk = (int*)Tloc(ins, 0);
1404 101 : BUN end = (ucnt+31)/32;
1405 101 : memset(msk, 0, end * sizeof(int));
1406 : }
1407 : }
1408 438 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1409 337 : const void *upd = BUNtail(upi, rid-offset);
1410 337 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1411 0 : res = LOG_ERR;
1412 :
1413 337 : oid word = i/32;
1414 337 : int pos = i%32;
1415 337 : msk[word] |= 1U<<pos;
1416 337 : cnt++;
1417 : }
1418 : }
1419 : }
1420 7002 : if (end < seg->end)
1421 : break;
1422 : }
/* case 2: tids is a complex candidate list, walked with a canditer in step with the segment chain */
1423 136 : } else if (res == LOG_OK && complex_cand(tids)) {
1424 3 : struct canditer ci;
1425 3 : segment *seg = s->segs->h;
1426 3 : canditer_init(&ci, NULL, tids);
1427 3 : BUN i = 0;
1428 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1429 1033 : oid rid = canditer_next(&ci);
1430 1033 : if (seg->end <= rid)
1431 13 : seg = ATOMIC_PTR_GET(&seg->next);
1432 1020 : else if (seg->start <= rid && seg->end > rid) {
1433 : /* check for delete conflicts */
1434 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1435 0 : res = LOG_CONFLICT;
1436 0 : continue;
1437 : }
1438 :
1439 : /* check for inplace updates */
1440 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1441 0 : if (!ins) {
1442 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1443 0 : if (!ins) {
1444 : res = LOG_ERR;
1445 : break;
1446 : } else {
1447 0 : BATsetcount(ins, ucnt); /* all full updates */
1448 0 : msk = (int*)Tloc(ins, 0);
1449 0 : BUN end = (ucnt+31)/32;
1450 0 : memset(msk, 0, end * sizeof(int));
1451 : }
1452 : }
1453 0 : ptr upd = BUNtail(upi, i);
1454 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1455 0 : res = LOG_ERR;
1456 :
1457 0 : oid word = i/32;
1458 0 : int pos = i%32;
1459 0 : msk[word] |= 1U<<pos;
1460 0 : cnt++;
1461 : }
1462 1020 : i++;
1463 : }
1464 : }
/* case 3: tids is a materialised, sorted oid array, walked in step with the segment chain */
1465 130 : } else if (res == LOG_OK) {
1466 130 : BUN i = 0;
1467 130 : oid *rid = Tloc(tids,0);
1468 130 : segment *seg = s->segs->h;
1469 14396 : while ( seg && res == LOG_OK && i < ucnt) {
1470 14266 : if (seg->end <= rid[i])
1471 3808 : seg = ATOMIC_PTR_GET(&seg->next);
1472 10458 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1473 : /* check for delete conflicts */
1474 10458 : if (seg->ts >= tr->ts && seg->deleted) {
1475 0 : res = LOG_CONFLICT;
1476 0 : continue;
1477 : }
1478 :
1479 : /* check for inplace updates */
1480 10458 : if (seg->ts == tr->tid && !seg->deleted) {
1481 293 : if (!ins) {
1482 29 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1483 29 : if (!ins) {
1484 : res = LOG_ERR;
1485 : break;
1486 : } else {
1487 29 : BATsetcount(ins, ucnt); /* all full updates */
1488 29 : msk = (int*)Tloc(ins, 0);
1489 29 : BUN end = (ucnt+31)/32;
1490 29 : memset(msk, 0, end * sizeof(int));
1491 : }
1492 : }
1493 293 : const void *upd = BUNtail(upi, i);
1494 293 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1495 0 : res = LOG_ERR;
1496 :
1497 293 : oid word = i/32;
1498 293 : int pos = i%32;
1499 293 : msk[word] |= 1U<<pos;
1500 293 : cnt++;
1501 : }
1502 10458 : i++;
1503 : }
1504 : }
1505 : }
1506 :
/* cnt < ucnt: at least one row was not written in place and has to be recorded in the update bats */
1507 2630 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
/* no updates stored yet: the incoming (tids, updates) pair simply becomes the update bats */
1508 2500 : if (cs->ucnt == 0) {
1509 2446 : if (cnt) {
1510 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1511 0 : if (nins) {
1512 0 : ui = BATproject(nins, tids);
1513 0 : uv = BATproject(nins, updates);
1514 0 : bat_destroy(nins);
1515 : }
1516 : } else {
1517 2446 : ui = temp_descriptor(tids->batCacheid);
1518 2446 : uv = temp_descriptor(updates->batCacheid);
1519 : }
1520 2446 : if (!ui || !uv) {
1521 : res = LOG_ERR;
1522 : } else {
1523 2446 : temp_destroy(cs->uibid);
1524 2446 : temp_destroy(cs->uvbid);
1525 2446 : ui = transfer_to_systrans(ui);
1526 2446 : uv = transfer_to_systrans(uv);
1527 2446 : if (ui == NULL || uv == NULL) {
1528 0 : BBPreclaim(ui);
1529 0 : BBPreclaim(uv);
1530 : res = LOG_ERR;
1531 : } else {
1532 2446 : cs->uibid = temp_create(ui);
1533 2446 : cs->uvbid = temp_create(uv);
1534 2446 : cs->ucnt = BATcount(ui);
1535 : }
1536 : }
1537 : } else {
/* updates already stored: build new update bats by merging the stored (ui, uv) with the incoming (tids, updates), both in oid order */
1538 54 : BAT *nui = NULL, *nuv = NULL;
1539 :
1540 : /* merge taking msk of inserted into account */
1541 54 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1542 : res = LOG_ERR;
1543 :
1544 54 : if (res == LOG_OK) {
1545 54 : const void *upd = NULL;
1546 54 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1547 54 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1548 :
1549 54 : if (!nui || !nuv) {
1550 : res = LOG_ERR;
1551 : } else {
1552 54 : BATiter ovi = bat_iterator(uv);
1553 :
1554 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
/* uip/uie index the stored updates, nip/nie the incoming ones; uipt/nipt stay NULL for dense id columns, whose oids are computed from the seqbase */
1555 54 : BUN uip = 0, uie = BATcount(ui);
1556 54 : BUN nip = 0, nie = BATcount(tids);
1557 54 : oid uiseqb = ui->tseqbase;
1558 54 : oid niseqb = tids->tseqbase;
1559 54 : oid *uipt = NULL, *nipt = NULL;
1560 54 : BATiter uii = bat_iterator(ui);
1561 54 : BATiter tidsi = bat_iterator(tids);
1562 54 : if (!BATtdensebi(&uii))
1563 53 : uipt = uii.base;
1564 54 : if (!BATtdensebi(&tidsi))
1565 53 : nipt = tidsi.base;
1566 14211 : while (uip < uie && nip < nie && res == LOG_OK) {
1567 14157 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1568 14157 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1569 :
/* stored update comes first: keep it */
1570 14157 : if (uiv < niv) {
1571 7916 : upd = BUNtail(ovi, uip);
1572 15832 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1573 7916 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1574 : res = LOG_ERR;
1575 7916 : uip++;
1576 6241 : } else if (uiv == niv) {
1577 : /* handle == */
/* same row on both sides: the incoming value wins unless that row was already written in place (mask bit set), in which case the stored entry is kept */
1578 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1579 18 : upd = BUNtail(upi, nip);
1580 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1581 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1582 : res = LOG_ERR;
1583 : } else {
1584 0 : upd = BUNtail(ovi, uip);
1585 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1586 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1587 : res = LOG_ERR;
1588 : }
1589 18 : uip++;
1590 18 : nip++;
1591 : } else { /* uiv > niv */
/* incoming update comes first: add it unless it was written in place */
1592 6223 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1593 6223 : upd = BUNtail(upi, nip);
1594 12446 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1595 6223 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1596 : res = LOG_ERR;
1597 : }
1598 6223 : nip++;
1599 : }
1600 : }
/* drain whatever is left of the stored updates */
1601 275 : while (uip < uie && res == LOG_OK) {
1602 221 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1603 221 : upd = BUNtail(ovi, uip);
1604 442 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1605 221 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1606 : res = LOG_ERR;
1607 221 : uip++;
1608 : }
/* drain whatever is left of the incoming updates, skipping rows written in place */
1609 383 : while (nip < nie && res == LOG_OK) {
1610 329 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1611 329 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1612 329 : upd = BUNtail(upi, nip);
1613 658 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1614 329 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1615 : res = LOG_ERR;
1616 : }
1617 329 : nip++;
1618 : }
1619 54 : bat_iterator_end(&uii);
1620 54 : bat_iterator_end(&tidsi);
1621 54 : bat_iterator_end(&ovi);
/* merge succeeded: swap the merged bats in as the new update bats */
1622 54 : if (res == LOG_OK) {
1623 54 : temp_destroy(cs->uibid);
1624 54 : temp_destroy(cs->uvbid);
1625 54 : nui = transfer_to_systrans(nui);
1626 54 : nuv = transfer_to_systrans(nuv);
1627 54 : if (nui == NULL || nuv == NULL) {
1628 : res = LOG_ERR;
1629 : } else {
1630 54 : cs->uibid = temp_create(nui);
1631 54 : cs->uvbid = temp_create(nuv);
1632 54 : cs->ucnt = BATcount(nui);
1633 : }
1634 : }
1635 : }
1636 54 : bat_destroy(nui);
1637 54 : bat_destroy(nuv);
1638 : }
1639 : }
1640 : }
/* common exit of the delta path: release the iterator, the lock and every descriptor taken above */
1641 2630 : bat_iterator_end(&upi);
1642 2630 : bat_destroy(b);
1643 2630 : unlock_table(tr->store, t->base.id);
1644 2630 : bat_destroy(ins);
1645 2630 : bat_destroy(ui);
1646 2630 : bat_destroy(uv);
1647 2630 : if (otids != tids)
1648 10 : bat_destroy(tids);
1649 2630 : if (oupdates != updates)
1650 11 : bat_destroy(updates);
1651 2630 : return res;
/* new or cleared column: write straight into the base BAT */
1652 : } else if (is_new || cs->cleared) {
1653 323 : BAT *b = temp_descriptor(cs->bid);
1654 :
1655 323 : if (b == NULL) {
1656 : res = LOG_ERR;
1657 : } else {
1658 323 : if (BATcount(b)==0) {
1659 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1660 0 : res = LOG_ERR;
1661 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1662 0 : res = LOG_ERR;
1663 323 : BBPcold(b->batCacheid);
1664 323 : bat_destroy(b);
1665 : }
1666 : }
1667 323 : unlock_table(tr->store, t->base.id);
1668 323 : if (otids != tids)
1669 2 : bat_destroy(tids);
1670 323 : if (oupdates != updates)
1671 41 : bat_destroy(updates);
1672 : return res;
1673 : }
1674 :
1675 : static int
1676 2953 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1677 : {
1678 2953 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1679 : }
1680 :
1681 : static void *
1682 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1683 : {
1684 4 : void *newoffsets = NULL;
1685 4 : sql_delta *bat = *batp;
1686 4 : column_storage *cs = &bat->cs;
1687 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1688 :
1689 4 : if (!u)
1690 : return NULL;
1691 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1692 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1693 0 : bat_destroy(u);
1694 0 : return NULL;
1695 : } else {
1696 4 : int new = 0;
1697 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1698 4 : if (BATcount(u) >= max_cnt) {
1699 0 : if (max_cnt == INT_MAX) { /* decompress */
1700 : if (!(b = temp_descriptor(cs->bid))) {
1701 : bat_destroy(u);
1702 : return NULL;
1703 : }
1704 : n = DICTdecompress_(b, u, PERSISTENT);
1705 : /* TODO decompress updates if any */
1706 : bat_destroy(b);
1707 : assert(newoffsets == NULL);
1708 : if (!n) {
1709 : bat_destroy(u);
1710 : return NULL;
1711 : }
1712 : if (cs->ts != tr->tid) {
1713 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1714 : bat_destroy(n);
1715 : bat_destroy(u);
1716 : return NULL;
1717 : }
1718 : cs = &(*batp)->cs;
1719 : new = 1;
1720 : cs->uibid = cs->uvbid = 0;
1721 : }
1722 : if (cs->bid && !new)
1723 : temp_destroy(cs->bid);
1724 : n = transfer_to_systrans(n);
1725 : if (n == NULL) {
1726 : bat_destroy(u);
1727 : return NULL;
1728 : }
1729 : bat_set_access(n, BAT_READ);
1730 : cs->bid = temp_create(n);
1731 : bat_destroy(n);
1732 : if (cs->ebid && !new)
1733 : temp_destroy(cs->ebid);
1734 : cs->ebid = 0;
1735 : cs->st = ST_DEFAULT;
1736 : /* at append_col the column's storage type is cleared */
1737 : cs->cleared = true;
1738 : } else {
1739 0 : if (!(b = temp_descriptor(cs->bid))) {
1740 0 : GDKfree(newoffsets);
1741 0 : bat_destroy(u);
1742 0 : return NULL;
1743 : }
1744 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1745 0 : bat_destroy(b);
1746 0 : if (!n) {
1747 0 : GDKfree(newoffsets);
1748 0 : bat_destroy(u);
1749 0 : return NULL;
1750 : }
1751 0 : if (cs->ts != tr->tid) {
1752 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1753 0 : bat_destroy(u);
1754 0 : bat_destroy(n);
1755 0 : return NULL;
1756 : }
1757 0 : cs = &(*batp)->cs;
1758 0 : new = 1;
1759 0 : temp_dup(cs->ebid);
1760 0 : if (cs->uibid) {
1761 0 : temp_dup(cs->uibid);
1762 0 : temp_dup(cs->uvbid);
1763 : }
1764 : }
1765 0 : if (cs->bid)
1766 0 : temp_destroy(cs->bid);
1767 0 : n = transfer_to_systrans(n);
1768 0 : if (n == NULL) {
1769 0 : bat_destroy(u);
1770 0 : return NULL;
1771 : }
1772 0 : bat_set_access(n, BAT_READ);
1773 0 : cs->bid = temp_create(n);
1774 0 : bat_destroy(n);
1775 0 : cs->cleared = true;
1776 0 : i = newoffsets;
1777 : }
1778 : } else { /* append */
1779 4 : i = newoffsets;
1780 : }
1781 : }
1782 4 : bat_destroy(u);
1783 4 : return i;
1784 : }
1785 :
1786 : static void *
1787 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1788 : {
1789 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1790 1 : void *newoffsets = NULL;
1791 1 : BAT *b = NULL, *n = NULL;
1792 :
1793 1 : if (!(b = temp_descriptor(cs->bid)))
1794 : return NULL;
1795 :
1796 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1797 0 : bat_destroy(b);
1798 0 : return NULL;
1799 : } else {
1800 : /* returns new offset bat if values within min/max, else decompress */
1801 1 : if (!newoffsets) {
1802 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1803 1 : bat_destroy(b);
1804 1 : if (!n)
1805 : return NULL;
1806 : /* TODO decompress updates if any */
1807 1 : if (cs->bid)
1808 1 : temp_destroy(cs->bid);
1809 1 : n = transfer_to_systrans(n);
1810 1 : if (n == NULL)
1811 : return NULL;
1812 1 : bat_set_access(n, BAT_READ);
1813 1 : cs->bid = temp_create(n);
1814 1 : cs->st = ST_DEFAULT;
1815 : /* at append_col the column's storage type is cleared */
1816 1 : cs->cleared = true;
1817 1 : b = n;
1818 : } else { /* append */
1819 : i = newoffsets;
1820 : }
1821 : }
1822 1 : bat_destroy(b);
1823 1 : return i;
1824 : }
1825 :
1826 : static int
1827 5116 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1828 : {
1829 5116 : void *oupd = upd;
1830 5116 : sql_delta *bat = *batp;
1831 5116 : column_storage *cs = &bat->cs;
1832 5116 : storage *s = ATOMIC_PTR_GET(&t->data);
1833 5116 : assert(!is_oid_nil(rid));
1834 5116 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1835 :
1836 5116 : if (cs->st == ST_DICT) {
1837 : /* possibly a new array is returned */
1838 0 : upd = dict_append_val(tr, batp, upd, 1);
1839 0 : bat = *batp;
1840 0 : cs = &bat->cs;
1841 0 : if (!upd)
1842 : return LOG_ERR;
1843 : }
1844 :
1845 : /* check if rid is insert ? */
1846 5116 : if (!inplace) {
1847 : /* check conflict */
1848 2242 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1849 0 : if (oupd != upd)
1850 0 : GDKfree(upd);
1851 0 : return LOG_CONFLICT;
1852 : }
1853 2242 : BAT *ui, *uv;
1854 :
1855 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1856 : /* currently only one update delta is possible */
1857 2242 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1858 0 : if (oupd != upd)
1859 0 : GDKfree(upd);
1860 0 : return LOG_ERR;
1861 : }
1862 :
1863 2242 : assert(uv->ttype);
1864 2242 : assert(BATcount(ui) == BATcount(uv));
1865 4484 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1866 2242 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1867 0 : if (oupd != upd)
1868 0 : GDKfree(upd);
1869 0 : bat_destroy(ui);
1870 0 : bat_destroy(uv);
1871 0 : return LOG_ERR;
1872 : }
1873 2242 : assert(BATcount(ui) == BATcount(uv));
1874 2242 : bat_destroy(ui);
1875 2242 : bat_destroy(uv);
1876 2242 : cs->ucnt++;
1877 : } else {
1878 2874 : BAT *b = NULL;
1879 :
1880 2874 : if((b = temp_descriptor(cs->bid)) == NULL) {
1881 0 : if (oupd != upd)
1882 0 : GDKfree(upd);
1883 0 : return LOG_ERR;
1884 : }
1885 2874 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1886 0 : if (oupd != upd)
1887 0 : GDKfree(upd);
1888 0 : bat_destroy(b);
1889 0 : return LOG_ERR;
1890 : }
1891 2874 : bat_destroy(b);
1892 : }
1893 5116 : if (oupd != upd)
1894 0 : GDKfree(upd);
1895 : return LOG_OK;
1896 : }
1897 :
1898 : static int
1899 5116 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1900 : {
1901 5116 : int res = LOG_OK;
1902 5116 : lock_table(tr->store, t->base.id);
1903 5116 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1904 5116 : unlock_table(tr->store, t->base.id);
1905 5116 : return res;
1906 : }
1907 :
1908 : static int
1909 158708 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1910 : {
1911 158708 : (void)tr;
1912 158708 : if (!ocs)
1913 : return LOG_OK;
1914 158708 : cs->bid = ocs->bid;
1915 158708 : cs->ebid = ocs->ebid;
1916 158708 : cs->uibid = ocs->uibid;
1917 158708 : cs->uvbid = ocs->uvbid;
1918 158708 : cs->ucnt = ocs->ucnt;
1919 :
1920 158708 : if (temp) {
1921 25944 : cs->bid = temp_copy(cs->bid, true, false);
1922 25944 : if (cs->bid == BID_NIL)
1923 : return LOG_ERR;
1924 : } else {
1925 132764 : temp_dup(cs->bid);
1926 : }
1927 158712 : if (cs->ebid)
1928 6 : temp_dup(cs->ebid);
1929 158712 : cs->ucnt = 0;
1930 158712 : cs->uibid = e_bat(TYPE_oid);
1931 158713 : cs->uvbid = e_bat(type);
1932 158721 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1933 : return LOG_ERR;
1934 158721 : cs->st = ocs->st;
1935 158721 : return LOG_OK;
1936 : }
1937 :
1938 : static void
1939 311843 : destroy_delta(sql_delta *b, bool recursive)
1940 : {
1941 311843 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1942 : return;
1943 292537 : if (recursive && b->next)
1944 127251 : destroy_delta(b->next, true);
1945 292537 : if (b->cs.uibid)
1946 94401 : temp_destroy(b->cs.uibid);
1947 292537 : if (b->cs.uvbid)
1948 94401 : temp_destroy(b->cs.uvbid);
1949 292537 : if (b->cs.bid)
1950 292537 : temp_destroy(b->cs.bid);
1951 292537 : if (b->cs.ebid)
1952 61 : temp_destroy(b->cs.ebid);
1953 292537 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1954 292537 : _DELETE(b);
1955 : }
1956 :
/*
 * Return the delta of column `c` that transaction `tr` should work on.
 *
 * `update_conflict` doubles as a mode switch: callers that pass a non-NULL
 * pointer want a delta owned by `tr` and get conflicts reported through it;
 * callers that pass NULL (appends, per the comment below) accept the current
 * head when it is not owned by another running transaction.
 *
 * Returns NULL on conflict or on allocation / duplication failure.
 */
1957 : static sql_delta *
1958 15717276 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1959 : {
1960 15717276 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1961 :
/* fast path: the head delta is already ours, or (append mode only) its timestamp is below TRANSACTION_ID_BASE or belongs to a parent of tr */
1962 15717276 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1963 15584519 : return obat;
/* head delta carries a timestamp >= TRANSACTION_ID_BASE that is not a parent version of tr - presumably another uncommitted writer */
1964 132757 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1965 : /* abort */
1966 12 : if (update_conflict)
1967 4 : *update_conflict = true;
1968 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1969 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1970 4 : return NULL;
1971 : }
/* pick the version visible to tr as the base for a new delta */
1972 132745 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1973 : return NULL;
1974 132744 : sql_delta* bat = ZNEW(sql_delta);
1975 132741 : if (!bat)
1976 : return NULL;
1977 132741 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1978 132741 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1979 0 : destroy_delta(bat, false);
1980 0 : return NULL;
1981 : }
/* stamp the new delta with our transaction id and chain it in front of the base version */
1982 132749 : bat->cs.ts = tr->tid;
1983 : /* only one writer else abort */
1984 132749 : bat->next = obat;
1985 132749 : if (obat)
1986 132749 : bat->nr_updates = obat->nr_updates;
/* install the new delta as head only if c->data still equals bat->next; a failed compare-and-swap is reported as an update conflict */
1987 132749 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
/* detach from the chain first so that destroying the new delta cannot touch older versions */
1988 0 : bat->next = NULL;
1989 0 : destroy_delta(bat, false);
1990 0 : if (update_conflict)
1991 0 : *update_conflict = true;
1992 0 : return NULL;
1993 : }
1994 : return bat;
1995 : }
1996 :
1997 : static int
1998 8069 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1999 : {
2000 8069 : int ok = LOG_OK;
2001 :
2002 8069 : if (is_bat) {
2003 2953 : BAT *tids = incoming_tids;
2004 2953 : BAT *values = incoming_values;
2005 2953 : if (BATcount(tids) == 0)
2006 : return LOG_OK;
2007 2953 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2008 : } else {
2009 5116 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2010 : }
2011 : return ok;
2012 : }
2013 :
2014 : static int
2015 8092 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2016 : {
2017 8092 : int res = LOG_OK;
2018 8092 : bool update_conflict = false;
2019 8092 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2020 :
2021 8092 : if (isbat) {
2022 2973 : BAT *t = tids;
2023 2973 : if (!BATcount(t))
2024 : return LOG_OK;
2025 : }
2026 :
2027 7794 : if (c == NULL)
2028 : return LOG_ERR;
2029 :
2030 7794 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2031 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2032 :
2033 7790 : assert(delta && delta->cs.ts == tr->tid);
2034 7790 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2035 7790 : if (odelta != delta)
2036 3413 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2037 :
2038 7790 : odelta = delta;
2039 7790 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2040 : return res;
2041 7790 : assert(delta == odelta);
2042 7790 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2043 0 : res = sql_trans_alter_storage(tr, c, NULL);
2044 : return res;
2045 : }
2046 :
/*
 * Return the delta of index `i` that transaction `tr` may write to.
 *
 * update_conflict doubles as a mode switch: callers that pass NULL (the
 * append path in this file) get the current head back whenever it is either
 * owned by `tr`, has a timestamp below TRANSACTION_ID_BASE, or belongs to a
 * parent of `tr`. Callers that pass a pointer get a private copy of the head
 * installed at the front of the chain, and *update_conflict is set to true
 * when another writer is detected.
 *
 * Returns NULL on conflict or on allocation/duplication failure.
 */
static sql_delta *
bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
{
	sql_delta *obat = ATOMIC_PTR_GET(&i->data);

	if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
		return obat;
	/* head carries a timestamp >= TRANSACTION_ID_BASE that is neither ours
	 * nor a parent's */
	if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
		/* abort */
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	/* pick the version of the chain to copy from */
	if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
		return NULL;
	sql_delta* bat = ZNEW(sql_delta);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	/* index values are stored as oid or lng depending on the index type */
	if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
		destroy_delta(bat, false);
		return NULL;
	}
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	/* publish the copy only if the head is still the delta we copied from */
	if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
		/* unlink before destroying so the shared chain is left alone */
		bat->next = NULL;
		destroy_delta(bat, false);
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	return bat;
}
2082 :
2083 : static int
2084 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2085 : {
2086 782 : int res = LOG_OK;
2087 782 : bool update_conflict = false;
2088 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2089 :
2090 782 : if (isbat) {
2091 782 : BAT *t = tids;
2092 782 : if (!BATcount(t))
2093 : return LOG_OK;
2094 : }
2095 :
2096 279 : if (i == NULL)
2097 : return LOG_ERR;
2098 :
2099 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2100 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2101 :
2102 279 : assert(delta && delta->cs.ts == tr->tid);
2103 279 : if (odelta != delta)
2104 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2105 :
2106 279 : odelta = delta;
2107 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2108 279 : assert(delta == odelta);
2109 : return res;
2110 : }
2111 :
/*
 * Write the values of BAT `i` into the base BAT of *batp while holding the
 * column lock for `id`.
 *
 * Where the values go:
 *  - no `offsets`, and `offset` is exactly the end of the base BAT: append;
 *  - no `offsets` otherwise: positional update starting at `offset`;
 *  - `offsets` dense and starting at the end of the base BAT: append;
 *  - any other `offsets`: update at the listed positions.
 *
 * For ST_DICT / ST_FOR storage the input is first passed through
 * dict_append_bat / for_append_bat and the BAT they return is written instead
 * (presumably the encoded form of the input -- confirm against those helpers).
 *
 * Returns LOG_OK on success (including an empty input) and LOG_ERR otherwise.
 */
static int
delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
{
	BAT *b, *oi = i;
	int err = 0;
	sql_delta *bat = *batp;

	assert(!offsets || BATcount(offsets) == BATcount(i));
	if (!BATcount(i))
		return LOG_OK;
	/* mask-typed input or mask candidate lists are expanded first; from here
	 * on `oi != i` means we own `oi` and must destroy it */
	if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
		return LOG_ERR;

	lock_column(tr->store, id);
	if (bat->cs.st == ST_DICT) {
		BAT *ni = dict_append_bat(tr, batp, oi);
		/* re-read: the helper receives batp and may have changed *batp */
		bat = *batp;
		if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
			bat_destroy(oi);
		oi = ni;
		if (!oi) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
		bat = *batp;
		if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
			bat_destroy(oi);
		oi = ni;
		if (!oi) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		unlock_column(tr->store, id);
		if (oi != i)
			bat_destroy(oi);
		return LOG_ERR;
	}
	if (!offsets && offset == b->hseqbase+BATcount(b)) {
		/* writing exactly at the end: plain append */
		if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
			err = 1;
	} else if (!offsets) {
		/* writing at an explicit start position */
		if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
			err = 1;
	} else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
		/* dense offsets that start at the end are an append as well */
		if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
			err = 1;
	} else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
		err = 1;
	}
	bat_destroy(b);
	unlock_column(tr->store, id);

	if (oi != i)
		bat_destroy(oi);
	return (err)?LOG_ERR:LOG_OK;
}
2175 :
2176 : // Look at the offsets and find where the replacements end and the appends begin.
2177 : static BUN
2178 0 : start_of_appends(BAT *offsets, BUN bcnt)
2179 : {
2180 0 : BUN ocnt = BATcount(offsets);
2181 0 : if (ocnt == 0)
2182 : return 0;
2183 :
2184 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2185 0 : if (highest < bcnt)
2186 : // all are replacements
2187 : return ocnt;
2188 :
2189 : // reason backward to find the first append.
2190 : // Suppose offsets has 15 entries, bcnt == 100
2191 : // and the highest offset in offsets is 109.
2192 0 : BUN new_bcnt = highest + 1; // 110
2193 0 : BUN nappends = new_bcnt - bcnt; // 10
2194 0 : BUN nreplacements = ocnt - nappends; // 5
2195 :
2196 : // The first append should be to position bcnt
2197 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2198 :
2199 : return nreplacements;
2200 : }
2201 :
2202 :
/*
 * Write `cnt` values from the array `i` into the base BAT of *batp while
 * holding the column lock for `id`.
 *
 * With `offsets`: the leading entries that fall inside the existing rows are
 * replaced in place, and the rest are appended at the end (`offset` must be
 * oid_nil in this mode).
 * Without `offsets`: values are written starting at `offset`; positions below
 * the current row count are replaced, positions at or beyond it are appended,
 * and any gap between the current end and `offset` is filled first.
 *
 * For ST_DICT / ST_FOR storage the input is first passed through
 * dict_append_val / for_append_val, which may return a newly allocated array;
 * in that case (`i != oi`) the array is freed here before returning.
 *
 * Returns LOG_OK on success and LOG_ERR otherwise.
 */
static int
delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
{
	void *oi = i;	/* caller's array; anything else in `i` is ours to free */
	BAT *b;
	lock_column(tr->store, id);
	sql_delta *bat = *batp;

	if (bat->cs.st == ST_DICT) {
		/* possibly a new array is returned */
		i = dict_append_val(tr, batp, i, cnt);
		/* re-read: the helper receives batp and may have changed *batp */
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		/* possibly a new array is returned */
		i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		if (i != oi)
			GDKfree(i);
		unlock_column(tr->store, id);
		return LOG_ERR;
	}
	BUN bcnt = BATcount(b);

	if (offsets) {
		// The first few might be replacements while later items might be appends.
		// Handle the replacements here while leaving the appends to the code below.
		BUN nreplacements = start_of_appends(offsets, bcnt);

		oid *start = Tloc(offsets, 0);
		if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}

		// Replacements have been handled. The rest are appends.
		// NOTE(review): `cnt` is reduced but `i` still points at the first
		// value, so the append below appears to re-read the values that were
		// just used for the replacements -- confirm.
		assert(offset == oid_nil);
		offset = bcnt;
		cnt -= nreplacements;
	}

	if (bcnt > offset){
		/* part (or all) of the range overlaps existing rows: overwrite those */
		size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
		if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
		/* NOTE(review): as above, `i` is not advanced by ccnt values before
		 * the append below -- confirm the mixed replace+append case. */
		cnt -= ccnt;
		offset += ccnt;
	}
	if (cnt) {
		if (BATcount(b) < offset) { /* add space */
			BUN d = offset - BATcount(b);
			if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
				bat_destroy(b);
				if (i != oi)
					GDKfree(i);
				unlock_column(tr->store, id);
				return LOG_ERR;
			}
		}
		if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	bat_destroy(b);
	if (i != oi)
		GDKfree(i);
	unlock_column(tr->store, id);
	return LOG_OK;
}
2296 :
2297 : static int
2298 25944 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2299 : {
2300 25944 : if (!(bat->segs = new_segments(tr, 0)))
2301 : return LOG_ERR;
2302 25944 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2303 : }
2304 :
2305 : static int
2306 15582384 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2307 : {
2308 15582384 : int ok = LOG_OK;
2309 :
2310 15582384 : if ((*delta)->cs.merged)
2311 36742 : (*delta)->cs.merged = false; /* TODO needs to move */
2312 15582384 : if (isbat) {
2313 150471 : BAT *bat = incoming_data;
2314 :
2315 150471 : if (BATcount(bat))
2316 150471 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2317 : } else {
2318 15431913 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2319 : }
2320 15582237 : return ok;
2321 : }
2322 :
/*
 * Append `data` to column `c`.
 *
 * isbat == true : data is a BAT; an empty BAT is a no-op (LOG_OK).
 * isbat == false: data is an array of `cnt` values of type `tpe`.
 *
 * The delta is bound without a conflict flag (append mode). If the append
 * replaced the delta, the new one is published at the head of c->data with a
 * compare-and-swap; losing that race yields LOG_CONFLICT.
 */
static int
append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
{
	int res = LOG_OK;
	/* the initial value of odelta is overwritten below before it is used */
	sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);

	if (isbat) {
		BAT *t = data;
		if (!BATcount(t))
			return LOG_OK;
	}

	if ((delta = bind_col_data(tr, c, NULL)) == NULL)
		return LOG_ERR;

	assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);

	odelta = delta;
	if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
		return res;
	/* the append handed back a different delta: link it in front of the old one */
	if (odelta != delta) {
		delta->next = odelta;
		if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
			/* unlink before destroying so the shared chain is left alone */
			delta->next = NULL;
			destroy_delta(delta, false);
			return LOG_CONFLICT;
		}
	}
	/* column has a storage type configured but the delta is still in default
	 * storage: re-run the storage alteration */
	if (delta->cs.st == ST_DEFAULT && c->storage_type)
		res = sql_trans_alter_storage(tr, c, NULL);
	return res;
}
2355 :
2356 : static int
2357 2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2358 : {
2359 2184 : int res = LOG_OK;
2360 2184 : sql_delta *delta;
2361 :
2362 2184 : if (isbat) {
2363 1010 : BAT *t = data;
2364 1010 : if (!BATcount(t))
2365 : return LOG_OK;
2366 : }
2367 :
2368 2172 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2369 : return LOG_ERR;
2370 :
2371 2172 : assert(delta->cs.st == ST_DEFAULT);
2372 :
2373 2172 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2374 2172 : return res;
2375 : }
2376 :
2377 : static int
2378 71104 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2379 : {
2380 71104 : int err = 0;
2381 :
2382 : /* TODO check for conflicting updates */
2383 71104 : (void)rid;
2384 71104 : (void)cnt;
2385 536794 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2386 465690 : sql_column *c = n->data;
2387 465690 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2388 :
2389 : /* check for active updates */
2390 465690 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2391 : return 1;
2392 : }
2393 : return 0;
2394 : }
2395 :
2396 : static int
2397 67597 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2398 : {
2399 67597 : int in_transaction = segments_in_transaction(tr, t);
2400 :
2401 67597 : lock_table(tr->store, t->base.id);
2402 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2403 67597 : segment *seg = s->segs->h, *p = NULL;
2404 51386581 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2405 51386581 : if (seg->start <= rid && seg->end > rid) {
2406 67597 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2407 4 : unlock_table(tr->store, t->base.id);
2408 4 : return LOG_CONFLICT;
2409 : }
2410 67593 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2411 0 : unlock_table(tr->store, t->base.id);
2412 0 : return LOG_CONFLICT;
2413 : }
2414 67593 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2415 0 : unlock_table(tr->store, t->base.id);
2416 0 : return LOG_ERR;
2417 : }
2418 : break;
2419 : }
2420 : }
2421 67593 : unlock_table(tr->store, t->base.id);
2422 67593 : if (!in_transaction)
2423 12794 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2424 : return LOG_OK;
2425 : }
2426 :
/*
 * Mark `cnt` rows starting at `start` as deleted, walking the segment list
 * from *Seg. The range may span several segments; each overlapping part is
 * handled in turn.
 *
 * *Seg is updated to the segment returned by the last split, so a caller that
 * deletes ascending ranges can resume the walk from there.
 * The visible callers hold the table lock around this function.
 *
 * Returns LOG_CONFLICT when a segment may not be deleted by `tr` or updates
 * are pending, LOG_ERR when a split fails, LOG_OK otherwise.
 */
static int
seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
{
	segment *seg = *Seg, *p = NULL;
	for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
		if (seg->start <= start && seg->end > start) {
			/* lcnt: the part of the range that lies inside this segment */
			size_t lcnt = cnt;
			if (start+lcnt > seg->end)
				lcnt = seg->end-start;
			if (SEG_IS_DELETED(seg, tr)) {
				/* already deleted: skip this part and move on */
				start += lcnt;
				cnt -= lcnt;
				continue;
			} else if (!SEG_VALID_4_DELETE(seg, tr))
				return LOG_CONFLICT;
			if (deletes_conflict_updates( tr, t, start, lcnt))
				return LOG_CONFLICT;
			*Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
			if (!seg)
				return LOG_ERR;
			start += lcnt;
			cnt -= lcnt;
		}
		/* what is left of the range ends within this segment: done */
		if (start+cnt <= seg->end)
			break;
	}
	return LOG_OK;
}
2455 :
2456 : static int
2457 646 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2458 : {
2459 646 : segment *seg = s->segs->h;
2460 646 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2461 : }
2462 :
2463 : static int
2464 300 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2465 : {
2466 300 : int in_transaction = segments_in_transaction(tr, t);
2467 300 : BAT *oi = i; /* update ids */
2468 300 : int ok = LOG_OK;
2469 :
2470 300 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2471 : return LOG_ERR;
2472 300 : if (BATcount(i)) {
2473 535 : if (BATtdense(i)) {
2474 235 : size_t start = i->tseqbase;
2475 235 : size_t cnt = BATcount(i);
2476 :
2477 235 : lock_table(tr->store, t->base.id);
2478 235 : ok = delete_range(tr, t, s, start, cnt);
2479 235 : unlock_table(tr->store, t->base.id);
2480 65 : } else if (complex_cand(i)) {
2481 0 : struct canditer ci;
2482 0 : oid f = 0, l = 0, cur = 0;
2483 :
2484 0 : canditer_init(&ci, NULL, i);
2485 0 : cur = f = canditer_next(&ci);
2486 :
2487 0 : lock_table(tr->store, t->base.id);
2488 0 : if (!is_oid_nil(f)) {
2489 0 : segment *seg = s->segs->h;
2490 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2491 0 : if (cur+1 == l) {
2492 0 : cur++;
2493 0 : continue;
2494 : }
2495 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2496 0 : f = cur = l;
2497 : }
2498 0 : if (ok == LOG_OK)
2499 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2500 : }
2501 0 : unlock_table(tr->store, t->base.id);
2502 : } else {
2503 65 : if (!i->tsorted) {
2504 0 : assert(oi == i);
2505 0 : BAT *ni = NULL;
2506 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2507 0 : ok = LOG_ERR;
2508 0 : if (ni)
2509 0 : i = ni;
2510 : }
2511 65 : assert(i->tsorted);
2512 65 : BUN icnt = BATcount(i);
2513 65 : BATiter ii = bat_iterator(i);
2514 65 : oid *o = ii.base, n = o[0]+1;
2515 65 : size_t lcnt = 1;
2516 :
2517 65 : lock_table(tr->store, t->base.id);
2518 65 : segment *seg = s->segs->h;
2519 22227 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2520 22162 : if (o[i] == n) {
2521 21807 : lcnt++;
2522 21807 : n++;
2523 : } else {
2524 355 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2525 355 : lcnt = 0;
2526 : }
2527 22162 : if (!lcnt) {
2528 355 : n = o[i]+1;
2529 355 : lcnt = 1;
2530 : }
2531 : }
2532 65 : bat_iterator_end(&ii);
2533 65 : if (lcnt && ok == LOG_OK)
2534 65 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2535 65 : unlock_table(tr->store, t->base.id);
2536 : }
2537 : }
2538 300 : if (i != oi)
2539 25 : bat_destroy(i);
2540 : // assert
2541 300 : if (!in_transaction)
2542 268 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2543 : return ok;
2544 : }
2545 :
2546 : static void
2547 51253 : destroy_segments(segments *s)
2548 : {
2549 51253 : if (!s || sql_ref_dec(&s->r) > 0)
2550 0 : return;
2551 51253 : segment *seg = s->h;
2552 110415 : while(seg) {
2553 59162 : segment *n = ATOMIC_PTR_GET(&seg->next);
2554 59162 : ATOMIC_PTR_DESTROY(&seg->next);
2555 59162 : _DELETE(seg);
2556 59162 : seg = n;
2557 : }
2558 51253 : _DELETE(s);
2559 : }
2560 :
2561 : static void
2562 52650 : destroy_storage(storage *bat)
2563 : {
2564 52650 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2565 : return;
2566 51111 : if (bat->next)
2567 14144 : destroy_storage(bat->next);
2568 51111 : destroy_segments(bat->segs);
2569 51111 : if (bat->cs.uibid)
2570 30267 : temp_destroy(bat->cs.uibid);
2571 51111 : if (bat->cs.uvbid)
2572 30267 : temp_destroy(bat->cs.uvbid);
2573 51111 : if (bat->cs.bid)
2574 51111 : temp_destroy(bat->cs.bid);
2575 51111 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2576 51111 : _DELETE(bat);
2577 : }
2578 :
2579 : static int
2580 172679 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2581 : {
2582 172679 : if (uncommitted) {
2583 438772 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2584 287574 : if (!VALID_4_READ(s->ts,tr))
2585 : return 1;
2586 : } else {
2587 165098 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2588 144747 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2589 : return 1;
2590 : }
2591 :
2592 : return 0;
2593 : }
2594 :
2595 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2596 :
/*
 * Return the storage (row/segment administration) of table `t` that
 * transaction `tr` may work on.
 *
 * `clear` doubles as a mode switch and an out-flag:
 *  - clear == NULL: the current head is returned as is, unless it carries a
 *    timestamp >= TRANSACTION_ID_BASE that is neither `tr`'s nor a parent's
 *    (then NULL is returned).
 *  - clear != NULL: a new storage marked `cleared` is created from the head,
 *    stamped with tr->tid and installed at the front of the chain. *clear is
 *    set to true when a conflict with another transaction prevents this.
 *
 * Returns NULL on conflict or on allocation/duplication failure.
 */
storage *
bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
{
	storage *obat;

	obat = ATOMIC_PTR_GET(&t->data);

	/* head is not ours, not a parent's, and has a timestamp >= TRANSACTION_ID_BASE */
	if (obat->cs.ts != tr->tid)
		if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
			if (obat->cs.ts >= TRANSACTION_ID_BASE) {
				/* abort */
				if (clear)
					*clear = true;
				return NULL;
			}

	if (!clear)
		return obat;

	/* remainder is only to handle clear */
	if (segments_conflict(tr, obat->segs, 1)) {
		*clear = true;
		return NULL;
	}
	/* pick the version of the chain to copy from */
	if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
		return NULL;
	storage *bat = ZNEW(storage);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	if (dup_storage(tr, obat, bat) != LOG_OK) {
		destroy_storage(bat);
		return NULL;
	}
	bat->cs.cleared = true;
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	/* publish only if the head is still the storage we copied from */
	if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
		/* unlink before destroying so the shared chain is left alone */
		bat->next = NULL;
		destroy_storage(bat);
		if (clear)
			*clear = true;
		return NULL;
	}
	return bat;
}
2644 :
2645 : static int
2646 67946 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2647 : {
2648 67946 : int ok = LOG_OK;
2649 67946 : BAT *b = ib;
2650 67946 : storage *bat;
2651 :
2652 67946 : if (isbat && !BATcount(b))
2653 : return ok;
2654 :
2655 67897 : if (t == NULL)
2656 : return LOG_ERR;
2657 :
2658 67897 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2659 : return LOG_ERR;
2660 :
2661 67897 : if (isbat)
2662 300 : ok = storage_delete_bat(tr, t, bat, ib);
2663 : else
2664 67597 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2665 : return ok;
2666 : }
2667 :
2668 : static size_t
2669 0 : dcount_col(sql_trans *tr, sql_column *c)
2670 : {
2671 0 : sql_delta *b;
2672 :
2673 0 : if (!isTable(c->t))
2674 : return 0;
2675 0 : b = col_timestamp_delta(tr, c);
2676 0 : if (!b)
2677 : return 1;
2678 :
2679 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2680 0 : if (!s || !s->segs->t)
2681 : return 1;
2682 0 : size_t cnt = s->segs->t->end;
2683 0 : if (cnt) {
2684 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2685 0 : size_t dcnt = 0;
2686 :
2687 0 : if (v)
2688 0 : dcnt = BATguess_uniques(v, NULL);
2689 0 : return dcnt;
2690 : }
2691 : return cnt;
2692 : }
2693 :
2694 : static BAT *
2695 3812678 : bind_no_view(BAT *b, bool quick)
2696 : {
2697 3812678 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2698 3808966 : BAT *nb = BBP_desc(VIEWtparent(b));
2699 3808966 : bat_destroy(b);
2700 3808946 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2701 : return NULL;
2702 : }
2703 : return b;
2704 : }
2705 :
2706 : static int
2707 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2708 : {
2709 0 : int ok = 0;
2710 0 : assert(tr->active);
2711 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2712 0 : return 0;
2713 0 : lock_column(tr->store, c->base.id);
2714 0 : if (unique_est) {
2715 0 : sql_delta *d;
2716 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2717 0 : BAT *b;
2718 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2719 0 : MT_lock_set(&b->theaplock);
2720 0 : b->tunique_est = *unique_est;
2721 0 : MT_lock_unset(&b->theaplock);
2722 0 : bat_destroy(b);
2723 : }
2724 : }
2725 : }
2726 0 : if (min) {
2727 0 : _DELETE(c->min);
2728 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2729 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2730 0 : memcpy(c->min, min, minlen);
2731 0 : ok = 1;
2732 : }
2733 : }
2734 0 : if (max) {
2735 0 : _DELETE(c->max);
2736 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2737 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2738 0 : memcpy(c->max, max, maxlen);
2739 0 : ok = 1;
2740 : }
2741 : }
2742 0 : unlock_column(tr->store, c->base.id);
2743 0 : return ok;
2744 : }
2745 :
/*
 * Make sure c->min and c->max hold copies of the column's minimum and
 * maximum value, taken from the min/max positions recorded on its BAT.
 *
 * Returns 1 when both values are available (already cached or freshly
 * copied), 0 otherwise: not a bound table column, ST_FOR storage, the BAT
 * has no recorded min/max position, or an allocation failed.
 */
static int
min_max_col(sql_trans *tr, sql_column *c)
{
	int ok = 0;
	BAT *b = NULL;
	sql_delta *d = NULL;

	assert(tr->active);
	if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
		return 0;
	/* cheap check without the lock; repeated below once the lock is held */
	if (c->min && c->max)
		return 1;
	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR)
			return 0;
		/* dictionary-compressed columns are bound with RD_EXT instead of RDONLY */
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		lock_column(tr->store, c->base.id);
		if (c->min && c->max) {
			unlock_column(tr->store, c->base.id);
			return 1;
		}
		/* drop a possibly half-set pair before recomputing */
		_DELETE(c->min);
		_DELETE(c->max);
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false))) {
				unlock_column(tr->store, c->base.id);
				return 0;
			}
			BATiter bi = bat_iterator(b);
			if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
				const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
				size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);

				/* both or nothing: release whichever allocation succeeded */
				if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
					_DELETE(c->min);
					_DELETE(c->max);
				} else {
					memcpy(c->min, nmin, minlen);
					memcpy(c->max, nmax, maxlen);
					ok = 1;
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
		}
		unlock_column(tr->store, c->base.id);
	}
	return ok;
}
2795 :
2796 : static size_t
2797 17 : count_segs(segment *s)
2798 : {
2799 17 : size_t nr = 0;
2800 :
2801 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2802 55 : nr++;
2803 17 : return nr;
2804 : }
2805 :
2806 : static size_t
2807 34 : count_del(sql_trans *tr, sql_table *t, int access)
2808 : {
2809 34 : storage *d;
2810 :
2811 34 : if (!isTable(t))
2812 : return 0;
2813 34 : d = tab_timestamp_storage(tr, t);
2814 34 : if (!d)
2815 : return 0;
2816 34 : if (access == 2)
2817 0 : return d->cs.ucnt;
2818 34 : if (access == 1)
2819 0 : return count_inserts(d->segs->h, tr);
2820 34 : if (access == 10) /* special case for counting the number of segments */
2821 17 : return count_segs(d->segs->h);
2822 17 : return count_deletes(d->segs->h, tr);
2823 : }
2824 :
2825 : static int
2826 21664 : sorted_col(sql_trans *tr, sql_column *col)
2827 : {
2828 21664 : int sorted = 0;
2829 :
2830 21664 : assert(tr->active);
2831 21664 : if (!isTable(col->t) || !col->t->s)
2832 : return 0;
2833 :
2834 21664 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2835 21645 : BAT *b = bind_col(tr, col, QUICK);
2836 :
2837 21645 : if (b)
2838 21645 : sorted = b->tsorted || b->trevsorted;
2839 : }
2840 : return sorted;
2841 : }
2842 :
2843 : static int
2844 7815 : unique_col(sql_trans *tr, sql_column *col)
2845 : {
2846 7815 : int distinct = 0;
2847 :
2848 7815 : assert(tr->active);
2849 7815 : if (!isTable(col->t) || !col->t->s)
2850 : return 0;
2851 :
2852 7815 : if (col && ATOMIC_PTR_GET(&col->data)) {
2853 7815 : BAT *b = bind_col(tr, col, QUICK);
2854 :
2855 7815 : if (b)
2856 7815 : distinct = b->tkey;
2857 : }
2858 : return distinct;
2859 : }
2860 :
2861 : static int
2862 2049 : double_elim_col(sql_trans *tr, sql_column *col)
2863 : {
2864 2049 : int de = 0;
2865 2049 : sql_delta *d;
2866 :
2867 2049 : assert(tr->active);
2868 2049 : if (!isTable(col->t) || !col->t->s)
2869 : return 0;
2870 :
2871 2049 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2872 6 : if (d->cs.st == ST_DICT) {
2873 6 : BAT *b = bind_col(tr, col, QUICK);
2874 6 : if (b && b->ttype == TYPE_bte)
2875 : de = 1;
2876 0 : else if (b && b->ttype == TYPE_sht)
2877 2049 : de = 2;
2878 : }
2879 2043 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2880 2043 : BAT *b = bind_col(tr, col, QUICK);
2881 :
2882 2043 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2883 2043 : de = GDK_ELIMDOUBLES(b->tvheap);
2884 2043 : if (de)
2885 1813 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2886 : }
2887 1813 : assert(de >= 0 && de <= 16);
2888 : }
2889 : return de;
2890 : }
2891 :
/*
 * Collect statistics for column `c`.
 *
 * Outputs (always reset first): *nonil, *unique, *unique_est; `min` / `max`
 * are filled only when the corresponding bit is set in the return value.
 *
 * Returns a bit mask: bit 0 (1) = `min` is valid, bit 1 (2) = `max` is valid.
 * Returns 0 with the reset outputs when `c` is NULL or not a bound table
 * column; for ST_FOR storage only *nonil is set (to true).
 */
static int
col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
{
	int ok = 0;
	BAT *b = NULL, *off = NULL, *upv = NULL;
	sql_delta *d = NULL;

	(void) tr;
	assert(tr->active);
	*nonil = false;
	*unique = false;
	*unique_est = 0.0;
	if (!c || !isTable(c->t) || !c->t->s)
		return ok;

	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR) {
			*nonil = true; /* TODO for min/max. I will do it later */
			return ok;
		}
		int eclass = c->type.type->eclass;
		/* dictionary-compressed columns are bound with RD_EXT instead of RDONLY */
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false)))
				return ok;
			BATiter bi = bat_iterator(b);
			*nonil = bi.nonil && !bi.nil;

			/* min/max are only reported for these type classes, only when
			 * there are no pending updates, and only when the BAT records at
			 * least one of the two positions */
			if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
				d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
				/* a value cached on the column takes precedence over the BAT's */
				if (c->min && VALinit(min, bi.type, c->min))
					ok |= 1;
				else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
					ok |= 1;
				if (c->max && VALinit(max, bi.type, c->max))
					ok |= 2;
				else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
					ok |= 2;
			}
			/* uniqueness is only reported when there are no pending updates */
			if (d->cs.ucnt == 0) {
				if (d->cs.st == ST_DEFAULT) {
					*unique = bi.key;
					*unique_est = bi.unique_est;
					/* no estimate recorded yet: compute one */
					if (*unique_est == 0)
						*unique_est = (double)BATguess_uniques(b,NULL);
				} else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
					/* for dict, check the offsets bat for uniqueness */
					MT_lock_set(&off->theaplock);
					*unique = off->tkey;
					*unique_est = off->tunique_est;
					MT_lock_unset(&off->theaplock);
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
			/* with pending updates, the updated values must be nil-free too */
			if (*nonil && d->cs.ucnt > 0) {
				/* This could use a quick descriptor */
				if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
					*nonil = false;
				} else {
					MT_lock_set(&upv->theaplock);
					*nonil &= upv->tnonil && !upv->tnil;
					MT_lock_unset(&upv->theaplock);
					bat_destroy(upv);
				}
			}
		}
	}
	return ok;
}
2962 :
2963 : static int
2964 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2965 : {
2966 257 : assert(tr->active);
2967 257 : if (!isTable(col->t) || !col->t->s)
2968 : return LOG_OK;
2969 :
2970 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2971 252 : BAT *b = bind_col(tr, col, QUICK);
2972 :
2973 252 : if (b) { /* add props for ranges [min, max> */
2974 252 : MT_lock_set(&b->theaplock);
2975 252 : if (add_range) {
2976 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2977 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2978 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2979 : else
2980 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2981 179 : if (!pt->with_nills || !col->null)
2982 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2983 : } else {
2984 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2985 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2986 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2987 : }
2988 252 : MT_lock_unset(&b->theaplock);
2989 : }
2990 : }
2991 : return LOG_OK;
2992 : }
2993 :
2994 : static int
2995 4041 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
2996 : {
2997 4041 : assert(tr->active);
2998 4041 : if (!isTable(col->t) || !col->t->s)
2999 : return LOG_OK;
3000 :
3001 4011 : if (col && ATOMIC_PTR_GET(&col->data)) {
3002 4011 : BAT *b = bind_col(tr, col, QUICK);
3003 :
3004 4011 : if (b) { /* add props for ranges [min, max> */
3005 4011 : if (not_null) {
3006 4009 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3007 : } else {
3008 2 : BATrmprop(b, GDK_NOT_NULL);
3009 : }
3010 : }
3011 : }
3012 : return LOG_OK;
3013 : }
3014 :
3015 : static int
3016 27184 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3017 : {
3018 27184 : sqlstore *store = tr->store;
3019 27184 : int bid = log_find_bat(store->logger, id);
3020 27184 : if (bid <= 0)
3021 : return LOG_ERR;
3022 27184 : cs->bid = temp_dup(bid);
3023 27184 : cs->ucnt = 0;
3024 27184 : cs->uibid = e_bat(TYPE_oid);
3025 27184 : cs->uvbid = e_bat(type);
3026 27184 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3027 : return LOG_ERR;
3028 : return LOG_OK;
3029 : }
3030 :
3031 : static int
3032 68261 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3033 : {
3034 68261 : int res = LOG_OK;
3035 68261 : gdk_return ok;
3036 68261 : BAT *b = temp_descriptor(bat->cs.bid);
3037 :
3038 68261 : if (b == NULL)
3039 : return LOG_ERR;
3040 :
3041 68261 : if (!bat->cs.uibid)
3042 68253 : bat->cs.uibid = e_bat(TYPE_oid);
3043 68261 : if (!bat->cs.uvbid)
3044 68253 : bat->cs.uvbid = e_bat(b->ttype);
3045 68261 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3046 0 : res = LOG_ERR;
3047 68261 : if (GDKinmemory(0)) {
3048 177 : bat_destroy(b);
3049 177 : return res;
3050 : }
3051 :
3052 68084 : bat_set_access(b, BAT_READ);
3053 68084 : sqlstore *store = tr->store;
3054 68084 : ok = log_bat_persists(store->logger, b, id);
3055 68084 : bat_destroy(b);
3056 68084 : if(res != LOG_OK)
3057 : return res;
3058 68084 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3059 : }
3060 :
3061 : static int
3062 0 : new_persistent_delta( sql_delta *bat)
3063 : {
3064 0 : bat->cs.ucnt = 0;
3065 0 : return LOG_OK;
3066 : }
3067 :
3068 : static void
3069 129861 : create_delta( sql_delta *d, BAT *b)
3070 : {
3071 129861 : bat_set_access(b, BAT_READ);
3072 129861 : d->cs.bid = temp_create(b);
3073 129861 : d->cs.uibid = d->cs.uvbid = 0;
3074 129861 : d->cs.ucnt = 0;
3075 129861 : }
3076 :
3077 : static bat
3078 7293 : copyBat (bat i, int type, oid seq)
3079 : {
3080 7293 : BAT *b, *tb;
3081 7293 : bat res;
3082 :
3083 7293 : if (!i)
3084 : return i;
3085 7293 : tb = quick_descriptor(i);
3086 7293 : if (tb == NULL)
3087 : return 0;
3088 7293 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3089 7293 : if (b == NULL)
3090 : return 0;
3091 :
3092 7293 : bat_set_access(b, BAT_READ);
3093 :
3094 7293 : res = temp_create(b);
3095 7293 : bat_destroy(b);
3096 7293 : return res;
3097 : }
3098 :
3099 : static int
3100 150685 : create_col(sql_trans *tr, sql_column *c)
3101 : {
3102 150685 : int ok = LOG_OK, new = 0;
3103 150685 : int type = c->type.type->localtype;
3104 150685 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3105 :
3106 150685 : if (!bat) {
3107 150685 : new = 1;
3108 150685 : bat = ZNEW(sql_delta);
3109 150685 : if (!bat)
3110 : return LOG_ERR;
3111 150685 : ATOMIC_PTR_SET(&c->data, bat);
3112 150685 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3113 : }
3114 :
3115 150685 : if (new)
3116 150685 : bat->cs.ts = tr->tid;
3117 :
3118 150685 : if (!isNew(c)&& !isTempTable(c->t)){
3119 20802 : bat->cs.ts = tr->ts;
3120 20802 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3121 20802 : if (ok == LOG_OK && c->storage_type) {
3122 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3123 2 : sqlstore *store = tr->store;
3124 2 : int bid = log_find_bat(store->logger, -c->base.id);
3125 2 : if (bid <= 0)
3126 : return LOG_ERR;
3127 2 : bat->cs.ebid = temp_dup(bid);
3128 2 : bat->cs.st = ST_DICT;
3129 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3130 2 : bat->cs.st = ST_FOR;
3131 : }
3132 : }
3133 20802 : return ok;
3134 129883 : } else if (bat && bat->cs.bid) {
3135 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3136 : } else {
3137 129883 : sql_column *fc = NULL;
3138 129883 : size_t cnt = 0;
3139 :
3140 : /* alter ? */
3141 129883 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3142 79824 : storage *s = tab_timestamp_storage(tr, fc->t);
3143 79824 : if (s == NULL)
3144 : return LOG_ERR;
3145 79824 : cnt = segs_end(s->segs, tr, c->t);
3146 : }
3147 129883 : if (cnt && fc != c) {
3148 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3149 :
3150 22 : if (d->cs.bid) {
3151 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3152 22 : if(bat->cs.bid == BID_NIL)
3153 22 : ok = LOG_ERR;
3154 : }
3155 22 : if (d->cs.uibid) {
3156 10 : bat->cs.uibid = e_bat(TYPE_oid);
3157 10 : if (bat->cs.uibid == BID_NIL)
3158 22 : ok = LOG_ERR;
3159 : }
3160 22 : if (d->cs.uvbid) {
3161 10 : bat->cs.uvbid = e_bat(type);
3162 10 : if(bat->cs.uvbid == BID_NIL)
3163 0 : ok = LOG_ERR;
3164 : }
3165 : } else {
3166 129861 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3167 129861 : if (!b) {
3168 : ok = LOG_ERR;
3169 : } else {
3170 129861 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3171 129861 : bat_destroy(b);
3172 : }
3173 :
3174 129861 : if (!new) {
3175 0 : bat->cs.uibid = e_bat(TYPE_oid);
3176 0 : if (bat->cs.uibid == BID_NIL)
3177 0 : ok = LOG_ERR;
3178 0 : bat->cs.uvbid = e_bat(type);
3179 0 : if(bat->cs.uvbid == BID_NIL)
3180 0 : ok = LOG_ERR;
3181 : }
3182 : }
3183 129883 : bat->cs.ucnt = 0;
3184 :
3185 129883 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3186 91 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3187 : }
3188 : return ok;
3189 : }
3190 :
3191 : static int
3192 61893 : log_create_col_(sql_trans *tr, sql_column *c)
3193 : {
3194 61893 : assert(!isTempTable(c->t));
3195 61893 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3196 : }
3197 :
3198 : static int
3199 87 : log_create_col(sql_trans *tr, sql_change *change)
3200 : {
3201 87 : return log_create_col_(tr, (sql_column*)change->obj);
3202 : }
3203 :
3204 : static int
3205 117254 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3206 : {
3207 117254 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3208 117254 : (void)oldest;
3209 117254 : assert(delta->cs.ts == tr->tid);
3210 117254 : delta->cs.ts = commit_ts;
3211 :
3212 117254 : assert(delta->next == NULL);
3213 117254 : if (!delta->cs.merged)
3214 117253 : delta->nr_updates += merge_delta(delta);
3215 117254 : if (!tr->parent)
3216 117250 : base->new = 0;
3217 117254 : return LOG_OK;
3218 : }
3219 :
3220 : static int
3221 91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3222 : {
3223 91 : sql_column *c = (sql_column*)change->obj;
3224 91 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3225 91 : if (!tr->parent)
3226 90 : c->base.new = 0;
3227 91 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3228 : }
3229 :
3230 : /* will be called for new idx's and when new index columns are created */
3231 : static int
3232 9296 : create_idx(sql_trans *tr, sql_idx *ni)
3233 : {
3234 9296 : int ok = LOG_OK, new = 0;
3235 9296 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3236 9296 : int type = TYPE_lng;
3237 :
3238 9296 : if (oid_index(ni->type))
3239 954 : type = TYPE_oid;
3240 :
3241 9296 : if (!bat) {
3242 9296 : new = 1;
3243 9296 : bat = ZNEW(sql_delta);
3244 9296 : if (!bat)
3245 : return LOG_ERR;
3246 9296 : ATOMIC_PTR_INIT(&ni->data, bat);
3247 9296 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3248 : }
3249 :
3250 9296 : if (new)
3251 9296 : bat->cs.ts = tr->tid;
3252 :
3253 9296 : if (!isNew(ni) && !isTempTable(ni->t)){
3254 2025 : bat->cs.ts = 1;
3255 2025 : return load_cs(tr, &bat->cs, type, ni->base.id);
3256 7271 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3257 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3258 : } else {
3259 7271 : sql_column *c = ol_first_node(ni->t->columns)->data;
3260 7271 : sql_delta *d = col_timestamp_delta(tr, c);
3261 :
3262 7271 : if (d) {
3263 : /* Here we also handle indices created through alter stmts */
3264 : /* These need to be created aligned to the existing data */
3265 7271 : if (d->cs.bid) {
3266 7271 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3267 7271 : if(bat->cs.bid == BID_NIL)
3268 7271 : ok = LOG_ERR;
3269 : }
3270 : } else {
3271 : return LOG_ERR;
3272 : }
3273 :
3274 7271 : bat->cs.ucnt = 0;
3275 :
3276 7271 : if (!new) {
3277 0 : bat->cs.uibid = e_bat(TYPE_oid);
3278 0 : if (bat->cs.uibid == BID_NIL)
3279 0 : ok = LOG_ERR;
3280 0 : bat->cs.uvbid = e_bat(type);
3281 0 : if(bat->cs.uvbid == BID_NIL)
3282 0 : ok = LOG_ERR;
3283 : }
3284 7271 : bat->cs.ucnt = 0;
3285 7271 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3286 631 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3287 : }
3288 : return ok;
3289 : }
3290 :
3291 : static int
3292 6368 : log_create_idx_(sql_trans *tr, sql_idx *i)
3293 : {
3294 6368 : assert(!isTempTable(i->t));
3295 6368 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3296 : }
3297 :
3298 : static int
3299 617 : log_create_idx(sql_trans *tr, sql_change *change)
3300 : {
3301 617 : return log_create_idx_(tr, (sql_idx*)change->obj);
3302 : }
3303 :
3304 : static int
3305 631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3306 : {
3307 631 : sql_idx *i = (sql_idx*)change->obj;
3308 631 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3309 631 : if (!tr->parent)
3310 631 : i->base.new = 0;
3311 631 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3312 : return LOG_OK;
3313 : }
3314 :
3315 : static int
3316 4357 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3317 : {
3318 4357 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3319 4357 : BAT *b = NULL, *ib = NULL;
3320 :
3321 4357 : if (ok != LOG_OK)
3322 : return ok;
3323 4357 : if (!(b = temp_descriptor(s->cs.bid)))
3324 : return LOG_ERR;
3325 4357 : ib = b;
3326 :
3327 4357 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3328 0 : bat_destroy(ib);
3329 0 : return LOG_ERR;
3330 : }
3331 :
3332 4357 : if (BATcount(b)) {
3333 360 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3334 0 : bat_destroy(ib);
3335 0 : return LOG_ERR;
3336 : }
3337 549 : if (BATtdense(b)) {
3338 189 : size_t start = b->tseqbase;
3339 189 : size_t cnt = BATcount(b);
3340 189 : ok = delete_range(tr, t, s, start, cnt);
3341 : } else {
3342 171 : assert(b->tsorted);
3343 171 : BUN icnt = BATcount(b);
3344 171 : BATiter bi = bat_iterator(b);
3345 171 : size_t lcnt = 1;
3346 171 : oid n;
3347 171 : segment *seg = s->segs->h;
3348 171 : if (complex_cand(b)) {
3349 0 : oid o = * (oid *) Tpos(&bi, 0);
3350 0 : n = o + 1;
3351 0 : for (BUN i = 1; i < icnt; i++) {
3352 0 : o = * (oid *) Tpos(&bi, i);
3353 0 : if (o == n) {
3354 0 : lcnt++;
3355 0 : n++;
3356 : } else {
3357 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3358 : break;
3359 : lcnt = 0;
3360 : }
3361 0 : if (!lcnt) {
3362 0 : n = o + 1;
3363 0 : lcnt = 1;
3364 : }
3365 : }
3366 : } else {
3367 171 : oid *o = bi.base;
3368 171 : n = o[0]+1;
3369 281192 : for (size_t i=1; i<icnt; i++) {
3370 281021 : if (o[i] == n) {
3371 278578 : lcnt++;
3372 278578 : n++;
3373 : } else {
3374 2443 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3375 : break;
3376 : lcnt = 0;
3377 : }
3378 281021 : if (!lcnt) {
3379 2443 : n = o[i]+1;
3380 2443 : lcnt = 1;
3381 : }
3382 : }
3383 : }
3384 171 : if (lcnt && ok == LOG_OK)
3385 171 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3386 171 : bat_iterator_end(&bi);
3387 : }
3388 360 : if (ok == LOG_OK)
3389 6183 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3390 5823 : if (seg->ts == tr->tid)
3391 3027 : seg->ts = 1;
3392 : } else {
3393 3997 : if (ok == LOG_OK) {
3394 3997 : BAT *bb = quick_descriptor(s->cs.bid);
3395 :
3396 3997 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3397 : ok = LOG_ERR;
3398 : } else {
3399 3997 : segment *seg = s->segs->h;
3400 3997 : if (seg->ts == tr->tid)
3401 3997 : seg->ts = 1;
3402 : }
3403 : }
3404 : }
3405 4357 : if (b != ib)
3406 4357 : bat_destroy(b);
3407 4357 : bat_destroy(ib);
3408 :
3409 4357 : return ok;
3410 : }
3411 :
3412 : static int
3413 25207 : create_del(sql_trans *tr, sql_table *t)
3414 : {
3415 25207 : int ok = LOG_OK, new = 0;
3416 25207 : BAT *b;
3417 25207 : storage *bat = ATOMIC_PTR_GET(&t->data);
3418 :
3419 25207 : if (!bat) {
3420 25207 : new = 1;
3421 25207 : bat = ZNEW(storage);
3422 25207 : if(!bat)
3423 : return LOG_ERR;
3424 25207 : ATOMIC_PTR_INIT(&t->data, bat);
3425 25207 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3426 25207 : bat->cs.ts = tr->tid;
3427 : }
3428 :
3429 25207 : if (!isNew(t) && !isTempTable(t)) {
3430 4357 : bat->cs.ts = tr->ts;
3431 4357 : return load_storage(tr, t, bat, t->base.id);
3432 20850 : } else if (bat->cs.bid) {
3433 : return ok;
3434 : } else {
3435 20850 : assert(!bat->segs);
3436 20850 : if (!(bat->segs = new_segments(tr, 0)))
3437 : return LOG_ERR;
3438 :
3439 20850 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3440 20850 : if(b != NULL) {
3441 20850 : bat_set_access(b, BAT_READ);
3442 20850 : bat->cs.bid = temp_create(b);
3443 20850 : bat_destroy(b);
3444 : } else {
3445 : return LOG_ERR;
3446 : }
3447 20850 : if (new)
3448 27391 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3449 : }
3450 : return ok;
3451 : }
3452 :
3453 : static int
3454 189981 : log_segment(sql_trans *tr, segment *s, sqlid id)
3455 : {
3456 189981 : sqlstore *store = tr->store;
3457 189981 : msk m = s->deleted;
3458 189981 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3459 : }
3460 :
3461 : static int
3462 96957 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3463 : {
3464 : /* log segments */
3465 96957 : lock_table(tr->store, id);
3466 487632 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3467 390675 : unlock_table(tr->store, id);
3468 390675 : if (seg->ts == tr->tid && seg->end-seg->start) {
3469 143475 : if (log_segment(tr, seg, id) != LOG_OK) {
3470 : return LOG_ERR;
3471 : }
3472 : }
3473 390675 : lock_table(tr->store, id);
3474 : }
3475 96957 : unlock_table(tr->store, id);
3476 96957 : return LOG_OK;
3477 : }
3478 :
3479 : static int
3480 12546 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3481 : {
3482 12546 : BAT *b;
3483 12546 : int ok = LOG_OK;
3484 :
3485 12546 : if (GDKinmemory(0))
3486 : return LOG_OK;
3487 :
3488 12513 : b = temp_descriptor(bat->cs.bid);
3489 12513 : if (b == NULL)
3490 : return LOG_ERR;
3491 :
3492 12513 : sqlstore *store = tr->store;
3493 12513 : bat_set_access(b, BAT_READ);
3494 12513 : if (ok == LOG_OK)
3495 12513 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3496 12513 : if (ok == LOG_OK)
3497 12513 : ok = log_segments(tr, bat->segs, t->base.id);
3498 12513 : bat_destroy(b);
3499 12513 : return ok;
3500 : }
3501 :
3502 : static int
3503 12560 : log_create_del(sql_trans *tr, sql_change *change)
3504 : {
3505 12560 : int ok = LOG_OK;
3506 12560 : sql_table *t = (sql_table*)change->obj;
3507 :
3508 12560 : if (t->base.deleted)
3509 : return ok;
3510 12546 : assert(!isTempTable(t));
3511 12546 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3512 12546 : if (ok == LOG_OK) {
3513 74352 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3514 61806 : sql_column *c = n->data;
3515 :
3516 61806 : ok = log_create_col_(tr, c);
3517 : }
3518 12546 : if (t->idxs) {
3519 18309 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3520 5763 : sql_idx *i = n->data;
3521 :
3522 5763 : if (ATOMIC_PTR_GET(&i->data))
3523 5751 : ok = log_create_idx_(tr, i);
3524 : }
3525 : }
3526 : }
3527 : return ok;
3528 : }
3529 :
3530 : static int
3531 20852 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3532 : {
3533 20852 : int ok = LOG_OK;
3534 20852 : sql_table *t = (sql_table*)change->obj;
3535 20852 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3536 :
3537 20852 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3538 120 : assert(isTempTable(t));
3539 120 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3540 120 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3541 120 : return ok;
3542 : }
3543 :
3544 20732 : if (!commit_ts) /* rollback handled by ? */
3545 : return ok;
3546 18878 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3547 18878 : assert(ok == LOG_OK);
3548 18878 : if (ok != LOG_OK)
3549 : return ok;
3550 18878 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3551 18878 : assert(dbat->cs.ts == tr->tid);
3552 18878 : dbat->cs.ts = commit_ts;
3553 18878 : if (ok == LOG_OK) {
3554 129646 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3555 110768 : sql_column *c = n->data;
3556 110768 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3557 :
3558 110768 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3559 : }
3560 18878 : if (t->idxs) {
3561 24654 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3562 5776 : sql_idx *i = n->data;
3563 5776 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3564 :
3565 5776 : if (delta)
3566 5764 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3567 : }
3568 : }
3569 18878 : if (!tr->parent)
3570 18876 : t->base.new = 0;
3571 : }
3572 18878 : if (!tr->parent)
3573 18876 : t->base.new = 0;
3574 : return ok;
3575 : }
3576 :
3577 : static int
3578 19759 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3579 : {
3580 19759 : gdk_return ok = GDK_SUCCEED;
3581 :
3582 19759 : sqlstore *store = tr->store;
3583 19759 : if (!GDKinmemory(0) && b && b->cs.bid)
3584 18834 : ok = log_bat_transient(store->logger, id);
3585 19759 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3586 25 : ok = log_bat_transient(store->logger, -id);
3587 19759 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3588 : }
3589 :
3590 : static int
3591 168421 : destroy_col(sqlstore *store, sql_column *c)
3592 : {
3593 168421 : (void)store;
3594 168421 : if (ATOMIC_PTR_GET(&c->data))
3595 168421 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3596 168421 : ATOMIC_PTR_SET(&c->data, NULL);
3597 168421 : return LOG_OK;
3598 : }
3599 :
3600 : static int
3601 17876 : log_destroy_col_(sql_trans *tr, sql_column *c)
3602 : {
3603 17876 : int ok = LOG_OK;
3604 17876 : assert(!isTempTable(c->t));
3605 17876 : if (!tr->parent) /* don't write save point commits */
3606 17876 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3607 17876 : return ok;
3608 : }
3609 :
3610 : static int
3611 17876 : log_destroy_col(sql_trans *tr, sql_change *change)
3612 : {
3613 17876 : sql_column *c = (sql_column*)change->obj;
3614 17876 : int res = log_destroy_col_(tr, c);
3615 17876 : change->obj = NULL;
3616 17876 : column_destroy(tr->store, c);
3617 17876 : return res;
3618 : }
3619 :
3620 : static int
3621 10645 : destroy_idx(sqlstore *store, sql_idx *i)
3622 : {
3623 10645 : (void)store;
3624 10645 : if (ATOMIC_PTR_GET(&i->data))
3625 10645 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3626 10645 : ATOMIC_PTR_SET(&i->data, NULL);
3627 10645 : return LOG_OK;
3628 : }
3629 :
3630 : static int
3631 1959 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3632 : {
3633 1959 : int ok = LOG_OK;
3634 1959 : assert(!isTempTable(i->t));
3635 1959 : if (ATOMIC_PTR_GET(&i->data)) {
3636 1883 : if (!tr->parent) /* don't write save point commits */
3637 1883 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3638 : }
3639 1959 : return ok;
3640 : }
3641 :
3642 : static int
3643 1959 : log_destroy_idx(sql_trans *tr, sql_change *change)
3644 : {
3645 1959 : sql_idx *i = (sql_idx*)change->obj;
3646 1959 : int res = log_destroy_idx_(tr, i);
3647 1959 : change->obj = NULL;
3648 1959 : idx_destroy(tr->store, i);
3649 1959 : return res;
3650 : }
3651 :
3652 : static int
3653 26710 : destroy_del(sqlstore *store, sql_table *t)
3654 : {
3655 26710 : (void)store;
3656 26710 : if (ATOMIC_PTR_GET(&t->data))
3657 26706 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3658 26710 : ATOMIC_PTR_SET(&t->data, NULL);
3659 26710 : return LOG_OK;
3660 : }
3661 :
3662 : static int
3663 3256 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3664 : {
3665 3256 : gdk_return ok = GDK_SUCCEED;
3666 :
3667 3256 : sqlstore *store = tr->store;
3668 3256 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3669 3256 : bat && bat->cs.bid)
3670 3256 : ok = log_bat_transient(store->logger, id);
3671 3256 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3672 : }
3673 :
3674 : static int
3675 3256 : log_destroy_del(sql_trans *tr, sql_change *change)
3676 : {
3677 3256 : int ok = LOG_OK;
3678 3256 : sql_table *t = (sql_table*)change->obj;
3679 :
3680 3256 : assert(!isTempTable(t));
3681 3256 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3682 3256 : return ok;
3683 : }
3684 :
3685 : static int
3686 23179 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3687 : {
3688 23179 : (void)tr;
3689 23179 : (void)change;
3690 23179 : (void)commit_ts;
3691 23179 : (void)oldest;
3692 23179 : if (commit_ts)
3693 23156 : change->handled = true;
3694 23179 : return 0;
3695 : }
3696 :
3697 : static int
3698 3327 : drop_del(sql_trans *tr, sql_table *t)
3699 : {
3700 3327 : int ok = LOG_OK;
3701 :
3702 3327 : if (!isNew(t)) {
3703 3327 : storage *bat = ATOMIC_PTR_GET(&t->data);
3704 3392 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3705 : }
3706 3327 : return ok;
3707 : }
3708 :
3709 : static int
3710 17891 : drop_col(sql_trans *tr, sql_column *c)
3711 : {
3712 17891 : assert(!isNew(c));
3713 17891 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3714 17891 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3715 17891 : return LOG_OK;
3716 : }
3717 :
3718 : static int
3719 1961 : drop_idx(sql_trans *tr, sql_idx *i)
3720 : {
3721 1961 : assert(!isNew(i));
3722 1961 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3723 1961 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3724 1961 : return LOG_OK;
3725 : }
3726 :
3727 :
3728 : static BUN
3729 129415 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3730 : {
3731 129415 : BAT *b;
3732 129415 : BUN sz = 0;
3733 :
3734 129415 : (void)tr;
3735 129415 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3736 129415 : if (cs->bid && renew) {
3737 129417 : b = quick_descriptor(cs->bid);
3738 129412 : if (b) {
3739 129412 : sz += BATcount(b);
3740 129412 : if (cs->st == ST_DICT) {
3741 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3742 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3743 :
3744 2 : if (nebid == BID_NIL || !n) {
3745 0 : temp_destroy(nebid);
3746 0 : bat_destroy(n);
3747 0 : return BUN_NONE;
3748 : }
3749 2 : temp_destroy(cs->ebid);
3750 2 : cs->ebid = nebid;
3751 2 : if (!temp)
3752 2 : bat_set_access(n, BAT_READ);
3753 2 : temp_destroy(cs->bid);
3754 2 : cs->bid = temp_create(n); /* create empty copy */
3755 2 : bat_destroy(n);
3756 : } else {
3757 129410 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3758 :
3759 129412 : if (nbid == BID_NIL)
3760 : return BUN_NONE;
3761 129412 : temp_destroy(cs->bid);
3762 129400 : cs->bid = nbid;
3763 : }
3764 : } else {
3765 : return BUN_NONE;
3766 : }
3767 : }
3768 129400 : if (cs->uibid) {
3769 129256 : temp_destroy(cs->uibid);
3770 129284 : cs->uibid = 0;
3771 : }
3772 129428 : if (cs->uvbid) {
3773 129284 : temp_destroy(cs->uvbid);
3774 129284 : cs->uvbid = 0;
3775 : }
3776 129428 : cs->cleared = true;
3777 129428 : cs->ucnt = 0;
3778 129428 : return sz;
3779 : }
3780 :
3781 : static BUN
3782 129274 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3783 : {
3784 129274 : bool update_conflict = false;
3785 129274 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3786 :
3787 129274 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3788 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3789 129277 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3790 129277 : if (odelta != delta)
3791 129282 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3792 129267 : if (delta)
3793 129267 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3794 : return 0;
3795 : }
3796 :
3797 : static BUN
3798 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3799 : {
3800 21 : bool update_conflict = false;
3801 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3802 :
3803 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3804 15 : return 0;
3805 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3806 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3807 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3808 6 : if (odelta != delta)
3809 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3810 6 : if (delta)
3811 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3812 : return 0;
3813 : }
3814 :
3815 : static int
3816 142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3817 : {
3818 142 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3819 : return LOG_ERR;
3820 142 : if (s->segs)
3821 142 : destroy_segments(s->segs);
3822 142 : if (!(s->segs = new_segments(tr, 0)))
3823 : return LOG_ERR;
3824 : return LOG_OK;
3825 : }
3826 :
3827 :
3828 : /*
3829 : * Clear the table, in general this means replacing the storage,
3830 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3831 : * all segments as deleted.
3832 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3833 : */
3834 : static BUN
3835 41826 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3836 : {
3837 41826 : int clear = !in_transaction, ok = LOG_OK;
3838 41826 : bool conflict = false;
3839 41826 : storage *bat;
3840 :
3841 41877 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3842 15836 : return conflict?BUN_NONE-1:BUN_NONE;
3843 :
3844 25990 : if (!clear) {
3845 51 : lock_table(tr->store, t->base.id);
3846 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3847 51 : unlock_table(tr->store, t->base.id);
3848 : }
3849 25990 : assert(t->persistence != SQL_DECLARED_TABLE);
3850 25990 : if (!in_transaction)
3851 25942 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3852 25989 : if (ok == LOG_ERR)
3853 : return BUN_NONE;
3854 25989 : if (ok == LOG_CONFLICT)
3855 0 : return BUN_NONE - 1;
3856 : return LOG_OK;
3857 : }
3858 :
3859 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3860 : static BUN
3861 41825 : clear_table(sql_trans *tr, sql_table *t)
3862 : {
3863 41825 : node *n = ol_first_node(t->columns);
3864 41825 : sql_column *c = n->data;
3865 41825 : storage *d = tab_timestamp_storage(tr, t);
3866 41826 : int in_transaction, clear;
3867 41826 : BUN sz, clear_ok;
3868 :
3869 41826 : if (!d)
3870 : return BUN_NONE;
3871 41826 : in_transaction = segments_in_transaction(tr, t);
3872 41826 : clear = !in_transaction;
3873 41826 : sz = count_col(tr, c, CNT_ACTIVE);
3874 41826 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3875 : return clear_ok;
3876 :
3877 25989 : if (in_transaction)
3878 : return sz;
3879 :
3880 155215 : for (; n; n = n->next) {
3881 129276 : c = n->data;
3882 :
3883 129276 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3884 0 : return clear_ok;
3885 : }
3886 25939 : if (t->idxs) {
3887 25960 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3888 21 : sql_idx *ci = n->data;
3889 :
3890 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3891 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3892 0 : return clear_ok;
3893 : }
3894 : }
3895 25939 : if (clear)
3896 25939 : d->segs->nr_reused = 0;
3897 25939 : return sz;
3898 : }
3899 :
3900 : static int
3901 158622 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3902 : {
3903 158622 : sqlstore *store = tr->store;
3904 158622 : gdk_return ok = GDK_SUCCEED;
3905 :
3906 158622 : (void) t;
3907 158622 : (void) segs;
3908 158622 : if (GDKinmemory(0))
3909 : return LOG_OK;
3910 :
3911 158615 : if (cs->cleared) {
3912 155274 : assert(cs->ucnt == 0);
3913 155274 : BAT *ins = temp_descriptor(cs->bid);
3914 155274 : if (!ins)
3915 : return LOG_ERR;
3916 155274 : assert(!isEbat(ins));
3917 155274 : bat_set_access(ins, BAT_READ);
3918 155274 : ok = log_bat_persists(store->logger, ins, id);
3919 155274 : bat_destroy(ins);
3920 155274 : if (ok == GDK_SUCCEED && cs->ebid) {
3921 56 : BAT *ins = temp_descriptor(cs->ebid);
3922 56 : if (!ins)
3923 : return LOG_ERR;
3924 56 : assert(!isEbat(ins));
3925 56 : bat_set_access(ins, BAT_READ);
3926 56 : ok = log_bat_persists(store->logger, ins, -id);
3927 56 : bat_destroy(ins);
3928 : }
3929 155274 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3930 : }
3931 :
3932 3341 : assert(!isTempTable(t));
3933 :
3934 3341 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3935 2766 : BAT *ui = temp_descriptor(cs->uibid);
3936 2766 : BAT *uv = temp_descriptor(cs->uvbid);
3937 : /* any updates */
3938 2766 : if (ui == NULL || uv == NULL) {
3939 : ok = GDK_FAIL;
3940 2766 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3941 2766 : ok = log_delta(store->logger, ui, uv, id);
3942 2766 : bat_destroy(ui);
3943 2766 : bat_destroy(uv);
3944 : }
3945 2766 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3946 : }
3947 :
3948 : static inline int
3949 58511 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3950 58511 : sqlstore *store = tr->store;
3951 58511 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3952 : }
3953 :
3954 : static inline int
3955 58511 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3956 58511 : sqlstore *store = tr->store;
3957 58511 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3958 : }
3959 :
3960 : static int
3961 58511 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3962 : {
3963 58511 : sqlstore *store = tr->store;
3964 58511 : gdk_return ok = GDK_SUCCEED;
3965 :
3966 58511 : size_t end = segs_end(segs, tr, t);
3967 :
3968 58511 : if (tr_log_table_start(tr, t) != LOG_OK)
3969 : return LOG_ERR;
3970 :
3971 58511 : size_t nr_appends = 0;
3972 :
3973 58511 : lock_table(tr->store, t->base.id);
3974 410869 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3975 352358 : unlock_table(tr->store, t->base.id);
3976 :
3977 352358 : if (seg->ts == tr->tid && seg->end-seg->start) {
3978 112853 : if (!seg->deleted) {
3979 46506 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
3980 : return LOG_ERR;
3981 :
3982 46506 : nr_appends += (seg->end - seg->start);
3983 : }
3984 : }
3985 352358 : lock_table(tr->store, t->base.id);
3986 : }
3987 58511 : unlock_table(tr->store, t->base.id);
3988 :
3989 405529 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
3990 347018 : sql_column *c = n->data;
3991 347018 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
3992 :
3993 347018 : if (cs->cleared) {
3994 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
3995 3 : continue;
3996 : }
3997 :
3998 347015 : lock_table(tr->store, t->base.id);
3999 347015 : if (!cs->cleared) {
4000 2406695 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4001 2059680 : unlock_table(tr->store, t->base.id);
4002 2059680 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4003 : /* append col*/
4004 262989 : BAT *ins = temp_descriptor(cs->bid);
4005 262989 : if (ins == NULL)
4006 : return LOG_ERR;
4007 262989 : assert(BATcount(ins) >= cur->end);
4008 262989 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4009 262989 : bat_destroy(ins);
4010 : }
4011 2059680 : lock_table(tr->store, t->base.id);
4012 : }
4013 : }
4014 347015 : unlock_table(tr->store, t->base.id);
4015 :
4016 347015 : if (ok == GDK_SUCCEED && cs->ebid) {
4017 19 : BAT *ins = temp_descriptor(cs->ebid);
4018 19 : if (ins == NULL)
4019 : return LOG_ERR;
4020 19 : if (BATcount(ins) > ins->batInserted)
4021 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4022 19 : BATcommit(ins, BATcount(ins));
4023 19 : bat_destroy(ins);
4024 : }
4025 : }
4026 :
4027 58511 : if (t->idxs) {
4028 63693 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4029 5182 : sql_idx *i = n->data;
4030 :
4031 5182 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4032 4404 : continue;
4033 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4034 :
4035 778 : if (cs) {
4036 778 : if (cs->cleared) {
4037 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4038 0 : continue;
4039 : }
4040 :
4041 778 : lock_table(tr->store, t->base.id);
4042 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4043 1609 : unlock_table(tr->store, t->base.id);
4044 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4045 : /* append idx */
4046 730 : BAT *ins = temp_descriptor(cs->bid);
4047 730 : if (ins == NULL)
4048 : return LOG_ERR;
4049 730 : assert(BATcount(ins) >= cur->end);
4050 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4051 730 : bat_destroy(ins);
4052 : }
4053 1609 : lock_table(tr->store, t->base.id);
4054 : }
4055 778 : unlock_table(tr->store, t->base.id);
4056 : }
4057 : }
4058 : }
4059 :
4060 58511 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4061 0 : return LOG_ERR;
4062 :
4063 : return LOG_OK;
4064 : }
4065 :
4066 : static int
4067 84444 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4068 : {
4069 84444 : int ok = LOG_OK;
4070 84444 : bool cleared = s->cs.cleared;
4071 84444 : if (ok == LOG_OK && cleared)
4072 25933 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4073 25933 : if (ok == LOG_OK)
4074 84444 : ok = log_segments(tr, s->segs, t->base.id);
4075 84444 : if (ok == LOG_OK && !cleared)
4076 58511 : ok = log_table_append(tr, t, s->segs);
4077 84444 : return ok;
4078 : }
4079 :
4080 : static void
4081 385432 : merge_cs( column_storage *cs, const char* caller)
4082 : {
4083 385432 : if (cs->bid && cs->ucnt) {
4084 2775 : BAT *cur = temp_descriptor(cs->bid);
4085 2775 : BAT *ui = temp_descriptor(cs->uibid);
4086 2775 : BAT *uv = temp_descriptor(cs->uvbid);
4087 :
4088 2775 : if (!cur || !ui || !uv) {
4089 0 : bat_destroy(ui);
4090 0 : bat_destroy(uv);
4091 0 : bat_destroy(cur);
4092 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4093 : return;
4094 : }
4095 2775 : assert(BATcount(ui) == BATcount(uv));
4096 :
4097 : /* any updates */
4098 2775 : assert(!isEbat(cur));
4099 2775 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4100 0 : bat_destroy(ui);
4101 0 : bat_destroy(uv);
4102 0 : bat_destroy(cur);
4103 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4104 : return;
4105 : }
4106 : /* cleanup the old deltas */
4107 2775 : temp_destroy(cs->uibid);
4108 2775 : temp_destroy(cs->uvbid);
4109 2775 : cs->uibid = e_bat(TYPE_oid);
4110 2775 : cs->uvbid = e_bat(cur->ttype);
4111 2775 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4112 2775 : cs->ucnt = 0;
4113 2775 : bat_destroy(ui);
4114 2775 : bat_destroy(uv);
4115 2775 : bat_destroy(cur);
4116 : }
4117 385432 : cs->cleared = false;
4118 385432 : cs->merged = true;
4119 385432 : return;
4120 : }
4121 :
4122 : static lng
4123 343347 : merge_delta( sql_delta *obat)
4124 : {
4125 343347 : lng res = 0;
4126 343347 : if (obat && obat->next && !obat->cs.merged)
4127 132696 : res += merge_delta(obat->next);
4128 343347 : res += obat->cs.ucnt;
4129 343347 : merge_cs(&obat->cs, __func__);
4130 343347 : return res;
4131 : }
4132 :
4133 : static void
4134 42085 : merge_storage(storage *tdb)
4135 : {
4136 42085 : merge_cs(&tdb->cs, __func__);
4137 :
4138 42085 : if (tdb->next) {
4139 487 : destroy_storage(tdb->next);
4140 487 : tdb->next = NULL;
4141 : }
4142 42085 : }
4143 :
4144 : static sql_delta *
4145 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4146 : {
4147 : /* commit ie copy back to the parent transaction */
4148 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4149 1 : sql_delta *od = delta->next;
4150 1 : if (od->cs.ts == commit_ts) {
4151 0 : sql_delta t = *od, *n = od->next;
4152 0 : *od = *delta;
4153 0 : od->next = n;
4154 0 : *delta = t;
4155 0 : delta->next = NULL;
4156 0 : destroy_delta(delta, true);
4157 0 : return od;
4158 : }
4159 : }
4160 : return delta;
4161 : }
4162 :
4163 : static int
4164 132660 : log_update_col( sql_trans *tr, sql_change *change)
4165 : {
4166 132660 : sql_column *c = (sql_column*)change->obj;
4167 132660 : assert(!isTempTable(c->t));
4168 :
4169 132660 : if (isDeleted(c->t)) {
4170 0 : change->handled = true;
4171 0 : return LOG_OK;
4172 : }
4173 :
4174 132660 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4175 132660 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4176 132660 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4177 132660 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4178 : }
4179 : return LOG_OK;
4180 : }
4181 :
4182 : static int
4183 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4184 : {
4185 217 : sqlstore *store = Store;
4186 :
4187 217 : sql_delta *d = (sql_delta*)change->data;
4188 217 : if (d->cs.ts < oldest) {
4189 82 : destroy_delta(d, false);
4190 82 : if (change->commit == &commit_update_idx)
4191 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4192 : else
4193 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4194 82 : return 1;
4195 : }
4196 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4197 82 : d->cs.ts = store_get_timestamp(store) + 1;
4198 : return 0;
4199 : }
4200 :
4201 : static int
4202 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4203 : {
4204 9 : sqlstore *store = Store;
4205 :
4206 9 : storage *d = (storage*)change->data;
4207 9 : if (d->cs.ts < oldest) {
4208 3 : destroy_storage(d);
4209 3 : table_destroy(store, (sql_table*)change->obj);
4210 3 : return 1;
4211 : }
4212 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4213 3 : d->cs.ts = store_get_timestamp(store) + 1;
4214 : return 0;
4215 : }
4216 :
4217 : static int
4218 132778 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4219 : {
4220 132778 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4221 :
4222 132778 : sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
4223 :
4224 132778 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4225 3 : int ok = LOG_OK;
4226 3 : assert(isTempTable(t));
4227 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4228 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4229 3 : if (!tr->parent)
4230 0 : t->base.new = base->new = 0;
4231 3 : change->handled = true;
4232 3 : return ok;
4233 : }
4234 :
4235 132775 : if (commit_ts)
4236 132693 : delta->cs.ts = commit_ts;
4237 132775 : if (!commit_ts) { /* rollback */
4238 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4239 :
4240 82 : if (change->ts && t->base.new) /* handled by create col */
4241 : return LOG_OK;
4242 82 : if (o != d) {
4243 0 : while(o && o->next != d)
4244 : o = o->next;
4245 : }
4246 82 : if (o == ATOMIC_PTR_GET(data))
4247 82 : ATOMIC_PTR_SET(data, d->next);
4248 : else
4249 0 : o->next = d->next;
4250 82 : d->next = NULL;
4251 82 : change->cleanup = &tc_gc_rollbacked;
4252 132693 : } else if (!tr->parent) {
4253 : /* merge deltas */
4254 356246 : while (delta && delta->cs.ts > oldest)
4255 223554 : delta = delta->next;
4256 132692 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4257 31309 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4258 31309 : idelta->nr_updates += merge_delta(delta);
4259 31309 : unlock_column(tr->store, base->id);
4260 : }
4261 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4262 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4263 : return LOG_OK;
4264 : }
4265 :
4266 : static int
4267 132750 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4268 : {
4269 :
4270 132750 : sql_column *c = (sql_column*)change->obj;
4271 132750 : sql_base* base = &c->base;
4272 132750 : sql_table* t = c->t;
4273 132750 : ATOMIC_PTR_TYPE* data = &c->data;
4274 132750 : int type = c->type.type->localtype;
4275 :
4276 132750 : if (change->handled || isDeleted(c->t))
4277 : return LOG_OK;
4278 :
4279 132750 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4280 : }
4281 :
4282 : static int
4283 26 : log_update_idx( sql_trans *tr, sql_change *change)
4284 : {
4285 26 : sql_idx *i = (sql_idx*)change->obj;
4286 26 : assert(!isTempTable(i->t));
4287 :
4288 26 : if (isDeleted(i->t)) {
4289 0 : change->handled = true;
4290 0 : return LOG_OK;
4291 : }
4292 :
4293 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4294 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4295 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4296 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4297 : }
4298 : return LOG_OK;
4299 : }
4300 :
4301 : static int
4302 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4303 : {
4304 28 : sql_idx *i = (sql_idx*)change->obj;
4305 28 : sql_base* base = &i->base;
4306 28 : sql_table* t = i->t;
4307 28 : ATOMIC_PTR_TYPE* data = &i->data;
4308 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4309 :
4310 28 : if (change->handled || isDeleted(i->t))
4311 : return LOG_OK;
4312 :
4313 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4314 : }
4315 :
4316 : static storage *
4317 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4318 : {
4319 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4320 0 : storage *od = dbat->next;
4321 0 : if (od->cs.ts == commit_ts) {
4322 0 : storage t = *od, *n = od->next;
4323 0 : *od = *dbat;
4324 0 : od->next = n;
4325 0 : *dbat = t;
4326 0 : dbat->next = NULL;
4327 0 : destroy_storage(dbat);
4328 0 : return od;
4329 : }
4330 : }
4331 : return dbat;
4332 : }
4333 :
4334 : static int
4335 84444 : log_update_del( sql_trans *tr, sql_change *change)
4336 : {
4337 84444 : sql_table *t = (sql_table*)change->obj;
4338 84444 : assert(!isTempTable(t));
4339 :
4340 84444 : if (isDeleted(t)) {
4341 0 : change->handled = true;
4342 0 : return LOG_OK;
4343 : }
4344 :
4345 84444 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4346 84444 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4347 : return LOG_OK;
4348 : }
4349 :
4350 : static int
4351 88981 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4352 : {
4353 88981 : int ok = LOG_OK;
4354 88981 : sql_table *t = (sql_table*)change->obj;
4355 88981 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4356 :
4357 88981 : if (change->handled || isDeleted(t))
4358 : return ok;
4359 :
4360 88981 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4361 22 : assert(isTempTable(t));
4362 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4363 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4364 22 : change->handled = true;
4365 22 : return ok;
4366 : }
4367 :
4368 88959 : lock_table(tr->store, t->base.id);
4369 88959 : if (!commit_ts) { /* rollback */
4370 4215 : if (dbat->cs.ts == tr->tid) {
4371 6 : if (change->ts && t->base.new) { /* handled by the create table */
4372 3 : unlock_table(tr->store, t->base.id);
4373 3 : return ok;
4374 : }
4375 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4376 :
4377 3 : if (o != d) {
4378 0 : while(o && o->next != d)
4379 : o = o->next;
4380 : }
4381 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4382 3 : assert(d->next);
4383 3 : ATOMIC_PTR_SET(&t->data, d->next);
4384 : } else
4385 0 : o->next = d->next;
4386 3 : d->next = NULL;
4387 3 : change->cleanup = &tc_gc_rollbacked_storage;
4388 : } else
4389 4209 : rollback_segments(dbat->segs, tr, change, oldest);
4390 84744 : } else if (ok == LOG_OK && !tr->parent) {
4391 84718 : if (dbat->cs.ts == tr->tid) /* cleared table */
4392 25935 : dbat->cs.ts = commit_ts;
4393 :
4394 84718 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4395 84718 : if (ok == LOG_OK) {
4396 84718 : merge_segments(dbat, tr, change, commit_ts, oldest);
4397 84718 : if (oldest == commit_ts)
4398 42085 : merge_storage(dbat);
4399 : }
4400 84718 : if (dbat)
4401 84718 : dbat->cs.cleared = false;
4402 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4403 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4404 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4405 26 : storage *s = change->data;
4406 26 : if (s->cs.ts == tr->tid)
4407 0 : s->cs.ts = commit_ts;
4408 : }
4409 88956 : unlock_table(tr->store, t->base.id);
4410 88956 : return ok;
4411 : }
4412 :
4413 : /* only rollback (content version) case for now */
4414 : static int
4415 18048 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4416 : {
4417 18048 : sqlstore *store = Store;
4418 18048 : sql_column *c = (sql_column*)change->obj;
4419 :
4420 18048 : if (!c) /* cleaned earlier */
4421 : return 1;
4422 :
4423 172 : if (change->handled || isDeleted(c->t)) {
4424 0 : column_destroy(store, c);
4425 0 : return 1;
4426 : }
4427 :
4428 : /* savepoint commit (did it merge ?) */
4429 172 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4430 0 : column_destroy(store, c);
4431 0 : return 1;
4432 : }
4433 172 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4434 : return 0;
4435 171 : sql_delta *d = (sql_delta*)change->data, *id = d;
4436 171 : if (d && d->next) {
4437 :
4438 66 : if (d->cs.ts > oldest)
4439 : return LOG_OK; /* cannot cleanup yet */
4440 :
4441 : // d is oldest reachable delta
4442 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4443 62 : destroy_delta(d->next, true);
4444 62 : d->next = NULL;
4445 : }
4446 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4447 63 : id->nr_updates += merge_delta(d);
4448 63 : unlock_column(store, c->base.id);
4449 : }
4450 168 : column_destroy(store, c);
4451 168 : return 1;
4452 : }
4453 :
4454 : static int
4455 863722 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4456 : {
4457 863722 : sqlstore *store = Store;
4458 863722 : sql_column *c = (sql_column*)change->obj;
4459 :
4460 863722 : if (!c) /* cleaned earlier */
4461 : return 1;
4462 :
4463 863722 : if (change->handled || isDeleted(c->t)) {
4464 3 : table_destroy(store, c->t);
4465 3 : return 1;
4466 : }
4467 :
4468 : /* savepoint commit (did it merge ?) */
4469 863719 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4470 70603 : table_destroy(store, c->t);
4471 70603 : return 1;
4472 : }
4473 793116 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4474 : return 0;
4475 793115 : sql_delta *d = (sql_delta*)change->data, *id = d;
4476 793115 : if (d && d->next) {
4477 :
4478 793115 : if (d->cs.ts > oldest)
4479 : return LOG_OK; /* cannot cleanup yet */
4480 :
4481 : // d is oldest reachable delta
4482 62000 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4483 5356 : destroy_delta(d->next, true);
4484 5356 : d->next = NULL;
4485 : }
4486 62000 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4487 62000 : id->nr_updates += merge_delta(d);
4488 62000 : unlock_column(store, c->base.id);
4489 : }
4490 62000 : table_destroy(store, c->t);
4491 62000 : return 1;
4492 : }
4493 :
4494 : static int
4495 2592 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4496 : {
4497 2592 : sqlstore *store = Store;
4498 2592 : sql_idx *i = (sql_idx*)change->obj;
4499 :
4500 2592 : if (!i) /* cleaned earlier */
4501 : return 1;
4502 :
4503 633 : if (change->handled || isDeleted(i->t)) {
4504 0 : idx_destroy(store, i);
4505 0 : return 1;
4506 : }
4507 :
4508 : /* savepoint commit (did it merge ?) */
4509 633 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4510 0 : idx_destroy(store, i);
4511 0 : return 1;
4512 : }
4513 633 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4514 : return 0;
4515 633 : sql_delta *d = (sql_delta*)change->data, *id = d;
4516 633 : if (d && d->next) {
4517 0 : if (d->cs.ts > oldest)
4518 : return LOG_OK; /* cannot cleanup yet */
4519 :
4520 : // d is oldest reachable delta
4521 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4522 0 : destroy_delta(d->next, true);
4523 0 : d->next = NULL;
4524 : }
4525 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4526 0 : id->nr_updates += merge_delta(d);
4527 0 : unlock_column(store, i->base.id);
4528 : }
4529 633 : idx_destroy(store, i);
4530 633 : return 1;
4531 : }
4532 :
4533 : static int
4534 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4535 : {
4536 26 : sqlstore *store = Store;
4537 26 : sql_idx *i = (sql_idx*)change->obj;
4538 :
4539 26 : if (!i) /* cleaned earlier */
4540 : return 1;
4541 :
4542 26 : if (change->handled || isDeleted(i->t)) {
4543 0 : table_destroy(store, i->t);
4544 0 : return 1;
4545 : }
4546 :
4547 : /* savepoint commit (did it merge ?) */
4548 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4549 0 : table_destroy(store, i->t);
4550 0 : return 1;
4551 : }
4552 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4553 : return 0;
4554 26 : sql_delta *d = (sql_delta*)change->data, *id = d;
4555 26 : if (d && d->next) {
4556 26 : if (d->cs.ts > oldest)
4557 : return LOG_OK; /* cannot cleanup yet */
4558 :
4559 : // d is oldest reachable delta
4560 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4561 26 : destroy_delta(d->next, true);
4562 26 : d->next = NULL;
4563 : }
4564 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4565 26 : id->nr_updates += merge_delta(d);
4566 26 : unlock_column(store, i->base.id);
4567 : }
4568 26 : table_destroy(store, i->t);
4569 26 : return 1;
4570 : }
4571 :
4572 : static int
4573 246600 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4574 : {
4575 246600 : sqlstore *store = Store;
4576 246600 : sql_table *t = (sql_table*)change->obj;
4577 :
4578 246600 : if (change->handled || isDeleted(t)) {
4579 3501 : table_destroy(store, t);
4580 3501 : return 1;
4581 : }
4582 : /* savepoint commit (did it merge ?) */
4583 243099 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4584 14144 : table_destroy(store, t);
4585 14144 : return 1;
4586 : }
4587 228955 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4588 : return 0;
4589 228927 : storage *d = (storage*)change->data;
4590 228927 : if (d->next) {
4591 156925 : if (d->cs.ts > oldest)
4592 : return LOG_OK; /* cannot cleanup yet */
4593 :
4594 11305 : destroy_storage(d->next);
4595 11305 : d->next = NULL;
4596 : }
4597 83307 : table_destroy(store, t);
4598 83307 : return 1;
4599 : }
4600 :
4601 : static int
4602 30612 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4603 : {
4604 30612 : if (nr == 0)
4605 : return LOG_OK;
4606 30612 : assert (nr > 0);
4607 30612 : if ((!offsets || !*offsets) && nr == total) {
4608 30583 : *offset = slot;
4609 30583 : return LOG_OK;
4610 : }
4611 29 : if (!*offsets) {
4612 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4613 7 : if (!*offsets)
4614 : return LOG_ERR;
4615 : }
4616 29 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4617 16168 : for(size_t i = 0; i < nr; i++)
4618 16139 : dst[i] = slot + i;
4619 29 : (*offsets)->batCount += nr;
4620 29 : (*offsets)->theap->dirty = true;
4621 29 : return LOG_OK;
4622 : }
4623 :
4624 : static int
4625 30590 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4626 : {
4627 30590 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4628 30590 : assert(s->segs);
4629 30590 : ulng oldest = store_oldest(tr->store, NULL);
4630 30590 : BUN slot = 0;
4631 30590 : size_t total = cnt;
4632 :
4633 30590 : if (!locked)
4634 30590 : lock_table(tr->store, t->base.id);
4635 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4636 61312 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4637 30724 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4638 35 : if ((seg->end - seg->start) >= cnt) {
4639 : /* if previous is claimed before we could simply adjust the end/start */
4640 13 : if (p && p->ts == tr->tid && !p->deleted) {
4641 2 : slot = p->end;
4642 2 : p->end += cnt;
4643 2 : seg->start += cnt;
4644 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4645 : ok = LOG_ERR;
4646 : break;
4647 : }
4648 2 : s->segs->nr_reused += cnt;
4649 2 : cnt = 0;
4650 2 : break;
4651 : }
4652 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4653 11 : size_t rcnt = seg->end - seg->start;
4654 11 : if (rcnt > cnt)
4655 : rcnt = cnt;
4656 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4657 : ok = LOG_ERR;
4658 : break;
4659 : }
4660 : }
4661 33 : seg->ts = tr->tid;
4662 33 : seg->deleted = false;
4663 33 : slot = seg->start;
4664 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4665 : ok = LOG_ERR;
4666 : break;
4667 : }
4668 33 : s->segs->nr_reused += (seg->end - seg->start);
4669 33 : cnt -= (seg->end - seg->start);
4670 : }
4671 : }
4672 30590 : if (ok == LOG_OK && cnt) {
4673 30577 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4674 29439 : slot = s->segs->t->end;
4675 29439 : s->segs->t->end += cnt;
4676 : } else {
4677 1138 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4678 : ok = LOG_ERR;
4679 : } else {
4680 1138 : if (!s->segs->h)
4681 0 : s->segs->h = s->segs->t;
4682 1138 : slot = s->segs->t->start;
4683 : }
4684 : }
4685 30577 : if (ok == LOG_OK)
4686 30577 : ok = add_offsets(slot, cnt, total, offset, offsets);
4687 : }
4688 30590 : if (!locked)
4689 30590 : unlock_table(tr->store, t->base.id);
4690 :
4691 30590 : if (ok == LOG_OK) {
4692 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4693 30590 : if (!in_transaction) {
4694 1123 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4695 1123 : in_transaction = true;
4696 : }
4697 30590 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4698 30576 : tr->logchanges += (lng) total;
4699 30590 : if (*offsets) {
4700 7 : BAT *pos = *offsets;
4701 7 : assert(BATcount(pos) == total);
4702 7 : BATsetcount(pos, total); /* set other properties */
4703 7 : pos->tnil = false;
4704 7 : pos->tnonil = true;
4705 7 : pos->tkey = true;
4706 7 : pos->tsorted = true;
4707 7 : pos->trevsorted = false;
4708 : }
4709 : }
4710 30590 : return ok;
4711 : }
4712 :
4713 : static int
4714 2057558 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4715 : {
4716 2057558 : if (cnt > 1 && offsets)
4717 30590 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4718 2026968 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4719 2026968 : assert(s->segs);
4720 2026968 : ulng oldest = store_oldest(tr->store, NULL);
4721 2026968 : BUN slot = 0;
4722 2026968 : int reused = 0;
4723 :
4724 2026968 : if (!locked)
4725 1901714 : lock_table(tr->store, t->base.id);
4726 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4727 : * or create new segment at the end */
4728 7885953 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4729 5927895 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4730 :
4731 68910 : if ((seg->end - seg->start) >= cnt) {
4732 :
4733 : /* if previous is claimed before we could simply adjust the end/start */
4734 68910 : if (p && p->ts == tr->tid && !p->deleted) {
4735 53756 : slot = p->end;
4736 53756 : p->end += cnt;
4737 53756 : seg->start += cnt;
4738 53756 : s->segs->nr_reused += cnt;
4739 53756 : reused = 1;
4740 53756 : break;
4741 : }
4742 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4743 15154 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4744 : ok = LOG_ERR;
4745 : break;
4746 : }
4747 : }
4748 15154 : seg->ts = tr->tid;
4749 15154 : seg->deleted = false;
4750 15154 : slot = seg->start;
4751 15154 : s->segs->nr_reused += cnt;
4752 15154 : reused = 1;
4753 15154 : break;
4754 : }
4755 : }
4756 2026968 : if (ok == LOG_OK && !reused) {
4757 1958058 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4758 1923022 : slot = s->segs->t->end;
4759 1923022 : s->segs->t->end += cnt;
4760 : } else {
4761 35036 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4762 : ok = LOG_ERR;
4763 : } else {
4764 35036 : if (!s->segs->h)
4765 0 : s->segs->h = s->segs->t;
4766 35036 : slot = s->segs->t->start;
4767 : }
4768 : }
4769 : }
4770 2026968 : if (!locked)
4771 1901714 : unlock_table(tr->store, t->base.id);
4772 :
4773 2026968 : if (ok == LOG_OK) {
4774 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4775 2026968 : if (!in_transaction) {
4776 48976 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4777 48976 : in_transaction = true;
4778 : }
4779 2026968 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4780 2026537 : tr->logchanges += (lng) cnt;
4781 2026968 : *offset = slot;
4782 : }
4783 : return ok;
4784 : }
4785 :
4786 : /*
4787 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4788 : * of the global transaction and mark the newly added storage slots unused on the global
4789 : * level but used on the local transaction level. Besides this the local transaction needs
4790 : * to update (and mark unused) any slot in between the old end and new slots.
4791 : * */
4792 : static int
4793 1932304 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4794 : {
4795 1932304 : storage *s;
4796 :
4797 : /* we have a single segment structure for each persistent table
4798 : * for temporary tables each has its own */
4799 1932304 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4800 : return LOG_ERR;
4801 :
4802 1932304 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4803 : }
4804 :
4805 : /* some tables cannot be updated concurrently (user/roles etc) */
4806 : static int
4807 125258 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4808 : {
4809 125258 : storage *s;
4810 125258 : int res = 0;
4811 :
4812 : /* we have a single segment structure for each persistent table
4813 : * for temporary tables each has its own */
4814 125258 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4815 : /* TODO check for other inserts ! */
4816 : return LOG_ERR;
4817 :
4818 125258 : lock_table(tr->store, t->base.id);
4819 125258 : if ((res = segments_conflict(tr, s->segs, 1))) {
4820 4 : unlock_table(tr->store, t->base.id);
4821 4 : return LOG_CONFLICT;
4822 : }
4823 125254 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4824 125254 : unlock_table(tr->store, t->base.id);
4825 125254 : return res;
4826 : }
4827 :
4828 : static int
4829 21262 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4830 : {
4831 21262 : storage *s;
4832 21262 : int res = 0;
4833 :
4834 21262 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4835 : return LOG_ERR;
4836 :
4837 21262 : lock_table(tr->store, t->base.id);
4838 21262 : res = segments_conflict(tr, s->segs, uncommitted);
4839 21262 : unlock_table(tr->store, t->base.id);
4840 21262 : return res ? LOG_CONFLICT : LOG_OK;
4841 : }
4842 :
4843 : static size_t
4844 1360192 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4845 : {
4846 1360192 : size_t cnt = 0;
4847 :
4848 1470874 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4849 : ;
4850 :
4851 3432803 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4852 2072610 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4853 115374 : cnt += s->end - s->start;
4854 : }
4855 1360193 : return cnt;
4856 : }
4857 :
4858 : static BAT *
4859 1360220 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4860 : {
4861 1360220 : lock_table(tr->store, t->base.id);
4862 1360197 : segment *s = S->segs->h;
4863 : /* step one no deletes -> dense range */
4864 1360197 : uint32_t cur = 0;
4865 1360197 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4866 1360189 : if (!dnr) {
4867 1256420 : unlock_table(tr->store, t->base.id);
4868 1256450 : return BATdense(start, start, end-start);
4869 : }
4870 :
4871 103769 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4872 103769 : if (!b) {
4873 0 : unlock_table(tr->store, t->base.id);
4874 0 : return NULL;
4875 : }
4876 :
4877 103769 : uint32_t *restrict dst = Tloc(b, 0);
4878 58610199 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4879 58564638 : if (s->end < start)
4880 82670 : continue;
4881 58481968 : if (s->start >= end)
4882 : break;
4883 58423760 : msk m = (SEG_IS_VALID(s, tr));
4884 58423760 : size_t lnr = s->end-s->start;
4885 58423760 : if (s->start < start)
4886 6733 : lnr -= (start - s->start);
4887 58423760 : if (s->end > end)
4888 5944 : lnr -= s->end - end;
4889 :
4890 58423760 : if (m) {
4891 892676 : size_t used = pos&31, end = 32;
4892 892676 : if (used) {
4893 766393 : if (lnr < (32-used))
4894 485380 : end = used + lnr;
4895 766393 : assert(end > used);
4896 766393 : cur |= ((1U << (end - used)) - 1) << used;
4897 766393 : lnr -= end - used;
4898 766393 : pos += end - used;
4899 766393 : if (end == 32) {
4900 281013 : *dst++ = cur;
4901 281013 : cur = 0;
4902 : }
4903 : }
4904 892676 : size_t full = lnr/32;
4905 892676 : size_t rest = lnr%32;
4906 892676 : if (full > 0) {
4907 225760 : memset(dst, ~0, full * sizeof(*dst));
4908 225760 : dst += full;
4909 225760 : lnr -= full * 32;
4910 225760 : pos += full * 32;
4911 : }
4912 892676 : if (rest > 0) {
4913 387859 : cur |= (1U << rest) - 1;
4914 387859 : lnr -= rest;
4915 387859 : pos += rest;
4916 : }
4917 892676 : assert(lnr==0);
4918 : } else {
4919 57531084 : size_t used = pos&31, end = 32;
4920 57531084 : if (used) {
4921 55731856 : if (lnr < (32-used))
4922 53836667 : end = used + lnr;
4923 :
4924 55731856 : pos+= (end-used);
4925 55731856 : lnr-= (end-used);
4926 55731856 : if (end == 32) {
4927 1895189 : *dst++ = cur;
4928 1895189 : cur = 0;
4929 : }
4930 : }
4931 57531084 : size_t full = lnr/32;
4932 57531084 : size_t rest = lnr%32;
4933 57531084 : memset(dst, 0, full * sizeof(*dst));
4934 57531084 : dst += full;
4935 57531084 : lnr -= full * 32;
4936 57531084 : pos += full * 32;
4937 57531084 : pos+= rest;
4938 57531084 : lnr-= rest;
4939 57531084 : assert(lnr==0);
4940 : }
4941 : }
4942 :
4943 103769 : unlock_table(tr->store, t->base.id);
4944 103769 : if (pos%32)
4945 102506 : *dst=cur;
4946 103769 : BATsetcount(b, nr);
4947 103768 : bn = BATmaskedcands(start, nr, b, true);
4948 103768 : BBPreclaim(b);
4949 103769 : (void)pos;
4950 103769 : assert (pos == nr);
4951 : return bn;
4952 : }
4953 :
4954 : static void * /* BAT * */
4955 1367019 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4956 : {
4957 : /* with nr_of_parts - part_nr we can adjust parts */
4958 1367019 : storage *s = tab_timestamp_storage(tr, t);
4959 :
4960 1367001 : if (!s)
4961 : return NULL;
4962 1367001 : size_t nr = segs_end(s->segs, tr, t);
4963 :
4964 1367002 : if (!nr)
4965 6780 : return BATdense(0, 0, 0);
4966 :
4967 : /* compute proper part */
4968 1360222 : size_t part_size = nr/nr_of_parts;
4969 1360222 : size_t start = part_size * part_nr;
4970 1360222 : size_t end = start + part_size;
4971 1360222 : if (part_nr == (nr_of_parts-1))
4972 1288132 : end = nr;
4973 1360222 : assert(end <= nr);
4974 1360222 : return segments2cands(s, tr, t, start, end);
4975 : }
4976 :
4977 : static int
4978 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
4979 : {
4980 5 : bool update_conflict = false;
4981 :
4982 5 : if (segments_in_transaction(tr, col->t))
4983 : return LOG_CONFLICT;
4984 :
4985 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
4986 :
4987 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
4988 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
4989 5 : assert(d && d->cs.ts == tr->tid);
4990 5 : if (odelta != d)
4991 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
4992 5 : if (d->cs.bid)
4993 5 : temp_destroy(d->cs.bid);
4994 5 : if (d->cs.uibid)
4995 5 : temp_destroy(d->cs.uibid);
4996 5 : if (d->cs.uvbid)
4997 5 : temp_destroy(d->cs.uvbid);
4998 5 : bat_set_access(bn, BAT_READ);
4999 5 : d->cs.bid = temp_create(bn);
5000 5 : d->cs.uibid = 0;
5001 5 : d->cs.uvbid = 0;
5002 5 : d->cs.ucnt = 0;
5003 5 : d->cs.cleared = true;
5004 5 : d->cs.ts = tr->tid;
5005 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
5006 5 : return LOG_OK;
5007 : }
5008 :
5009 : static int
5010 5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
5011 : {
5012 5 : if (segments_in_transaction(tr, c->t))
5013 : return LOG_CONFLICT;
5014 :
5015 5 : sql_delta *d = NULL;
5016 :
5017 : /* do we have enough to clean */
5018 5 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5019 : return LOG_CONFLICT;
5020 :
5021 : /* do we have enough to clean */
5022 5 : if (!force && (d->nr_updates) < 1024)
5023 : return LOG_OK;
5024 :
5025 5 : BAT *b = NULL, *bn = NULL;;
5026 5 : if ((b = bind_col(tr, c, 0)) == NULL)
5027 : return LOG_ERR;
5028 5 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5029 0 : BBPreclaim(b);
5030 0 : return LOG_ERR;
5031 : }
5032 5 : int res = swap_bats(tr, c, bn);
5033 5 : d->nr_updates = 0;
5034 5 : BBPreclaim(b);
5035 5 : BBPreclaim(bn);
5036 5 : return res;
5037 : }
5038 :
5039 : static int
5040 0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
5041 : {
5042 0 : if (segments_in_transaction(tr, t))
5043 : return LOG_CONFLICT;
5044 :
5045 0 : storage *s;
5046 0 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
5047 : return LOG_ERR;
5048 :
5049 0 : for( node *n = ol_first_node(t->columns); n; n = n->next) {
5050 0 : sql_column *c = n->data;
5051 :
5052 0 : if (!ATOMvarsized(c->type.type->localtype))
5053 0 : continue;
5054 0 : sql_delta *d = NULL;
5055 :
5056 : /* do we have enough to clean */
5057 0 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5058 : return LOG_CONFLICT;
5059 :
5060 : /* do we have enough to clean */
5061 0 : if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
5062 0 : continue;
5063 :
5064 0 : BAT *b = NULL, *bn = NULL;;
5065 0 : if ((b = bind_col(tr, c, 0)) == NULL)
5066 : return LOG_ERR;
5067 0 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5068 0 : BBPreclaim(b);
5069 0 : return LOG_ERR;
5070 : }
5071 0 : int res = swap_bats(tr, c, bn);
5072 0 : d->nr_updates = 0;
5073 0 : BBPreclaim(b);
5074 0 : BBPreclaim(bn);
5075 0 : if (res != LOG_OK)
5076 0 : return res;
5077 : }
5078 0 : s->segs->nr_reused = 0;
5079 0 : return LOG_OK;
5080 : }
5081 :
5082 :
5083 : static int
5084 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5085 : {
5086 58 : bool update_conflict = false;
5087 :
5088 58 : if (segments_in_transaction(tr, col->t))
5089 : return LOG_CONFLICT;
5090 :
5091 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5092 :
5093 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5094 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5095 58 : assert(d && d->cs.ts == tr->tid);
5096 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5097 58 : if (odelta != d)
5098 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5099 :
5100 58 : d->cs.st = st;
5101 58 : d->cs.cleared = true;
5102 58 : if (d->cs.bid)
5103 58 : temp_destroy(d->cs.bid);
5104 58 : o = transfer_to_systrans(o);
5105 58 : if (o == NULL)
5106 : return LOG_ERR;
5107 58 : bat_set_access(o, BAT_READ);
5108 58 : d->cs.bid = temp_create(o);
5109 58 : if (u) {
5110 53 : if (d->cs.ebid)
5111 0 : temp_destroy(d->cs.ebid);
5112 53 : u = transfer_to_systrans(u);
5113 53 : if (u == NULL)
5114 : return LOG_ERR;
5115 53 : d->cs.ebid = temp_create(u);
5116 : }
5117 : return LOG_OK;
5118 : }
5119 :
5120 : void
5121 330 : bat_storage_init( store_functions *sf)
5122 : {
5123 330 : sf->bind_col = &bind_col;
5124 330 : sf->bind_updates = &bind_updates;
5125 330 : sf->bind_updates_idx = &bind_updates_idx;
5126 330 : sf->bind_idx = &bind_idx;
5127 330 : sf->bind_cands = &bind_cands;
5128 :
5129 330 : sf->claim_tab = &claim_tab;
5130 330 : sf->key_claim_tab = &key_claim_tab;
5131 330 : sf->tab_validate = &tab_validate;
5132 :
5133 330 : sf->append_col = &append_col;
5134 330 : sf->append_idx = &append_idx;
5135 :
5136 330 : sf->update_col = &update_col;
5137 330 : sf->update_idx = &update_idx;
5138 :
5139 330 : sf->delete_tab = &delete_tab;
5140 :
5141 330 : sf->count_del = &count_del;
5142 330 : sf->count_col = &count_col;
5143 330 : sf->count_idx = &count_idx;
5144 330 : sf->dcount_col = &dcount_col;
5145 330 : sf->min_max_col = &min_max_col;
5146 330 : sf->set_stats_col = &set_stats_col;
5147 330 : sf->sorted_col = &sorted_col;
5148 330 : sf->unique_col = &unique_col;
5149 330 : sf->double_elim_col = &double_elim_col;
5150 330 : sf->col_stats = &col_stats;
5151 330 : sf->col_set_range = &col_set_range;
5152 330 : sf->col_not_null = &col_not_null;
5153 :
5154 330 : sf->col_dup = &col_dup;
5155 330 : sf->idx_dup = &idx_dup;
5156 330 : sf->del_dup = &del_dup;
5157 :
5158 330 : sf->create_col = &create_col; /* create and add to change list */
5159 330 : sf->create_idx = &create_idx;
5160 330 : sf->create_del = &create_del;
5161 :
5162 330 : sf->destroy_col = &destroy_col; /* free resources */
5163 330 : sf->destroy_idx = &destroy_idx;
5164 330 : sf->destroy_del = &destroy_del;
5165 :
5166 330 : sf->drop_col = &drop_col; /* add drop to change list */
5167 330 : sf->drop_idx = &drop_idx;
5168 330 : sf->drop_del = &drop_del;
5169 :
5170 330 : sf->clear_table = &clear_table;
5171 :
5172 330 : sf->vacuum_col = &vacuum_col;
5173 330 : sf->vacuum_tab = &vacuum_tab;
5174 330 : sf->col_compress = &col_compress;
5175 330 : }
|