Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
/* Format string of the fatal error reported when a critical merge runs out
 * of memory; the single %s is supplied by the caller. */
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
/* Evaluates to true when t is flagged as an unlogged or as a temporary table. */
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
/* Forward declarations of the static log_*, commit_* and tc_gc_* callbacks.
 * Being static, they have to be defined elsewhere in this translation unit. */
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
/* Forward declaration of the static delta merge routine (defined elsewhere
 * in this translation unit). */
42 : static lng merge_delta( sql_delta *obat);
43 :
44 : /* valid
45 : * !deleted && VALID_4_READ(TS, tr) existing or newly created segment
46 : * deleted && TS > tr->ts && OLDTS < tr->ts deleted after current transaction
47 : */
48 :
/* A timestamp TS is readable by transaction tr when it is tr's own id, when
 * tr has a parent and tr_version_of_parent(tr, TS) holds, or when TS is
 * smaller than tr's start timestamp. */
49 : #define VALID_4_READ(TS,tr) \
50 : (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
51 :
52 : /* when changed, check if the old status is still valid */
53 : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
54 : (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
55 :
/* A segment can be deleted by tr when it is not deleted and readable by tr. */
56 : #define SEG_VALID_4_DELETE(seg,tr) \
57 : (!seg->deleted && VALID_4_READ(seg->ts, tr))
58 :
59 : /* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
60 : #define SEG_IS_DELETED(seg,tr) \
61 : ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
62 : (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
63 :
64 : /* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
65 : #define SEG_IS_VALID(seg, tr) \
66 : ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
67 : (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
68 :
69 : static inline BAT *
70 5123 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 5123 : MT_lock_set(&b->theaplock);
74 5123 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 17 : MT_lock_unset(&b->theaplock);
76 17 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
77 17 : BBPreclaim(b);
78 17 : return bn;
79 : }
80 5106 : if (b->theap->farmid == TRANSIENT ||
81 14 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
82 4817 : QryCtx *qc = MT_thread_get_qry_ctx();
83 4817 : if (qc) {
84 2523 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
85 2523 : ATOMIC_SUB(&qc->datasize, b->theap->size);
86 2523 : b->theap->farmid = SYSTRANS;
87 2523 : b->batRole = SYSTRANS;
88 : }
89 2523 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
90 1092 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
91 1092 : b->tvheap->farmid = SYSTRANS;
92 : }
93 : }
94 : }
95 5106 : MT_lock_unset(&b->theaplock);
96 5106 : return b;
97 : }
98 :
99 : static void
100 25943532 : lock_table(sqlstore *store, sqlid id)
101 : {
102 25943532 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
103 25946993 : }
104 :
105 : static void
106 25946979 : unlock_table(sqlstore *store, sqlid id)
107 : {
108 25946979 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
109 25947852 : }
110 :
111 : static void
112 20058427 : lock_column(sqlstore *store, sqlid id)
113 : {
114 20058427 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
115 20058025 : }
116 :
117 : static void
118 20058325 : unlock_column(sqlstore *store, sqlid id)
119 : {
120 20058325 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
121 20058416 : }
122 :
123 : static void
124 114057 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
125 : {
126 114057 : assert(cleanup);
127 114057 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
128 114057 : }
129 :
130 : static void
131 132844 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
132 : {
133 132844 : assert(cleanup);
134 132844 : dup_base(&t->base);
135 132842 : trans_add(tr, b, data, cleanup, commit, log);
136 132833 : }
137 :
138 : static int
139 91076 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
140 : {
141 91076 : segment *s = change->data;
142 :
143 91076 : if (s->ts <= oldest) {
144 32960 : while(s) {
145 21150 : segment *n = s->prev;
146 21150 : ATOMIC_PTR_DESTROY(&s->next);
147 21150 : _DELETE(s);
148 21150 : s = n;
149 : }
150 11810 : sqlstore *store = Store;
151 11810 : table_destroy(store, (sql_table*)change->obj);
152 11810 : return 1;
153 : }
154 : return LOG_OK;
155 : }
156 :
157 : static void
158 21150 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
159 : {
160 : /* we can only be accessed by anything older then commit_ts */
161 21150 : if (c->cleanup == &tc_gc_seg)
162 9340 : s->prev = c->data;
163 : else
164 11810 : c->cleanup = &tc_gc_seg;
165 21150 : c->data = s;
166 21150 : s->ts = commit_ts;
167 16730 : }
168 :
169 : static segment *
170 87627 : new_segment(segment *o, sql_trans *tr, size_t cnt)
171 : {
172 87627 : segment *n = (segment*)GDKmalloc(sizeof(segment));
173 :
174 87627 : assert(tr);
175 87627 : if (n) {
176 87627 : *n = (segment) {
177 87627 : .ts = tr->tid,
178 : .oldts = 0,
179 : .deleted = false,
180 : .start = 0,
181 : .end = cnt,
182 : .next = ATOMIC_PTR_VAR_INIT(NULL),
183 : .prev = NULL,
184 : };
185 87627 : if (o) {
186 36180 : n->start += o->end;
187 36180 : n->end += o->end;
188 36180 : ATOMIC_PTR_SET(&o->next, n);
189 : }
190 : }
191 87627 : return n;
192 : }
193 :
/*
 * Claim the row range [start, start+cnt) of segment o for transaction tr.
 *
 * segs    - segment list owning o; its head/tail pointers are updated when
 *           the new segment becomes the first or last one
 * o       - segment containing the range (the code does not verify this)
 * p       - if not NULL, the segment whose next pointer is redirected to the
 *           new segment in the "insert before o" case (presumably o's
 *           predecessor - confirm with callers)
 * deleted - deleted flag for the claimed range
 *
 * Returns the segment describing the claimed range (o itself when the range
 * covers o exactly), or NULL when an allocation fails.
 */
194 : static segment *
195 88289 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
196 : {
197 88289 : assert(tr);
/* exact cover: no split needed, re-stamp o in place and remember its old timestamp */
198 88289 : if (o->start == start && o->end == start+cnt) {
199 10189 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
200 10189 : o->oldts = o->ts;
201 10189 : o->ts = tr->tid;
202 10189 : o->deleted = deleted;
203 10189 : return o;
204 : }
205 78100 : segment *n = (segment*)GDKmalloc(sizeof(segment));
206 :
207 78100 : if (!n)
208 : return NULL;
209 78100 : n->prev = NULL;
210 :
/* o already belongs to tr: the new part gets timestamp 1 and is marked
 * deleted, ignoring the 'deleted' argument; otherwise the new part is
 * stamped with tr and keeps o's timestamp as its old timestamp */
211 78100 : if (o->ts == tr->tid) {
212 3875 : n->oldts = 0;
213 3875 : n->ts = 1;
214 3875 : n->deleted = true;
215 : } else {
216 74225 : n->oldts = o->ts;
217 74225 : n->ts = tr->tid;
218 74225 : n->deleted = deleted;
219 : }
220 78100 : if (start == o->start) {
221 : /* 2-way split: o remains latter part of segment, new one is
222 : * inserted before */
223 62582 : n->start = o->start;
224 62582 : n->end = n->start + cnt;
225 62582 : ATOMIC_PTR_INIT(&n->next, o);
226 62582 : if (segs->h == o)
227 454 : segs->h = n;
228 62582 : if (p)
229 62128 : ATOMIC_PTR_SET(&p->next, n);
230 62582 : o->start = n->end;
231 15518 : } else if (start+cnt == o->end) {
232 : /* 2-way split: o remains first part of segment, new one is
233 : * added after */
234 5628 : n->start = o->end - cnt;
235 5628 : n->end = o->end;
236 5628 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
237 5628 : ATOMIC_PTR_SET(&o->next, n);
238 5628 : if (segs->t == o)
239 841 : segs->t = n;
240 5628 : o->end = n->start;
241 : } else {
242 : /* 3-way split: o remains first part of segment, two new ones
243 : * are added after */
244 9890 : segment *n2 = GDKmalloc(sizeof(segment));
245 9890 : if (n2 == NULL) {
246 0 : GDKfree(n);
247 0 : return NULL;
248 : }
249 9890 : ATOMIC_PTR_INIT(&n->next, n2);
250 9890 : n->start = start;
251 9890 : n->end = start + cnt;
/* n2 starts as a copy of o (same timestamps and deleted flag) and covers the
 * remainder behind n; its next and prev pointers are set afresh below */
252 9890 : *n2 = *o;
253 9890 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
254 9890 : n2->start = n->end;
255 9890 : n2->prev = NULL;
256 9890 : if (segs->t == o)
257 4079 : segs->t = n2;
/* n and n2 are fully linked before they become reachable through o */
258 9890 : ATOMIC_PTR_SET(&o->next, n);
259 9890 : o->end = start;
260 : }
261 : return n;
262 : }
263 :
/*
 * Undo the segment changes made by transaction tr in list segs.
 *
 * Segments stamped with tr's id get their previous timestamp back and their
 * deleted flag reverted.  While walking the list, neighbouring segments
 * whose timestamp is not newer than 'oldest' and which are adjacent with the
 * same deleted flag are merged; a segment that was merged away is queued on
 * 'change' through mark4destroy rather than freed here.
 */
264 : static void
265 4201 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
266 : {
/* seg is the current merge candidate, i.e. the last kept segment with ts <= oldest */
267 4201 : segment *cur = segs->h, *seg = NULL;
268 18148 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
269 13947 : if (cur->ts == tr->tid) { /* revert */
/* flip the deleted flag; a segment whose old timestamp equals its current one is marked deleted */
270 4698 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
271 4698 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
272 4698 : cur->oldts = 0;
273 : }
274 13947 : if (cur->ts <= oldest) { /* possibly merge range */
275 13074 : if (!seg) { /* skip first */
276 : seg = cur;
277 8873 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
278 : /* merge with previous */
279 4420 : seg->end = cur->end;
280 4420 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
281 4420 : if (cur == segs->t)
282 2863 : segs->t = seg;
/* cur is unlinked but not freed: it is stamped with the store's current timestamp and queued for cleanup */
283 4420 : mark4destroy(cur, change, store_get_timestamp(tr->store));
/* continue from seg, whose next pointer now skips the merged segment */
284 4420 : cur = seg;
285 : } else {
286 : seg = cur; /* begin of new merge */
287 : }
288 : }
289 : }
290 4201 : }
291 :
292 : static size_t
293 103735 : segs_end_include_deleted( segments *segs, sql_trans *tr)
294 : {
295 103735 : size_t cnt = 0;
296 103735 : segment *s = segs->h, *l = NULL;
297 :
298 528373 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
299 424638 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
300 : l = s;
301 : }
302 103735 : if (l)
303 103728 : cnt = l->end;
304 103735 : return cnt;
305 : }
306 :
/*
 * Write the deleted state of the segments changed by transaction tr into the
 * bit vector BAT cs->bid (one bit per row, 32 rows per uint32_t word; a set
 * bit is written for deleted rows).
 *
 * Only non-empty segments stamped with tr's id are written.  Rows between
 * the highest position written so far and the start of such a segment are
 * first set to 1.  The BAT is extended when needed and its count is raised
 * to the end of the last relevant segment.
 *
 * Returns LOG_OK, or LOG_ERR when the BAT cannot be obtained or extended.
 */
307 : static int
308 103735 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
309 : {
310 : /* set bits correctly */
311 103735 : BAT *b = temp_descriptor(cs->bid);
312 :
313 103735 : if (!b)
314 : return LOG_ERR;
315 103735 : segment *s = segs->h;
316 :
317 103735 : size_t nr = segs_end_include_deleted(segs, tr);
/* round up to a whole number of 32-bit words */
318 103735 : size_t rounded_nr = ((nr+31)&~31);
319 103735 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
320 0 : bat_destroy(b);
321 0 : return LOG_ERR;
322 : }
323 :
324 : /* disable all properties here */
325 103735 : MT_lock_set(&b->theaplock);
326 103735 : b->tsorted = false;
327 103735 : b->trevsorted = false;
328 103735 : b->tnosorted = 0;
329 103735 : b->tnorevsorted = 0;
330 103735 : b->tseqbase = oid_nil;
331 103735 : b->tkey = false;
332 103735 : b->tnokey[0] = 0;
333 103735 : b->tnokey[1] = 0;
334 103735 : b->theap->dirty = true;
/* cnt tracks the highest row position covered so far; it starts at the current BAT count */
335 103735 : BUN cnt = BATcount(b);
336 :
337 103735 : uint32_t *restrict dst;
338 457392 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
339 398936 : if (s->start >= nr)
340 : break;
341 353657 : if (s->ts == tr->tid && s->end != s->start) {
342 145967 : if (cnt < s->start) { /* first mark as deleted ! */
/* fill the gap [cnt, s->start) with 1-bits: partial leading word, whole words, partial trailing word */
343 3974 : size_t lnr = s->start-cnt;
344 3974 : size_t pos = cnt;
345 3974 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
346 3974 : uint32_t cur = 0;
347 :
348 3974 : size_t used = pos&31, end = 32;
349 3974 : if (used) {
350 3864 : if (lnr < (32-used))
351 3647 : end = used + lnr;
352 3864 : assert(end > used);
/* mask with bits used..end-1 set; end-used is below 32 here because used is non-zero */
353 3864 : cur |= ((1U << (end - used)) - 1) << used;
354 3864 : lnr -= end - used;
355 3864 : *dst++ |= cur;
356 3864 : cur = 0;
357 : }
358 3974 : size_t full = lnr/32;
359 3974 : size_t rest = lnr%32;
360 3974 : if (full > 0) {
361 7 : memset(dst, ~0, full * sizeof(*dst));
362 7 : dst += full;
363 7 : lnr -= full * 32;
364 : }
365 3974 : if (rest > 0) {
366 192 : cur |= (1U << rest) - 1;
367 192 : lnr -= rest;
368 192 : *dst |= cur;
369 : }
370 3974 : assert(lnr==0);
371 : }
/* now write the segment's own range [s->start, s->end): bits are set when the segment is deleted and cleared otherwise */
372 145967 : size_t lnr = s->end-s->start;
373 145967 : size_t pos = s->start;
374 145967 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
375 145967 : uint32_t cur = 0;
376 145967 : size_t used = pos&31, end = 32;
377 145967 : if (used) {
378 108217 : if (lnr < (32-used))
379 102990 : end = used + lnr;
380 108217 : assert(end > used);
381 108217 : cur |= ((1U << (end - used)) - 1) << used;
382 108217 : lnr -= end - used;
383 108217 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
384 108217 : dst++;
385 108217 : cur = 0;
386 : }
387 145967 : size_t full = lnr/32;
388 145967 : size_t rest = lnr%32;
389 145967 : if (full > 0) {
390 3552 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
391 3552 : dst += full;
392 3552 : lnr -= full * 32;
393 : }
394 145967 : if (rest > 0) {
395 40445 : cur |= (1U << rest) - 1;
396 40445 : lnr -= rest;
397 40445 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
398 : }
399 145967 : assert(lnr==0);
400 145967 : if (cnt < s->end)
401 353657 : cnt = s->end;
402 : }
403 : }
/* the BAT count only ever grows here */
404 103735 : if (nr > BATcount(b)) {
405 60816 : BATsetcount(b, nr);
406 : }
407 103735 : MT_lock_unset(&b->theaplock);
408 :
409 103735 : bat_destroy(b);
410 103735 : return LOG_OK;
411 : }
412 :
413 : /* TODO return LOG_OK/ERR */
/*
 * Stamp the segments of transaction tr in storage s with commit_ts and merge
 * neighbouring segments where that is safe.
 *
 * Two neighbours are merged when both carry a committed timestamp (below
 * TRANSACTION_ID_BASE), have the same deleted flag, and no active
 * transaction in store->active has a start timestamp lying between the two
 * segment timestamps.  A merged-away segment is freed at once when
 * commit_ts equals oldest; otherwise it is queued on 'change' through
 * mark4destroy.
 */
414 : static void
415 103761 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
416 : {
417 103761 : sqlstore* store = tr->store;
/* seg is the previous segment, i.e. the one cur may be merged into */
418 103761 : segment *cur = s->segs->h, *seg = NULL;
419 528471 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
420 424710 : if (cur->ts == tr->tid) {
/* committed, non-deleted segments no longer need their old timestamp */
421 160231 : if (!cur->deleted)
422 91727 : cur->oldts = 0;
423 160231 : cur->ts = commit_ts;
424 : }
425 424710 : if (!seg) {
426 : /* first segment */
427 : seg = cur;
428 : }
429 320949 : else if (seg->ts < TRANSACTION_ID_BASE) {
430 : /* possible merge since both deleted flags are equal */
431 291949 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
432 222883 : int merge = 1;
433 222883 : node *n = store->active->h;
434 763610 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
435 665314 : sql_trans* other = ((sql_trans*)n->data);
436 665314 : ulng active = other->ts;
437 665314 : if(other->active == 2)
438 21746 : continue; /* pretend that another recently committed transaction is no longer active */
439 643568 : if (active == tr->ts)
440 134770 : continue; /* pretend that committing transaction has already committed and is no longer active */
/* both segments are older than this transaction: stop scanning, merge stays allowed */
441 508798 : if (seg->ts < active && cur->ts < active)
442 : break;
/* both segments are newer than this transaction: it cannot tell them apart, check the next one */
443 495620 : if (seg->ts > active && cur->ts > active)
444 384211 : continue;
445 :
446 111409 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
447 : /* cannot safely merge since there is an active transaction between the segments */
448 : merge = false;
449 : break;
450 : }
451 : /* merge segments */
452 222948 : if (merge) {
453 111474 : seg->end = cur->end;
454 111474 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
455 111474 : if (cur == s->segs->t)
456 26813 : s->segs->t = seg;
457 111474 : if (commit_ts == oldest) {
458 94744 : ATOMIC_PTR_DESTROY(&cur->next);
459 94744 : _DELETE(cur);
460 : } else
461 33460 : mark4destroy(cur, change, commit_ts);
/* keep seg as the merge target and continue with its new successor */
462 111474 : cur = seg;
463 111474 : continue;
464 : }
465 : }
466 : }
467 : seg = cur;
468 : }
469 103761 : }
470 :
471 : static int
472 2171197 : segments_in_transaction(sql_trans *tr, sql_table *t)
473 : {
474 2171197 : storage *s = ATOMIC_PTR_GET(&t->data);
475 2171197 : segment *seg = s->segs->h;
476 :
477 2171197 : if (seg && s->segs->t->ts == tr->tid)
478 : return 1;
479 640736 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
480 535840 : if (seg->ts == tr->tid)
481 : return 1;
482 : }
483 : return 0;
484 : }
485 :
486 : static size_t
487 17948128 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
488 : {
489 17948128 : size_t cnt = 0;
490 :
491 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
492 : * keep all of the parts aligned */
493 17948128 : lock_table(tr->store, table->base.id);
494 17950956 : segment *s = segs->h, *l = NULL;
495 :
496 17950956 : if (segs->t && SEG_IS_VALID(segs->t, tr))
497 15290325 : l = s = segs->t;
498 :
499 215228515 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
500 197277714 : if (SEG_IS_VALID(s, tr))
501 : l = s;
502 : }
503 17950801 : if (l)
504 17932400 : cnt = l->end;
505 17950801 : unlock_table(tr->store, table->base.id);
506 17951289 : return cnt;
507 : }
508 :
509 : static segments *
510 51447 : new_segments(sql_trans *tr, size_t cnt)
511 : {
512 51447 : segments *n = (segments*)GDKmalloc(sizeof(segments));
513 :
514 51447 : if (n) {
515 51447 : n->nr_reused = 0;
516 51447 : n->h = n->t = new_segment(NULL, tr, cnt);
517 51447 : if (!n->h) {
518 0 : GDKfree(n);
519 0 : return NULL;
520 : }
521 51447 : sql_ref_init(&n->r);
522 : }
523 : return n;
524 : }
525 :
526 : static sql_delta *
527 30397516 : timestamp_delta( sql_trans *tr, sql_delta *d)
528 : {
529 30445097 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
530 47581 : d = d->next;
531 30397627 : return d;
532 : }
533 :
534 : static sql_delta *
535 30241629 : col_timestamp_delta( sql_trans *tr, sql_column *c)
536 : {
537 30241629 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
538 : }
539 :
540 : static sql_delta *
541 23609 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
542 : {
543 23609 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
544 : }
545 :
546 : static storage *
547 18447958 : timestamp_storage( sql_trans *tr, storage *d)
548 : {
549 18447958 : if (!d)
550 : return NULL;
551 18511666 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
552 63708 : d = d->next;
553 : return d;
554 : }
555 :
556 : static storage *
557 18422573 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
558 : {
559 18422573 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
560 : }
561 :
562 : static sql_delta*
563 19306 : delta_dup(sql_delta *d)
564 : {
565 19306 : ATOMIC_INC(&d->cs.refcnt);
566 19306 : return d;
567 : }
568 :
569 : static void *
570 17940 : col_dup(sql_column *c)
571 : {
572 17940 : return delta_dup(ATOMIC_PTR_GET(&c->data));
573 : }
574 :
575 : static void *
576 2653 : idx_dup(sql_idx *i)
577 : {
578 2653 : if (!ATOMIC_PTR_GET(&i->data))
579 : return NULL;
580 1366 : return delta_dup(ATOMIC_PTR_GET(&i->data));
581 : }
582 :
583 : static storage*
584 1539 : storage_dup(storage *d)
585 : {
586 1539 : ATOMIC_INC(&d->cs.refcnt);
587 1539 : return d;
588 : }
589 :
590 : static void *
591 1539 : del_dup(sql_table *t)
592 : {
593 1539 : return storage_dup(ATOMIC_PTR_GET(&t->data));
594 : }
595 :
596 : static size_t
597 17 : count_inserts( segment *s, sql_trans *tr)
598 : {
599 17 : size_t cnt = 0;
600 :
601 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
602 55 : if (!s->deleted && s->ts == tr->tid)
603 4 : cnt += s->end - s->start;
604 : }
605 17 : return cnt;
606 : }
607 :
608 : static size_t
609 855763 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
610 : {
611 855763 : size_t cnt = 0;
612 :
613 1003416 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
614 : ;
615 :
616 5367407 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
617 4511645 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
618 1486733 : cnt += s->end - s->start;
619 : }
620 855762 : return cnt;
621 : }
622 :
623 : static size_t
624 17 : count_deletes( segment *s, sql_trans *tr)
625 : {
626 17 : size_t cnt = 0;
627 :
628 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
629 55 : if (SEG_IS_DELETED(s, tr))
630 17 : cnt += s->end - s->start;
631 : }
632 17 : return cnt;
633 : }
634 :
635 : #define CNT_ACTIVE 10
636 :
637 : static size_t
638 16862993 : count_col(sql_trans *tr, sql_column *c, int access)
639 : {
640 16862993 : storage *d;
641 16862993 : sql_delta *ds;
642 :
643 16862993 : if (!isTable(c->t))
644 : return 0;
645 16862993 : d = tab_timestamp_storage(tr, c->t);
646 16861325 : ds = col_timestamp_delta(tr, c);
647 16861246 : if (!d ||!ds)
648 : return 0;
649 16861246 : if (access == 2)
650 482800 : return ds?ds->cs.ucnt:0;
651 16378446 : if (access == 1)
652 17 : return count_inserts(d->segs->h, tr);
653 16378429 : if (access == QUICK)
654 0 : return d->segs->t?d->segs->t->end:0;
655 16378429 : if (access == CNT_ACTIVE) {
656 855753 : size_t cnt = segs_end(d->segs, tr, c->t);
657 855783 : lock_table(tr->store, c->t->base.id);
658 855763 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
659 855761 : unlock_table(tr->store, c->t->base.id);
660 855761 : return cnt;
661 : }
662 15522676 : return segs_end(d->segs, tr, c->t);
663 : }
664 :
665 : static size_t
666 20902 : count_idx(sql_trans *tr, sql_idx *i, int access)
667 : {
668 20902 : storage *d;
669 20902 : sql_delta *ds;
670 :
671 20902 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
672 6719 : return 0;
673 14183 : d = tab_timestamp_storage(tr, i->t);
674 14182 : ds = idx_timestamp_delta(tr, i);
675 14181 : if (!d || !ds)
676 : return 0;
677 14181 : if (access == 2)
678 2871 : return ds?ds->cs.ucnt:0;
679 11310 : if (access == 1)
680 0 : return count_inserts(d->segs->h, tr);
681 11310 : if (access == QUICK)
682 3550 : return d->segs->t?d->segs->t->end:0;
683 7760 : return segs_end(d->segs, tr, i->t);
684 : }
685 :
686 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
687 : static BAT *
688 13125712 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
689 : {
690 13125712 : BAT *b;
691 :
692 13125712 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
693 : /* returns the updates for cs */
694 13125712 : if (cs->uibid && cs->uvbid && cs->ucnt) {
695 12136 : if (access == RD_UPD_ID) {
696 7311 : if (!(b = temp_descriptor(cs->uibid)))
697 : return NULL;
698 7311 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
699 1689 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
700 5622 : oid nil = oid_nil;
701 : /* less then cnt */
702 5622 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
703 5622 : if (!s) {
704 0 : bat_destroy(b);
705 0 : return NULL;
706 : }
707 :
708 5622 : BAT *nb = BATproject(s, b);
709 5622 : bat_destroy(s);
710 5622 : bat_destroy(b);
711 5622 : b = nb;
712 : }
713 : } else {
714 4825 : b = temp_descriptor(cs->uvbid);
715 : }
716 : } else {
717 21856045 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
718 : }
719 : return b;
720 : }
721 :
722 : static BAT *
723 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
724 : {
725 0 : int err = 0;
726 0 : BAT *uv = *UV;
727 0 : BUN cnt = BATcount(ui)+BATcount(oi);
728 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
729 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
730 :
731 0 : if (!ni || (uv && !nv)) {
732 0 : bat_destroy(ni);
733 0 : bat_destroy(nv);
734 0 : bat_destroy(ui);
735 0 : bat_destroy(uv);
736 0 : bat_destroy(oi);
737 0 : bat_destroy(ov);
738 0 : return NULL;
739 : }
740 0 : BATiter uvi;
741 0 : BATiter ovi;
742 :
743 0 : if (uv) {
744 0 : uvi = bat_iterator(uv);
745 0 : ovi = bat_iterator(ov);
746 : }
747 :
748 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
749 0 : BUN uip = 0, uie = BATcount(ui);
750 0 : BUN oip = 0, oie = BATcount(oi);
751 :
752 0 : oid uiseqb = ui->tseqbase;
753 0 : oid oiseqb = oi->tseqbase;
754 0 : oid *uipt = NULL, *oipt = NULL;
755 0 : BATiter uii = bat_iterator(ui);
756 0 : BATiter oii = bat_iterator(oi);
757 0 : if (!BATtdensebi(&uii))
758 0 : uipt = uii.base;
759 0 : if (!BATtdensebi(&oii))
760 0 : oipt = oii.base;
761 0 : while (uip < uie && oip < oie && !err) {
762 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
763 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
764 :
765 0 : if (uiid <= oiid) {
766 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
767 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
768 : err = 1;
769 0 : uip++;
770 0 : if (uiid == oiid)
771 0 : oip++;
772 : } else { /* uiid > oiid */
773 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
774 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
775 : err = 1;
776 0 : oip++;
777 : }
778 : }
779 0 : while (uip < uie && !err) {
780 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
781 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
782 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
783 : err = 1;
784 0 : uip++;
785 : }
786 0 : while (oip < oie && !err) {
787 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
788 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
789 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
790 : err = 1;
791 0 : oip++;
792 : }
793 0 : if (uv) {
794 0 : bat_iterator_end(&uvi);
795 0 : bat_iterator_end(&ovi);
796 : }
797 0 : bat_iterator_end(&uii);
798 0 : bat_iterator_end(&oii);
799 0 : bat_destroy(ui);
800 0 : bat_destroy(uv);
801 0 : bat_destroy(oi);
802 0 : bat_destroy(ov);
803 0 : if (!err) {
804 0 : if (nv)
805 0 : *UV = nv;
806 0 : return ni;
807 : }
808 0 : *UV = NULL;
809 0 : bat_destroy(ni);
810 0 : bat_destroy(nv);
811 0 : return NULL;
812 : }
813 :
814 : static sql_delta *
815 8749946 : older_delta( sql_delta *d, sql_trans *tr)
816 : {
817 8749946 : sql_delta *o = d->next;
818 :
819 8757419 : while (o && !o->cs.merged) {
820 7471 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
821 : break;
822 : else
823 7473 : o = o->next;
824 : }
825 8749948 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
826 0 : return o;
827 : return NULL;
828 : }
829 :
830 : static BAT *
831 8750038 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
832 : {
833 8750038 : assert(tr->active);
834 8750038 : sql_delta *o = NULL;
835 8750038 : BAT *ui = NULL, *uv = NULL;
836 :
837 8750038 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
838 : return NULL;
839 8749903 : if (access == RD_UPD_VAL) {
840 4376319 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
841 0 : bat_destroy(ui);
842 0 : return NULL;
843 : }
844 : }
845 8749940 : while ((o = older_delta(d, tr)) != NULL) {
846 0 : BAT *oui = NULL, *ouv = NULL;
847 0 : if (!oui)
848 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
849 0 : if (access == RD_UPD_VAL)
850 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
851 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
852 0 : bat_destroy(ui);
853 0 : bat_destroy(uv);
854 0 : bat_destroy(oui);
855 0 : bat_destroy(ouv);
856 0 : return NULL;
857 : }
858 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
859 : return NULL;
860 : d = o;
861 : }
862 8749888 : if (uv) {
863 4376348 : bat_destroy(ui);
864 4376348 : return uv;
865 : }
866 : return ui;
867 : }
868 :
869 : static BAT *
870 2342 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
871 : {
872 2342 : lock_column(tr->store, c->base.id);
873 2342 : sql_delta *d = col_timestamp_delta(tr, c);
874 2342 : int type = c->type.type->localtype;
875 :
876 2342 : if (!d) {
877 0 : unlock_column(tr->store, c->base.id);
878 0 : return NULL;
879 : }
880 2342 : if (d->cs.st == ST_DICT) {
881 0 : BAT *b = quick_descriptor(d->cs.bid);
882 :
883 0 : type = b->ttype;
884 : }
885 2342 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
886 2342 : unlock_column(tr->store, c->base.id);
887 2342 : return bn;
888 : }
889 :
890 : static BAT *
891 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
892 : {
893 0 : lock_column(tr->store, i->base.id);
894 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
895 0 : sql_delta *d = idx_timestamp_delta(tr, i);
896 :
897 0 : if (!d) {
898 0 : unlock_column(tr->store, i->base.id);
899 0 : return NULL;
900 : }
901 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
902 0 : unlock_column(tr->store, i->base.id);
903 0 : return bn;
904 : }
905 :
906 : static BAT *
907 9005556 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
908 : {
909 9005556 : BAT *b;
910 :
911 9005556 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
912 9005556 : assert(cs != NULL);
913 9005556 : if (access == QUICK)
914 175953 : return quick_descriptor(cs->bid);
915 8829603 : if (access == RD_EXT)
916 857 : return temp_descriptor(cs->ebid);
917 8828746 : assert(cs->bid);
918 8828746 : b = temp_descriptor(cs->bid);
919 8828659 : if (b == NULL)
920 : return NULL;
921 8828659 : assert(b->batRestricted == BAT_READ);
922 : /* return slice */
923 8828659 : BAT *s = BATslice(b, 0, cnt);
924 8828911 : bat_destroy(b);
925 8828911 : return s;
926 : }
927 :
928 : static int
929 4373672 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
930 : {
931 4373672 : lock_column(tr->store, c->base.id);
932 4373557 : size_t cnt = count_col(tr, c, 0);
933 4373930 : sql_delta *d = col_timestamp_delta(tr, c);
934 4373900 : int type = c->type.type->localtype;
935 :
936 4373900 : if (!d) {
937 0 : unlock_column(tr->store, c->base.id);
938 0 : return LOG_ERR;
939 : }
940 4373900 : if (d->cs.st == ST_DICT) {
941 2 : BAT *b = quick_descriptor(d->cs.bid);
942 :
943 2 : type = b->ttype;
944 : }
945 :
946 4373900 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
947 4373996 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
948 :
949 4373995 : unlock_column(tr->store, c->base.id);
950 :
951 4373954 : if (*ui == NULL || *uv == NULL) {
952 0 : bat_destroy(*ui);
953 0 : bat_destroy(*uv);
954 0 : return LOG_ERR;
955 : }
956 : return LOG_OK;
957 : }
958 :
959 : static int
960 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
961 : {
962 23 : lock_column(tr->store, i->base.id);
963 23 : size_t cnt = count_idx(tr, i, 0);
964 23 : sql_delta *d = idx_timestamp_delta(tr, i);
965 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
966 :
967 23 : if (!d) {
968 0 : unlock_column(tr->store, i->base.id);
969 0 : return LOG_ERR;
970 : }
971 :
972 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
973 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
974 :
975 23 : unlock_column(tr->store, i->base.id);
976 :
977 23 : if (*ui == NULL || *uv == NULL) {
978 0 : bat_destroy(*ui);
979 0 : bat_destroy(*uv);
980 0 : return LOG_ERR;
981 : }
982 : return LOG_OK;
983 : }
984 :
985 : static void * /* BAT * */
986 8998669 : bind_col(sql_trans *tr, sql_column *c, int access)
987 : {
988 8998669 : assert(access == QUICK || tr->active);
989 8998669 : if (!isTable(c->t))
990 : return NULL;
991 8998669 : sql_delta *d = col_timestamp_delta(tr, c);
992 8998529 : if (!d)
993 : return NULL;
994 8998529 : size_t cnt = count_col(tr, c, 0);
995 8998342 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
996 2342 : return bind_ucol(tr, c, access, cnt);
997 8996000 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
998 8995298 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
999 : return b;
1000 : }
1001 :
1002 : static void * /* BAT * */
1003 9407 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1004 : {
1005 9407 : assert(access == QUICK || tr->active);
1006 9407 : if (!isTable(i->t))
1007 : return NULL;
1008 9407 : sql_delta *d = idx_timestamp_delta(tr, i);
1009 9409 : if (!d)
1010 : return NULL;
1011 9409 : size_t cnt = count_idx(tr, i, 0);
1012 9409 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1013 0 : return bind_uidx(tr, i, access, cnt);
1014 9409 : return cs_bind_bat( &d->cs, access, cnt);
1015 : }
1016 :
1017 : static int
1018 2296 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1019 : {
1020 2296 : if (!cs->uibid) {
1021 0 : cs->uibid = e_bat(TYPE_oid);
1022 0 : if (cs->uibid == BID_NIL)
1023 : return LOG_ERR;
1024 : }
1025 2296 : if (!cs->uvbid) {
1026 0 : BAT *cur = quick_descriptor(cs->bid);
1027 0 : if (!cur)
1028 : return LOG_ERR;
1029 0 : int type = cur->ttype;
1030 0 : cs->uvbid = e_bat(type);
1031 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1032 : return LOG_ERR;
1033 : }
1034 2296 : BAT *ui = temp_descriptor(cs->uibid);
1035 2296 : BAT *uv = temp_descriptor(cs->uvbid);
1036 :
1037 2296 : if (ui == NULL || uv == NULL) {
1038 0 : bat_destroy(ui);
1039 0 : bat_destroy(uv);
1040 0 : return LOG_ERR;
1041 : }
1042 2296 : assert(ui && uv);
1043 2296 : if (isEbat(ui)){
1044 392 : temp_destroy(cs->uibid);
1045 392 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1046 392 : bat_destroy(ui);
1047 392 : if (cs->uibid == BID_NIL ||
1048 392 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1049 0 : bat_destroy(uv);
1050 0 : return LOG_ERR;
1051 : }
1052 : }
1053 2296 : if (isEbat(uv)){
1054 392 : temp_destroy(cs->uvbid);
1055 392 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1056 392 : bat_destroy(uv);
1057 392 : if (cs->uvbid == BID_NIL ||
1058 392 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1059 0 : bat_destroy(ui);
1060 0 : return LOG_ERR;
1061 : }
1062 : }
1063 2296 : *Ui = ui;
1064 2296 : *Uv = uv;
1065 2296 : return LOG_OK;
1066 : }
1067 :
1068 : static int
1069 5116 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1070 : {
1071 44691 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1072 44691 : if (s->start <= rid && s->end > rid) {
1073 5116 : if (s->ts == tr->tid && !s->deleted) {
1074 2874 : return 1;
1075 : }
1076 : break;
1077 : }
1078 : }
1079 : return 0;
1080 : }
1081 :
1082 : static int
1083 2242 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1084 : {
1085 38593 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1086 38593 : if (s->start <= rid && s->end > rid) {
1087 2242 : if (s->ts >= tr->ts && s->deleted) {
1088 0 : return 1;
1089 : }
1090 : break;
1091 : }
1092 : }
1093 : return 0;
1094 : }
1095 :
1096 : static sql_delta *
1097 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1098 : {
1099 0 : sql_delta *n = ZNEW(sql_delta);
1100 0 : if (!n)
1101 : return NULL;
1102 0 : *n = *bat;
1103 0 : n->next = NULL;
1104 0 : n->cs.ts = tr->tid;
1105 0 : return n;
1106 : }
1107 :
1108 : static BAT *
1109 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1110 : {
1111 17 : BAT *newoffsets = NULL;
1112 17 : sql_delta *bat = *batp;
1113 17 : column_storage *cs = &bat->cs;
1114 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1115 :
1116 17 : if (!u)
1117 : return NULL;
1118 17 : BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
1119 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1120 0 : bat_destroy(u);
1121 0 : return NULL;
1122 : } else {
1123 17 : int new = 0;
1124 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1125 17 : if (BATcount(u) >= max_cnt) {
1126 1 : if (max_cnt == INT_MAX) { /* decompress */
1127 0 : if (!(b = temp_descriptor(cs->bid))) {
1128 0 : bat_destroy(u);
1129 0 : return NULL;
1130 : }
1131 0 : if (cs->ucnt) {
1132 0 : BAT *ui = NULL, *uv = NULL;
1133 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1134 0 : bat_destroy(b);
1135 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1136 0 : bat_destroy(nb);
1137 0 : bat_destroy(u);
1138 0 : return NULL;
1139 : }
1140 0 : b = nb;
1141 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1142 0 : bat_destroy(ui);
1143 0 : bat_destroy(uv);
1144 0 : bat_destroy(b);
1145 0 : bat_destroy(u);
1146 : }
1147 0 : bat_destroy(ui);
1148 0 : bat_destroy(uv);
1149 : }
1150 0 : n = DICTdecompress_(b, u, PERSISTENT);
1151 0 : bat_destroy(b);
1152 0 : assert(newoffsets == NULL);
1153 0 : if (!n) {
1154 0 : bat_destroy(u);
1155 0 : return NULL;
1156 : }
1157 0 : if (cs->ts != tr->tid) {
1158 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1159 0 : bat_destroy(n);
1160 0 : return NULL;
1161 : }
1162 0 : cs = &(*batp)->cs;
1163 0 : new = 1;
1164 : }
1165 0 : if (cs->bid && !new)
1166 0 : temp_destroy(cs->bid);
1167 0 : n = transfer_to_systrans(n);
1168 0 : if (n == NULL)
1169 : return NULL;
1170 0 : bat_set_access(n, BAT_READ);
1171 0 : cs->bid = temp_create(n);
1172 0 : bat_destroy(n);
1173 0 : if (cs->ebid && !new)
1174 0 : temp_destroy(cs->ebid);
1175 0 : cs->ebid = 0;
1176 0 : cs->ucnt = 0;
1177 0 : if (cs->uibid && !new)
1178 0 : temp_destroy(cs->uibid);
1179 0 : if (cs->uvbid && !new)
1180 0 : temp_destroy(cs->uvbid);
1181 0 : cs->uibid = cs->uvbid = 0;
1182 0 : cs->st = ST_DEFAULT;
1183 0 : cs->cleared = true;
1184 : } else {
1185 1 : if (!(b = temp_descriptor(cs->bid))) {
1186 0 : bat_destroy(newoffsets);
1187 0 : bat_destroy(u);
1188 0 : return NULL;
1189 : }
1190 2 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1191 1 : bat_destroy(b);
1192 1 : if (!n) {
1193 0 : bat_destroy(newoffsets);
1194 0 : bat_destroy(u);
1195 0 : return NULL;
1196 : }
1197 1 : if (cs->ts != tr->tid) {
1198 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1199 0 : bat_destroy(n);
1200 0 : return NULL;
1201 : }
1202 0 : cs = &(*batp)->cs;
1203 0 : new = 1;
1204 0 : temp_dup(cs->ebid);
1205 0 : if (cs->uibid) {
1206 0 : temp_dup(cs->uibid);
1207 0 : temp_dup(cs->uvbid);
1208 : }
1209 : }
1210 1 : if (cs->bid && !new)
1211 1 : temp_destroy(cs->bid);
1212 1 : n = transfer_to_systrans(n);
1213 1 : if (n == NULL)
1214 : return NULL;
1215 1 : bat_set_access(n, BAT_READ);
1216 1 : cs->bid = temp_create(n);
1217 1 : bat_destroy(n);
1218 1 : cs->cleared = true;
1219 1 : i = newoffsets;
1220 : }
1221 : } else { /* append */
1222 16 : i = newoffsets;
1223 : }
1224 : }
1225 17 : bat_destroy(u);
1226 17 : return i;
1227 : }
1228 :
1229 : static BAT *
1230 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1231 : {
1232 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1233 0 : BAT *newoffsets = NULL;
1234 0 : BAT *b = NULL, *n = NULL;
1235 :
1236 0 : if (!(b = temp_descriptor(cs->bid)))
1237 : return NULL;
1238 :
1239 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1240 0 : bat_destroy(b);
1241 0 : return NULL;
1242 : } else {
1243 : /* returns new offset bat if values within min/max, else decompress */
1244 0 : if (!newoffsets) { /* decompress */
1245 0 : if (cs->ucnt) {
1246 0 : BAT *ui = NULL, *uv = NULL;
1247 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1248 0 : bat_destroy(b);
1249 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1250 0 : bat_destroy(nb);
1251 0 : return NULL;
1252 : }
1253 0 : b = nb;
1254 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1255 0 : bat_destroy(ui);
1256 0 : bat_destroy(uv);
1257 0 : bat_destroy(b);
1258 : }
1259 0 : bat_destroy(ui);
1260 0 : bat_destroy(uv);
1261 : }
1262 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1263 0 : bat_destroy(b);
1264 0 : if (!n)
1265 : return NULL;
1266 0 : if (cs->bid)
1267 0 : temp_destroy(cs->bid);
1268 0 : n = transfer_to_systrans(n);
1269 0 : if (n == NULL)
1270 : return NULL;
1271 0 : bat_set_access(n, BAT_READ);
1272 0 : cs->bid = temp_create(n);
1273 0 : cs->ucnt = 0;
1274 0 : if (cs->uibid)
1275 0 : temp_destroy(cs->uibid);
1276 0 : if (cs->uvbid)
1277 0 : temp_destroy(cs->uvbid);
1278 0 : cs->uibid = cs->uvbid = 0;
1279 0 : cs->st = ST_DEFAULT;
1280 0 : cs->cleared = true;
1281 0 : b = n;
1282 : } else { /* append */
1283 : i = newoffsets;
1284 : }
1285 : }
1286 0 : bat_destroy(b);
1287 0 : return i;
1288 : }
1289 :
1290 : /*
1291 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1292 : */
1293 : static int
1294 2974 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1295 : {
1296 2974 : int res = LOG_OK;
1297 2974 : sql_delta *bat = *batp;
1298 2974 : column_storage *cs = &bat->cs;
1299 2974 : BAT *otids = tids, *oupdates = updates;
1300 :
1301 2974 : if (!BATcount(tids))
1302 : return LOG_OK;
1303 :
1304 2974 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1305 6 : tids = BATunmask(tids);
1306 6 : if (!tids)
1307 : return LOG_ERR;
1308 : }
1309 2974 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1310 0 : updates = BATunmask(updates);
1311 0 : if (!updates) {
1312 0 : if (otids != tids)
1313 0 : bat_destroy(tids);
1314 0 : return LOG_ERR;
1315 : }
1316 2974 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1317 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1318 42 : if (!updates) {
1319 0 : if (otids != tids)
1320 0 : bat_destroy(tids);
1321 0 : return LOG_ERR;
1322 : }
1323 : }
1324 :
1325 2974 : if (cs->st == ST_DICT) {
1326 : /* possibly a new array is returned */
1327 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1328 4 : bat = *batp;
1329 4 : cs = &bat->cs;
1330 4 : if (oupdates != updates)
1331 0 : bat_destroy(updates);
1332 4 : updates = nupdates;
1333 4 : if (!updates) {
1334 0 : if (otids != tids)
1335 0 : bat_destroy(tids);
1336 0 : return LOG_ERR;
1337 : }
1338 : }
1339 :
1340 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1341 : /* currently only one update delta is possible */
1342 2974 : lock_table(tr->store, t->base.id);
1343 2974 : storage *s = ATOMIC_PTR_GET(&t->data);
1344 2974 : if (!is_new && !cs->cleared) {
1345 2651 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1346 6 : BAT *sorted, *order;
1347 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1348 0 : if (otids != tids)
1349 0 : bat_destroy(tids);
1350 0 : if (oupdates != updates)
1351 0 : bat_destroy(updates);
1352 0 : unlock_table(tr->store, t->base.id);
1353 0 : return LOG_ERR;
1354 : }
1355 6 : if (otids != tids)
1356 0 : bat_destroy(tids);
1357 6 : tids = sorted;
1358 6 : BAT *nupdates = BATproject(order, updates);
1359 6 : bat_destroy(order);
1360 6 : if (oupdates != updates)
1361 0 : bat_destroy(updates);
1362 6 : updates = nupdates;
1363 6 : if (!updates) {
1364 0 : bat_destroy(tids);
1365 0 : unlock_table(tr->store, t->base.id);
1366 0 : return LOG_ERR;
1367 : }
1368 : }
1369 2651 : assert(tids->tsorted);
1370 2651 : BAT *ui = NULL, *uv = NULL;
1371 :
1372 : /* handle updates on just inserted bits */
1373 : /* handle updates on updates (within one transaction) */
1374 2651 : BATiter upi = bat_iterator(updates);
1375 2651 : BUN cnt = 0, ucnt = BATcount(tids);
1376 2651 : BAT *b, *ins = NULL;
1377 2651 : int *msk = NULL;
1378 :
1379 2651 : if((b = temp_descriptor(cs->bid)) == NULL)
1380 : res = LOG_ERR;
1381 :
1382 2651 : if (res == LOG_OK && BATtdense(tids)) {
1383 2514 : oid start = tids->tseqbase, offset = start;
1384 2514 : oid end = start + ucnt;
1385 :
1386 9164 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1387 7262 : if (seg->start <= start && seg->end > start) {
1388 : /* check for delete conflicts */
1389 2514 : if (seg->ts >= tr->ts && seg->deleted) {
1390 0 : res = LOG_CONFLICT;
1391 0 : continue;
1392 : }
1393 :
1394 : /* check for inplace updates */
1395 2514 : BUN lend = end < seg->end?end:seg->end;
1396 2514 : if (seg->ts == tr->tid && !seg->deleted) {
1397 113 : if (!ins) {
1398 113 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1399 113 : if (!ins)
1400 : res = LOG_ERR;
1401 : else {
1402 113 : BATsetcount(ins, ucnt); /* all full updates */
1403 113 : msk = (int*)Tloc(ins, 0);
1404 113 : BUN end = (ucnt+31)/32;
1405 113 : memset(msk, 0, end * sizeof(int));
1406 : }
1407 : }
1408 474 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1409 361 : const void *upd = BUNtail(upi, rid-offset);
1410 361 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1411 0 : res = LOG_ERR;
1412 :
1413 361 : oid word = i/32;
1414 361 : int pos = i%32;
1415 361 : msk[word] |= 1U<<pos;
1416 361 : cnt++;
1417 : }
1418 : }
1419 : }
1420 7262 : if (end < seg->end)
1421 : break;
1422 : }
1423 140 : } else if (res == LOG_OK && complex_cand(tids)) {
1424 3 : struct canditer ci;
1425 3 : segment *seg = s->segs->h;
1426 3 : canditer_init(&ci, NULL, tids);
1427 3 : BUN i = 0;
1428 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1429 1033 : oid rid = canditer_next(&ci);
1430 1033 : if (seg->end <= rid)
1431 13 : seg = ATOMIC_PTR_GET(&seg->next);
1432 1020 : else if (seg->start <= rid && seg->end > rid) {
1433 : /* check for delete conflicts */
1434 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1435 0 : res = LOG_CONFLICT;
1436 0 : continue;
1437 : }
1438 :
1439 : /* check for inplace updates */
1440 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1441 0 : if (!ins) {
1442 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1443 0 : if (!ins) {
1444 : res = LOG_ERR;
1445 : break;
1446 : } else {
1447 0 : BATsetcount(ins, ucnt); /* all full updates */
1448 0 : msk = (int*)Tloc(ins, 0);
1449 0 : BUN end = (ucnt+31)/32;
1450 0 : memset(msk, 0, end * sizeof(int));
1451 : }
1452 : }
1453 0 : ptr upd = BUNtail(upi, i);
1454 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1455 0 : res = LOG_ERR;
1456 :
1457 0 : oid word = i/32;
1458 0 : int pos = i%32;
1459 0 : msk[word] |= 1U<<pos;
1460 0 : cnt++;
1461 : }
1462 1020 : i++;
1463 : }
1464 : }
1465 134 : } else if (res == LOG_OK) {
1466 134 : BUN i = 0;
1467 134 : oid *rid = Tloc(tids,0);
1468 134 : segment *seg = s->segs->h;
1469 14420 : while ( seg && res == LOG_OK && i < ucnt) {
1470 14286 : if (seg->end <= rid[i])
1471 3820 : seg = ATOMIC_PTR_GET(&seg->next);
1472 10466 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1473 : /* check for delete conflicts */
1474 10466 : if (seg->ts >= tr->ts && seg->deleted) {
1475 0 : res = LOG_CONFLICT;
1476 0 : continue;
1477 : }
1478 :
1479 : /* check for inplace updates */
1480 10466 : if (seg->ts == tr->tid && !seg->deleted) {
1481 301 : if (!ins) {
1482 33 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1483 33 : if (!ins) {
1484 : res = LOG_ERR;
1485 : break;
1486 : } else {
1487 33 : BATsetcount(ins, ucnt); /* all full updates */
1488 33 : msk = (int*)Tloc(ins, 0);
1489 33 : BUN end = (ucnt+31)/32;
1490 33 : memset(msk, 0, end * sizeof(int));
1491 : }
1492 : }
1493 301 : const void *upd = BUNtail(upi, i);
1494 301 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1495 0 : res = LOG_ERR;
1496 :
1497 301 : oid word = i/32;
1498 301 : int pos = i%32;
1499 301 : msk[word] |= 1U<<pos;
1500 301 : cnt++;
1501 : }
1502 10466 : i++;
1503 : }
1504 : }
1505 : }
1506 :
1507 2651 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1508 2505 : if (cs->ucnt == 0) {
1509 2451 : if (cnt) {
1510 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1511 0 : if (nins) {
1512 0 : ui = BATproject(nins, tids);
1513 0 : uv = BATproject(nins, updates);
1514 0 : bat_destroy(nins);
1515 : }
1516 : } else {
1517 2451 : ui = temp_descriptor(tids->batCacheid);
1518 2451 : uv = temp_descriptor(updates->batCacheid);
1519 : }
1520 2451 : if (!ui || !uv) {
1521 : res = LOG_ERR;
1522 : } else {
1523 2451 : temp_destroy(cs->uibid);
1524 2451 : temp_destroy(cs->uvbid);
1525 2451 : ui = transfer_to_systrans(ui);
1526 2451 : uv = transfer_to_systrans(uv);
1527 2451 : if (ui == NULL || uv == NULL) {
1528 0 : BBPreclaim(ui);
1529 0 : BBPreclaim(uv);
1530 : res = LOG_ERR;
1531 : } else {
1532 2451 : cs->uibid = temp_create(ui);
1533 2451 : cs->uvbid = temp_create(uv);
1534 2451 : cs->ucnt = BATcount(ui);
1535 : }
1536 : }
1537 : } else {
1538 54 : BAT *nui = NULL, *nuv = NULL;
1539 :
1540 : /* merge taking msk of inserted into account */
1541 54 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1542 : res = LOG_ERR;
1543 :
1544 54 : if (res == LOG_OK) {
1545 54 : const void *upd = NULL;
1546 54 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1547 54 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1548 :
1549 54 : if (!nui || !nuv) {
1550 : res = LOG_ERR;
1551 : } else {
1552 54 : BATiter ovi = bat_iterator(uv);
1553 :
1554 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1555 54 : BUN uip = 0, uie = BATcount(ui);
1556 54 : BUN nip = 0, nie = BATcount(tids);
1557 54 : oid uiseqb = ui->tseqbase;
1558 54 : oid niseqb = tids->tseqbase;
1559 54 : oid *uipt = NULL, *nipt = NULL;
1560 54 : BATiter uii = bat_iterator(ui);
1561 54 : BATiter tidsi = bat_iterator(tids);
1562 54 : if (!BATtdensebi(&uii))
1563 53 : uipt = uii.base;
1564 54 : if (!BATtdensebi(&tidsi))
1565 53 : nipt = tidsi.base;
1566 14211 : while (uip < uie && nip < nie && res == LOG_OK) {
1567 14157 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1568 14157 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1569 :
1570 14157 : if (uiv < niv) {
1571 7916 : upd = BUNtail(ovi, uip);
1572 15832 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1573 7916 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1574 : res = LOG_ERR;
1575 7916 : uip++;
1576 6241 : } else if (uiv == niv) {
1577 : /* handle == */
1578 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1579 18 : upd = BUNtail(upi, nip);
1580 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1581 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1582 : res = LOG_ERR;
1583 : } else {
1584 0 : upd = BUNtail(ovi, uip);
1585 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1586 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1587 : res = LOG_ERR;
1588 : }
1589 18 : uip++;
1590 18 : nip++;
1591 : } else { /* uiv > niv */
1592 6223 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1593 6223 : upd = BUNtail(upi, nip);
1594 12446 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1595 6223 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1596 : res = LOG_ERR;
1597 : }
1598 6223 : nip++;
1599 : }
1600 : }
1601 275 : while (uip < uie && res == LOG_OK) {
1602 221 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1603 221 : upd = BUNtail(ovi, uip);
1604 442 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1605 221 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1606 : res = LOG_ERR;
1607 221 : uip++;
1608 : }
1609 383 : while (nip < nie && res == LOG_OK) {
1610 329 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1611 329 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1612 329 : upd = BUNtail(upi, nip);
1613 658 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1614 329 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1615 : res = LOG_ERR;
1616 : }
1617 329 : nip++;
1618 : }
1619 54 : bat_iterator_end(&uii);
1620 54 : bat_iterator_end(&tidsi);
1621 54 : bat_iterator_end(&ovi);
1622 54 : if (res == LOG_OK) {
1623 54 : temp_destroy(cs->uibid);
1624 54 : temp_destroy(cs->uvbid);
1625 54 : nui = transfer_to_systrans(nui);
1626 54 : nuv = transfer_to_systrans(nuv);
1627 54 : if (nui == NULL || nuv == NULL) {
1628 : res = LOG_ERR;
1629 : } else {
1630 54 : cs->uibid = temp_create(nui);
1631 54 : cs->uvbid = temp_create(nuv);
1632 54 : cs->ucnt = BATcount(nui);
1633 : }
1634 : }
1635 : }
1636 54 : bat_destroy(nui);
1637 54 : bat_destroy(nuv);
1638 : }
1639 : }
1640 : }
1641 2651 : bat_iterator_end(&upi);
1642 2651 : bat_destroy(b);
1643 2651 : unlock_table(tr->store, t->base.id);
1644 2651 : bat_destroy(ins);
1645 2651 : bat_destroy(ui);
1646 2651 : bat_destroy(uv);
1647 2651 : if (otids != tids)
1648 10 : bat_destroy(tids);
1649 2651 : if (oupdates != updates)
1650 11 : bat_destroy(updates);
1651 2651 : return res;
1652 : } else if (is_new || cs->cleared) {
1653 323 : BAT *b = temp_descriptor(cs->bid);
1654 :
1655 323 : if (b == NULL) {
1656 : res = LOG_ERR;
1657 : } else {
1658 323 : if (BATcount(b)==0) {
1659 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1660 0 : res = LOG_ERR;
1661 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1662 0 : res = LOG_ERR;
1663 323 : BBPcold(b->batCacheid);
1664 323 : bat_destroy(b);
1665 : }
1666 : }
1667 323 : unlock_table(tr->store, t->base.id);
1668 323 : if (otids != tids)
1669 2 : bat_destroy(tids);
1670 323 : if (oupdates != updates)
1671 41 : bat_destroy(updates);
1672 : return res;
1673 : }
1674 :
1675 : static int
1676 2974 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1677 : {
1678 2974 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1679 : }
1680 :
1681 : static void *
1682 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1683 : {
1684 4 : void *newoffsets = NULL;
1685 4 : sql_delta *bat = *batp;
1686 4 : column_storage *cs = &bat->cs;
1687 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1688 :
1689 4 : if (!u)
1690 : return NULL;
1691 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1692 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1693 0 : bat_destroy(u);
1694 0 : return NULL;
1695 : } else {
1696 4 : int new = 0;
1697 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1698 4 : if (BATcount(u) >= max_cnt) {
1699 0 : if (max_cnt == INT_MAX) { /* decompress */
1700 : if (!(b = temp_descriptor(cs->bid))) {
1701 : bat_destroy(u);
1702 : return NULL;
1703 : }
1704 : n = DICTdecompress_(b, u, PERSISTENT);
1705 : /* TODO decompress updates if any */
1706 : bat_destroy(b);
1707 : assert(newoffsets == NULL);
1708 : if (!n) {
1709 : bat_destroy(u);
1710 : return NULL;
1711 : }
1712 : if (cs->ts != tr->tid) {
1713 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1714 : bat_destroy(n);
1715 : bat_destroy(u);
1716 : return NULL;
1717 : }
1718 : cs = &(*batp)->cs;
1719 : new = 1;
1720 : cs->uibid = cs->uvbid = 0;
1721 : }
1722 : if (cs->bid && !new)
1723 : temp_destroy(cs->bid);
1724 : n = transfer_to_systrans(n);
1725 : if (n == NULL) {
1726 : bat_destroy(u);
1727 : return NULL;
1728 : }
1729 : bat_set_access(n, BAT_READ);
1730 : cs->bid = temp_create(n);
1731 : bat_destroy(n);
1732 : if (cs->ebid && !new)
1733 : temp_destroy(cs->ebid);
1734 : cs->ebid = 0;
1735 : cs->st = ST_DEFAULT;
1736 : /* at append_col the column's storage type is cleared */
1737 : cs->cleared = true;
1738 : } else {
1739 0 : if (!(b = temp_descriptor(cs->bid))) {
1740 0 : GDKfree(newoffsets);
1741 0 : bat_destroy(u);
1742 0 : return NULL;
1743 : }
1744 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1745 0 : bat_destroy(b);
1746 0 : if (!n) {
1747 0 : GDKfree(newoffsets);
1748 0 : bat_destroy(u);
1749 0 : return NULL;
1750 : }
1751 0 : if (cs->ts != tr->tid) {
1752 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1753 0 : bat_destroy(u);
1754 0 : bat_destroy(n);
1755 0 : return NULL;
1756 : }
1757 0 : cs = &(*batp)->cs;
1758 0 : new = 1;
1759 0 : temp_dup(cs->ebid);
1760 0 : if (cs->uibid) {
1761 0 : temp_dup(cs->uibid);
1762 0 : temp_dup(cs->uvbid);
1763 : }
1764 : }
1765 0 : if (cs->bid)
1766 0 : temp_destroy(cs->bid);
1767 0 : n = transfer_to_systrans(n);
1768 0 : if (n == NULL) {
1769 0 : bat_destroy(u);
1770 0 : return NULL;
1771 : }
1772 0 : bat_set_access(n, BAT_READ);
1773 0 : cs->bid = temp_create(n);
1774 0 : bat_destroy(n);
1775 0 : cs->cleared = true;
1776 0 : i = newoffsets;
1777 : }
1778 : } else { /* append */
1779 4 : i = newoffsets;
1780 : }
1781 : }
1782 4 : bat_destroy(u);
1783 4 : return i;
1784 : }
1785 :
1786 : static void *
1787 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1788 : {
1789 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1790 1 : void *newoffsets = NULL;
1791 1 : BAT *b = NULL, *n = NULL;
1792 :
1793 1 : if (!(b = temp_descriptor(cs->bid)))
1794 : return NULL;
1795 :
1796 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1797 0 : bat_destroy(b);
1798 0 : return NULL;
1799 : } else {
1800 : /* returns new offset bat if values within min/max, else decompress */
1801 1 : if (!newoffsets) {
1802 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1803 1 : bat_destroy(b);
1804 1 : if (!n)
1805 : return NULL;
1806 : /* TODO decompress updates if any */
1807 1 : if (cs->bid)
1808 1 : temp_destroy(cs->bid);
1809 1 : n = transfer_to_systrans(n);
1810 1 : if (n == NULL)
1811 : return NULL;
1812 1 : bat_set_access(n, BAT_READ);
1813 1 : cs->bid = temp_create(n);
1814 1 : cs->st = ST_DEFAULT;
1815 : /* at append_col the column's storage type is cleared */
1816 1 : cs->cleared = true;
1817 1 : b = n;
1818 : } else { /* append */
1819 : i = newoffsets;
1820 : }
1821 : }
1822 1 : bat_destroy(b);
1823 1 : return i;
1824 : }
1825 :
1826 : static int
1827 5116 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1828 : {
1829 5116 : void *oupd = upd;
1830 5116 : sql_delta *bat = *batp;
1831 5116 : column_storage *cs = &bat->cs;
1832 5116 : storage *s = ATOMIC_PTR_GET(&t->data);
1833 5116 : assert(!is_oid_nil(rid));
1834 5116 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1835 :
1836 5116 : if (cs->st == ST_DICT) {
1837 : /* possibly a new array is returned */
1838 0 : upd = dict_append_val(tr, batp, upd, 1);
1839 0 : bat = *batp;
1840 0 : cs = &bat->cs;
1841 0 : if (!upd)
1842 : return LOG_ERR;
1843 : }
1844 :
1845 : /* check if rid is insert ? */
1846 5116 : if (!inplace) {
1847 : /* check conflict */
1848 2242 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1849 0 : if (oupd != upd)
1850 0 : GDKfree(upd);
1851 0 : return LOG_CONFLICT;
1852 : }
1853 2242 : BAT *ui, *uv;
1854 :
1855 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1856 : /* currently only one update delta is possible */
1857 2242 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1858 0 : if (oupd != upd)
1859 0 : GDKfree(upd);
1860 0 : return LOG_ERR;
1861 : }
1862 :
1863 2242 : assert(uv->ttype);
1864 2242 : assert(BATcount(ui) == BATcount(uv));
1865 4484 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1866 2242 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1867 0 : if (oupd != upd)
1868 0 : GDKfree(upd);
1869 0 : bat_destroy(ui);
1870 0 : bat_destroy(uv);
1871 0 : return LOG_ERR;
1872 : }
1873 2242 : assert(BATcount(ui) == BATcount(uv));
1874 2242 : bat_destroy(ui);
1875 2242 : bat_destroy(uv);
1876 2242 : cs->ucnt++;
1877 : } else {
1878 2874 : BAT *b = NULL;
1879 :
1880 2874 : if((b = temp_descriptor(cs->bid)) == NULL) {
1881 0 : if (oupd != upd)
1882 0 : GDKfree(upd);
1883 0 : return LOG_ERR;
1884 : }
1885 2874 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1886 0 : if (oupd != upd)
1887 0 : GDKfree(upd);
1888 0 : bat_destroy(b);
1889 0 : return LOG_ERR;
1890 : }
1891 2874 : bat_destroy(b);
1892 : }
1893 5116 : if (oupd != upd)
1894 0 : GDKfree(upd);
1895 : return LOG_OK;
1896 : }
1897 :
1898 : static int
1899 5116 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1900 : {
1901 5116 : int res = LOG_OK;
1902 5116 : lock_table(tr->store, t->base.id);
1903 5116 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1904 5116 : unlock_table(tr->store, t->base.id);
1905 5116 : return res;
1906 : }
1907 :
1908 : static int
1909 158867 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1910 : {
1911 158867 : (void)tr;
1912 158867 : if (!ocs)
1913 : return LOG_OK;
1914 158867 : cs->bid = ocs->bid;
1915 158867 : cs->ebid = ocs->ebid;
1916 158867 : cs->uibid = ocs->uibid;
1917 158867 : cs->uvbid = ocs->uvbid;
1918 158867 : cs->ucnt = ocs->ucnt;
1919 :
1920 158867 : if (temp) {
1921 25970 : cs->bid = temp_copy(cs->bid, true, false);
1922 25970 : if (cs->bid == BID_NIL)
1923 : return LOG_ERR;
1924 : } else {
1925 132897 : temp_dup(cs->bid);
1926 : }
1927 158875 : if (cs->ebid)
1928 6 : temp_dup(cs->ebid);
1929 158875 : cs->ucnt = 0;
1930 158875 : cs->uibid = e_bat(TYPE_oid);
1931 158862 : cs->uvbid = e_bat(type);
1932 158876 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1933 : return LOG_ERR;
1934 158876 : cs->st = ocs->st;
1935 158876 : return LOG_OK;
1936 : }
1937 :
1938 : static void
1939 312662 : destroy_delta(sql_delta *b, bool recursive)
1940 : {
1941 312662 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1942 : return;
1943 293356 : if (recursive && b->next)
1944 127641 : destroy_delta(b->next, true);
1945 293356 : if (b->cs.uibid)
1946 94673 : temp_destroy(b->cs.uibid);
1947 293356 : if (b->cs.uvbid)
1948 94673 : temp_destroy(b->cs.uvbid);
1949 293356 : if (b->cs.bid)
1950 293356 : temp_destroy(b->cs.bid);
1951 293356 : if (b->cs.ebid)
1952 61 : temp_destroy(b->cs.ebid);
1953 293356 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1954 293356 : _DELETE(b);
1955 : }
1956 :
1957 : static sql_delta *
1958 15725648 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1959 : {
1960 15725648 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1961 :
1962 15725648 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1963 15592759 : return obat;
1964 132889 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1965 : /* abort */
1966 12 : if (update_conflict)
1967 4 : *update_conflict = true;
1968 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1969 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1970 4 : return NULL;
1971 : }
1972 132877 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1973 : return NULL;
1974 132878 : sql_delta* bat = ZNEW(sql_delta);
1975 132877 : if (!bat)
1976 : return NULL;
1977 132877 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1978 132877 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1979 0 : destroy_delta(bat, false);
1980 0 : return NULL;
1981 : }
1982 132879 : bat->cs.ts = tr->tid;
1983 : /* only one writer else abort */
1984 132879 : bat->next = obat;
1985 132879 : if (obat)
1986 132879 : bat->nr_updates = obat->nr_updates;
1987 132879 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
1988 0 : bat->next = NULL;
1989 0 : destroy_delta(bat, false);
1990 0 : if (update_conflict)
1991 0 : *update_conflict = true;
1992 0 : return NULL;
1993 : }
1994 : return bat;
1995 : }
1996 :
1997 : static int
1998 8090 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1999 : {
2000 8090 : int ok = LOG_OK;
2001 :
2002 8090 : if (is_bat) {
2003 2974 : BAT *tids = incoming_tids;
2004 2974 : BAT *values = incoming_values;
2005 2974 : if (BATcount(tids) == 0)
2006 : return LOG_OK;
2007 2974 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2008 : } else {
2009 5116 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2010 : }
2011 : return ok;
2012 : }
2013 :
2014 : static int
2015 8113 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2016 : {
2017 8113 : int res = LOG_OK;
2018 8113 : bool update_conflict = false;
2019 8113 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2020 :
2021 8113 : if (isbat) {
2022 2994 : BAT *t = tids;
2023 2994 : if (!BATcount(t))
2024 : return LOG_OK;
2025 : }
2026 :
2027 7815 : if (c == NULL)
2028 : return LOG_ERR;
2029 :
2030 7815 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2031 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2032 :
2033 7811 : assert(delta && delta->cs.ts == tr->tid);
2034 7811 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2035 7811 : if (odelta != delta)
2036 3426 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2037 :
2038 7811 : odelta = delta;
2039 7811 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2040 : return res;
2041 7811 : assert(delta == odelta);
2042 7811 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2043 0 : res = sql_trans_alter_storage(tr, c, NULL);
2044 : return res;
2045 : }
2046 :
2047 : static sql_delta *
2048 2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2049 : {
2050 2457 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2051 :
2052 2457 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2053 2429 : return obat;
2054 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2055 : /* abort */
2056 0 : if (update_conflict)
2057 0 : *update_conflict = true;
2058 0 : return NULL;
2059 : }
2060 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2061 : return NULL;
2062 28 : sql_delta* bat = ZNEW(sql_delta);
2063 28 : if (!bat)
2064 : return NULL;
2065 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2066 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2067 0 : destroy_delta(bat, false);
2068 0 : return NULL;
2069 : }
2070 28 : bat->cs.ts = tr->tid;
2071 : /* only one writer else abort */
2072 28 : bat->next = obat;
2073 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2074 0 : bat->next = NULL;
2075 0 : destroy_delta(bat, false);
2076 0 : if (update_conflict)
2077 0 : *update_conflict = true;
2078 0 : return NULL;
2079 : }
2080 : return bat;
2081 : }
2082 :
2083 : static int
2084 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2085 : {
2086 782 : int res = LOG_OK;
2087 782 : bool update_conflict = false;
2088 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2089 :
2090 782 : if (isbat) {
2091 782 : BAT *t = tids;
2092 782 : if (!BATcount(t))
2093 : return LOG_OK;
2094 : }
2095 :
2096 279 : if (i == NULL)
2097 : return LOG_ERR;
2098 :
2099 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2100 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2101 :
2102 279 : assert(delta && delta->cs.ts == tr->tid);
2103 279 : if (odelta != delta)
2104 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2105 :
2106 279 : odelta = delta;
2107 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2108 279 : assert(delta == odelta);
2109 : return res;
2110 : }
2111 :
2112 : static int
2113 150599 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
2114 : {
2115 150599 : BAT *b, *oi = i;
2116 150599 : int err = 0;
2117 150599 : sql_delta *bat = *batp;
2118 :
2119 150599 : assert(!offsets || BATcount(offsets) == BATcount(i));
2120 150599 : if (!BATcount(i))
2121 : return LOG_OK;
2122 150599 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2123 : return LOG_ERR;
2124 :
2125 150599 : lock_column(tr->store, id);
2126 150592 : if (bat->cs.st == ST_DICT) {
2127 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2128 13 : bat = *batp;
2129 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2130 0 : bat_destroy(oi);
2131 13 : oi = ni;
2132 13 : if (!oi) {
2133 0 : unlock_column(tr->store, id);
2134 0 : return LOG_ERR;
2135 : }
2136 : }
2137 150592 : if (bat->cs.st == ST_FOR) {
2138 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2139 0 : bat = *batp;
2140 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2141 0 : bat_destroy(oi);
2142 0 : oi = ni;
2143 0 : if (!oi) {
2144 0 : unlock_column(tr->store, id);
2145 0 : return LOG_ERR;
2146 : }
2147 : }
2148 :
2149 150592 : b = temp_descriptor(bat->cs.bid);
2150 150602 : if (b == NULL) {
2151 0 : unlock_column(tr->store, id);
2152 0 : if (oi != i)
2153 0 : bat_destroy(oi);
2154 0 : return LOG_ERR;
2155 : }
2156 150602 : if (!offsets && offset == b->hseqbase+BATcount(b)) {
2157 150414 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2158 0 : err = 1;
2159 176 : } else if (!offsets) {
2160 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2161 0 : err = 1;
2162 12 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2163 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2164 0 : err = 1;
2165 12 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2166 0 : err = 1;
2167 : }
2168 150570 : bat_destroy(b);
2169 150586 : unlock_column(tr->store, id);
2170 :
2171 150587 : if (oi != i)
2172 13 : bat_destroy(oi);
2173 150587 : return (err)?LOG_ERR:LOG_OK;
2174 : }
2175 :
2176 : // Look at the offsets and find where the replacements end and the appends begin.
2177 : static BUN
2178 0 : start_of_appends(BAT *offsets, BUN bcnt)
2179 : {
2180 0 : BUN ocnt = BATcount(offsets);
2181 0 : if (ocnt == 0)
2182 : return 0;
2183 :
2184 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2185 0 : if (highest < bcnt)
2186 : // all are replacements
2187 : return ocnt;
2188 :
2189 : // reason backward to find the first append.
2190 : // Suppose offsets has 15 entries, bcnt == 100
2191 : // and the highest offset in offsets is 109.
2192 0 : BUN new_bcnt = highest + 1; // 110
2193 0 : BUN nappends = new_bcnt - bcnt; // 10
2194 0 : BUN nreplacements = ocnt - nappends; // 5
2195 :
2196 : // The first append should be to position bcnt
2197 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2198 :
2199 : return nreplacements;
2200 : }
2201 :
2202 :
/*
 * Write cnt values from the array i into the BAT behind (*batp)->cs.bid,
 * holding the column lock for `id` for the whole operation.
 *
 * For ST_DICT / ST_FOR storage the values are first passed through
 * dict_append_val() / for_append_val(), which may hand back a different
 * array; such an array (i != oi) is released with GDKfree() on every exit
 * path from the descriptor lookup onwards.
 *
 * With offsets given, the leading entries that start_of_appends() classifies
 * as replacements are written with BUNreplacemulti() and the remainder is
 * appended at the old end of the BAT.  Without offsets, positions starting
 * at `offset` are written: the part below the current count is replaced,
 * a gap between the current count and offset is filled by appending with a
 * NULL value array, and the rest is appended.
 *
 * Returns LOG_OK on success and LOG_ERR when any helper fails.
 */
static int
delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
{
	void *oi = i;	/* caller's array, used to detect a replacement array */
	BAT *b;
	lock_column(tr->store, id);
	sql_delta *bat = *batp;

	if (bat->cs.st == ST_DICT) {
		/* possibly a new array is returned */
		i = dict_append_val(tr, batp, i, cnt);
		bat = *batp;	/* dict_append_val receives batp and may have changed it */
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		/* possibly a new array is returned */
		i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		if (i != oi)
			GDKfree(i);
		unlock_column(tr->store, id);
		return LOG_ERR;
	}
	BUN bcnt = BATcount(b);

	if (offsets) {
		// The first few might be replacements while later items might be appends.
		// Handle the replacements here while leaving the appends to the code below.
		BUN nreplacements = start_of_appends(offsets, bcnt);

		oid *start = Tloc(offsets, 0);
		if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}

		// Replacements have been handled. The rest are appends.
		// NOTE(review): cnt is reduced but i still points at the first
		// value; confirm the append below is meant to start there.
		assert(offset == oid_nil);
		offset = bcnt;
		cnt -= nreplacements;
	}

	if (bcnt > offset){
		/* ccnt: how many of the cnt values land on existing rows */
		size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
		if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
		/* NOTE(review): as above, i is not advanced past the ccnt
		 * values just written - verify against callers */
		cnt -= ccnt;
		offset += ccnt;
	}
	if (cnt) {
		if (BATcount(b) < offset) { /* add space */
			BUN d = offset - BATcount(b);
			if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
				bat_destroy(b);
				if (i != oi)
					GDKfree(i);
				unlock_column(tr->store, id);
				return LOG_ERR;
			}
		}
		if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	bat_destroy(b);
	if (i != oi)
		GDKfree(i);
	unlock_column(tr->store, id);
	return LOG_OK;
}
2296 :
2297 : static int
2298 25970 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2299 : {
2300 25970 : if (!(bat->segs = new_segments(tr, 0)))
2301 : return LOG_ERR;
2302 25970 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2303 : }
2304 :
2305 : static int
2306 15590667 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2307 : {
2308 15590667 : int ok = LOG_OK;
2309 :
2310 15590667 : if ((*delta)->cs.merged)
2311 36766 : (*delta)->cs.merged = false; /* TODO needs to move */
2312 15590667 : if (isbat) {
2313 150597 : BAT *bat = incoming_data;
2314 :
2315 150597 : if (BATcount(bat))
2316 150603 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2317 : } else {
2318 15440070 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2319 : }
2320 15590528 : return ok;
2321 : }
2322 :
2323 : static int
2324 15589244 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2325 : {
2326 15589244 : int res = LOG_OK;
2327 15589244 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2328 :
2329 15589244 : if (isbat) {
2330 150417 : BAT *t = data;
2331 150417 : if (!BATcount(t))
2332 : return LOG_OK;
2333 : }
2334 :
2335 15588449 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2336 : return LOG_ERR;
2337 :
2338 15588404 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2339 :
2340 15588404 : odelta = delta;
2341 15588404 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
2342 : return res;
2343 15588321 : if (odelta != delta) {
2344 0 : delta->next = odelta;
2345 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2346 0 : delta->next = NULL;
2347 0 : destroy_delta(delta, false);
2348 0 : return LOG_CONFLICT;
2349 : }
2350 : }
2351 15588321 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2352 1 : res = sql_trans_alter_storage(tr, c, NULL);
2353 : return res;
2354 : }
2355 :
2356 : static int
2357 2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2358 : {
2359 2184 : int res = LOG_OK;
2360 2184 : sql_delta *delta;
2361 :
2362 2184 : if (isbat) {
2363 1010 : BAT *t = data;
2364 1010 : if (!BATcount(t))
2365 : return LOG_OK;
2366 : }
2367 :
2368 2172 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2369 : return LOG_ERR;
2370 :
2371 2172 : assert(delta->cs.st == ST_DEFAULT);
2372 :
2373 2172 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2374 2172 : return res;
2375 : }
2376 :
2377 : static int
2378 73058 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2379 : {
2380 73058 : int err = 0;
2381 :
2382 : /* TODO check for conflicting updates */
2383 73058 : (void)rid;
2384 73058 : (void)cnt;
2385 546537 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2386 473479 : sql_column *c = n->data;
2387 473479 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2388 :
2389 : /* check for active updates */
2390 473479 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2391 : return 1;
2392 : }
2393 : return 0;
2394 : }
2395 :
2396 : static int
2397 69475 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2398 : {
2399 69475 : int in_transaction = segments_in_transaction(tr, t);
2400 :
2401 69475 : lock_table(tr->store, t->base.id);
2402 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2403 69475 : segment *seg = s->segs->h, *p = NULL;
2404 51462018 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2405 51462018 : if (seg->start <= rid && seg->end > rid) {
2406 69475 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2407 4 : unlock_table(tr->store, t->base.id);
2408 4 : return LOG_CONFLICT;
2409 : }
2410 69471 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2411 0 : unlock_table(tr->store, t->base.id);
2412 0 : return LOG_CONFLICT;
2413 : }
2414 69471 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2415 0 : unlock_table(tr->store, t->base.id);
2416 0 : return LOG_ERR;
2417 : }
2418 : break;
2419 : }
2420 : }
2421 69471 : unlock_table(tr->store, t->base.id);
2422 69471 : if (!in_transaction)
2423 12831 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2424 : return LOG_OK;
2425 : }
2426 :
/*
 * Delete the row range [start, start+cnt) from storage s for transaction
 * tr, walking the segment list starting at *Seg.
 *
 * For every segment that contains the current start, the part of the range
 * inside that segment (lcnt rows) is handled: segments already deleted for
 * tr are skipped, otherwise the segment is split via split_segment() and
 * *Seg is updated to the segment it returns, so a following call can resume
 * from there.  The walk stops once the remaining range ends at or before
 * the current segment's end.
 *
 * Returns LOG_CONFLICT when a segment is not valid for deletion by tr or
 * deletes_conflict_updates() reports a clash, LOG_ERR when split_segment()
 * fails, LOG_OK otherwise.  No locking is done here.
 */
static int
seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
{
	segment *seg = *Seg, *p = NULL;	/* p trails seg as its predecessor */
	for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
		if (seg->start <= start && seg->end > start) {
			/* clamp the piece handled now to this segment */
			size_t lcnt = cnt;
			if (start+lcnt > seg->end)
				lcnt = seg->end-start;
			if (SEG_IS_DELETED(seg, tr)) {
				/* nothing to do for this piece, move on to the next segment */
				start += lcnt;
				cnt -= lcnt;
				continue;
			} else if (!SEG_VALID_4_DELETE(seg, tr))
				return LOG_CONFLICT;
			if (deletes_conflict_updates( tr, t, start, lcnt))
				return LOG_CONFLICT;
			*Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
			if (!seg)
				return LOG_ERR;
			start += lcnt;
			cnt -= lcnt;
		}
		/* the remaining range does not reach past this segment: done */
		if (start+cnt <= seg->end)
			break;
	}
	return LOG_OK;
}
2455 :
2456 : static int
2457 666 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2458 : {
2459 666 : segment *seg = s->segs->h;
2460 666 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2461 : }
2462 :
2463 : static int
2464 302 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2465 : {
2466 302 : int in_transaction = segments_in_transaction(tr, t);
2467 302 : BAT *oi = i; /* update ids */
2468 302 : int ok = LOG_OK;
2469 :
2470 302 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2471 : return LOG_ERR;
2472 302 : if (BATcount(i)) {
2473 539 : if (BATtdense(i)) {
2474 237 : size_t start = i->tseqbase;
2475 237 : size_t cnt = BATcount(i);
2476 :
2477 237 : lock_table(tr->store, t->base.id);
2478 237 : ok = delete_range(tr, t, s, start, cnt);
2479 237 : unlock_table(tr->store, t->base.id);
2480 65 : } else if (complex_cand(i)) {
2481 0 : struct canditer ci;
2482 0 : oid f = 0, l = 0, cur = 0;
2483 :
2484 0 : canditer_init(&ci, NULL, i);
2485 0 : cur = f = canditer_next(&ci);
2486 :
2487 0 : lock_table(tr->store, t->base.id);
2488 0 : if (!is_oid_nil(f)) {
2489 0 : segment *seg = s->segs->h;
2490 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2491 0 : if (cur+1 == l) {
2492 0 : cur++;
2493 0 : continue;
2494 : }
2495 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2496 0 : f = cur = l;
2497 : }
2498 0 : if (ok == LOG_OK)
2499 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2500 : }
2501 0 : unlock_table(tr->store, t->base.id);
2502 : } else {
2503 65 : if (!i->tsorted) {
2504 0 : assert(oi == i);
2505 0 : BAT *ni = NULL;
2506 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2507 0 : ok = LOG_ERR;
2508 0 : if (ni)
2509 0 : i = ni;
2510 : }
2511 65 : assert(i->tsorted);
2512 65 : BUN icnt = BATcount(i);
2513 65 : BATiter ii = bat_iterator(i);
2514 65 : oid *o = ii.base, n = o[0]+1;
2515 65 : size_t lcnt = 1;
2516 :
2517 65 : lock_table(tr->store, t->base.id);
2518 65 : segment *seg = s->segs->h;
2519 22227 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2520 22162 : if (o[i] == n) {
2521 21807 : lcnt++;
2522 21807 : n++;
2523 : } else {
2524 355 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2525 355 : lcnt = 0;
2526 : }
2527 22162 : if (!lcnt) {
2528 355 : n = o[i]+1;
2529 355 : lcnt = 1;
2530 : }
2531 : }
2532 65 : bat_iterator_end(&ii);
2533 65 : if (lcnt && ok == LOG_OK)
2534 65 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2535 65 : unlock_table(tr->store, t->base.id);
2536 : }
2537 : }
2538 302 : if (i != oi)
2539 25 : bat_destroy(i);
2540 : // assert
2541 302 : if (!in_transaction)
2542 270 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2543 : return ok;
2544 : }
2545 :
2546 : static void
2547 51407 : destroy_segments(segments *s)
2548 : {
2549 51407 : if (!s || sql_ref_dec(&s->r) > 0)
2550 0 : return;
2551 51407 : segment *seg = s->h;
2552 111088 : while(seg) {
2553 59681 : segment *n = ATOMIC_PTR_GET(&seg->next);
2554 59681 : ATOMIC_PTR_DESTROY(&seg->next);
2555 59681 : _DELETE(seg);
2556 59681 : seg = n;
2557 : }
2558 51407 : _DELETE(s);
2559 : }
2560 :
2561 : static void
2562 52804 : destroy_storage(storage *bat)
2563 : {
2564 52804 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2565 : return;
2566 51265 : if (bat->next)
2567 14776 : destroy_storage(bat->next);
2568 51265 : destroy_segments(bat->segs);
2569 51265 : if (bat->cs.uibid)
2570 30345 : temp_destroy(bat->cs.uibid);
2571 51265 : if (bat->cs.uvbid)
2572 30345 : temp_destroy(bat->cs.uvbid);
2573 51265 : if (bat->cs.bid)
2574 51265 : temp_destroy(bat->cs.bid);
2575 51265 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2576 51265 : _DELETE(bat);
2577 : }
2578 :
2579 : static int
2580 172893 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2581 : {
2582 172893 : if (uncommitted) {
2583 440062 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2584 288806 : if (!VALID_4_READ(s->ts,tr))
2585 : return 1;
2586 : } else {
2587 173238 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2588 152746 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2589 : return 1;
2590 : }
2591 :
2592 : return 0;
2593 : }
2594 :
2595 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2596 :
2597 : storage *
2598 2192526 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2599 : {
2600 2192526 : storage *obat;
2601 :
2602 2192526 : obat = ATOMIC_PTR_GET(&t->data);
2603 :
2604 2192526 : if (obat->cs.ts != tr->tid)
2605 1516808 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2606 1516753 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2607 : /* abort */
2608 15567 : if (clear)
2609 15567 : *clear = true;
2610 15567 : return NULL;
2611 : }
2612 :
2613 2176959 : if (!clear)
2614 : return obat;
2615 :
2616 : /* remainder is only to handle clear */
2617 26214 : if (segments_conflict(tr, obat->segs, 1)) {
2618 244 : *clear = true;
2619 244 : return NULL;
2620 : }
2621 25970 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2622 : return NULL;
2623 25969 : storage *bat = ZNEW(storage);
2624 25970 : if (!bat)
2625 : return NULL;
2626 25970 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2627 25970 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2628 0 : destroy_storage(bat);
2629 0 : return NULL;
2630 : }
2631 25970 : bat->cs.cleared = true;
2632 25970 : bat->cs.ts = tr->tid;
2633 : /* only one writer else abort */
2634 25970 : bat->next = obat;
2635 25970 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2636 4 : bat->next = NULL;
2637 4 : destroy_storage(bat);
2638 4 : if (clear)
2639 4 : *clear = true;
2640 4 : return NULL;
2641 : }
2642 : return bat;
2643 : }
2644 :
2645 : static int
2646 69826 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2647 : {
2648 69826 : int ok = LOG_OK;
2649 69826 : BAT *b = ib;
2650 69826 : storage *bat;
2651 :
2652 69826 : if (isbat && !BATcount(b))
2653 : return ok;
2654 :
2655 69777 : if (t == NULL)
2656 : return LOG_ERR;
2657 :
2658 69777 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2659 : return LOG_ERR;
2660 :
2661 69777 : if (isbat)
2662 302 : ok = storage_delete_bat(tr, t, bat, ib);
2663 : else
2664 69475 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2665 : return ok;
2666 : }
2667 :
2668 : static size_t
2669 0 : dcount_col(sql_trans *tr, sql_column *c)
2670 : {
2671 0 : sql_delta *b;
2672 :
2673 0 : if (!isTable(c->t))
2674 : return 0;
2675 0 : b = col_timestamp_delta(tr, c);
2676 0 : if (!b)
2677 : return 1;
2678 :
2679 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2680 0 : if (!s || !s->segs->t)
2681 : return 1;
2682 0 : size_t cnt = s->segs->t->end;
2683 0 : if (cnt) {
2684 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2685 0 : size_t dcnt = 0;
2686 :
2687 0 : if (v)
2688 0 : dcnt = BATguess_uniques(v, NULL);
2689 0 : return dcnt;
2690 : }
2691 : return cnt;
2692 : }
2693 :
2694 : static BAT *
2695 3837777 : bind_no_view(BAT *b, bool quick)
2696 : {
2697 3837777 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2698 3834037 : BAT *nb = BBP_desc(VIEWtparent(b));
2699 3834037 : bat_destroy(b);
2700 3834159 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2701 : return NULL;
2702 : }
2703 : return b;
2704 : }
2705 :
2706 : static int
2707 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2708 : {
2709 0 : int ok = 0;
2710 0 : assert(tr->active);
2711 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2712 0 : return 0;
2713 0 : lock_column(tr->store, c->base.id);
2714 0 : if (unique_est) {
2715 0 : sql_delta *d;
2716 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2717 0 : BAT *b;
2718 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2719 0 : MT_lock_set(&b->theaplock);
2720 0 : b->tunique_est = *unique_est;
2721 0 : MT_lock_unset(&b->theaplock);
2722 0 : bat_destroy(b);
2723 : }
2724 : }
2725 : }
2726 0 : if (min) {
2727 0 : _DELETE(c->min);
2728 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2729 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2730 0 : memcpy(c->min, min, minlen);
2731 0 : ok = 1;
2732 : }
2733 : }
2734 0 : if (max) {
2735 0 : _DELETE(c->max);
2736 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2737 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2738 0 : memcpy(c->max, max, maxlen);
2739 0 : ok = 1;
2740 : }
2741 : }
2742 0 : unlock_column(tr->store, c->base.id);
2743 0 : return ok;
2744 : }
2745 :
/*
 * Make sure c->min and c->max hold copies of the column's minimum and
 * maximum values, taken from the min/max positions recorded in the BAT
 * iterator of the column's BAT.
 *
 * Returns 1 when both values are available (already cached, or freshly
 * copied here) and 0 otherwise: invalid column, ST_FOR storage, BAT not
 * bindable, min/max positions unknown, or allocation failure.
 */
static int
min_max_col(sql_trans *tr, sql_column *c)
{
	int ok = 0;
	BAT *b = NULL;
	sql_delta *d = NULL;

	assert(tr->active);
	if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
		return 0;
	/* cheap check without the lock: both values already cached */
	if (c->min && c->max)
		return 1;
	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR)
			return 0;
		/* dictionary storage is read with RD_EXT, everything else with RDONLY */
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		lock_column(tr->store, c->base.id);
		/* check again now that the column lock is held */
		if (c->min && c->max) {
			unlock_column(tr->store, c->base.id);
			return 1;
		}
		/* drop a possibly half-filled pair before recomputing */
		_DELETE(c->min);
		_DELETE(c->max);
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false))) {
				unlock_column(tr->store, c->base.id);
				return 0;
			}
			BATiter bi = bat_iterator(b);
			if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
				const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
				size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);

				/* all or nothing: if either allocation fails, keep neither */
				if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
					_DELETE(c->min);
					_DELETE(c->max);
				} else {
					memcpy(c->min, nmin, minlen);
					memcpy(c->max, nmax, maxlen);
					ok = 1;
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
		}
		unlock_column(tr->store, c->base.id);
	}
	return ok;
}
2795 :
2796 : static size_t
2797 17 : count_segs(segment *s)
2798 : {
2799 17 : size_t nr = 0;
2800 :
2801 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2802 55 : nr++;
2803 17 : return nr;
2804 : }
2805 :
2806 : static size_t
2807 34 : count_del(sql_trans *tr, sql_table *t, int access)
2808 : {
2809 34 : storage *d;
2810 :
2811 34 : if (!isTable(t))
2812 : return 0;
2813 34 : d = tab_timestamp_storage(tr, t);
2814 34 : if (!d)
2815 : return 0;
2816 34 : if (access == 2)
2817 0 : return d->cs.ucnt;
2818 34 : if (access == 1)
2819 0 : return count_inserts(d->segs->h, tr);
2820 34 : if (access == 10) /* special case for counting the number of segments */
2821 17 : return count_segs(d->segs->h);
2822 17 : return count_deletes(d->segs->h, tr);
2823 : }
2824 :
2825 : static int
2826 21594 : sorted_col(sql_trans *tr, sql_column *col)
2827 : {
2828 21594 : int sorted = 0;
2829 :
2830 21594 : assert(tr->active);
2831 21594 : if (!isTable(col->t) || !col->t->s)
2832 : return 0;
2833 :
2834 21594 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2835 21575 : BAT *b = bind_col(tr, col, QUICK);
2836 :
2837 21575 : if (b)
2838 21575 : sorted = b->tsorted || b->trevsorted;
2839 : }
2840 : return sorted;
2841 : }
2842 :
2843 : static int
2844 7818 : unique_col(sql_trans *tr, sql_column *col)
2845 : {
2846 7818 : int distinct = 0;
2847 :
2848 7818 : assert(tr->active);
2849 7818 : if (!isTable(col->t) || !col->t->s)
2850 : return 0;
2851 :
2852 7818 : if (col && ATOMIC_PTR_GET(&col->data)) {
2853 7818 : BAT *b = bind_col(tr, col, QUICK);
2854 :
2855 7818 : if (b)
2856 7818 : distinct = b->tkey;
2857 : }
2858 : return distinct;
2859 : }
2860 :
2861 : static int
2862 2049 : double_elim_col(sql_trans *tr, sql_column *col)
2863 : {
2864 2049 : int de = 0;
2865 2049 : sql_delta *d;
2866 :
2867 2049 : assert(tr->active);
2868 2049 : if (!isTable(col->t) || !col->t->s)
2869 : return 0;
2870 :
2871 2049 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2872 6 : if (d->cs.st == ST_DICT) {
2873 6 : BAT *b = bind_col(tr, col, QUICK);
2874 6 : if (b && b->ttype == TYPE_bte)
2875 : de = 1;
2876 0 : else if (b && b->ttype == TYPE_sht)
2877 2049 : de = 2;
2878 : }
2879 2043 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2880 2043 : BAT *b = bind_col(tr, col, QUICK);
2881 :
2882 2043 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2883 2043 : de = GDK_ELIMDOUBLES(b->tvheap);
2884 2043 : if (de)
2885 1813 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2886 : }
2887 1813 : assert(de >= 0 && de <= 16);
2888 : }
2889 : return de;
2890 : }
2891 :
/*
 * Collect statistics for column c.
 *
 * Outputs:
 *   *nonil       true when the column's BAT reports no nils (and, if the
 *                delta has pending updates, the update-values BAT does too);
 *                also set to true unconditionally for ST_FOR storage
 *   *unique      key flag of the BAT (of the offsets BAT for ST_DICT); only
 *                filled when the delta has no pending updates
 *   *unique_est  unique estimate, same condition as *unique; for ST_DEFAULT
 *                a zero estimate is replaced by BATguess_uniques()
 *   min, max     initialised with VALinit() from the cached c->min / c->max
 *                or from the BAT's min/max positions, for the type classes
 *                tested below and only without pending updates
 *
 * Returns a bit mask: bit 1 set when min was filled, bit 2 set when max was
 * filled; 0 when nothing could be determined.
 */
static int
col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
{
	int ok = 0;
	BAT *b = NULL, *off = NULL, *upv = NULL;
	sql_delta *d = NULL;

	(void) tr;
	assert(tr->active);
	/* pessimistic defaults for every early return below */
	*nonil = false;
	*unique = false;
	*unique_est = 0.0;
	if (!c || !isTable(c->t) || !c->t->s)
		return ok;

	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR) {
			*nonil = true; /* TODO for min/max. I will do it later */
			return ok;
		}
		int eclass = c->type.type->eclass;
		/* dictionary storage is read with RD_EXT, everything else with RDONLY */
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false)))
				return ok;
			BATiter bi = bat_iterator(b);
			*nonil = bi.nonil && !bi.nil;

			/* min/max are only taken when there are no pending updates */
			if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
				d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
				/* prefer the values cached on the column, fall back to the BAT */
				if (c->min && VALinit(min, bi.type, c->min))
					ok |= 1;
				else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
					ok |= 1;
				if (c->max && VALinit(max, bi.type, c->max))
					ok |= 2;
				else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
					ok |= 2;
			}
			if (d->cs.ucnt == 0) {
				if (d->cs.st == ST_DEFAULT) {
					*unique = bi.key;
					*unique_est = bi.unique_est;
					if (*unique_est == 0)
						*unique_est = (double)BATguess_uniques(b,NULL);
				} else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
					/* for dict, check the offsets bat for uniqueness */
					MT_lock_set(&off->theaplock);
					*unique = off->tkey;
					*unique_est = off->tunique_est;
					MT_lock_unset(&off->theaplock);
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
			/* with pending updates the update values must be nil-free as well */
			if (*nonil && d->cs.ucnt > 0) {
				/* This could use a quick descriptor */
				if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
					*nonil = false;
				} else {
					MT_lock_set(&upv->theaplock);
					*nonil &= upv->tnonil && !upv->tnil;
					MT_lock_unset(&upv->theaplock);
					bat_destroy(upv);
				}
			}
		}
	}
	return ok;
}
2962 :
2963 : static int
2964 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2965 : {
2966 257 : assert(tr->active);
2967 257 : if (!isTable(col->t) || !col->t->s)
2968 : return LOG_OK;
2969 :
2970 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2971 252 : BAT *b = bind_col(tr, col, QUICK);
2972 :
2973 252 : if (b) { /* add props for ranges [min, max> */
2974 252 : MT_lock_set(&b->theaplock);
2975 252 : if (add_range) {
2976 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2977 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2978 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2979 : else
2980 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2981 179 : if (!pt->with_nills || !col->null)
2982 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2983 : } else {
2984 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2985 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2986 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2987 : }
2988 252 : MT_lock_unset(&b->theaplock);
2989 : }
2990 : }
2991 : return LOG_OK;
2992 : }
2993 :
2994 : static int
2995 4074 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
2996 : {
2997 4074 : assert(tr->active);
2998 4074 : if (!isTable(col->t) || !col->t->s)
2999 : return LOG_OK;
3000 :
3001 4044 : if (col && ATOMIC_PTR_GET(&col->data)) {
3002 4044 : BAT *b = bind_col(tr, col, QUICK);
3003 :
3004 4044 : if (b) { /* add props for ranges [min, max> */
3005 4044 : if (not_null) {
3006 4042 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3007 : } else {
3008 2 : BATrmprop(b, GDK_NOT_NULL);
3009 : }
3010 : }
3011 : }
3012 : return LOG_OK;
3013 : }
3014 :
3015 : static int
3016 27466 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3017 : {
3018 27466 : sqlstore *store = tr->store;
3019 27466 : int bid = log_find_bat(store->logger, id);
3020 27466 : if (bid <= 0)
3021 : return LOG_ERR;
3022 27466 : cs->bid = temp_dup(bid);
3023 27466 : cs->ucnt = 0;
3024 27466 : cs->uibid = e_bat(TYPE_oid);
3025 27466 : cs->uvbid = e_bat(type);
3026 27466 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3027 : return LOG_ERR;
3028 : return LOG_OK;
3029 : }
3030 :
3031 : static int
3032 68290 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3033 : {
3034 68290 : int res = LOG_OK;
3035 68290 : gdk_return ok;
3036 68290 : BAT *b = temp_descriptor(bat->cs.bid);
3037 :
3038 68290 : if (b == NULL)
3039 : return LOG_ERR;
3040 :
3041 68290 : if (!bat->cs.uibid)
3042 68282 : bat->cs.uibid = e_bat(TYPE_oid);
3043 68290 : if (!bat->cs.uvbid)
3044 68282 : bat->cs.uvbid = e_bat(b->ttype);
3045 68290 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3046 0 : res = LOG_ERR;
3047 68290 : if (GDKinmemory(0)) {
3048 177 : bat_destroy(b);
3049 177 : return res;
3050 : }
3051 :
3052 68113 : bat_set_access(b, BAT_READ);
3053 68113 : sqlstore *store = tr->store;
3054 68113 : ok = log_bat_persists(store->logger, b, id);
3055 68113 : bat_destroy(b);
3056 68113 : if(res != LOG_OK)
3057 : return res;
3058 68113 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3059 : }
3060 :
3061 : static int
3062 0 : new_persistent_delta( sql_delta *bat)
3063 : {
3064 0 : bat->cs.ucnt = 0;
3065 0 : return LOG_OK;
3066 : }
3067 :
3068 : static void
3069 130320 : create_delta( sql_delta *d, BAT *b)
3070 : {
3071 130320 : bat_set_access(b, BAT_READ);
3072 130320 : d->cs.bid = temp_create(b);
3073 130320 : d->cs.uibid = d->cs.uvbid = 0;
3074 130320 : d->cs.ucnt = 0;
3075 130320 : }
3076 :
3077 : static bat
3078 7293 : copyBat (bat i, int type, oid seq)
3079 : {
3080 7293 : BAT *b, *tb;
3081 7293 : bat res;
3082 :
3083 7293 : if (!i)
3084 : return i;
3085 7293 : tb = quick_descriptor(i);
3086 7293 : if (tb == NULL)
3087 : return 0;
3088 7293 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3089 7293 : if (b == NULL)
3090 : return 0;
3091 :
3092 7293 : bat_set_access(b, BAT_READ);
3093 :
3094 7293 : res = temp_create(b);
3095 7293 : bat_destroy(b);
3096 7293 : return res;
3097 : }
3098 :
3099 : static int
3100 151356 : create_col(sql_trans *tr, sql_column *c)
3101 : {
3102 151356 : int ok = LOG_OK, new = 0;
3103 151356 : int type = c->type.type->localtype;
3104 151356 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3105 :
3106 151356 : if (!bat) {
3107 151356 : new = 1;
3108 151356 : bat = ZNEW(sql_delta);
3109 151356 : if (!bat)
3110 : return LOG_ERR;
3111 151356 : ATOMIC_PTR_SET(&c->data, bat);
3112 151356 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3113 : }
3114 :
3115 151356 : if (new)
3116 151356 : bat->cs.ts = tr->tid;
3117 :
3118 151356 : if (!isNew(c)&& !isTempTable(c->t)){
3119 21014 : bat->cs.ts = tr->ts;
3120 21014 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3121 21014 : if (ok == LOG_OK && c->storage_type) {
3122 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3123 2 : sqlstore *store = tr->store;
3124 2 : int bid = log_find_bat(store->logger, -c->base.id);
3125 2 : if (bid <= 0)
3126 : return LOG_ERR;
3127 2 : bat->cs.ebid = temp_dup(bid);
3128 2 : bat->cs.st = ST_DICT;
3129 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3130 2 : bat->cs.st = ST_FOR;
3131 : }
3132 : }
3133 21014 : return ok;
3134 130342 : } else if (bat && bat->cs.bid) {
3135 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3136 : } else {
3137 130342 : sql_column *fc = NULL;
3138 130342 : size_t cnt = 0;
3139 :
3140 : /* alter ? */
3141 130342 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3142 79856 : storage *s = tab_timestamp_storage(tr, fc->t);
3143 79856 : if (s == NULL)
3144 : return LOG_ERR;
3145 79856 : cnt = segs_end(s->segs, tr, c->t);
3146 : }
3147 130342 : if (cnt && fc != c) {
3148 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3149 :
3150 22 : if (d->cs.bid) {
3151 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3152 22 : if(bat->cs.bid == BID_NIL)
3153 22 : ok = LOG_ERR;
3154 : }
3155 22 : if (d->cs.uibid) {
3156 10 : bat->cs.uibid = e_bat(TYPE_oid);
3157 10 : if (bat->cs.uibid == BID_NIL)
3158 22 : ok = LOG_ERR;
3159 : }
3160 22 : if (d->cs.uvbid) {
3161 10 : bat->cs.uvbid = e_bat(type);
3162 10 : if(bat->cs.uvbid == BID_NIL)
3163 0 : ok = LOG_ERR;
3164 : }
3165 : } else {
3166 130320 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3167 130320 : if (!b) {
3168 : ok = LOG_ERR;
3169 : } else {
3170 130320 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3171 130320 : bat_destroy(b);
3172 : }
3173 :
3174 130320 : if (!new) {
3175 0 : bat->cs.uibid = e_bat(TYPE_oid);
3176 0 : if (bat->cs.uibid == BID_NIL)
3177 0 : ok = LOG_ERR;
3178 0 : bat->cs.uvbid = e_bat(type);
3179 0 : if(bat->cs.uvbid == BID_NIL)
3180 0 : ok = LOG_ERR;
3181 : }
3182 : }
3183 130342 : bat->cs.ucnt = 0;
3184 :
3185 130342 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3186 91 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3187 : }
3188 : return ok;
3189 : }
3190 :
3191 : static int
3192 61922 : log_create_col_(sql_trans *tr, sql_column *c)
3193 : {
3194 61922 : assert(!isTempTable(c->t));
3195 61922 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3196 : }
3197 :
3198 : static int
3199 87 : log_create_col(sql_trans *tr, sql_change *change)
3200 : {
3201 87 : return log_create_col_(tr, (sql_column*)change->obj);
3202 : }
3203 :
3204 : static int
3205 117710 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3206 : {
3207 117710 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3208 117710 : (void)oldest;
3209 117710 : assert(delta->cs.ts == tr->tid);
3210 117710 : delta->cs.ts = commit_ts;
3211 :
3212 117710 : assert(delta->next == NULL);
3213 117710 : if (!delta->cs.merged)
3214 117709 : delta->nr_updates += merge_delta(delta);
3215 117710 : if (!tr->parent)
3216 117706 : base->new = 0;
3217 117710 : return LOG_OK;
3218 : }
3219 :
3220 : static int
3221 91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3222 : {
3223 91 : sql_column *c = (sql_column*)change->obj;
3224 91 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3225 91 : if (!tr->parent)
3226 90 : c->base.new = 0;
3227 91 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3228 : }
3229 :
3230 : /* will be called for new idx's and when new index columns are created */
3231 : static int
3232 9314 : create_idx(sql_trans *tr, sql_idx *ni)
3233 : {
3234 9314 : int ok = LOG_OK, new = 0;
3235 9314 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3236 9314 : int type = TYPE_lng;
3237 :
3238 9314 : if (oid_index(ni->type))
3239 954 : type = TYPE_oid;
3240 :
3241 9314 : if (!bat) {
3242 9314 : new = 1;
3243 9314 : bat = ZNEW(sql_delta);
3244 9314 : if (!bat)
3245 : return LOG_ERR;
3246 9314 : ATOMIC_PTR_INIT(&ni->data, bat);
3247 9314 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3248 : }
3249 :
3250 9314 : if (new)
3251 9314 : bat->cs.ts = tr->tid;
3252 :
3253 9314 : if (!isNew(ni) && !isTempTable(ni->t)){
3254 2043 : bat->cs.ts = 1;
3255 2043 : return load_cs(tr, &bat->cs, type, ni->base.id);
3256 7271 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3257 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3258 : } else {
3259 7271 : sql_column *c = ol_first_node(ni->t->columns)->data;
3260 7271 : sql_delta *d = col_timestamp_delta(tr, c);
3261 :
3262 7271 : if (d) {
3263 : /* Here we also handle indices created through alter stmts */
3264 : /* These need to be created aligned to the existing data */
3265 7271 : if (d->cs.bid) {
3266 7271 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3267 7271 : if(bat->cs.bid == BID_NIL)
3268 7271 : ok = LOG_ERR;
3269 : }
3270 : } else {
3271 : return LOG_ERR;
3272 : }
3273 :
3274 7271 : bat->cs.ucnt = 0;
3275 :
3276 7271 : if (!new) {
3277 0 : bat->cs.uibid = e_bat(TYPE_oid);
3278 0 : if (bat->cs.uibid == BID_NIL)
3279 0 : ok = LOG_ERR;
3280 0 : bat->cs.uvbid = e_bat(type);
3281 0 : if(bat->cs.uvbid == BID_NIL)
3282 0 : ok = LOG_ERR;
3283 : }
3284 7271 : bat->cs.ucnt = 0;
3285 7271 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3286 631 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3287 : }
3288 : return ok;
3289 : }
3290 :
3291 : static int
3292 6368 : log_create_idx_(sql_trans *tr, sql_idx *i)
3293 : {
3294 6368 : assert(!isTempTable(i->t));
3295 6368 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3296 : }
3297 :
3298 : static int
3299 617 : log_create_idx(sql_trans *tr, sql_change *change)
3300 : {
3301 617 : return log_create_idx_(tr, (sql_idx*)change->obj);
3302 : }
3303 :
3304 : static int
3305 631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3306 : {
3307 631 : sql_idx *i = (sql_idx*)change->obj;
3308 631 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3309 631 : if (!tr->parent)
3310 631 : i->base.new = 0;
3311 631 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3312 : return LOG_OK;
3313 : }
3314 :
3315 : static int
3316 4409 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3317 : {
3318 4409 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3319 4409 : BAT *b = NULL, *ib = NULL;
3320 :
3321 4409 : if (ok != LOG_OK)
3322 : return ok;
3323 4409 : if (!(b = temp_descriptor(s->cs.bid)))
3324 : return LOG_ERR;
3325 4409 : ib = b;
3326 :
3327 4409 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3328 0 : bat_destroy(ib);
3329 0 : return LOG_ERR;
3330 : }
3331 :
3332 4409 : if (BATcount(b)) {
3333 378 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3334 0 : bat_destroy(ib);
3335 0 : return LOG_ERR;
3336 : }
3337 569 : if (BATtdense(b)) {
3338 191 : size_t start = b->tseqbase;
3339 191 : size_t cnt = BATcount(b);
3340 191 : ok = delete_range(tr, t, s, start, cnt);
3341 : } else {
3342 187 : assert(b->tsorted);
3343 187 : BUN icnt = BATcount(b);
3344 187 : BATiter bi = bat_iterator(b);
3345 187 : size_t lcnt = 1;
3346 187 : oid n;
3347 187 : segment *seg = s->segs->h;
3348 187 : if (complex_cand(b)) {
3349 0 : oid o = * (oid *) Tpos(&bi, 0);
3350 0 : n = o + 1;
3351 0 : for (BUN i = 1; i < icnt; i++) {
3352 0 : o = * (oid *) Tpos(&bi, i);
3353 0 : if (o == n) {
3354 0 : lcnt++;
3355 0 : n++;
3356 : } else {
3357 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3358 : break;
3359 : lcnt = 0;
3360 : }
3361 0 : if (!lcnt) {
3362 0 : n = o + 1;
3363 0 : lcnt = 1;
3364 : }
3365 : }
3366 : } else {
3367 187 : oid *o = bi.base;
3368 187 : n = o[0]+1;
3369 282576 : for (size_t i=1; i<icnt; i++) {
3370 282389 : if (o[i] == n) {
3371 279890 : lcnt++;
3372 279890 : n++;
3373 : } else {
3374 2499 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3375 : break;
3376 : lcnt = 0;
3377 : }
3378 282389 : if (!lcnt) {
3379 2499 : n = o[i]+1;
3380 2499 : lcnt = 1;
3381 : }
3382 : }
3383 : }
3384 187 : if (lcnt && ok == LOG_OK)
3385 187 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3386 187 : bat_iterator_end(&bi);
3387 : }
3388 378 : if (ok == LOG_OK)
3389 6367 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3390 5989 : if (seg->ts == tr->tid)
3391 3119 : seg->ts = 1;
3392 : } else {
3393 4031 : if (ok == LOG_OK) {
3394 4031 : BAT *bb = quick_descriptor(s->cs.bid);
3395 :
3396 4031 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3397 : ok = LOG_ERR;
3398 : } else {
3399 4031 : segment *seg = s->segs->h;
3400 4031 : if (seg->ts == tr->tid)
3401 4031 : seg->ts = 1;
3402 : }
3403 : }
3404 : }
3405 4409 : if (b != ib)
3406 4409 : bat_destroy(b);
3407 4409 : bat_destroy(ib);
3408 :
3409 4409 : return ok;
3410 : }
3411 :
3412 : static int
3413 25335 : create_del(sql_trans *tr, sql_table *t)
3414 : {
3415 25335 : int ok = LOG_OK, new = 0;
3416 25335 : BAT *b;
3417 25335 : storage *bat = ATOMIC_PTR_GET(&t->data);
3418 :
3419 25335 : if (!bat) {
3420 25335 : new = 1;
3421 25335 : bat = ZNEW(storage);
3422 25335 : if(!bat)
3423 : return LOG_ERR;
3424 25335 : ATOMIC_PTR_INIT(&t->data, bat);
3425 25335 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3426 25335 : bat->cs.ts = tr->tid;
3427 : }
3428 :
3429 25335 : if (!isNew(t) && !isTempTable(t)) {
3430 4409 : bat->cs.ts = tr->ts;
3431 4409 : return load_storage(tr, t, bat, t->base.id);
3432 20926 : } else if (bat->cs.bid) {
3433 : return ok;
3434 : } else {
3435 20926 : assert(!bat->segs);
3436 20926 : if (!(bat->segs = new_segments(tr, 0)))
3437 : return LOG_ERR;
3438 :
3439 20926 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3440 20926 : if(b != NULL) {
3441 20926 : bat_set_access(b, BAT_READ);
3442 20926 : bat->cs.bid = temp_create(b);
3443 20926 : bat_destroy(b);
3444 : } else {
3445 : return LOG_ERR;
3446 : }
3447 20926 : if (new)
3448 27524 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3449 : }
3450 : return ok;
3451 : }
3452 :
3453 : static int
3454 192044 : log_segment(sql_trans *tr, segment *s, sqlid id)
3455 : {
3456 192044 : sqlstore *store = tr->store;
3457 192044 : msk m = s->deleted;
3458 192044 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3459 : }
3460 :
3461 : static int
3462 97039 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3463 : {
3464 : /* log segments */
3465 97039 : lock_table(tr->store, id);
3466 511894 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3467 414855 : unlock_table(tr->store, id);
3468 414855 : if (seg->ts == tr->tid && seg->end-seg->start) {
3469 145458 : if (log_segment(tr, seg, id) != LOG_OK) {
3470 : return LOG_ERR;
3471 : }
3472 : }
3473 414855 : lock_table(tr->store, id);
3474 : }
3475 97039 : unlock_table(tr->store, id);
3476 97039 : return LOG_OK;
3477 : }
3478 :
3479 : static int
3480 12564 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3481 : {
3482 12564 : BAT *b;
3483 12564 : int ok = LOG_OK;
3484 :
3485 12564 : if (GDKinmemory(0))
3486 : return LOG_OK;
3487 :
3488 12531 : b = temp_descriptor(bat->cs.bid);
3489 12531 : if (b == NULL)
3490 : return LOG_ERR;
3491 :
3492 12531 : sqlstore *store = tr->store;
3493 12531 : bat_set_access(b, BAT_READ);
3494 12531 : if (ok == LOG_OK)
3495 12531 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3496 12531 : if (ok == LOG_OK)
3497 12531 : ok = log_segments(tr, bat->segs, t->base.id);
3498 12531 : bat_destroy(b);
3499 12531 : return ok;
3500 : }
3501 :
3502 : static int
3503 12578 : log_create_del(sql_trans *tr, sql_change *change)
3504 : {
3505 12578 : int ok = LOG_OK;
3506 12578 : sql_table *t = (sql_table*)change->obj;
3507 :
3508 12578 : if (t->base.deleted)
3509 : return ok;
3510 12564 : assert(!isTempTable(t));
3511 12564 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3512 12564 : if (ok == LOG_OK) {
3513 74399 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3514 61835 : sql_column *c = n->data;
3515 :
3516 61835 : ok = log_create_col_(tr, c);
3517 : }
3518 12564 : if (t->idxs) {
3519 18327 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3520 5763 : sql_idx *i = n->data;
3521 :
3522 5763 : if (ATOMIC_PTR_GET(&i->data))
3523 5751 : ok = log_create_idx_(tr, i);
3524 : }
3525 : }
3526 : }
3527 : return ok;
3528 : }
3529 :
3530 : static int
3531 20928 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3532 : {
3533 20928 : int ok = LOG_OK;
3534 20928 : sql_table *t = (sql_table*)change->obj;
3535 20928 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3536 :
3537 20928 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3538 120 : assert(isTempTable(t));
3539 120 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3540 120 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3541 120 : return ok;
3542 : }
3543 :
3544 20808 : if (!commit_ts) /* rollback handled by ? */
3545 : return ok;
3546 18953 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3547 18953 : assert(ok == LOG_OK);
3548 18953 : if (ok != LOG_OK)
3549 : return ok;
3550 18953 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3551 18953 : assert(dbat->cs.ts == tr->tid);
3552 18953 : dbat->cs.ts = commit_ts;
3553 18953 : if (ok == LOG_OK) {
3554 130177 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3555 111224 : sql_column *c = n->data;
3556 111224 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3557 :
3558 111224 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3559 : }
3560 18953 : if (t->idxs) {
3561 24729 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3562 5776 : sql_idx *i = n->data;
3563 5776 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3564 :
3565 5776 : if (delta)
3566 5764 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3567 : }
3568 : }
3569 18953 : if (!tr->parent)
3570 18951 : t->base.new = 0;
3571 : }
3572 18953 : if (!tr->parent)
3573 18951 : t->base.new = 0;
3574 : return ok;
3575 : }
3576 :
3577 : static int
3578 20023 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3579 : {
3580 20023 : gdk_return ok = GDK_SUCCEED;
3581 :
3582 20023 : sqlstore *store = tr->store;
3583 20023 : if (!GDKinmemory(0) && b && b->cs.bid)
3584 18843 : ok = log_bat_transient(store->logger, id);
3585 20023 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3586 25 : ok = log_bat_transient(store->logger, -id);
3587 20023 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3588 : }
3589 :
3590 : static int
3591 169092 : destroy_col(sqlstore *store, sql_column *c)
3592 : {
3593 169092 : (void)store;
3594 169092 : if (ATOMIC_PTR_GET(&c->data))
3595 169092 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3596 169092 : ATOMIC_PTR_SET(&c->data, NULL);
3597 169092 : return LOG_OK;
3598 : }
3599 :
3600 : static int
3601 18140 : log_destroy_col_(sql_trans *tr, sql_column *c)
3602 : {
3603 18140 : int ok = LOG_OK;
3604 18140 : assert(!isTempTable(c->t));
3605 18140 : if (!tr->parent) /* don't write save point commits */
3606 18140 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3607 18140 : return ok;
3608 : }
3609 :
3610 : static int
3611 18140 : log_destroy_col(sql_trans *tr, sql_change *change)
3612 : {
3613 18140 : sql_column *c = (sql_column*)change->obj;
3614 18140 : int res = log_destroy_col_(tr, c);
3615 18140 : change->obj = NULL;
3616 18140 : column_destroy(tr->store, c);
3617 18140 : return res;
3618 : }
3619 :
3620 : static int
3621 10663 : destroy_idx(sqlstore *store, sql_idx *i)
3622 : {
3623 10663 : (void)store;
3624 10663 : if (ATOMIC_PTR_GET(&i->data))
3625 10663 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3626 10663 : ATOMIC_PTR_SET(&i->data, NULL);
3627 10663 : return LOG_OK;
3628 : }
3629 :
3630 : static int
3631 1959 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3632 : {
3633 1959 : int ok = LOG_OK;
3634 1959 : assert(!isTempTable(i->t));
3635 1959 : if (ATOMIC_PTR_GET(&i->data)) {
3636 1883 : if (!tr->parent) /* don't write save point commits */
3637 1883 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3638 : }
3639 1959 : return ok;
3640 : }
3641 :
3642 : static int
3643 1959 : log_destroy_idx(sql_trans *tr, sql_change *change)
3644 : {
3645 1959 : sql_idx *i = (sql_idx*)change->obj;
3646 1959 : int res = log_destroy_idx_(tr, i);
3647 1959 : change->obj = NULL;
3648 1959 : idx_destroy(tr->store, i);
3649 1959 : return res;
3650 : }
3651 :
3652 : static int
3653 26838 : destroy_del(sqlstore *store, sql_table *t)
3654 : {
3655 26838 : (void)store;
3656 26838 : if (ATOMIC_PTR_GET(&t->data))
3657 26834 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3658 26838 : ATOMIC_PTR_SET(&t->data, NULL);
3659 26838 : return LOG_OK;
3660 : }
3661 :
3662 : static int
3663 3264 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3664 : {
3665 3264 : gdk_return ok = GDK_SUCCEED;
3666 :
3667 3264 : sqlstore *store = tr->store;
3668 3264 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3669 3264 : bat && bat->cs.bid)
3670 3264 : ok = log_bat_transient(store->logger, id);
3671 3264 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3672 : }
3673 :
3674 : static int
3675 3264 : log_destroy_del(sql_trans *tr, sql_change *change)
3676 : {
3677 3264 : int ok = LOG_OK;
3678 3264 : sql_table *t = (sql_table*)change->obj;
3679 :
3680 3264 : assert(!isTempTable(t));
3681 3264 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3682 3264 : return ok;
3683 : }
3684 :
3685 : static int
3686 23451 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3687 : {
3688 23451 : (void)tr;
3689 23451 : (void)change;
3690 23451 : (void)commit_ts;
3691 23451 : (void)oldest;
3692 23451 : if (commit_ts)
3693 23428 : change->handled = true;
3694 23451 : return 0;
3695 : }
3696 :
3697 : static int
3698 3335 : drop_del(sql_trans *tr, sql_table *t)
3699 : {
3700 3335 : int ok = LOG_OK;
3701 :
3702 3335 : if (!isNew(t)) {
3703 3335 : storage *bat = ATOMIC_PTR_GET(&t->data);
3704 3400 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3705 : }
3706 3335 : return ok;
3707 : }
3708 :
3709 : static int
3710 18155 : drop_col(sql_trans *tr, sql_column *c)
3711 : {
3712 18155 : assert(!isNew(c));
3713 18155 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3714 18155 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3715 18155 : return LOG_OK;
3716 : }
3717 :
3718 : static int
3719 1961 : drop_idx(sql_trans *tr, sql_idx *i)
3720 : {
3721 1961 : assert(!isNew(i));
3722 1961 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3723 1961 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3724 1961 : return LOG_OK;
3725 : }
3726 :
3727 :
3728 : static BUN
3729 129535 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3730 : {
3731 129535 : BAT *b;
3732 129535 : BUN sz = 0;
3733 :
3734 129535 : (void)tr;
3735 129535 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3736 129535 : if (cs->bid && renew) {
3737 129535 : b = quick_descriptor(cs->bid);
3738 129525 : if (b) {
3739 129525 : sz += BATcount(b);
3740 129525 : if (cs->st == ST_DICT) {
3741 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3742 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3743 :
3744 2 : if (nebid == BID_NIL || !n) {
3745 0 : temp_destroy(nebid);
3746 0 : bat_destroy(n);
3747 0 : return BUN_NONE;
3748 : }
3749 2 : temp_destroy(cs->ebid);
3750 2 : cs->ebid = nebid;
3751 2 : if (!temp)
3752 2 : bat_set_access(n, BAT_READ);
3753 2 : temp_destroy(cs->bid);
3754 2 : cs->bid = temp_create(n); /* create empty copy */
3755 2 : bat_destroy(n);
3756 : } else {
3757 129523 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3758 :
3759 129506 : if (nbid == BID_NIL)
3760 : return BUN_NONE;
3761 129506 : temp_destroy(cs->bid);
3762 129481 : cs->bid = nbid;
3763 : }
3764 : } else {
3765 : return BUN_NONE;
3766 : }
3767 : }
3768 129483 : if (cs->uibid) {
3769 129339 : temp_destroy(cs->uibid);
3770 129401 : cs->uibid = 0;
3771 : }
3772 129545 : if (cs->uvbid) {
3773 129401 : temp_destroy(cs->uvbid);
3774 129400 : cs->uvbid = 0;
3775 : }
3776 129544 : cs->cleared = true;
3777 129544 : cs->ucnt = 0;
3778 129544 : return sz;
3779 : }
3780 :
3781 : static BUN
3782 129392 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3783 : {
3784 129392 : bool update_conflict = false;
3785 129392 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3786 :
3787 129392 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3788 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3789 129394 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3790 129394 : if (odelta != delta)
3791 129399 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3792 129383 : if (delta)
3793 129383 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3794 : return 0;
3795 : }
3796 :
3797 : static BUN
3798 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3799 : {
3800 21 : bool update_conflict = false;
3801 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3802 :
3803 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3804 15 : return 0;
3805 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3806 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3807 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3808 6 : if (odelta != delta)
3809 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3810 6 : if (delta)
3811 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3812 : return 0;
3813 : }
3814 :
3815 : static int
3816 142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3817 : {
3818 142 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3819 : return LOG_ERR;
3820 142 : if (s->segs)
3821 142 : destroy_segments(s->segs);
3822 142 : if (!(s->segs = new_segments(tr, 0)))
3823 : return LOG_ERR;
3824 : return LOG_OK;
3825 : }
3826 :
3827 :
3828 : /*
3829 : * Clear the table, in general this means replacing the storage,
3830 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3831 : * all segments as deleted.
3832 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3833 : */
3834 : static BUN
3835 41832 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3836 : {
3837 41832 : int clear = !in_transaction, ok = LOG_OK;
3838 41832 : bool conflict = false;
3839 41832 : storage *bat;
3840 :
3841 41883 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3842 15815 : return conflict?BUN_NONE-1:BUN_NONE;
3843 :
3844 26017 : if (!clear) {
3845 51 : lock_table(tr->store, t->base.id);
3846 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3847 51 : unlock_table(tr->store, t->base.id);
3848 : }
3849 26017 : assert(t->persistence != SQL_DECLARED_TABLE);
3850 26017 : if (!in_transaction)
3851 25969 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3852 26017 : if (ok == LOG_ERR)
3853 : return BUN_NONE;
3854 26017 : if (ok == LOG_CONFLICT)
3855 0 : return BUN_NONE - 1;
3856 : return LOG_OK;
3857 : }
3858 :
3859 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3860 : static BUN
3861 41832 : clear_table(sql_trans *tr, sql_table *t)
3862 : {
3863 41832 : node *n = ol_first_node(t->columns);
3864 41832 : sql_column *c = n->data;
3865 41832 : storage *d = tab_timestamp_storage(tr, t);
3866 41832 : int in_transaction, clear;
3867 41832 : BUN sz, clear_ok;
3868 :
3869 41832 : if (!d)
3870 : return BUN_NONE;
3871 41832 : in_transaction = segments_in_transaction(tr, t);
3872 41832 : clear = !in_transaction;
3873 41832 : sz = count_col(tr, c, CNT_ACTIVE);
3874 41832 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3875 : return clear_ok;
3876 :
3877 26017 : if (in_transaction)
3878 : return sz;
3879 :
3880 155359 : for (; n; n = n->next) {
3881 129393 : c = n->data;
3882 :
3883 129393 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3884 0 : return clear_ok;
3885 : }
3886 25966 : if (t->idxs) {
3887 25987 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3888 21 : sql_idx *ci = n->data;
3889 :
3890 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3891 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3892 0 : return clear_ok;
3893 : }
3894 : }
3895 25966 : if (clear)
3896 25966 : d->segs->nr_reused = 0;
3897 25966 : return sz;
3898 : }
3899 :
3900 : static int
3901 158779 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3902 : {
3903 158779 : sqlstore *store = tr->store;
3904 158779 : gdk_return ok = GDK_SUCCEED;
3905 :
3906 158779 : (void) t;
3907 158779 : (void) segs;
3908 158779 : if (GDKinmemory(0))
3909 : return LOG_OK;
3910 :
3911 158772 : if (cs->cleared) {
3912 155418 : assert(cs->ucnt == 0);
3913 155418 : BAT *ins = temp_descriptor(cs->bid);
3914 155418 : if (!ins)
3915 : return LOG_ERR;
3916 155418 : assert(!isEbat(ins));
3917 155418 : bat_set_access(ins, BAT_READ);
3918 155418 : ok = log_bat_persists(store->logger, ins, id);
3919 155418 : bat_destroy(ins);
3920 155418 : if (ok == GDK_SUCCEED && cs->ebid) {
3921 56 : BAT *ins = temp_descriptor(cs->ebid);
3922 56 : if (!ins)
3923 : return LOG_ERR;
3924 56 : assert(!isEbat(ins));
3925 56 : bat_set_access(ins, BAT_READ);
3926 56 : ok = log_bat_persists(store->logger, ins, -id);
3927 56 : bat_destroy(ins);
3928 : }
3929 155418 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3930 : }
3931 :
3932 3354 : assert(!isTempTable(t));
3933 :
3934 3354 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3935 2771 : BAT *ui = temp_descriptor(cs->uibid);
3936 2771 : BAT *uv = temp_descriptor(cs->uvbid);
3937 : /* any updates */
3938 2771 : if (ui == NULL || uv == NULL) {
3939 : ok = GDK_FAIL;
3940 2771 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3941 2771 : ok = log_delta(store->logger, ui, uv, id);
3942 2771 : bat_destroy(ui);
3943 2771 : bat_destroy(uv);
3944 : }
3945 2771 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3946 : }
3947 :
3948 : static inline int
3949 58548 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3950 58548 : sqlstore *store = tr->store;
3951 58548 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3952 : }
3953 :
3954 : static inline int
3955 58548 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3956 58548 : sqlstore *store = tr->store;
3957 58548 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3958 : }
3959 :
/*
 * Log the rows appended to table t by transaction tr.
 *
 * Steps, bracketed by tr_log_table_start()/tr_log_table_end():
 *  1. walk the segment list and log_segment() every non-empty, non-deleted
 *     segment owned by tr (seg->ts == tr->tid), summing their sizes into
 *     nr_appends;
 *  2. for every column: if its storage is flagged cleared, log it through
 *     tr_log_cs(); otherwise log_bat() the [cur->start, cur->end) range of
 *     the column BAT for each of tr's non-deleted segments that start before
 *     'end', and, when the column has an ebid BAT, log and BATcommit() the
 *     part beyond batInserted;
 *  3. repeat the per-segment logging for the indexes that have their own
 *     column storage.
 *
 * The table lock is only held while stepping from one segment to the next:
 * each loop body starts with unlock_table() and ends with lock_table().
 *
 * Returns LOG_OK on success, LOG_ERR on any failure.
 * NOTE(review): the early 'return LOG_ERR' paths leave without calling
 * tr_log_table_end() - confirm the logger tolerates an unterminated group.
 */
3960 : static int
3961 58548 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3962 : {
3963 58548 : sqlstore *store = tr->store;
3964 58548 : gdk_return ok = GDK_SUCCEED;
3965 :
3966 58548 : size_t end = segs_end(segs, tr, t);
3967 :
3968 58548 : if (tr_log_table_start(tr, t) != LOG_OK)
3969 : return LOG_ERR;
3970 :
3971 58548 : size_t nr_appends = 0;
3972 :
/* pass 1: log the segments claimed by this transaction and count the rows */
3973 58548 : lock_table(tr->store, t->base.id);
3974 435046 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3975 376498 : unlock_table(tr->store, t->base.id);
3976 :
3977 376498 : if (seg->ts == tr->tid && seg->end-seg->start) {
3978 114813 : if (!seg->deleted) {
/* the table lock is not held here, so returning directly is safe */
3979 46586 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
3980 : return LOG_ERR;
3981 :
3982 46586 : nr_appends += (seg->end - seg->start);
3983 : }
3984 : }
3985 376498 : lock_table(tr->store, t->base.id);
3986 : }
3987 58548 : unlock_table(tr->store, t->base.id);
3988 :
/* pass 2: per column, log the appended ranges */
3989 405882 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
3990 347334 : sql_column *c = n->data;
3991 347334 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
3992 :
3993 347334 : if (cs->cleared) {
3994 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
3995 3 : continue;
3996 : }
3997 :
3998 347331 : lock_table(tr->store, t->base.id);
/* re-check the flag now that the table lock is held */
3999 347331 : if (!cs->cleared) {
4000 2526667 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4001 2179336 : unlock_table(tr->store, t->base.id);
4002 2179336 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4003 : /* append col*/
4004 263513 : BAT *ins = temp_descriptor(cs->bid);
4005 263513 : if (ins == NULL)
4006 : return LOG_ERR;
4007 263513 : assert(BATcount(ins) >= cur->end);
4008 263513 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4009 263513 : bat_destroy(ins);
4010 : }
4011 2179336 : lock_table(tr->store, t->base.id);
4012 : }
4013 : }
4014 347331 : unlock_table(tr->store, t->base.id);
4015 :
/* the ebid BAT is logged under the negated column id; only the part past batInserted is written */
4016 347331 : if (ok == GDK_SUCCEED && cs->ebid) {
4017 19 : BAT *ins = temp_descriptor(cs->ebid);
4018 19 : if (ins == NULL)
4019 : return LOG_ERR;
4020 19 : if (BATcount(ins) > ins->batInserted)
4021 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4022 19 : BATcommit(ins, BATcount(ins));
4023 19 : bat_destroy(ins);
4024 : }
4025 : }
4026 :
/* pass 3: indexes that carry their own column storage */
4027 58548 : if (t->idxs) {
4028 63730 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4029 5182 : sql_idx *i = n->data;
4030 :
/* skip hash indexes over at most one column and index types without a column */
4031 5182 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4032 4404 : continue;
4033 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4034 :
4035 778 : if (cs) {
4036 778 : if (cs->cleared) {
4037 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4038 0 : continue;
4039 : }
4040 :
4041 778 : lock_table(tr->store, t->base.id);
4042 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4043 1609 : unlock_table(tr->store, t->base.id);
4044 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4045 : /* append idx */
4046 730 : BAT *ins = temp_descriptor(cs->bid);
4047 730 : if (ins == NULL)
4048 : return LOG_ERR;
4049 730 : assert(BATcount(ins) >= cur->end);
4050 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4051 730 : bat_destroy(ins);
4052 : }
4053 1609 : lock_table(tr->store, t->base.id);
4054 : }
4055 778 : unlock_table(tr->store, t->base.id);
4056 : }
4057 : }
4058 : }
4059 :
4060 58548 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4061 0 : return LOG_ERR;
4062 :
4063 : return LOG_OK;
4064 : }
4065 :
4066 : static int
4067 84508 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4068 : {
4069 84508 : int ok = LOG_OK;
4070 84508 : bool cleared = s->cs.cleared;
4071 84508 : if (ok == LOG_OK && cleared)
4072 25960 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4073 25960 : if (ok == LOG_OK)
4074 84508 : ok = log_segments(tr, s->segs, t->base.id);
4075 84508 : if (ok == LOG_OK && !cleared)
4076 58548 : ok = log_table_append(tr, t, s->segs);
4077 84508 : return ok;
4078 : }
4079 :
4080 : static void
4081 384288 : merge_cs( column_storage *cs, const char* caller)
4082 : {
4083 384288 : if (cs->bid && cs->ucnt) {
4084 2780 : BAT *cur = temp_descriptor(cs->bid);
4085 2780 : BAT *ui = temp_descriptor(cs->uibid);
4086 2780 : BAT *uv = temp_descriptor(cs->uvbid);
4087 :
4088 2780 : if (!cur || !ui || !uv) {
4089 0 : bat_destroy(ui);
4090 0 : bat_destroy(uv);
4091 0 : bat_destroy(cur);
4092 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4093 : return;
4094 : }
4095 2780 : assert(BATcount(ui) == BATcount(uv));
4096 :
4097 : /* any updates */
4098 2780 : assert(!isEbat(cur));
4099 2780 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4100 0 : bat_destroy(ui);
4101 0 : bat_destroy(uv);
4102 0 : bat_destroy(cur);
4103 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4104 : return;
4105 : }
4106 : /* cleanup the old deltas */
4107 2780 : temp_destroy(cs->uibid);
4108 2780 : temp_destroy(cs->uvbid);
4109 2780 : cs->uibid = e_bat(TYPE_oid);
4110 2780 : cs->uvbid = e_bat(cur->ttype);
4111 2780 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4112 2780 : cs->ucnt = 0;
4113 2780 : bat_destroy(ui);
4114 2780 : bat_destroy(uv);
4115 2780 : bat_destroy(cur);
4116 : }
4117 384288 : cs->cleared = false;
4118 384288 : cs->merged = true;
4119 384288 : return;
4120 : }
4121 :
4122 : static lng
4123 342200 : merge_delta( sql_delta *obat)
4124 : {
4125 342200 : lng res = 0;
4126 342200 : if (obat && obat->next && !obat->cs.merged)
4127 132826 : res += merge_delta(obat->next);
4128 342200 : res += obat->cs.ucnt;
4129 342200 : merge_cs(&obat->cs, __func__);
4130 342200 : return res;
4131 : }
4132 :
4133 : static void
4134 42088 : merge_storage(storage *tdb)
4135 : {
4136 42088 : merge_cs(&tdb->cs, __func__);
4137 :
4138 42088 : if (tdb->next) {
4139 436 : destroy_storage(tdb->next);
4140 436 : tdb->next = NULL;
4141 : }
4142 42088 : }
4143 :
4144 : static sql_delta *
4145 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4146 : {
4147 : /* commit ie copy back to the parent transaction */
4148 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4149 1 : sql_delta *od = delta->next;
4150 1 : if (od->cs.ts == commit_ts) {
4151 0 : sql_delta t = *od, *n = od->next;
4152 0 : *od = *delta;
4153 0 : od->next = n;
4154 0 : *delta = t;
4155 0 : delta->next = NULL;
4156 0 : destroy_delta(delta, true);
4157 0 : return od;
4158 : }
4159 : }
4160 : return delta;
4161 : }
4162 :
4163 : static int
4164 132790 : log_update_col( sql_trans *tr, sql_change *change)
4165 : {
4166 132790 : sql_column *c = (sql_column*)change->obj;
4167 132790 : assert(!isTempTable(c->t));
4168 :
4169 132790 : if (isDeleted(c->t)) {
4170 0 : change->handled = true;
4171 0 : return LOG_OK;
4172 : }
4173 :
4174 132790 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4175 132790 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4176 132790 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4177 132790 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4178 : }
4179 : return LOG_OK;
4180 : }
4181 :
4182 : static int
4183 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4184 : {
4185 217 : sqlstore *store = Store;
4186 :
4187 217 : sql_delta *d = (sql_delta*)change->data;
4188 217 : if (d->cs.ts < oldest) {
4189 82 : destroy_delta(d, false);
4190 82 : if (change->commit == &commit_update_idx)
4191 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4192 : else
4193 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4194 82 : return 1;
4195 : }
4196 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4197 82 : d->cs.ts = store_get_timestamp(store) + 1;
4198 : return 0;
4199 : }
4200 :
4201 : static int
4202 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4203 : {
4204 9 : sqlstore *store = Store;
4205 :
4206 9 : storage *d = (storage*)change->data;
4207 9 : if (d->cs.ts < oldest) {
4208 3 : destroy_storage(d);
4209 3 : table_destroy(store, (sql_table*)change->obj);
4210 3 : return 1;
4211 : }
4212 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4213 3 : d->cs.ts = store_get_timestamp(store) + 1;
4214 : return 0;
4215 : }
4216 :
/*
 * Shared commit/rollback logic for a column or index delta.
 *
 *   tr, change  - the transaction and the change record being finished
 *   t, base     - owning table and the sql_base of the column/index
 *   data        - atomic pointer holding the head of the delta chain
 *   type        - currently unused (see TODO below)
 *   commit_ts   - commit timestamp; 0 means rollback
 *   oldest      - timestamp bound used to decide which deltas may be merged
 *
 * Paths:
 *   - tables with commit_action CA_DELETE/CA_DROP (asserted to be temp
 *     tables): the delta is cleared and the change is marked handled;
 *   - rollback: change->data is unlinked from the chain and the cleanup
 *     callback is switched to tc_gc_rollbacked (unless the table is new and
 *     change->ts is set, which is left to the create-column path);
 *   - top-level commit: the head delta gets commit_ts, then the first delta
 *     with ts <= oldest is merged under the column lock and the merged
 *     update count is added to the head delta's nr_updates;
 *   - savepoint commit: the chain head is replaced by the result of
 *     savepoint_commit_delta().
 *
 * Returns LOG_OK, or LOG_ERR when clear_cs() reports BUN_NONE.
 */
4217 : static int
4218 132908 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4219 : {
4220 132908 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4221 :
/* idelta remembers the chain head; 'delta' is advanced further down */
4222 132908 : sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
4223 :
4224 132908 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4225 3 : int ok = LOG_OK;
4226 3 : assert(isTempTable(t));
4227 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4228 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4229 3 : if (!tr->parent)
4230 0 : t->base.new = base->new = 0;
4231 3 : change->handled = true;
4232 3 : return ok;
4233 : }
4234 :
4235 132905 : if (commit_ts)
4236 132823 : delta->cs.ts = commit_ts;
4237 132905 : if (!commit_ts) { /* rollback */
4238 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4239 :
4240 82 : if (change->ts && t->base.new) /* handled by create col */
4241 : return LOG_OK;
/* find the predecessor of d in the chain (o stays the head when d is the head) */
/* NOTE(review): if d is not in the chain, o ends up NULL and 'o->next' below is a NULL dereference - confirm d is always linked */
4242 82 : if (o != d) {
4243 0 : while(o && o->next != d)
4244 : o = o->next;
4245 : }
4246 82 : if (o == ATOMIC_PTR_GET(data))
4247 82 : ATOMIC_PTR_SET(data, d->next);
4248 : else
4249 0 : o->next = d->next;
4250 82 : d->next = NULL;
4251 82 : change->cleanup = &tc_gc_rollbacked;
4252 132823 : } else if (!tr->parent) {
4253 : /* merge deltas */
/* skip the deltas that are newer than 'oldest' */
4254 362846 : while (delta && delta->cs.ts > oldest)
4255 230024 : delta = delta->next;
4256 132822 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4257 32618 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4258 32618 : idelta->nr_updates += merge_delta(delta);
4259 32618 : unlock_column(tr->store, base->id);
4260 : }
4261 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4262 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4263 : return LOG_OK;
4264 : }
4265 :
4266 : static int
4267 132880 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4268 : {
4269 :
4270 132880 : sql_column *c = (sql_column*)change->obj;
4271 132880 : sql_base* base = &c->base;
4272 132880 : sql_table* t = c->t;
4273 132880 : ATOMIC_PTR_TYPE* data = &c->data;
4274 132880 : int type = c->type.type->localtype;
4275 :
4276 132880 : if (change->handled || isDeleted(c->t))
4277 : return LOG_OK;
4278 :
4279 132880 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4280 : }
4281 :
4282 : static int
4283 26 : log_update_idx( sql_trans *tr, sql_change *change)
4284 : {
4285 26 : sql_idx *i = (sql_idx*)change->obj;
4286 26 : assert(!isTempTable(i->t));
4287 :
4288 26 : if (isDeleted(i->t)) {
4289 0 : change->handled = true;
4290 0 : return LOG_OK;
4291 : }
4292 :
4293 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4294 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4295 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4296 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4297 : }
4298 : return LOG_OK;
4299 : }
4300 :
4301 : static int
4302 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4303 : {
4304 28 : sql_idx *i = (sql_idx*)change->obj;
4305 28 : sql_base* base = &i->base;
4306 28 : sql_table* t = i->t;
4307 28 : ATOMIC_PTR_TYPE* data = &i->data;
4308 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4309 :
4310 28 : if (change->handled || isDeleted(i->t))
4311 : return LOG_OK;
4312 :
4313 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4314 : }
4315 :
4316 : static storage *
4317 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4318 : {
4319 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4320 0 : storage *od = dbat->next;
4321 0 : if (od->cs.ts == commit_ts) {
4322 0 : storage t = *od, *n = od->next;
4323 0 : *od = *dbat;
4324 0 : od->next = n;
4325 0 : *dbat = t;
4326 0 : dbat->next = NULL;
4327 0 : destroy_storage(dbat);
4328 0 : return od;
4329 : }
4330 : }
4331 : return dbat;
4332 : }
4333 :
4334 : static int
4335 84508 : log_update_del( sql_trans *tr, sql_change *change)
4336 : {
4337 84508 : sql_table *t = (sql_table*)change->obj;
4338 84508 : assert(!isTempTable(t));
4339 :
4340 84508 : if (isDeleted(t)) {
4341 0 : change->handled = true;
4342 0 : return LOG_OK;
4343 : }
4344 :
4345 84508 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4346 84508 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4347 : return LOG_OK;
4348 : }
4349 :
/*
 * Commit/rollback callback for a table's storage (segments).
 *
 *   commit_ts - commit timestamp; 0 means rollback
 *   oldest    - timestamp bound passed on to the segment merge/rollback
 *
 * Paths (all but the first run under the table lock):
 *   - handled change or deleted table: nothing to do;
 *   - commit_action CA_DELETE/CA_DROP (asserted to be temp tables): the
 *     storage is cleared and, on commit, the head segment gets commit_ts;
 *   - rollback: if the storage itself was created by this transaction
 *     (cs.ts == tr->tid) it is unlinked from the chain and handed to
 *     tc_gc_rollbacked_storage, otherwise only the segments are rolled back;
 *   - top-level commit: segments2cs(), merge_segments() and, when
 *     oldest == commit_ts, merge_storage(); the cleared flag is reset;
 *   - savepoint commit: merge_segments(), then the table's storage pointer
 *     is replaced by the result of savepoint_commit_storage().
 *
 * Returns LOG_OK or the error reported by clear_storage()/segments2cs().
 */
4350 : static int
4351 89037 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4352 : {
4353 89037 : int ok = LOG_OK;
4354 89037 : sql_table *t = (sql_table*)change->obj;
4355 89037 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4356 :
4357 89037 : if (change->handled || isDeleted(t))
4358 : return ok;
4359 :
4360 89037 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4361 22 : assert(isTempTable(t));
4362 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4363 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4364 22 : change->handled = true;
4365 22 : return ok;
4366 : }
4367 :
4368 89015 : lock_table(tr->store, t->base.id);
4369 89015 : if (!commit_ts) { /* rollback */
4370 4207 : if (dbat->cs.ts == tr->tid) {
4371 6 : if (change->ts && t->base.new) { /* handled by the create table */
4372 3 : unlock_table(tr->store, t->base.id);
4373 3 : return ok;
4374 : }
4375 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4376 :
/* find the predecessor of d in the chain (o stays the head when d is the head) */
4377 3 : if (o != d) {
4378 0 : while(o && o->next != d)
4379 : o = o->next;
4380 : }
4381 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4382 3 : assert(d->next);
4383 3 : ATOMIC_PTR_SET(&t->data, d->next);
4384 : } else
4385 0 : o->next = d->next;
4386 3 : d->next = NULL;
4387 3 : change->cleanup = &tc_gc_rollbacked_storage;
4388 : } else
4389 4201 : rollback_segments(dbat->segs, tr, change, oldest);
4390 84808 : } else if (ok == LOG_OK && !tr->parent) {
4391 84782 : if (dbat->cs.ts == tr->tid) /* cleared table */
4392 25962 : dbat->cs.ts = commit_ts;
4393 :
4394 84782 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4395 84782 : if (ok == LOG_OK) {
4396 84782 : merge_segments(dbat, tr, change, commit_ts, oldest);
4397 84782 : if (oldest == commit_ts)
4398 42088 : merge_storage(dbat);
4399 : }
/* dbat was already dereferenced above, so this test cannot fail here */
4400 84782 : if (dbat)
4401 84782 : dbat->cs.cleared = false;
4402 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4403 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4404 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4405 26 : storage *s = change->data;
4406 26 : if (s->cs.ts == tr->tid)
4407 0 : s->cs.ts = commit_ts;
4408 : }
4409 89012 : unlock_table(tr->store, t->base.id);
4410 89012 : return ok;
4411 : }
4412 :
4413 : /* only rollback (content version) case for now */
4414 : static int
4415 18312 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4416 : {
4417 18312 : sqlstore *store = Store;
4418 18312 : sql_column *c = (sql_column*)change->obj;
4419 :
4420 18312 : if (!c) /* cleaned earlier */
4421 : return 1;
4422 :
4423 172 : if (change->handled || isDeleted(c->t)) {
4424 0 : column_destroy(store, c);
4425 0 : return 1;
4426 : }
4427 :
4428 : /* savepoint commit (did it merge ?) */
4429 172 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4430 0 : column_destroy(store, c);
4431 0 : return 1;
4432 : }
4433 172 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4434 : return 0;
4435 171 : sql_delta *d = (sql_delta*)change->data, *id = d;
4436 171 : if (d && d->next) {
4437 :
4438 66 : if (d->cs.ts > oldest)
4439 : return LOG_OK; /* cannot cleanup yet */
4440 :
4441 : // d is oldest reachable delta
4442 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4443 62 : destroy_delta(d->next, true);
4444 62 : d->next = NULL;
4445 : }
4446 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4447 63 : id->nr_updates += merge_delta(d);
4448 63 : unlock_column(store, c->base.id);
4449 : }
4450 168 : column_destroy(store, c);
4451 168 : return 1;
4452 : }
4453 :
4454 : static int
4455 876454 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4456 : {
4457 876454 : sqlstore *store = Store;
4458 876454 : sql_column *c = (sql_column*)change->obj;
4459 :
4460 876454 : if (!c) /* cleaned earlier */
4461 : return 1;
4462 :
4463 876454 : if (change->handled || isDeleted(c->t)) {
4464 3 : table_destroy(store, c->t);
4465 3 : return 1;
4466 : }
4467 :
4468 : /* savepoint commit (did it merge ?) */
4469 876451 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4470 73775 : table_destroy(store, c->t);
4471 73775 : return 1;
4472 : }
4473 802676 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4474 : return 0;
4475 802675 : sql_delta *d = (sql_delta*)change->data, *id = d;
4476 802675 : if (d && d->next) {
4477 :
4478 802675 : if (d->cs.ts > oldest)
4479 : return LOG_OK; /* cannot cleanup yet */
4480 :
4481 : // d is oldest reachable delta
4482 58958 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4483 5096 : destroy_delta(d->next, true);
4484 5096 : d->next = NULL;
4485 : }
4486 58958 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4487 58958 : id->nr_updates += merge_delta(d);
4488 58958 : unlock_column(store, c->base.id);
4489 : }
4490 58958 : table_destroy(store, c->t);
4491 58958 : return 1;
4492 : }
4493 :
4494 : static int
4495 2592 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4496 : {
4497 2592 : sqlstore *store = Store;
4498 2592 : sql_idx *i = (sql_idx*)change->obj;
4499 :
4500 2592 : if (!i) /* cleaned earlier */
4501 : return 1;
4502 :
4503 633 : if (change->handled || isDeleted(i->t)) {
4504 0 : idx_destroy(store, i);
4505 0 : return 1;
4506 : }
4507 :
4508 : /* savepoint commit (did it merge ?) */
4509 633 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4510 0 : idx_destroy(store, i);
4511 0 : return 1;
4512 : }
4513 633 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4514 : return 0;
4515 633 : sql_delta *d = (sql_delta*)change->data, *id = d;
4516 633 : if (d && d->next) {
4517 0 : if (d->cs.ts > oldest)
4518 : return LOG_OK; /* cannot cleanup yet */
4519 :
4520 : // d is oldest reachable delta
4521 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4522 0 : destroy_delta(d->next, true);
4523 0 : d->next = NULL;
4524 : }
4525 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4526 0 : id->nr_updates += merge_delta(d);
4527 0 : unlock_column(store, i->base.id);
4528 : }
4529 633 : idx_destroy(store, i);
4530 633 : return 1;
4531 : }
4532 :
4533 : static int
4534 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4535 : {
4536 26 : sqlstore *store = Store;
4537 26 : sql_idx *i = (sql_idx*)change->obj;
4538 :
4539 26 : if (!i) /* cleaned earlier */
4540 : return 1;
4541 :
4542 26 : if (change->handled || isDeleted(i->t)) {
4543 0 : table_destroy(store, i->t);
4544 0 : return 1;
4545 : }
4546 :
4547 : /* savepoint commit (did it merge ?) */
4548 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4549 0 : table_destroy(store, i->t);
4550 0 : return 1;
4551 : }
4552 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4553 : return 0;
4554 26 : sql_delta *d = (sql_delta*)change->data, *id = d;
4555 26 : if (d && d->next) {
4556 26 : if (d->cs.ts > oldest)
4557 : return LOG_OK; /* cannot cleanup yet */
4558 :
4559 : // d is oldest reachable delta
4560 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4561 26 : destroy_delta(d->next, true);
4562 26 : d->next = NULL;
4563 : }
4564 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4565 26 : id->nr_updates += merge_delta(d);
4566 26 : unlock_column(store, i->base.id);
4567 : }
4568 26 : table_destroy(store, i->t);
4569 26 : return 1;
4570 : }
4571 :
4572 : static int
4573 249538 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4574 : {
4575 249538 : sqlstore *store = Store;
4576 249538 : sql_table *t = (sql_table*)change->obj;
4577 :
4578 249538 : if (change->handled || isDeleted(t)) {
4579 3509 : table_destroy(store, t);
4580 3509 : return 1;
4581 : }
4582 : /* savepoint commit (did it merge ?) */
4583 246029 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4584 14776 : table_destroy(store, t);
4585 14776 : return 1;
4586 : }
4587 231253 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4588 : return 0;
4589 231225 : storage *d = (storage*)change->data;
4590 231225 : if (d->next) {
4591 158802 : if (d->cs.ts > oldest)
4592 : return LOG_OK; /* cannot cleanup yet */
4593 :
4594 10751 : destroy_storage(d->next);
4595 10751 : d->next = NULL;
4596 : }
4597 83174 : table_destroy(store, t);
4598 83174 : return 1;
4599 : }
4600 :
4601 : static int
4602 30646 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4603 : {
4604 30646 : if (nr == 0)
4605 : return LOG_OK;
4606 30646 : assert (nr > 0);
4607 30646 : if ((!offsets || !*offsets) && nr == total) {
4608 30617 : *offset = slot;
4609 30617 : return LOG_OK;
4610 : }
4611 29 : if (!*offsets) {
4612 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4613 7 : if (!*offsets)
4614 : return LOG_ERR;
4615 : }
4616 29 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4617 16168 : for(size_t i = 0; i < nr; i++)
4618 16139 : dst[i] = slot + i;
4619 29 : (*offsets)->batCount += nr;
4620 29 : (*offsets)->theap->dirty = true;
4621 29 : return LOG_OK;
4622 : }
4623 :
/*
 * Claim 'cnt' slots for transaction tr, allowing the claim to be spread
 * over several segments.
 *
 * Deleted segments older than 'oldest' are reused first (extending the
 * previous segment when that one is already owned by tr, otherwise taking
 * over or splitting the deleted segment); whatever is still needed is
 * claimed at the tail, either by extending tr's own tail segment or by
 * creating a new one. Every claimed range is recorded with add_offsets():
 * a single dense claim ends up in *offset, a split one in the *offsets BAT.
 *
 *   locked - when true the caller already holds the table lock
 *
 * On success the storage is registered with the transaction (once), the
 * transaction's logchanges counter is increased for loggable tables, and a
 * created *offsets BAT gets its properties set.
 * Returns LOG_OK or LOG_ERR.
 * NOTE(review): '*offsets' is dereferenced unconditionally at the end;
 * claim_segments() only calls this function with a non-NULL 'offsets'.
 */
4624 : static int
4625 30624 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4626 : {
4627 30624 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4628 30624 : assert(s->segs);
4629 30624 : ulng oldest = store_oldest(tr->store, NULL);
4630 30624 : BUN slot = 0;
/* cnt is decremented while claiming; total keeps the requested amount */
4631 30624 : size_t total = cnt;
4632 :
4633 30624 : if (!locked)
4634 30624 : lock_table(tr->store, t->base.id);
4635 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4636 61380 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4637 30758 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4638 35 : if ((seg->end - seg->start) >= cnt) {
4639 : /* if previous is claimed before we could simply adjust the end/start */
4640 13 : if (p && p->ts == tr->tid && !p->deleted) {
4641 2 : slot = p->end;
4642 2 : p->end += cnt;
4643 2 : seg->start += cnt;
4644 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4645 : ok = LOG_ERR;
4646 : break;
4647 : }
4648 2 : s->segs->nr_reused += cnt;
4649 2 : cnt = 0;
4650 2 : break;
4651 : }
4652 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4653 11 : size_t rcnt = seg->end - seg->start;
4654 11 : if (rcnt > cnt)
4655 : rcnt = cnt;
4656 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4657 : ok = LOG_ERR;
4658 : break;
4659 : }
4660 : }
/* take over the (possibly split-off) segment as a whole */
4661 33 : seg->ts = tr->tid;
4662 33 : seg->deleted = false;
4663 33 : slot = seg->start;
4664 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4665 : ok = LOG_ERR;
4666 : break;
4667 : }
4668 33 : s->segs->nr_reused += (seg->end - seg->start);
4669 33 : cnt -= (seg->end - seg->start);
4670 : }
4671 : }
/* claim what is still missing at the end of the segment list */
4672 30624 : if (ok == LOG_OK && cnt) {
4673 30611 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4674 29461 : slot = s->segs->t->end;
4675 29461 : s->segs->t->end += cnt;
4676 : } else {
4677 1150 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4678 : ok = LOG_ERR;
4679 : } else {
4680 1150 : if (!s->segs->h)
4681 0 : s->segs->h = s->segs->t;
4682 1150 : slot = s->segs->t->start;
4683 : }
4684 : }
4685 30611 : if (ok == LOG_OK)
4686 30611 : ok = add_offsets(slot, cnt, total, offset, offsets);
4687 : }
4688 30624 : if (!locked)
4689 30624 : unlock_table(tr->store, t->base.id);
4690 :
4691 30624 : if (ok == LOG_OK) {
4692 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4693 30624 : if (!in_transaction) {
4694 1135 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4695 1135 : in_transaction = true;
4696 : }
4697 30624 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4698 30610 : tr->logchanges += (lng) total;
4699 30624 : if (*offsets) {
4700 7 : BAT *pos = *offsets;
4701 7 : assert(BATcount(pos) == total);
4702 7 : BATsetcount(pos, total); /* set other properties */
4703 7 : pos->tnil = false;
4704 7 : pos->tnonil = true;
4705 7 : pos->tkey = true;
4706 7 : pos->tsorted = true;
4707 7 : pos->trevsorted = false;
4708 : }
4709 : }
4710 30624 : return ok;
4711 : }
4712 :
/*
 * Claim 'cnt' consecutive slots for transaction tr and return the first
 * slot in *offset.
 *
 * Requests for more than one slot with an 'offsets' out-parameter are
 * forwarded to claim_segmentsV2(), which may split the claim. Otherwise the
 * segment list is searched for a deleted segment older than 'oldest' that
 * is large enough; it is reused either by extending the previous segment
 * (when that one is owned by tr) or by splitting off the needed part.
 * If nothing can be reused the slots are claimed at the tail, by extending
 * tr's own tail segment or creating a new one.
 *
 *   locked - when true the caller already holds the table lock
 *
 * On success the storage is registered with the transaction (once) and the
 * transaction's logchanges counter is increased for loggable tables.
 * Returns LOG_OK or LOG_ERR.
 */
4713 : static int
4714 2059522 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4715 : {
4716 2059522 : if (cnt > 1 && offsets)
4717 30624 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4718 2028898 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4719 2028898 : assert(s->segs);
4720 2028898 : ulng oldest = store_oldest(tr->store, NULL);
4721 2028898 : BUN slot = 0;
4722 2028898 : int reused = 0;
4723 :
4724 2028898 : if (!locked)
4725 1903612 : lock_table(tr->store, t->base.id);
4726 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4727 : * or create new segment at the end */
4728 8340720 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4729 6381205 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4730 :
/* always true here: the same size test is part of the enclosing condition */
4731 69383 : if ((seg->end - seg->start) >= cnt) {
4732 :
4733 : /* if previous is claimed before we could simply adjust the end/start */
4734 69383 : if (p && p->ts == tr->tid && !p->deleted) {
4735 54163 : slot = p->end;
4736 54163 : p->end += cnt;
4737 54163 : seg->start += cnt;
4738 54163 : s->segs->nr_reused += cnt;
4739 54163 : reused = 1;
4740 54163 : break;
4741 : }
4742 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4743 15220 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4744 : ok = LOG_ERR;
4745 : break;
4746 : }
4747 : }
4748 15220 : seg->ts = tr->tid;
4749 15220 : seg->deleted = false;
4750 15220 : slot = seg->start;
4751 15220 : s->segs->nr_reused += cnt;
4752 15220 : reused = 1;
4753 15220 : break;
4754 : }
4755 : }
/* nothing reusable found: claim at the end of the segment list */
4756 2028898 : if (ok == LOG_OK && !reused) {
4757 1959515 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4758 1924485 : slot = s->segs->t->end;
4759 1924485 : s->segs->t->end += cnt;
4760 : } else {
4761 35030 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4762 : ok = LOG_ERR;
4763 : } else {
4764 35030 : if (!s->segs->h)
4765 0 : s->segs->h = s->segs->t;
4766 35030 : slot = s->segs->t->start;
4767 : }
4768 : }
4769 : }
4770 2028898 : if (!locked)
4771 1903612 : unlock_table(tr->store, t->base.id);
4772 :
4773 2028898 : if (ok == LOG_OK) {
4774 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4775 2028898 : if (!in_transaction) {
4776 48954 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4777 48954 : in_transaction = true;
4778 : }
4779 2028898 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4780 2028467 : tr->logchanges += (lng) cnt;
4781 2028898 : *offset = slot;
4782 : }
4783 : return ok;
4784 : }
4785 :
4786 : /*
4787 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4788 : * of the global transaction and mark the newly added storage slots unused on the global
4789 : * level but used on the local transaction level. Besides this the local transaction needs
4790 : * to update (and mark unused) any slot in between the old end and new slots.
4791 : * */
4792 : static int
4793 1934236 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4794 : {
4795 1934236 : storage *s;
4796 :
4797 : /* we have a single segment structure for each persistent table
4798 : * for temporary tables each has its own */
4799 1934236 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4800 : return LOG_ERR;
4801 :
4802 1934236 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4803 : }
4804 :
4805 : /* some tables cannot be updated concurrently (user/roles etc) */
4806 : static int
4807 125290 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4808 : {
4809 125290 : storage *s;
4810 125290 : int res = 0;
4811 :
4812 : /* we have a single segment structure for each persistent table
4813 : * for temporary tables each has its own */
4814 125290 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4815 : /* TODO check for other inserts ! */
4816 : return LOG_ERR;
4817 :
4818 125290 : lock_table(tr->store, t->base.id);
4819 125290 : if ((res = segments_conflict(tr, s->segs, 1))) {
4820 4 : unlock_table(tr->store, t->base.id);
4821 4 : return LOG_CONFLICT;
4822 : }
4823 125286 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4824 125286 : unlock_table(tr->store, t->base.id);
4825 125286 : return res;
4826 : }
4827 :
4828 : static int
4829 21389 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4830 : {
4831 21389 : storage *s;
4832 21389 : int res = 0;
4833 :
4834 21389 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4835 : return LOG_ERR;
4836 :
4837 21389 : lock_table(tr->store, t->base.id);
4838 21389 : res = segments_conflict(tr, s->segs, uncommitted);
4839 21389 : unlock_table(tr->store, t->base.id);
4840 21389 : return res ? LOG_CONFLICT : LOG_OK;
4841 : }
4842 :
4843 : static size_t
4844 1417457 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4845 : {
4846 1417457 : size_t cnt = 0;
4847 :
4848 1546358 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4849 : ;
4850 :
4851 3563306 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4852 2145852 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4853 120745 : cnt += s->end - s->start;
4854 : }
4855 1417454 : return cnt;
4856 : }
4857 :
/*
 * Build a candidate list for the positions [start, end) of table t as
 * visible to transaction tr.
 *
 * When has_deletes_in_range() finds nothing, a dense BAT covering the range
 * is returned. Otherwise a TYPE_msk BAT of (end - start) bits is filled,
 * one bit per position, set for positions in segments for which
 * SEG_IS_VALID() holds, and turned into a candidate list with
 * BATmaskedcands().
 *
 * Bits are assembled 32 at a time: 'cur' holds the word under construction,
 * 'pos' counts the bits produced so far and 'dst' points at the next word to
 * write. Inside the loop a local 'end' (the bit index bound within the
 * current word) shadows the function parameter 'end'.
 *
 * The table lock is held while the segment list is walked.
 * Returns NULL when the mask BAT cannot be allocated; otherwise whatever
 * BATdense()/BATmaskedcands() return.
 */
4858 : static BAT *
4859 1417485 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4860 : {
4861 1417485 : lock_table(tr->store, t->base.id);
4862 1417463 : segment *s = S->segs->h;
4863 : /* step one no deletes -> dense range */
4864 1417463 : uint32_t cur = 0;
4865 1417463 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4866 1417450 : if (!dnr) {
4867 1309396 : unlock_table(tr->store, t->base.id);
4868 1309416 : return BATdense(start, start, end-start);
4869 : }
4870 :
4871 108054 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4872 108054 : if (!b) {
4873 0 : unlock_table(tr->store, t->base.id);
4874 0 : return NULL;
4875 : }
4876 :
4877 108054 : uint32_t *restrict dst = Tloc(b, 0);
4878 59219706 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4879 59170046 : if (s->end < start)
4880 85423 : continue;
4881 59084623 : if (s->start >= end)
4882 : break;
4883 59026229 : msk m = (SEG_IS_VALID(s, tr));
/* lnr = number of positions of this segment that fall inside [start, end) */
4884 59026229 : size_t lnr = s->end-s->start;
4885 59026229 : if (s->start < start)
4886 11958 : lnr -= (start - s->start);
4887 59026229 : if (s->end > end)
4888 6376 : lnr -= s->end - end;
4889 :
4890 59026229 : if (m) {
/* visible segment: set lnr bits - first complete the partial word, then whole words, then the remainder */
4891 956354 : size_t used = pos&31, end = 32;
4892 956354 : if (used) {
4893 824646 : if (lnr < (32-used))
4894 509343 : end = used + lnr;
4895 824646 : assert(end > used);
4896 824646 : cur |= ((1U << (end - used)) - 1) << used;
4897 824646 : lnr -= end - used;
4898 824646 : pos += end - used;
4899 824646 : if (end == 32) {
4900 315303 : *dst++ = cur;
4901 315303 : cur = 0;
4902 : }
4903 : }
4904 956354 : size_t full = lnr/32;
4905 956354 : size_t rest = lnr%32;
4906 956354 : if (full > 0) {
4907 249984 : memset(dst, ~0, full * sizeof(*dst));
4908 249984 : dst += full;
4909 249984 : lnr -= full * 32;
4910 249984 : pos += full * 32;
4911 : }
4912 956354 : if (rest > 0) {
4913 425856 : cur |= (1U << rest) - 1;
4914 425856 : lnr -= rest;
4915 425856 : pos += rest;
4916 : }
4917 956354 : assert(lnr==0);
4918 : } else {
/* invisible segment: same stepping, but the bits stay 0 */
4919 58069875 : size_t used = pos&31, end = 32;
4920 58069875 : if (used) {
4921 56255348 : if (lnr < (32-used))
4922 54342433 : end = used + lnr;
4923 :
4924 56255348 : pos+= (end-used);
4925 56255348 : lnr-= (end-used);
4926 56255348 : if (end == 32) {
4927 1912915 : *dst++ = cur;
4928 1912915 : cur = 0;
4929 : }
4930 : }
4931 58069875 : size_t full = lnr/32;
4932 58069875 : size_t rest = lnr%32;
4933 58069875 : memset(dst, 0, full * sizeof(*dst));
4934 58069875 : dst += full;
4935 58069875 : lnr -= full * 32;
4936 58069875 : pos += full * 32;
4937 58069875 : pos+= rest;
4938 58069875 : lnr-= rest;
4939 58069875 : assert(lnr==0);
4940 : }
4941 : }
4942 :
4943 108054 : unlock_table(tr->store, t->base.id);
/* flush the last, partially filled word */
4944 108054 : if (pos%32)
4945 106727 : *dst=cur;
4946 108054 : BATsetcount(b, nr);
4947 108054 : bn = BATmaskedcands(start, nr, b, true);
4948 108054 : BBPreclaim(b);
4949 108054 : (void)pos;
4950 108054 : assert (pos == nr);
4951 : return bn;
4952 : }
4953 :
4954 : static void * /* BAT * */
4955 1424297 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4956 : {
4957 : /* with nr_of_parts - part_nr we can adjust parts */
4958 1424297 : storage *s = tab_timestamp_storage(tr, t);
4959 :
4960 1424280 : if (!s)
4961 : return NULL;
4962 1424280 : size_t nr = segs_end(s->segs, tr, t);
4963 :
4964 1424280 : if (!nr)
4965 6794 : return BATdense(0, 0, 0);
4966 :
4967 : /* compute proper part */
4968 1417486 : size_t part_size = nr/nr_of_parts;
4969 1417486 : size_t start = part_size * part_nr;
4970 1417486 : size_t end = start + part_size;
4971 1417486 : if (part_nr == (nr_of_parts-1))
4972 1284979 : end = nr;
4973 1417486 : assert(end <= nr);
4974 1417486 : return segments2cands(s, tr, t, start, end);
4975 : }
4976 :
4977 : static int
4978 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
4979 : {
4980 5 : bool update_conflict = false;
4981 :
4982 5 : if (segments_in_transaction(tr, col->t))
4983 : return LOG_CONFLICT;
4984 :
4985 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
4986 :
4987 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
4988 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
4989 5 : assert(d && d->cs.ts == tr->tid);
4990 5 : if (odelta != d)
4991 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
4992 5 : if (d->cs.bid)
4993 5 : temp_destroy(d->cs.bid);
4994 5 : if (d->cs.uibid)
4995 5 : temp_destroy(d->cs.uibid);
4996 5 : if (d->cs.uvbid)
4997 5 : temp_destroy(d->cs.uvbid);
4998 5 : bat_set_access(bn, BAT_READ);
4999 5 : d->cs.bid = temp_create(bn);
5000 5 : d->cs.uibid = 0;
5001 5 : d->cs.uvbid = 0;
5002 5 : d->cs.ucnt = 0;
5003 5 : d->cs.cleared = true;
5004 5 : d->cs.ts = tr->tid;
5005 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
5006 5 : return LOG_OK;
5007 : }
5008 :
5009 : static int
5010 5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
5011 : {
5012 5 : if (segments_in_transaction(tr, c->t))
5013 : return LOG_CONFLICT;
5014 :
5015 5 : sql_delta *d = NULL;
5016 :
5017 : /* do we have enough to clean */
5018 5 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5019 : return LOG_CONFLICT;
5020 :
5021 : /* do we have enough to clean */
5022 5 : if (!force && (d->nr_updates) < 1024)
5023 : return LOG_OK;
5024 :
5025 5 : BAT *b = NULL, *bn = NULL;;
5026 5 : if ((b = bind_col(tr, c, 0)) == NULL)
5027 : return LOG_ERR;
5028 5 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5029 0 : BBPreclaim(b);
5030 0 : return LOG_ERR;
5031 : }
5032 5 : int res = swap_bats(tr, c, bn);
5033 5 : d->nr_updates = 0;
5034 5 : BBPreclaim(b);
5035 5 : BBPreclaim(bn);
5036 5 : return res;
5037 : }
5038 :
5039 : static int
5040 0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
5041 : {
5042 0 : if (segments_in_transaction(tr, t))
5043 : return LOG_CONFLICT;
5044 :
5045 0 : storage *s;
5046 0 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
5047 : return LOG_ERR;
5048 :
5049 0 : for( node *n = ol_first_node(t->columns); n; n = n->next) {
5050 0 : sql_column *c = n->data;
5051 :
5052 0 : if (!ATOMvarsized(c->type.type->localtype))
5053 0 : continue;
5054 0 : sql_delta *d = NULL;
5055 :
5056 : /* do we have enough to clean */
5057 0 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5058 : return LOG_CONFLICT;
5059 :
5060 : /* do we have enough to clean */
5061 0 : if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
5062 0 : continue;
5063 :
5064 0 : BAT *b = NULL, *bn = NULL;;
5065 0 : if ((b = bind_col(tr, c, 0)) == NULL)
5066 : return LOG_ERR;
5067 0 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5068 0 : BBPreclaim(b);
5069 0 : return LOG_ERR;
5070 : }
5071 0 : int res = swap_bats(tr, c, bn);
5072 0 : d->nr_updates = 0;
5073 0 : BBPreclaim(b);
5074 0 : BBPreclaim(bn);
5075 0 : if (res != LOG_OK)
5076 0 : return res;
5077 : }
5078 0 : s->segs->nr_reused = 0;
5079 0 : return LOG_OK;
5080 : }
5081 :
5082 :
5083 : static int
5084 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5085 : {
5086 58 : bool update_conflict = false;
5087 :
5088 58 : if (segments_in_transaction(tr, col->t))
5089 : return LOG_CONFLICT;
5090 :
5091 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5092 :
5093 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5094 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5095 58 : assert(d && d->cs.ts == tr->tid);
5096 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5097 58 : if (odelta != d)
5098 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5099 :
5100 58 : d->cs.st = st;
5101 58 : d->cs.cleared = true;
5102 58 : if (d->cs.bid)
5103 58 : temp_destroy(d->cs.bid);
5104 58 : o = transfer_to_systrans(o);
5105 58 : if (o == NULL)
5106 : return LOG_ERR;
5107 58 : bat_set_access(o, BAT_READ);
5108 58 : d->cs.bid = temp_create(o);
5109 58 : if (u) {
5110 53 : if (d->cs.ebid)
5111 0 : temp_destroy(d->cs.ebid);
5112 53 : u = transfer_to_systrans(u);
5113 53 : if (u == NULL)
5114 : return LOG_ERR;
5115 53 : d->cs.ebid = temp_create(u);
5116 : }
5117 : return LOG_OK;
5118 : }
5119 :
5120 : void
5121 331 : bat_storage_init( store_functions *sf)
5122 : {
5123 331 : sf->bind_col = &bind_col;
5124 331 : sf->bind_updates = &bind_updates;
5125 331 : sf->bind_updates_idx = &bind_updates_idx;
5126 331 : sf->bind_idx = &bind_idx;
5127 331 : sf->bind_cands = &bind_cands;
5128 :
5129 331 : sf->claim_tab = &claim_tab;
5130 331 : sf->key_claim_tab = &key_claim_tab;
5131 331 : sf->tab_validate = &tab_validate;
5132 :
5133 331 : sf->append_col = &append_col;
5134 331 : sf->append_idx = &append_idx;
5135 :
5136 331 : sf->update_col = &update_col;
5137 331 : sf->update_idx = &update_idx;
5138 :
5139 331 : sf->delete_tab = &delete_tab;
5140 :
5141 331 : sf->count_del = &count_del;
5142 331 : sf->count_col = &count_col;
5143 331 : sf->count_idx = &count_idx;
5144 331 : sf->dcount_col = &dcount_col;
5145 331 : sf->min_max_col = &min_max_col;
5146 331 : sf->set_stats_col = &set_stats_col;
5147 331 : sf->sorted_col = &sorted_col;
5148 331 : sf->unique_col = &unique_col;
5149 331 : sf->double_elim_col = &double_elim_col;
5150 331 : sf->col_stats = &col_stats;
5151 331 : sf->col_set_range = &col_set_range;
5152 331 : sf->col_not_null = &col_not_null;
5153 :
5154 331 : sf->col_dup = &col_dup;
5155 331 : sf->idx_dup = &idx_dup;
5156 331 : sf->del_dup = &del_dup;
5157 :
5158 331 : sf->create_col = &create_col; /* create and add to change list */
5159 331 : sf->create_idx = &create_idx;
5160 331 : sf->create_del = &create_del;
5161 :
5162 331 : sf->destroy_col = &destroy_col; /* free resources */
5163 331 : sf->destroy_idx = &destroy_idx;
5164 331 : sf->destroy_del = &destroy_del;
5165 :
5166 331 : sf->drop_col = &drop_col; /* add drop to change list */
5167 331 : sf->drop_idx = &drop_idx;
5168 331 : sf->drop_del = &drop_del;
5169 :
5170 331 : sf->clear_table = &clear_table;
5171 :
5172 331 : sf->vacuum_col = &vacuum_col;
5173 331 : sf->vacuum_tab = &vacuum_tab;
5174 331 : sf->col_compress = &col_compress;
5175 331 : }
|