Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
42 : static void merge_delta( sql_delta *obat);
43 :
/* valid
 * !deleted && VALID_4_READ(TS, tr)		existing or newly created segment
 * deleted && TS > tr->ts && OLDTS < tr->ts	deleted after current transaction
 */

/* TS is readable by tr when it equals tr's own tid, when tr has a parent
 * and tr_version_of_parent() accepts TS, or when TS lies below tr->ts.
 * All macro arguments are parenthesized so that arbitrary expressions
 * (e.g. `segs + 1`, `a ? b : c`) can be passed safely. */
#define VALID_4_READ(TS,tr) \
	((TS) == (tr)->tid || ((tr)->parent && tr_version_of_parent((tr), (TS))) || (TS) < (tr)->ts)

/* when changed, check if the old status is still valid */
#define OLD_VALID_4_READ(TS,OLDTS,tr) \
	((OLDTS) && (TS) != (tr)->tid && (TS) > (tr)->ts && (OLDTS) < (tr)->ts)

/* A segment may be deleted by tr when it is not deleted yet and readable by tr */
#define SEG_VALID_4_DELETE(seg,tr) \
	(!(seg)->deleted && VALID_4_READ((seg)->ts, (tr)))

/* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
#define SEG_IS_DELETED(seg,tr) \
	(((seg)->deleted && (VALID_4_READ((seg)->ts, (tr)) || !OLD_VALID_4_READ((seg)->ts, (seg)->oldts, (tr)))) || \
	 (!(seg)->deleted && !VALID_4_READ((seg)->ts, (tr))))

/* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
#define SEG_IS_VALID(seg, tr) \
	((!(seg)->deleted && VALID_4_READ((seg)->ts, (tr))) || \
	 ((seg)->deleted && OLD_VALID_4_READ((seg)->ts, (seg)->oldts, (tr))))
68 :
69 : static inline BAT *
70 4871 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 4871 : MT_lock_set(&b->theaplock);
74 4871 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 19 : MT_lock_unset(&b->theaplock);
76 19 : BAT *bn = COLcopy(b, b->ttype, true, TRANSIENT);
77 19 : BBPreclaim(b);
78 19 : b = bn;
79 19 : if (b == NULL)
80 : return NULL;
81 19 : MT_lock_set(&b->theaplock);
82 : }
83 4871 : if (b->theap->farmid == TRANSIENT ||
84 185 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
85 4686 : QryCtx *qc = MT_thread_get_qry_ctx();
86 4686 : if (qc) {
87 2468 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
88 2468 : ATOMIC_SUB(&qc->datasize, b->theap->size);
89 2468 : b->theap->farmid = SYSTRANS;
90 2468 : b->batRole = SYSTRANS;
91 : }
92 2468 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
93 1069 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
94 1069 : b->tvheap->farmid = SYSTRANS;
95 : }
96 : }
97 : }
98 4871 : MT_lock_unset(&b->theaplock);
99 4871 : return b;
100 : }
101 :
102 : static void
103 20328425 : lock_table(sqlstore *store, sqlid id)
104 : {
105 20328425 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
106 20330313 : }
107 :
108 : static void
109 20330234 : unlock_table(sqlstore *store, sqlid id)
110 : {
111 20330234 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
112 20330547 : }
113 :
114 : static void
115 20402663 : lock_column(sqlstore *store, sqlid id)
116 : {
117 20402663 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
118 20402208 : }
119 :
120 : static void
121 20402351 : unlock_column(sqlstore *store, sqlid id)
122 : {
123 20402351 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
124 20402140 : }
125 :
126 : static void
127 110848 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
128 : {
129 110848 : assert(cleanup);
130 110848 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
131 110848 : }
132 :
133 : static void
134 132793 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
135 : {
136 132793 : assert(cleanup);
137 132793 : dup_base(&t->base);
138 132792 : trans_add(tr, b, data, cleanup, commit, log);
139 132782 : }
140 :
141 : static int
142 78270 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
143 : {
144 78270 : segment *s = change->data;
145 :
146 78270 : if (s->ts <= oldest) {
147 33218 : while(s) {
148 21064 : segment *n = s->prev;
149 21064 : ATOMIC_PTR_DESTROY(&s->next);
150 21064 : _DELETE(s);
151 21064 : s = n;
152 : }
153 12154 : sqlstore *store = Store;
154 12154 : table_destroy(store, (sql_table*)change->obj);
155 12154 : return 1;
156 : }
157 : return LOG_OK;
158 : }
159 :
160 : static void
161 21064 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
162 : {
163 : /* we can only be accessed by anything older then commit_ts */
164 21064 : if (c->cleanup == &tc_gc_seg)
165 8910 : s->prev = c->data;
166 : else
167 12154 : c->cleanup = &tc_gc_seg;
168 21064 : c->data = s;
169 21064 : s->ts = commit_ts;
170 16254 : }
171 :
172 : static segment *
173 86166 : new_segment(segment *o, sql_trans *tr, size_t cnt)
174 : {
175 86166 : segment *n = (segment*)GDKmalloc(sizeof(segment));
176 :
177 86166 : assert(tr);
178 86166 : if (n) {
179 86166 : n->ts = tr->tid;
180 86166 : n->oldts = 0;
181 86166 : n->deleted = false;
182 86166 : if (o) {
183 35248 : n->start = o->end;
184 35248 : n->end = o->end + cnt;
185 : } else {
186 50918 : n->start = 0;
187 50918 : n->end = cnt;
188 : }
189 86166 : ATOMIC_PTR_INIT(&n->next, NULL);
190 86166 : n->prev = NULL;
191 86166 : if (o)
192 35248 : ATOMIC_PTR_SET(&o->next, n);
193 : }
194 86166 : return n;
195 : }
196 :
/*
 * Carve the range [start, start+cnt) out of segment o on behalf of
 * transaction tr and return the segment that now covers exactly that range.
 *
 * segs    - segment list; its head/tail pointers are fixed up when needed
 * o       - segment containing the range
 * p       - predecessor of o in the list (may be NULL), relinked when a
 *           new segment is inserted in front of o
 * deleted - deleted flag for the carved-out range
 *
 * When the range equals o, o itself is re-stamped and returned.  Otherwise
 * one (2-way split) or two (3-way split) new segments are allocated.
 * Returns NULL when an allocation fails.
 */
static segment *
split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
{
	assert(tr);
	if (o->start == start && o->end == start+cnt) {
		/* exact match: no split needed, remember the old timestamp */
		assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
		o->oldts = o->ts;
		o->ts = tr->tid;
		o->deleted = deleted;
		return o;
	}
	segment *n = (segment*)GDKmalloc(sizeof(segment));

	if (!n)
		return NULL;
	n->prev = NULL;

	if (o->ts == tr->tid) {
		/* o already belongs to this transaction: the new piece gets
		 * timestamp 1 and is flagged deleted, whatever 'deleted' says */
		n->oldts = 0;
		n->ts = 1;
		n->deleted = true;
	} else {
		n->oldts = o->ts;
		n->ts = tr->tid;
		n->deleted = deleted;
	}
	if (start == o->start) {
		/* 2-way split: o remains latter part of segment, new one is
		 * inserted before */
		n->start = o->start;
		n->end = n->start + cnt;
		ATOMIC_PTR_INIT(&n->next, o);
		if (segs->h == o)
			segs->h = n;
		if (p)
			ATOMIC_PTR_SET(&p->next, n);
		o->start = n->end;
	} else if (start+cnt == o->end) {
		/* 2-way split: o remains first part of segment, new one is
		 * added after */
		n->start = o->end - cnt;
		n->end = o->end;
		ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
		ATOMIC_PTR_SET(&o->next, n);
		if (segs->t == o)
			segs->t = n;
		o->end = n->start;
	} else {
		/* 3-way split: o remains first part of segment, two new ones
		 * are added after */
		segment *n2 = GDKmalloc(sizeof(segment));
		if (n2 == NULL) {
			GDKfree(n);
			return NULL;
		}
		ATOMIC_PTR_INIT(&n->next, n2);
		n->start = start;
		n->end = start + cnt;
		/* n2 is the remainder after the range: a copy of o with its
		 * own next pointer, start and prev */
		*n2 = *o;
		ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
		n2->start = n->end;
		n2->prev = NULL;
		if (segs->t == o)
			segs->t = n2;
		/* publish n (and through it n2) only once both are set up */
		ATOMIC_PTR_SET(&o->next, n);
		o->end = start;
	}
	return n;
}
266 :
/*
 * Undo the segment changes of transaction tr and, while walking the list,
 * merge neighbouring segments that are no longer needed separately.
 *
 * Segments stamped with tr->tid get their previous timestamp back and
 * their deleted flag reverted.  Two consecutive segments whose timestamp
 * is <= oldest, that are adjacent and agree on the deleted flag are merged;
 * the absorbed segment is queued for destruction through change.
 */
static void
rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
{
	/* cur walks the list, seg is the candidate to merge cur into */
	segment *cur = segs->h, *seg = NULL;
	for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
		if (cur->ts == tr->tid) { /* revert */
			/* flip the flag; a segment with ts == oldts ends up deleted */
			cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
			cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
			cur->oldts = 0;
		}
		if (cur->ts <= oldest) { /* possibly merge range */
			if (!seg) { /* skip first */
				seg = cur;
			} else if (seg->end == cur->start && seg->deleted == cur->deleted) {
				/* merge with previous */
				seg->end = cur->end;
				ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
				if (cur == segs->t)
					segs->t = seg;
				mark4destroy(cur, change, store_get_timestamp(tr->store));
				/* continue from the merged segment; cur is unlinked now */
				cur = seg;
			} else {
				seg = cur; /* begin of new merge */
			}
		}
	}
}
294 :
295 : static size_t
296 100777 : segs_end_include_deleted( segments *segs, sql_trans *tr)
297 : {
298 100777 : size_t cnt = 0;
299 100777 : segment *s = segs->h, *l = NULL;
300 :
301 485958 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
302 385181 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
303 : l = s;
304 : }
305 100777 : if (l)
306 100770 : cnt = l->end;
307 100777 : return cnt;
308 : }
309 :
310 : static int
311 100777 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
312 : {
313 : /* set bits correctly */
314 100777 : BAT *b = temp_descriptor(cs->bid);
315 :
316 100777 : if (!b)
317 : return LOG_ERR;
318 100777 : segment *s = segs->h;
319 :
320 100777 : size_t nr = segs_end_include_deleted(segs, tr);
321 100777 : size_t rounded_nr = ((nr+31)&~31);
322 100777 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
323 0 : bat_destroy(b);
324 0 : return LOG_ERR;
325 : }
326 :
327 : /* disable all properties here */
328 100777 : MT_lock_set(&b->theaplock);
329 100777 : b->tsorted = false;
330 100777 : b->trevsorted = false;
331 100777 : b->tnosorted = 0;
332 100777 : b->tnorevsorted = 0;
333 100777 : b->tseqbase = oid_nil;
334 100777 : b->tkey = false;
335 100777 : b->tnokey[0] = 0;
336 100777 : b->tnokey[1] = 0;
337 100777 : b->theap->dirty = true;
338 100777 : BUN cnt = BATcount(b);
339 100777 : MT_lock_unset(&b->theaplock);
340 :
341 100777 : uint32_t *restrict dst;
342 : /* why hashlock ?? */
343 100777 : MT_rwlock_wrlock(&b->thashlock);
344 519837 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
345 361660 : if (s->start >= nr)
346 : break;
347 318283 : if (s->ts == tr->tid && s->end != s->start) {
348 138404 : if (cnt < s->start) { /* first mark as deleted ! */
349 3716 : size_t lnr = s->start-cnt;
350 3716 : size_t pos = cnt;
351 3716 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
352 3716 : uint32_t cur = 0;
353 :
354 3716 : size_t used = pos&31, end = 32;
355 3716 : if (used) {
356 3594 : if (lnr < (32-used))
357 3389 : end = used + lnr;
358 3594 : assert(end > used);
359 3594 : cur |= ((1U << (end - used)) - 1) << used;
360 3594 : lnr -= end - used;
361 3594 : *dst++ |= cur;
362 3594 : cur = 0;
363 : }
364 3716 : size_t full = lnr/32;
365 3716 : size_t rest = lnr%32;
366 3716 : if (full > 0) {
367 8 : memset(dst, ~0, full * sizeof(*dst));
368 8 : dst += full;
369 8 : lnr -= full * 32;
370 : }
371 3716 : if (rest > 0) {
372 200 : cur |= (1U << rest) - 1;
373 200 : lnr -= rest;
374 200 : *dst |= cur;
375 : }
376 3716 : assert(lnr==0);
377 : }
378 138404 : size_t lnr = s->end-s->start;
379 138404 : size_t pos = s->start;
380 138404 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
381 138404 : uint32_t cur = 0;
382 138404 : size_t used = pos&31, end = 32;
383 138404 : if (used) {
384 100444 : if (lnr < (32-used))
385 94985 : end = used + lnr;
386 100444 : assert(end > used);
387 100444 : cur |= ((1U << (end - used)) - 1) << used;
388 100444 : lnr -= end - used;
389 100444 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
390 100444 : dst++;
391 100444 : cur = 0;
392 : }
393 138404 : size_t full = lnr/32;
394 138404 : size_t rest = lnr%32;
395 138404 : if (full > 0) {
396 3710 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
397 3710 : dst += full;
398 3710 : lnr -= full * 32;
399 : }
400 138404 : if (rest > 0) {
401 40152 : cur |= (1U << rest) - 1;
402 40152 : lnr -= rest;
403 40152 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
404 : }
405 138404 : assert(lnr==0);
406 138404 : if (cnt < s->end)
407 318283 : cnt = s->end;
408 : }
409 : }
410 100777 : MT_rwlock_wrunlock(&b->thashlock);
411 100777 : if (nr > BATcount(b)) {
412 60055 : MT_lock_set(&b->theaplock);
413 60055 : BATsetcount(b, nr);
414 60055 : MT_lock_unset(&b->theaplock);
415 : }
416 :
417 100777 : bat_destroy(b);
418 100777 : return LOG_OK;
419 : }
420 :
421 : /* TODO return LOG_OK/ERR */
/*
 * Commit-time pass over the segments of storage s.
 *
 * Every segment still stamped with tr->tid receives commit_ts (and loses
 * its oldts when it is not deleted).  Consecutive segments with equal
 * deleted flags and committed timestamps (< TRANSACTION_ID_BASE) are then
 * merged, unless some active transaction has a timestamp that lies between
 * the timestamps of the two segments.  An absorbed segment is freed
 * directly when commit_ts == oldest, otherwise it is queued for later
 * destruction through change.
 */
static void
merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
{
	sqlstore* store = tr->store;
	/* cur walks the list, seg is the segment cur may be merged into */
	segment *cur = s->segs->h, *seg = NULL;
	for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
		if (cur->ts == tr->tid) {
			if (!cur->deleted)
				cur->oldts = 0;
			cur->ts = commit_ts;
		}
		if (!seg) {
			/* first segment */
			seg = cur;
		}
		else if (seg->ts < TRANSACTION_ID_BASE) {
			/* possible merge since both deleted flags are equal */
			if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
				int merge = 1;
				node *n = store->active->h;
				for (int i = 0; i < store->active->cnt; i++, n = n->next) {
					sql_trans* other = ((sql_trans*)n->data);
					ulng active = other->ts;
					if(other->active == 2)
						continue; /* pretend that another recently committed transaction is no longer active */
					if (active == tr->ts)
						continue; /* pretend that committing transaction has already committed and is no longer active */
					/* both segments older than this transaction: stop scanning */
					if (seg->ts < active && cur->ts < active)
						break;
					/* both segments newer than this transaction: check the next one */
					if (seg->ts > active && cur->ts > active)
						continue;

					assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
					/* cannot safely merge since there is an active transaction between the segments */
					merge = false;
					break;
				}
				/* merge segments */
				if (merge) {
					seg->end = cur->end;
					ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
					if (cur == s->segs->t)
						s->segs->t = seg;
					if (commit_ts == oldest) {
						ATOMIC_PTR_DESTROY(&cur->next);
						_DELETE(cur);
					} else
						mark4destroy(cur, change, commit_ts);
					/* continue from the merged segment; seg stays the merge target */
					cur = seg;
					continue;
				}
			}
		}
		seg = cur;
	}
}
478 :
479 : static int
480 2203384 : segments_in_transaction(sql_trans *tr, sql_table *t)
481 : {
482 2203384 : storage *s = ATOMIC_PTR_GET(&t->data);
483 2203384 : segment *seg = s->segs->h;
484 :
485 2203384 : if (seg && s->segs->t->ts == tr->tid)
486 : return 1;
487 616316 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
488 513801 : if (seg->ts == tr->tid)
489 : return 1;
490 : }
491 : return 0;
492 : }
493 :
494 : static size_t
495 13284766 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
496 : {
497 13284766 : size_t cnt = 0;
498 :
499 13284766 : lock_table(tr->store, table->base.id);
500 13286725 : segment *s = segs->h, *l = NULL;
501 :
502 13286725 : if (segs->t && SEG_IS_VALID(segs->t, tr))
503 11374285 : l = s = segs->t;
504 :
505 201051840 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
506 187765128 : if (SEG_IS_VALID(s, tr))
507 : l = s;
508 : }
509 13286712 : if (l)
510 13273387 : cnt = l->end;
511 13286712 : unlock_table(tr->store, table->base.id);
512 13286682 : return cnt;
513 : }
514 :
515 : static segments *
516 50918 : new_segments(sql_trans *tr, size_t cnt)
517 : {
518 50918 : segments *n = (segments*)GDKmalloc(sizeof(segments));
519 :
520 50918 : if (n) {
521 50918 : n->h = n->t = new_segment(NULL, tr, cnt);
522 50918 : if (!n->h) {
523 0 : GDKfree(n);
524 0 : return NULL;
525 : }
526 50918 : sql_ref_init(&n->r);
527 : }
528 : return n;
529 : }
530 :
531 : static sql_delta *
532 22971827 : timestamp_delta( sql_trans *tr, sql_delta *d)
533 : {
534 23003683 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
535 31856 : d = d->next;
536 22971747 : return d;
537 : }
538 :
539 : static sql_delta *
540 22817531 : col_timestamp_delta( sql_trans *tr, sql_column *c)
541 : {
542 22817531 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
543 : }
544 :
545 : static sql_delta *
546 21767 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
547 : {
548 21767 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
549 : }
550 :
551 : static storage *
552 14276265 : timestamp_storage( sql_trans *tr, storage *d)
553 : {
554 14276265 : if (!d)
555 : return NULL;
556 14324392 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
557 48127 : d = d->next;
558 : return d;
559 : }
560 :
561 : static storage *
562 14251023 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
563 : {
564 14251023 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
565 : }
566 :
567 : static sql_delta*
568 19331 : delta_dup(sql_delta *d)
569 : {
570 19331 : ATOMIC_INC(&d->cs.refcnt);
571 19331 : return d;
572 : }
573 :
574 : static void *
575 17937 : col_dup(sql_column *c)
576 : {
577 17937 : return delta_dup(ATOMIC_PTR_GET(&c->data));
578 : }
579 :
580 : static void *
581 2681 : idx_dup(sql_idx *i)
582 : {
583 2681 : if (!ATOMIC_PTR_GET(&i->data))
584 : return NULL;
585 1394 : return delta_dup(ATOMIC_PTR_GET(&i->data));
586 : }
587 :
588 : static storage*
589 1527 : storage_dup(storage *d)
590 : {
591 1527 : ATOMIC_INC(&d->cs.refcnt);
592 1527 : return d;
593 : }
594 :
595 : static void *
596 1527 : del_dup(sql_table *t)
597 : {
598 1527 : return storage_dup(ATOMIC_PTR_GET(&t->data));
599 : }
600 :
601 : static size_t
602 17 : count_inserts( segment *s, sql_trans *tr)
603 : {
604 17 : size_t cnt = 0;
605 :
606 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
607 55 : if (!s->deleted && s->ts == tr->tid)
608 4 : cnt += s->end - s->start;
609 : }
610 17 : return cnt;
611 : }
612 :
613 : static size_t
614 293457 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
615 : {
616 293457 : size_t cnt = 0;
617 :
618 315953 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
619 : ;
620 :
621 773622 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
622 480165 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
623 40813 : cnt += s->end - s->start;
624 : }
625 293457 : return cnt;
626 : }
627 :
628 : static size_t
629 17 : count_deletes( segment *s, sql_trans *tr)
630 : {
631 17 : size_t cnt = 0;
632 :
633 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
634 55 : if (SEG_IS_DELETED(s, tr))
635 17 : cnt += s->end - s->start;
636 : }
637 17 : return cnt;
638 : }
639 :
640 : #define CNT_ACTIVE 10
641 :
642 : static size_t
643 12794200 : count_col(sql_trans *tr, sql_column *c, int access)
644 : {
645 12794200 : storage *d;
646 12794200 : sql_delta *ds;
647 :
648 12794200 : if (!isTable(c->t))
649 : return 0;
650 12794200 : d = tab_timestamp_storage(tr, c->t);
651 12792869 : ds = col_timestamp_delta(tr, c);
652 12792798 : if (!d ||!ds)
653 : return 0;
654 12792798 : if (access == 2)
655 448308 : return ds?ds->cs.ucnt:0;
656 12344490 : if (access == 1)
657 17 : return count_inserts(d->segs->h, tr);
658 12344473 : if (access == QUICK)
659 524474 : return d->segs->t?d->segs->t->end:0;
660 11819999 : if (access == CNT_ACTIVE) {
661 293451 : size_t cnt = segs_end(d->segs, tr, c->t);
662 293454 : lock_table(tr->store, c->t->base.id);
663 293457 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
664 293456 : unlock_table(tr->store, c->t->base.id);
665 293456 : return cnt;
666 : }
667 11526548 : return segs_end(d->segs, tr, c->t);
668 : }
669 :
670 : static size_t
671 18958 : count_idx(sql_trans *tr, sql_idx *i, int access)
672 : {
673 18958 : storage *d;
674 18958 : sql_delta *ds;
675 :
676 18958 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
677 4751 : return 0;
678 14207 : d = tab_timestamp_storage(tr, i->t);
679 14203 : ds = idx_timestamp_delta(tr, i);
680 14203 : if (!d || !ds)
681 : return 0;
682 14203 : if (access == 2)
683 2838 : return ds?ds->cs.ucnt:0;
684 11365 : if (access == 1)
685 0 : return count_inserts(d->segs->h, tr);
686 11365 : if (access == QUICK)
687 3528 : return d->segs->t?d->segs->t->end:0;
688 7837 : return segs_end(d->segs, tr, i->t);
689 : }
690 :
691 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
692 : static BAT *
693 12891274 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
694 : {
695 12891274 : BAT *b;
696 :
697 12891274 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
698 : /* returns the updates for cs */
699 12891274 : if (cs->uibid && cs->uvbid && cs->ucnt) {
700 4190 : if (access == RD_UPD_ID) {
701 2793 : if (!(b = temp_descriptor(cs->uibid)))
702 : return NULL;
703 2793 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
704 111 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
705 2682 : oid nil = oid_nil;
706 : /* less then cnt */
707 2682 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
708 2682 : if (!s) {
709 0 : bat_destroy(b);
710 0 : return NULL;
711 : }
712 :
713 2682 : BAT *nb = BATproject(s, b);
714 2682 : bat_destroy(s);
715 2682 : bat_destroy(b);
716 2682 : b = nb;
717 : }
718 : } else {
719 1397 : b = temp_descriptor(cs->uvbid);
720 : }
721 : } else {
722 21478529 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
723 : }
724 : return b;
725 : }
726 :
727 : static BAT *
728 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
729 : {
730 0 : int err = 0;
731 0 : BAT *uv = *UV;
732 0 : BUN cnt = BATcount(ui)+BATcount(oi);
733 0 : BATiter uvi;
734 0 : BATiter ovi;
735 :
736 0 : if (uv) {
737 0 : uvi = bat_iterator(uv);
738 0 : ovi = bat_iterator(ov);
739 : }
740 :
741 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
742 0 : BUN uip = 0, uie = BATcount(ui);
743 0 : BUN oip = 0, oie = BATcount(oi);
744 :
745 0 : oid uiseqb = ui->tseqbase;
746 0 : oid oiseqb = oi->tseqbase;
747 0 : oid *uipt = NULL, *oipt = NULL;
748 0 : BATiter uii = bat_iterator(ui);
749 0 : BATiter oii = bat_iterator(oi);
750 0 : if (!BATtdensebi(&uii))
751 0 : uipt = uii.base;
752 0 : if (!BATtdensebi(&oii))
753 0 : oipt = oii.base;
754 :
755 0 : if (uiseqb == oiseqb && uie == oie) { /* full overlap, no values */
756 0 : if (uv) {
757 0 : bat_iterator_end(&uvi);
758 0 : bat_iterator_end(&ovi);
759 : }
760 0 : bat_iterator_end(&uii);
761 0 : bat_iterator_end(&oii);
762 0 : if (uv) {
763 0 : *UV = uv;
764 : } else {
765 0 : bat_destroy(uv);
766 : }
767 0 : bat_destroy(oi);
768 0 : bat_destroy(ov);
769 0 : return ui;
770 : }
771 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
772 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
773 :
774 0 : if (!ni || (uv && !nv)) {
775 0 : bat_destroy(ni);
776 0 : bat_destroy(nv);
777 0 : bat_destroy(ui);
778 0 : bat_destroy(uv);
779 0 : bat_destroy(oi);
780 0 : bat_destroy(ov);
781 0 : return NULL;
782 : }
783 0 : while (uip < uie && oip < oie && !err) {
784 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
785 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
786 :
787 0 : if (uiid <= oiid) {
788 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
789 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
790 : err = 1;
791 0 : uip++;
792 0 : if (uiid == oiid)
793 0 : oip++;
794 : } else { /* uiid > oiid */
795 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
796 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
797 : err = 1;
798 0 : oip++;
799 : }
800 : }
801 0 : while (uip < uie && !err) {
802 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
803 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
804 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
805 : err = 1;
806 0 : uip++;
807 : }
808 0 : while (oip < oie && !err) {
809 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
810 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
811 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
812 : err = 1;
813 0 : oip++;
814 : }
815 0 : if (uv) {
816 0 : bat_iterator_end(&uvi);
817 0 : bat_iterator_end(&ovi);
818 : }
819 0 : bat_iterator_end(&uii);
820 0 : bat_iterator_end(&oii);
821 0 : bat_destroy(ui);
822 0 : bat_destroy(uv);
823 0 : bat_destroy(oi);
824 0 : bat_destroy(ov);
825 0 : if (!err) {
826 0 : if (nv)
827 0 : *UV = nv;
828 0 : return ni;
829 : }
830 0 : *UV = NULL;
831 0 : bat_destroy(ni);
832 0 : bat_destroy(nv);
833 0 : return NULL;
834 : }
835 :
836 : static sql_delta *
837 8594055 : older_delta( sql_delta *d, sql_trans *tr)
838 : {
839 8594055 : sql_delta *o = d->next;
840 :
841 8596976 : while (o && !o->cs.merged) {
842 2917 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
843 : break;
844 : else
845 2921 : o = o->next;
846 : }
847 8594059 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
848 0 : return o;
849 : return NULL;
850 : }
851 :
852 : static BAT *
853 8594254 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
854 : {
855 8594254 : assert(tr->active);
856 8594254 : sql_delta *o = NULL;
857 8594254 : BAT *ui = NULL, *uv = NULL;
858 :
859 8594254 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
860 : return NULL;
861 8593991 : if (access == RD_UPD_VAL) {
862 4297246 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
863 0 : bat_destroy(ui);
864 0 : return NULL;
865 : }
866 : }
867 8594063 : while ((o = older_delta(d, tr)) != NULL) {
868 0 : BAT *oui = NULL, *ouv = NULL;
869 0 : if (!oui)
870 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
871 0 : if (access == RD_UPD_VAL)
872 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
873 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
874 0 : bat_destroy(ui);
875 0 : bat_destroy(uv);
876 0 : bat_destroy(oui);
877 0 : bat_destroy(ouv);
878 0 : return NULL;
879 : }
880 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
881 : return NULL;
882 : d = o;
883 : }
884 8594062 : if (uv) {
885 4297318 : bat_destroy(ui);
886 4297318 : return uv;
887 : }
888 : return ui;
889 : }
890 :
891 : static BAT *
892 3 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
893 : {
894 3 : lock_column(tr->store, c->base.id);
895 3 : sql_delta *d = col_timestamp_delta(tr, c);
896 3 : int type = c->type.type->localtype;
897 :
898 3 : if (!d) {
899 0 : unlock_column(tr->store, c->base.id);
900 0 : return NULL;
901 : }
902 3 : if (d->cs.st == ST_DICT) {
903 0 : BAT *b = quick_descriptor(d->cs.bid);
904 :
905 0 : type = b->ttype;
906 : }
907 3 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
908 3 : unlock_column(tr->store, c->base.id);
909 3 : return bn;
910 : }
911 :
912 : static BAT *
913 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
914 : {
915 0 : lock_column(tr->store, i->base.id);
916 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
917 0 : sql_delta *d = idx_timestamp_delta(tr, i);
918 :
919 0 : if (!d) {
920 0 : unlock_column(tr->store, i->base.id);
921 0 : return NULL;
922 : }
923 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
924 0 : unlock_column(tr->store, i->base.id);
925 0 : return bn;
926 : }
927 :
928 : static BAT *
929 5729888 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
930 : {
931 5729888 : BAT *b;
932 :
933 5729888 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
934 5729888 : assert(cs != NULL);
935 5729888 : if (access == QUICK)
936 134176 : return quick_descriptor(cs->bid);
937 5595712 : if (access == RD_EXT)
938 341 : return temp_descriptor(cs->ebid);
939 5595371 : assert(cs->bid);
940 5595371 : b = temp_descriptor(cs->bid);
941 5595162 : if (b == NULL)
942 : return NULL;
943 5595162 : assert(b->batRestricted == BAT_READ);
944 : /* return slice */
945 5595162 : BAT *s = BATslice(b, 0, cnt);
946 5595341 : bat_destroy(b);
947 5595341 : return s;
948 : }
949 :
950 : static int
951 4297116 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
952 : {
953 4297116 : lock_column(tr->store, c->base.id);
954 4297006 : size_t cnt = count_col(tr, c, 0);
955 4297134 : sql_delta *d = col_timestamp_delta(tr, c);
956 4297094 : int type = c->type.type->localtype;
957 :
958 4297094 : if (!d) {
959 0 : unlock_column(tr->store, c->base.id);
960 0 : return LOG_ERR;
961 : }
962 4297094 : if (d->cs.st == ST_DICT) {
963 2 : BAT *b = quick_descriptor(d->cs.bid);
964 :
965 2 : type = b->ttype;
966 : }
967 :
968 4297094 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
969 4297273 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
970 :
971 4297262 : unlock_column(tr->store, c->base.id);
972 :
973 4297241 : if (*ui == NULL || *uv == NULL) {
974 0 : bat_destroy(*ui);
975 0 : bat_destroy(*uv);
976 0 : return LOG_ERR;
977 : }
978 : return LOG_OK;
979 : }
980 :
981 : static int
982 57 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
983 : {
984 57 : lock_column(tr->store, i->base.id);
985 57 : size_t cnt = count_idx(tr, i, 0);
986 57 : sql_delta *d = idx_timestamp_delta(tr, i);
987 57 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
988 :
989 57 : if (!d) {
990 0 : unlock_column(tr->store, i->base.id);
991 0 : return LOG_ERR;
992 : }
993 :
994 57 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
995 57 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
996 :
997 57 : unlock_column(tr->store, i->base.id);
998 :
999 57 : if (*ui == NULL || *uv == NULL) {
1000 0 : bat_destroy(*ui);
1001 0 : bat_destroy(*uv);
1002 0 : return LOG_ERR;
1003 : }
1004 : return LOG_OK;
1005 : }
1006 :
1007 : static void * /* BAT * */
1008 5722550 : bind_col(sql_trans *tr, sql_column *c, int access)
1009 : {
1010 5722550 : assert(access == QUICK || tr->active);
1011 5722550 : if (!isTable(c->t))
1012 : return NULL;
1013 5722550 : sql_delta *d = col_timestamp_delta(tr, c);
1014 5722422 : if (!d)
1015 : return NULL;
1016 5722422 : size_t cnt = count_col(tr, c, 0);
1017 5722226 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1018 3 : return bind_ucol(tr, c, access, cnt);
1019 5722223 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
1020 5722100 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
1021 : return b;
1022 : }
1023 :
1024 : static void * /* BAT * */
1025 7508 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1026 : {
1027 7508 : assert(access == QUICK || tr->active);
1028 7508 : if (!isTable(i->t))
1029 : return NULL;
1030 7508 : sql_delta *d = idx_timestamp_delta(tr, i);
1031 7510 : if (!d)
1032 : return NULL;
1033 7510 : size_t cnt = count_idx(tr, i, 0);
1034 7508 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1035 0 : return bind_uidx(tr, i, access, cnt);
1036 7508 : return cs_bind_bat( &d->cs, access, cnt);
1037 : }
1038 :
1039 : static int
1040 446 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1041 : {
1042 446 : if (!cs->uibid) {
1043 0 : cs->uibid = e_bat(TYPE_oid);
1044 0 : if (cs->uibid == BID_NIL)
1045 : return LOG_ERR;
1046 : }
1047 446 : if (!cs->uvbid) {
1048 0 : BAT *cur = quick_descriptor(cs->bid);
1049 0 : if (!cur)
1050 : return LOG_ERR;
1051 0 : int type = cur->ttype;
1052 0 : cs->uvbid = e_bat(type);
1053 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1054 : return LOG_ERR;
1055 : }
1056 446 : BAT *ui = temp_descriptor(cs->uibid);
1057 446 : BAT *uv = temp_descriptor(cs->uvbid);
1058 :
1059 446 : if (ui == NULL || uv == NULL) {
1060 0 : bat_destroy(ui);
1061 0 : bat_destroy(uv);
1062 0 : return LOG_ERR;
1063 : }
1064 446 : assert(ui && uv);
1065 446 : if (isEbat(ui)){
1066 394 : temp_destroy(cs->uibid);
1067 394 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1068 394 : bat_destroy(ui);
1069 394 : if (cs->uibid == BID_NIL ||
1070 394 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1071 0 : bat_destroy(uv);
1072 0 : return LOG_ERR;
1073 : }
1074 : }
1075 446 : if (isEbat(uv)){
1076 394 : temp_destroy(cs->uvbid);
1077 394 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1078 394 : bat_destroy(uv);
1079 394 : if (cs->uvbid == BID_NIL ||
1080 394 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1081 0 : bat_destroy(ui);
1082 0 : return LOG_ERR;
1083 : }
1084 : }
1085 446 : *Ui = ui;
1086 446 : *Uv = uv;
1087 446 : return LOG_OK;
1088 : }
1089 :
1090 : static int
1091 3018 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1092 : {
1093 5845 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1094 5845 : if (s->start <= rid && s->end > rid) {
1095 3018 : if (s->ts == tr->tid && !s->deleted) {
1096 2574 : return 1;
1097 : }
1098 : break;
1099 : }
1100 : }
1101 : return 0;
1102 : }
1103 :
1104 : static int
1105 444 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1106 : {
1107 647 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1108 647 : if (s->start <= rid && s->end > rid) {
1109 444 : if (s->ts >= tr->ts && s->deleted) {
1110 0 : return 1;
1111 : }
1112 : break;
1113 : }
1114 : }
1115 : return 0;
1116 : }
1117 :
1118 : static sql_delta *
1119 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1120 : {
1121 0 : sql_delta *n = ZNEW(sql_delta);
1122 0 : if (!n)
1123 : return NULL;
1124 0 : *n = *bat;
1125 0 : n->next = NULL;
1126 0 : n->cs.ts = tr->tid;
1127 0 : return n;
1128 : }
1129 :
1130 : static BAT *
1131 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1132 : {
1133 17 : BAT *newoffsets = NULL;
1134 17 : sql_delta *bat = *batp;
1135 17 : column_storage *cs = &bat->cs;
1136 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1137 :
1138 17 : if (!u)
1139 : return NULL;
1140 17 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1141 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1142 0 : assert(0);
1143 : } else {
1144 17 : int new = 0;
1145 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1146 17 : if (BATcount(u) >= max_cnt) {
1147 1 : if (max_cnt == 64*1024) { /* decompress */
1148 0 : if (!(b = temp_descriptor(cs->bid))) {
1149 0 : bat_destroy(u);
1150 0 : return NULL;
1151 : }
1152 0 : if (cs->ucnt) {
1153 0 : BAT *ui = NULL, *uv = NULL;
1154 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1155 0 : bat_destroy(b);
1156 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1157 0 : bat_destroy(nb);
1158 0 : bat_destroy(u);
1159 0 : return NULL;
1160 : }
1161 0 : b = nb;
1162 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1163 0 : bat_destroy(ui);
1164 0 : bat_destroy(uv);
1165 0 : bat_destroy(b);
1166 0 : bat_destroy(u);
1167 : }
1168 0 : bat_destroy(ui);
1169 0 : bat_destroy(uv);
1170 : }
1171 0 : n = DICTdecompress_(b, u, PERSISTENT);
1172 0 : bat_destroy(b);
1173 0 : assert(newoffsets == NULL);
1174 0 : if (!n) {
1175 0 : bat_destroy(u);
1176 0 : return NULL;
1177 : }
1178 0 : if (cs->ts != tr->tid) {
1179 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1180 0 : bat_destroy(n);
1181 0 : return NULL;
1182 : }
1183 0 : cs = &(*batp)->cs;
1184 0 : new = 1;
1185 : }
1186 0 : if (cs->bid && !new)
1187 0 : temp_destroy(cs->bid);
1188 0 : n = transfer_to_systrans(n);
1189 0 : if (n == NULL)
1190 : return NULL;
1191 0 : bat_set_access(n, BAT_READ);
1192 0 : cs->bid = temp_create(n);
1193 0 : bat_destroy(n);
1194 0 : if (cs->ebid && !new)
1195 0 : temp_destroy(cs->ebid);
1196 0 : cs->ebid = 0;
1197 0 : cs->ucnt = 0;
1198 0 : if (cs->uibid && !new)
1199 0 : temp_destroy(cs->uibid);
1200 0 : if (cs->uvbid && !new)
1201 0 : temp_destroy(cs->uvbid);
1202 0 : cs->uibid = cs->uvbid = 0;
1203 0 : cs->st = ST_DEFAULT;
1204 0 : cs->cleared = true;
1205 : } else {
1206 1 : if (!(b = temp_descriptor(cs->bid))) {
1207 0 : bat_destroy(newoffsets);
1208 0 : bat_destroy(u);
1209 0 : return NULL;
1210 : }
1211 1 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
1212 1 : bat_destroy(b);
1213 1 : if (!n) {
1214 0 : bat_destroy(newoffsets);
1215 0 : bat_destroy(u);
1216 0 : return NULL;
1217 : }
1218 1 : if (cs->ts != tr->tid) {
1219 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1220 0 : bat_destroy(n);
1221 0 : return NULL;
1222 : }
1223 0 : cs = &(*batp)->cs;
1224 0 : new = 1;
1225 0 : temp_dup(cs->ebid);
1226 0 : if (cs->uibid) {
1227 0 : temp_dup(cs->uibid);
1228 0 : temp_dup(cs->uvbid);
1229 : }
1230 : }
1231 1 : if (cs->bid && !new)
1232 1 : temp_destroy(cs->bid);
1233 1 : n = transfer_to_systrans(n);
1234 1 : if (n == NULL)
1235 : return NULL;
1236 1 : bat_set_access(n, BAT_READ);
1237 1 : cs->bid = temp_create(n);
1238 1 : bat_destroy(n);
1239 1 : cs->cleared = true;
1240 1 : i = newoffsets;
1241 : }
1242 : } else { /* append */
1243 16 : i = newoffsets;
1244 : }
1245 : }
1246 17 : bat_destroy(u);
1247 17 : return i;
1248 : }
1249 :
1250 : static BAT *
1251 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1252 : {
1253 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1254 0 : BAT *newoffsets = NULL;
1255 0 : BAT *b = NULL, *n = NULL;
1256 :
1257 0 : if (!(b = temp_descriptor(cs->bid)))
1258 : return NULL;
1259 :
1260 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1261 0 : assert(0);
1262 : } else {
1263 : /* returns new offset bat if values within min/max, else decompress */
1264 0 : if (!newoffsets) { /* decompress */
1265 0 : if (cs->ucnt) {
1266 0 : BAT *ui = NULL, *uv = NULL;
1267 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1268 0 : bat_destroy(b);
1269 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1270 0 : bat_destroy(nb);
1271 0 : return NULL;
1272 : }
1273 0 : b = nb;
1274 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1275 0 : bat_destroy(ui);
1276 0 : bat_destroy(uv);
1277 0 : bat_destroy(b);
1278 : }
1279 0 : bat_destroy(ui);
1280 0 : bat_destroy(uv);
1281 : }
1282 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1283 0 : bat_destroy(b);
1284 0 : if (!n)
1285 : return NULL;
1286 0 : if (cs->bid)
1287 0 : temp_destroy(cs->bid);
1288 0 : n = transfer_to_systrans(n);
1289 0 : if (n == NULL)
1290 : return NULL;
1291 0 : bat_set_access(n, BAT_READ);
1292 0 : cs->bid = temp_create(n);
1293 0 : cs->ucnt = 0;
1294 0 : if (cs->uibid)
1295 0 : temp_destroy(cs->uibid);
1296 0 : if (cs->uvbid)
1297 0 : temp_destroy(cs->uvbid);
1298 0 : cs->uibid = cs->uvbid = 0;
1299 0 : cs->st = ST_DEFAULT;
1300 0 : cs->cleared = true;
1301 0 : b = n;
1302 : } else { /* append */
1303 : i = newoffsets;
1304 : }
1305 : }
1306 0 : bat_destroy(b);
1307 0 : return i;
1308 : }
1309 :
1310 : /*
1311 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1312 : */
1313 : static int
1314 2829 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1315 : {
1316 2829 : int res = LOG_OK;
1317 2829 : sql_delta *bat = *batp;
1318 2829 : column_storage *cs = &bat->cs;
1319 2829 : BAT *otids = tids, *oupdates = updates;
1320 :
1321 2829 : if (!BATcount(tids))
1322 : return LOG_OK;
1323 :
1324 2829 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1325 6 : tids = BATunmask(tids);
1326 6 : if (!tids)
1327 : return LOG_ERR;
1328 : }
1329 2829 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1330 0 : updates = BATunmask(updates);
1331 0 : if (!updates) {
1332 0 : if (otids != tids)
1333 0 : bat_destroy(tids);
1334 0 : return LOG_ERR;
1335 : }
1336 2829 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1337 43 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1338 43 : if (!updates) {
1339 0 : if (otids != tids)
1340 0 : bat_destroy(tids);
1341 0 : return LOG_ERR;
1342 : }
1343 : }
1344 :
1345 2829 : if (cs->st == ST_DICT) {
1346 : /* possibly a new array is returned */
1347 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1348 4 : bat = *batp;
1349 4 : cs = &bat->cs;
1350 4 : if (oupdates != updates)
1351 0 : bat_destroy(updates);
1352 4 : updates = nupdates;
1353 4 : if (!updates) {
1354 0 : if (otids != tids)
1355 0 : bat_destroy(tids);
1356 0 : return LOG_ERR;
1357 : }
1358 : }
1359 :
1360 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1361 : /* currently only one update delta is possible */
1362 2829 : lock_table(tr->store, t->base.id);
1363 2829 : storage *s = ATOMIC_PTR_GET(&t->data);
1364 2829 : if (!is_new && !cs->cleared) {
1365 2511 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1366 6 : BAT *sorted, *order;
1367 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1368 0 : if (otids != tids)
1369 0 : bat_destroy(tids);
1370 0 : if (oupdates != updates)
1371 0 : bat_destroy(updates);
1372 0 : unlock_table(tr->store, t->base.id);
1373 0 : return LOG_ERR;
1374 : }
1375 6 : if (otids != tids)
1376 0 : bat_destroy(tids);
1377 6 : tids = sorted;
1378 6 : BAT *nupdates = BATproject(order, updates);
1379 6 : bat_destroy(order);
1380 6 : if (oupdates != updates)
1381 0 : bat_destroy(updates);
1382 6 : updates = nupdates;
1383 6 : if (!updates) {
1384 0 : bat_destroy(tids);
1385 0 : unlock_table(tr->store, t->base.id);
1386 0 : return LOG_ERR;
1387 : }
1388 : }
1389 2511 : assert(tids->tsorted);
1390 2511 : BAT *ui = NULL, *uv = NULL;
1391 :
1392 : /* handle updates on just inserted bits */
1393 : /* handle updates on updates (within one transaction) */
1394 2511 : BATiter upi = bat_iterator(updates);
1395 2511 : BUN cnt = 0, ucnt = BATcount(tids);
1396 2511 : BAT *b, *ins = NULL;
1397 2511 : int *msk = NULL;
1398 :
1399 2511 : if((b = temp_descriptor(cs->bid)) == NULL)
1400 : res = LOG_ERR;
1401 :
1402 2511 : if (res == LOG_OK && BATtdense(tids)) {
1403 2468 : oid start = tids->tseqbase, offset = start;
1404 2468 : oid end = start + ucnt;
1405 :
1406 7477 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1407 5617 : if (seg->start <= start && seg->end > start) {
1408 : /* check for delete conflicts */
1409 2468 : if (seg->ts >= tr->ts && seg->deleted) {
1410 0 : res = LOG_CONFLICT;
1411 0 : continue;
1412 : }
1413 :
1414 : /* check for inplace updates */
1415 2468 : BUN lend = end < seg->end?end:seg->end;
1416 2468 : if (seg->ts == tr->tid && !seg->deleted) {
1417 109 : if (!ins) {
1418 109 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1419 109 : if (!ins)
1420 : res = LOG_ERR;
1421 : else {
1422 109 : BATsetcount(ins, ucnt); /* all full updates */
1423 109 : msk = (int*)Tloc(ins, 0);
1424 109 : BUN end = (ucnt+31)/32;
1425 109 : memset(msk, 0, end * sizeof(int));
1426 : }
1427 : }
1428 364 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1429 255 : const void *upd = BUNtail(upi, rid-offset);
1430 255 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1431 0 : res = LOG_ERR;
1432 :
1433 255 : oid word = i/32;
1434 255 : int pos = i%32;
1435 255 : msk[word] |= 1U<<pos;
1436 255 : cnt++;
1437 : }
1438 : }
1439 : }
1440 5617 : if (end < seg->end)
1441 : break;
1442 : }
1443 46 : } else if (res == LOG_OK && complex_cand(tids)) {
1444 3 : struct canditer ci;
1445 3 : segment *seg = s->segs->h;
1446 3 : canditer_init(&ci, NULL, tids);
1447 3 : BUN i = 0;
1448 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1449 1033 : oid rid = canditer_next(&ci);
1450 1033 : if (seg->end <= rid)
1451 13 : seg = ATOMIC_PTR_GET(&seg->next);
1452 1020 : else if (seg->start <= rid && seg->end > rid) {
1453 : /* check for delete conflicts */
1454 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1455 0 : res = LOG_CONFLICT;
1456 0 : continue;
1457 : }
1458 :
1459 : /* check for inplace updates */
1460 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1461 0 : if (!ins) {
1462 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1463 0 : if (!ins) {
1464 : res = LOG_ERR;
1465 : break;
1466 : } else {
1467 0 : BATsetcount(ins, ucnt); /* all full updates */
1468 0 : msk = (int*)Tloc(ins, 0);
1469 0 : BUN end = (ucnt+31)/32;
1470 0 : memset(msk, 0, end * sizeof(int));
1471 : }
1472 : }
1473 0 : ptr upd = BUNtail(upi, i);
1474 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1475 0 : res = LOG_ERR;
1476 :
1477 0 : oid word = i/32;
1478 0 : int pos = i%32;
1479 0 : msk[word] |= 1U<<pos;
1480 0 : cnt++;
1481 : }
1482 1020 : i++;
1483 : }
1484 : }
1485 40 : } else if (res == LOG_OK) {
1486 40 : BUN i = 0;
1487 40 : oid *rid = Tloc(tids,0);
1488 40 : segment *seg = s->segs->h;
1489 974 : while ( seg && res == LOG_OK && i < ucnt) {
1490 934 : if (seg->end <= rid[i])
1491 686 : seg = ATOMIC_PTR_GET(&seg->next);
1492 248 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1493 : /* check for delete conflicts */
1494 248 : if (seg->ts >= tr->ts && seg->deleted) {
1495 0 : res = LOG_CONFLICT;
1496 0 : continue;
1497 : }
1498 :
1499 : /* check for inplace updates */
1500 248 : if (seg->ts == tr->tid && !seg->deleted) {
1501 195 : if (!ins) {
1502 23 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1503 23 : if (!ins) {
1504 : res = LOG_ERR;
1505 : break;
1506 : } else {
1507 23 : BATsetcount(ins, ucnt); /* all full updates */
1508 23 : msk = (int*)Tloc(ins, 0);
1509 23 : BUN end = (ucnt+31)/32;
1510 23 : memset(msk, 0, end * sizeof(int));
1511 : }
1512 : }
1513 195 : const void *upd = BUNtail(upi, i);
1514 195 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1515 0 : res = LOG_ERR;
1516 :
1517 195 : oid word = i/32;
1518 195 : int pos = i%32;
1519 195 : msk[word] |= 1U<<pos;
1520 195 : cnt++;
1521 : }
1522 248 : i++;
1523 : }
1524 : }
1525 : }
1526 :
1527 2511 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1528 2379 : if (cs->ucnt == 0) {
1529 2377 : if (cnt) {
1530 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1531 0 : if (nins) {
1532 0 : ui = BATproject(nins, tids);
1533 0 : uv = BATproject(nins, updates);
1534 0 : bat_destroy(nins);
1535 : }
1536 : } else {
1537 2377 : ui = temp_descriptor(tids->batCacheid);
1538 2377 : uv = temp_descriptor(updates->batCacheid);
1539 : }
1540 2377 : if (!ui || !uv) {
1541 : res = LOG_ERR;
1542 : } else {
1543 2377 : temp_destroy(cs->uibid);
1544 2377 : temp_destroy(cs->uvbid);
1545 2377 : ui = transfer_to_systrans(ui);
1546 2377 : uv = transfer_to_systrans(uv);
1547 2377 : if (ui == NULL || uv == NULL) {
1548 0 : BBPreclaim(ui);
1549 0 : BBPreclaim(uv);
1550 : res = LOG_ERR;
1551 : } else {
1552 2377 : cs->uibid = temp_create(ui);
1553 2377 : cs->uvbid = temp_create(uv);
1554 2377 : cs->ucnt = BATcount(ui);
1555 : }
1556 : }
1557 : } else {
1558 2 : BAT *nui = NULL, *nuv = NULL;
1559 :
1560 : /* merge taking msk of inserted into account */
1561 2 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1562 : res = LOG_ERR;
1563 :
1564 2 : if (res == LOG_OK) {
1565 2 : const void *upd = NULL;
1566 2 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1567 2 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1568 :
1569 2 : if (!nui || !nuv) {
1570 : res = LOG_ERR;
1571 : } else {
1572 2 : BATiter ovi = bat_iterator(uv);
1573 :
1574 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1575 2 : BUN uip = 0, uie = BATcount(ui);
1576 2 : BUN nip = 0, nie = BATcount(tids);
1577 2 : oid uiseqb = ui->tseqbase;
1578 2 : oid niseqb = tids->tseqbase;
1579 2 : oid *uipt = NULL, *nipt = NULL;
1580 2 : BATiter uii = bat_iterator(ui);
1581 2 : BATiter tidsi = bat_iterator(tids);
1582 2 : if (!BATtdensebi(&uii))
1583 1 : uipt = uii.base;
1584 2 : if (!BATtdensebi(&tidsi))
1585 1 : nipt = tidsi.base;
1586 20 : while (uip < uie && nip < nie && res == LOG_OK) {
1587 18 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1588 18 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1589 :
1590 18 : if (uiv < niv) {
1591 0 : upd = BUNtail(ovi, uip);
1592 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1593 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1594 : res = LOG_ERR;
1595 0 : uip++;
1596 18 : } else if (uiv == niv) {
1597 : /* handle == */
1598 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1599 18 : upd = BUNtail(upi, nip);
1600 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1601 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1602 : res = LOG_ERR;
1603 : } else {
1604 0 : upd = BUNtail(ovi, uip);
1605 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1606 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1607 : res = LOG_ERR;
1608 : }
1609 18 : uip++;
1610 18 : nip++;
1611 : } else { /* uiv > niv */
1612 0 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1613 0 : upd = BUNtail(upi, nip);
1614 0 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1615 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1616 : res = LOG_ERR;
1617 : }
1618 0 : nip++;
1619 : }
1620 : }
1621 2 : while (uip < uie && res == LOG_OK) {
1622 0 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1623 0 : upd = BUNtail(ovi, uip);
1624 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1625 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1626 : res = LOG_ERR;
1627 0 : uip++;
1628 : }
1629 2 : while (nip < nie && res == LOG_OK) {
1630 0 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1631 0 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1632 0 : upd = BUNtail(upi, nip);
1633 0 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1634 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1635 : res = LOG_ERR;
1636 : }
1637 0 : nip++;
1638 : }
1639 2 : bat_iterator_end(&uii);
1640 2 : bat_iterator_end(&tidsi);
1641 2 : bat_iterator_end(&ovi);
1642 2 : if (res == LOG_OK) {
1643 2 : temp_destroy(cs->uibid);
1644 2 : temp_destroy(cs->uvbid);
1645 2 : nui = transfer_to_systrans(nui);
1646 2 : nuv = transfer_to_systrans(nuv);
1647 2 : if (nui == NULL || nuv == NULL) {
1648 : res = LOG_ERR;
1649 : } else {
1650 2 : cs->uibid = temp_create(nui);
1651 2 : cs->uvbid = temp_create(nuv);
1652 2 : cs->ucnt = BATcount(nui);
1653 : }
1654 : }
1655 : }
1656 2 : bat_destroy(nui);
1657 2 : bat_destroy(nuv);
1658 : }
1659 : }
1660 : }
1661 2511 : bat_iterator_end(&upi);
1662 2511 : bat_destroy(b);
1663 2511 : unlock_table(tr->store, t->base.id);
1664 2511 : bat_destroy(ins);
1665 2511 : bat_destroy(ui);
1666 2511 : bat_destroy(uv);
1667 2511 : if (otids != tids)
1668 10 : bat_destroy(tids);
1669 2511 : if (oupdates != updates)
1670 12 : bat_destroy(updates);
1671 2511 : return res;
1672 : } else if (is_new || cs->cleared) {
1673 318 : BAT *b = temp_descriptor(cs->bid);
1674 :
1675 318 : if (b == NULL) {
1676 : res = LOG_ERR;
1677 : } else {
1678 318 : if (BATcount(b)==0) {
1679 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1680 0 : res = LOG_ERR;
1681 317 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1682 0 : res = LOG_ERR;
1683 318 : BBPcold(b->batCacheid);
1684 318 : bat_destroy(b);
1685 : }
1686 : }
1687 318 : unlock_table(tr->store, t->base.id);
1688 318 : if (otids != tids)
1689 2 : bat_destroy(tids);
1690 318 : if (oupdates != updates)
1691 41 : bat_destroy(updates);
1692 : return res;
1693 : }
1694 :
1695 : static int
1696 2829 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1697 : {
1698 2829 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1699 : }
1700 :
1701 : static void *
1702 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1703 : {
1704 4 : void *newoffsets = NULL;
1705 4 : sql_delta *bat = *batp;
1706 4 : column_storage *cs = &bat->cs;
1707 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1708 :
1709 4 : if (!u)
1710 : return NULL;
1711 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1712 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1713 0 : assert(0);
1714 : } else {
1715 4 : int new = 0;
1716 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1717 4 : if (BATcount(u) >= max_cnt) {
1718 0 : if (max_cnt == 64*1024) { /* decompress */
1719 0 : if (!(b = temp_descriptor(cs->bid))) {
1720 0 : bat_destroy(u);
1721 0 : return NULL;
1722 : }
1723 0 : n = DICTdecompress_(b, u, PERSISTENT);
1724 : /* TODO decompress updates if any */
1725 0 : bat_destroy(b);
1726 0 : assert(newoffsets == NULL);
1727 0 : if (!n) {
1728 0 : bat_destroy(u);
1729 0 : return NULL;
1730 : }
1731 0 : if (cs->ts != tr->tid) {
1732 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1733 0 : bat_destroy(n);
1734 0 : bat_destroy(u);
1735 0 : return NULL;
1736 : }
1737 0 : cs = &(*batp)->cs;
1738 0 : new = 1;
1739 0 : cs->uibid = cs->uvbid = 0;
1740 : }
1741 0 : if (cs->bid && !new)
1742 0 : temp_destroy(cs->bid);
1743 0 : n = transfer_to_systrans(n);
1744 0 : if (n == NULL) {
1745 0 : bat_destroy(u);
1746 0 : return NULL;
1747 : }
1748 0 : bat_set_access(n, BAT_READ);
1749 0 : cs->bid = temp_create(n);
1750 0 : bat_destroy(n);
1751 0 : if (cs->ebid && !new)
1752 0 : temp_destroy(cs->ebid);
1753 0 : cs->ebid = 0;
1754 0 : cs->st = ST_DEFAULT;
1755 : /* at append_col the column's storage type is cleared */
1756 0 : cs->cleared = true;
1757 : } else {
1758 0 : if (!(b = temp_descriptor(cs->bid))) {
1759 0 : GDKfree(newoffsets);
1760 0 : bat_destroy(u);
1761 0 : return NULL;
1762 : }
1763 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
1764 0 : bat_destroy(b);
1765 0 : if (!n) {
1766 0 : GDKfree(newoffsets);
1767 0 : bat_destroy(u);
1768 0 : return NULL;
1769 : }
1770 0 : if (cs->ts != tr->tid) {
1771 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1772 0 : bat_destroy(u);
1773 0 : bat_destroy(n);
1774 0 : return NULL;
1775 : }
1776 0 : cs = &(*batp)->cs;
1777 0 : new = 1;
1778 0 : temp_dup(cs->ebid);
1779 0 : if (cs->uibid) {
1780 0 : temp_dup(cs->uibid);
1781 0 : temp_dup(cs->uvbid);
1782 : }
1783 : }
1784 0 : if (cs->bid)
1785 0 : temp_destroy(cs->bid);
1786 0 : n = transfer_to_systrans(n);
1787 0 : if (n == NULL) {
1788 0 : bat_destroy(u);
1789 0 : return NULL;
1790 : }
1791 0 : bat_set_access(n, BAT_READ);
1792 0 : cs->bid = temp_create(n);
1793 0 : bat_destroy(n);
1794 0 : cs->cleared = true;
1795 0 : i = newoffsets;
1796 : }
1797 : } else { /* append */
1798 4 : i = newoffsets;
1799 : }
1800 : }
1801 4 : bat_destroy(u);
1802 4 : return i;
1803 : }
1804 :
1805 : static void *
1806 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1807 : {
1808 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1809 1 : void *newoffsets = NULL;
1810 1 : BAT *b = NULL, *n = NULL;
1811 :
1812 1 : if (!(b = temp_descriptor(cs->bid)))
1813 : return NULL;
1814 :
1815 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1816 0 : assert(0);
1817 : } else {
1818 : /* returns new offset bat if values within min/max, else decompress */
1819 1 : if (!newoffsets) {
1820 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1821 1 : bat_destroy(b);
1822 1 : if (!n)
1823 : return NULL;
1824 : /* TODO decompress updates if any */
1825 1 : if (cs->bid)
1826 1 : temp_destroy(cs->bid);
1827 1 : n = transfer_to_systrans(n);
1828 1 : if (n == NULL)
1829 : return NULL;
1830 1 : bat_set_access(n, BAT_READ);
1831 1 : cs->bid = temp_create(n);
1832 1 : cs->st = ST_DEFAULT;
1833 : /* at append_col the column's storage type is cleared */
1834 1 : cs->cleared = true;
1835 1 : b = n;
1836 : } else { /* append */
1837 : i = newoffsets;
1838 : }
1839 : }
1840 1 : bat_destroy(b);
1841 1 : return i;
1842 : }
1843 :
1844 : static int
1845 3018 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1846 : {
1847 3018 : void *oupd = upd;
1848 3018 : sql_delta *bat = *batp;
1849 3018 : column_storage *cs = &bat->cs;
1850 3018 : storage *s = ATOMIC_PTR_GET(&t->data);
1851 3018 : assert(!is_oid_nil(rid));
1852 3018 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1853 :
1854 3018 : if (cs->st == ST_DICT) {
1855 : /* possibly a new array is returned */
1856 0 : upd = dict_append_val(tr, batp, upd, 1);
1857 0 : bat = *batp;
1858 0 : cs = &bat->cs;
1859 0 : if (!upd)
1860 : return LOG_ERR;
1861 : }
1862 :
1863 : /* check if rid is insert ? */
1864 3018 : if (!inplace) {
1865 : /* check conflict */
1866 444 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1867 0 : if (oupd != upd)
1868 0 : GDKfree(upd);
1869 0 : return LOG_CONFLICT;
1870 : }
1871 444 : BAT *ui, *uv;
1872 :
1873 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1874 : /* currently only one update delta is possible */
1875 444 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1876 0 : if (oupd != upd)
1877 0 : GDKfree(upd);
1878 0 : return LOG_ERR;
1879 : }
1880 :
1881 444 : assert(uv->ttype);
1882 444 : assert(BATcount(ui) == BATcount(uv));
1883 888 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1884 444 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1885 0 : if (oupd != upd)
1886 0 : GDKfree(upd);
1887 0 : bat_destroy(ui);
1888 0 : bat_destroy(uv);
1889 0 : return LOG_ERR;
1890 : }
1891 444 : assert(BATcount(ui) == BATcount(uv));
1892 444 : bat_destroy(ui);
1893 444 : bat_destroy(uv);
1894 444 : cs->ucnt++;
1895 : } else {
1896 2574 : BAT *b = NULL;
1897 :
1898 2574 : if((b = temp_descriptor(cs->bid)) == NULL) {
1899 0 : if (oupd != upd)
1900 0 : GDKfree(upd);
1901 0 : return LOG_ERR;
1902 : }
1903 2574 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1904 0 : if (oupd != upd)
1905 0 : GDKfree(upd);
1906 0 : bat_destroy(b);
1907 0 : return LOG_ERR;
1908 : }
1909 2574 : bat_destroy(b);
1910 : }
1911 3018 : if (oupd != upd)
1912 0 : GDKfree(upd);
1913 : return LOG_OK;
1914 : }
1915 :
1916 : static int
1917 3018 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1918 : {
1919 3018 : int res = LOG_OK;
1920 3018 : lock_table(tr->store, t->base.id);
1921 3018 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1922 3018 : unlock_table(tr->store, t->base.id);
1923 3018 : return res;
1924 : }
1925 :
1926 : static int
1927 158811 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1928 : {
1929 158811 : (void)tr;
1930 158811 : if (!ocs)
1931 : return LOG_OK;
1932 158811 : cs->bid = ocs->bid;
1933 158811 : cs->ebid = ocs->ebid;
1934 158811 : cs->uibid = ocs->uibid;
1935 158811 : cs->uvbid = ocs->uvbid;
1936 158811 : cs->ucnt = ocs->ucnt;
1937 :
1938 158811 : if (temp) {
1939 25973 : cs->bid = temp_copy(cs->bid, true, false);
1940 25973 : if (cs->bid == BID_NIL)
1941 : return LOG_ERR;
1942 : } else {
1943 132838 : temp_dup(cs->bid);
1944 : }
1945 158817 : if (cs->ebid)
1946 6 : temp_dup(cs->ebid);
1947 158817 : cs->ucnt = 0;
1948 158817 : cs->uibid = e_bat(TYPE_oid);
1949 158823 : cs->uvbid = e_bat(type);
1950 158825 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1951 : return LOG_ERR;
1952 158825 : cs->st = ocs->st;
1953 158825 : return LOG_OK;
1954 : }
1955 :
1956 : static void
1957 184026 : destroy_delta(sql_delta *b, bool recursive)
1958 : {
1959 184026 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1960 : return;
1961 164695 : if (recursive && b->next)
1962 1424 : destroy_delta(b->next, true);
1963 164695 : if (b->cs.uibid)
1964 93695 : temp_destroy(b->cs.uibid);
1965 164695 : if (b->cs.uvbid)
1966 93695 : temp_destroy(b->cs.uvbid);
1967 164695 : if (b->cs.bid)
1968 164695 : temp_destroy(b->cs.bid);
1969 164695 : if (b->cs.ebid)
1970 61 : temp_destroy(b->cs.ebid);
1971 164695 : ATOMIC_DESTROY(&b->cs.refcnt);
1972 164695 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1973 164695 : _DELETE(b);
1974 : }
1975 :
1976 : static sql_delta *
1977 16144303 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1978 : {
1979 16144303 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1980 :
1981 16144303 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1982 16011470 : return obat;
1983 132833 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1984 : /* abort */
1985 12 : if (update_conflict)
1986 4 : *update_conflict = true;
1987 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1988 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1989 4 : return NULL;
1990 : }
1991 132821 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1992 : return NULL;
1993 132823 : sql_delta* bat = ZNEW(sql_delta);
1994 132814 : if (!bat)
1995 : return NULL;
1996 132814 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1997 132814 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1998 0 : destroy_delta(bat, false);
1999 0 : return NULL;
2000 : }
2001 132824 : bat->cs.ts = tr->tid;
2002 : /* only one writer else abort */
2003 132824 : bat->next = obat;
2004 132824 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
2005 0 : bat->next = NULL;
2006 0 : destroy_delta(bat, false);
2007 0 : if (update_conflict)
2008 0 : *update_conflict = true;
2009 0 : return NULL;
2010 : }
2011 : return bat;
2012 : }
2013 :
2014 : static int
2015 5847 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
2016 : {
2017 5847 : int ok = LOG_OK;
2018 :
2019 5847 : if (is_bat) {
2020 2829 : BAT *tids = incoming_tids;
2021 2829 : BAT *values = incoming_values;
2022 2829 : if (BATcount(tids) == 0)
2023 : return LOG_OK;
2024 2829 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2025 : } else {
2026 3018 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2027 : }
2028 : return ok;
2029 : }
2030 :
2031 : static int
2032 6071 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, int tpe)
2033 : {
2034 6071 : int res = LOG_OK;
2035 6071 : bool update_conflict = false;
2036 6071 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2037 :
2038 6071 : if (tpe == TYPE_bat) {
2039 3050 : BAT *t = tids;
2040 3050 : if (!BATcount(t))
2041 : return LOG_OK;
2042 : }
2043 :
2044 5576 : if (c == NULL)
2045 : return LOG_ERR;
2046 :
2047 5576 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2048 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2049 :
2050 5572 : assert(delta && delta->cs.ts == tr->tid);
2051 5572 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2052 5572 : if (odelta != delta)
2053 3321 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2054 :
2055 5572 : odelta = delta;
2056 5572 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, tpe == TYPE_bat)) != LOG_OK)
2057 : return res;
2058 5572 : assert(delta == odelta);
2059 5572 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2060 0 : res = sql_trans_alter_storage(tr, c, NULL);
2061 : return res;
2062 : }
2063 :
/*
 * Return the delta of index i that transaction tr should operate on, or NULL.
 *
 * In this file update_conflict is NULL when called from append_idx and
 * non-NULL when called from update_idx. When non-NULL it is set to true on
 * the two conflict paths below (foreign uncommitted head, lost CAS). A NULL
 * return without the flag set means timestamp_delta, the allocation or
 * dup_cs failed.
 */
static sql_delta *
bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
{
	sql_delta *obat = ATOMIC_PTR_GET(&i->data);

	/* reuse the head when it already carries tr->tid, or - only without an
	 * update_conflict pointer - when its ts is below TRANSACTION_ID_BASE or
	 * tr_version_of_parent accepts it */
	if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
		return obat;
	/* head has ts >= TRANSACTION_ID_BASE and is not accepted as a parent version */
	if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
		/* abort */
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	/* pick the version timestamp_delta selects for tr and build a new head on top of it */
	if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
		return NULL;
	sql_delta* bat = ZNEW(sql_delta);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	/* the tail type depends on oid_index(i->type): TYPE_oid or TYPE_lng */
	if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
		destroy_delta(bat, false);
		return NULL;
	}
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	/* publish the new head only if i->data still equals obat */
	if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
		/* detach before destroying so the chain behind bat is not touched */
		bat->next = NULL;
		destroy_delta(bat, false);
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	return bat;
}
2099 :
2100 : static int
2101 777 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, int tpe)
2102 : {
2103 777 : int res = LOG_OK;
2104 777 : bool update_conflict = false;
2105 777 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2106 :
2107 777 : if (tpe == TYPE_bat) {
2108 777 : BAT *t = tids;
2109 777 : if (!BATcount(t))
2110 : return LOG_OK;
2111 : }
2112 :
2113 275 : if (i == NULL)
2114 : return LOG_ERR;
2115 :
2116 275 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2117 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2118 :
2119 275 : assert(delta && delta->cs.ts == tr->tid);
2120 275 : if (odelta != delta)
2121 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2122 :
2123 275 : odelta = delta;
2124 275 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, tpe == TYPE_bat);
2125 275 : assert(delta == odelta);
2126 : return res;
2127 : }
2128 :
/*
 * Write the values of BAT i into the main BAT of the delta in *batp.
 *
 * id           column id used for lock_column/unlock_column
 * offset       position used when no offsets BAT is given
 * offsets      optional BAT of positions; asserted to have as many entries as i
 * storage_type passed on to for_append_bat for ST_FOR storage
 *
 * An empty i is a no-op. Mask-typed or mask-candidate input is first
 * converted with BATunmask. For ST_DICT / ST_FOR storage the input is first
 * run through dict_append_bat / for_append_bat, and the BAT they return is
 * written instead. *batp is re-read after both calls (only dict_append_bat
 * receives batp itself).
 *
 * Returns LOG_OK or LOG_ERR.
 */
static int
delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
{
	BAT *b, *oi = i;	/* oi: the BAT actually written; differs from i once converted */
	int err = 0;
	sql_delta *bat = *batp;

	assert(!offsets || BATcount(offsets) == BATcount(i));
	if (!BATcount(i))
		return LOG_OK;
	if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
		return LOG_ERR;

	lock_column(tr->store, id);
	if (bat->cs.st == ST_DICT) {
		BAT *ni = dict_append_bat(tr, batp, oi);
		bat = *batp;
		if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
			bat_destroy(oi);
		oi = ni;
		if (!oi) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
		bat = *batp;
		if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
			bat_destroy(oi);
		oi = ni;
		if (!oi) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		unlock_column(tr->store, id);
		if (oi != i)
			bat_destroy(oi);
		return LOG_ERR;
	}
	/* four cases:
	 * - no offsets, offset is exactly the end of b:         BATappend
	 * - no offsets, any other offset:                       BATupdatepos with &offset
	 * - dense offsets whose first value is the end of b:    BATappend
	 * - any other offsets:                                  BATupdate */
	if (!offsets && offset == b->hseqbase+BATcount(b)) {
		if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
			err = 1;
	} else if (!offsets) {
		if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
			err = 1;
	} else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
		if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
			err = 1;
	} else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
		err = 1;
	}
	bat_destroy(b);
	unlock_column(tr->store, id);

	/* release the converted input; the caller's i is never destroyed here */
	if (oi != i)
		bat_destroy(oi);
	return (err)?LOG_ERR:LOG_OK;
}
2192 :
2193 : // Look at the offsets and find where the replacements end and the appends begin.
2194 : static BUN
2195 0 : start_of_appends(BAT *offsets, BUN bcnt)
2196 : {
2197 0 : BUN ocnt = BATcount(offsets);
2198 0 : if (ocnt == 0)
2199 : return 0;
2200 :
2201 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2202 0 : if (highest < bcnt)
2203 : // all are replacements
2204 : return ocnt;
2205 :
2206 : // reason backward to find the first append.
2207 : // Suppose offsets has 15 entries, bcnt == 100
2208 : // and the highest offset in offsets is 109.
2209 0 : BUN new_bcnt = highest + 1; // 110
2210 0 : BUN nappends = new_bcnt - bcnt; // 10
2211 0 : BUN nreplacements = ocnt - nappends; // 5
2212 :
2213 : // The first append should be to position bcnt
2214 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2215 :
2216 : return nreplacements;
2217 : }
2218 :
2219 :
/*
 * Write cnt values from the array i into the main BAT of the delta in *batp.
 *
 * id           column id used for lock_column/unlock_column
 * offset       start position when no offsets BAT is given
 * offsets      optional BAT of positions; when given, offset is asserted to be oid_nil
 * storage_type, tt  passed on to for_append_val for ST_FOR storage
 *
 * For ST_DICT / ST_FOR storage the values are first run through
 * dict_append_val / for_append_val, which may return a new array; that array
 * is released with GDKfree on every exit path (the `i != oi` checks).
 * The column lock is held from entry until each return.
 *
 * Returns LOG_OK or LOG_ERR.
 */
static int
delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
{
	void *oi = i;	/* caller's array, kept to detect a replacement array */
	BAT *b;
	lock_column(tr->store, id);
	sql_delta *bat = *batp;

	if (bat->cs.st == ST_DICT) {
		/* possibly a new array is returned */
		i = dict_append_val(tr, batp, i, cnt);
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		/* possibly a new array is returned */
		i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		if (i != oi)
			GDKfree(i);
		unlock_column(tr->store, id);
		return LOG_ERR;
	}
	BUN bcnt = BATcount(b);

	if (offsets) {
		// The first few might be replacements while later items might be appends.
		// Handle the replacements here while leaving the appends to the code below.
		BUN nreplacements = start_of_appends(offsets, bcnt);

		oid *start = Tloc(offsets, 0);
		if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}

		// Replacements have been handled. The rest are appends.
		assert(offset == oid_nil);
		offset = bcnt;
		cnt -= nreplacements;
		/* NOTE(review): i is not advanced past the nreplacements values
		 * consumed above, so the append below appears to start again at the
		 * first value - confirm whether callers can mix replacements and
		 * appends in one call */
	}

	/* the range starts inside b: overwrite the part that overlaps existing rows */
	if (bcnt > offset){
		size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
		if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
		cnt -= ccnt;
		offset += ccnt;
		/* NOTE(review): as above, i still points at the first value when
		 * cnt stays non-zero here - presumably a call never straddles the
		 * end of b; verify against callers */
	}
	if (cnt) {
		if (BATcount(b) < offset) { /* add space */
			BUN d = offset - BATcount(b);
			if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
				bat_destroy(b);
				if (i != oi)
					GDKfree(i);
				unlock_column(tr->store, id);
				return LOG_ERR;
			}
		}
		if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	bat_destroy(b);
	if (i != oi)
		GDKfree(i);
	unlock_column(tr->store, id);
	return LOG_OK;
}
2313 :
2314 : static int
2315 25973 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2316 : {
2317 25973 : if (!(bat->segs = new_segments(tr, 0)))
2318 : return LOG_ERR;
2319 25973 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2320 : }
2321 :
2322 : static int
2323 16011424 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, int tt, char *storage_type)
2324 : {
2325 16011424 : int ok = LOG_OK;
2326 :
2327 16011424 : if ((*delta)->cs.merged)
2328 35924 : (*delta)->cs.merged = false; /* TODO needs to move */
2329 16011424 : if (tt == TYPE_bat) {
2330 149832 : BAT *bat = incoming_data;
2331 :
2332 149832 : if (BATcount(bat))
2333 149817 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2334 : } else {
2335 15861592 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2336 : }
2337 16011219 : return ok;
2338 : }
2339 :
2340 : static int
2341 16009966 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, int tpe)
2342 : {
2343 16009966 : int res = LOG_OK;
2344 16009966 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2345 :
2346 16009966 : if (tpe == TYPE_bat) {
2347 149617 : BAT *t = data;
2348 149617 : if (!BATcount(t))
2349 : return LOG_OK;
2350 : }
2351 :
2352 16009184 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2353 : return LOG_ERR;
2354 :
2355 16009237 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2356 :
2357 16009237 : odelta = delta;
2358 16009237 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, tpe, c->storage_type)) != LOG_OK)
2359 : return res;
2360 16009035 : if (odelta != delta) {
2361 0 : delta->next = odelta;
2362 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2363 0 : delta->next = NULL;
2364 0 : destroy_delta(delta, false);
2365 0 : return LOG_CONFLICT;
2366 : }
2367 : }
2368 16009035 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2369 1 : res = sql_trans_alter_storage(tr, c, NULL);
2370 : return res;
2371 : }
2372 :
2373 : static int
2374 2166 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, int tpe)
2375 : {
2376 2166 : int res = LOG_OK;
2377 2166 : sql_delta *delta;
2378 :
2379 2166 : if (tpe == TYPE_bat) {
2380 998 : BAT *t = data;
2381 998 : if (!BATcount(t))
2382 : return LOG_OK;
2383 : }
2384 :
2385 2154 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2386 : return LOG_ERR;
2387 :
2388 2154 : assert(delta->cs.st == ST_DEFAULT);
2389 :
2390 2154 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, tpe, NULL);
2391 2154 : return res;
2392 : }
2393 :
2394 : static int
2395 66363 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2396 : {
2397 66363 : int err = 0;
2398 :
2399 : /* TODO check for conflicting updates */
2400 66363 : (void)rid;
2401 66363 : (void)cnt;
2402 508592 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2403 442229 : sql_column *c = n->data;
2404 442229 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2405 :
2406 : /* check for active updates */
2407 442229 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2408 : return 1;
2409 : }
2410 : return 0;
2411 : }
2412 :
2413 : static int
2414 64042 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2415 : {
2416 64042 : int in_transaction = segments_in_transaction(tr, t);
2417 :
2418 64042 : lock_table(tr->store, t->base.id);
2419 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2420 64042 : segment *seg = s->segs->h, *p = NULL;
2421 51336090 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2422 51336090 : if (seg->start <= rid && seg->end > rid) {
2423 64042 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2424 4 : unlock_table(tr->store, t->base.id);
2425 4 : return LOG_CONFLICT;
2426 : }
2427 64038 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2428 0 : unlock_table(tr->store, t->base.id);
2429 0 : return LOG_CONFLICT;
2430 : }
2431 64038 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2432 0 : unlock_table(tr->store, t->base.id);
2433 0 : return LOG_ERR;
2434 : }
2435 : break;
2436 : }
2437 : }
2438 64038 : unlock_table(tr->store, t->base.id);
2439 64038 : if (!in_transaction)
2440 12098 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2441 : return LOG_OK;
2442 : }
2443 :
/*
 * Mark the rows [start, start+cnt) as deleted by tr, walking the segment
 * list from *Seg.
 *
 * *Seg is an in/out cursor: it is set to the segment returned by
 * split_segment, so a following call can continue from there (as
 * storage_delete_bat does). No locking is done in this function.
 *
 * Returns LOG_OK, LOG_CONFLICT (segment not SEG_VALID_4_DELETE, or
 * deletes_conflict_updates reports a conflict) or LOG_ERR (split_segment
 * failed).
 */
static int
seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
{
	segment *seg = *Seg, *p = NULL;
	for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
		if (seg->start <= start && seg->end > start) {
			/* lcnt: the part of the remaining range that lies inside this segment */
			size_t lcnt = cnt;
			if (start+lcnt > seg->end)
				lcnt = seg->end-start;
			if (SEG_IS_DELETED(seg, tr)) {
				/* already deleted: skip this part without splitting */
				start += lcnt;
				cnt -= lcnt;
				continue;
			} else if (!SEG_VALID_4_DELETE(seg, tr))
				return LOG_CONFLICT;
			if (deletes_conflict_updates( tr, t, start, lcnt))
				return LOG_CONFLICT;
			*Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
			if (!seg)
				return LOG_ERR;
			start += lcnt;
			cnt -= lcnt;
		}
		/* stop once the remaining range does not reach past this segment */
		if (start+cnt <= seg->end)
			break;
	}
	return LOG_OK;
}
2472 :
2473 : static int
2474 523 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2475 : {
2476 523 : segment *seg = s->segs->h;
2477 523 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2478 : }
2479 :
2480 : static int
2481 291 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2482 : {
2483 291 : int in_transaction = segments_in_transaction(tr, t);
2484 291 : BAT *oi = i; /* update ids */
2485 291 : int ok = LOG_OK;
2486 :
2487 291 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2488 : return LOG_ERR;
2489 291 : if (BATcount(i)) {
2490 514 : if (BATtdense(i)) {
2491 223 : size_t start = i->tseqbase;
2492 223 : size_t cnt = BATcount(i);
2493 :
2494 223 : lock_table(tr->store, t->base.id);
2495 223 : ok = delete_range(tr, t, s, start, cnt);
2496 223 : unlock_table(tr->store, t->base.id);
2497 68 : } else if (complex_cand(i)) {
2498 0 : struct canditer ci;
2499 0 : oid f = 0, l = 0, cur = 0;
2500 :
2501 0 : canditer_init(&ci, NULL, i);
2502 0 : cur = f = canditer_next(&ci);
2503 :
2504 0 : lock_table(tr->store, t->base.id);
2505 0 : if (!is_oid_nil(f)) {
2506 0 : segment *seg = s->segs->h;
2507 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2508 0 : if (cur+1 == l) {
2509 0 : cur++;
2510 0 : continue;
2511 : }
2512 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2513 0 : f = cur = l;
2514 : }
2515 0 : if (ok == LOG_OK)
2516 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2517 : }
2518 0 : unlock_table(tr->store, t->base.id);
2519 : } else {
2520 68 : if (!i->tsorted) {
2521 0 : assert(oi == i);
2522 0 : BAT *ni = NULL;
2523 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2524 0 : ok = LOG_ERR;
2525 0 : if (ni)
2526 0 : i = ni;
2527 : }
2528 68 : assert(i->tsorted);
2529 68 : BUN icnt = BATcount(i);
2530 68 : BATiter ii = bat_iterator(i);
2531 68 : oid *o = ii.base, n = o[0]+1;
2532 68 : size_t lcnt = 1;
2533 :
2534 68 : lock_table(tr->store, t->base.id);
2535 68 : segment *seg = s->segs->h;
2536 17899 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2537 17831 : if (o[i] == n) {
2538 17572 : lcnt++;
2539 17572 : n++;
2540 : } else {
2541 259 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2542 259 : lcnt = 0;
2543 : }
2544 17831 : if (!lcnt) {
2545 259 : n = o[i]+1;
2546 259 : lcnt = 1;
2547 : }
2548 : }
2549 68 : bat_iterator_end(&ii);
2550 68 : if (lcnt && ok == LOG_OK)
2551 68 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2552 68 : unlock_table(tr->store, t->base.id);
2553 : }
2554 : }
2555 291 : if (i != oi)
2556 25 : bat_destroy(i);
2557 : // assert
2558 291 : if (!in_transaction)
2559 259 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2560 : return ok;
2561 : }
2562 :
2563 : static void
2564 50879 : destroy_segments(segments *s)
2565 : {
2566 50879 : if (!s || sql_ref_dec(&s->r) > 0)
2567 0 : return;
2568 50879 : segment *seg = s->h;
2569 106873 : while(seg) {
2570 55994 : segment *n = ATOMIC_PTR_GET(&seg->next);
2571 55994 : ATOMIC_PTR_DESTROY(&seg->next);
2572 55994 : _DELETE(seg);
2573 55994 : seg = n;
2574 : }
2575 50879 : _DELETE(s);
2576 : }
2577 :
2578 : static void
2579 52265 : destroy_storage(storage *bat)
2580 : {
2581 52265 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2582 : return;
2583 50738 : if (bat->next)
2584 13957 : destroy_storage(bat->next);
2585 50738 : destroy_segments(bat->segs);
2586 50738 : if (bat->cs.uibid)
2587 30486 : temp_destroy(bat->cs.uibid);
2588 50738 : if (bat->cs.uvbid)
2589 30486 : temp_destroy(bat->cs.uvbid);
2590 50738 : if (bat->cs.bid)
2591 50738 : temp_destroy(bat->cs.bid);
2592 50738 : ATOMIC_DESTROY(&bat->cs.refcnt);
2593 50738 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2594 50738 : _DELETE(bat);
2595 : }
2596 :
2597 : static int
2598 154982 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2599 : {
2600 154982 : if (uncommitted) {
2601 397982 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2602 256856 : if (!VALID_4_READ(s->ts,tr))
2603 : return 1;
2604 : } else {
2605 68731 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2606 56027 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2607 : return 1;
2608 : }
2609 :
2610 : return 0;
2611 : }
2612 :
2613 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2614 :
/*
 * Return the storage of table t that transaction tr should operate on, or
 * NULL.
 *
 * clear == NULL: the current head is returned as is, unless it carries a
 *                ts >= TRANSACTION_ID_BASE that is neither tr->tid nor
 *                accepted by tr_version_of_parent.
 * clear != NULL: a fresh storage with cs.cleared set and stamped with
 *                tr->tid is created on top of the current version and
 *                published as the new head. *clear is set to true on the
 *                conflict paths (foreign head, conflicting segments, lost
 *                CAS); a NULL return without the flag set means
 *                timestamp_storage, the allocation or dup_storage failed.
 */
storage *
bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
{
	storage *obat;

	obat = ATOMIC_PTR_GET(&t->data);

	/* head was written by someone else, is not a parent version and has ts >= TRANSACTION_ID_BASE */
	if (obat->cs.ts != tr->tid)
		if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
			if (obat->cs.ts >= TRANSACTION_ID_BASE) {
				/* abort */
				if (clear)
					*clear = true;
				return NULL;
			}

	if (!clear)
		return obat;

	/* remainder is only to handle clear */
	if (segments_conflict(tr, obat->segs, 1)) {
		*clear = true;
		return NULL;
	}
	if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
		return NULL;
	storage *bat = ZNEW(storage);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	if (dup_storage(tr, obat, bat) != LOG_OK) {
		destroy_storage(bat);
		return NULL;
	}
	bat->cs.cleared = true;
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	/* publish the new head only if t->data still equals obat */
	if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
		/* detach before destroying so the existing chain is not touched */
		bat->next = NULL;
		destroy_storage(bat);
		if (clear)
			*clear = true;
		return NULL;
	}
	return bat;
}
2662 :
2663 : static int
2664 64378 : delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
2665 : {
2666 64378 : int ok = LOG_OK;
2667 64378 : BAT *b = ib;
2668 64378 : storage *bat;
2669 :
2670 64378 : if (tpe == TYPE_bat && !BATcount(b))
2671 : return ok;
2672 :
2673 64333 : if (t == NULL)
2674 : return LOG_ERR;
2675 :
2676 64333 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2677 : return LOG_ERR;
2678 :
2679 64333 : if (tpe == TYPE_bat)
2680 291 : ok = storage_delete_bat(tr, t, bat, ib);
2681 : else
2682 64042 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2683 : return ok;
2684 : }
2685 :
2686 : static size_t
2687 0 : dcount_col(sql_trans *tr, sql_column *c)
2688 : {
2689 0 : sql_delta *b;
2690 :
2691 0 : if (!isTable(c->t))
2692 : return 0;
2693 0 : b = col_timestamp_delta(tr, c);
2694 0 : if (!b)
2695 : return 1;
2696 :
2697 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2698 0 : if (!s || !s->segs->t)
2699 : return 1;
2700 0 : size_t cnt = s->segs->t->end;
2701 0 : if (cnt) {
2702 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2703 0 : size_t dcnt = 0;
2704 :
2705 0 : if (v)
2706 0 : dcnt = BATguess_uniques(v, NULL);
2707 0 : return dcnt;
2708 : }
2709 : return cnt;
2710 : }
2711 :
2712 : static BAT *
2713 685445 : bind_no_view(BAT *b, bool quick)
2714 : {
2715 685445 : if (isVIEW(b)) { /* If it is a view get the parent BAT */
2716 685070 : BAT *nb = BBP_desc(VIEWtparent(b));
2717 685070 : bat_destroy(b);
2718 685069 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2719 : return NULL;
2720 : }
2721 : return b;
2722 : }
2723 :
2724 : static int
2725 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2726 : {
2727 0 : int ok = 0;
2728 0 : assert(tr->active);
2729 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2730 0 : return 0;
2731 0 : lock_column(tr->store, c->base.id);
2732 0 : if (unique_est) {
2733 0 : sql_delta *d;
2734 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2735 0 : BAT *b;
2736 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2737 0 : MT_lock_set(&b->theaplock);
2738 0 : b->tunique_est = *unique_est;
2739 0 : MT_lock_unset(&b->theaplock);
2740 0 : bat_destroy(b);
2741 : }
2742 : }
2743 : }
2744 0 : if (min) {
2745 0 : _DELETE(c->min);
2746 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2747 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2748 0 : memcpy(c->min, min, minlen);
2749 0 : ok = 1;
2750 : }
2751 : }
2752 0 : if (max) {
2753 0 : _DELETE(c->max);
2754 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2755 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2756 0 : memcpy(c->max, max, maxlen);
2757 0 : ok = 1;
2758 : }
2759 : }
2760 0 : unlock_column(tr->store, c->base.id);
2761 0 : return ok;
2762 : }
2763 :
2764 : static int
2765 26 : min_max_col(sql_trans *tr, sql_column *c)
2766 : {
2767 26 : int ok = 0;
2768 26 : BAT *b = NULL;
2769 26 : sql_delta *d = NULL;
2770 :
2771 26 : assert(tr->active);
2772 26 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2773 0 : return 0;
2774 26 : if (c->min && c->max)
2775 : return 1;
2776 26 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2777 26 : if (d->cs.st == ST_FOR)
2778 : return 0;
2779 26 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2780 26 : lock_column(tr->store, c->base.id);
2781 26 : if (c->min && c->max) {
2782 0 : unlock_column(tr->store, c->base.id);
2783 0 : return 1;
2784 : }
2785 26 : _DELETE(c->min);
2786 26 : _DELETE(c->max);
2787 26 : if ((b = bind_col(tr, c, access))) {
2788 26 : if (!(b = bind_no_view(b, false))) {
2789 0 : unlock_column(tr->store, c->base.id);
2790 0 : return 0;
2791 : }
2792 26 : BATiter bi = bat_iterator(b);
2793 26 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2794 17 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2795 17 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2796 :
2797 17 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2798 0 : _DELETE(c->min);
2799 0 : _DELETE(c->max);
2800 : } else {
2801 17 : memcpy(c->min, nmin, minlen);
2802 17 : memcpy(c->max, nmax, maxlen);
2803 17 : ok = 1;
2804 : }
2805 : }
2806 26 : bat_iterator_end(&bi);
2807 26 : bat_destroy(b);
2808 : }
2809 26 : unlock_column(tr->store, c->base.id);
2810 : }
2811 : return ok;
2812 : }
2813 :
2814 : static size_t
2815 17 : count_segs(segment *s)
2816 : {
2817 17 : size_t nr = 0;
2818 :
2819 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2820 55 : nr++;
2821 17 : return nr;
2822 : }
2823 :
2824 : static size_t
2825 34 : count_del(sql_trans *tr, sql_table *t, int access)
2826 : {
2827 34 : storage *d;
2828 :
2829 34 : if (!isTable(t))
2830 : return 0;
2831 34 : d = tab_timestamp_storage(tr, t);
2832 34 : if (!d)
2833 : return 0;
2834 34 : if (access == 2)
2835 0 : return d->cs.ucnt;
2836 34 : if (access == 1)
2837 0 : return count_inserts(d->segs->h, tr);
2838 34 : if (access == 10) /* special case for counting the number of segments */
2839 17 : return count_segs(d->segs->h);
2840 17 : return count_deletes(d->segs->h, tr);
2841 : }
2842 :
2843 : static int
2844 24175 : sorted_col(sql_trans *tr, sql_column *col)
2845 : {
2846 24175 : int sorted = 0;
2847 :
2848 24175 : assert(tr->active);
2849 24175 : if (!isTable(col->t) || !col->t->s)
2850 : return 0;
2851 :
2852 24175 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2853 24150 : BAT *b = bind_col(tr, col, QUICK);
2854 :
2855 24150 : if (b)
2856 24150 : sorted = b->tsorted || b->trevsorted;
2857 : }
2858 : return sorted;
2859 : }
2860 :
2861 : static int
2862 7901 : unique_col(sql_trans *tr, sql_column *col)
2863 : {
2864 7901 : int distinct = 0;
2865 :
2866 7901 : assert(tr->active);
2867 7901 : if (!isTable(col->t) || !col->t->s)
2868 : return 0;
2869 :
2870 7901 : if (col && ATOMIC_PTR_GET(&col->data)) {
2871 7901 : BAT *b = bind_col(tr, col, QUICK);
2872 :
2873 7901 : if (b)
2874 7901 : distinct = b->tkey;
2875 : }
2876 : return distinct;
2877 : }
2878 :
2879 : static int
2880 1892 : double_elim_col(sql_trans *tr, sql_column *col)
2881 : {
2882 1892 : int de = 0;
2883 1892 : sql_delta *d;
2884 :
2885 1892 : assert(tr->active);
2886 1892 : if (!isTable(col->t) || !col->t->s)
2887 : return 0;
2888 :
2889 1892 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2890 6 : if (d->cs.st == ST_DICT) {
2891 6 : BAT *b = bind_col(tr, col, QUICK);
2892 6 : if (b && b->ttype == TYPE_bte)
2893 : de = 1;
2894 0 : else if (b && b->ttype == TYPE_sht)
2895 1892 : de = 2;
2896 : }
2897 1886 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2898 1886 : BAT *b = bind_col(tr, col, QUICK);
2899 :
2900 1886 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2901 1886 : de = GDK_ELIMDOUBLES(b->tvheap);
2902 1886 : if (de)
2903 1660 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2904 : }
2905 1660 : assert(de >= 0 && de <= 16);
2906 : }
2907 : return de;
2908 : }
2909 :
/*
 * Collect statistics for column c.
 *
 * Outputs (always initialised to false / false / 0.0 first):
 *   *nonil       set from the BAT's nonil/nil flags; forced to true for
 *                ST_FOR storage; additionally and-ed with the flags of the
 *                update-values BAT when the delta has updates
 *   *unique      key flag of the BAT (of the offsets BAT for ST_DICT)
 *   *unique_est  unique estimate; for ST_DEFAULT a BATguess_uniques guess
 *                is used when the stored estimate is 0
 *   min, max     filled with VALinit when available
 *
 * Returns a bit mask: bit 1 when min was set, bit 2 when max was set.
 * min/max and unique/unique_est are only produced when the delta has no
 * updates (cs.ucnt == 0); min/max additionally only for the type classes
 * tested below.
 */
static int
col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
{
	int ok = 0;
	BAT *b = NULL, *off = NULL, *upv = NULL;
	sql_delta *d = NULL;

	(void) tr;
	assert(tr->active);
	*nonil = false;
	*unique = false;
	*unique_est = 0.0;
	if (!c || !isTable(c->t) || !c->t->s)
		return ok;

	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR) {
			*nonil = true; /* TODO for min/max. I will do it later */
			return ok;
		}
		int eclass = c->type.type->eclass;
		/* ST_DICT is bound with RD_EXT, everything else with RDONLY */
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false)))
				return ok;
			BATiter bi = bat_iterator(b);
			*nonil = bi.nonil && !bi.nil;

			if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
				d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
				/* prefer the cached c->min / c->max over the BAT's min/max positions */
				if (c->min && VALinit(min, bi.type, c->min))
					ok |= 1;
				else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
					ok |= 1;
				if (c->max && VALinit(max, bi.type, c->max))
					ok |= 2;
				else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
					ok |= 2;
			}
			if (d->cs.ucnt == 0) {
				if (d->cs.st == ST_DEFAULT) {
					*unique = bi.key;
					*unique_est = bi.unique_est;
					if (*unique_est == 0)
						*unique_est = (double)BATguess_uniques(b,NULL);
				} else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
					/* for dict, check the offsets bat for uniqueness */
					MT_lock_set(&off->theaplock);
					*unique = off->tkey;
					*unique_est = off->tunique_est;
					MT_lock_unset(&off->theaplock);
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
			if (*nonil && d->cs.ucnt > 0) {
				/* This could use a quick descriptor */
				if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
					/* update values cannot be inspected: do not claim nonil */
					*nonil = false;
				} else {
					MT_lock_set(&upv->theaplock);
					*nonil &= upv->tnonil && !upv->tnil;
					MT_lock_unset(&upv->theaplock);
					bat_destroy(upv);
				}
			}
		}
	}
	return ok;
}
2980 :
2981 : static int
2982 245 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2983 : {
2984 245 : assert(tr->active);
2985 245 : if (!isTable(col->t) || !col->t->s)
2986 : return LOG_OK;
2987 :
2988 240 : if (col && ATOMIC_PTR_GET(&col->data)) {
2989 240 : BAT *b = bind_col(tr, col, QUICK);
2990 :
2991 240 : if (b) { /* add props for ranges [min, max> */
2992 240 : MT_lock_set(&b->theaplock);
2993 240 : if (add_range) {
2994 167 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2995 167 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2996 99 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2997 : else
2998 68 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2999 167 : if (!pt->with_nills || !col->null)
3000 111 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3001 : } else {
3002 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
3003 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
3004 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
3005 : }
3006 240 : MT_lock_unset(&b->theaplock);
3007 : }
3008 : }
3009 : return LOG_OK;
3010 : }
3011 :
3012 : static int
3013 4087 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
3014 : {
3015 4087 : assert(tr->active);
3016 4087 : if (!isTable(col->t) || !col->t->s)
3017 : return LOG_OK;
3018 :
3019 4057 : if (col && ATOMIC_PTR_GET(&col->data)) {
3020 4057 : BAT *b = bind_col(tr, col, QUICK);
3021 :
3022 4057 : if (b) { /* add props for ranges [min, max> */
3023 4057 : if (not_null) {
3024 4055 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3025 : } else {
3026 2 : BATrmprop(b, GDK_NOT_NULL);
3027 : }
3028 : }
3029 : }
3030 : return LOG_OK;
3031 : }
3032 :
3033 : static int
3034 28614 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3035 : {
3036 28614 : sqlstore *store = tr->store;
3037 28614 : int bid = log_find_bat(store->logger, id);
3038 28614 : if (bid <= 0)
3039 : return LOG_ERR;
3040 28614 : cs->bid = temp_dup(bid);
3041 28614 : cs->ucnt = 0;
3042 28614 : cs->uibid = e_bat(TYPE_oid);
3043 28614 : cs->uvbid = e_bat(type);
3044 28614 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3045 : return LOG_ERR;
3046 : return LOG_OK;
3047 : }
3048 :
3049 : static int
3050 66500 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3051 : {
3052 66500 : int res = LOG_OK;
3053 66500 : gdk_return ok;
3054 66500 : BAT *b = temp_descriptor(bat->cs.bid);
3055 :
3056 66500 : if (b == NULL)
3057 : return LOG_ERR;
3058 :
3059 66500 : if (!bat->cs.uibid)
3060 66492 : bat->cs.uibid = e_bat(TYPE_oid);
3061 66500 : if (!bat->cs.uvbid)
3062 66492 : bat->cs.uvbid = e_bat(b->ttype);
3063 66500 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3064 0 : res = LOG_ERR;
3065 66500 : if (GDKinmemory(0)) {
3066 174 : bat_destroy(b);
3067 174 : return res;
3068 : }
3069 :
3070 66326 : bat_set_access(b, BAT_READ);
3071 66326 : sqlstore *store = tr->store;
3072 66326 : ok = log_bat_persists(store->logger, b, id);
3073 66326 : bat_destroy(b);
3074 66326 : if(res != LOG_OK)
3075 : return res;
3076 66326 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3077 : }
3078 :
3079 : static int
3080 0 : new_persistent_delta( sql_delta *bat)
3081 : {
3082 0 : bat->cs.ucnt = 0;
3083 0 : return LOG_OK;
3084 : }
3085 :
3086 : static void
3087 126876 : create_delta( sql_delta *d, BAT *b)
3088 : {
3089 126876 : bat_set_access(b, BAT_READ);
3090 126876 : d->cs.bid = temp_create(b);
3091 126876 : d->cs.uibid = d->cs.uvbid = 0;
3092 126876 : d->cs.ucnt = 0;
3093 126876 : }
3094 :
3095 : static bat
3096 7133 : copyBat (bat i, int type, oid seq)
3097 : {
3098 7133 : BAT *b, *tb;
3099 7133 : bat res;
3100 :
3101 7133 : if (!i)
3102 : return i;
3103 7133 : tb = quick_descriptor(i);
3104 7133 : if (tb == NULL)
3105 : return 0;
3106 7133 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3107 7133 : if (b == NULL)
3108 : return 0;
3109 :
3110 7133 : bat_set_access(b, BAT_READ);
3111 :
3112 7133 : res = temp_create(b);
3113 7133 : bat_destroy(b);
3114 7133 : return res;
3115 : }
3116 :
3117 : static int
3118 148815 : create_col(sql_trans *tr, sql_column *c)
3119 : {
3120 148815 : int ok = LOG_OK, new = 0;
3121 148815 : int type = c->type.type->localtype;
3122 148815 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3123 :
3124 148815 : if (!bat) {
3125 148815 : new = 1;
3126 148815 : bat = ZNEW(sql_delta);
3127 148815 : if (!bat)
3128 : return LOG_ERR;
3129 148815 : ATOMIC_PTR_SET(&c->data, bat);
3130 148815 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3131 : }
3132 :
3133 148815 : if (new)
3134 148815 : bat->cs.ts = tr->tid;
3135 :
3136 148815 : if (!isNew(c)&& !isTempTable(c->t)){
3137 21917 : bat->cs.ts = tr->ts;
3138 21917 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3139 21917 : if (ok == LOG_OK && c->storage_type) {
3140 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3141 2 : sqlstore *store = tr->store;
3142 2 : int bid = log_find_bat(store->logger, -c->base.id);
3143 2 : if (bid <= 0)
3144 : return LOG_ERR;
3145 2 : bat->cs.ebid = temp_dup(bid);
3146 2 : bat->cs.st = ST_DICT;
3147 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3148 2 : bat->cs.st = ST_FOR;
3149 : }
3150 : }
3151 21917 : return ok;
3152 126898 : } else if (bat && bat->cs.bid) {
3153 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3154 : } else {
3155 126898 : sql_column *fc = NULL;
3156 126898 : size_t cnt = 0;
3157 :
3158 : /* alter ? */
3159 126898 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3160 78214 : storage *s = tab_timestamp_storage(tr, fc->t);
3161 78214 : if (s == NULL)
3162 : return LOG_ERR;
3163 78214 : cnt = segs_end(s->segs, tr, c->t);
3164 : }
3165 126898 : if (cnt && fc != c) {
3166 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3167 :
3168 22 : if (d->cs.bid) {
3169 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3170 22 : if(bat->cs.bid == BID_NIL)
3171 22 : ok = LOG_ERR;
3172 : }
3173 22 : if (d->cs.uibid) {
3174 10 : bat->cs.uibid = e_bat(TYPE_oid);
3175 10 : if (bat->cs.uibid == BID_NIL)
3176 22 : ok = LOG_ERR;
3177 : }
3178 22 : if (d->cs.uvbid) {
3179 10 : bat->cs.uvbid = e_bat(type);
3180 10 : if(bat->cs.uvbid == BID_NIL)
3181 0 : ok = LOG_ERR;
3182 : }
3183 : } else {
3184 126876 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3185 126876 : if (!b) {
3186 : ok = LOG_ERR;
3187 : } else {
3188 126876 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3189 126876 : bat_destroy(b);
3190 : }
3191 :
3192 126876 : if (!new) {
3193 0 : bat->cs.uibid = e_bat(TYPE_oid);
3194 0 : if (bat->cs.uibid == BID_NIL)
3195 0 : ok = LOG_ERR;
3196 0 : bat->cs.uvbid = e_bat(type);
3197 0 : if(bat->cs.uvbid == BID_NIL)
3198 0 : ok = LOG_ERR;
3199 : }
3200 : }
3201 126898 : bat->cs.ucnt = 0;
3202 :
3203 126898 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3204 90 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3205 : }
3206 : return ok;
3207 : }
3208 :
3209 : static int
3210 60292 : log_create_col_(sql_trans *tr, sql_column *c)
3211 : {
3212 60292 : assert(!isTempTable(c->t));
3213 60292 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3214 : }
3215 :
3216 : static int
3217 86 : log_create_col(sql_trans *tr, sql_change *change)
3218 : {
3219 86 : return log_create_col_(tr, (sql_column*)change->obj);
3220 : }
3221 :
3222 : static int
3223 114165 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3224 : {
3225 114165 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3226 114165 : (void)oldest;
3227 114165 : assert(delta->cs.ts == tr->tid);
3228 114165 : delta->cs.ts = commit_ts;
3229 :
3230 114165 : assert(delta->next == NULL);
3231 114165 : if (!delta->cs.merged)
3232 114164 : merge_delta(delta);
3233 114165 : if (!tr->parent)
3234 114161 : base->new = 0;
3235 114165 : return LOG_OK;
3236 : }
3237 :
3238 : static int
3239 90 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3240 : {
3241 90 : sql_column *c = (sql_column*)change->obj;
3242 90 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3243 90 : if (!tr->parent)
3244 89 : c->base.new = 0;
3245 90 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3246 : }
3247 :
3248 : /* will be called for new idx's and when new index columns are created */
3249 : static int
3250 9262 : create_idx(sql_trans *tr, sql_idx *ni)
3251 : {
3252 9262 : int ok = LOG_OK, new = 0;
3253 9262 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3254 9262 : int type = TYPE_lng;
3255 :
3256 9262 : if (oid_index(ni->type))
3257 951 : type = TYPE_oid;
3258 :
3259 9262 : if (!bat) {
3260 9262 : new = 1;
3261 9262 : bat = ZNEW(sql_delta);
3262 9262 : if (!bat)
3263 : return LOG_ERR;
3264 9262 : ATOMIC_PTR_SET(&ni->data, bat);
3265 9262 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3266 : }
3267 :
3268 9262 : if (new)
3269 9262 : bat->cs.ts = tr->tid;
3270 :
3271 9262 : if (!isNew(ni) && !isTempTable(ni->t)){
3272 2151 : bat->cs.ts = 1;
3273 2151 : return load_cs(tr, &bat->cs, type, ni->base.id);
3274 7111 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3275 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3276 : } else {
3277 7111 : sql_column *c = ol_first_node(ni->t->columns)->data;
3278 7111 : sql_delta *d = col_timestamp_delta(tr, c);
3279 :
3280 7111 : if (d) {
3281 : /* Here we also handle indices created through alter stmts */
3282 : /* These need to be created aligned to the existing data */
3283 7111 : if (d->cs.bid) {
3284 7111 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3285 7111 : if(bat->cs.bid == BID_NIL)
3286 7111 : ok = LOG_ERR;
3287 : }
3288 : } else {
3289 : return LOG_ERR;
3290 : }
3291 :
3292 7111 : bat->cs.ucnt = 0;
3293 :
3294 7111 : if (!new) {
3295 0 : bat->cs.uibid = e_bat(TYPE_oid);
3296 0 : if (bat->cs.uibid == BID_NIL)
3297 0 : ok = LOG_ERR;
3298 0 : bat->cs.uvbid = e_bat(type);
3299 0 : if(bat->cs.uvbid == BID_NIL)
3300 0 : ok = LOG_ERR;
3301 : }
3302 7111 : bat->cs.ucnt = 0;
3303 7111 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3304 631 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3305 : }
3306 : return ok;
3307 : }
3308 :
3309 : static int
3310 6208 : log_create_idx_(sql_trans *tr, sql_idx *i)
3311 : {
3312 6208 : assert(!isTempTable(i->t));
3313 6208 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3314 : }
3315 :
3316 : static int
3317 617 : log_create_idx(sql_trans *tr, sql_change *change)
3318 : {
3319 617 : return log_create_idx_(tr, (sql_idx*)change->obj);
3320 : }
3321 :
3322 : static int
3323 631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3324 : {
3325 631 : sql_idx *i = (sql_idx*)change->obj;
3326 631 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3327 631 : if (!tr->parent)
3328 631 : i->base.new = 0;
3329 631 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3330 : return LOG_OK;
3331 : }
3332 :
3333 : static int
3334 4546 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3335 : {
3336 4546 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3337 4546 : BAT *b = NULL, *ib = NULL;
3338 :
3339 4546 : if (ok != LOG_OK)
3340 : return ok;
3341 4546 : if (!(b = temp_descriptor(s->cs.bid)))
3342 : return LOG_ERR;
3343 4546 : ib = b;
3344 :
3345 4546 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3346 0 : bat_destroy(ib);
3347 0 : return LOG_ERR;
3348 : }
3349 :
3350 4546 : if (BATcount(b)) {
3351 261 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3352 0 : bat_destroy(ib);
3353 0 : return LOG_ERR;
3354 : }
3355 398 : if (BATtdense(b)) {
3356 137 : size_t start = b->tseqbase;
3357 137 : size_t cnt = BATcount(b);
3358 137 : ok = delete_range(tr, t, s, start, cnt);
3359 : } else {
3360 124 : assert(b->tsorted);
3361 124 : BUN icnt = BATcount(b);
3362 124 : BATiter bi = bat_iterator(b);
3363 124 : size_t lcnt = 1;
3364 124 : oid n;
3365 124 : segment *seg = s->segs->h;
3366 124 : if (complex_cand(b)) {
3367 0 : oid o = * (oid *) Tpos(&bi, 0);
3368 0 : n = o + 1;
3369 0 : for (BUN i = 1; i < icnt; i++) {
3370 0 : o = * (oid *) Tpos(&bi, i);
3371 0 : if (o == n) {
3372 0 : lcnt++;
3373 0 : n++;
3374 : } else {
3375 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3376 : break;
3377 : lcnt = 0;
3378 : }
3379 0 : if (!lcnt) {
3380 0 : n = o + 1;
3381 0 : lcnt = 1;
3382 : }
3383 : }
3384 : } else {
3385 124 : oid *o = bi.base;
3386 124 : n = o[0]+1;
3387 191799 : for (size_t i=1; i<icnt; i++) {
3388 191675 : if (o[i] == n) {
3389 190202 : lcnt++;
3390 190202 : n++;
3391 : } else {
3392 1473 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3393 : break;
3394 : lcnt = 0;
3395 : }
3396 191675 : if (!lcnt) {
3397 1473 : n = o[i]+1;
3398 1473 : lcnt = 1;
3399 : }
3400 : }
3401 : }
3402 124 : if (lcnt && ok == LOG_OK)
3403 124 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3404 124 : bat_iterator_end(&bi);
3405 : }
3406 261 : if (ok == LOG_OK)
3407 3880 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3408 3619 : if (seg->ts == tr->tid)
3409 1888 : seg->ts = 1;
3410 : } else {
3411 4285 : if (ok == LOG_OK) {
3412 4285 : BAT *bb = quick_descriptor(s->cs.bid);
3413 :
3414 4285 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3415 : ok = LOG_ERR;
3416 : } else {
3417 4285 : segment *seg = s->segs->h;
3418 4285 : if (seg->ts == tr->tid)
3419 4285 : seg->ts = 1;
3420 : }
3421 : }
3422 : }
3423 4546 : if (b != ib)
3424 4546 : bat_destroy(b);
3425 4546 : bat_destroy(ib);
3426 :
3427 4546 : return ok;
3428 : }
3429 :
3430 : static int
3431 24804 : create_del(sql_trans *tr, sql_table *t)
3432 : {
3433 24804 : int ok = LOG_OK, new = 0;
3434 24804 : BAT *b;
3435 24804 : storage *bat = ATOMIC_PTR_GET(&t->data);
3436 :
3437 24804 : if (!bat) {
3438 24804 : new = 1;
3439 24804 : bat = ZNEW(storage);
3440 24804 : if(!bat)
3441 : return LOG_ERR;
3442 24804 : ATOMIC_PTR_SET(&t->data, bat);
3443 24804 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3444 24804 : bat->cs.ts = tr->tid;
3445 : }
3446 :
3447 24804 : if (!isNew(t) && !isTempTable(t)) {
3448 4546 : bat->cs.ts = tr->ts;
3449 4546 : return load_storage(tr, t, bat, t->base.id);
3450 20258 : } else if (bat->cs.bid) {
3451 : return ok;
3452 : } else {
3453 20258 : assert(!bat->segs);
3454 20258 : if (!(bat->segs = new_segments(tr, 0)))
3455 : return LOG_ERR;
3456 :
3457 20258 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3458 20258 : if(b != NULL) {
3459 20258 : bat_set_access(b, BAT_READ);
3460 20258 : bat->cs.bid = temp_create(b);
3461 20258 : bat_destroy(b);
3462 : } else {
3463 : return LOG_ERR;
3464 : }
3465 20258 : if (new)
3466 26708 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3467 : }
3468 : return ok;
3469 : }
3470 :
3471 : static int
3472 182774 : log_segment(sql_trans *tr, segment *s, sqlid id)
3473 : {
3474 182774 : sqlstore *store = tr->store;
3475 182774 : msk m = s->deleted;
3476 182774 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3477 : }
3478 :
3479 : static int
3480 94225 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3481 : {
3482 : /* log segments */
3483 94225 : lock_table(tr->store, id);
3484 470295 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3485 376070 : unlock_table(tr->store, id);
3486 376070 : if (seg->ts == tr->tid && seg->end-seg->start) {
3487 137896 : if (log_segment(tr, seg, id) != LOG_OK) {
3488 0 : unlock_table(tr->store, id);
3489 0 : return LOG_ERR;
3490 : }
3491 : }
3492 376070 : lock_table(tr->store, id);
3493 : }
3494 94225 : unlock_table(tr->store, id);
3495 94225 : return LOG_OK;
3496 : }
3497 :
3498 : static int
3499 12049 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3500 : {
3501 12049 : BAT *b;
3502 12049 : int ok = LOG_OK;
3503 :
3504 12049 : if (GDKinmemory(0))
3505 : return LOG_OK;
3506 :
3507 12017 : b = temp_descriptor(bat->cs.bid);
3508 12017 : if (b == NULL)
3509 : return LOG_ERR;
3510 :
3511 12017 : sqlstore *store = tr->store;
3512 12017 : bat_set_access(b, BAT_READ);
3513 12017 : if (ok == LOG_OK)
3514 12017 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3515 12017 : if (ok == LOG_OK)
3516 12017 : ok = log_segments(tr, bat->segs, t->base.id);
3517 12017 : bat_destroy(b);
3518 12017 : return ok;
3519 : }
3520 :
3521 : static int
3522 12063 : log_create_del(sql_trans *tr, sql_change *change)
3523 : {
3524 12063 : int ok = LOG_OK;
3525 12063 : sql_table *t = (sql_table*)change->obj;
3526 :
3527 12063 : if (t->base.deleted)
3528 : return ok;
3529 12049 : assert(!isTempTable(t));
3530 12049 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3531 12049 : if (ok == LOG_OK) {
3532 72255 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3533 60206 : sql_column *c = n->data;
3534 :
3535 60206 : ok = log_create_col_(tr, c);
3536 : }
3537 12049 : if (t->idxs) {
3538 17648 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3539 5599 : sql_idx *i = n->data;
3540 :
3541 5599 : if (ATOMIC_PTR_GET(&i->data))
3542 5591 : ok = log_create_idx_(tr, i);
3543 : }
3544 : }
3545 : }
3546 : return ok;
3547 : }
3548 :
3549 : static int
3550 20260 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3551 : {
3552 20260 : int ok = LOG_OK;
3553 20260 : sql_table *t = (sql_table*)change->obj;
3554 20260 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3555 :
3556 20260 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3557 119 : assert(isTempTable(t));
3558 119 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3559 119 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3560 119 : return ok;
3561 : }
3562 :
3563 20141 : if (!commit_ts) /* rollback handled by ? */
3564 : return ok;
3565 18295 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3566 18295 : assert(ok == LOG_OK);
3567 18295 : if (ok != LOG_OK)
3568 : return ok;
3569 18295 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3570 18295 : assert(dbat->cs.ts == tr->tid);
3571 18295 : dbat->cs.ts = commit_ts;
3572 18295 : if (ok == LOG_OK) {
3573 126135 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3574 107840 : sql_column *c = n->data;
3575 107840 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3576 :
3577 107840 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3578 : }
3579 18295 : if (t->idxs) {
3580 23907 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3581 5612 : sql_idx *i = n->data;
3582 5612 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3583 :
3584 5612 : if (delta)
3585 5604 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3586 : }
3587 : }
3588 18295 : if (!tr->parent)
3589 18293 : t->base.new = 0;
3590 : }
3591 18295 : if (!tr->parent)
3592 18293 : t->base.new = 0;
3593 : return ok;
3594 : }
3595 :
3596 : static int
3597 18245 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3598 : {
3599 18245 : gdk_return ok = GDK_SUCCEED;
3600 :
3601 18245 : sqlstore *store = tr->store;
3602 18245 : if (!GDKinmemory(0) && b && b->cs.bid)
3603 18242 : ok = log_bat_transient(store->logger, id);
3604 18245 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3605 25 : ok = log_bat_transient(store->logger, -id);
3606 18245 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3607 : }
3608 :
3609 : static int
3610 166552 : destroy_col(sqlstore *store, sql_column *c)
3611 : {
3612 166552 : (void)store;
3613 166552 : if (ATOMIC_PTR_GET(&c->data))
3614 166552 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3615 166552 : ATOMIC_PTR_SET(&c->data, NULL);
3616 166552 : return LOG_OK;
3617 : }
3618 :
3619 : static int
3620 16453 : log_destroy_col_(sql_trans *tr, sql_column *c)
3621 : {
3622 16453 : int ok = LOG_OK;
3623 16453 : assert(!isTempTable(c->t));
3624 16453 : if (!tr->parent) /* don't write save point commits */
3625 16453 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3626 16453 : return ok;
3627 : }
3628 :
3629 : static int
3630 55 : log_destroy_col(sql_trans *tr, sql_change *change)
3631 : {
3632 55 : sql_column *c = (sql_column*)change->obj;
3633 55 : int res = log_destroy_col_(tr, c);
3634 55 : change->obj = NULL;
3635 55 : column_destroy(tr->store, c);
3636 55 : return res;
3637 : }
3638 :
3639 : static int
3640 10639 : destroy_idx(sqlstore *store, sql_idx *i)
3641 : {
3642 10639 : (void)store;
3643 10639 : if (ATOMIC_PTR_GET(&i->data))
3644 10639 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3645 10639 : ATOMIC_PTR_SET(&i->data, NULL);
3646 10639 : return LOG_OK;
3647 : }
3648 :
3649 : static int
3650 1854 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3651 : {
3652 1854 : int ok = LOG_OK;
3653 1854 : assert(!isTempTable(i->t));
3654 1854 : if (ATOMIC_PTR_GET(&i->data)) {
3655 1792 : if (!tr->parent) /* don't write save point commits */
3656 1792 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3657 : }
3658 1854 : return ok;
3659 : }
3660 :
3661 : static int
3662 501 : log_destroy_idx(sql_trans *tr, sql_change *change)
3663 : {
3664 501 : sql_idx *i = (sql_idx*)change->obj;
3665 501 : int res = log_destroy_idx_(tr, i);
3666 501 : change->obj = NULL;
3667 501 : idx_destroy(tr->store, i);
3668 501 : return res;
3669 : }
3670 :
3671 : static int
3672 26302 : destroy_del(sqlstore *store, sql_table *t)
3673 : {
3674 26302 : (void)store;
3675 26302 : if (ATOMIC_PTR_GET(&t->data))
3676 26292 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3677 26302 : ATOMIC_PTR_SET(&t->data, NULL);
3678 26302 : return LOG_OK;
3679 : }
3680 :
3681 : static int
3682 3085 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3683 : {
3684 3085 : gdk_return ok = GDK_SUCCEED;
3685 :
3686 3085 : sqlstore *store = tr->store;
3687 3085 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3688 3085 : bat && bat->cs.bid)
3689 3085 : ok = log_bat_transient(store->logger, id);
3690 3085 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3691 : }
3692 :
3693 : static int
3694 3085 : log_destroy_del(sql_trans *tr, sql_change *change)
3695 : {
3696 3085 : int ok = LOG_OK;
3697 3085 : sql_table *t = (sql_table*)change->obj;
3698 :
3699 3085 : assert(!isTempTable(t));
3700 3085 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3701 3085 : if (ok == LOG_OK) {
3702 19483 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3703 16398 : sql_column *c = n->data;
3704 :
3705 16398 : ok = log_destroy_col_(tr, c);
3706 : }
3707 3085 : if (t->idxs) {
3708 4438 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3709 1353 : sql_idx *i = n->data;
3710 :
3711 1353 : ok = log_destroy_idx_(tr, i);
3712 : }
3713 : }
3714 : }
3715 3085 : return ok;
3716 : }
3717 :
3718 : static int
3719 3723 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3720 : {
3721 3723 : (void)tr;
3722 3723 : (void)change;
3723 3723 : (void)commit_ts;
3724 3723 : (void)oldest;
3725 3723 : if (commit_ts)
3726 3708 : change->handled = true;
3727 3723 : return 0;
3728 : }
3729 :
3730 : static int
3731 3158 : drop_del(sql_trans *tr, sql_table *t)
3732 : {
3733 3158 : int ok = LOG_OK;
3734 :
3735 3158 : if (!isNew(t)) {
3736 3158 : storage *bat = ATOMIC_PTR_GET(&t->data);
3737 3225 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_destroy_del);
3738 : }
3739 3158 : return ok;
3740 : }
3741 :
3742 : static int
3743 63 : drop_col(sql_trans *tr, sql_column *c)
3744 : {
3745 63 : assert(!isNew(c));
3746 63 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3747 63 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_destroy_col);
3748 63 : return LOG_OK;
3749 : }
3750 :
3751 : static int
3752 502 : drop_idx(sql_trans *tr, sql_idx *i)
3753 : {
3754 502 : assert(!isNew(i));
3755 502 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3756 502 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_destroy_idx);
3757 502 : return LOG_OK;
3758 : }
3759 :
3760 :
3761 : static BUN
3762 129591 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3763 : {
3764 129591 : BAT *b;
3765 129591 : BUN sz = 0;
3766 :
3767 129591 : (void)tr;
3768 129591 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3769 129591 : if (cs->bid && renew) {
3770 129591 : b = quick_descriptor(cs->bid);
3771 129573 : if (b) {
3772 129573 : sz += BATcount(b);
3773 129573 : if (cs->st == ST_DICT) {
3774 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3775 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3776 :
3777 2 : if (nebid == BID_NIL || !n) {
3778 0 : temp_destroy(nebid);
3779 0 : bat_destroy(n);
3780 0 : return BUN_NONE;
3781 : }
3782 2 : temp_destroy(cs->ebid);
3783 2 : cs->ebid = nebid;
3784 2 : if (!temp)
3785 2 : bat_set_access(n, BAT_READ);
3786 2 : temp_destroy(cs->bid);
3787 2 : cs->bid = temp_create(n); /* create empty copy */
3788 2 : bat_destroy(n);
3789 : } else {
3790 129571 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3791 :
3792 129569 : if (nbid == BID_NIL)
3793 : return BUN_NONE;
3794 129569 : temp_destroy(cs->bid);
3795 129569 : cs->bid = nbid;
3796 : }
3797 : } else {
3798 : return BUN_NONE;
3799 : }
3800 : }
3801 129571 : if (cs->uibid) {
3802 129428 : temp_destroy(cs->uibid);
3803 129455 : cs->uibid = 0;
3804 : }
3805 129598 : if (cs->uvbid) {
3806 129455 : temp_destroy(cs->uvbid);
3807 129455 : cs->uvbid = 0;
3808 : }
3809 129598 : cs->cleared = true;
3810 129598 : cs->ucnt = 0;
3811 129598 : return sz;
3812 : }
3813 :
3814 : static BUN
3815 129448 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3816 : {
3817 129448 : bool update_conflict = false;
3818 129448 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3819 :
3820 129448 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3821 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3822 129448 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3823 129448 : if (odelta != delta)
3824 129453 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3825 129437 : if (delta)
3826 129437 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3827 : return 0;
3828 : }
3829 :
3830 : static BUN
3831 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3832 : {
3833 21 : bool update_conflict = false;
3834 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3835 :
3836 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3837 15 : return 0;
3838 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3839 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3840 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3841 6 : if (odelta != delta)
3842 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3843 6 : if (delta)
3844 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3845 : return 0;
3846 : }
3847 :
3848 : static int
3849 141 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3850 : {
3851 141 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3852 : return LOG_ERR;
3853 141 : if (s->segs)
3854 141 : destroy_segments(s->segs);
3855 141 : if (!(s->segs = new_segments(tr, 0)))
3856 : return LOG_ERR;
3857 : return LOG_OK;
3858 : }
3859 :
3860 :
3861 : /*
3862 : * Clear the table, in general this means replacing the storage,
3863 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3864 : * all segments as deleted.
3865 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3866 : */
3867 : static BUN
3868 41811 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3869 : {
3870 41811 : int clear = !in_transaction, ok = LOG_OK;
3871 41811 : bool conflict = false;
3872 41811 : storage *bat;
3873 :
3874 41850 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3875 15801 : return conflict?BUN_NONE-1:BUN_NONE;
3876 :
3877 26010 : if (!clear) {
3878 39 : lock_table(tr->store, t->base.id);
3879 39 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3880 39 : unlock_table(tr->store, t->base.id);
3881 : }
3882 26010 : assert(t->persistence != SQL_DECLARED_TABLE);
3883 26010 : if (!in_transaction)
3884 25974 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3885 26010 : if (ok == LOG_ERR)
3886 : return BUN_NONE;
3887 26010 : if (ok == LOG_CONFLICT)
3888 0 : return BUN_NONE - 1;
3889 : return LOG_OK;
3890 : }
3891 :
3892 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3893 : static BUN
3894 41811 : clear_table(sql_trans *tr, sql_table *t)
3895 : {
3896 41811 : node *n = ol_first_node(t->columns);
3897 41811 : sql_column *c = n->data;
3898 41811 : storage *d = tab_timestamp_storage(tr, t);
3899 41811 : int in_transaction, clear;
3900 41811 : BUN sz, clear_ok;
3901 :
3902 41811 : if (!d)
3903 : return BUN_NONE;
3904 41811 : in_transaction = segments_in_transaction(tr, t);
3905 41811 : clear = !in_transaction;
3906 41811 : sz = count_col(tr, c, CNT_ACTIVE);
3907 41811 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3908 : return clear_ok;
3909 :
3910 26010 : if (in_transaction)
3911 : return sz;
3912 :
3913 155419 : for (; n; n = n->next) {
3914 129448 : c = n->data;
3915 :
3916 129448 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3917 0 : return clear_ok;
3918 : }
3919 25971 : if (t->idxs) {
3920 25992 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3921 21 : sql_idx *ci = n->data;
3922 :
3923 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3924 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3925 0 : return clear_ok;
3926 : }
3927 : }
3928 : return sz;
3929 : }
3930 :
3931 : static int
3932 158729 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3933 : {
3934 158729 : sqlstore *store = tr->store;
3935 158729 : gdk_return ok = GDK_SUCCEED;
3936 :
3937 158729 : (void) t;
3938 158729 : (void) segs;
3939 158729 : if (GDKinmemory(0))
3940 : return LOG_OK;
3941 :
3942 158722 : if (cs->cleared) {
3943 155473 : assert(cs->ucnt == 0);
3944 155473 : BAT *ins = temp_descriptor(cs->bid);
3945 155473 : if (!ins)
3946 : return LOG_ERR;
3947 155473 : assert(!isEbat(ins));
3948 155473 : bat_set_access(ins, BAT_READ);
3949 155473 : ok = log_bat_persists(store->logger, ins, id);
3950 155473 : bat_destroy(ins);
3951 155473 : if (ok == GDK_SUCCEED && cs->ebid) {
3952 56 : BAT *ins = temp_descriptor(cs->ebid);
3953 56 : if (!ins)
3954 : return LOG_ERR;
3955 56 : assert(!isEbat(ins));
3956 56 : bat_set_access(ins, BAT_READ);
3957 56 : ok = log_bat_persists(store->logger, ins, -id);
3958 56 : bat_destroy(ins);
3959 : }
3960 155473 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3961 : }
3962 :
3963 3249 : assert(!isTempTable(t));
3964 :
3965 3249 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3966 2699 : BAT *ui = temp_descriptor(cs->uibid);
3967 2699 : BAT *uv = temp_descriptor(cs->uvbid);
3968 : /* any updates */
3969 2699 : if (ui == NULL || uv == NULL) {
3970 : ok = GDK_FAIL;
3971 2699 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3972 2699 : ok = log_delta(store->logger, ui, uv, id);
3973 2699 : bat_destroy(ui);
3974 2699 : bat_destroy(uv);
3975 : }
3976 2699 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3977 : }
3978 :
3979 : static inline int
3980 56243 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3981 56243 : sqlstore *store = tr->store;
3982 56243 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3983 : }
3984 :
3985 : static inline int
3986 56243 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3987 56243 : sqlstore *store = tr->store;
3988 56243 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3989 : }
3990 :
/*
 * Log the rows this transaction appended to table t.
 *
 * The work is bracketed by tr_log_table_start()/tr_log_table_end() and
 * consists of three passes:
 *  1. every non-empty segment stamped with tr->tid that is not deleted is
 *     handed to log_segment(); the sizes are summed into nr_appends;
 *  2. for every column, the row ranges of those segments (limited to
 *     cur->start < end, with end taken from segs_end()) are logged from
 *     cs->bid via log_bat(); rows past batInserted in cs->ebid, if that
 *     BAT is set, are logged under the negated column id;
 *  3. the same range logging is done for every index that is not skipped
 *     by the hash_index()/idx_has_column() filter.
 * Columns or indexes whose column_storage is flagged `cleared` are logged
 * through tr_log_cs() instead.
 *
 * Returns LOG_OK on success, LOG_ERR otherwise.
 * NOTE(review): the early `return LOG_ERR` paths below leave without
 * calling tr_log_table_end() - confirm the logger tolerates that.
 */
3991 : static int
3992 56243 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3993 : {
3994 56243 : sqlstore *store = tr->store;
3995 56243 : gdk_return ok = GDK_SUCCEED;
3996 :
3997 56243 : size_t end = segs_end(segs, tr, t);
3998 :
3999 56243 : if (tr_log_table_start(tr, t) != LOG_OK)
4000 : return LOG_ERR;
4001 :
4002 56243 : size_t nr_appends = 0;
4003 :
/* Pass 1: the table lock is held only while stepping to the next segment;
 * it is released while a segment is inspected and logged, so the early
 * return inside the loop happens with the lock released. */
4004 56243 : lock_table(tr->store, t->base.id);
4005 394417 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
4006 338174 : unlock_table(tr->store, t->base.id);
4007 :
4008 338174 : if (seg->ts == tr->tid && seg->end-seg->start) {
4009 107562 : if (!seg->deleted) {
4010 44878 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
4011 : return LOG_ERR;
4012 :
4013 44878 : nr_appends += (seg->end - seg->start);
4014 : }
4015 : }
4016 338174 : lock_table(tr->store, t->base.id);
4017 : }
4018 56243 : unlock_table(tr->store, t->base.id);
4019 :
/* Pass 2: per column. The loop stops as soon as ok is no longer GDK_SUCCEED. */
4020 388866 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
4021 332623 : sql_column *c = n->data;
4022 332623 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
4023 :
4024 332623 : if (cs->cleared) {
4025 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4026 3 : continue;
4027 : }
4028 :
/* cs->cleared is tested again once the table lock is held; the segment
 * walk uses the same lock hand-over as pass 1. */
4029 332620 : lock_table(tr->store, t->base.id);
4030 332620 : if (!cs->cleared) {
4031 2300227 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4032 1967607 : unlock_table(tr->store, t->base.id);
4033 1967607 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4034 : /* append col*/
4035 253375 : BAT *ins = temp_descriptor(cs->bid);
4036 253375 : if (ins == NULL)
4037 : return LOG_ERR;
4038 253375 : assert(BATcount(ins) >= cur->end);
4039 253375 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4040 253375 : bat_destroy(ins);
4041 : }
4042 1967607 : lock_table(tr->store, t->base.id);
4043 : }
4044 : }
4045 332620 : unlock_table(tr->store, t->base.id);
4046 :
/* Log the rows of cs->ebid beyond batInserted (if any) under the negated
 * column id, then BATcommit() the BAT at its current count. */
4047 332620 : if (ok == GDK_SUCCEED && cs->ebid) {
4048 19 : BAT *ins = temp_descriptor(cs->ebid);
4049 19 : if (ins == NULL)
4050 : return LOG_ERR;
4051 19 : if (BATcount(ins) > ins->batInserted)
4052 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4053 19 : BATcommit(ins, BATcount(ins));
4054 19 : bat_destroy(ins);
4055 : }
4056 : }
4057 :
/* Pass 3: per index; indexes rejected by the filter below are skipped. */
4058 56243 : if (t->idxs) {
4059 60777 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4060 4534 : sql_idx *i = n->data;
4061 :
4062 4534 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4063 3765 : continue;
4064 769 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4065 :
4066 769 : if (cs) {
4067 769 : if (cs->cleared) {
4068 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4069 0 : continue;
4070 : }
4071 :
4072 769 : lock_table(tr->store, t->base.id);
4073 2360 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4074 1591 : unlock_table(tr->store, t->base.id);
4075 1591 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4076 : /* append idx */
4077 721 : BAT *ins = temp_descriptor(cs->bid);
4078 721 : if (ins == NULL)
4079 : return LOG_ERR;
4080 721 : assert(BATcount(ins) >= cur->end);
4081 721 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4082 721 : bat_destroy(ins);
4083 : }
4084 1591 : lock_table(tr->store, t->base.id);
4085 : }
4086 769 : unlock_table(tr->store, t->base.id);
4087 : }
4088 : }
4089 : }
4090 :
/* The group is closed only when everything above succeeded. */
4091 56243 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4092 0 : return LOG_ERR;
4093 :
4094 : return LOG_OK;
4095 : }
4096 :
4097 : static int
4098 82208 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4099 : {
4100 82208 : int ok = LOG_OK;
4101 82208 : bool cleared = s->cs.cleared;
4102 82208 : if (ok == LOG_OK && cleared)
4103 25965 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4104 25965 : if (ok == LOG_OK)
4105 82208 : ok = log_segments(tr, s->segs, t->base.id);
4106 82208 : if (ok == LOG_OK && !cleared)
4107 56243 : ok = log_table_append(tr, t, s->segs);
4108 82208 : return ok;
4109 : }
4110 :
4111 : static void
4112 298415 : merge_cs( column_storage *cs, const char* caller)
4113 : {
4114 298415 : if (cs->bid && cs->ucnt) {
4115 2707 : BAT *cur = temp_descriptor(cs->bid);
4116 2707 : BAT *ui = temp_descriptor(cs->uibid);
4117 2707 : BAT *uv = temp_descriptor(cs->uvbid);
4118 :
4119 2707 : if (!cur || !ui || !uv) {
4120 0 : bat_destroy(ui);
4121 0 : bat_destroy(uv);
4122 0 : bat_destroy(cur);
4123 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4124 : return;
4125 : }
4126 2707 : assert(BATcount(ui) == BATcount(uv));
4127 :
4128 : /* any updates */
4129 2707 : assert(!isEbat(cur));
4130 2707 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4131 0 : bat_destroy(ui);
4132 0 : bat_destroy(uv);
4133 0 : bat_destroy(cur);
4134 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4135 : return;
4136 : }
4137 : /* cleanup the old deltas */
4138 2707 : temp_destroy(cs->uibid);
4139 2707 : temp_destroy(cs->uvbid);
4140 2707 : cs->uibid = e_bat(TYPE_oid);
4141 2707 : cs->uvbid = e_bat(cur->ttype);
4142 2707 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4143 2707 : cs->ucnt = 0;
4144 2707 : bat_destroy(ui);
4145 2707 : bat_destroy(uv);
4146 2707 : bat_destroy(cur);
4147 : }
4148 298415 : cs->cleared = false;
4149 298415 : cs->merged = true;
4150 298415 : return;
4151 : }
4152 :
4153 : static void
4154 258552 : merge_delta( sql_delta *obat)
4155 : {
4156 258552 : if (obat && obat->next && !obat->cs.merged)
4157 50351 : merge_delta(obat->next);
4158 258552 : merge_cs(&obat->cs, __func__);
4159 258552 : }
4160 :
4161 : static void
4162 39863 : merge_storage(storage *tdb)
4163 : {
4164 39863 : merge_cs(&tdb->cs, __func__);
4165 :
4166 39863 : if (tdb->next) {
4167 481 : destroy_storage(tdb->next);
4168 481 : tdb->next = NULL;
4169 : }
4170 39863 : }
4171 :
4172 : static sql_delta *
4173 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4174 : {
4175 : /* commit ie copy back to the parent transaction */
4176 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4177 1 : sql_delta *od = delta->next;
4178 1 : if (od->cs.ts == commit_ts) {
4179 0 : sql_delta t = *od, *n = od->next;
4180 0 : *od = *delta;
4181 0 : od->next = n;
4182 0 : *delta = t;
4183 0 : delta->next = NULL;
4184 0 : destroy_delta(delta, true);
4185 0 : return od;
4186 : }
4187 : }
4188 : return delta;
4189 : }
4190 :
4191 : static int
4192 132735 : log_update_col( sql_trans *tr, sql_change *change)
4193 : {
4194 132735 : sql_column *c = (sql_column*)change->obj;
4195 132735 : assert(!isTempTable(c->t));
4196 :
4197 132735 : if (isDeleted(c->t)) {
4198 0 : change->handled = true;
4199 0 : return LOG_OK;
4200 : }
4201 :
4202 132735 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4203 132735 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4204 132735 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4205 132735 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4206 : }
4207 : return LOG_OK;
4208 : }
4209 :
4210 : static int
4211 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4212 : {
4213 217 : sqlstore *store = Store;
4214 :
4215 217 : sql_delta *d = (sql_delta*)change->data;
4216 217 : if (d->cs.ts < oldest) {
4217 82 : destroy_delta(d, false);
4218 82 : if (change->commit == &commit_update_idx)
4219 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4220 : else
4221 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4222 82 : return 1;
4223 : }
4224 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4225 82 : d->cs.ts = store_get_timestamp(store) + 1;
4226 : return 0;
4227 : }
4228 :
4229 : static int
4230 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4231 : {
4232 9 : sqlstore *store = Store;
4233 :
4234 9 : storage *d = (storage*)change->data;
4235 9 : if (d->cs.ts < oldest) {
4236 3 : destroy_storage(d);
4237 3 : table_destroy(store, (sql_table*)change->obj);
4238 3 : return 1;
4239 : }
4240 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4241 3 : d->cs.ts = store_get_timestamp(store) + 1;
4242 : return 0;
4243 : }
4244 :
/*
 * Shared commit/rollback handler for column and index deltas.
 *
 *  tr        committing (or rolling back) transaction
 *  change    the change record; change->data is the delta it created
 *  t, base   owning table and the sql_base of the column/index
 *  data      atomic pointer holding the head of the delta chain
 *  type      currently unused
 *  commit_ts commit timestamp; 0 means rollback
 *  oldest    deltas with cs.ts <= oldest may be merged
 *
 * Paths:
 *  - tables with commit action CA_DELETE/CA_DROP: the delta is cleared via
 *    clear_cs() and the change is marked handled;
 *  - rollback: change->data is unlinked from the chain and the cleanup
 *    hook is switched to tc_gc_rollbacked;
 *  - top-level commit: the first delta with cs.ts <= oldest is merged
 *    under the column lock if it is not merged yet;
 *  - savepoint commit (tr->parent): savepoint_commit_delta() result is
 *    stored back into *data.
 * Returns LOG_OK, or LOG_ERR when clear_cs() reports BUN_NONE.
 */
4245 : static int
4246 132853 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4247 : {
4248 132853 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4249 :
4250 132853 : sql_delta *delta = ATOMIC_PTR_GET(data);
4251 :
4252 132853 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4253 3 : int ok = LOG_OK;
4254 3 : assert(isTempTable(t));
4255 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4256 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4257 3 : if (!tr->parent)
4258 0 : t->base.new = base->new = 0;
4259 3 : change->handled = true;
4260 3 : return ok;
4261 : }
4262 :
4263 132850 : if (commit_ts)
4264 132768 : delta->cs.ts = commit_ts;
4265 132850 : if (!commit_ts) { /* rollback */
4266 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4267 :
4268 82 : if (change->ts && t->base.new) /* handled by create col */
4269 : return LOG_OK;
/* Find the predecessor of d in the chain (o stays at the head when d is
 * the head itself).
 * NOTE(review): if d is not on the chain, o ends up NULL and the
 * `o->next = d->next` below would dereference it - confirm d is always
 * reachable from *data here. */
4270 82 : if (o != d) {
4271 0 : while(o && o->next != d)
4272 : o = o->next;
4273 : }
4274 82 : if (o == ATOMIC_PTR_GET(data))
4275 82 : ATOMIC_PTR_SET(data, d->next);
4276 : else
4277 0 : o->next = d->next;
4278 82 : d->next = NULL;
4279 82 : change->cleanup = &tc_gc_rollbacked;
4280 132768 : } else if (!tr->parent) {
4281 : /* merge deltas */
/* skip the versions that are still newer than `oldest` */
4282 353355 : while (delta && delta->cs.ts > oldest)
4283 220588 : delta = delta->next;
4284 132767 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4285 30905 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4286 30905 : merge_delta(delta);
4287 30905 : unlock_column(tr->store, base->id);
4288 : }
4289 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4290 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4291 : return LOG_OK;
4292 : }
4293 :
4294 : static int
4295 132825 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4296 : {
4297 :
4298 132825 : sql_column *c = (sql_column*)change->obj;
4299 132825 : sql_base* base = &c->base;
4300 132825 : sql_table* t = c->t;
4301 132825 : ATOMIC_PTR_TYPE* data = &c->data;
4302 132825 : int type = c->type.type->localtype;
4303 :
4304 132825 : if (change->handled || isDeleted(c->t))
4305 : return LOG_OK;
4306 :
4307 132825 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4308 : }
4309 :
4310 : static int
4311 26 : log_update_idx( sql_trans *tr, sql_change *change)
4312 : {
4313 26 : sql_idx *i = (sql_idx*)change->obj;
4314 26 : assert(!isTempTable(i->t));
4315 :
4316 26 : if (isDeleted(i->t)) {
4317 0 : change->handled = true;
4318 0 : return LOG_OK;
4319 : }
4320 :
4321 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4322 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4323 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4324 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4325 : }
4326 : return LOG_OK;
4327 : }
4328 :
4329 : static int
4330 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4331 : {
4332 28 : sql_idx *i = (sql_idx*)change->obj;
4333 28 : sql_base* base = &i->base;
4334 28 : sql_table* t = i->t;
4335 28 : ATOMIC_PTR_TYPE* data = &i->data;
4336 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4337 :
4338 28 : if (change->handled || isDeleted(i->t))
4339 : return LOG_OK;
4340 :
4341 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4342 : }
4343 :
4344 : static storage *
4345 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4346 : {
4347 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4348 0 : assert(0);
4349 : storage *od = dbat->next;
4350 : if (od->cs.ts == commit_ts) {
4351 : storage t = *od, *n = od->next;
4352 : *od = *dbat;
4353 : od->next = n;
4354 : *dbat = t;
4355 : dbat->next = NULL;
4356 : destroy_storage(dbat);
4357 : return od;
4358 : }
4359 : }
4360 26 : return dbat;
4361 : }
4362 :
4363 : static int
4364 82208 : log_update_del( sql_trans *tr, sql_change *change)
4365 : {
4366 82208 : sql_table *t = (sql_table*)change->obj;
4367 82208 : assert(!isTempTable(t));
4368 :
4369 82208 : if (isDeleted(t)) {
4370 0 : change->handled = true;
4371 0 : return LOG_OK;
4372 : }
4373 :
4374 82208 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4375 82208 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4376 : return LOG_OK;
4377 : }
4378 :
/*
 * Commit/rollback callback for a table's storage (segments).
 *
 *  commit_ts  commit timestamp; 0 means rollback
 *  oldest     passed on to rollback_segments()/merge_segments(); the
 *             storage itself is merged only when oldest == commit_ts
 *
 * Paths:
 *  - already handled or deleted table: nothing to do;
 *  - commit action CA_DELETE/CA_DROP: the storage is cleared via
 *    clear_storage() and the change is marked handled;
 *  - rollback: if this transaction created the storage version
 *    (dbat->cs.ts == tr->tid) it is unlinked and left to
 *    tc_gc_rollbacked_storage, otherwise the segments are rolled back;
 *  - top-level commit: segments2cs(), merge_segments() and, when
 *    oldest == commit_ts, merge_storage();
 *  - savepoint commit: merge_segments() and savepoint_commit_storage().
 * Everything after the CA_DELETE/CA_DROP check runs under the table lock.
 * Returns LOG_OK or the error of clear_storage()/segments2cs().
 */
4379 : static int
4380 86678 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4381 : {
4382 86678 : int ok = LOG_OK;
4383 86678 : sql_table *t = (sql_table*)change->obj;
4384 86678 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4385 :
4386 86678 : if (change->handled || isDeleted(t))
4387 : return ok;
4388 :
4389 86678 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4390 22 : assert(isTempTable(t));
4391 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4392 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4393 22 : change->handled = true;
4394 22 : return ok;
4395 : }
4396 :
4397 86656 : lock_table(tr->store, t->base.id);
4398 86656 : if (!commit_ts) { /* rollback */
4399 4148 : if (dbat->cs.ts == tr->tid) {
4400 6 : if (change->ts && t->base.new) { /* handled by the create table */
4401 3 : unlock_table(tr->store, t->base.id);
4402 3 : return ok;
4403 : }
4404 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4405 :
/* Find the predecessor of d (o stays at the head when d is the head).
 * NOTE(review): o becomes NULL if d is not on the chain, which the
 * `o->next = d->next` below would dereference - confirm. */
4406 3 : if (o != d) {
4407 0 : while(o && o->next != d)
4408 : o = o->next;
4409 : }
4410 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4411 3 : assert(d->next);
4412 3 : ATOMIC_PTR_SET(&t->data, d->next);
4413 : } else
4414 0 : o->next = d->next;
4415 3 : d->next = NULL;
4416 3 : change->cleanup = &tc_gc_rollbacked_storage;
4417 : } else
4418 4142 : rollback_segments(dbat->segs, tr, change, oldest);
4419 82508 : } else if (ok == LOG_OK && !tr->parent) {
4420 82482 : if (dbat->cs.ts == tr->tid) /* cleared table */
4421 25967 : dbat->cs.ts = commit_ts;
4422 :
4423 82482 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4424 82482 : if (ok == LOG_OK) {
4425 82482 : merge_segments(dbat, tr, change, commit_ts, oldest);
4426 82482 : if (oldest == commit_ts)
4427 39863 : merge_storage(dbat);
4428 : }
/* NOTE(review): dbat has already been dereferenced above, so this NULL
 * test cannot be false at this point. */
4429 82482 : if (dbat)
4430 82482 : dbat->cs.cleared = false;
4431 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4432 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4433 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4434 26 : storage *s = change->data;
4435 26 : if (s->cs.ts == tr->tid)
4436 0 : s->cs.ts = commit_ts;
4437 : }
4438 86653 : unlock_table(tr->store, t->base.id);
4439 86653 : return ok;
4440 : }
4441 :
4442 : /* only rollback (content version) case for now */
4443 : static int
4444 215 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4445 : {
4446 215 : sqlstore *store = Store;
4447 215 : sql_column *c = (sql_column*)change->obj;
4448 :
4449 215 : if (!c) /* cleaned earlier */
4450 : return 1;
4451 :
4452 160 : if (change->handled || isDeleted(c->t)) {
4453 0 : column_destroy(store, c);
4454 0 : return 1;
4455 : }
4456 :
4457 : /* savepoint commit (did it merge ?) */
4458 160 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4459 0 : column_destroy(store, c);
4460 0 : return 1;
4461 : }
4462 160 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4463 : return 0;
4464 159 : sql_delta *d = (sql_delta*)change->data;
4465 159 : if (d && d->next) {
4466 :
4467 62 : if (d->cs.ts > oldest)
4468 : return LOG_OK; /* cannot cleanup yet */
4469 :
4470 : // d is oldest reachable delta
4471 59 : if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
4472 58 : destroy_delta(d->next, true);
4473 :
4474 59 : d->next = NULL;
4475 59 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4476 59 : merge_delta(d);
4477 59 : unlock_column(store, c->base.id);
4478 : }
4479 156 : column_destroy(store, c);
4480 156 : return 1;
4481 : }
4482 :
4483 : static int
4484 861345 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4485 : {
4486 861345 : sqlstore *store = Store;
4487 861345 : sql_column *c = (sql_column*)change->obj;
4488 :
4489 861345 : if (!c) /* cleaned earlier */
4490 : return 1;
4491 :
4492 861345 : if (change->handled || isDeleted(c->t)) {
4493 3 : table_destroy(store, c->t);
4494 3 : return 1;
4495 : }
4496 :
4497 : /* savepoint commit (did it merge ?) */
4498 861342 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4499 69635 : table_destroy(store, c->t);
4500 69635 : return 1;
4501 : }
4502 791707 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4503 : return 0;
4504 791706 : sql_delta *d = (sql_delta*)change->data;
4505 791706 : if (d && d->next) {
4506 :
4507 791706 : if (d->cs.ts > oldest)
4508 : return LOG_OK; /* cannot cleanup yet */
4509 :
4510 : // d is oldest reachable delta
4511 63047 : if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
4512 5245 : destroy_delta(d->next, true);
4513 :
4514 63047 : d->next = NULL;
4515 63047 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4516 63047 : merge_delta(d);
4517 63047 : unlock_column(store, c->base.id);
4518 : }
4519 63047 : table_destroy(store, c->t);
4520 63047 : return 1;
4521 : }
4522 :
4523 : static int
4524 1133 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4525 : {
4526 1133 : sqlstore *store = Store;
4527 1133 : sql_idx *i = (sql_idx*)change->obj;
4528 :
4529 1133 : if (!i) /* cleaned earlier */
4530 : return 1;
4531 :
4532 632 : if (change->handled || isDeleted(i->t)) {
4533 0 : idx_destroy(store, i);
4534 0 : return 1;
4535 : }
4536 :
4537 : /* savepoint commit (did it merge ?) */
4538 632 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4539 0 : idx_destroy(store, i);
4540 0 : return 1;
4541 : }
4542 632 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4543 : return 0;
4544 632 : sql_delta *d = (sql_delta*)change->data;
4545 632 : if (d->next) {
4546 :
4547 0 : if (d->cs.ts > oldest)
4548 : return LOG_OK; /* cannot cleanup yet */
4549 :
4550 : // d is oldest reachable delta
4551 0 : if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
4552 0 : destroy_delta(d->next, true);
4553 :
4554 0 : d->next = NULL;
4555 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4556 0 : merge_delta(d);
4557 0 : unlock_column(store, i->base.id);
4558 : }
4559 632 : idx_destroy(store, i);
4560 632 : return 1;
4561 : }
4562 :
4563 : static int
4564 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4565 : {
4566 26 : sqlstore *store = Store;
4567 26 : sql_idx *i = (sql_idx*)change->obj;
4568 :
4569 26 : if (!i) /* cleaned earlier */
4570 : return 1;
4571 :
4572 26 : if (change->handled || isDeleted(i->t)) {
4573 0 : table_destroy(store, i->t);
4574 0 : return 1;
4575 : }
4576 :
4577 : /* savepoint commit (did it merge ?) */
4578 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4579 0 : table_destroy(store, i->t);
4580 0 : return 1;
4581 : }
4582 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4583 : return 0;
4584 26 : sql_delta *d = (sql_delta*)change->data;
4585 26 : if (d->next) {
4586 :
4587 26 : if (d->cs.ts > oldest)
4588 : return LOG_OK; /* cannot cleanup yet */
4589 :
4590 : // d is oldest reachable delta
4591 26 : if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
4592 26 : destroy_delta(d->next, true);
4593 :
4594 26 : d->next = NULL;
4595 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4596 26 : merge_delta(d);
4597 26 : unlock_column(store, i->base.id);
4598 : }
4599 26 : table_destroy(store, i->t);
4600 26 : return 1;
4601 : }
4602 :
4603 : static int
4604 242883 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4605 : {
4606 242883 : sqlstore *store = Store;
4607 242883 : sql_table *t = (sql_table*)change->obj;
4608 :
4609 242883 : if (change->handled || isDeleted(t)) {
4610 3333 : table_destroy(store, t);
4611 3333 : return 1;
4612 : }
4613 : /* savepoint commit (did it merge ?) */
4614 239550 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4615 13957 : table_destroy(store, t);
4616 13957 : return 1;
4617 : }
4618 225593 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4619 : return 0;
4620 225565 : storage *d = (storage*)change->data;
4621 225565 : if (d->next) {
4622 156474 : if (d->cs.ts > oldest)
4623 : return LOG_OK; /* cannot cleanup yet */
4624 :
4625 11530 : destroy_storage(d->next);
4626 11530 : d->next = NULL;
4627 : }
4628 80621 : table_destroy(store, t);
4629 80621 : return 1;
4630 : }
4631 :
4632 : static int
4633 30308 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4634 : {
4635 30308 : if (nr == 0)
4636 : return LOG_OK;
4637 30308 : assert (nr > 0);
4638 30308 : if ((!offsets || !*offsets) && nr == total) {
4639 30279 : *offset = slot;
4640 30279 : return LOG_OK;
4641 : }
4642 29 : if (!*offsets) {
4643 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4644 7 : if (!*offsets)
4645 : return LOG_ERR;
4646 : }
4647 29 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4648 16168 : for(size_t i = 0; i < nr; i++)
4649 16139 : dst[i] = slot + i;
4650 29 : (*offsets)->batCount += nr;
4651 29 : (*offsets)->theap->dirty = true;
4652 29 : return LOG_OK;
4653 : }
4654 :
/*
 * Claim `cnt` slots in storage `s` for transaction tr; the slots may be
 * spread over several ranges.
 *
 * Deleted segments with a timestamp below store_oldest() are re-used
 * first (extending the preceding segment when that one already belongs
 * to this transaction, or splitting the deleted segment); whatever is
 * still missing afterwards is taken at the end of the segment list.
 * Every claimed range is reported through add_offsets(): a single range
 * covering the whole request ends up in *offset, otherwise the positions
 * are collected in *offsets.
 *
 *  locked  true when the caller already holds the table lock
 *
 * On success the table is registered with the transaction once (when
 * segments_in_transaction() said it was not yet) and, for logged tables,
 * tr->logchanges is increased by the number of claimed slots.
 * Returns LOG_OK or LOG_ERR.
 */
4655 : static int
4656 30286 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4657 : {
4658 30286 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4659 30286 : assert(s->segs);
4660 30286 : ulng oldest = store_oldest(tr->store, NULL);
4661 30286 : BUN slot = 0;
/* cnt is decremented as ranges are claimed; total keeps the original request size */
4662 30286 : size_t total = cnt;
4663 :
4664 30286 : if (!locked)
4665 30286 : lock_table(tr->store, t->base.id);
4666 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4667 60694 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4668 30410 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
4669 35 : if ((seg->end - seg->start) >= cnt) {
4670 : /* if previous is claimed before we could simply adjust the end/start */
4671 13 : if (p && p->ts == tr->tid && !p->deleted) {
4672 2 : slot = p->end;
4673 2 : p->end += cnt;
4674 2 : seg->start += cnt;
4675 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4676 : ok = LOG_ERR;
4677 : break;
4678 : }
4679 2 : cnt = 0;
4680 2 : break;
4681 : }
4682 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4683 11 : size_t rcnt = seg->end - seg->start;
4684 11 : if (rcnt > cnt)
4685 : rcnt = cnt;
4686 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4687 : ok = LOG_ERR;
4688 : break;
4689 : }
4690 : }
/* seg (the whole deleted segment, or the part returned by split_segment)
 * now becomes a live segment of this transaction */
4691 33 : seg->ts = tr->tid;
4692 33 : seg->deleted = false;
4693 33 : slot = seg->start;
4694 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4695 : ok = LOG_ERR;
4696 : break;
4697 : }
4698 33 : cnt -= (seg->end - seg->start);
4699 : }
4700 : }
/* Remaining slots: grow the tail segment when it already belongs to this
 * transaction, otherwise append a new segment. */
4701 30286 : if (ok == LOG_OK && cnt) {
4702 30273 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4703 29177 : slot = s->segs->t->end;
4704 29177 : s->segs->t->end += cnt;
4705 : } else {
4706 1096 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4707 : ok = LOG_ERR;
4708 : } else {
4709 1096 : if (!s->segs->h)
4710 0 : s->segs->h = s->segs->t;
4711 1096 : slot = s->segs->t->start;
4712 : }
4713 : }
4714 30273 : if (ok == LOG_OK)
4715 30273 : ok = add_offsets(slot, cnt, total, offset, offsets);
4716 : }
4717 30286 : if (!locked)
4718 30286 : unlock_table(tr->store, t->base.id);
4719 :
4720 30286 : if (ok == LOG_OK) {
4721 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4722 30286 : if (!in_transaction) {
4723 1088 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4724 1088 : in_transaction = true;
4725 : }
4726 30286 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4727 30272 : tr->logchanges += (int) total;
/* `offsets` itself is dereferenced here without a NULL test; the visible
 * caller only enters this function with a non-NULL `offsets`. */
4728 30286 : if (*offsets) {
4729 7 : BAT *pos = *offsets;
4730 7 : assert(BATcount(pos) == total);
4731 7 : BATsetcount(pos, total); /* set other properties */
4732 7 : pos->tnil = false;
4733 7 : pos->tnonil = true;
4734 7 : pos->tkey = true;
4735 7 : pos->tsorted = true;
4736 7 : pos->trevsorted = false;
4737 : }
4738 : }
4739 30286 : return ok;
4740 : }
4741 :
/*
 * Claim `cnt` consecutive slots in storage `s` for transaction tr and
 * return the first one in *offset.
 *
 * Requests for more than one slot with a non-NULL `offsets` are handed to
 * claim_segmentsV2(), which may return scattered positions.  Otherwise
 * the first deleted segment that is older than store_oldest() and large
 * enough is re-used (extending the preceding segment when that one
 * already belongs to this transaction, or splitting the deleted one); if
 * none is found the slots are taken at the end of the segment list.
 *
 *  locked  true when the caller already holds the table lock
 *
 * On success the table is registered with the transaction once (when
 * segments_in_transaction() said it was not yet) and, for logged tables,
 * tr->logchanges is increased by cnt.  Returns LOG_OK or LOG_ERR.
 */
4742 : static int
4743 2097182 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4744 : {
4745 2097182 : if (cnt > 1 && offsets)
4746 30286 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4747 2066896 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4748 2066896 : assert(s->segs);
4749 2066896 : ulng oldest = store_oldest(tr->store, NULL);
4750 2066896 : BUN slot = 0;
4751 2066896 : int reused = 0;
4752 :
4753 2066896 : if (!locked)
4754 1951743 : lock_table(tr->store, t->base.id);
4755 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4756 : * or create new segment at the end */
4757 7001137 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4758 5000512 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
4759 :
/* this size test repeats the last clause of the enclosing condition */
4760 66271 : if ((seg->end - seg->start) >= cnt) {
4761 :
4762 : /* if previous is claimed before we could simply adjust the end/start */
4763 66271 : if (p && p->ts == tr->tid && !p->deleted) {
4764 51892 : slot = p->end;
4765 51892 : p->end += cnt;
4766 51892 : seg->start += cnt;
4767 51892 : reused = 1;
4768 51892 : break;
4769 : }
4770 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4771 14379 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4772 : ok = LOG_ERR;
4773 : break;
4774 : }
4775 : }
/* the segment returned by split_segment becomes a live segment of this
 * transaction */
4776 14379 : seg->ts = tr->tid;
4777 14379 : seg->deleted = false;
4778 14379 : slot = seg->start;
4779 14379 : reused = 1;
4780 14379 : break;
4781 : }
4782 : }
/* Nothing re-used: grow the tail segment when it already belongs to this
 * transaction, otherwise append a new segment. */
4783 2066896 : if (ok == LOG_OK && !reused) {
4784 2000625 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4785 1966473 : slot = s->segs->t->end;
4786 1966473 : s->segs->t->end += cnt;
4787 : } else {
4788 34152 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4789 : ok = LOG_ERR;
4790 : } else {
4791 34152 : if (!s->segs->h)
4792 0 : s->segs->h = s->segs->t;
4793 34152 : slot = s->segs->t->start;
4794 : }
4795 : }
4796 : }
4797 2066896 : if (!locked)
4798 1951743 : unlock_table(tr->store, t->base.id);
4799 :
4800 2066896 : if (ok == LOG_OK) {
4801 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4802 2066896 : if (!in_transaction) {
4803 47381 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4804 47381 : in_transaction = true;
4805 : }
4806 2066896 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4807 2066469 : tr->logchanges += (int) cnt;
4808 2066896 : *offset = slot;
4809 : }
4810 : return ok;
4811 : }
4812 :
4813 : /*
4814 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4815 : * of the global transaction and mark the newly added storage slots unused on the global
4816 : * level but used on the local transaction level. Besides this the local transaction needs
4817 : * to update (and mark unused) any slot inbetween the old end and new slots.
4818 : * */
4819 : static int
4820 1982029 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4821 : {
4822 1982029 : storage *s;
4823 :
4824 : /* we have a single segment structure for each persistent table
4825 : * for temporary tables each has its own */
4826 1982029 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4827 : return LOG_ERR;
4828 :
4829 1982029 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4830 : }
4831 :
4832 : /* some tables cannot be updated concurrently (user/roles etc) */
4833 : static int
4834 115157 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4835 : {
4836 115157 : storage *s;
4837 115157 : int res = 0;
4838 :
4839 : /* we have a single segment structure for each persistent table
4840 : * for temporary tables each has its own */
4841 115157 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4842 : /* TODO check for other inserts ! */
4843 : return LOG_ERR;
4844 :
4845 115157 : lock_table(tr->store, t->base.id);
4846 115157 : if ((res = segments_conflict(tr, s->segs, 1))) {
4847 4 : unlock_table(tr->store, t->base.id);
4848 4 : return LOG_CONFLICT;
4849 : }
4850 115153 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4851 115153 : unlock_table(tr->store, t->base.id);
4852 115153 : return res;
4853 : }
4854 :
4855 : static int
4856 13563 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4857 : {
4858 13563 : storage *s;
4859 13563 : int res = 0;
4860 :
4861 13563 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4862 : return LOG_ERR;
4863 :
4864 13563 : lock_table(tr->store, t->base.id);
4865 13563 : res = segments_conflict(tr, s->segs, uncommitted);
4866 13563 : unlock_table(tr->store, t->base.id);
4867 13563 : return res ? LOG_CONFLICT : LOG_OK;
4868 : }
4869 :
4870 : static size_t
4871 1315904 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4872 : {
4873 1315904 : size_t cnt = 0;
4874 :
4875 1352318 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4876 : ;
4877 :
4878 3304831 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4879 1988926 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4880 97950 : cnt += s->end - s->start;
4881 : }
4882 1315905 : return cnt;
4883 : }
4884 :
/*
 * Build a candidate list for the positions [start, end) of storage S as
 * seen by transaction tr.
 *
 * When has_deletes_in_range() finds no deleted segment in the range, a
 * dense BAT (BATdense) covering the range is returned.  Otherwise a
 * TYPE_msk BAT with one bit per position is filled - a bit is set when
 * SEG_IS_VALID() holds for the segment covering that position - and
 * turned into a candidate list with BATmaskedcands().
 *
 * The segment list is walked under the table lock.  Returns NULL when the
 * mask BAT cannot be allocated; the result of BATmaskedcands() is
 * returned unchecked.
 */
4885 : static BAT *
4886 1315905 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4887 : {
4888 1315905 : lock_table(tr->store, t->base.id);
4889 1315904 : segment *s = S->segs->h;
4890 : /* step one no deletes -> dense range */
/* cur: the 32-bit mask word currently being assembled;
 * pos: number of mask bits produced so far (pos & 31 = bits used in cur) */
4891 1315904 : uint32_t cur = 0;
4892 1315904 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4893 1315902 : if (!dnr) {
4894 1227484 : unlock_table(tr->store, t->base.id);
4895 1227491 : return BATdense(start, start, end-start);
4896 : }
4897 :
4898 88418 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4899 88418 : if (!b) {
4900 0 : unlock_table(tr->store, t->base.id);
4901 0 : return NULL;
4902 : }
4903 :
4904 88418 : uint32_t *restrict dst = Tloc(b, 0);
4905 57204414 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4906 57170407 : if (s->end < start)
4907 17035 : continue;
4908 57153372 : if (s->start >= end)
4909 : break;
4910 57098961 : msk m = (SEG_IS_VALID(s, tr));
/* lnr: number of positions of this segment that fall inside [start, end) */
4911 57098961 : size_t lnr = s->end-s->start;
4912 57098961 : if (s->start < start)
4913 4575 : lnr -= (start - s->start);
4914 57098961 : if (s->end > end)
4915 3490 : lnr -= s->end - end;
4916 :
/* Valid segment: emit lnr one-bits. First top up the partially filled
 * word, then write whole words, then start a new partial word.
 * Note: the local `end` below shadows the function parameter `end`
 * inside this block. */
4917 57098961 : if (m) {
4918 617920 : size_t used = pos&31, end = 32;
4919 617920 : if (used) {
4920 515419 : if (lnr < (32-used))
4921 305932 : end = used + lnr;
4922 515419 : assert(end > used);
4923 515419 : cur |= ((1U << (end - used)) - 1) << used;
4924 515419 : lnr -= end - used;
4925 515419 : pos += end - used;
4926 515419 : if (end == 32) {
4927 209487 : *dst++ = cur;
4928 209487 : cur = 0;
4929 : }
4930 : }
4931 617920 : size_t full = lnr/32;
4932 617920 : size_t rest = lnr%32;
4933 617920 : if (full > 0) {
4934 196247 : memset(dst, ~0, full * sizeof(*dst));
4935 196247 : dst += full;
4936 196247 : lnr -= full * 32;
4937 196247 : pos += full * 32;
4938 : }
4939 617920 : if (rest > 0) {
4940 296271 : cur |= (1U << rest) - 1;
4941 296271 : lnr -= rest;
4942 296271 : pos += rest;
4943 : }
4944 617920 : assert(lnr==0);
4945 : } else {
/* Invalid segment: emit lnr zero-bits. Same three steps, but only the
 * position advances; cur is left untouched for the skipped bits. The
 * local `end` again shadows the parameter. */
4946 56481041 : size_t used = pos&31, end = 32;
4947 56481041 : if (used) {
4948 54712384 : if (lnr < (32-used))
4949 52881192 : end = used + lnr;
4950 :
4951 54712384 : pos+= (end-used);
4952 54712384 : lnr-= (end-used);
4953 54712384 : if (end == 32) {
4954 1831192 : *dst++ = cur;
4955 1831192 : cur = 0;
4956 : }
4957 : }
4958 56481041 : size_t full = lnr/32;
4959 56481041 : size_t rest = lnr%32;
4960 56481041 : memset(dst, 0, full * sizeof(*dst));
4961 56481041 : dst += full;
4962 56481041 : lnr -= full * 32;
4963 56481041 : pos += full * 32;
4964 56481041 : pos+= rest;
4965 56481041 : lnr-= rest;
4966 56481041 : assert(lnr==0);
4967 : }
4968 : }
4969 :
4970 88418 : unlock_table(tr->store, t->base.id);
/* flush the last, partially filled word */
4971 88418 : if (pos%32)
4972 83945 : *dst=cur;
4973 88418 : BATsetcount(b, nr);
4974 88418 : bn = BATmaskedcands(start, nr, b, true);
4975 88418 : BBPreclaim(b);
4976 88418 : (void)pos;
4977 88418 : assert (pos == nr);
4978 : return bn;
4979 : }
4980 :
4981 : static void * /* BAT * */
4982 1323016 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4983 : {
4984 : /* with nr_of_parts - part_nr we can adjust parts */
4985 1323016 : storage *s = tab_timestamp_storage(tr, t);
4986 :
4987 1323006 : if (!s)
4988 : return NULL;
4989 1323006 : size_t nr = segs_end(s->segs, tr, t);
4990 :
4991 1322988 : if (!nr)
4992 7084 : return BATdense(0, 0, 0);
4993 :
4994 : /* compute proper part */
4995 1315904 : size_t part_size = nr/nr_of_parts;
4996 1315904 : size_t start = part_size * part_nr;
4997 1315904 : size_t end = start + part_size;
4998 1315904 : if (part_nr == (nr_of_parts-1))
4999 1246826 : end = nr;
5000 1315904 : assert(end <= nr);
5001 1315904 : return segments2cands(s, tr, t, start, end);
5002 : }
5003 :
5004 : static int
5005 1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
5006 : {
5007 1 : bool update_conflict = false;
5008 :
5009 1 : if (segments_in_transaction(tr, col->t))
5010 : return LOG_CONFLICT;
5011 :
5012 1 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5013 :
5014 1 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5015 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5016 1 : assert(d && d->cs.ts == tr->tid);
5017 1 : if (odelta != d)
5018 1 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
5019 1 : if (d->cs.bid)
5020 1 : temp_destroy(d->cs.bid);
5021 1 : if (d->cs.uibid)
5022 1 : temp_destroy(d->cs.uibid);
5023 1 : if (d->cs.uvbid)
5024 1 : temp_destroy(d->cs.uvbid);
5025 1 : bat_set_access(bn, BAT_READ);
5026 1 : d->cs.bid = temp_create(bn);
5027 1 : d->cs.uibid = 0;
5028 1 : d->cs.uvbid = 0;
5029 1 : d->cs.ucnt = 0;
5030 1 : d->cs.cleared = true;
5031 1 : d->cs.ts = tr->tid;
5032 1 : ATOMIC_INIT(&d->cs.refcnt, 1);
5033 1 : return LOG_OK;
5034 : }
5035 :
5036 : static int
5037 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5038 : {
5039 58 : bool update_conflict = false;
5040 :
5041 58 : if (segments_in_transaction(tr, col->t))
5042 : return LOG_CONFLICT;
5043 :
5044 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5045 :
5046 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5047 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5048 58 : assert(d && d->cs.ts == tr->tid);
5049 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5050 58 : if (odelta != d)
5051 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5052 :
5053 58 : d->cs.st = st;
5054 58 : d->cs.cleared = true;
5055 58 : if (d->cs.bid)
5056 58 : temp_destroy(d->cs.bid);
5057 58 : o = transfer_to_systrans(o);
5058 58 : if (o == NULL)
5059 : return LOG_ERR;
5060 58 : bat_set_access(o, BAT_READ);
5061 58 : d->cs.bid = temp_create(o);
5062 58 : if (u) {
5063 53 : if (d->cs.ebid)
5064 0 : temp_destroy(d->cs.ebid);
5065 53 : u = transfer_to_systrans(u);
5066 53 : if (u == NULL)
5067 : return LOG_ERR;
5068 53 : d->cs.ebid = temp_create(u);
5069 : }
5070 : return LOG_OK;
5071 : }
5072 :
5073 : void
5074 336 : bat_storage_init( store_functions *sf)
5075 : {
5076 336 : sf->bind_col = &bind_col;
5077 336 : sf->bind_updates = &bind_updates;
5078 336 : sf->bind_updates_idx = &bind_updates_idx;
5079 336 : sf->bind_idx = &bind_idx;
5080 336 : sf->bind_cands = &bind_cands;
5081 :
5082 336 : sf->claim_tab = &claim_tab;
5083 336 : sf->key_claim_tab = &key_claim_tab;
5084 336 : sf->tab_validate = &tab_validate;
5085 :
5086 336 : sf->append_col = &append_col;
5087 336 : sf->append_idx = &append_idx;
5088 :
5089 336 : sf->update_col = &update_col;
5090 336 : sf->update_idx = &update_idx;
5091 :
5092 336 : sf->delete_tab = &delete_tab;
5093 :
5094 336 : sf->count_del = &count_del;
5095 336 : sf->count_col = &count_col;
5096 336 : sf->count_idx = &count_idx;
5097 336 : sf->dcount_col = &dcount_col;
5098 336 : sf->min_max_col = &min_max_col;
5099 336 : sf->set_stats_col = &set_stats_col;
5100 336 : sf->sorted_col = &sorted_col;
5101 336 : sf->unique_col = &unique_col;
5102 336 : sf->double_elim_col = &double_elim_col;
5103 336 : sf->col_stats = &col_stats;
5104 336 : sf->col_set_range = &col_set_range;
5105 336 : sf->col_not_null = &col_not_null;
5106 :
5107 336 : sf->col_dup = &col_dup;
5108 336 : sf->idx_dup = &idx_dup;
5109 336 : sf->del_dup = &del_dup;
5110 :
5111 336 : sf->create_col = &create_col; /* create and add to change list */
5112 336 : sf->create_idx = &create_idx;
5113 336 : sf->create_del = &create_del;
5114 :
5115 336 : sf->destroy_col = &destroy_col; /* free resources */
5116 336 : sf->destroy_idx = &destroy_idx;
5117 336 : sf->destroy_del = &destroy_del;
5118 :
5119 336 : sf->drop_col = &drop_col; /* add drop to change list */
5120 336 : sf->drop_idx = &drop_idx;
5121 336 : sf->drop_del = &drop_del;
5122 :
5123 336 : sf->clear_table = &clear_table;
5124 :
5125 336 : sf->swap_bats = &swap_bats;
5126 336 : sf->col_compress = &col_compress;
5127 336 : }
5128 :
#if 0
/* The two helpers below are disabled and are not compiled. */

/*
 * Return how many values the base BAT of column fc holds beyond its
 * batInserted mark, and store that mark in *offset.
 *
 * Returns 0 (leaving *offset untouched) when fc is NULL, when GDKinmemory(0)
 * holds, when the column is not marked as accessed and allocated, or when
 * the BAT has nothing beyond the mark.
 */
static lng
log_get_nr_inserted(sql_column *fc, lng *offset)
{
	lng cnt = 0;

	if (!fc || GDKinmemory(0))
		return 0;

	if (fc->base.atime && fc->base.allocated) {
		sql_delta *fb = fc->data;
		BAT *ins = temp_descriptor(fb->cs.bid);

		if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
			cnt = BATcount(ins) - ins->batInserted;
			/* report where the new values start, as log_get_nr_deleted does */
			*offset = ins->batInserted;
		}
		bat_destroy(ins);
	}
	return cnt;
}

/*
 * Return how many values the BAT referenced by the storage of table ft holds
 * beyond its batInserted mark, and store that mark in *offset.
 *
 * Returns 0 (leaving *offset untouched) when ft is NULL, when GDKinmemory(0)
 * holds, when the table is not marked as accessed and allocated, or when the
 * BAT has nothing beyond the mark.
 */
static lng
log_get_nr_deleted(sql_table *ft, lng *offset)
{
	lng cnt = 0;

	if (!ft || GDKinmemory(0))
		return 0;

	if (ft->base.atime && ft->base.allocated) {
		storage *fdb = ft->data;
		BAT *db = temp_descriptor(fdb->cs.bid);

		if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
			cnt = BATcount(db) - db->batInserted;
			*offset = db->batInserted;
		}
		bat_destroy(db);
	}
	return cnt;
}
#endif
|