Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "matomic.h"
18 :
19 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
20 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
21 :
22 : static int log_update_col( sql_trans *tr, sql_change *c);
23 : static int log_update_idx( sql_trans *tr, sql_change *c);
24 : static int log_update_del( sql_trans *tr, sql_change *c);
25 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
26 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
27 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int log_create_col(sql_trans *tr, sql_change *change);
29 : static int log_create_idx(sql_trans *tr, sql_change *change);
30 : static int log_create_del(sql_trans *tr, sql_change *change);
31 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
32 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
33 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
35 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
36 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
39 :
40 : static lng merge_delta( sql_delta *obat);
41 :
42 : /* valid
43 : * !deleted && VALID_4_READ(TS, tr) existing or newly created segment
44 : * deleted && TS > tr->ts && OLDTS < tr->ts deleted after current transaction
45 : */
46 :
47 : #define VALID_4_READ(TS,tr) \
48 : (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
49 :
50 : /* when changed, check if the old status is still valid */
51 : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
52 : (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
53 :
54 : #define SEG_VALID_4_DELETE(seg,tr) \
55 : (!seg->deleted && VALID_4_READ(seg->ts, tr))
56 :
57 : /* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
58 : #define SEG_IS_DELETED(seg,tr) \
59 : ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
60 : (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
61 :
62 : /* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
63 : #define SEG_IS_VALID(seg, tr) \
64 : ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
65 : (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
66 :
67 : static inline BAT *
68 5299 : transfer_to_systrans(BAT *b)
69 : {
70 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
71 5299 : MT_lock_set(&b->theaplock);
72 5299 : if (VIEWtparent(b) || VIEWvtparent(b)) {
73 29 : MT_lock_unset(&b->theaplock);
74 29 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
75 29 : BBPreclaim(b);
76 29 : return bn;
77 : }
78 5270 : if (b->theap->farmid == TRANSIENT ||
79 62 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
80 4885 : QryCtx *qc = MT_thread_get_qry_ctx();
81 4885 : if (qc) {
82 2523 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
83 2523 : ATOMIC_SUB(&qc->datasize, b->theap->size);
84 2523 : b->theap->farmid = SYSTRANS;
85 2523 : b->batRole = SYSTRANS;
86 : }
87 2523 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
88 1092 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
89 1092 : b->tvheap->farmid = SYSTRANS;
90 : }
91 : }
92 : }
93 5270 : MT_lock_unset(&b->theaplock);
94 5270 : return b;
95 : }
96 :
97 : static void
98 28799722 : lock_table(sqlstore *store, sqlid id)
99 : {
100 28799722 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
101 28889230 : }
102 :
103 : static void
104 28889076 : unlock_table(sqlstore *store, sqlid id)
105 : {
106 28889076 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
107 28886362 : }
108 :
109 : static void
110 21287348 : lock_column(sqlstore *store, sqlid id)
111 : {
112 21287348 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
113 21310616 : }
114 :
115 : static void
116 21308384 : unlock_column(sqlstore *store, sqlid id)
117 : {
118 21308384 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
119 21320715 : }
120 :
121 : static void
122 115247 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
123 : {
124 115247 : assert(cleanup);
125 115247 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
126 115245 : }
127 :
128 : static void
129 132848 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
130 : {
131 132848 : assert(cleanup);
132 132848 : dup_base(&t->base);
133 132846 : trans_add(tr, b, data, cleanup, commit, log);
134 132838 : }
135 :
136 : static int
137 76496 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
138 : {
139 76496 : segment *s = change->data;
140 :
141 76496 : if (s->ts <= oldest) {
142 34571 : while(s) {
143 21130 : segment *n = s->prev;
144 21130 : _DELETE(s);
145 21130 : s = n;
146 : }
147 13441 : sqlstore *store = Store;
148 13441 : table_destroy(store, (sql_table*)change->obj);
149 13441 : return 1;
150 : }
151 : return LOG_OK;
152 : }
153 :
154 : static void
155 21130 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
156 : {
157 : /* we can only be accessed by anything older then commit_ts */
158 21130 : if (c->cleanup == &tc_gc_seg)
159 7689 : s->prev = c->data;
160 : else
161 13441 : c->cleanup = &tc_gc_seg;
162 21130 : c->data = s;
163 21130 : s->ts = commit_ts;
164 16896 : }
165 :
166 : static segment *
167 89150 : new_segment(segment *o, sql_trans *tr, size_t cnt)
168 : {
169 89150 : segment *n = (segment*)GDKmalloc(sizeof(segment));
170 :
171 89152 : assert(tr);
172 89152 : if (n) {
173 89152 : *n = (segment) {
174 89152 : .ts = tr->tid,
175 : .oldts = 0,
176 : .deleted = false,
177 : .start = 0,
178 : .end = cnt,
179 : .next = ATOMIC_PTR_VAR_INIT(NULL),
180 : .prev = NULL,
181 : };
182 89152 : if (o) {
183 36368 : n->start += o->end;
184 36368 : n->end += o->end;
185 36368 : ATOMIC_PTR_SET(&o->next, n);
186 : }
187 : }
188 89152 : return n;
189 : }
190 :
/* Carve the range [start, start+cnt) out of segment o on behalf of
 * transaction tr and return the segment that covers exactly that range.
 *
 * segs  list owning o; its head/tail pointers are updated when needed
 * o     segment containing the range
 * p     predecessor of o (may be NULL); only used to relink when the
 *       new segment is inserted in front of o
 * deleted  deleted flag for the resulting segment
 *
 * When o already covers exactly the range, it is restamped in place
 * (old ts kept in oldts) and returned.  Otherwise one new segment
 * (2-way split) or two new segments (3-way split) are allocated.
 * Returns NULL when an allocation fails. */
191 : static segment *
192 96011 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
193 : {
194 96011 : assert(tr);
/* exact match: no split needed, just take over the segment */
195 96011 : if (o->start == start && o->end == start+cnt) {
196 11375 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
197 11375 : o->oldts = o->ts;
198 11375 : o->ts = tr->tid;
199 11375 : o->deleted = deleted;
200 11375 : return o;
201 : }
202 84636 : segment *n = (segment*)GDKmalloc(sizeof(segment));
203 :
204 84636 : if (!n)
205 : return NULL;
206 84636 : n->prev = NULL;
207 :
/* o was already stamped by this transaction: the new part gets ts 1,
 * no old timestamp, and is marked deleted regardless of the `deleted`
 * argument; otherwise the new part remembers o's timestamp in oldts */
208 84636 : if (o->ts == tr->tid) {
209 5921 : n->oldts = 0;
210 5921 : n->ts = 1;
211 5921 : n->deleted = true;
212 : } else {
213 78715 : n->oldts = o->ts;
214 78715 : n->ts = tr->tid;
215 78715 : n->deleted = deleted;
216 : }
217 84636 : if (start == o->start) {
218 : /* 2-way split: o remains latter part of segment, new one is
219 : * inserted before */
220 68203 : n->start = o->start;
221 68203 : n->end = n->start + cnt;
222 68203 : ATOMIC_PTR_INIT(&n->next, o);
223 68203 : if (segs->h == o)
224 464 : segs->h = n;
225 68203 : if (p)
226 67739 : ATOMIC_PTR_SET(&p->next, n);
227 68203 : o->start = n->end;
228 16433 : } else if (start+cnt == o->end) {
229 : /* 2-way split: o remains first part of segment, new one is
230 : * added after */
231 5801 : n->start = o->end - cnt;
232 5801 : n->end = o->end;
233 5801 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
234 5801 : ATOMIC_PTR_SET(&o->next, n);
235 5801 : if (segs->t == o)
236 901 : segs->t = n;
237 5801 : o->end = n->start;
238 : } else {
239 : /* 3-way split: o remains first part of segment, two new ones
240 : * are added after */
241 10632 : segment *n2 = GDKmalloc(sizeof(segment));
242 10632 : if (n2 == NULL) {
243 0 : GDKfree(n);
244 0 : return NULL;
245 : }
246 10632 : ATOMIC_PTR_INIT(&n->next, n2);
247 10632 : n->start = start;
248 10632 : n->end = start + cnt;
/* n2 is a copy of o (same ts/oldts/deleted) covering the remainder
 * after the carved-out range */
249 10632 : *n2 = *o;
250 10632 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
251 10632 : n2->start = n->end;
252 10632 : n2->prev = NULL;
253 10632 : if (segs->t == o)
254 4477 : segs->t = n2;
255 10632 : ATOMIC_PTR_SET(&o->next, n);
256 10632 : o->end = start;
257 : }
258 : return n;
259 : }
260 :
/* Undo the changes transaction tr made to the segment list segs.
 * Every segment stamped with tr->tid gets its deleted flag and timestamp
 * reverted; adjacent segments whose timestamp is <= oldest and whose
 * deleted flags agree are then merged.  Segments merged away are not
 * freed here but queued on `change` through mark4destroy. */
261 : static void
262 4302 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
263 : {
/* seg tracks the last segment that is a candidate to merge into */
264 4302 : segment *cur = segs->h, *seg = NULL;
265 18313 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
266 14011 : if (cur->ts == tr->tid) { /* revert */
267 4799 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
268 4799 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
269 4799 : cur->oldts = 0;
270 : }
271 14011 : if (cur->ts <= oldest) { /* possibly merge range */
272 12940 : if (!seg) { /* skip first */
273 : seg = cur;
274 8638 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
275 : /* merge with previous */
276 4234 : seg->end = cur->end;
277 4234 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
278 4234 : if (cur == segs->t)
279 2857 : segs->t = seg;
280 4234 : mark4destroy(cur, change, store_get_timestamp(tr->store));
/* continue from the merged segment so the loop step moves to the
 * segment that followed the one just unlinked */
281 4234 : cur = seg;
282 : } else {
283 : seg = cur; /* begin of new merge */
284 : }
285 : }
286 : }
287 4302 : }
288 :
289 : static size_t
290 104773 : segs_end_include_deleted( segments *segs, sql_trans *tr)
291 : {
292 104773 : size_t cnt = 0;
293 104773 : segment *s = segs->h, *l = NULL;
294 :
295 480103 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
296 375330 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
297 : l = s;
298 : }
299 104773 : if (l)
300 104766 : cnt = l->end;
301 104773 : return cnt;
302 : }
303 :
/* Write the deleted state of the segments stamped with tr->tid into the
 * BAT cs->bid, which is addressed here as an array of 32-bit words with
 * one bit per row position (a set bit means deleted, a cleared bit means
 * not deleted).
 *
 * Returns LOG_OK, or LOG_ERR when the BAT descriptor cannot be obtained
 * or the BAT cannot be extended. */
304 : static int
305 104773 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
306 : {
307 : /* set bits correctly */
308 104773 : BAT *b = temp_descriptor(cs->bid);
309 :
310 104773 : if (!b)
311 : return LOG_ERR;
312 104773 : segment *s = segs->h;
313 :
/* nr: number of positions to cover; capacity is requested in multiples
 * of 32 so that whole words are always available */
314 104773 : size_t nr = segs_end_include_deleted(segs, tr);
315 104773 : size_t rounded_nr = ((nr+31)&~31);
316 104773 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
317 0 : bat_destroy(b);
318 0 : return LOG_ERR;
319 : }
320 :
321 : /* disable all properties here */
322 104773 : MT_lock_set(&b->theaplock);
323 104773 : b->tsorted = false;
324 104773 : b->trevsorted = false;
325 104773 : b->tnosorted = 0;
326 104773 : b->tnorevsorted = 0;
327 104773 : b->tseqbase = oid_nil;
328 104773 : b->tkey = false;
329 104773 : b->tnokey[0] = 0;
330 104773 : b->tnokey[1] = 0;
/* cnt: number of positions whose bits have been written so far */
331 104773 : BUN cnt = BATcount(b);
332 :
333 104773 : uint32_t *restrict dst;
334 421885 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
335 360659 : if (s->start >= nr)
336 : break;
/* only non-empty segments of this transaction are written */
337 317112 : if (s->ts == tr->tid && s->end != s->start) {
338 151852 : if (cnt < s->start) { /* first mark as deleted ! */
/* positions [cnt, s->start) get their bits set: partial leading word,
 * then whole words, then the partial trailing word */
339 2646 : size_t lnr = s->start-cnt;
340 2646 : size_t pos = cnt;
341 2646 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
342 2646 : uint32_t cur = 0;
343 :
344 2646 : size_t used = pos&31, end = 32;
345 2646 : if (used) {
346 2564 : if (lnr < (32-used))
347 2436 : end = used + lnr;
348 2564 : assert(end > used);
349 2564 : cur |= ((1U << (end - used)) - 1) << used;
350 2564 : lnr -= end - used;
351 2564 : *dst++ |= cur;
352 2564 : cur = 0;
353 : }
354 2646 : size_t full = lnr/32;
355 2646 : size_t rest = lnr%32;
356 2646 : if (full > 0) {
357 7 : memset(dst, ~0, full * sizeof(*dst));
358 7 : dst += full;
359 7 : lnr -= full * 32;
360 : }
361 2646 : if (rest > 0) {
362 137 : cur |= (1U << rest) - 1;
363 137 : lnr -= rest;
364 137 : *dst |= cur;
365 : }
366 2646 : assert(lnr==0);
367 : }
/* now the segment itself: bits [s->start, s->end) are set when the
 * segment is deleted and cleared otherwise, using the same
 * leading-word / whole-words / trailing-word scheme */
368 151852 : size_t lnr = s->end-s->start;
369 151852 : size_t pos = s->start;
370 151852 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
371 151852 : uint32_t cur = 0;
372 151852 : size_t used = pos&31, end = 32;
373 151852 : if (used) {
374 113878 : if (lnr < (32-used))
375 108543 : end = used + lnr;
376 113878 : assert(end > used);
377 113878 : cur |= ((1U << (end - used)) - 1) << used;
378 113878 : lnr -= end - used;
379 113878 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
380 113878 : dst++;
381 113878 : cur = 0;
382 : }
383 151852 : size_t full = lnr/32;
384 151852 : size_t rest = lnr%32;
385 151852 : if (full > 0) {
386 3623 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
387 3623 : dst += full;
388 3623 : lnr -= full * 32;
389 : }
390 151852 : if (rest > 0) {
391 40697 : cur |= (1U << rest) - 1;
392 40697 : lnr -= rest;
393 40697 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
394 : }
395 151852 : assert(lnr==0);
396 151852 : if (cnt < s->end)
397 317112 : cnt = s->end;
398 : }
399 : }
/* the BAT count only ever grows here */
400 104773 : if (nr > BATcount(b)) {
401 63597 : BATsetcount(b, nr);
402 : }
403 104773 : b->theap->dirty = true;
404 104773 : MT_lock_unset(&b->theaplock);
405 :
406 104773 : bat_destroy(b);
407 104773 : return LOG_OK;
408 : }
409 :
410 : /* TODO return LOG_OK/ERR */
/* Commit-time pass over the segment list of storage s.
 * Segments stamped with tr->tid are restamped with commit_ts (and their
 * oldts is cleared unless they are deleted).  Neighbouring segments with
 * equal deleted flags and timestamps below TRANSACTION_ID_BASE are then
 * merged, unless some other active transaction's timestamp lies between
 * the timestamps of the two segments.  A segment merged away is freed
 * immediately when commit_ts == oldest, otherwise it is queued on
 * `change` through mark4destroy. */
411 : static void
412 104799 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
413 : {
414 104799 : sqlstore* store = tr->store;
/* seg is the previous (surviving) segment, cur the one under inspection */
415 104799 : segment *cur = s->segs->h, *seg = NULL;
416 480201 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
417 375402 : if (cur->ts == tr->tid) {
418 166492 : if (!cur->deleted)
419 93579 : cur->oldts = 0;
420 166492 : cur->ts = commit_ts;
421 : }
422 375402 : if (!seg) {
423 : /* first segment */
424 : seg = cur;
425 : }
426 270603 : else if (seg->ts < TRANSACTION_ID_BASE) {
427 : /* possible merge since both deleted flags are equal */
428 253851 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
429 186625 : int merge = 1;
430 186625 : node *n = store->active->h;
/* scan the active transactions for one that would see the two
 * segments differently */
431 573390 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
432 469117 : sql_trans* other = ((sql_trans*)n->data);
433 469117 : ulng active = other->ts;
434 469117 : if(other->active == 2)
435 21894 : continue; /* pretend that another recently committed transaction is no longer active */
436 447223 : if (active == tr->ts)
437 132428 : continue; /* pretend that committing transaction has already committed and is no longer active */
/* both segments older than this transaction: the scan stops here with
 * merge still allowed */
438 314795 : if (seg->ts < active && cur->ts < active)
439 : break;
/* both segments newer than this transaction: it does not block the merge */
440 302831 : if (seg->ts > active && cur->ts > active)
441 232443 : continue;
442 :
443 70388 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
444 : /* cannot safely merge since there is an active transaction between the segments */
445 : merge = false;
446 : break;
447 : }
448 : /* merge segments */
449 232474 : if (merge) {
450 116237 : seg->end = cur->end;
451 116237 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
452 116237 : if (cur == s->segs->t)
453 28190 : s->segs->t = seg;
454 116237 : if (commit_ts == oldest) {
455 99341 : _DELETE(cur);
456 : } else
457 33792 : mark4destroy(cur, change, commit_ts);
/* resume from the surviving segment; seg stays the merge target */
458 116237 : cur = seg;
459 116237 : continue;
460 : }
461 : }
462 : }
463 : seg = cur;
464 : }
465 104799 : }
466 :
467 : static int
468 2213459 : segments_in_transaction(sql_trans *tr, sql_table *t)
469 : {
470 2213459 : storage *s = ATOMIC_PTR_GET(&t->data);
471 2213459 : segment *seg = s->segs->h;
472 :
473 2213459 : if (seg && s->segs->t->ts == tr->tid)
474 : return 1;
475 602712 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
476 497151 : if (seg->ts == tr->tid)
477 : return 1;
478 : }
479 : return 0;
480 : }
481 :
482 : static size_t
483 20857376 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
484 : {
485 20857376 : size_t cnt = 0;
486 :
487 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
488 : * keep all of the parts aligned */
489 20857376 : lock_table(tr->store, table->base.id);
490 20924814 : segment *s = segs->h, *l = NULL;
491 :
492 20924814 : if (segs->t && SEG_IS_VALID(segs->t, tr))
493 17198856 : l = s = segs->t;
494 :
495 220014372 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
496 199089755 : if (SEG_IS_VALID(s, tr))
497 : l = s;
498 : }
499 20924617 : if (l)
500 20906199 : cnt = l->end;
501 20924617 : unlock_table(tr->store, table->base.id);
502 20922993 : return cnt;
503 : }
504 :
505 : static segments *
506 52782 : new_segments(sql_trans *tr, size_t cnt)
507 : {
508 52782 : segments *n = (segments*)GDKmalloc(sizeof(segments));
509 :
510 52784 : if (n) {
511 52784 : n->nr_reused = 0;
512 52784 : n->h = n->t = new_segment(NULL, tr, cnt);
513 52784 : if (!n->h) {
514 0 : GDKfree(n);
515 0 : return NULL;
516 : }
517 52784 : sql_ref_init(&n->r);
518 : }
519 : return n;
520 : }
521 :
522 : static sql_delta *
523 34939879 : timestamp_delta( sql_trans *tr, sql_delta *d)
524 : {
525 34993622 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
526 53743 : d = d->next;
527 34946493 : return d;
528 : }
529 :
530 : static sql_delta *
531 34775257 : col_timestamp_delta( sql_trans *tr, sql_column *c)
532 : {
533 34775257 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
534 : }
535 :
536 : static sql_delta *
537 27332 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
538 : {
539 27332 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
540 : }
541 :
542 : static storage *
543 21341504 : timestamp_storage( sql_trans *tr, storage *d)
544 : {
545 21341504 : if (!d)
546 : return NULL;
547 21412708 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
548 71204 : d = d->next;
549 : return d;
550 : }
551 :
552 : static storage *
553 21324623 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
554 : {
555 21324623 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
556 : }
557 :
558 : static sql_delta*
559 21406 : delta_dup(sql_delta *d)
560 : {
561 21406 : ATOMIC_INC(&d->cs.refcnt);
562 21406 : return d;
563 : }
564 :
565 : static void *
566 19590 : col_dup(sql_column *c)
567 : {
568 19590 : return delta_dup(ATOMIC_PTR_GET(&c->data));
569 : }
570 :
571 : static void *
572 3103 : idx_dup(sql_idx *i)
573 : {
574 3103 : if (!ATOMIC_PTR_GET(&i->data))
575 : return NULL;
576 1816 : return delta_dup(ATOMIC_PTR_GET(&i->data));
577 : }
578 :
579 : static storage*
580 1607 : storage_dup(storage *d)
581 : {
582 1607 : ATOMIC_INC(&d->cs.refcnt);
583 1607 : return d;
584 : }
585 :
586 : static void *
587 1607 : del_dup(sql_table *t)
588 : {
589 1607 : return storage_dup(ATOMIC_PTR_GET(&t->data));
590 : }
591 :
592 : static size_t
593 17 : count_inserts( segment *s, sql_trans *tr)
594 : {
595 17 : size_t cnt = 0;
596 :
597 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
598 55 : if (!s->deleted && s->ts == tr->tid)
599 4 : cnt += s->end - s->start;
600 : }
601 17 : return cnt;
602 : }
603 :
604 : static size_t
605 879870 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
606 : {
607 879870 : size_t cnt = 0;
608 :
609 1031095 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
610 : ;
611 :
612 5614068 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
613 4734224 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
614 1628529 : cnt += s->end - s->start;
615 : }
616 879844 : return cnt;
617 : }
618 :
619 : static size_t
620 17 : count_deletes( segment *s, sql_trans *tr)
621 : {
622 17 : size_t cnt = 0;
623 :
624 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
625 55 : if (SEG_IS_DELETED(s, tr))
626 17 : cnt += s->end - s->start;
627 : }
628 17 : return cnt;
629 : }
630 :
631 : #define CNT_ACTIVE 10
632 :
633 : static size_t
634 19573890 : count_col(sql_trans *tr, sql_column *c, int access)
635 : {
636 19573890 : storage *d;
637 19573890 : sql_delta *ds;
638 :
639 19573890 : if (!isTable(c->t))
640 : return 0;
641 19573890 : d = tab_timestamp_storage(tr, c->t);
642 19582515 : ds = col_timestamp_delta(tr, c);
643 19605764 : if (!d ||!ds)
644 : return 0;
645 19605764 : if (access == 2)
646 491250 : return ds?ds->cs.ucnt:0;
647 19114514 : if (access == 1)
648 17 : return count_inserts(d->segs->h, tr);
649 19114497 : if (access == QUICK)
650 0 : return d->segs->t?d->segs->t->end:0;
651 19114497 : if (access == CNT_ACTIVE) {
652 879719 : size_t cnt = segs_end(d->segs, tr, c->t);
653 880003 : lock_table(tr->store, c->t->base.id);
654 879900 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
655 879866 : unlock_table(tr->store, c->t->base.id);
656 879866 : return cnt;
657 : }
658 18234778 : return segs_end(d->segs, tr, c->t);
659 : }
660 :
661 : static size_t
662 23334 : count_idx(sql_trans *tr, sql_idx *i, int access)
663 : {
664 23334 : storage *d;
665 23334 : sql_delta *ds;
666 :
667 23334 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
668 6719 : return 0;
669 16614 : d = tab_timestamp_storage(tr, i->t);
670 16641 : ds = idx_timestamp_delta(tr, i);
671 16653 : if (!d || !ds)
672 : return 0;
673 16653 : if (access == 2)
674 2871 : return ds?ds->cs.ucnt:0;
675 13782 : if (access == 1)
676 0 : return count_inserts(d->segs->h, tr);
677 13782 : if (access == QUICK)
678 3529 : return d->segs->t?d->segs->t->end:0;
679 10253 : return segs_end(d->segs, tr, i->t);
680 : }
681 :
682 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
683 : static BAT *
684 15056756 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
685 : {
686 15056756 : BAT *b;
687 :
688 15056756 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
689 : /* returns the updates for cs */
690 15056756 : if (cs->uibid && cs->uvbid && cs->ucnt) {
691 14444 : if (access == RD_UPD_ID) {
692 8683 : if (!(b = temp_descriptor(cs->uibid)))
693 : return NULL;
694 8682 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
695 2317 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
696 6365 : oid nil = oid_nil;
697 : /* less then cnt */
698 6365 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
699 6366 : if (!s) {
700 0 : bat_destroy(b);
701 0 : return NULL;
702 : }
703 :
704 6366 : BAT *nb = BATproject(s, b);
705 6366 : bat_destroy(s);
706 6366 : bat_destroy(b);
707 6366 : b = nb;
708 : }
709 : } else {
710 5761 : b = temp_descriptor(cs->uvbid);
711 : }
712 : } else {
713 25071816 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
714 : }
715 : return b;
716 : }
717 :
/* Merge two sets of updates into newly created SYSTRANS BATs.
 *
 * ui/uv  ids and values of the first set; uv is passed in and out
 *        through UV and may be NULL when only ids are wanted
 * oi/ov  ids and values of the second set
 *
 * The ids are combined by a two-way merge that always emits the smaller
 * id first (NOTE(review): this presumably relies on both id BATs being
 * sorted ascending - confirm with the callers).  When both sets contain
 * the same id, only the entry of the first set (ui/uv) is kept.
 *
 * All four input BATs are released with bat_destroy on every path.
 * On success the merged id BAT is returned and, when a value BAT was
 * built, *UV is set to it.  On an append failure *UV is set to NULL and
 * NULL is returned; when creating the result BATs fails, NULL is
 * returned and *UV is left untouched. */
718 : static BAT *
719 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
720 : {
721 0 : int err = 0;
722 0 : BAT *uv = *UV;
723 0 : BUN cnt = BATcount(ui)+BATcount(oi);
724 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
725 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
726 :
727 0 : if (!ni || (uv && !nv)) {
728 0 : bat_destroy(ni);
729 0 : bat_destroy(nv);
730 0 : bat_destroy(ui);
731 0 : bat_destroy(uv);
732 0 : bat_destroy(oi);
733 0 : bat_destroy(ov);
734 0 : return NULL;
735 : }
736 0 : BATiter uvi;
737 0 : BATiter ovi;
738 :
/* the value iterators are only set up when uv is given; the appends
 * below test ov instead, so uv and ov are expected to be either both
 * given or both NULL */
739 0 : if (uv) {
740 0 : uvi = bat_iterator(uv);
741 0 : ovi = bat_iterator(ov);
742 : }
743 :
744 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
745 0 : BUN uip = 0, uie = BATcount(ui);
746 0 : BUN oip = 0, oie = BATcount(oi);
747 :
748 0 : oid uiseqb = ui->tseqbase;
749 0 : oid oiseqb = oi->tseqbase;
/* uipt/oipt stay NULL for dense id BATs; ids are then computed from
 * the sequence base */
750 0 : oid *uipt = NULL, *oipt = NULL;
751 0 : BATiter uii = bat_iterator(ui);
752 0 : BATiter oii = bat_iterator(oi);
753 0 : if (!BATtdensebi(&uii))
754 0 : uipt = uii.base;
755 0 : if (!BATtdensebi(&oii))
756 0 : oipt = oii.base;
/* merge while both inputs have entries left */
757 0 : while (uip < uie && oip < oie && !err) {
758 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
759 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
760 :
761 0 : if (uiid <= oiid) {
762 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
763 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
764 : err = 1;
765 0 : uip++;
/* same id in both inputs: the entry from oi/ov is dropped */
766 0 : if (uiid == oiid)
767 0 : oip++;
768 : } else { /* uiid > oiid */
769 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
770 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
771 : err = 1;
772 0 : oip++;
773 : }
774 : }
/* copy whatever remains of ui/uv */
775 0 : while (uip < uie && !err) {
776 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
777 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
778 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
779 : err = 1;
780 0 : uip++;
781 : }
/* copy whatever remains of oi/ov */
782 0 : while (oip < oie && !err) {
783 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
784 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
785 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
786 : err = 1;
787 0 : oip++;
788 : }
789 0 : if (uv) {
790 0 : bat_iterator_end(&uvi);
791 0 : bat_iterator_end(&ovi);
792 : }
793 0 : bat_iterator_end(&uii);
794 0 : bat_iterator_end(&oii);
/* the inputs are consumed regardless of the outcome */
795 0 : bat_destroy(ui);
796 0 : bat_destroy(uv);
797 0 : bat_destroy(oi);
798 0 : bat_destroy(ov);
799 0 : if (!err) {
800 0 : if (nv)
801 0 : *UV = nv;
802 0 : return ni;
803 : }
804 0 : *UV = NULL;
805 0 : bat_destroy(ni);
806 0 : bat_destroy(nv);
807 0 : return NULL;
808 : }
809 :
810 : static sql_delta *
811 10041181 : older_delta( sql_delta *d, sql_trans *tr)
812 : {
813 10041181 : sql_delta *o = d->next;
814 :
815 10050290 : while (o && !o->cs.merged) {
816 9104 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
817 : break;
818 : else
819 9109 : o = o->next;
820 : }
821 10041186 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
822 0 : return o;
823 : return NULL;
824 : }
825 :
826 : static BAT *
827 10038034 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
828 : {
829 10038034 : assert(tr->active);
830 10038034 : sql_delta *o = NULL;
831 10038034 : BAT *ui = NULL, *uv = NULL;
832 :
833 10038034 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
834 : return NULL;
835 10040920 : if (access == RD_UPD_VAL) {
836 5022068 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
837 0 : bat_destroy(ui);
838 0 : return NULL;
839 : }
840 : }
841 10041121 : while ((o = older_delta(d, tr)) != NULL) {
842 0 : BAT *oui = NULL, *ouv = NULL;
843 0 : if (!oui)
844 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
845 0 : if (access == RD_UPD_VAL)
846 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
847 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
848 0 : bat_destroy(ui);
849 0 : bat_destroy(uv);
850 0 : bat_destroy(oui);
851 0 : bat_destroy(ouv);
852 0 : return NULL;
853 : }
854 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
855 : return NULL;
856 : d = o;
857 : }
858 10041220 : if (uv) {
859 5022276 : bat_destroy(ui);
860 5022276 : return uv;
861 : }
862 : return ui;
863 : }
864 :
865 : static BAT *
866 2842 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
867 : {
868 2842 : lock_column(tr->store, c->base.id);
869 2842 : sql_delta *d = col_timestamp_delta(tr, c);
870 2842 : int type = c->type.type->localtype;
871 :
872 2842 : if (!d) {
873 0 : unlock_column(tr->store, c->base.id);
874 0 : return NULL;
875 : }
876 2842 : if (d->cs.st == ST_DICT) {
877 0 : BAT *b = quick_descriptor(d->cs.bid);
878 :
879 0 : type = b->ttype;
880 : }
881 2842 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
882 2842 : unlock_column(tr->store, c->base.id);
883 2842 : return bn;
884 : }
885 :
886 : static BAT *
887 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
888 : {
889 0 : lock_column(tr->store, i->base.id);
890 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
891 0 : sql_delta *d = idx_timestamp_delta(tr, i);
892 :
893 0 : if (!d) {
894 0 : unlock_column(tr->store, i->base.id);
895 0 : return NULL;
896 : }
897 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
898 0 : unlock_column(tr->store, i->base.id);
899 0 : return bn;
900 : }
901 :
902 : static BAT *
903 10200581 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
904 : {
905 10200581 : BAT *b;
906 :
907 10200581 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
908 10200581 : assert(cs != NULL);
909 10200581 : if (access == QUICK)
910 173355 : return quick_descriptor(cs->bid);
911 10027226 : if (access == RD_EXT)
912 857 : return temp_descriptor(cs->ebid);
913 10026369 : assert(cs->bid);
914 10026369 : b = temp_descriptor(cs->bid);
915 10027451 : if (b == NULL)
916 : return NULL;
917 10027451 : assert(b->batRestricted == BAT_READ);
918 : /* return slice */
919 10027451 : BAT *s = BATslice(b, 0, cnt);
920 10015468 : bat_destroy(b);
921 10015468 : return s;
922 : }
923 :
924 : static int
925 5018215 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
926 : {
927 5018215 : lock_column(tr->store, c->base.id);
928 5017919 : size_t cnt = count_col(tr, c, 0);
929 5019338 : sql_delta *d = col_timestamp_delta(tr, c);
930 5018797 : int type = c->type.type->localtype;
931 :
932 5018797 : if (!d) {
933 0 : unlock_column(tr->store, c->base.id);
934 0 : return LOG_ERR;
935 : }
936 5018797 : if (d->cs.st == ST_DICT) {
937 2 : BAT *b = quick_descriptor(d->cs.bid);
938 :
939 2 : type = b->ttype;
940 : }
941 :
942 5018797 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
943 5019317 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
944 :
945 5019283 : unlock_column(tr->store, c->base.id);
946 :
947 5019021 : if (*ui == NULL || *uv == NULL) {
948 0 : bat_destroy(*ui);
949 0 : bat_destroy(*uv);
950 0 : return LOG_ERR;
951 : }
952 : return LOG_OK;
953 : }
954 :
955 : static int
956 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
957 : {
958 23 : lock_column(tr->store, i->base.id);
959 23 : size_t cnt = count_idx(tr, i, 0);
960 23 : sql_delta *d = idx_timestamp_delta(tr, i);
961 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
962 :
963 23 : if (!d) {
964 0 : unlock_column(tr->store, i->base.id);
965 0 : return LOG_ERR;
966 : }
967 :
968 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
969 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
970 :
971 23 : unlock_column(tr->store, i->base.id);
972 :
973 23 : if (*ui == NULL || *uv == NULL) {
974 0 : bat_destroy(*ui);
975 0 : bat_destroy(*uv);
976 0 : return LOG_ERR;
977 : }
978 : return LOG_OK;
979 : }
980 :
981 : static void * /* BAT * */
982 10188008 : bind_col(sql_trans *tr, sql_column *c, int access)
983 : {
984 10188008 : assert(access == QUICK || tr->active);
985 10188008 : if (!isTable(c->t))
986 : return NULL;
987 10188008 : sql_delta *d = col_timestamp_delta(tr, c);
988 10190053 : if (!d)
989 : return NULL;
990 10190053 : size_t cnt = count_col(tr, c, 0);
991 10193827 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
992 2842 : return bind_ucol(tr, c, access, cnt);
993 10190985 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
994 10185428 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
995 : return b;
996 : }
997 :
998 : static void * /* BAT * */
999 10699 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1000 : {
1001 10699 : assert(access == QUICK || tr->active);
1002 10699 : if (!isTable(i->t))
1003 : return NULL;
1004 10699 : sql_delta *d = idx_timestamp_delta(tr, i);
1005 10701 : if (!d)
1006 : return NULL;
1007 10701 : size_t cnt = count_idx(tr, i, 0);
1008 10706 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1009 0 : return bind_uidx(tr, i, access, cnt);
1010 10706 : return cs_bind_bat( &d->cs, access, cnt);
1011 : }
1012 :
1013 : static int
1014 4152 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1015 : {
1016 4152 : if (!cs->uibid) {
1017 0 : cs->uibid = e_bat(TYPE_oid);
1018 0 : if (cs->uibid == BID_NIL)
1019 : return LOG_ERR;
1020 : }
1021 4152 : if (!cs->uvbid) {
1022 0 : BAT *cur = quick_descriptor(cs->bid);
1023 0 : if (!cur)
1024 : return LOG_ERR;
1025 0 : int type = cur->ttype;
1026 0 : cs->uvbid = e_bat(type);
1027 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1028 : return LOG_ERR;
1029 : }
1030 4152 : BAT *ui = temp_descriptor(cs->uibid);
1031 4152 : BAT *uv = temp_descriptor(cs->uvbid);
1032 :
1033 4152 : if (ui == NULL || uv == NULL) {
1034 0 : bat_destroy(ui);
1035 0 : bat_destroy(uv);
1036 0 : return LOG_ERR;
1037 : }
1038 4152 : assert(ui && uv);
1039 4152 : if (isEbat(ui)){
1040 400 : temp_destroy(cs->uibid);
1041 400 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1042 400 : bat_destroy(ui);
1043 400 : if (cs->uibid == BID_NIL ||
1044 400 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1045 0 : bat_destroy(uv);
1046 0 : return LOG_ERR;
1047 : }
1048 : }
1049 4152 : if (isEbat(uv)){
1050 400 : temp_destroy(cs->uvbid);
1051 400 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1052 400 : bat_destroy(uv);
1053 400 : if (cs->uvbid == BID_NIL ||
1054 400 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1055 0 : bat_destroy(ui);
1056 0 : return LOG_ERR;
1057 : }
1058 : }
1059 4152 : *Ui = ui;
1060 4152 : *Uv = uv;
1061 4152 : return LOG_OK;
1062 : }
1063 :
1064 : static int
1065 6972 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1066 : {
1067 102649 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1068 102649 : if (s->start <= rid && s->end > rid) {
1069 6972 : if (s->ts == tr->tid && !s->deleted) {
1070 2922 : return 1;
1071 : }
1072 : break;
1073 : }
1074 : }
1075 : return 0;
1076 : }
1077 :
1078 : static int
1079 4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1080 : {
1081 96455 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1082 96455 : if (s->start <= rid && s->end > rid) {
1083 4050 : if (s->ts >= tr->ts && s->deleted) {
1084 0 : return 1;
1085 : }
1086 : break;
1087 : }
1088 : }
1089 : return 0;
1090 : }
1091 :
1092 : static sql_delta *
1093 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1094 : {
1095 0 : sql_delta *n = ZNEW(sql_delta);
1096 0 : if (!n)
1097 : return NULL;
1098 0 : *n = *bat;
1099 0 : n->next = NULL;
1100 0 : n->cs.ts = tr->tid;
1101 0 : return n;
1102 : }
1103 :
1104 : static BAT *
1105 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1106 : {
1107 17 : BAT *newoffsets = NULL;
1108 17 : sql_delta *bat = *batp;
1109 17 : column_storage *cs = &bat->cs;
1110 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1111 :
1112 17 : if (!u)
1113 : return NULL;
1114 17 : BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
1115 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1116 0 : bat_destroy(u);
1117 0 : return NULL;
1118 : } else {
1119 17 : int new = 0;
1120 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1121 17 : if (BATcount(u) >= max_cnt) {
1122 1 : if (max_cnt == INT_MAX) { /* decompress */
1123 0 : if (!(b = temp_descriptor(cs->bid))) {
1124 0 : bat_destroy(u);
1125 0 : return NULL;
1126 : }
1127 0 : if (cs->ucnt) {
1128 0 : BAT *ui = NULL, *uv = NULL;
1129 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1130 0 : bat_destroy(b);
1131 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1132 0 : bat_destroy(nb);
1133 0 : bat_destroy(u);
1134 0 : return NULL;
1135 : }
1136 0 : b = nb;
1137 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1138 0 : bat_destroy(ui);
1139 0 : bat_destroy(uv);
1140 0 : bat_destroy(b);
1141 0 : bat_destroy(u);
1142 : }
1143 0 : bat_destroy(ui);
1144 0 : bat_destroy(uv);
1145 : }
1146 0 : n = DICTdecompress_(b, u, PERSISTENT);
1147 0 : bat_destroy(b);
1148 0 : assert(newoffsets == NULL);
1149 0 : if (!n) {
1150 0 : bat_destroy(u);
1151 0 : return NULL;
1152 : }
1153 0 : if (cs->ts != tr->tid) {
1154 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1155 0 : bat_destroy(n);
1156 0 : return NULL;
1157 : }
1158 0 : cs = &(*batp)->cs;
1159 0 : new = 1;
1160 : }
1161 0 : if (cs->bid && !new)
1162 0 : temp_destroy(cs->bid);
1163 0 : n = transfer_to_systrans(n);
1164 0 : if (n == NULL)
1165 : return NULL;
1166 0 : bat_set_access(n, BAT_READ);
1167 0 : cs->bid = temp_create(n);
1168 0 : bat_destroy(n);
1169 0 : if (cs->ebid && !new)
1170 0 : temp_destroy(cs->ebid);
1171 0 : cs->ebid = 0;
1172 0 : cs->ucnt = 0;
1173 0 : if (cs->uibid && !new)
1174 0 : temp_destroy(cs->uibid);
1175 0 : if (cs->uvbid && !new)
1176 0 : temp_destroy(cs->uvbid);
1177 0 : cs->uibid = cs->uvbid = 0;
1178 0 : cs->st = ST_DEFAULT;
1179 0 : cs->cleared = true;
1180 : } else {
1181 1 : if (!(b = temp_descriptor(cs->bid))) {
1182 0 : bat_destroy(newoffsets);
1183 0 : bat_destroy(u);
1184 0 : return NULL;
1185 : }
1186 2 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1187 1 : bat_destroy(b);
1188 1 : if (!n) {
1189 0 : bat_destroy(newoffsets);
1190 0 : bat_destroy(u);
1191 0 : return NULL;
1192 : }
1193 1 : if (cs->ts != tr->tid) {
1194 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1195 0 : bat_destroy(n);
1196 0 : return NULL;
1197 : }
1198 0 : cs = &(*batp)->cs;
1199 0 : new = 1;
1200 0 : temp_dup(cs->ebid);
1201 0 : if (cs->uibid) {
1202 0 : temp_dup(cs->uibid);
1203 0 : temp_dup(cs->uvbid);
1204 : }
1205 : }
1206 1 : if (cs->bid && !new)
1207 1 : temp_destroy(cs->bid);
1208 1 : n = transfer_to_systrans(n);
1209 1 : if (n == NULL)
1210 : return NULL;
1211 1 : bat_set_access(n, BAT_READ);
1212 1 : cs->bid = temp_create(n);
1213 1 : bat_destroy(n);
1214 1 : cs->cleared = true;
1215 1 : i = newoffsets;
1216 : }
1217 : } else { /* append */
1218 16 : i = newoffsets;
1219 : }
1220 : }
1221 17 : bat_destroy(u);
1222 17 : return i;
1223 : }
1224 :
1225 : static BAT *
1226 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1227 : {
1228 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1229 0 : BAT *newoffsets = NULL;
1230 0 : BAT *b = NULL, *n = NULL;
1231 :
1232 0 : if (!(b = temp_descriptor(cs->bid)))
1233 : return NULL;
1234 :
1235 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1236 0 : bat_destroy(b);
1237 0 : return NULL;
1238 : } else {
1239 : /* returns new offset bat if values within min/max, else decompress */
1240 0 : if (!newoffsets) { /* decompress */
1241 0 : if (cs->ucnt) {
1242 0 : BAT *ui = NULL, *uv = NULL;
1243 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1244 0 : bat_destroy(b);
1245 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1246 0 : bat_destroy(nb);
1247 0 : return NULL;
1248 : }
1249 0 : b = nb;
1250 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1251 0 : bat_destroy(ui);
1252 0 : bat_destroy(uv);
1253 0 : bat_destroy(b);
1254 : }
1255 0 : bat_destroy(ui);
1256 0 : bat_destroy(uv);
1257 : }
1258 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1259 0 : bat_destroy(b);
1260 0 : if (!n)
1261 : return NULL;
1262 0 : if (cs->bid)
1263 0 : temp_destroy(cs->bid);
1264 0 : n = transfer_to_systrans(n);
1265 0 : if (n == NULL)
1266 : return NULL;
1267 0 : bat_set_access(n, BAT_READ);
1268 0 : cs->bid = temp_create(n);
1269 0 : cs->ucnt = 0;
1270 0 : if (cs->uibid)
1271 0 : temp_destroy(cs->uibid);
1272 0 : if (cs->uvbid)
1273 0 : temp_destroy(cs->uvbid);
1274 0 : cs->uibid = cs->uvbid = 0;
1275 0 : cs->st = ST_DEFAULT;
1276 0 : cs->cleared = true;
1277 0 : b = n;
1278 : } else { /* append */
1279 : i = newoffsets;
1280 : }
1281 : }
1282 0 : bat_destroy(b);
1283 0 : return i;
1284 : }
1285 :
1286 : /*
1287 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1288 : */
1289 : static int
1290 3158 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1291 : {
1292 3158 : int res = LOG_OK;
1293 3158 : sql_delta *bat = *batp;
1294 3158 : column_storage *cs = &bat->cs;
1295 3158 : BAT *otids = tids, *oupdates = updates;
1296 :
1297 3158 : if (!BATcount(tids))
1298 : return LOG_OK;
1299 :
1300 3158 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1301 6 : tids = BATunmask(tids);
1302 6 : if (!tids)
1303 : return LOG_ERR;
1304 : }
1305 3158 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1306 0 : updates = BATunmask(updates);
1307 0 : if (!updates) {
1308 0 : if (otids != tids)
1309 0 : bat_destroy(tids);
1310 0 : return LOG_ERR;
1311 : }
1312 3158 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1313 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1314 42 : if (!updates) {
1315 0 : if (otids != tids)
1316 0 : bat_destroy(tids);
1317 0 : return LOG_ERR;
1318 : }
1319 : }
1320 :
1321 3158 : if (cs->st == ST_DICT) {
1322 : /* possibly a new array is returned */
1323 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1324 4 : bat = *batp;
1325 4 : cs = &bat->cs;
1326 4 : if (oupdates != updates)
1327 0 : bat_destroy(updates);
1328 4 : updates = nupdates;
1329 4 : if (!updates) {
1330 0 : if (otids != tids)
1331 0 : bat_destroy(tids);
1332 0 : return LOG_ERR;
1333 : }
1334 : }
1335 :
1336 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1337 : /* currently only one update delta is possible */
1338 3158 : lock_table(tr->store, t->base.id);
1339 3158 : storage *s = ATOMIC_PTR_GET(&t->data);
1340 3158 : if (!is_new && !cs->cleared) {
1341 2835 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1342 6 : BAT *sorted, *order;
1343 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1344 0 : if (otids != tids)
1345 0 : bat_destroy(tids);
1346 0 : if (oupdates != updates)
1347 0 : bat_destroy(updates);
1348 0 : unlock_table(tr->store, t->base.id);
1349 0 : return LOG_ERR;
1350 : }
1351 6 : if (otids != tids)
1352 0 : bat_destroy(tids);
1353 6 : tids = sorted;
1354 6 : BAT *nupdates = BATproject(order, updates);
1355 6 : bat_destroy(order);
1356 6 : if (oupdates != updates)
1357 0 : bat_destroy(updates);
1358 6 : updates = nupdates;
1359 6 : if (!updates) {
1360 0 : bat_destroy(tids);
1361 0 : unlock_table(tr->store, t->base.id);
1362 0 : return LOG_ERR;
1363 : }
1364 : }
1365 2835 : assert(tids->tsorted);
1366 2835 : BAT *ui = NULL, *uv = NULL;
1367 :
1368 : /* handle updates on just inserted bits */
1369 : /* handle updates on updates (within one transaction) */
1370 2835 : BATiter upi = bat_iterator(updates);
1371 2835 : BUN cnt = 0, ucnt = BATcount(tids);
1372 2835 : BAT *b, *ins = NULL;
1373 2835 : int *msk = NULL;
1374 :
1375 2835 : if((b = temp_descriptor(cs->bid)) == NULL)
1376 : res = LOG_ERR;
1377 :
1378 2835 : if (res == LOG_OK && BATtdense(tids)) {
1379 2631 : oid start = tids->tseqbase, offset = start;
1380 2631 : oid end = start + ucnt;
1381 :
1382 13272 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1383 11322 : if (seg->start <= start && seg->end > start) {
1384 : /* check for delete conflicts */
1385 2631 : if (seg->ts >= tr->ts && seg->deleted) {
1386 0 : res = LOG_CONFLICT;
1387 0 : continue;
1388 : }
1389 :
1390 : /* check for inplace updates */
1391 2631 : BUN lend = end < seg->end?end:seg->end;
1392 2631 : if (seg->ts == tr->tid && !seg->deleted) {
1393 198 : if (!ins) {
1394 198 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1395 198 : if (!ins)
1396 : res = LOG_ERR;
1397 : else {
1398 198 : BATsetcount(ins, ucnt); /* all full updates */
1399 198 : msk = (int*)Tloc(ins, 0);
1400 198 : BUN end = (ucnt+31)/32;
1401 198 : memset(msk, 0, end * sizeof(int));
1402 : }
1403 : }
1404 681 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1405 483 : const void *upd = BUNtail(upi, rid-offset);
1406 483 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1407 0 : res = LOG_ERR;
1408 :
1409 483 : oid word = i/32;
1410 483 : int pos = i%32;
1411 483 : msk[word] |= 1U<<pos;
1412 483 : cnt++;
1413 : }
1414 : }
1415 : }
1416 11322 : if (end < seg->end)
1417 : break;
1418 : }
1419 207 : } else if (res == LOG_OK && complex_cand(tids)) {
1420 3 : struct canditer ci;
1421 3 : segment *seg = s->segs->h;
1422 3 : canditer_init(&ci, NULL, tids);
1423 3 : BUN i = 0;
1424 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1425 1033 : oid rid = canditer_next(&ci);
1426 1033 : if (seg->end <= rid)
1427 13 : seg = ATOMIC_PTR_GET(&seg->next);
1428 1020 : else if (seg->start <= rid && seg->end > rid) {
1429 : /* check for delete conflicts */
1430 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1431 0 : res = LOG_CONFLICT;
1432 0 : continue;
1433 : }
1434 :
1435 : /* check for inplace updates */
1436 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1437 0 : if (!ins) {
1438 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1439 0 : if (!ins) {
1440 : res = LOG_ERR;
1441 : break;
1442 : } else {
1443 0 : BATsetcount(ins, ucnt); /* all full updates */
1444 0 : msk = (int*)Tloc(ins, 0);
1445 0 : BUN end = (ucnt+31)/32;
1446 0 : memset(msk, 0, end * sizeof(int));
1447 : }
1448 : }
1449 0 : ptr upd = BUNtail(upi, i);
1450 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1451 0 : res = LOG_ERR;
1452 :
1453 0 : oid word = i/32;
1454 0 : int pos = i%32;
1455 0 : msk[word] |= 1U<<pos;
1456 0 : cnt++;
1457 : }
1458 1020 : i++;
1459 : }
1460 : }
1461 201 : } else if (res == LOG_OK) {
1462 201 : BUN i = 0;
1463 201 : oid *rid = Tloc(tids,0);
1464 201 : segment *seg = s->segs->h;
1465 21958 : while ( seg && res == LOG_OK && i < ucnt) {
1466 21757 : if (seg->end <= rid[i])
1467 8325 : seg = ATOMIC_PTR_GET(&seg->next);
1468 13432 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1469 : /* check for delete conflicts */
1470 13432 : if (seg->ts >= tr->ts && seg->deleted) {
1471 0 : res = LOG_CONFLICT;
1472 0 : continue;
1473 : }
1474 :
1475 : /* check for inplace updates */
1476 13432 : if (seg->ts == tr->tid && !seg->deleted) {
1477 995 : if (!ins) {
1478 80 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1479 80 : if (!ins) {
1480 : res = LOG_ERR;
1481 : break;
1482 : } else {
1483 80 : BATsetcount(ins, ucnt); /* all full updates */
1484 80 : msk = (int*)Tloc(ins, 0);
1485 80 : BUN end = (ucnt+31)/32;
1486 80 : memset(msk, 0, end * sizeof(int));
1487 : }
1488 : }
1489 995 : const void *upd = BUNtail(upi, i);
1490 995 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1491 0 : res = LOG_ERR;
1492 :
1493 995 : oid word = i/32;
1494 995 : int pos = i%32;
1495 995 : msk[word] |= 1U<<pos;
1496 995 : cnt++;
1497 : }
1498 13432 : i++;
1499 : }
1500 : }
1501 : }
1502 :
1503 2835 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1504 2593 : if (cs->ucnt == 0) {
1505 2491 : if (cnt) {
1506 12 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1507 12 : if (nins) {
1508 12 : ui = BATproject(nins, tids);
1509 12 : uv = BATproject(nins, updates);
1510 12 : bat_destroy(nins);
1511 : }
1512 : } else {
1513 2479 : ui = temp_descriptor(tids->batCacheid);
1514 2479 : uv = temp_descriptor(updates->batCacheid);
1515 : }
1516 2491 : if (!ui || !uv) {
1517 : res = LOG_ERR;
1518 : } else {
1519 2491 : temp_destroy(cs->uibid);
1520 2491 : temp_destroy(cs->uvbid);
1521 2491 : ui = transfer_to_systrans(ui);
1522 2491 : uv = transfer_to_systrans(uv);
1523 2491 : if (ui == NULL || uv == NULL) {
1524 0 : BBPreclaim(ui);
1525 0 : BBPreclaim(uv);
1526 : res = LOG_ERR;
1527 : } else {
1528 2491 : cs->uibid = temp_create(ui);
1529 2491 : cs->uvbid = temp_create(uv);
1530 2491 : cs->ucnt = BATcount(ui);
1531 : }
1532 : }
1533 : } else {
1534 102 : BAT *nui = NULL, *nuv = NULL;
1535 :
1536 : /* merge taking msk of inserted into account */
1537 102 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1538 : res = LOG_ERR;
1539 :
1540 102 : if (res == LOG_OK) {
1541 102 : const void *upd = NULL;
1542 102 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1543 102 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1544 :
1545 102 : if (!nui || !nuv) {
1546 : res = LOG_ERR;
1547 : } else {
1548 102 : BATiter ovi = bat_iterator(uv);
1549 :
1550 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1551 102 : BUN uip = 0, uie = BATcount(ui);
1552 102 : BUN nip = 0, nie = BATcount(tids);
1553 102 : oid uiseqb = ui->tseqbase;
1554 102 : oid niseqb = tids->tseqbase;
1555 102 : oid *uipt = NULL, *nipt = NULL;
1556 102 : BATiter uii = bat_iterator(ui);
1557 102 : BATiter tidsi = bat_iterator(tids);
1558 102 : if (!BATtdensebi(&uii))
1559 97 : uipt = uii.base;
1560 102 : if (!BATtdensebi(&tidsi))
1561 93 : nipt = tidsi.base;
1562 16091 : while (uip < uie && nip < nie && res == LOG_OK) {
1563 15989 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1564 15989 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1565 :
1566 15989 : if (uiv < niv) {
1567 8068 : upd = BUNtail(ovi, uip);
1568 16136 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1569 8068 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1570 : res = LOG_ERR;
1571 8068 : uip++;
1572 7921 : } else if (uiv == niv) {
1573 : /* handle == */
1574 1522 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1575 1522 : upd = BUNtail(upi, nip);
1576 3044 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1577 1522 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1578 : res = LOG_ERR;
1579 : } else {
1580 0 : upd = BUNtail(ovi, uip);
1581 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1582 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1583 : res = LOG_ERR;
1584 : }
1585 1522 : uip++;
1586 1522 : nip++;
1587 : } else { /* uiv > niv */
1588 6399 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1589 6255 : upd = BUNtail(upi, nip);
1590 12510 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1591 6255 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1592 : res = LOG_ERR;
1593 : }
1594 6399 : nip++;
1595 : }
1596 : }
1597 923 : while (uip < uie && res == LOG_OK) {
1598 821 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1599 821 : upd = BUNtail(ovi, uip);
1600 1642 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1601 821 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1602 : res = LOG_ERR;
1603 821 : uip++;
1604 : }
1605 703 : while (nip < nie && res == LOG_OK) {
1606 601 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1607 601 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1608 329 : upd = BUNtail(upi, nip);
1609 658 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1610 329 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1611 : res = LOG_ERR;
1612 : }
1613 601 : nip++;
1614 : }
1615 102 : bat_iterator_end(&uii);
1616 102 : bat_iterator_end(&tidsi);
1617 102 : bat_iterator_end(&ovi);
1618 102 : if (res == LOG_OK) {
1619 102 : temp_destroy(cs->uibid);
1620 102 : temp_destroy(cs->uvbid);
1621 102 : nui = transfer_to_systrans(nui);
1622 102 : nuv = transfer_to_systrans(nuv);
1623 102 : if (nui == NULL || nuv == NULL) {
1624 : res = LOG_ERR;
1625 : } else {
1626 102 : cs->uibid = temp_create(nui);
1627 102 : cs->uvbid = temp_create(nuv);
1628 102 : cs->ucnt = BATcount(nui);
1629 : }
1630 : }
1631 : }
1632 102 : bat_destroy(nui);
1633 102 : bat_destroy(nuv);
1634 : }
1635 : }
1636 : }
1637 2835 : bat_iterator_end(&upi);
1638 2835 : bat_destroy(b);
1639 2835 : unlock_table(tr->store, t->base.id);
1640 2835 : bat_destroy(ins);
1641 2835 : bat_destroy(ui);
1642 2835 : bat_destroy(uv);
1643 2835 : if (otids != tids)
1644 10 : bat_destroy(tids);
1645 2835 : if (oupdates != updates)
1646 11 : bat_destroy(updates);
1647 2835 : return res;
1648 : } else if (is_new || cs->cleared) {
1649 323 : BAT *b = temp_descriptor(cs->bid);
1650 :
1651 323 : if (b == NULL) {
1652 : res = LOG_ERR;
1653 : } else {
1654 323 : if (BATcount(b)==0) {
1655 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1656 0 : res = LOG_ERR;
1657 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1658 0 : res = LOG_ERR;
1659 323 : BBPcold(b->batCacheid);
1660 323 : bat_destroy(b);
1661 : }
1662 : }
1663 323 : unlock_table(tr->store, t->base.id);
1664 323 : if (otids != tids)
1665 2 : bat_destroy(tids);
1666 323 : if (oupdates != updates)
1667 41 : bat_destroy(updates);
1668 : return res;
1669 : }
1670 :
1671 : static int
1672 3158 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1673 : {
1674 3158 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1675 : }
1676 :
1677 : static void *
1678 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1679 : {
1680 4 : void *newoffsets = NULL;
1681 4 : sql_delta *bat = *batp;
1682 4 : column_storage *cs = &bat->cs;
1683 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1684 :
1685 4 : if (!u)
1686 : return NULL;
1687 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1688 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1689 0 : bat_destroy(u);
1690 0 : return NULL;
1691 : } else {
1692 4 : int new = 0;
1693 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1694 4 : if (BATcount(u) >= max_cnt) {
1695 0 : if (max_cnt == INT_MAX) { /* decompress */
1696 : if (!(b = temp_descriptor(cs->bid))) {
1697 : bat_destroy(u);
1698 : return NULL;
1699 : }
1700 : n = DICTdecompress_(b, u, PERSISTENT);
1701 : /* TODO decompress updates if any */
1702 : bat_destroy(b);
1703 : assert(newoffsets == NULL);
1704 : if (!n) {
1705 : bat_destroy(u);
1706 : return NULL;
1707 : }
1708 : if (cs->ts != tr->tid) {
1709 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1710 : bat_destroy(n);
1711 : bat_destroy(u);
1712 : return NULL;
1713 : }
1714 : cs = &(*batp)->cs;
1715 : new = 1;
1716 : cs->uibid = cs->uvbid = 0;
1717 : }
1718 : if (cs->bid && !new)
1719 : temp_destroy(cs->bid);
1720 : n = transfer_to_systrans(n);
1721 : if (n == NULL) {
1722 : bat_destroy(u);
1723 : return NULL;
1724 : }
1725 : bat_set_access(n, BAT_READ);
1726 : cs->bid = temp_create(n);
1727 : bat_destroy(n);
1728 : if (cs->ebid && !new)
1729 : temp_destroy(cs->ebid);
1730 : cs->ebid = 0;
1731 : cs->st = ST_DEFAULT;
1732 : /* at append_col the column's storage type is cleared */
1733 : cs->cleared = true;
1734 : } else {
1735 0 : if (!(b = temp_descriptor(cs->bid))) {
1736 0 : GDKfree(newoffsets);
1737 0 : bat_destroy(u);
1738 0 : return NULL;
1739 : }
1740 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1741 0 : bat_destroy(b);
1742 0 : if (!n) {
1743 0 : GDKfree(newoffsets);
1744 0 : bat_destroy(u);
1745 0 : return NULL;
1746 : }
1747 0 : if (cs->ts != tr->tid) {
1748 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1749 0 : bat_destroy(u);
1750 0 : bat_destroy(n);
1751 0 : return NULL;
1752 : }
1753 0 : cs = &(*batp)->cs;
1754 0 : new = 1;
1755 0 : temp_dup(cs->ebid);
1756 0 : if (cs->uibid) {
1757 0 : temp_dup(cs->uibid);
1758 0 : temp_dup(cs->uvbid);
1759 : }
1760 : }
1761 0 : if (cs->bid)
1762 0 : temp_destroy(cs->bid);
1763 0 : n = transfer_to_systrans(n);
1764 0 : if (n == NULL) {
1765 0 : bat_destroy(u);
1766 0 : return NULL;
1767 : }
1768 0 : bat_set_access(n, BAT_READ);
1769 0 : cs->bid = temp_create(n);
1770 0 : bat_destroy(n);
1771 0 : cs->cleared = true;
1772 0 : i = newoffsets;
1773 : }
1774 : } else { /* append */
1775 4 : i = newoffsets;
1776 : }
1777 : }
1778 4 : bat_destroy(u);
1779 4 : return i;
1780 : }
1781 :
1782 : static void *
1783 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1784 : {
1785 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1786 1 : void *newoffsets = NULL;
1787 1 : BAT *b = NULL, *n = NULL;
1788 :
1789 1 : if (!(b = temp_descriptor(cs->bid)))
1790 : return NULL;
1791 :
1792 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1793 0 : bat_destroy(b);
1794 0 : return NULL;
1795 : } else {
1796 : /* returns new offset bat if values within min/max, else decompress */
1797 1 : if (!newoffsets) {
1798 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1799 1 : bat_destroy(b);
1800 1 : if (!n)
1801 : return NULL;
1802 : /* TODO decompress updates if any */
1803 1 : if (cs->bid)
1804 1 : temp_destroy(cs->bid);
1805 1 : n = transfer_to_systrans(n);
1806 1 : if (n == NULL)
1807 : return NULL;
1808 1 : bat_set_access(n, BAT_READ);
1809 1 : cs->bid = temp_create(n);
1810 1 : cs->st = ST_DEFAULT;
1811 : /* at append_col the column's storage type is cleared */
1812 1 : cs->cleared = true;
1813 1 : b = n;
1814 : } else { /* append */
1815 : i = newoffsets;
1816 : }
1817 : }
1818 1 : bat_destroy(b);
1819 1 : return i;
1820 : }
1821 :
1822 : static int
1823 6972 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1824 : {
1825 6972 : void *oupd = upd;
1826 6972 : sql_delta *bat = *batp;
1827 6972 : column_storage *cs = &bat->cs;
1828 6972 : storage *s = ATOMIC_PTR_GET(&t->data);
1829 6972 : assert(!is_oid_nil(rid));
1830 6972 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1831 :
1832 6972 : if (cs->st == ST_DICT) {
1833 : /* possibly a new array is returned */
1834 0 : upd = dict_append_val(tr, batp, upd, 1);
1835 0 : bat = *batp;
1836 0 : cs = &bat->cs;
1837 0 : if (!upd)
1838 : return LOG_ERR;
1839 : }
1840 :
1841 : /* check if rid is insert ? */
1842 6972 : if (!inplace) {
1843 : /* check conflict */
1844 4050 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1845 0 : if (oupd != upd)
1846 0 : GDKfree(upd);
1847 0 : return LOG_CONFLICT;
1848 : }
1849 4050 : BAT *ui, *uv;
1850 :
1851 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1852 : /* currently only one update delta is possible */
1853 4050 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1854 0 : if (oupd != upd)
1855 0 : GDKfree(upd);
1856 0 : return LOG_ERR;
1857 : }
1858 :
1859 4050 : assert(uv->ttype);
1860 4050 : assert(BATcount(ui) == BATcount(uv));
1861 8100 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1862 4050 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1863 0 : if (oupd != upd)
1864 0 : GDKfree(upd);
1865 0 : bat_destroy(ui);
1866 0 : bat_destroy(uv);
1867 0 : return LOG_ERR;
1868 : }
1869 4050 : assert(BATcount(ui) == BATcount(uv));
1870 4050 : bat_destroy(ui);
1871 4050 : bat_destroy(uv);
1872 4050 : cs->ucnt++;
1873 : } else {
1874 2922 : BAT *b = NULL;
1875 :
1876 2922 : if((b = temp_descriptor(cs->bid)) == NULL) {
1877 0 : if (oupd != upd)
1878 0 : GDKfree(upd);
1879 0 : return LOG_ERR;
1880 : }
1881 2922 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1882 0 : if (oupd != upd)
1883 0 : GDKfree(upd);
1884 0 : bat_destroy(b);
1885 0 : return LOG_ERR;
1886 : }
1887 2922 : bat_destroy(b);
1888 : }
1889 6972 : if (oupd != upd)
1890 0 : GDKfree(upd);
1891 : return LOG_OK;
1892 : }
1893 :
1894 : static int
1895 6972 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1896 : {
1897 6972 : int res = LOG_OK;
1898 6972 : lock_table(tr->store, t->base.id);
1899 6972 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1900 6972 : unlock_table(tr->store, t->base.id);
1901 6972 : return res;
1902 : }
1903 :
1904 : static int
1905 158860 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1906 : {
1907 158860 : (void)tr;
1908 158860 : if (!ocs)
1909 : return LOG_OK;
1910 158860 : cs->bid = ocs->bid;
1911 158860 : cs->ebid = ocs->ebid;
1912 158860 : cs->uibid = ocs->uibid;
1913 158860 : cs->uvbid = ocs->uvbid;
1914 158860 : cs->ucnt = ocs->ucnt;
1915 :
1916 158860 : if (temp) {
1917 25963 : cs->bid = temp_copy(cs->bid, true, false);
1918 25945 : if (cs->bid == BID_NIL)
1919 : return LOG_ERR;
1920 : } else {
1921 132897 : temp_dup(cs->bid);
1922 : }
1923 158839 : if (cs->ebid)
1924 6 : temp_dup(cs->ebid);
1925 158839 : cs->ucnt = 0;
1926 158839 : cs->uibid = e_bat(TYPE_oid);
1927 158876 : cs->uvbid = e_bat(type);
1928 158875 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1929 : return LOG_ERR;
1930 158875 : cs->st = ocs->st;
1931 158875 : return LOG_OK;
1932 : }
1933 :
1934 : static void
1935 322614 : destroy_delta(sql_delta *b, bool recursive)
1936 : {
1937 322614 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1938 : return;
1939 301208 : if (recursive && b->next)
1940 127511 : destroy_delta(b->next, true);
1941 301208 : if (b->cs.uibid)
1942 100579 : temp_destroy(b->cs.uibid);
1943 301208 : if (b->cs.uvbid)
1944 100579 : temp_destroy(b->cs.uvbid);
1945 301208 : if (b->cs.bid)
1946 301208 : temp_destroy(b->cs.bid);
1947 301208 : if (b->cs.ebid)
1948 61 : temp_destroy(b->cs.ebid);
1949 301208 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1950 301208 : _DELETE(b);
1951 : }
1952 :
1953 : static sql_delta *
1954 16287369 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1955 : {
1956 16287369 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1957 :
1958 16287369 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1959 16154527 : return obat;
1960 132842 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1961 : /* abort */
1962 12 : if (update_conflict)
1963 4 : *update_conflict = true;
1964 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1965 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1966 4 : return NULL;
1967 : }
1968 132830 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1969 : return NULL;
1970 132830 : sql_delta* bat = ZNEW(sql_delta);
1971 132879 : if (!bat)
1972 : return NULL;
1973 132879 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1974 132879 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1975 0 : destroy_delta(bat, false);
1976 0 : return NULL;
1977 : }
1978 132882 : bat->cs.ts = tr->tid;
1979 : /* only one writer else abort */
1980 132882 : bat->next = obat;
1981 132882 : if (obat)
1982 132882 : bat->nr_updates = obat->nr_updates;
1983 132882 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
1984 0 : bat->next = NULL;
1985 0 : destroy_delta(bat, false);
1986 0 : if (update_conflict)
1987 0 : *update_conflict = true;
1988 0 : return NULL;
1989 : }
1990 : return bat;
1991 : }
1992 :
1993 : static int
1994 10130 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1995 : {
1996 10130 : int ok = LOG_OK;
1997 :
1998 10130 : if (is_bat) {
1999 3158 : BAT *tids = incoming_tids;
2000 3158 : BAT *values = incoming_values;
2001 3158 : if (BATcount(tids) == 0)
2002 : return LOG_OK;
2003 3158 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2004 : } else {
2005 6972 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2006 : }
2007 : return ok;
2008 : }
2009 :
2010 : static int
2011 10169 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2012 : {
2013 10169 : int res = LOG_OK;
2014 10169 : bool update_conflict = false;
2015 10169 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2016 :
2017 10169 : if (isbat) {
2018 3194 : BAT *t = tids;
2019 3194 : if (!BATcount(t))
2020 : return LOG_OK;
2021 : }
2022 :
2023 9855 : if (c == NULL)
2024 : return LOG_ERR;
2025 :
2026 9855 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2027 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2028 :
2029 9851 : assert(delta && delta->cs.ts == tr->tid);
2030 9851 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2031 9851 : if (odelta != delta)
2032 3482 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2033 :
2034 9851 : odelta = delta;
2035 9851 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2036 : return res;
2037 9851 : assert(delta == odelta);
2038 9851 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2039 0 : res = sql_trans_alter_storage(tr, c, NULL);
2040 : return res;
2041 : }
2042 :
2043 : static sql_delta *
2044 2462 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2045 : {
2046 2462 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2047 :
2048 2462 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2049 2434 : return obat;
2050 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2051 : /* abort */
2052 0 : if (update_conflict)
2053 0 : *update_conflict = true;
2054 0 : return NULL;
2055 : }
2056 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2057 : return NULL;
2058 28 : sql_delta* bat = ZNEW(sql_delta);
2059 28 : if (!bat)
2060 : return NULL;
2061 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2062 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2063 0 : destroy_delta(bat, false);
2064 0 : return NULL;
2065 : }
2066 28 : bat->cs.ts = tr->tid;
2067 : /* only one writer else abort */
2068 28 : bat->next = obat;
2069 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2070 0 : bat->next = NULL;
2071 0 : destroy_delta(bat, false);
2072 0 : if (update_conflict)
2073 0 : *update_conflict = true;
2074 0 : return NULL;
2075 : }
2076 : return bat;
2077 : }
2078 :
2079 : static int
2080 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2081 : {
2082 782 : int res = LOG_OK;
2083 782 : bool update_conflict = false;
2084 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2085 :
2086 782 : if (isbat) {
2087 782 : BAT *t = tids;
2088 782 : if (!BATcount(t))
2089 : return LOG_OK;
2090 : }
2091 :
2092 279 : if (i == NULL)
2093 : return LOG_ERR;
2094 :
2095 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2096 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2097 :
2098 279 : assert(delta && delta->cs.ts == tr->tid);
2099 279 : if (odelta != delta)
2100 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2101 :
2102 279 : odelta = delta;
2103 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2104 279 : assert(delta == odelta);
2105 : return res;
2106 : }
2107 :
2108 : static int
2109 149217 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
2110 : {
2111 149217 : BAT *b, *oi = i;
2112 149217 : int err = 0;
2113 149217 : sql_delta *bat = *batp;
2114 :
2115 149217 : assert(!offsets || BATcount(offsets) == BATcount(i));
2116 149217 : if (!BATcount(i))
2117 : return LOG_OK;
2118 149217 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2119 : return LOG_ERR;
2120 :
2121 149217 : lock_column(tr->store, id);
2122 150076 : if (bat->cs.st == ST_DICT) {
2123 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2124 13 : bat = *batp;
2125 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2126 0 : bat_destroy(oi);
2127 13 : oi = ni;
2128 13 : if (!oi) {
2129 0 : unlock_column(tr->store, id);
2130 0 : return LOG_ERR;
2131 : }
2132 : }
2133 150076 : if (bat->cs.st == ST_FOR) {
2134 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2135 0 : bat = *batp;
2136 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2137 0 : bat_destroy(oi);
2138 0 : oi = ni;
2139 0 : if (!oi) {
2140 0 : unlock_column(tr->store, id);
2141 0 : return LOG_ERR;
2142 : }
2143 : }
2144 :
2145 150076 : b = temp_descriptor(bat->cs.bid);
2146 149772 : if (b == NULL) {
2147 0 : unlock_column(tr->store, id);
2148 0 : if (oi != i)
2149 0 : bat_destroy(oi);
2150 0 : return LOG_ERR;
2151 : }
2152 149772 : if (!offsets && offset == b->hseqbase+BATcount(b)) {
2153 149584 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2154 0 : err = 1;
2155 176 : } else if (!offsets) {
2156 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2157 0 : err = 1;
2158 12 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2159 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2160 0 : err = 1;
2161 12 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2162 0 : err = 1;
2163 : }
2164 149554 : bat_destroy(b);
2165 150382 : unlock_column(tr->store, id);
2166 :
2167 150393 : if (oi != i)
2168 13 : bat_destroy(oi);
2169 150393 : return (err)?LOG_ERR:LOG_OK;
2170 : }
2171 :
2172 : // Look at the offsets and find where the replacements end and the appends begin.
2173 : static BUN
2174 0 : start_of_appends(BAT *offsets, BUN bcnt)
2175 : {
2176 0 : BUN ocnt = BATcount(offsets);
2177 0 : if (ocnt == 0)
2178 : return 0;
2179 :
2180 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2181 0 : if (highest < bcnt)
2182 : // all are replacements
2183 : return ocnt;
2184 :
2185 : // reason backward to find the first append.
2186 : // Suppose offsets has 15 entries, bcnt == 100
2187 : // and the highest offset in offsets is 109.
2188 0 : BUN new_bcnt = highest + 1; // 110
2189 0 : BUN nappends = new_bcnt - bcnt; // 10
2190 0 : BUN nreplacements = ocnt - nappends; // 5
2191 :
2192 : // The first append should be to position bcnt
2193 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2194 :
2195 : return nreplacements;
2196 : }
2197 :
2198 :
2199 : static int
2200 15997552 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
2201 : {
2202 15997552 : void *oi = i;
2203 15997552 : BAT *b;
2204 15997552 : lock_column(tr->store, id);
2205 16013865 : sql_delta *bat = *batp;
2206 :
2207 16013865 : if (bat->cs.st == ST_DICT) {
2208 : /* possibly a new array is returned */
2209 4 : i = dict_append_val(tr, batp, i, cnt);
2210 4 : bat = *batp;
2211 4 : if (!i) {
2212 0 : unlock_column(tr->store, id);
2213 0 : return LOG_ERR;
2214 : }
2215 : }
2216 16013865 : if (bat->cs.st == ST_FOR) {
2217 : /* possibly a new array is returned */
2218 1 : i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
2219 1 : bat = *batp;
2220 1 : if (!i) {
2221 0 : unlock_column(tr->store, id);
2222 0 : return LOG_ERR;
2223 : }
2224 : }
2225 :
2226 16013865 : b = temp_descriptor(bat->cs.bid);
2227 16009651 : if (b == NULL) {
2228 0 : if (i != oi)
2229 0 : GDKfree(i);
2230 0 : unlock_column(tr->store, id);
2231 0 : return LOG_ERR;
2232 : }
2233 16009651 : BUN bcnt = BATcount(b);
2234 :
2235 16009651 : if (offsets) {
2236 : // The first few might be replacements while later items might be appends.
2237 : // Handle the replacements here while leaving the appends to the code below.
2238 0 : BUN nreplacements = start_of_appends(offsets, bcnt);
2239 :
2240 0 : oid *start = Tloc(offsets, 0);
2241 0 : if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
2242 0 : bat_destroy(b);
2243 0 : if (i != oi)
2244 0 : GDKfree(i);
2245 0 : unlock_column(tr->store, id);
2246 0 : return LOG_ERR;
2247 : }
2248 :
2249 : // Replacements have been handled. The rest are appends.
2250 0 : assert(offset == oid_nil);
2251 0 : offset = bcnt;
2252 0 : cnt -= nreplacements;
2253 : }
2254 :
2255 16009651 : if (bcnt > offset){
2256 550208 : size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
2257 550208 : if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
2258 0 : bat_destroy(b);
2259 0 : if (i != oi)
2260 0 : GDKfree(i);
2261 0 : unlock_column(tr->store, id);
2262 0 : return LOG_ERR;
2263 : }
2264 554036 : cnt -= ccnt;
2265 554036 : offset += ccnt;
2266 : }
2267 16013479 : if (cnt) {
2268 15459409 : if (BATcount(b) < offset) { /* add space */
2269 6621 : BUN d = offset - BATcount(b);
2270 6621 : if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
2271 0 : bat_destroy(b);
2272 0 : if (i != oi)
2273 0 : GDKfree(i);
2274 0 : unlock_column(tr->store, id);
2275 0 : return LOG_ERR;
2276 : }
2277 : }
2278 15459407 : if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
2279 0 : bat_destroy(b);
2280 0 : if (i != oi)
2281 0 : GDKfree(i);
2282 0 : unlock_column(tr->store, id);
2283 0 : return LOG_ERR;
2284 : }
2285 : }
2286 16017003 : bat_destroy(b);
2287 16010773 : if (i != oi)
2288 4 : GDKfree(i);
2289 16010773 : unlock_column(tr->store, id);
2290 16010773 : return LOG_OK;
2291 : }
2292 :
2293 : static int
2294 25965 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2295 : {
2296 25965 : if (!(bat->segs = new_segments(tr, 0)))
2297 : return LOG_ERR;
2298 25963 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2299 : }
2300 :
2301 : static int
2302 16146756 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2303 : {
2304 16146756 : int ok = LOG_OK;
2305 :
2306 16146756 : if ((*delta)->cs.merged)
2307 37049 : (*delta)->cs.merged = false; /* TODO needs to move */
2308 16146756 : if (isbat) {
2309 149404 : BAT *bat = incoming_data;
2310 :
2311 149404 : if (BATcount(bat))
2312 149545 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2313 : } else {
2314 15997352 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2315 : }
2316 16170250 : return ok;
2317 : }
2318 :
2319 : static int
2320 16153788 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2321 : {
2322 16153788 : int res = LOG_OK;
2323 16153788 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2324 :
2325 16153788 : if (isbat) {
2326 149363 : BAT *t = data;
2327 149363 : if (!BATcount(t))
2328 : return LOG_OK;
2329 : }
2330 :
2331 16153013 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2332 : return LOG_ERR;
2333 :
2334 16148185 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2335 :
2336 16148185 : odelta = delta;
2337 16148185 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
2338 : return res;
2339 16166859 : if (odelta != delta) {
2340 0 : delta->next = odelta;
2341 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2342 0 : delta->next = NULL;
2343 0 : destroy_delta(delta, false);
2344 0 : return LOG_CONFLICT;
2345 : }
2346 : }
2347 16166859 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2348 1 : res = sql_trans_alter_storage(tr, c, NULL);
2349 : return res;
2350 : }
2351 :
2352 : static int
2353 2189 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2354 : {
2355 2189 : int res = LOG_OK;
2356 2189 : sql_delta *delta;
2357 :
2358 2189 : if (isbat) {
2359 1015 : BAT *t = data;
2360 1015 : if (!BATcount(t))
2361 : return LOG_OK;
2362 : }
2363 :
2364 2177 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2365 : return LOG_ERR;
2366 :
2367 2177 : assert(delta->cs.st == ST_DEFAULT);
2368 :
2369 2177 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2370 2177 : return res;
2371 : }
2372 :
2373 : static int
2374 79515 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2375 : {
2376 79515 : int err = 0;
2377 :
2378 : /* TODO check for conflicting updates */
2379 79515 : (void)rid;
2380 79515 : (void)cnt;
2381 584710 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2382 505195 : sql_column *c = n->data;
2383 505195 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2384 :
2385 : /* check for active updates */
2386 505195 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2387 : return 1;
2388 : }
2389 : return 0;
2390 : }
2391 :
2392 : static int
2393 75607 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2394 : {
2395 75607 : lock_table(tr->store, t->base.id);
2396 :
2397 75607 : int in_transaction = segments_in_transaction(tr, t);
2398 :
2399 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2400 75607 : segment *seg = s->segs->h, *p = NULL;
2401 52567024 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2402 52567024 : if (seg->start <= rid && seg->end > rid) {
2403 75607 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2404 4 : unlock_table(tr->store, t->base.id);
2405 4 : return LOG_CONFLICT;
2406 : }
2407 75603 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2408 0 : unlock_table(tr->store, t->base.id);
2409 0 : return LOG_CONFLICT;
2410 : }
2411 75603 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2412 0 : unlock_table(tr->store, t->base.id);
2413 0 : return LOG_ERR;
2414 : }
2415 : break;
2416 : }
2417 : }
2418 75603 : unlock_table(tr->store, t->base.id);
2419 75603 : if (!in_transaction)
2420 13169 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2421 : return LOG_OK;
2422 : }
2423 :
2424 : static int
2425 3910 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2426 : {
2427 3910 : segment *seg = *Seg, *p = NULL;
2428 13341 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2429 13339 : if (seg->start <= start && seg->end > start) {
2430 3960 : size_t lcnt = cnt;
2431 3960 : if (start+lcnt > seg->end)
2432 54 : lcnt = seg->end-start;
2433 3960 : if (SEG_IS_DELETED(seg, tr)) {
2434 47 : start += lcnt;
2435 47 : cnt -= lcnt;
2436 47 : continue;
2437 3913 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2438 1 : return LOG_CONFLICT;
2439 3912 : if (deletes_conflict_updates( tr, t, start, lcnt))
2440 : return LOG_CONFLICT;
2441 3912 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2442 3912 : if (!seg)
2443 : return LOG_ERR;
2444 3912 : start += lcnt;
2445 3912 : cnt -= lcnt;
2446 : }
2447 13291 : if (start+cnt <= seg->end)
2448 : break;
2449 : }
2450 : return LOG_OK;
2451 : }
2452 :
2453 : static int
2454 735 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2455 : {
2456 735 : segment *seg = s->segs->h;
2457 735 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2458 : }
2459 :
2460 : static int
2461 319 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2462 : {
2463 319 : int in_transaction = segments_in_transaction(tr, t);
2464 319 : BAT *oi = i; /* update ids */
2465 319 : int ok = LOG_OK;
2466 :
2467 319 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2468 : return LOG_ERR;
2469 319 : if (BATcount(i)) {
2470 568 : if (BATtdense(i)) {
2471 249 : size_t start = i->tseqbase;
2472 249 : size_t cnt = BATcount(i);
2473 :
2474 249 : lock_table(tr->store, t->base.id);
2475 249 : ok = delete_range(tr, t, s, start, cnt);
2476 249 : unlock_table(tr->store, t->base.id);
2477 70 : } else if (complex_cand(i)) {
2478 0 : struct canditer ci;
2479 0 : oid f = 0, l = 0, cur = 0;
2480 :
2481 0 : canditer_init(&ci, NULL, i);
2482 0 : cur = f = canditer_next(&ci);
2483 :
2484 0 : lock_table(tr->store, t->base.id);
2485 0 : if (!is_oid_nil(f)) {
2486 0 : segment *seg = s->segs->h;
2487 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2488 0 : if (cur+1 == l) {
2489 0 : cur++;
2490 0 : continue;
2491 : }
2492 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2493 0 : f = cur = l;
2494 : }
2495 0 : if (ok == LOG_OK)
2496 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2497 : }
2498 0 : unlock_table(tr->store, t->base.id);
2499 : } else {
2500 70 : if (!i->tsorted) {
2501 0 : assert(oi == i);
2502 0 : BAT *ni = NULL;
2503 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2504 0 : ok = LOG_ERR;
2505 0 : if (ni)
2506 0 : i = ni;
2507 : }
2508 70 : assert(i->tsorted);
2509 70 : BUN icnt = BATcount(i);
2510 70 : BATiter ii = bat_iterator(i);
2511 70 : oid *o = ii.base, n = o[0]+1;
2512 70 : size_t lcnt = 1;
2513 :
2514 70 : lock_table(tr->store, t->base.id);
2515 70 : segment *seg = s->segs->h;
2516 23233 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2517 23163 : if (o[i] == n) {
2518 22803 : lcnt++;
2519 22803 : n++;
2520 : } else {
2521 360 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2522 360 : lcnt = 0;
2523 : }
2524 23163 : if (!lcnt) {
2525 360 : n = o[i]+1;
2526 360 : lcnt = 1;
2527 : }
2528 : }
2529 70 : bat_iterator_end(&ii);
2530 70 : if (lcnt && ok == LOG_OK)
2531 70 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2532 70 : unlock_table(tr->store, t->base.id);
2533 : }
2534 : }
2535 319 : if (i != oi)
2536 25 : bat_destroy(i);
2537 : // assert
2538 319 : if (!in_transaction)
2539 271 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2540 : return ok;
2541 : }
2542 :
2543 : static void
2544 52745 : destroy_segments(segments *s)
2545 : {
2546 52745 : if (!s || sql_ref_dec(&s->r) > 0)
2547 0 : return;
2548 52745 : segment *seg = s->h;
2549 116653 : while(seg) {
2550 63908 : segment *n = ATOMIC_PTR_GET(&seg->next);
2551 63908 : _DELETE(seg);
2552 63908 : seg = n;
2553 : }
2554 52745 : _DELETE(s);
2555 : }
2556 :
2557 : static void
2558 54210 : destroy_storage(storage *bat)
2559 : {
2560 54210 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2561 : return;
2562 52603 : if (bat->next)
2563 6325 : destroy_storage(bat->next);
2564 52603 : destroy_segments(bat->segs);
2565 52603 : if (bat->cs.uibid)
2566 31221 : temp_destroy(bat->cs.uibid);
2567 52603 : if (bat->cs.uvbid)
2568 31221 : temp_destroy(bat->cs.uvbid);
2569 52603 : if (bat->cs.bid)
2570 52603 : temp_destroy(bat->cs.bid);
2571 52603 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2572 52603 : _DELETE(bat);
2573 : }
2574 :
2575 : static int
2576 174882 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2577 : {
2578 174882 : if (uncommitted) {
2579 455136 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2580 301059 : if (!VALID_4_READ(s->ts,tr))
2581 : return 1;
2582 : } else {
2583 298791 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2584 279365 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2585 : return 1;
2586 : }
2587 :
2588 : return 0;
2589 : }
2590 :
2591 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2592 :
2593 : storage *
2594 2233552 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2595 : {
2596 2233552 : storage *obat;
2597 :
2598 2233552 : obat = ATOMIC_PTR_GET(&t->data);
2599 :
2600 2233552 : if (obat->cs.ts != tr->tid)
2601 1546660 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2602 1546605 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2603 : /* abort */
2604 15444 : if (clear)
2605 15444 : *clear = true;
2606 15444 : return NULL;
2607 : }
2608 :
2609 2218108 : if (!clear)
2610 : return obat;
2611 :
2612 : /* remainder is only to handle clear */
2613 26334 : if (segments_conflict(tr, obat->segs, 1)) {
2614 371 : *clear = true;
2615 371 : return NULL;
2616 : }
2617 25963 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2618 : return NULL;
2619 25963 : storage *bat = ZNEW(storage);
2620 25966 : if (!bat)
2621 : return NULL;
2622 25966 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2623 25966 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2624 0 : destroy_storage(bat);
2625 0 : return NULL;
2626 : }
2627 25966 : bat->cs.cleared = true;
2628 25966 : bat->cs.ts = tr->tid;
2629 : /* only one writer else abort */
2630 25966 : bat->next = obat;
2631 25966 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2632 10 : bat->next = NULL;
2633 10 : destroy_storage(bat);
2634 10 : if (clear)
2635 10 : *clear = true;
2636 10 : return NULL;
2637 : }
2638 : return bat;
2639 : }
2640 :
2641 : static int
2642 75974 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2643 : {
2644 75974 : int ok = LOG_OK;
2645 75974 : BAT *b = ib;
2646 75974 : storage *bat;
2647 :
2648 75974 : if (isbat && !BATcount(b))
2649 : return ok;
2650 :
2651 75926 : if (t == NULL)
2652 : return LOG_ERR;
2653 :
2654 75926 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2655 : return LOG_ERR;
2656 :
2657 75926 : if (isbat)
2658 319 : ok = storage_delete_bat(tr, t, bat, ib);
2659 : else
2660 75607 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2661 : return ok;
2662 : }
2663 :
2664 : static size_t
2665 0 : dcount_col(sql_trans *tr, sql_column *c)
2666 : {
2667 0 : sql_delta *b;
2668 :
2669 0 : if (!isTable(c->t))
2670 : return 0;
2671 0 : b = col_timestamp_delta(tr, c);
2672 0 : if (!b)
2673 : return 1;
2674 :
2675 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2676 0 : if (!s || !s->segs->t)
2677 : return 1;
2678 0 : size_t cnt = s->segs->t->end;
2679 0 : if (cnt) {
2680 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2681 0 : size_t dcnt = 0;
2682 :
2683 0 : if (v)
2684 0 : dcnt = BATguess_uniques(v, NULL);
2685 0 : return dcnt;
2686 : }
2687 : return cnt;
2688 : }
2689 :
2690 : static BAT *
2691 3942853 : bind_no_view(BAT *b, bool quick)
2692 : {
2693 3942853 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2694 3938715 : BAT *nb = BBP_desc(VIEWtparent(b));
2695 3938715 : bat_destroy(b);
2696 3938812 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2697 : return NULL;
2698 : }
2699 : return b;
2700 : }
2701 :
2702 : static int
2703 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2704 : {
2705 0 : int ok = 0;
2706 0 : assert(tr->active);
2707 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2708 0 : return 0;
2709 0 : lock_column(tr->store, c->base.id);
2710 0 : if (unique_est) {
2711 0 : sql_delta *d;
2712 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2713 0 : BAT *b;
2714 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2715 0 : MT_lock_set(&b->theaplock);
2716 0 : b->tunique_est = *unique_est;
2717 0 : MT_lock_unset(&b->theaplock);
2718 0 : bat_destroy(b);
2719 : }
2720 : }
2721 : }
2722 0 : if (min) {
2723 0 : _DELETE(c->min);
2724 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2725 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2726 0 : memcpy(c->min, min, minlen);
2727 0 : ok = 1;
2728 : }
2729 : }
2730 0 : if (max) {
2731 0 : _DELETE(c->max);
2732 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2733 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2734 0 : memcpy(c->max, max, maxlen);
2735 0 : ok = 1;
2736 : }
2737 : }
2738 0 : unlock_column(tr->store, c->base.id);
2739 0 : return ok;
2740 : }
2741 :
2742 : static int
2743 19 : min_max_col(sql_trans *tr, sql_column *c)
2744 : {
2745 19 : int ok = 0;
2746 19 : BAT *b = NULL;
2747 19 : sql_delta *d = NULL;
2748 :
2749 19 : assert(tr->active);
2750 19 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2751 0 : return 0;
2752 19 : if (c->min && c->max)
2753 : return 1;
2754 19 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2755 19 : if (d->cs.st == ST_FOR)
2756 : return 0;
2757 19 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2758 19 : lock_column(tr->store, c->base.id);
2759 19 : if (c->min && c->max) {
2760 0 : unlock_column(tr->store, c->base.id);
2761 0 : return 1;
2762 : }
2763 19 : _DELETE(c->min);
2764 19 : _DELETE(c->max);
2765 19 : if ((b = bind_col(tr, c, access))) {
2766 19 : if (!(b = bind_no_view(b, false))) {
2767 0 : unlock_column(tr->store, c->base.id);
2768 0 : return 0;
2769 : }
2770 19 : BATiter bi = bat_iterator(b);
2771 19 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2772 16 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2773 16 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2774 :
2775 16 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2776 0 : _DELETE(c->min);
2777 0 : _DELETE(c->max);
2778 : } else {
2779 16 : memcpy(c->min, nmin, minlen);
2780 16 : memcpy(c->max, nmax, maxlen);
2781 16 : ok = 1;
2782 : }
2783 : }
2784 19 : bat_iterator_end(&bi);
2785 19 : bat_destroy(b);
2786 : }
2787 19 : unlock_column(tr->store, c->base.id);
2788 : }
2789 : return ok;
2790 : }
2791 :
2792 : static size_t
2793 17 : count_segs(segment *s)
2794 : {
2795 17 : size_t nr = 0;
2796 :
2797 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2798 55 : nr++;
2799 17 : return nr;
2800 : }
2801 :
2802 : static size_t
2803 34 : count_del(sql_trans *tr, sql_table *t, int access)
2804 : {
2805 34 : storage *d;
2806 :
2807 34 : if (!isTable(t))
2808 : return 0;
2809 34 : d = tab_timestamp_storage(tr, t);
2810 34 : if (!d)
2811 : return 0;
2812 34 : if (access == 2)
2813 0 : return d->cs.ucnt;
2814 34 : if (access == 1)
2815 0 : return count_inserts(d->segs->h, tr);
2816 34 : if (access == 10) /* special case for counting the number of segments */
2817 17 : return count_segs(d->segs->h);
2818 17 : return count_deletes(d->segs->h, tr);
2819 : }
2820 :
2821 : static int
2822 18316 : sorted_col(sql_trans *tr, sql_column *col)
2823 : {
2824 18316 : int sorted = 0;
2825 :
2826 18316 : assert(tr->active);
2827 18316 : if (!isTable(col->t) || !col->t->s)
2828 : return 0;
2829 :
2830 18316 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2831 18298 : BAT *b = bind_col(tr, col, QUICK);
2832 :
2833 18298 : if (b)
2834 18298 : sorted = b->tsorted || b->trevsorted;
2835 : }
2836 : return sorted;
2837 : }
2838 :
2839 : static int
2840 6909 : unique_col(sql_trans *tr, sql_column *col)
2841 : {
2842 6909 : int distinct = 0;
2843 :
2844 6909 : assert(tr->active);
2845 6909 : if (!isTable(col->t) || !col->t->s)
2846 : return 0;
2847 :
2848 6909 : if (col && ATOMIC_PTR_GET(&col->data)) {
2849 6909 : BAT *b = bind_col(tr, col, QUICK);
2850 :
2851 6909 : if (b)
2852 6909 : distinct = b->tkey;
2853 : }
2854 : return distinct;
2855 : }
2856 :
2857 : static int
2858 1868 : double_elim_col(sql_trans *tr, sql_column *col)
2859 : {
2860 1868 : int de = 0;
2861 1868 : sql_delta *d;
2862 :
2863 1868 : assert(tr->active);
2864 1868 : if (!isTable(col->t) || !col->t->s)
2865 : return 0;
2866 :
2867 1868 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2868 6 : if (d->cs.st == ST_DICT) {
2869 6 : BAT *b = bind_col(tr, col, QUICK);
2870 6 : if (b && b->ttype == TYPE_bte)
2871 : de = 1;
2872 0 : else if (b && b->ttype == TYPE_sht)
2873 1868 : de = 2;
2874 : }
2875 1862 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2876 1862 : BAT *b = bind_col(tr, col, QUICK);
2877 :
2878 1862 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2879 1862 : de = GDK_ELIMDOUBLES(b->tvheap);
2880 1862 : if (de)
2881 1750 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2882 : }
2883 1750 : assert(de >= 0 && de <= 16);
2884 : }
2885 : return de;
2886 : }
2887 :
2888 : static int
2889 3977758 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
2890 : {
2891 3977758 : int ok = 0;
2892 3977758 : BAT *b = NULL, *off = NULL, *upv = NULL;
2893 3977758 : sql_delta *d = NULL;
2894 :
2895 3977758 : (void) tr;
2896 3977758 : assert(tr->active);
2897 3977758 : *nonil = false;
2898 3977758 : *unique = false;
2899 3977758 : *unique_est = 0.0;
2900 3977758 : if (!c || !isTable(c->t) || !c->t->s)
2901 : return ok;
2902 :
2903 3977520 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2904 3938944 : if (d->cs.st == ST_FOR) {
2905 27 : *nonil = true; /* TODO for min/max. I will do it later */
2906 27 : return ok;
2907 : }
2908 3938917 : int eclass = c->type.type->eclass;
2909 3938917 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2910 3938917 : if ((b = bind_col(tr, c, access))) {
2911 3939656 : if (!(b = bind_no_view(b, false)))
2912 0 : return ok;
2913 3940052 : BATiter bi = bat_iterator(b);
2914 3939291 : *nonil = bi.nonil && !bi.nil;
2915 :
2916 3939291 : if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
2917 3629505 : d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
2918 2342323 : if (c->min && VALinit(min, bi.type, c->min))
2919 : ok |= 1;
2920 2342269 : else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
2921 2332391 : ok |= 1;
2922 2342239 : if (c->max && VALinit(max, bi.type, c->max))
2923 54 : ok |= 2;
2924 2342196 : else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
2925 2326096 : ok |= 2;
2926 : }
2927 3939163 : if (d->cs.ucnt == 0) {
2928 3935046 : if (d->cs.st == ST_DEFAULT) {
2929 3934346 : *unique = bi.key;
2930 3934346 : *unique_est = bi.unique_est;
2931 3934346 : if (*unique_est == 0)
2932 1263865 : *unique_est = (double)BATguess_uniques(b,NULL);
2933 700 : } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
2934 : /* for dict, check the offsets bat for uniqueness */
2935 700 : MT_lock_set(&off->theaplock);
2936 700 : *unique = off->tkey;
2937 700 : *unique_est = off->tunique_est;
2938 700 : MT_lock_unset(&off->theaplock);
2939 : }
2940 : }
2941 3939551 : bat_iterator_end(&bi);
2942 3939815 : bat_destroy(b);
2943 3939894 : if (*nonil && d->cs.ucnt > 0) {
2944 : /* This could use a quick descriptor */
2945 2842 : if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
2946 0 : *nonil = false;
2947 : } else {
2948 2842 : MT_lock_set(&upv->theaplock);
2949 2842 : *nonil &= upv->tnonil && !upv->tnil;
2950 2842 : MT_lock_unset(&upv->theaplock);
2951 2842 : bat_destroy(upv);
2952 : }
2953 : }
2954 : }
2955 : }
2956 : return ok;
2957 : }
2958 :
2959 : static int
2960 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2961 : {
2962 257 : assert(tr->active);
2963 257 : if (!isTable(col->t) || !col->t->s)
2964 : return LOG_OK;
2965 :
2966 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2967 252 : BAT *b = bind_col(tr, col, QUICK);
2968 :
2969 252 : if (b) { /* add props for ranges [min, max> */
2970 252 : MT_lock_set(&b->theaplock);
2971 252 : if (add_range) {
2972 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2973 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2974 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2975 : else
2976 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2977 179 : if (!pt->with_nills || !col->null)
2978 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2979 : } else {
2980 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2981 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2982 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2983 : }
2984 252 : MT_lock_unset(&b->theaplock);
2985 : }
2986 : }
2987 : return LOG_OK;
2988 : }
2989 :
2990 : static int
2991 4809 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
2992 : {
2993 4809 : assert(tr->active);
2994 4809 : if (!isTable(col->t) || !col->t->s)
2995 : return LOG_OK;
2996 :
2997 4779 : if (col && ATOMIC_PTR_GET(&col->data)) {
2998 4779 : BAT *b = bind_col(tr, col, QUICK);
2999 :
3000 4779 : if (b) { /* add props for ranges [min, max> */
3001 4779 : if (not_null) {
3002 4777 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3003 : } else {
3004 2 : BATrmprop(b, GDK_NOT_NULL);
3005 : }
3006 : }
3007 : }
3008 : return LOG_OK;
3009 : }
3010 :
3011 : static int
3012 33030 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3013 : {
3014 33030 : sqlstore *store = tr->store;
3015 33030 : int bid = log_find_bat(store->logger, id);
3016 33030 : if (bid <= 0)
3017 : return LOG_ERR;
3018 33030 : cs->bid = temp_dup(bid);
3019 33030 : cs->ucnt = 0;
3020 33030 : cs->uibid = e_bat(TYPE_oid);
3021 33030 : cs->uvbid = e_bat(type);
3022 33030 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3023 : return LOG_ERR;
3024 : return LOG_OK;
3025 : }
3026 :
3027 : static int
3028 69457 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3029 : {
3030 69457 : int res = LOG_OK;
3031 69457 : gdk_return ok;
3032 69457 : BAT *b = temp_descriptor(bat->cs.bid);
3033 :
3034 69457 : if (b == NULL)
3035 : return LOG_ERR;
3036 :
3037 69457 : if (!bat->cs.uibid)
3038 69449 : bat->cs.uibid = e_bat(TYPE_oid);
3039 69457 : if (!bat->cs.uvbid)
3040 69449 : bat->cs.uvbid = e_bat(b->ttype);
3041 69457 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3042 0 : res = LOG_ERR;
3043 69457 : if (GDKinmemory(0)) {
3044 178 : bat_destroy(b);
3045 178 : return res;
3046 : }
3047 :
3048 69279 : bat_set_access(b, BAT_READ);
3049 69279 : sqlstore *store = tr->store;
3050 69279 : ok = log_bat_persists(store->logger, b, id);
3051 69279 : bat_destroy(b);
3052 69279 : if(res != LOG_OK)
3053 : return res;
3054 69279 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3055 : }
3056 :
3057 : static int
3058 0 : new_persistent_delta( sql_delta *bat)
3059 : {
3060 0 : bat->cs.ucnt = 0;
3061 0 : return LOG_OK;
3062 : }
3063 :
3064 : static void
3065 133395 : create_delta( sql_delta *d, BAT *b)
3066 : {
3067 133395 : bat_set_access(b, BAT_READ);
3068 133395 : d->cs.bid = temp_create(b);
3069 133395 : d->cs.uibid = d->cs.uvbid = 0;
3070 133395 : d->cs.ucnt = 0;
3071 133395 : }
3072 :
3073 : static bat
3074 7381 : copyBat (bat i, int type, oid seq)
3075 : {
3076 7381 : BAT *b, *tb;
3077 7381 : bat res;
3078 :
3079 7381 : if (!i)
3080 : return i;
3081 7381 : tb = quick_descriptor(i);
3082 7381 : if (tb == NULL)
3083 : return 0;
3084 7381 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3085 7381 : if (b == NULL)
3086 : return 0;
3087 :
3088 7381 : bat_set_access(b, BAT_READ);
3089 :
3090 7381 : res = temp_create(b);
3091 7381 : bat_destroy(b);
3092 7381 : return res;
3093 : }
3094 :
/*
 * Set up the storage (sql_delta) behind column c.
 *
 * Three paths, chosen after a delta has been allocated if c had none:
 *  - c is not new and its table is not temporary: the storage is loaded
 *    through load_cs(); for storage_type "DICT" the BAT registered under
 *    -c->base.id is attached as well, for a type starting with "FOR" only
 *    the storage kind is recorded.
 *  - the delta already has a main BAT: only its update count is reset.
 *  - otherwise a main BAT is created.  If the table's first column is a
 *    different column and segs_end() reports rows, the new BAT is built by
 *    copyBat() from that first column's BAT; else an empty BAT is created.
 *
 * Returns LOG_OK or LOG_ERR.
 */
static int
create_col(sql_trans *tr, sql_column *c)
{
	int ok = LOG_OK, new = 0;
	int type = c->type.type->localtype;
	sql_delta *bat = ATOMIC_PTR_GET(&c->data);

	if (!bat) {
		new = 1;
		bat = ZNEW(sql_delta);
		if (!bat)
			return LOG_ERR;
		ATOMIC_PTR_SET(&c->data, bat);
		ATOMIC_INIT(&bat->cs.refcnt, 1);
	}

	/* a freshly allocated delta is stamped with the transaction id */
	if (new)
		bat->cs.ts = tr->tid;

	if (!isNew(c)&& !isTempTable(c->t)){
		bat->cs.ts = tr->ts;
		ok = load_cs(tr, &bat->cs, type, c->base.id);
		if (ok == LOG_OK && c->storage_type) {
			if (strcmp(c->storage_type, "DICT") == 0) {
				sqlstore *store = tr->store;
				/* the extra BAT of a DICT column is looked up under the negated column id */
				int bid = log_find_bat(store->logger, -c->base.id);
				if (bid <= 0)
					return LOG_ERR;
				bat->cs.ebid = temp_dup(bid);
				bat->cs.st = ST_DICT;
			} else if (strncmp(c->storage_type, "FOR", 3) == 0) {
				bat->cs.st = ST_FOR;
			}
		}
		return ok;
	} else if (bat && bat->cs.bid) {
		return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
	} else {
		sql_column *fc = NULL;
		size_t cnt = 0;

		/* alter ? */
		/* fc is the table's first column; cnt is what segs_end() reports for its storage */
		if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
			storage *s = tab_timestamp_storage(tr, fc->t);
			if (s == NULL)
				return LOG_ERR;
			cnt = segs_end(s->segs, tr, c->t);
		}
		if (cnt && fc != c) {
			/* NOTE(review): d is dereferenced without a NULL check — confirm fc always has data here */
			sql_delta *d = ATOMIC_PTR_GET(&fc->data);

			if (d->cs.bid) {
				bat->cs.bid = copyBat(d->cs.bid, type, 0);
				if(bat->cs.bid == BID_NIL)
					ok = LOG_ERR;
			}
			if (d->cs.uibid) {
				bat->cs.uibid = e_bat(TYPE_oid);
				if (bat->cs.uibid == BID_NIL)
					ok = LOG_ERR;
			}
			if (d->cs.uvbid) {
				bat->cs.uvbid = e_bat(type);
				if(bat->cs.uvbid == BID_NIL)
					ok = LOG_ERR;
			}
		} else {
			BAT *b = bat_new(type, c->t->sz, PERSISTENT);
			if (!b) {
				ok = LOG_ERR;
			} else {
				create_delta(ATOMIC_PTR_GET(&c->data), b);
				bat_destroy(b);
			}

			/* create_delta() zeroes the update slots; a pre-existing delta gets them refilled */
			if (!new) {
				bat->cs.uibid = e_bat(TYPE_oid);
				if (bat->cs.uibid == BID_NIL)
					ok = LOG_ERR;
				bat->cs.uvbid = e_bat(type);
				if(bat->cs.uvbid == BID_NIL)
					ok = LOG_ERR;
			}
		}
		bat->cs.ucnt = 0;

		/* register with the transaction only for a new delta on an existing, non-temporary table */
		if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
			trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
	}
	return ok;
}
3186 :
3187 : static int
3188 63001 : log_create_col_(sql_trans *tr, sql_column *c)
3189 : {
3190 63001 : assert(!isTempTable(c->t));
3191 63001 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3192 : }
3193 :
3194 : static int
3195 90 : log_create_col(sql_trans *tr, sql_change *change)
3196 : {
3197 90 : return log_create_col_(tr, (sql_column*)change->obj);
3198 : }
3199 :
3200 : static int
3201 120873 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3202 : {
3203 120873 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3204 120873 : (void)oldest;
3205 120873 : assert(delta->cs.ts == tr->tid);
3206 120873 : delta->cs.ts = commit_ts;
3207 :
3208 120873 : assert(delta->next == NULL);
3209 120873 : if (!delta->cs.merged)
3210 120872 : delta->nr_updates += merge_delta(delta);
3211 120873 : if (!tr->parent)
3212 120869 : base->new = 0;
3213 120873 : return LOG_OK;
3214 : }
3215 :
3216 : static int
3217 94 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3218 : {
3219 94 : sql_column *c = (sql_column*)change->obj;
3220 94 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3221 94 : if (!tr->parent)
3222 93 : c->base.new = 0;
3223 94 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3224 : }
3225 :
3226 : /* will be called for new idx's and when new index columns are created */
3227 : static int
3228 9801 : create_idx(sql_trans *tr, sql_idx *ni)
3229 : {
3230 9801 : int ok = LOG_OK, new = 0;
3231 9801 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3232 9801 : int type = TYPE_lng;
3233 :
3234 9801 : if (oid_index(ni->type))
3235 954 : type = TYPE_oid;
3236 :
3237 9801 : if (!bat) {
3238 9801 : new = 1;
3239 9801 : bat = ZNEW(sql_delta);
3240 9801 : if (!bat)
3241 : return LOG_ERR;
3242 9801 : ATOMIC_PTR_INIT(&ni->data, bat);
3243 9801 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3244 : }
3245 :
3246 9801 : if (new)
3247 9801 : bat->cs.ts = tr->tid;
3248 :
3249 9801 : if (!isNew(ni) && !isTempTable(ni->t)){
3250 2442 : bat->cs.ts = 1;
3251 2442 : return load_cs(tr, &bat->cs, type, ni->base.id);
3252 7359 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3253 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3254 : } else {
3255 7359 : sql_column *c = ol_first_node(ni->t->columns)->data;
3256 7359 : sql_delta *d = col_timestamp_delta(tr, c);
3257 :
3258 7359 : if (d) {
3259 : /* Here we also handle indices created through alter stmts */
3260 : /* These need to be created aligned to the existing data */
3261 7359 : if (d->cs.bid) {
3262 7359 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3263 7359 : if(bat->cs.bid == BID_NIL)
3264 7359 : ok = LOG_ERR;
3265 : }
3266 : } else {
3267 : return LOG_ERR;
3268 : }
3269 :
3270 7359 : bat->cs.ucnt = 0;
3271 :
3272 7359 : if (!new) {
3273 0 : bat->cs.uibid = e_bat(TYPE_oid);
3274 0 : if (bat->cs.uibid == BID_NIL)
3275 0 : ok = LOG_ERR;
3276 0 : bat->cs.uvbid = e_bat(type);
3277 0 : if(bat->cs.uvbid == BID_NIL)
3278 0 : ok = LOG_ERR;
3279 : }
3280 7359 : bat->cs.ucnt = 0;
3281 7359 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3282 632 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3283 : }
3284 : return ok;
3285 : }
3286 :
3287 : static int
3288 6456 : log_create_idx_(sql_trans *tr, sql_idx *i)
3289 : {
3290 6456 : assert(!isTempTable(i->t));
3291 6456 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3292 : }
3293 :
3294 : static int
3295 618 : log_create_idx(sql_trans *tr, sql_change *change)
3296 : {
3297 618 : return log_create_idx_(tr, (sql_idx*)change->obj);
3298 : }
3299 :
3300 : static int
3301 632 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3302 : {
3303 632 : sql_idx *i = (sql_idx*)change->obj;
3304 632 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3305 632 : if (!tr->parent)
3306 632 : i->base.new = 0;
3307 632 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3308 : return LOG_OK;
3309 : }
3310 :
3311 : static int
3312 5289 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3313 : {
3314 5289 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3315 5289 : BAT *b = NULL, *ib = NULL;
3316 :
3317 5289 : if (ok != LOG_OK)
3318 : return ok;
3319 5289 : if (!(b = temp_descriptor(s->cs.bid)))
3320 : return LOG_ERR;
3321 5289 : ib = b;
3322 :
3323 5289 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3324 0 : bat_destroy(ib);
3325 0 : return LOG_ERR;
3326 : }
3327 :
3328 5289 : if (BATcount(b)) {
3329 435 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3330 0 : bat_destroy(ib);
3331 0 : return LOG_ERR;
3332 : }
3333 673 : if (BATtdense(b)) {
3334 238 : size_t start = b->tseqbase;
3335 238 : size_t cnt = BATcount(b);
3336 238 : ok = delete_range(tr, t, s, start, cnt);
3337 : } else {
3338 197 : assert(b->tsorted);
3339 197 : BUN icnt = BATcount(b);
3340 197 : BATiter bi = bat_iterator(b);
3341 197 : size_t lcnt = 1;
3342 197 : oid n;
3343 197 : segment *seg = s->segs->h;
3344 197 : if (complex_cand(b)) {
3345 0 : oid o = * (oid *) Tpos(&bi, 0);
3346 0 : n = o + 1;
3347 0 : for (BUN i = 1; i < icnt; i++) {
3348 0 : o = * (oid *) Tpos(&bi, i);
3349 0 : if (o == n) {
3350 0 : lcnt++;
3351 0 : n++;
3352 : } else {
3353 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3354 : break;
3355 : lcnt = 0;
3356 : }
3357 0 : if (!lcnt) {
3358 0 : n = o + 1;
3359 0 : lcnt = 1;
3360 : }
3361 : }
3362 : } else {
3363 197 : oid *o = bi.base;
3364 197 : n = o[0]+1;
3365 294794 : for (size_t i=1; i<icnt; i++) {
3366 294597 : if (o[i] == n) {
3367 291852 : lcnt++;
3368 291852 : n++;
3369 : } else {
3370 2745 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3371 : break;
3372 : lcnt = 0;
3373 : }
3374 294597 : if (!lcnt) {
3375 2745 : n = o[i]+1;
3376 2745 : lcnt = 1;
3377 : }
3378 : }
3379 : }
3380 197 : if (lcnt && ok == LOG_OK)
3381 197 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3382 197 : bat_iterator_end(&bi);
3383 : }
3384 435 : if (ok == LOG_OK)
3385 7042 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3386 6607 : if (seg->ts == tr->tid)
3387 3436 : seg->ts = 1;
3388 : } else {
3389 4854 : if (ok == LOG_OK) {
3390 4854 : BAT *bb = quick_descriptor(s->cs.bid);
3391 :
3392 4854 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3393 : ok = LOG_ERR;
3394 : } else {
3395 4854 : segment *seg = s->segs->h;
3396 4854 : if (seg->ts == tr->tid)
3397 4854 : seg->ts = 1;
3398 : }
3399 : }
3400 : }
3401 5289 : if (b != ib)
3402 5289 : bat_destroy(b);
3403 5289 : bat_destroy(ib);
3404 :
3405 5289 : return ok;
3406 : }
3407 :
3408 : static int
3409 26677 : create_del(sql_trans *tr, sql_table *t)
3410 : {
3411 26677 : int ok = LOG_OK, new = 0;
3412 26677 : BAT *b;
3413 26677 : storage *bat = ATOMIC_PTR_GET(&t->data);
3414 :
3415 26677 : if (!bat) {
3416 26677 : new = 1;
3417 26677 : bat = ZNEW(storage);
3418 26677 : if(!bat)
3419 : return LOG_ERR;
3420 26677 : ATOMIC_PTR_INIT(&t->data, bat);
3421 26677 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3422 26677 : bat->cs.ts = tr->tid;
3423 : }
3424 :
3425 26677 : if (!isNew(t) && !isTempTable(t)) {
3426 5289 : bat->cs.ts = tr->ts;
3427 5289 : return load_storage(tr, t, bat, t->base.id);
3428 21388 : } else if (bat->cs.bid) {
3429 : return ok;
3430 : } else {
3431 21388 : assert(!bat->segs);
3432 21388 : if (!(bat->segs = new_segments(tr, 0)))
3433 : return LOG_ERR;
3434 :
3435 21388 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3436 21388 : if(b != NULL) {
3437 21388 : bat_set_access(b, BAT_READ);
3438 21388 : bat->cs.bid = temp_create(b);
3439 21388 : bat_destroy(b);
3440 : } else {
3441 : return LOG_ERR;
3442 : }
3443 21388 : if (new)
3444 28249 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3445 : }
3446 : return ok;
3447 : }
3448 :
3449 : static int
3450 199328 : log_segment(sql_trans *tr, segment *s, sqlid id)
3451 : {
3452 199328 : sqlstore *store = tr->store;
3453 199328 : msk m = s->deleted;
3454 199328 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3455 : }
3456 :
/*
 * Log every non-empty segment of segs that is stamped with this
 * transaction's id (tr->tid) under object id.
 *
 * The table lock is taken around each step to the next segment (the read
 * of seg->next) and released while the segment itself is inspected and
 * logged.  Returns LOG_ERR as soon as one log_segment() call fails,
 * LOG_OK otherwise.
 */
static int
log_segments(sql_trans *tr, segments *segs, sqlid id)
{
	/* log segments */
	lock_table(tr->store, id);
	for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
		/* logging happens without the table lock held */
		unlock_table(tr->store, id);
		if (seg->ts == tr->tid && seg->end-seg->start) {
			if (log_segment(tr, seg, id) != LOG_OK) {
				/* the lock is not held at this point, so nothing to release */
				return LOG_ERR;
			}
		}
		/* re-acquire before the loop advances to seg->next */
		lock_table(tr->store, id);
	}
	unlock_table(tr->store, id);
	return LOG_OK;
}
3474 :
3475 : static int
3476 12763 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3477 : {
3478 12763 : BAT *b;
3479 12763 : int ok = LOG_OK;
3480 :
3481 12763 : if (GDKinmemory(0))
3482 : return LOG_OK;
3483 :
3484 12730 : b = temp_descriptor(bat->cs.bid);
3485 12730 : if (b == NULL)
3486 : return LOG_ERR;
3487 :
3488 12730 : sqlstore *store = tr->store;
3489 12730 : bat_set_access(b, BAT_READ);
3490 12730 : if (ok == LOG_OK)
3491 12730 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3492 12730 : if (ok == LOG_OK)
3493 12730 : ok = log_segments(tr, bat->segs, t->base.id);
3494 12730 : bat_destroy(b);
3495 12730 : return ok;
3496 : }
3497 :
3498 : static int
3499 12777 : log_create_del(sql_trans *tr, sql_change *change)
3500 : {
3501 12777 : int ok = LOG_OK;
3502 12777 : sql_table *t = (sql_table*)change->obj;
3503 :
3504 12777 : if (t->base.deleted)
3505 : return ok;
3506 12763 : assert(!isTempTable(t));
3507 12763 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3508 12763 : if (ok == LOG_OK) {
3509 75674 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3510 62911 : sql_column *c = n->data;
3511 :
3512 62911 : ok = log_create_col_(tr, c);
3513 : }
3514 12763 : if (t->idxs) {
3515 18613 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3516 5850 : sql_idx *i = n->data;
3517 :
3518 5850 : if (ATOMIC_PTR_GET(&i->data))
3519 5838 : ok = log_create_idx_(tr, i);
3520 : }
3521 : }
3522 : }
3523 : return ok;
3524 : }
3525 :
3526 : static int
3527 21390 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3528 : {
3529 21390 : int ok = LOG_OK;
3530 21390 : sql_table *t = (sql_table*)change->obj;
3531 21390 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3532 :
3533 21390 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3534 120 : assert(isTempTable(t));
3535 120 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3536 120 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3537 120 : return ok;
3538 : }
3539 :
3540 21270 : if (!commit_ts) /* rollback handled by ? */
3541 : return ok;
3542 19415 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3543 19415 : assert(ok == LOG_OK);
3544 19415 : if (ok != LOG_OK)
3545 : return ok;
3546 19415 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3547 19415 : assert(dbat->cs.ts == tr->tid);
3548 19415 : dbat->cs.ts = commit_ts;
3549 19415 : if (ok == LOG_OK) {
3550 133711 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3551 114296 : sql_column *c = n->data;
3552 114296 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3553 :
3554 114296 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3555 : }
3556 19415 : if (t->idxs) {
3557 25278 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3558 5863 : sql_idx *i = n->data;
3559 5863 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3560 :
3561 5863 : if (delta)
3562 5851 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3563 : }
3564 : }
3565 19415 : if (!tr->parent)
3566 19413 : t->base.new = 0;
3567 : }
3568 19415 : if (!tr->parent)
3569 19413 : t->base.new = 0;
3570 : return ok;
3571 : }
3572 :
3573 : static int
3574 21350 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3575 : {
3576 21350 : gdk_return ok = GDK_SUCCEED;
3577 :
3578 21350 : sqlstore *store = tr->store;
3579 21350 : if (!GDKinmemory(0) && b && b->cs.bid)
3580 18966 : ok = log_bat_transient(store->logger, id);
3581 21350 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3582 25 : ok = log_bat_transient(store->logger, -id);
3583 21350 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3584 : }
3585 :
3586 : static int
3587 178101 : destroy_col(sqlstore *store, sql_column *c)
3588 : {
3589 178101 : (void)store;
3590 178101 : if (ATOMIC_PTR_GET(&c->data))
3591 178101 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3592 178101 : ATOMIC_PTR_SET(&c->data, NULL);
3593 178101 : return LOG_OK;
3594 : }
3595 :
3596 : static int
3597 19451 : log_destroy_col_(sql_trans *tr, sql_column *c)
3598 : {
3599 19451 : int ok = LOG_OK;
3600 19451 : assert(!isTempTable(c->t));
3601 19451 : if (!tr->parent) /* don't write save point commits */
3602 19451 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3603 19451 : return ok;
3604 : }
3605 :
3606 : static int
3607 19451 : log_destroy_col(sql_trans *tr, sql_change *change)
3608 : {
3609 19451 : sql_column *c = (sql_column*)change->obj;
3610 19451 : int res = log_destroy_col_(tr, c);
3611 19451 : change->obj = NULL;
3612 19451 : column_destroy(tr->store, c);
3613 19451 : return res;
3614 : }
3615 :
3616 : static int
3617 11600 : destroy_idx(sqlstore *store, sql_idx *i)
3618 : {
3619 11600 : (void)store;
3620 11600 : if (ATOMIC_PTR_GET(&i->data))
3621 11600 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3622 11600 : ATOMIC_PTR_SET(&i->data, NULL);
3623 11600 : return LOG_OK;
3624 : }
3625 :
3626 : static int
3627 1975 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3628 : {
3629 1975 : int ok = LOG_OK;
3630 1975 : assert(!isTempTable(i->t));
3631 1975 : if (ATOMIC_PTR_GET(&i->data)) {
3632 1899 : if (!tr->parent) /* don't write save point commits */
3633 1899 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3634 : }
3635 1975 : return ok;
3636 : }
3637 :
3638 : static int
3639 1975 : log_destroy_idx(sql_trans *tr, sql_change *change)
3640 : {
3641 1975 : sql_idx *i = (sql_idx*)change->obj;
3642 1975 : int res = log_destroy_idx_(tr, i);
3643 1975 : change->obj = NULL;
3644 1975 : idx_destroy(tr->store, i);
3645 1975 : return res;
3646 : }
3647 :
3648 : static int
3649 28248 : destroy_del(sqlstore *store, sql_table *t)
3650 : {
3651 28248 : (void)store;
3652 28248 : if (ATOMIC_PTR_GET(&t->data))
3653 28244 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3654 28248 : ATOMIC_PTR_SET(&t->data, NULL);
3655 28248 : return LOG_OK;
3656 : }
3657 :
3658 : static int
3659 3311 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3660 : {
3661 3311 : gdk_return ok = GDK_SUCCEED;
3662 :
3663 3311 : sqlstore *store = tr->store;
3664 3311 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3665 3311 : bat && bat->cs.bid)
3666 3311 : ok = log_bat_transient(store->logger, id);
3667 3311 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3668 : }
3669 :
3670 : static int
3671 3311 : log_destroy_del(sql_trans *tr, sql_change *change)
3672 : {
3673 3311 : int ok = LOG_OK;
3674 3311 : sql_table *t = (sql_table*)change->obj;
3675 :
3676 3311 : assert(!isTempTable(t));
3677 3311 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3678 3311 : return ok;
3679 : }
3680 :
3681 : static int
3682 24825 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3683 : {
3684 24825 : (void)tr;
3685 24825 : (void)change;
3686 24825 : (void)commit_ts;
3687 24825 : (void)oldest;
3688 24825 : if (commit_ts)
3689 24802 : change->handled = true;
3690 24825 : return 0;
3691 : }
3692 :
3693 : static int
3694 3382 : drop_del(sql_trans *tr, sql_table *t)
3695 : {
3696 3382 : int ok = LOG_OK;
3697 :
3698 3382 : if (!isNew(t)) {
3699 3382 : storage *bat = ATOMIC_PTR_GET(&t->data);
3700 3447 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3701 : }
3702 3382 : return ok;
3703 : }
3704 :
3705 : static int
3706 19466 : drop_col(sql_trans *tr, sql_column *c)
3707 : {
3708 19466 : assert(!isNew(c));
3709 19466 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3710 19466 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3711 19466 : return LOG_OK;
3712 : }
3713 :
3714 : static int
3715 1977 : drop_idx(sql_trans *tr, sql_idx *i)
3716 : {
3717 1977 : assert(!isNew(i));
3718 1977 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3719 1977 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3720 1977 : return LOG_OK;
3721 : }
3722 :
3723 :
3724 : static BUN
3725 129461 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3726 : {
3727 129461 : BAT *b;
3728 129461 : BUN sz = 0;
3729 :
3730 129461 : (void)tr;
3731 129461 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3732 129461 : if (cs->bid && renew) {
3733 129457 : b = quick_descriptor(cs->bid);
3734 129490 : if (b) {
3735 129490 : sz += BATcount(b);
3736 129490 : if (cs->st == ST_DICT) {
3737 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3738 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3739 :
3740 2 : if (nebid == BID_NIL || !n) {
3741 0 : temp_destroy(nebid);
3742 0 : bat_destroy(n);
3743 0 : return BUN_NONE;
3744 : }
3745 2 : temp_destroy(cs->ebid);
3746 2 : cs->ebid = nebid;
3747 2 : if (!temp)
3748 2 : bat_set_access(n, BAT_READ);
3749 2 : temp_destroy(cs->bid);
3750 2 : cs->bid = temp_create(n); /* create empty copy */
3751 2 : bat_destroy(n);
3752 : } else {
3753 129488 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3754 :
3755 129270 : if (nbid == BID_NIL)
3756 : return BUN_NONE;
3757 129270 : temp_destroy(cs->bid);
3758 129410 : cs->bid = nbid;
3759 : }
3760 : } else {
3761 : return BUN_NONE;
3762 : }
3763 : }
3764 129416 : if (cs->uibid) {
3765 129273 : temp_destroy(cs->uibid);
3766 129347 : cs->uibid = 0;
3767 : }
3768 129490 : if (cs->uvbid) {
3769 129346 : temp_destroy(cs->uvbid);
3770 129350 : cs->uvbid = 0;
3771 : }
3772 129494 : cs->cleared = true;
3773 129494 : cs->ucnt = 0;
3774 129494 : return sz;
3775 : }
3776 :
3777 : static BUN
3778 129305 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3779 : {
3780 129305 : bool update_conflict = false;
3781 129305 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3782 :
3783 129305 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3784 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3785 129342 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3786 129342 : if (odelta != delta)
3787 129347 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3788 129332 : if (delta)
3789 129332 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3790 : return 0;
3791 : }
3792 :
3793 : static BUN
3794 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3795 : {
3796 21 : bool update_conflict = false;
3797 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3798 :
3799 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3800 15 : return 0;
3801 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3802 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3803 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3804 6 : if (odelta != delta)
3805 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3806 6 : if (delta)
3807 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3808 : return 0;
3809 : }
3810 :
3811 : static int
3812 142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3813 : {
3814 142 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3815 : return LOG_ERR;
3816 142 : if (s->segs)
3817 142 : destroy_segments(s->segs);
3818 142 : if (!(s->segs = new_segments(tr, 0)))
3819 : return LOG_ERR;
3820 : return LOG_OK;
3821 : }
3822 :
3823 :
3824 : /*
3825 : * Clear the table, in general this means replacing the storage,
3826 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3827 : * all segments as deleted.
3828 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3829 : */
3830 : static BUN
3831 41831 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3832 : {
3833 41831 : int clear = !in_transaction, ok = LOG_OK;
3834 41831 : bool conflict = false;
3835 41831 : storage *bat;
3836 :
3837 41882 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3838 15825 : return conflict?BUN_NONE-1:BUN_NONE;
3839 :
3840 26007 : if (!clear) {
3841 51 : lock_table(tr->store, t->base.id);
3842 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3843 51 : unlock_table(tr->store, t->base.id);
3844 : }
3845 26007 : assert(t->persistence != SQL_DECLARED_TABLE);
3846 26007 : if (!in_transaction)
3847 25959 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3848 26005 : if (ok == LOG_ERR)
3849 : return BUN_NONE;
3850 26005 : if (ok == LOG_CONFLICT)
3851 0 : return BUN_NONE - 1;
3852 : return LOG_OK;
3853 : }
3854 :
3855 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3856 : static BUN
3857 41811 : clear_table(sql_trans *tr, sql_table *t)
3858 : {
3859 41811 : node *n = ol_first_node(t->columns);
3860 41811 : sql_column *c = n->data;
3861 41811 : storage *d = tab_timestamp_storage(tr, t);
3862 41827 : int in_transaction, clear;
3863 41827 : BUN sz, clear_ok;
3864 :
3865 41827 : if (!d)
3866 : return BUN_NONE;
3867 41827 : lock_table(tr->store, t->base.id);
3868 41831 : in_transaction = segments_in_transaction(tr, t);
3869 41832 : unlock_table(tr->store, t->base.id);
3870 41831 : clear = !in_transaction;
3871 41831 : sz = count_col(tr, c, CNT_ACTIVE);
3872 41831 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3873 : return clear_ok;
3874 :
3875 26005 : if (in_transaction)
3876 : return sz;
3877 :
3878 155296 : for (; n; n = n->next) {
3879 129340 : c = n->data;
3880 :
3881 129340 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3882 0 : return clear_ok;
3883 : }
3884 25956 : if (t->idxs) {
3885 25977 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3886 21 : sql_idx *ci = n->data;
3887 :
3888 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3889 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3890 0 : return clear_ok;
3891 : }
3892 : }
3893 25956 : if (clear)
3894 25956 : d->segs->nr_reused = 0;
3895 25956 : return sz;
3896 : }
3897 :
3898 : static int
3899 158775 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3900 : {
3901 158775 : sqlstore *store = tr->store;
3902 158775 : gdk_return ok = GDK_SUCCEED;
3903 :
3904 158775 : (void) t;
3905 158775 : (void) segs;
3906 158775 : if (GDKinmemory(0))
3907 : return LOG_OK;
3908 :
3909 158768 : if (cs->cleared) {
3910 155358 : assert(cs->ucnt == 0);
3911 155358 : BAT *ins = temp_descriptor(cs->bid);
3912 155358 : if (!ins)
3913 : return LOG_ERR;
3914 155358 : assert(!isEbat(ins));
3915 155358 : bat_set_access(ins, BAT_READ);
3916 155358 : ok = log_bat_persists(store->logger, ins, id);
3917 155358 : bat_destroy(ins);
3918 155358 : if (ok == GDK_SUCCEED && cs->ebid) {
3919 56 : BAT *ins = temp_descriptor(cs->ebid);
3920 56 : if (!ins)
3921 : return LOG_ERR;
3922 56 : assert(!isEbat(ins));
3923 56 : bat_set_access(ins, BAT_READ);
3924 56 : ok = log_bat_persists(store->logger, ins, -id);
3925 56 : bat_destroy(ins);
3926 : }
3927 155358 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3928 : }
3929 :
3930 3410 : assert(!isTempTable(t));
3931 :
3932 3410 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3933 2819 : BAT *ui = temp_descriptor(cs->uibid);
3934 2819 : BAT *uv = temp_descriptor(cs->uvbid);
3935 : /* any updates */
3936 2819 : if (ui == NULL || uv == NULL) {
3937 : ok = GDK_FAIL;
3938 2819 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3939 2819 : ok = log_delta(store->logger, ui, uv, id);
3940 2819 : bat_destroy(ui);
3941 2819 : bat_destroy(uv);
3942 : }
3943 2819 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3944 : }
3945 :
3946 : static inline int
3947 59133 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3948 59133 : sqlstore *store = tr->store;
3949 59133 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3950 : }
3951 :
3952 : static inline int
3953 59133 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3954 59133 : sqlstore *store = tr->store;
3955 59133 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3956 : }
3957 :
3958 : static int
3959 59133 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3960 : {
3961 59133 : sqlstore *store = tr->store;
3962 59133 : gdk_return ok = GDK_SUCCEED;
3963 :
3964 59133 : size_t end = segs_end(segs, tr, t);
3965 :
3966 59133 : if (tr_log_table_start(tr, t) != LOG_OK)
3967 : return LOG_ERR;
3968 :
3969 59133 : size_t nr_appends = 0;
3970 :
3971 59133 : lock_table(tr->store, t->base.id);
3972 385143 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3973 326010 : unlock_table(tr->store, t->base.id);
3974 :
3975 326010 : if (seg->ts == tr->tid && seg->end-seg->start) {
3976 120621 : if (!seg->deleted) {
3977 47985 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
3978 : return LOG_ERR;
3979 :
3980 47985 : nr_appends += (seg->end - seg->start);
3981 : }
3982 : }
3983 326010 : lock_table(tr->store, t->base.id);
3984 : }
3985 59133 : unlock_table(tr->store, t->base.id);
3986 :
3987 411567 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
3988 352434 : sql_column *c = n->data;
3989 352434 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
3990 :
3991 352434 : if (cs->cleared) {
3992 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
3993 3 : continue;
3994 : }
3995 :
3996 352431 : lock_table(tr->store, t->base.id);
3997 352431 : if (!cs->cleared) {
3998 2290491 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
3999 1938060 : unlock_table(tr->store, t->base.id);
4000 1938060 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4001 : /* append col*/
4002 271681 : BAT *ins = temp_descriptor(cs->bid);
4003 271681 : if (ins == NULL)
4004 : return LOG_ERR;
4005 271681 : assert(BATcount(ins) >= cur->end);
4006 271681 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4007 271681 : bat_destroy(ins);
4008 : }
4009 1938060 : lock_table(tr->store, t->base.id);
4010 : }
4011 : }
4012 352431 : unlock_table(tr->store, t->base.id);
4013 :
4014 352431 : if (ok == GDK_SUCCEED && cs->ebid) {
4015 19 : BAT *ins = temp_descriptor(cs->ebid);
4016 19 : if (ins == NULL)
4017 : return LOG_ERR;
4018 19 : if (BATcount(ins) > ins->batInserted)
4019 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4020 19 : BATcommit(ins, BATcount(ins));
4021 19 : bat_destroy(ins);
4022 : }
4023 : }
4024 :
4025 59133 : if (t->idxs) {
4026 64320 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4027 5187 : sql_idx *i = n->data;
4028 :
4029 5187 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4030 4409 : continue;
4031 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4032 :
4033 778 : if (cs) {
4034 778 : if (cs->cleared) {
4035 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4036 0 : continue;
4037 : }
4038 :
4039 778 : lock_table(tr->store, t->base.id);
4040 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4041 1609 : unlock_table(tr->store, t->base.id);
4042 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4043 : /* append idx */
4044 730 : BAT *ins = temp_descriptor(cs->bid);
4045 730 : if (ins == NULL)
4046 : return LOG_ERR;
4047 730 : assert(BATcount(ins) >= cur->end);
4048 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4049 730 : bat_destroy(ins);
4050 : }
4051 1609 : lock_table(tr->store, t->base.id);
4052 : }
4053 778 : unlock_table(tr->store, t->base.id);
4054 : }
4055 : }
4056 : }
4057 :
4058 59133 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4059 0 : return LOG_ERR;
4060 :
4061 : return LOG_OK;
4062 : }
4063 :
4064 : static int
4065 85083 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4066 : {
4067 85083 : int ok = LOG_OK;
4068 85083 : bool cleared = s->cs.cleared;
4069 85083 : if (ok == LOG_OK && cleared)
4070 25950 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4071 25950 : if (ok == LOG_OK)
4072 85083 : ok = log_segments(tr, s->segs, t->base.id);
4073 85083 : if (ok == LOG_OK && !cleared)
4074 59133 : ok = log_table_append(tr, t, s->segs);
4075 85083 : return ok;
4076 : }
4077 :
4078 : static void
4079 422625 : merge_cs( column_storage *cs, const char* caller)
4080 : {
4081 422625 : if (cs->bid && cs->ucnt) {
4082 2828 : BAT *cur = temp_descriptor(cs->bid);
4083 2828 : BAT *ui = temp_descriptor(cs->uibid);
4084 2828 : BAT *uv = temp_descriptor(cs->uvbid);
4085 :
4086 2828 : if (!cur || !ui || !uv) {
4087 0 : bat_destroy(ui);
4088 0 : bat_destroy(uv);
4089 0 : bat_destroy(cur);
4090 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4091 : return;
4092 : }
4093 2828 : assert(BATcount(ui) == BATcount(uv));
4094 :
4095 : /* any updates */
4096 2828 : assert(!isEbat(cur));
4097 2828 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4098 0 : bat_destroy(ui);
4099 0 : bat_destroy(uv);
4100 0 : bat_destroy(cur);
4101 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4102 : return;
4103 : }
4104 : /* cleanup the old deltas */
4105 2828 : temp_destroy(cs->uibid);
4106 2828 : temp_destroy(cs->uvbid);
4107 2828 : cs->uibid = e_bat(TYPE_oid);
4108 2828 : cs->uvbid = e_bat(cur->ttype);
4109 2828 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4110 2828 : cs->ucnt = 0;
4111 2828 : bat_destroy(ui);
4112 2828 : bat_destroy(uv);
4113 2828 : bat_destroy(cur);
4114 : }
4115 422625 : cs->cleared = false;
4116 422625 : cs->merged = true;
4117 422625 : return;
4118 : }
4119 :
4120 : static lng
4121 379936 : merge_delta( sql_delta *obat)
4122 : {
4123 379936 : lng res = 0;
4124 379936 : if (obat && obat->next && !obat->cs.merged)
4125 132832 : res += merge_delta(obat->next);
4126 379936 : res += obat->cs.ucnt;
4127 379936 : merge_cs(&obat->cs, __func__);
4128 379936 : return res;
4129 : }
4130 :
4131 : static void
4132 42689 : merge_storage(storage *tdb)
4133 : {
4134 42689 : merge_cs(&tdb->cs, __func__);
4135 :
4136 42689 : if (tdb->next) {
4137 452 : destroy_storage(tdb->next);
4138 452 : tdb->next = NULL;
4139 : }
4140 42689 : }
4141 :
4142 : static sql_delta *
4143 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4144 : {
4145 : /* commit ie copy back to the parent transaction */
4146 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4147 1 : sql_delta *od = delta->next;
4148 1 : if (od->cs.ts == commit_ts) {
4149 0 : sql_delta t = *od, *n = od->next;
4150 0 : *od = *delta;
4151 0 : od->next = n;
4152 0 : *delta = t;
4153 0 : delta->next = NULL;
4154 0 : destroy_delta(delta, true);
4155 0 : return od;
4156 : }
4157 : }
4158 : return delta;
4159 : }
4160 :
4161 : static int
4162 132796 : log_update_col( sql_trans *tr, sql_change *change)
4163 : {
4164 132796 : sql_column *c = (sql_column*)change->obj;
4165 132796 : assert(!isTempTable(c->t));
4166 :
4167 132796 : if (isDeleted(c->t)) {
4168 0 : change->handled = true;
4169 0 : return LOG_OK;
4170 : }
4171 :
4172 132796 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4173 132796 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4174 132796 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4175 132796 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4176 : }
4177 : return LOG_OK;
4178 : }
4179 :
4180 : static int
4181 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4182 : {
4183 217 : sqlstore *store = Store;
4184 :
4185 217 : sql_delta *d = (sql_delta*)change->data;
4186 217 : if (d->cs.ts < oldest) {
4187 82 : destroy_delta(d, false);
4188 82 : if (change->commit == &commit_update_idx)
4189 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4190 : else
4191 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4192 82 : return 1;
4193 : }
4194 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4195 82 : d->cs.ts = store_get_timestamp(store) + 1;
4196 : return 0;
4197 : }
4198 :
4199 : static int
4200 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4201 : {
4202 9 : sqlstore *store = Store;
4203 :
4204 9 : storage *d = (storage*)change->data;
4205 9 : if (d->cs.ts < oldest) {
4206 3 : destroy_storage(d);
4207 3 : table_destroy(store, (sql_table*)change->obj);
4208 3 : return 1;
4209 : }
4210 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4211 3 : d->cs.ts = store_get_timestamp(store) + 1;
4212 : return 0;
4213 : }
4214 :
4215 : static int
4216 132914 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4217 : {
4218 132914 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4219 :
4220 132914 : sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
4221 :
4222 132914 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4223 3 : int ok = LOG_OK;
4224 3 : assert(isTempTable(t));
4225 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4226 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4227 3 : if (!tr->parent)
4228 0 : t->base.new = base->new = 0;
4229 3 : change->handled = true;
4230 3 : return ok;
4231 : }
4232 :
4233 132911 : if (commit_ts)
4234 132829 : delta->cs.ts = commit_ts;
4235 132911 : if (!commit_ts) { /* rollback */
4236 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4237 :
4238 82 : if (change->ts && t->base.new) /* handled by create col */
4239 : return LOG_OK;
4240 82 : if (o != d) {
4241 0 : while(o && o->next != d)
4242 : o = o->next;
4243 : }
4244 82 : if (o == ATOMIC_PTR_GET(data))
4245 82 : ATOMIC_PTR_SET(data, d->next);
4246 : else
4247 0 : o->next = d->next;
4248 82 : d->next = NULL;
4249 82 : change->cleanup = &tc_gc_rollbacked;
4250 132829 : } else if (!tr->parent) {
4251 : /* merge deltas */
4252 276067 : while (delta && delta->cs.ts > oldest)
4253 143239 : delta = delta->next;
4254 132828 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4255 24738 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4256 24738 : idelta->nr_updates += merge_delta(delta);
4257 24738 : unlock_column(tr->store, base->id);
4258 : }
4259 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4260 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4261 : return LOG_OK;
4262 : }
4263 :
4264 : static int
4265 132886 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4266 : {
4267 :
4268 132886 : sql_column *c = (sql_column*)change->obj;
4269 132886 : sql_base* base = &c->base;
4270 132886 : sql_table* t = c->t;
4271 132886 : ATOMIC_PTR_TYPE* data = &c->data;
4272 132886 : int type = c->type.type->localtype;
4273 :
4274 132886 : if (change->handled || isDeleted(c->t))
4275 : return LOG_OK;
4276 :
4277 132886 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4278 : }
4279 :
4280 : static int
4281 26 : log_update_idx( sql_trans *tr, sql_change *change)
4282 : {
4283 26 : sql_idx *i = (sql_idx*)change->obj;
4284 26 : assert(!isTempTable(i->t));
4285 :
4286 26 : if (isDeleted(i->t)) {
4287 0 : change->handled = true;
4288 0 : return LOG_OK;
4289 : }
4290 :
4291 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4292 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4293 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4294 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4295 : }
4296 : return LOG_OK;
4297 : }
4298 :
4299 : static int
4300 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4301 : {
4302 28 : sql_idx *i = (sql_idx*)change->obj;
4303 28 : sql_base* base = &i->base;
4304 28 : sql_table* t = i->t;
4305 28 : ATOMIC_PTR_TYPE* data = &i->data;
4306 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4307 :
4308 28 : if (change->handled || isDeleted(i->t))
4309 : return LOG_OK;
4310 :
4311 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4312 : }
4313 :
4314 : static storage *
4315 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4316 : {
4317 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4318 0 : storage *od = dbat->next;
4319 0 : if (od->cs.ts == commit_ts) {
4320 0 : storage t = *od, *n = od->next;
4321 0 : *od = *dbat;
4322 0 : od->next = n;
4323 0 : *dbat = t;
4324 0 : dbat->next = NULL;
4325 0 : destroy_storage(dbat);
4326 0 : return od;
4327 : }
4328 : }
4329 : return dbat;
4330 : }
4331 :
4332 : static int
4333 85083 : log_update_del( sql_trans *tr, sql_change *change)
4334 : {
4335 85083 : sql_table *t = (sql_table*)change->obj;
4336 85083 : assert(!isTempTable(t));
4337 :
4338 85083 : if (isDeleted(t)) {
4339 0 : change->handled = true;
4340 0 : return LOG_OK;
4341 : }
4342 :
4343 85083 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4344 85083 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4345 : return LOG_OK;
4346 : }
4347 :
4348 : static int
4349 89714 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4350 : {
4351 89714 : int ok = LOG_OK;
4352 89714 : sql_table *t = (sql_table*)change->obj;
4353 89714 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4354 :
4355 89714 : if (change->handled || isDeleted(t))
4356 : return ok;
4357 :
4358 89714 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4359 22 : assert(isTempTable(t));
4360 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4361 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4362 22 : change->handled = true;
4363 22 : return ok;
4364 : }
4365 :
4366 89692 : lock_table(tr->store, t->base.id);
4367 89692 : if (!commit_ts) { /* rollback */
4368 4308 : if (dbat->cs.ts == tr->tid) {
4369 6 : if (change->ts && t->base.new) { /* handled by the create table */
4370 3 : unlock_table(tr->store, t->base.id);
4371 3 : return ok;
4372 : }
4373 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4374 :
4375 3 : if (o != d) {
4376 0 : while(o && o->next != d)
4377 : o = o->next;
4378 : }
4379 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4380 3 : assert(d->next);
4381 3 : ATOMIC_PTR_SET(&t->data, d->next);
4382 : } else
4383 0 : o->next = d->next;
4384 3 : d->next = NULL;
4385 3 : change->cleanup = &tc_gc_rollbacked_storage;
4386 : } else
4387 4302 : rollback_segments(dbat->segs, tr, change, oldest);
4388 85384 : } else if (ok == LOG_OK && !tr->parent) {
4389 85358 : if (dbat->cs.ts == tr->tid) /* cleared table */
4390 25952 : dbat->cs.ts = commit_ts;
4391 :
4392 85358 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4393 85358 : if (ok == LOG_OK) {
4394 85358 : merge_segments(dbat, tr, change, commit_ts, oldest);
4395 85358 : if (oldest == commit_ts)
4396 42689 : merge_storage(dbat);
4397 : }
4398 85358 : if (dbat)
4399 85358 : dbat->cs.cleared = false;
4400 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4401 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4402 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4403 26 : storage *s = change->data;
4404 26 : if (s->cs.ts == tr->tid)
4405 0 : s->cs.ts = commit_ts;
4406 : }
4407 89689 : unlock_table(tr->store, t->base.id);
4408 89689 : return ok;
4409 : }
4410 :
4411 : /* only rollback (content version) case for now */
4412 : static int
4413 19626 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4414 : {
4415 19626 : sqlstore *store = Store;
4416 19626 : sql_column *c = (sql_column*)change->obj;
4417 :
4418 19626 : if (!c) /* cleaned earlier */
4419 : return 1;
4420 :
4421 175 : if (change->handled || isDeleted(c->t)) {
4422 0 : column_destroy(store, c);
4423 0 : return 1;
4424 : }
4425 :
4426 : /* savepoint commit (did it merge ?) */
4427 175 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4428 0 : column_destroy(store, c);
4429 0 : return 1;
4430 : }
4431 175 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4432 : return 0;
4433 174 : sql_delta *d = (sql_delta*)change->data, *id = d;
4434 174 : if (d && d->next) {
4435 :
4436 66 : if (d->cs.ts > oldest)
4437 : return LOG_OK; /* cannot cleanup yet */
4438 :
4439 : // d is oldest reachable delta
4440 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4441 62 : destroy_delta(d->next, true);
4442 62 : d->next = NULL;
4443 : }
4444 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4445 63 : id->nr_updates += merge_delta(d);
4446 63 : unlock_column(store, c->base.id);
4447 : }
4448 171 : column_destroy(store, c);
4449 171 : return 1;
4450 : }
4451 :
4452 : static int
4453 743856 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4454 : {
4455 743856 : sqlstore *store = Store;
4456 743856 : sql_column *c = (sql_column*)change->obj;
4457 :
4458 743856 : if (!c) /* cleaned earlier */
4459 : return 1;
4460 :
4461 743856 : if (change->handled || isDeleted(c->t)) {
4462 3 : table_destroy(store, c->t);
4463 3 : return 1;
4464 : }
4465 :
4466 : /* savepoint commit (did it merge ?) */
4467 743853 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4468 31334 : table_destroy(store, c->t);
4469 31334 : return 1;
4470 : }
4471 712519 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4472 : return 0;
4473 712518 : sql_delta *d = (sql_delta*)change->data, *id = d;
4474 712518 : if (d && d->next) {
4475 :
4476 712518 : if (d->cs.ts > oldest)
4477 : return LOG_OK; /* cannot cleanup yet */
4478 :
4479 : // d is oldest reachable delta
4480 101405 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4481 5232 : destroy_delta(d->next, true);
4482 5232 : d->next = NULL;
4483 : }
4484 101405 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4485 101405 : id->nr_updates += merge_delta(d);
4486 101405 : unlock_column(store, c->base.id);
4487 : }
4488 101405 : table_destroy(store, c->t);
4489 101405 : return 1;
4490 : }
4491 :
4492 : static int
4493 2609 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4494 : {
4495 2609 : sqlstore *store = Store;
4496 2609 : sql_idx *i = (sql_idx*)change->obj;
4497 :
4498 2609 : if (!i) /* cleaned earlier */
4499 : return 1;
4500 :
4501 634 : if (change->handled || isDeleted(i->t)) {
4502 0 : idx_destroy(store, i);
4503 0 : return 1;
4504 : }
4505 :
4506 : /* savepoint commit (did it merge ?) */
4507 634 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4508 0 : idx_destroy(store, i);
4509 0 : return 1;
4510 : }
4511 634 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4512 : return 0;
4513 634 : sql_delta *d = (sql_delta*)change->data, *id = d;
4514 634 : if (d && d->next) {
4515 0 : if (d->cs.ts > oldest)
4516 : return LOG_OK; /* cannot cleanup yet */
4517 :
4518 : // d is oldest reachable delta
4519 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4520 0 : destroy_delta(d->next, true);
4521 0 : d->next = NULL;
4522 : }
4523 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4524 0 : id->nr_updates += merge_delta(d);
4525 0 : unlock_column(store, i->base.id);
4526 : }
4527 634 : idx_destroy(store, i);
4528 634 : return 1;
4529 : }
4530 :
4531 : static int
4532 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4533 : {
4534 26 : sqlstore *store = Store;
4535 26 : sql_idx *i = (sql_idx*)change->obj;
4536 :
4537 26 : if (!i) /* cleaned earlier */
4538 : return 1;
4539 :
4540 26 : if (change->handled || isDeleted(i->t)) {
4541 0 : table_destroy(store, i->t);
4542 0 : return 1;
4543 : }
4544 :
4545 : /* savepoint commit (did it merge ?) */
4546 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4547 0 : table_destroy(store, i->t);
4548 0 : return 1;
4549 : }
4550 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4551 : return 0;
4552 26 : sql_delta *d = (sql_delta*)change->data, *id = d;
4553 26 : if (d && d->next) {
4554 26 : if (d->cs.ts > oldest)
4555 : return LOG_OK; /* cannot cleanup yet */
4556 :
4557 : // d is oldest reachable delta
4558 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4559 26 : destroy_delta(d->next, true);
4560 26 : d->next = NULL;
4561 : }
4562 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4563 26 : id->nr_updates += merge_delta(d);
4564 26 : unlock_column(store, i->base.id);
4565 : }
4566 26 : table_destroy(store, i->t);
4567 26 : return 1;
4568 : }
4569 :
4570 : static int
4571 222604 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4572 : {
4573 222604 : sqlstore *store = Store;
4574 222604 : sql_table *t = (sql_table*)change->obj;
4575 :
4576 222604 : if (change->handled || isDeleted(t)) {
4577 3556 : table_destroy(store, t);
4578 3556 : return 1;
4579 : }
4580 : /* savepoint commit (did it merge ?) */
4581 219048 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4582 6325 : table_destroy(store, t);
4583 6325 : return 1;
4584 : }
4585 212723 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4586 : return 0;
4587 212695 : storage *d = (storage*)change->data;
4588 212695 : if (d->next) {
4589 140738 : if (d->cs.ts > oldest)
4590 : return LOG_OK; /* cannot cleanup yet */
4591 :
4592 19176 : destroy_storage(d->next);
4593 19176 : d->next = NULL;
4594 : }
4595 91133 : table_destroy(store, t);
4596 91133 : return 1;
4597 : }
4598 :
4599 : static int
4600 30511 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4601 : {
4602 30511 : if (nr == 0)
4603 : return LOG_OK;
4604 30511 : assert (nr > 0);
4605 30511 : if ((!offsets || !*offsets) && nr == total) {
4606 30476 : *offset = slot;
4607 30476 : return LOG_OK;
4608 : }
4609 35 : if (!*offsets) {
4610 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4611 7 : if (!*offsets)
4612 : return LOG_ERR;
4613 : }
4614 35 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4615 16174 : for(size_t i = 0; i < nr; i++)
4616 16139 : dst[i] = slot + i;
4617 35 : (*offsets)->batCount += nr;
4618 35 : (*offsets)->theap->dirty = true;
4619 35 : return LOG_OK;
4620 : }
4621 :
4622 : static int
4623 30340 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4624 : {
4625 30340 : assert(s->segs);
4626 30340 : ulng oldest = store_oldest(tr->store, NULL);
4627 30347 : BUN slot = 0;
4628 30347 : size_t total = cnt;
4629 :
4630 30347 : if (!locked)
4631 30404 : lock_table(tr->store, t->base.id);
4632 30554 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4633 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4634 61428 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4635 30798 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4636 35 : if ((seg->end - seg->start) >= cnt) {
4637 : /* if previous is claimed before we could simply adjust the end/start */
4638 13 : if (p && p->ts == tr->tid && !p->deleted) {
4639 2 : slot = p->end;
4640 2 : p->end += cnt;
4641 2 : seg->start += cnt;
4642 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4643 : ok = LOG_ERR;
4644 : break;
4645 : }
4646 2 : s->segs->nr_reused += cnt;
4647 2 : cnt = 0;
4648 2 : break;
4649 : }
4650 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4651 11 : size_t rcnt = seg->end - seg->start;
4652 11 : if (rcnt > cnt)
4653 : rcnt = cnt;
4654 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4655 : ok = LOG_ERR;
4656 : break;
4657 : }
4658 : }
4659 33 : seg->ts = tr->tid;
4660 33 : seg->deleted = false;
4661 33 : slot = seg->start;
4662 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4663 : ok = LOG_ERR;
4664 : break;
4665 : }
4666 1 : s->segs->nr_reused += (seg->end - seg->start);
4667 1 : cnt -= (seg->end - seg->start);
4668 : }
4669 : }
4670 30632 : if (ok == LOG_OK && cnt) {
4671 30619 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4672 29477 : slot = s->segs->t->end;
4673 29477 : s->segs->t->end += cnt;
4674 : } else {
4675 1142 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4676 : ok = LOG_ERR;
4677 : } else {
4678 1159 : if (!s->segs->h)
4679 0 : s->segs->h = s->segs->t;
4680 1159 : slot = s->segs->t->start;
4681 : }
4682 : }
4683 30636 : if (ok == LOG_OK)
4684 30636 : ok = add_offsets(slot, cnt, total, offset, offsets);
4685 : }
4686 30479 : if (!locked)
4687 30519 : unlock_table(tr->store, t->base.id);
4688 :
4689 30622 : if (ok == LOG_OK) {
4690 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4691 30667 : if (!in_transaction) {
4692 1144 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4693 1144 : in_transaction = true;
4694 : }
4695 30667 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4696 30612 : tr->logchanges += (lng) total;
4697 30648 : if (*offsets) {
4698 28 : BAT *pos = *offsets;
4699 28 : assert(BATcount(pos) == total);
4700 28 : BATsetcount(pos, total); /* set other properties */
4701 7 : pos->tnil = false;
4702 7 : pos->tnonil = true;
4703 7 : pos->tkey = true;
4704 7 : pos->tsorted = true;
4705 7 : pos->trevsorted = false;
4706 : }
4707 : }
4708 30582 : return ok;
4709 : }
4710 :
4711 : static int
4712 2095269 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4713 : {
4714 2095269 : if (cnt > 1 && offsets)
4715 30341 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4716 2064928 : assert(s->segs);
4717 2064928 : ulng oldest = store_oldest(tr->store, NULL);
4718 2064941 : BUN slot = 0;
4719 2064941 : int reused = 0;
4720 :
4721 2064941 : if (!locked)
4722 1936839 : lock_table(tr->store, t->base.id);
4723 2064983 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4724 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4725 : * or create new segment at the end */
4726 9952547 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4727 7960574 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4728 :
4729 73022 : if ((seg->end - seg->start) >= cnt) {
4730 :
4731 : /* if previous is claimed before we could simply adjust the end/start */
4732 73022 : if (p && p->ts == tr->tid && !p->deleted) {
4733 56537 : slot = p->end;
4734 56537 : p->end += cnt;
4735 56537 : seg->start += cnt;
4736 56537 : s->segs->nr_reused += cnt;
4737 56537 : reused = 1;
4738 56537 : break;
4739 : }
4740 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4741 16485 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4742 : ok = LOG_ERR;
4743 : break;
4744 : }
4745 : }
4746 16485 : seg->ts = tr->tid;
4747 16485 : seg->deleted = false;
4748 16485 : slot = seg->start;
4749 16485 : s->segs->nr_reused += cnt;
4750 16485 : reused = 1;
4751 16485 : break;
4752 : }
4753 : }
4754 2064995 : if (ok == LOG_OK && !reused) {
4755 1991973 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4756 1956764 : slot = s->segs->t->end;
4757 1956764 : s->segs->t->end += cnt;
4758 : } else {
4759 35209 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4760 : ok = LOG_ERR;
4761 : } else {
4762 35209 : if (!s->segs->h)
4763 0 : s->segs->h = s->segs->t;
4764 35209 : slot = s->segs->t->start;
4765 : }
4766 : }
4767 : }
4768 2064995 : if (!locked)
4769 1936881 : unlock_table(tr->store, t->base.id);
4770 :
4771 2064995 : if (ok == LOG_OK) {
4772 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4773 2064995 : if (!in_transaction) {
4774 49293 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4775 49293 : in_transaction = true;
4776 : }
4777 2064995 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4778 2064564 : tr->logchanges += (lng) cnt;
4779 2064995 : *offset = slot;
4780 : }
4781 : return ok;
4782 : }
4783 :
4784 : /*
4785 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4786 : * of the global transaction and mark the newly added storage slots unused on the global
4787 : * level but used on the local transaction level. Besides this the local transaction needs
4788 : * to update (and mark unused) any slot in between the old end and new slots.
4789 : * */
4790 : static int
4791 1967182 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4792 : {
4793 1967182 : storage *s;
4794 :
4795 : /* we have a single segment structure for each persistent table
4796 : * for temporary tables each has its own */
4797 1967182 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4798 : return LOG_ERR;
4799 :
4800 1967275 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4801 : }
4802 :
4803 : /* some tables cannot be updated concurrently (user/roles etc) */
4804 : static int
4805 128118 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4806 : {
4807 128118 : storage *s;
4808 128118 : int res = 0;
4809 :
4810 : /* we have a single segment structure for each persistent table
4811 : * for temporary tables each has its own */
4812 128118 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4813 : /* TODO check for other inserts ! */
4814 : return LOG_ERR;
4815 :
4816 128118 : lock_table(tr->store, t->base.id);
4817 128118 : if ((res = segments_conflict(tr, s->segs, 1))) {
4818 4 : unlock_table(tr->store, t->base.id);
4819 4 : return LOG_CONFLICT;
4820 : }
4821 128114 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4822 128114 : unlock_table(tr->store, t->base.id);
4823 128114 : return res;
4824 : }
4825 :
4826 : static int
4827 20430 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4828 : {
4829 20430 : storage *s;
4830 20430 : int res = 0;
4831 :
4832 20430 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4833 : return LOG_ERR;
4834 :
4835 20430 : lock_table(tr->store, t->base.id);
4836 20430 : res = segments_conflict(tr, s->segs, uncommitted);
4837 20430 : unlock_table(tr->store, t->base.id);
4838 20430 : return res ? LOG_CONFLICT : LOG_OK;
4839 : }
4840 :
4841 : static size_t
4842 1612685 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4843 : {
4844 1612685 : size_t cnt = 0;
4845 :
4846 1959479 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4847 : ;
4848 :
4849 3989476 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4850 2376780 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4851 156019 : cnt += s->end - s->start;
4852 : }
4853 1612696 : return cnt;
4854 : }
4855 :
4856 : static BAT *
4857 1612654 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4858 : {
4859 1612654 : lock_table(tr->store, t->base.id);
4860 1612697 : segment *s = S->segs->h;
4861 : /* step one no deletes -> dense range */
4862 1612697 : uint32_t cur = 0;
4863 1612697 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4864 1612698 : if (!dnr) {
4865 1483466 : unlock_table(tr->store, t->base.id);
4866 1483434 : return BATdense(start, start, end-start);
4867 : }
4868 :
4869 129232 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4870 129232 : if (!b) {
4871 0 : unlock_table(tr->store, t->base.id);
4872 0 : return NULL;
4873 : }
4874 :
4875 129232 : uint32_t *restrict dst = Tloc(b, 0);
4876 62275287 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4877 62211495 : if (s->end < start)
4878 260705 : continue;
4879 61950790 : if (s->start >= end)
4880 : break;
4881 61885350 : msk m = (SEG_IS_VALID(s, tr));
4882 61885350 : size_t lnr = s->end-s->start;
4883 61885350 : if (s->start < start)
4884 20047 : lnr -= (start - s->start);
4885 61885350 : if (s->end > end)
4886 15145 : lnr -= s->end - end;
4887 :
4888 61885350 : if (m) {
4889 1386613 : size_t used = pos&31, end = 32;
4890 1386613 : if (used) {
4891 1221350 : if (lnr < (32-used))
4892 746049 : end = used + lnr;
4893 1221350 : assert(end > used);
4894 1221350 : cur |= ((1U << (end - used)) - 1) << used;
4895 1221350 : lnr -= end - used;
4896 1221350 : pos += end - used;
4897 1221350 : if (end == 32) {
4898 475326 : *dst++ = cur;
4899 475326 : cur = 0;
4900 : }
4901 : }
4902 1386613 : size_t full = lnr/32;
4903 1386613 : size_t rest = lnr%32;
4904 1386613 : if (full > 0) {
4905 352417 : memset(dst, ~0, full * sizeof(*dst));
4906 352417 : dst += full;
4907 352417 : lnr -= full * 32;
4908 352417 : pos += full * 32;
4909 : }
4910 1386613 : if (rest > 0) {
4911 607229 : cur |= (1U << rest) - 1;
4912 607229 : lnr -= rest;
4913 607229 : pos += rest;
4914 : }
4915 1386613 : assert(lnr==0);
4916 : } else {
4917 60498737 : size_t used = pos&31, end = 32;
4918 60498737 : if (used) {
4919 58626116 : if (lnr < (32-used))
4920 56651560 : end = used + lnr;
4921 :
4922 58626116 : pos+= (end-used);
4923 58626116 : lnr-= (end-used);
4924 58626116 : if (end == 32) {
4925 1974578 : *dst++ = cur;
4926 1974578 : cur = 0;
4927 : }
4928 : }
4929 60498737 : size_t full = lnr/32;
4930 60498737 : size_t rest = lnr%32;
4931 60498737 : memset(dst, 0, full * sizeof(*dst));
4932 60498737 : dst += full;
4933 60498737 : lnr -= full * 32;
4934 60498737 : pos += full * 32;
4935 60498737 : pos+= rest;
4936 60498737 : lnr-= rest;
4937 60498737 : assert(lnr==0);
4938 : }
4939 : }
4940 :
4941 129232 : unlock_table(tr->store, t->base.id);
4942 129232 : if (pos%32)
4943 127246 : *dst=cur;
4944 129232 : BATsetcount(b, nr);
4945 129232 : bn = BATmaskedcands(start, nr, b, true);
4946 129223 : BBPreclaim(b);
4947 129225 : (void)pos;
4948 129225 : assert (pos == nr);
4949 : return bn;
4950 : }
4951 :
4952 : static void * /* BAT * */
4953 1618375 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4954 : {
4955 : /* with nr_of_parts - part_nr we can adjust parts */
4956 1618375 : storage *s = tab_timestamp_storage(tr, t);
4957 :
4958 1618643 : if (!s)
4959 : return NULL;
4960 1618643 : size_t nr = segs_end(s->segs, tr, t);
4961 :
4962 1619682 : if (!nr)
4963 7002 : return BATdense(0, 0, 0);
4964 :
4965 : /* compute proper part */
4966 1612680 : size_t part_size = nr/nr_of_parts;
4967 1612680 : size_t start = part_size * part_nr;
4968 1612680 : size_t end = start + part_size;
4969 1612680 : if (part_nr == (nr_of_parts-1))
4970 1321646 : end = nr;
4971 1612680 : assert(end <= nr);
4972 1612680 : return segments2cands(s, tr, t, start, end);
4973 : }
4974 :
4975 : static int
4976 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
4977 : {
4978 5 : bool update_conflict = false;
4979 :
4980 5 : if (segments_in_transaction(tr, col->t))
4981 : return LOG_CONFLICT;
4982 :
4983 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
4984 :
4985 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
4986 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
4987 5 : assert(d && d->cs.ts == tr->tid);
4988 5 : if (odelta != d)
4989 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
4990 5 : if (d->cs.bid)
4991 5 : temp_destroy(d->cs.bid);
4992 5 : if (d->cs.uibid)
4993 5 : temp_destroy(d->cs.uibid);
4994 5 : if (d->cs.uvbid)
4995 5 : temp_destroy(d->cs.uvbid);
4996 5 : bat_set_access(bn, BAT_READ);
4997 5 : d->cs.bid = temp_create(bn);
4998 5 : d->cs.uibid = 0;
4999 5 : d->cs.uvbid = 0;
5000 5 : d->cs.ucnt = 0;
5001 5 : d->cs.cleared = true;
5002 5 : d->cs.ts = tr->tid;
5003 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
5004 5 : return LOG_OK;
5005 : }
5006 :
5007 : static int
5008 5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
5009 : {
5010 5 : if (segments_in_transaction(tr, c->t))
5011 : return LOG_CONFLICT;
5012 :
5013 5 : sql_delta *d = NULL;
5014 :
5015 : /* do we have enough to clean */
5016 5 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5017 : return LOG_CONFLICT;
5018 :
5019 : /* do we have enough to clean */
5020 5 : if (!force && (d->nr_updates) < 1024)
5021 : return LOG_OK;
5022 :
5023 5 : BAT *b = NULL, *bn = NULL;;
5024 5 : if ((b = bind_col(tr, c, 0)) == NULL)
5025 : return LOG_ERR;
5026 5 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5027 0 : BBPreclaim(b);
5028 0 : return LOG_ERR;
5029 : }
5030 5 : int res = swap_bats(tr, c, bn);
5031 5 : d->nr_updates = 0;
5032 5 : BBPreclaim(b);
5033 5 : BBPreclaim(bn);
5034 5 : return res;
5035 : }
5036 :
5037 : static int
5038 0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
5039 : {
5040 0 : if (segments_in_transaction(tr, t))
5041 : return LOG_CONFLICT;
5042 :
5043 0 : storage *s;
5044 0 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
5045 : return LOG_ERR;
5046 :
5047 0 : for( node *n = ol_first_node(t->columns); n; n = n->next) {
5048 0 : sql_column *c = n->data;
5049 :
5050 0 : if (!ATOMvarsized(c->type.type->localtype))
5051 0 : continue;
5052 0 : sql_delta *d = NULL;
5053 :
5054 : /* do we have enough to clean */
5055 0 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5056 : return LOG_CONFLICT;
5057 :
5058 : /* do we have enough to clean */
5059 0 : if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
5060 0 : continue;
5061 :
5062 0 : BAT *b = NULL, *bn = NULL;;
5063 0 : if ((b = bind_col(tr, c, 0)) == NULL)
5064 : return LOG_ERR;
5065 0 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5066 0 : BBPreclaim(b);
5067 0 : return LOG_ERR;
5068 : }
5069 0 : int res = swap_bats(tr, c, bn);
5070 0 : d->nr_updates = 0;
5071 0 : BBPreclaim(b);
5072 0 : BBPreclaim(bn);
5073 0 : if (res != LOG_OK)
5074 0 : return res;
5075 : }
5076 0 : s->segs->nr_reused = 0;
5077 0 : return LOG_OK;
5078 : }
5079 :
5080 :
5081 : static int
5082 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5083 : {
5084 58 : bool update_conflict = false;
5085 :
5086 58 : if (segments_in_transaction(tr, col->t))
5087 : return LOG_CONFLICT;
5088 :
5089 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5090 :
5091 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5092 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5093 58 : assert(d && d->cs.ts == tr->tid);
5094 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5095 58 : if (odelta != d)
5096 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5097 :
5098 58 : d->cs.st = st;
5099 58 : d->cs.cleared = true;
5100 58 : if (d->cs.bid)
5101 58 : temp_destroy(d->cs.bid);
5102 58 : o = transfer_to_systrans(o);
5103 58 : if (o == NULL)
5104 : return LOG_ERR;
5105 58 : bat_set_access(o, BAT_READ);
5106 58 : d->cs.bid = temp_create(o);
5107 58 : if (u) {
5108 53 : if (d->cs.ebid)
5109 0 : temp_destroy(d->cs.ebid);
5110 53 : u = transfer_to_systrans(u);
5111 53 : if (u == NULL)
5112 : return LOG_ERR;
5113 53 : d->cs.ebid = temp_create(u);
5114 : }
5115 : return LOG_OK;
5116 : }
5117 :
5118 : void
5119 357 : bat_storage_init( store_functions *sf)
5120 : {
5121 357 : sf->bind_col = &bind_col;
5122 357 : sf->bind_updates = &bind_updates;
5123 357 : sf->bind_updates_idx = &bind_updates_idx;
5124 357 : sf->bind_idx = &bind_idx;
5125 357 : sf->bind_cands = &bind_cands;
5126 :
5127 357 : sf->claim_tab = &claim_tab;
5128 357 : sf->key_claim_tab = &key_claim_tab;
5129 357 : sf->tab_validate = &tab_validate;
5130 :
5131 357 : sf->append_col = &append_col;
5132 357 : sf->append_idx = &append_idx;
5133 :
5134 357 : sf->update_col = &update_col;
5135 357 : sf->update_idx = &update_idx;
5136 :
5137 357 : sf->delete_tab = &delete_tab;
5138 :
5139 357 : sf->count_del = &count_del;
5140 357 : sf->count_col = &count_col;
5141 357 : sf->count_idx = &count_idx;
5142 357 : sf->dcount_col = &dcount_col;
5143 357 : sf->min_max_col = &min_max_col;
5144 357 : sf->set_stats_col = &set_stats_col;
5145 357 : sf->sorted_col = &sorted_col;
5146 357 : sf->unique_col = &unique_col;
5147 357 : sf->double_elim_col = &double_elim_col;
5148 357 : sf->col_stats = &col_stats;
5149 357 : sf->col_set_range = &col_set_range;
5150 357 : sf->col_not_null = &col_not_null;
5151 :
5152 357 : sf->col_dup = &col_dup;
5153 357 : sf->idx_dup = &idx_dup;
5154 357 : sf->del_dup = &del_dup;
5155 :
5156 357 : sf->create_col = &create_col; /* create and add to change list */
5157 357 : sf->create_idx = &create_idx;
5158 357 : sf->create_del = &create_del;
5159 :
5160 357 : sf->destroy_col = &destroy_col; /* free resources */
5161 357 : sf->destroy_idx = &destroy_idx;
5162 357 : sf->destroy_del = &destroy_del;
5163 :
5164 357 : sf->drop_col = &drop_col; /* add drop to change list */
5165 357 : sf->drop_idx = &drop_idx;
5166 357 : sf->drop_del = &drop_del;
5167 :
5168 357 : sf->clear_table = &clear_table;
5169 :
5170 357 : sf->vacuum_col = &vacuum_col;
5171 357 : sf->vacuum_tab = &vacuum_tab;
5172 357 : sf->col_compress = &col_compress;
5173 357 : }
|