Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "matomic.h"
18 :
19 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
20 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
21 :
22 : static int log_update_col( sql_trans *tr, sql_change *c);
23 : static int log_update_idx( sql_trans *tr, sql_change *c);
24 : static int log_update_del( sql_trans *tr, sql_change *c);
25 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
26 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
27 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int log_create_col(sql_trans *tr, sql_change *change);
29 : static int log_create_idx(sql_trans *tr, sql_change *change);
30 : static int log_create_del(sql_trans *tr, sql_change *change);
31 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
32 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
33 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
35 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
36 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
39 :
40 : static lng merge_delta( sql_delta *obat);
41 :
42 : /* valid
43 : * !deleted && VALID_4_READ(TS, tr) existing or newly created segment
44 : * deleted && TS > tr->ts && OLDTS < tr->ts deleted after current transaction
45 : */
46 :
47 : #define VALID_4_READ(TS,tr) \
48 : (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
49 :
50 : /* when changed, check if the old status is still valid */
51 : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
52 : (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
53 :
54 : #define SEG_VALID_4_DELETE(seg,tr) \
55 : (!seg->deleted && VALID_4_READ(seg->ts, tr))
56 :
57 : /* Delete (in current trans or by some other finished transaction, or re-used segment which used to be deleted */
58 : #define SEG_IS_DELETED(seg,tr) \
59 : ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
60 : (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
61 :
62 : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
63 : #define SEG_IS_VALID(seg, tr) \
64 : ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
65 : (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
66 :
67 : static inline BAT *
68 5261 : transfer_to_systrans(BAT *b)
69 : {
70 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
71 5261 : MT_lock_set(&b->theaplock);
72 5261 : if (VIEWtparent(b) || VIEWvtparent(b)) {
73 29 : MT_lock_unset(&b->theaplock);
74 29 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
75 29 : BBPreclaim(b);
76 29 : return bn;
77 : }
78 5232 : if (b->theap->farmid == TRANSIENT ||
79 62 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
80 4847 : QryCtx *qc = MT_thread_get_qry_ctx();
81 4847 : if (qc) {
82 2523 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
83 2523 : ATOMIC_SUB(&qc->datasize, b->theap->size);
84 2523 : b->theap->farmid = SYSTRANS;
85 2523 : b->batRole = SYSTRANS;
86 : }
87 2523 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
88 1092 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
89 1092 : b->tvheap->farmid = SYSTRANS;
90 : }
91 : }
92 : }
93 5232 : MT_lock_unset(&b->theaplock);
94 5232 : return b;
95 : }
96 :
97 : static void
98 28732697 : lock_table(sqlstore *store, sqlid id)
99 : {
100 28732697 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
101 28821474 : }
102 :
103 : static void
104 28821775 : unlock_table(sqlstore *store, sqlid id)
105 : {
106 28821775 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
107 28818446 : }
108 :
109 : static void
110 21008839 : lock_column(sqlstore *store, sqlid id)
111 : {
112 21008839 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
113 21028385 : }
114 :
115 : static void
116 21028674 : unlock_column(sqlstore *store, sqlid id)
117 : {
118 21028674 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
119 21038592 : }
120 :
121 : static void
122 114962 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
123 : {
124 114962 : assert(cleanup);
125 114962 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
126 114958 : }
127 :
128 : static void
129 132940 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
130 : {
131 132940 : assert(cleanup);
132 132940 : dup_base(&t->base);
133 132939 : trans_add(tr, b, data, cleanup, commit, log);
134 132931 : }
135 :
136 : static int
137 91012 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
138 : {
139 91012 : segment *s = change->data;
140 :
141 91012 : if (s->ts <= oldest) {
142 34238 : while(s) {
143 21349 : segment *n = s->prev;
144 21349 : ATOMIC_PTR_DESTROY(&s->next);
145 21349 : _DELETE(s);
146 21349 : s = n;
147 : }
148 12889 : sqlstore *store = Store;
149 12889 : table_destroy(store, (sql_table*)change->obj);
150 12889 : return 1;
151 : }
152 : return LOG_OK;
153 : }
154 :
155 : static void
156 21349 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
157 : {
158 : /* we can only be accessed by anything older then commit_ts */
159 21349 : if (c->cleanup == &tc_gc_seg)
160 8460 : s->prev = c->data;
161 : else
162 12889 : c->cleanup = &tc_gc_seg;
163 21349 : c->data = s;
164 21349 : s->ts = commit_ts;
165 16619 : }
166 :
167 : static segment *
168 89026 : new_segment(segment *o, sql_trans *tr, size_t cnt)
169 : {
170 89026 : segment *n = (segment*)GDKmalloc(sizeof(segment));
171 :
172 89026 : assert(tr);
173 89026 : if (n) {
174 89026 : *n = (segment) {
175 89026 : .ts = tr->tid,
176 : .oldts = 0,
177 : .deleted = false,
178 : .start = 0,
179 : .end = cnt,
180 : .next = ATOMIC_PTR_VAR_INIT(NULL),
181 : .prev = NULL,
182 : };
183 89026 : if (o) {
184 36498 : n->start += o->end;
185 36498 : n->end += o->end;
186 36498 : ATOMIC_PTR_SET(&o->next, n);
187 : }
188 : }
189 89026 : return n;
190 : }
191 :
192 : static segment *
193 95599 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
194 : {
195 95599 : assert(tr);
196 95599 : if (o->start == start && o->end == start+cnt) {
197 11238 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
198 11238 : o->oldts = o->ts;
199 11238 : o->ts = tr->tid;
200 11238 : o->deleted = deleted;
201 11238 : return o;
202 : }
203 84361 : segment *n = (segment*)GDKmalloc(sizeof(segment));
204 :
205 84361 : if (!n)
206 : return NULL;
207 84361 : n->prev = NULL;
208 :
209 84361 : if (o->ts == tr->tid) {
210 5916 : n->oldts = 0;
211 5916 : n->ts = 1;
212 5916 : n->deleted = true;
213 : } else {
214 78445 : n->oldts = o->ts;
215 78445 : n->ts = tr->tid;
216 78445 : n->deleted = deleted;
217 : }
218 84361 : if (start == o->start) {
219 : /* 2-way split: o remains latter part of segment, new one is
220 : * inserted before */
221 67981 : n->start = o->start;
222 67981 : n->end = n->start + cnt;
223 67981 : ATOMIC_PTR_INIT(&n->next, o);
224 67981 : if (segs->h == o)
225 464 : segs->h = n;
226 67981 : if (p)
227 67517 : ATOMIC_PTR_SET(&p->next, n);
228 67981 : o->start = n->end;
229 16380 : } else if (start+cnt == o->end) {
230 : /* 2-way split: o remains first part of segment, new one is
231 : * added after */
232 5736 : n->start = o->end - cnt;
233 5736 : n->end = o->end;
234 5736 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
235 5736 : ATOMIC_PTR_SET(&o->next, n);
236 5736 : if (segs->t == o)
237 885 : segs->t = n;
238 5736 : o->end = n->start;
239 : } else {
240 : /* 3-way split: o remains first part of segment, two new ones
241 : * are added after */
242 10644 : segment *n2 = GDKmalloc(sizeof(segment));
243 10644 : if (n2 == NULL) {
244 0 : GDKfree(n);
245 0 : return NULL;
246 : }
247 10644 : ATOMIC_PTR_INIT(&n->next, n2);
248 10644 : n->start = start;
249 10644 : n->end = start + cnt;
250 10644 : *n2 = *o;
251 10644 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
252 10644 : n2->start = n->end;
253 10644 : n2->prev = NULL;
254 10644 : if (segs->t == o)
255 4459 : segs->t = n2;
256 10644 : ATOMIC_PTR_SET(&o->next, n);
257 10644 : o->end = start;
258 : }
259 : return n;
260 : }
261 :
262 : static void
263 4197 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
264 : {
265 4197 : segment *cur = segs->h, *seg = NULL;
266 17955 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
267 13758 : if (cur->ts == tr->tid) { /* revert */
268 4694 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
269 4694 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
270 4694 : cur->oldts = 0;
271 : }
272 13758 : if (cur->ts <= oldest) { /* possibly merge range */
273 13218 : if (!seg) { /* skip first */
274 : seg = cur;
275 9021 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
276 : /* merge with previous */
277 4730 : seg->end = cur->end;
278 4730 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
279 4730 : if (cur == segs->t)
280 2857 : segs->t = seg;
281 4730 : mark4destroy(cur, change, store_get_timestamp(tr->store));
282 4730 : cur = seg;
283 : } else {
284 : seg = cur; /* begin of new merge */
285 : }
286 : }
287 : }
288 4197 : }
289 :
290 : static size_t
291 104616 : segs_end_include_deleted( segments *segs, sql_trans *tr)
292 : {
293 104616 : size_t cnt = 0;
294 104616 : segment *s = segs->h, *l = NULL;
295 :
296 513946 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
297 409330 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
298 : l = s;
299 : }
300 104616 : if (l)
301 104609 : cnt = l->end;
302 104616 : return cnt;
303 : }
304 :
305 : static int
306 104616 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
307 : {
308 : /* set bits correctly */
309 104616 : BAT *b = temp_descriptor(cs->bid);
310 :
311 104616 : if (!b)
312 : return LOG_ERR;
313 104616 : segment *s = segs->h;
314 :
315 104616 : size_t nr = segs_end_include_deleted(segs, tr);
316 104616 : size_t rounded_nr = ((nr+31)&~31);
317 104616 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
318 0 : bat_destroy(b);
319 0 : return LOG_ERR;
320 : }
321 :
322 : /* disable all properties here */
323 104616 : MT_lock_set(&b->theaplock);
324 104616 : b->tsorted = false;
325 104616 : b->trevsorted = false;
326 104616 : b->tnosorted = 0;
327 104616 : b->tnorevsorted = 0;
328 104616 : b->tseqbase = oid_nil;
329 104616 : b->tkey = false;
330 104616 : b->tnokey[0] = 0;
331 104616 : b->tnokey[1] = 0;
332 104616 : b->theap->dirty = true;
333 104616 : BUN cnt = BATcount(b);
334 :
335 104616 : uint32_t *restrict dst;
336 448841 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
337 388965 : if (s->start >= nr)
338 : break;
339 344225 : if (s->ts == tr->tid && s->end != s->start) {
340 151646 : if (cnt < s->start) { /* first mark as deleted ! */
341 3361 : size_t lnr = s->start-cnt;
342 3361 : size_t pos = cnt;
343 3361 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
344 3361 : uint32_t cur = 0;
345 :
346 3361 : size_t used = pos&31, end = 32;
347 3361 : if (used) {
348 3248 : if (lnr < (32-used))
349 3066 : end = used + lnr;
350 3248 : assert(end > used);
351 3248 : cur |= ((1U << (end - used)) - 1) << used;
352 3248 : lnr -= end - used;
353 3248 : *dst++ |= cur;
354 3248 : cur = 0;
355 : }
356 3361 : size_t full = lnr/32;
357 3361 : size_t rest = lnr%32;
358 3361 : if (full > 0) {
359 7 : memset(dst, ~0, full * sizeof(*dst));
360 7 : dst += full;
361 7 : lnr -= full * 32;
362 : }
363 3361 : if (rest > 0) {
364 181 : cur |= (1U << rest) - 1;
365 181 : lnr -= rest;
366 181 : *dst |= cur;
367 : }
368 3361 : assert(lnr==0);
369 : }
370 151646 : size_t lnr = s->end-s->start;
371 151646 : size_t pos = s->start;
372 151646 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
373 151646 : uint32_t cur = 0;
374 151646 : size_t used = pos&31, end = 32;
375 151646 : if (used) {
376 113015 : if (lnr < (32-used))
377 106279 : end = used + lnr;
378 113015 : assert(end > used);
379 113015 : cur |= ((1U << (end - used)) - 1) << used;
380 113015 : lnr -= end - used;
381 113015 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
382 113015 : dst++;
383 113015 : cur = 0;
384 : }
385 151646 : size_t full = lnr/32;
386 151646 : size_t rest = lnr%32;
387 151646 : if (full > 0) {
388 3585 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
389 3585 : dst += full;
390 3585 : lnr -= full * 32;
391 : }
392 151646 : if (rest > 0) {
393 41593 : cur |= (1U << rest) - 1;
394 41593 : lnr -= rest;
395 41593 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
396 : }
397 151646 : assert(lnr==0);
398 151646 : if (cnt < s->end)
399 344225 : cnt = s->end;
400 : }
401 : }
402 104616 : if (nr > BATcount(b)) {
403 62530 : BATsetcount(b, nr);
404 : }
405 104616 : MT_lock_unset(&b->theaplock);
406 :
407 104616 : bat_destroy(b);
408 104616 : return LOG_OK;
409 : }
410 :
411 : /* TODO return LOG_OK/ERR */
412 : static void
413 104642 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
414 : {
415 104642 : sqlstore* store = tr->store;
416 104642 : segment *cur = s->segs->h, *seg = NULL;
417 514044 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
418 409402 : if (cur->ts == tr->tid) {
419 166148 : if (!cur->deleted)
420 93475 : cur->oldts = 0;
421 166148 : cur->ts = commit_ts;
422 : }
423 409402 : if (!seg) {
424 : /* first segment */
425 : seg = cur;
426 : }
427 304760 : else if (seg->ts < TRANSACTION_ID_BASE) {
428 : /* possible merge since both deleted flags are equal */
429 281917 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
430 211207 : int merge = 1;
431 211207 : node *n = store->active->h;
432 719453 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
433 616195 : sql_trans* other = ((sql_trans*)n->data);
434 616195 : ulng active = other->ts;
435 616195 : if(other->active == 2)
436 30888 : continue; /* pretend that another recently committed transaction is no longer active */
437 585307 : if (active == tr->ts)
438 137690 : continue; /* pretend that committing transaction has already committed and is no longer active */
439 447617 : if (seg->ts < active && cur->ts < active)
440 : break;
441 435257 : if (seg->ts > active && cur->ts > active)
442 339668 : continue;
443 :
444 95589 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
445 : /* cannot safely merge since there is an active transaction between the segments */
446 : merge = false;
447 : break;
448 : }
449 : /* merge segments */
450 231236 : if (merge) {
451 115618 : seg->end = cur->end;
452 115618 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
453 115618 : if (cur == s->segs->t)
454 27299 : s->segs->t = seg;
455 115618 : if (commit_ts == oldest) {
456 98999 : ATOMIC_PTR_DESTROY(&cur->next);
457 98999 : _DELETE(cur);
458 : } else
459 33238 : mark4destroy(cur, change, commit_ts);
460 115618 : cur = seg;
461 115618 : continue;
462 : }
463 : }
464 : }
465 : seg = cur;
466 : }
467 104642 : }
468 :
469 : static int
470 2185128 : segments_in_transaction(sql_trans *tr, sql_table *t)
471 : {
472 2185128 : storage *s = ATOMIC_PTR_GET(&t->data);
473 2185128 : segment *seg = s->segs->h;
474 :
475 2185128 : if (seg && s->segs->t->ts == tr->tid)
476 : return 1;
477 636293 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
478 530870 : if (seg->ts == tr->tid)
479 : return 1;
480 : }
481 : return 0;
482 : }
483 :
484 : static size_t
485 20646003 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
486 : {
487 20646003 : size_t cnt = 0;
488 :
489 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
490 : * keep all of the parts aligned */
491 20646003 : lock_table(tr->store, table->base.id);
492 20710958 : segment *s = segs->h, *l = NULL;
493 :
494 20710958 : if (segs->t && SEG_IS_VALID(segs->t, tr))
495 16994870 : l = s = segs->t;
496 :
497 223549404 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
498 202838583 : if (SEG_IS_VALID(s, tr))
499 : l = s;
500 : }
501 20710821 : if (l)
502 20692405 : cnt = l->end;
503 20710821 : unlock_table(tr->store, table->base.id);
504 20709109 : return cnt;
505 : }
506 :
507 : static segments *
508 52527 : new_segments(sql_trans *tr, size_t cnt)
509 : {
510 52527 : segments *n = (segments*)GDKmalloc(sizeof(segments));
511 :
512 52529 : if (n) {
513 52529 : n->nr_reused = 0;
514 52529 : n->h = n->t = new_segment(NULL, tr, cnt);
515 52528 : if (!n->h) {
516 0 : GDKfree(n);
517 0 : return NULL;
518 : }
519 52528 : sql_ref_init(&n->r);
520 : }
521 : return n;
522 : }
523 :
524 : static sql_delta *
525 34560553 : timestamp_delta( sql_trans *tr, sql_delta *d)
526 : {
527 34614893 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
528 54340 : d = d->next;
529 34567917 : return d;
530 : }
531 :
532 : static sql_delta *
533 34395782 : col_timestamp_delta( sql_trans *tr, sql_column *c)
534 : {
535 34395782 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
536 : }
537 :
538 : static sql_delta *
539 27364 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
540 : {
541 27364 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
542 : }
543 :
544 : static storage *
545 21125568 : timestamp_storage( sql_trans *tr, storage *d)
546 : {
547 21125568 : if (!d)
548 : return NULL;
549 21197355 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
550 71787 : d = d->next;
551 : return d;
552 : }
553 :
554 : static storage *
555 21108263 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
556 : {
557 21108263 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
558 : }
559 :
560 : static sql_delta*
561 21391 : delta_dup(sql_delta *d)
562 : {
563 21391 : ATOMIC_INC(&d->cs.refcnt);
564 21391 : return d;
565 : }
566 :
567 : static void *
568 19575 : col_dup(sql_column *c)
569 : {
570 19575 : return delta_dup(ATOMIC_PTR_GET(&c->data));
571 : }
572 :
573 : static void *
574 3103 : idx_dup(sql_idx *i)
575 : {
576 3103 : if (!ATOMIC_PTR_GET(&i->data))
577 : return NULL;
578 1816 : return delta_dup(ATOMIC_PTR_GET(&i->data));
579 : }
580 :
581 : static storage*
582 1604 : storage_dup(storage *d)
583 : {
584 1604 : ATOMIC_INC(&d->cs.refcnt);
585 1604 : return d;
586 : }
587 :
588 : static void *
589 1604 : del_dup(sql_table *t)
590 : {
591 1604 : return storage_dup(ATOMIC_PTR_GET(&t->data));
592 : }
593 :
594 : static size_t
595 17 : count_inserts( segment *s, sql_trans *tr)
596 : {
597 17 : size_t cnt = 0;
598 :
599 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
600 55 : if (!s->deleted && s->ts == tr->tid)
601 4 : cnt += s->end - s->start;
602 : }
603 17 : return cnt;
604 : }
605 :
606 : static size_t
607 872782 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
608 : {
609 872782 : size_t cnt = 0;
610 :
611 1022773 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
612 : ;
613 :
614 5679298 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
615 4806516 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
616 1633009 : cnt += s->end - s->start;
617 : }
618 872782 : return cnt;
619 : }
620 :
621 : static size_t
622 17 : count_deletes( segment *s, sql_trans *tr)
623 : {
624 17 : size_t cnt = 0;
625 :
626 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
627 55 : if (SEG_IS_DELETED(s, tr))
628 17 : cnt += s->end - s->start;
629 : }
630 17 : return cnt;
631 : }
632 :
633 : #define CNT_ACTIVE 10
634 :
635 : static size_t
636 19372921 : count_col(sql_trans *tr, sql_column *c, int access)
637 : {
638 19372921 : storage *d;
639 19372921 : sql_delta *ds;
640 :
641 19372921 : if (!isTable(c->t))
642 : return 0;
643 19372921 : d = tab_timestamp_storage(tr, c->t);
644 19383353 : ds = col_timestamp_delta(tr, c);
645 19408477 : if (!d ||!ds)
646 : return 0;
647 19408477 : if (access == 2)
648 488877 : return ds?ds->cs.ucnt:0;
649 18919600 : if (access == 1)
650 17 : return count_inserts(d->segs->h, tr);
651 18919583 : if (access == QUICK)
652 0 : return d->segs->t?d->segs->t->end:0;
653 18919583 : if (access == CNT_ACTIVE) {
654 872615 : size_t cnt = segs_end(d->segs, tr, c->t);
655 872906 : lock_table(tr->store, c->t->base.id);
656 872788 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
657 872797 : unlock_table(tr->store, c->t->base.id);
658 872797 : return cnt;
659 : }
660 18046968 : return segs_end(d->segs, tr, c->t);
661 : }
662 :
663 : static size_t
664 23368 : count_idx(sql_trans *tr, sql_idx *i, int access)
665 : {
666 23368 : storage *d;
667 23368 : sql_delta *ds;
668 :
669 23368 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
670 6719 : return 0;
671 16643 : d = tab_timestamp_storage(tr, i->t);
672 16663 : ds = idx_timestamp_delta(tr, i);
673 16681 : if (!d || !ds)
674 : return 0;
675 16681 : if (access == 2)
676 2871 : return ds?ds->cs.ucnt:0;
677 13810 : if (access == 1)
678 0 : return count_inserts(d->segs->h, tr);
679 13810 : if (access == QUICK)
680 3550 : return d->segs->t?d->segs->t->end:0;
681 10260 : return segs_end(d->segs, tr, i->t);
682 : }
683 :
684 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
685 : static BAT *
686 14857818 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
687 : {
688 14857818 : BAT *b;
689 :
690 14857818 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
691 : /* returns the updates for cs */
692 14857818 : if (cs->uibid && cs->uvbid && cs->ucnt) {
693 14217 : if (access == RD_UPD_ID) {
694 8554 : if (!(b = temp_descriptor(cs->uibid)))
695 : return NULL;
696 8554 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
697 2254 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
698 6300 : oid nil = oid_nil;
699 : /* less then cnt */
700 6300 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
701 6300 : if (!s) {
702 0 : bat_destroy(b);
703 0 : return NULL;
704 : }
705 :
706 6300 : BAT *nb = BATproject(s, b);
707 6300 : bat_destroy(s);
708 6300 : bat_destroy(b);
709 6300 : b = nb;
710 : }
711 : } else {
712 5663 : b = temp_descriptor(cs->uvbid);
713 : }
714 : } else {
715 24740992 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
716 : }
717 : return b;
718 : }
719 :
720 : static BAT *
721 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
722 : {
723 0 : int err = 0;
724 0 : BAT *uv = *UV;
725 0 : BUN cnt = BATcount(ui)+BATcount(oi);
726 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
727 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
728 :
729 0 : if (!ni || (uv && !nv)) {
730 0 : bat_destroy(ni);
731 0 : bat_destroy(nv);
732 0 : bat_destroy(ui);
733 0 : bat_destroy(uv);
734 0 : bat_destroy(oi);
735 0 : bat_destroy(ov);
736 0 : return NULL;
737 : }
738 0 : BATiter uvi;
739 0 : BATiter ovi;
740 :
741 0 : if (uv) {
742 0 : uvi = bat_iterator(uv);
743 0 : ovi = bat_iterator(ov);
744 : }
745 :
746 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
747 0 : BUN uip = 0, uie = BATcount(ui);
748 0 : BUN oip = 0, oie = BATcount(oi);
749 :
750 0 : oid uiseqb = ui->tseqbase;
751 0 : oid oiseqb = oi->tseqbase;
752 0 : oid *uipt = NULL, *oipt = NULL;
753 0 : BATiter uii = bat_iterator(ui);
754 0 : BATiter oii = bat_iterator(oi);
755 0 : if (!BATtdensebi(&uii))
756 0 : uipt = uii.base;
757 0 : if (!BATtdensebi(&oii))
758 0 : oipt = oii.base;
759 0 : while (uip < uie && oip < oie && !err) {
760 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
761 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
762 :
763 0 : if (uiid <= oiid) {
764 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
765 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
766 : err = 1;
767 0 : uip++;
768 0 : if (uiid == oiid)
769 0 : oip++;
770 : } else { /* uiid > oiid */
771 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
772 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
773 : err = 1;
774 0 : oip++;
775 : }
776 : }
777 0 : while (uip < uie && !err) {
778 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
779 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
780 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
781 : err = 1;
782 0 : uip++;
783 : }
784 0 : while (oip < oie && !err) {
785 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
786 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
787 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
788 : err = 1;
789 0 : oip++;
790 : }
791 0 : if (uv) {
792 0 : bat_iterator_end(&uvi);
793 0 : bat_iterator_end(&ovi);
794 : }
795 0 : bat_iterator_end(&uii);
796 0 : bat_iterator_end(&oii);
797 0 : bat_destroy(ui);
798 0 : bat_destroy(uv);
799 0 : bat_destroy(oi);
800 0 : bat_destroy(ov);
801 0 : if (!err) {
802 0 : if (nv)
803 0 : *UV = nv;
804 0 : return ni;
805 : }
806 0 : *UV = NULL;
807 0 : bat_destroy(ni);
808 0 : bat_destroy(nv);
809 0 : return NULL;
810 : }
811 :
812 : static sql_delta *
813 9909492 : older_delta( sql_delta *d, sql_trans *tr)
814 : {
815 9909492 : sql_delta *o = d->next;
816 :
817 9918408 : while (o && !o->cs.merged) {
818 8910 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
819 : break;
820 : else
821 8916 : o = o->next;
822 : }
823 9909498 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
824 0 : return o;
825 : return NULL;
826 : }
827 :
828 : static BAT *
829 9905744 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
830 : {
831 9905744 : assert(tr->active);
832 9905744 : sql_delta *o = NULL;
833 9905744 : BAT *ui = NULL, *uv = NULL;
834 :
835 9905744 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
836 : return NULL;
837 9909130 : if (access == RD_UPD_VAL) {
838 4956124 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
839 0 : bat_destroy(ui);
840 0 : return NULL;
841 : }
842 : }
843 9909438 : while ((o = older_delta(d, tr)) != NULL) {
844 0 : BAT *oui = NULL, *ouv = NULL;
845 0 : if (!oui)
846 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
847 0 : if (access == RD_UPD_VAL)
848 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
849 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
850 0 : bat_destroy(ui);
851 0 : bat_destroy(uv);
852 0 : bat_destroy(oui);
853 0 : bat_destroy(ouv);
854 0 : return NULL;
855 : }
856 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
857 : return NULL;
858 : d = o;
859 : }
860 9909537 : if (uv) {
861 4956440 : bat_destroy(ui);
862 4956440 : return uv;
863 : }
864 : return ui;
865 : }
866 :
867 : static BAT *
868 2775 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
869 : {
870 2775 : lock_column(tr->store, c->base.id);
871 2775 : sql_delta *d = col_timestamp_delta(tr, c);
872 2775 : int type = c->type.type->localtype;
873 :
874 2775 : if (!d) {
875 0 : unlock_column(tr->store, c->base.id);
876 0 : return NULL;
877 : }
878 2775 : if (d->cs.st == ST_DICT) {
879 0 : BAT *b = quick_descriptor(d->cs.bid);
880 :
881 0 : type = b->ttype;
882 : }
883 2775 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
884 2775 : unlock_column(tr->store, c->base.id);
885 2775 : return bn;
886 : }
887 :
888 : static BAT *
889 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
890 : {
891 0 : lock_column(tr->store, i->base.id);
892 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
893 0 : sql_delta *d = idx_timestamp_delta(tr, i);
894 :
895 0 : if (!d) {
896 0 : unlock_column(tr->store, i->base.id);
897 0 : return NULL;
898 : }
899 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
900 0 : unlock_column(tr->store, i->base.id);
901 0 : return bn;
902 : }
903 :
904 : static BAT *
905 10089113 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
906 : {
907 10089113 : BAT *b;
908 :
909 10089113 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
910 10089113 : assert(cs != NULL);
911 10089113 : if (access == QUICK)
912 179028 : return quick_descriptor(cs->bid);
913 9910085 : if (access == RD_EXT)
914 857 : return temp_descriptor(cs->ebid);
915 9909228 : assert(cs->bid);
916 9909228 : b = temp_descriptor(cs->bid);
917 9910502 : if (b == NULL)
918 : return NULL;
919 9910502 : assert(b->batRestricted == BAT_READ);
920 : /* return slice */
921 9910502 : BAT *s = BATslice(b, 0, cnt);
922 9897882 : bat_destroy(b);
923 9897882 : return s;
924 : }
925 :
926 : static int
927 4952463 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
928 : {
929 4952463 : lock_column(tr->store, c->base.id);
930 4951838 : size_t cnt = count_col(tr, c, 0);
931 4953342 : sql_delta *d = col_timestamp_delta(tr, c);
932 4953034 : int type = c->type.type->localtype;
933 :
934 4953034 : if (!d) {
935 0 : unlock_column(tr->store, c->base.id);
936 0 : return LOG_ERR;
937 : }
938 4953034 : if (d->cs.st == ST_DICT) {
939 2 : BAT *b = quick_descriptor(d->cs.bid);
940 :
941 2 : type = b->ttype;
942 : }
943 :
944 4953034 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
945 4953526 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
946 :
947 4953343 : unlock_column(tr->store, c->base.id);
948 :
949 4953188 : if (*ui == NULL || *uv == NULL) {
950 0 : bat_destroy(*ui);
951 0 : bat_destroy(*uv);
952 0 : return LOG_ERR;
953 : }
954 : return LOG_OK;
955 : }
956 :
957 : static int
958 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
959 : {
960 23 : lock_column(tr->store, i->base.id);
961 23 : size_t cnt = count_idx(tr, i, 0);
962 23 : sql_delta *d = idx_timestamp_delta(tr, i);
963 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
964 :
965 23 : if (!d) {
966 0 : unlock_column(tr->store, i->base.id);
967 0 : return LOG_ERR;
968 : }
969 :
970 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
971 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
972 :
973 23 : unlock_column(tr->store, i->base.id);
974 :
975 23 : if (*ui == NULL || *uv == NULL) {
976 0 : bat_destroy(*ui);
977 0 : bat_destroy(*uv);
978 0 : return LOG_ERR;
979 : }
980 : return LOG_OK;
981 : }
982 :
983 : static void * /* BAT * */
984 10076453 : bind_col(sql_trans *tr, sql_column *c, int access)
985 : {
986 10076453 : assert(access == QUICK || tr->active);
987 10076453 : if (!isTable(c->t))
988 : return NULL;
989 10076453 : sql_delta *d = col_timestamp_delta(tr, c);
990 10078609 : if (!d)
991 : return NULL;
992 10078609 : size_t cnt = count_col(tr, c, 0);
993 10082156 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
994 2775 : return bind_ucol(tr, c, access, cnt);
995 10079381 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
996 10075922 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
997 : return b;
998 : }
999 :
1000 : static void * /* BAT * */
1001 10707 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1002 : {
1003 10707 : assert(access == QUICK || tr->active);
1004 10707 : if (!isTable(i->t))
1005 : return NULL;
1006 10707 : sql_delta *d = idx_timestamp_delta(tr, i);
1007 10711 : if (!d)
1008 : return NULL;
1009 10711 : size_t cnt = count_idx(tr, i, 0);
1010 10712 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1011 0 : return bind_uidx(tr, i, access, cnt);
1012 10712 : return cs_bind_bat( &d->cs, access, cnt);
1013 : }
1014 :
1015 : static int
1016 4152 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1017 : {
1018 4152 : if (!cs->uibid) {
1019 0 : cs->uibid = e_bat(TYPE_oid);
1020 0 : if (cs->uibid == BID_NIL)
1021 : return LOG_ERR;
1022 : }
1023 4152 : if (!cs->uvbid) {
1024 0 : BAT *cur = quick_descriptor(cs->bid);
1025 0 : if (!cur)
1026 : return LOG_ERR;
1027 0 : int type = cur->ttype;
1028 0 : cs->uvbid = e_bat(type);
1029 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1030 : return LOG_ERR;
1031 : }
1032 4152 : BAT *ui = temp_descriptor(cs->uibid);
1033 4152 : BAT *uv = temp_descriptor(cs->uvbid);
1034 :
1035 4152 : if (ui == NULL || uv == NULL) {
1036 0 : bat_destroy(ui);
1037 0 : bat_destroy(uv);
1038 0 : return LOG_ERR;
1039 : }
1040 4152 : assert(ui && uv);
1041 4152 : if (isEbat(ui)){
1042 400 : temp_destroy(cs->uibid);
1043 400 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1044 400 : bat_destroy(ui);
1045 400 : if (cs->uibid == BID_NIL ||
1046 400 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1047 0 : bat_destroy(uv);
1048 0 : return LOG_ERR;
1049 : }
1050 : }
1051 4152 : if (isEbat(uv)){
1052 400 : temp_destroy(cs->uvbid);
1053 400 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1054 400 : bat_destroy(uv);
1055 400 : if (cs->uvbid == BID_NIL ||
1056 400 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1057 0 : bat_destroy(ui);
1058 0 : return LOG_ERR;
1059 : }
1060 : }
1061 4152 : *Ui = ui;
1062 4152 : *Uv = uv;
1063 4152 : return LOG_OK;
1064 : }
1065 :
1066 : static int
1067 6936 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1068 : {
1069 102577 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1070 102577 : if (s->start <= rid && s->end > rid) {
1071 6936 : if (s->ts == tr->tid && !s->deleted) {
1072 2886 : return 1;
1073 : }
1074 : break;
1075 : }
1076 : }
1077 : return 0;
1078 : }
1079 :
1080 : static int
1081 4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1082 : {
1083 96455 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1084 96455 : if (s->start <= rid && s->end > rid) {
1085 4050 : if (s->ts >= tr->ts && s->deleted) {
1086 0 : return 1;
1087 : }
1088 : break;
1089 : }
1090 : }
1091 : return 0;
1092 : }
1093 :
1094 : static sql_delta *
1095 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1096 : {
1097 0 : sql_delta *n = ZNEW(sql_delta);
1098 0 : if (!n)
1099 : return NULL;
1100 0 : *n = *bat;
1101 0 : n->next = NULL;
1102 0 : n->cs.ts = tr->tid;
1103 0 : return n;
1104 : }
1105 :
1106 : static BAT *
1107 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1108 : {
1109 17 : BAT *newoffsets = NULL;
1110 17 : sql_delta *bat = *batp;
1111 17 : column_storage *cs = &bat->cs;
1112 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1113 :
1114 17 : if (!u)
1115 : return NULL;
1116 17 : BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
1117 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1118 0 : bat_destroy(u);
1119 0 : return NULL;
1120 : } else {
1121 17 : int new = 0;
1122 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1123 17 : if (BATcount(u) >= max_cnt) {
1124 1 : if (max_cnt == INT_MAX) { /* decompress */
1125 0 : if (!(b = temp_descriptor(cs->bid))) {
1126 0 : bat_destroy(u);
1127 0 : return NULL;
1128 : }
1129 0 : if (cs->ucnt) {
1130 0 : BAT *ui = NULL, *uv = NULL;
1131 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1132 0 : bat_destroy(b);
1133 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1134 0 : bat_destroy(nb);
1135 0 : bat_destroy(u);
1136 0 : return NULL;
1137 : }
1138 0 : b = nb;
1139 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1140 0 : bat_destroy(ui);
1141 0 : bat_destroy(uv);
1142 0 : bat_destroy(b);
1143 0 : bat_destroy(u);
1144 : }
1145 0 : bat_destroy(ui);
1146 0 : bat_destroy(uv);
1147 : }
1148 0 : n = DICTdecompress_(b, u, PERSISTENT);
1149 0 : bat_destroy(b);
1150 0 : assert(newoffsets == NULL);
1151 0 : if (!n) {
1152 0 : bat_destroy(u);
1153 0 : return NULL;
1154 : }
1155 0 : if (cs->ts != tr->tid) {
1156 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1157 0 : bat_destroy(n);
1158 0 : return NULL;
1159 : }
1160 0 : cs = &(*batp)->cs;
1161 0 : new = 1;
1162 : }
1163 0 : if (cs->bid && !new)
1164 0 : temp_destroy(cs->bid);
1165 0 : n = transfer_to_systrans(n);
1166 0 : if (n == NULL)
1167 : return NULL;
1168 0 : bat_set_access(n, BAT_READ);
1169 0 : cs->bid = temp_create(n);
1170 0 : bat_destroy(n);
1171 0 : if (cs->ebid && !new)
1172 0 : temp_destroy(cs->ebid);
1173 0 : cs->ebid = 0;
1174 0 : cs->ucnt = 0;
1175 0 : if (cs->uibid && !new)
1176 0 : temp_destroy(cs->uibid);
1177 0 : if (cs->uvbid && !new)
1178 0 : temp_destroy(cs->uvbid);
1179 0 : cs->uibid = cs->uvbid = 0;
1180 0 : cs->st = ST_DEFAULT;
1181 0 : cs->cleared = true;
1182 : } else {
1183 1 : if (!(b = temp_descriptor(cs->bid))) {
1184 0 : bat_destroy(newoffsets);
1185 0 : bat_destroy(u);
1186 0 : return NULL;
1187 : }
1188 2 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1189 1 : bat_destroy(b);
1190 1 : if (!n) {
1191 0 : bat_destroy(newoffsets);
1192 0 : bat_destroy(u);
1193 0 : return NULL;
1194 : }
1195 1 : if (cs->ts != tr->tid) {
1196 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1197 0 : bat_destroy(n);
1198 0 : return NULL;
1199 : }
1200 0 : cs = &(*batp)->cs;
1201 0 : new = 1;
1202 0 : temp_dup(cs->ebid);
1203 0 : if (cs->uibid) {
1204 0 : temp_dup(cs->uibid);
1205 0 : temp_dup(cs->uvbid);
1206 : }
1207 : }
1208 1 : if (cs->bid && !new)
1209 1 : temp_destroy(cs->bid);
1210 1 : n = transfer_to_systrans(n);
1211 1 : if (n == NULL)
1212 : return NULL;
1213 1 : bat_set_access(n, BAT_READ);
1214 1 : cs->bid = temp_create(n);
1215 1 : bat_destroy(n);
1216 1 : cs->cleared = true;
1217 1 : i = newoffsets;
1218 : }
1219 : } else { /* append */
1220 16 : i = newoffsets;
1221 : }
1222 : }
1223 17 : bat_destroy(u);
1224 17 : return i;
1225 : }
1226 :
1227 : static BAT *
1228 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1229 : {
1230 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1231 0 : BAT *newoffsets = NULL;
1232 0 : BAT *b = NULL, *n = NULL;
1233 :
1234 0 : if (!(b = temp_descriptor(cs->bid)))
1235 : return NULL;
1236 :
1237 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1238 0 : bat_destroy(b);
1239 0 : return NULL;
1240 : } else {
1241 : /* returns new offset bat if values within min/max, else decompress */
1242 0 : if (!newoffsets) { /* decompress */
1243 0 : if (cs->ucnt) {
1244 0 : BAT *ui = NULL, *uv = NULL;
1245 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1246 0 : bat_destroy(b);
1247 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1248 0 : bat_destroy(nb);
1249 0 : return NULL;
1250 : }
1251 0 : b = nb;
1252 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1253 0 : bat_destroy(ui);
1254 0 : bat_destroy(uv);
1255 0 : bat_destroy(b);
1256 : }
1257 0 : bat_destroy(ui);
1258 0 : bat_destroy(uv);
1259 : }
1260 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1261 0 : bat_destroy(b);
1262 0 : if (!n)
1263 : return NULL;
1264 0 : if (cs->bid)
1265 0 : temp_destroy(cs->bid);
1266 0 : n = transfer_to_systrans(n);
1267 0 : if (n == NULL)
1268 : return NULL;
1269 0 : bat_set_access(n, BAT_READ);
1270 0 : cs->bid = temp_create(n);
1271 0 : cs->ucnt = 0;
1272 0 : if (cs->uibid)
1273 0 : temp_destroy(cs->uibid);
1274 0 : if (cs->uvbid)
1275 0 : temp_destroy(cs->uvbid);
1276 0 : cs->uibid = cs->uvbid = 0;
1277 0 : cs->st = ST_DEFAULT;
1278 0 : cs->cleared = true;
1279 0 : b = n;
1280 : } else { /* append */
1281 : i = newoffsets;
1282 : }
1283 : }
1284 0 : bat_destroy(b);
1285 0 : return i;
1286 : }
1287 :
1288 : /*
1289 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1290 : */
1291 : static int
1292 3107 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1293 : {
1294 3107 : int res = LOG_OK;
1295 3107 : sql_delta *bat = *batp;
1296 3107 : column_storage *cs = &bat->cs;
1297 3107 : BAT *otids = tids, *oupdates = updates;
1298 :
1299 3107 : if (!BATcount(tids))
1300 : return LOG_OK;
1301 :
1302 3107 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1303 6 : tids = BATunmask(tids);
1304 6 : if (!tids)
1305 : return LOG_ERR;
1306 : }
1307 3107 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1308 0 : updates = BATunmask(updates);
1309 0 : if (!updates) {
1310 0 : if (otids != tids)
1311 0 : bat_destroy(tids);
1312 0 : return LOG_ERR;
1313 : }
1314 3107 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1315 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1316 42 : if (!updates) {
1317 0 : if (otids != tids)
1318 0 : bat_destroy(tids);
1319 0 : return LOG_ERR;
1320 : }
1321 : }
1322 :
1323 3107 : if (cs->st == ST_DICT) {
1324 : /* possibly a new array is returned */
1325 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1326 4 : bat = *batp;
1327 4 : cs = &bat->cs;
1328 4 : if (oupdates != updates)
1329 0 : bat_destroy(updates);
1330 4 : updates = nupdates;
1331 4 : if (!updates) {
1332 0 : if (otids != tids)
1333 0 : bat_destroy(tids);
1334 0 : return LOG_ERR;
1335 : }
1336 : }
1337 :
1338 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1339 : /* currently only one update delta is possible */
1340 3107 : lock_table(tr->store, t->base.id);
1341 3107 : storage *s = ATOMIC_PTR_GET(&t->data);
1342 3107 : if (!is_new && !cs->cleared) {
1343 2784 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1344 6 : BAT *sorted, *order;
1345 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1346 0 : if (otids != tids)
1347 0 : bat_destroy(tids);
1348 0 : if (oupdates != updates)
1349 0 : bat_destroy(updates);
1350 0 : unlock_table(tr->store, t->base.id);
1351 0 : return LOG_ERR;
1352 : }
1353 6 : if (otids != tids)
1354 0 : bat_destroy(tids);
1355 6 : tids = sorted;
1356 6 : BAT *nupdates = BATproject(order, updates);
1357 6 : bat_destroy(order);
1358 6 : if (oupdates != updates)
1359 0 : bat_destroy(updates);
1360 6 : updates = nupdates;
1361 6 : if (!updates) {
1362 0 : bat_destroy(tids);
1363 0 : unlock_table(tr->store, t->base.id);
1364 0 : return LOG_ERR;
1365 : }
1366 : }
1367 2784 : assert(tids->tsorted);
1368 2784 : BAT *ui = NULL, *uv = NULL;
1369 :
1370 : /* handle updates on just inserted bits */
1371 : /* handle updates on updates (within one transaction) */
1372 2784 : BATiter upi = bat_iterator(updates);
1373 2784 : BUN cnt = 0, ucnt = BATcount(tids);
1374 2784 : BAT *b, *ins = NULL;
1375 2784 : int *msk = NULL;
1376 :
1377 2784 : if((b = temp_descriptor(cs->bid)) == NULL)
1378 : res = LOG_ERR;
1379 :
1380 2784 : if (res == LOG_OK && BATtdense(tids)) {
1381 2580 : oid start = tids->tseqbase, offset = start;
1382 2580 : oid end = start + ucnt;
1383 :
1384 11412 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1385 9487 : if (seg->start <= start && seg->end > start) {
1386 : /* check for delete conflicts */
1387 2580 : if (seg->ts >= tr->ts && seg->deleted) {
1388 0 : res = LOG_CONFLICT;
1389 0 : continue;
1390 : }
1391 :
1392 : /* check for inplace updates */
1393 2580 : BUN lend = end < seg->end?end:seg->end;
1394 2580 : if (seg->ts == tr->tid && !seg->deleted) {
1395 166 : if (!ins) {
1396 166 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1397 166 : if (!ins)
1398 : res = LOG_ERR;
1399 : else {
1400 166 : BATsetcount(ins, ucnt); /* all full updates */
1401 166 : msk = (int*)Tloc(ins, 0);
1402 166 : BUN end = (ucnt+31)/32;
1403 166 : memset(msk, 0, end * sizeof(int));
1404 : }
1405 : }
1406 601 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1407 435 : const void *upd = BUNtail(upi, rid-offset);
1408 435 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1409 0 : res = LOG_ERR;
1410 :
1411 435 : oid word = i/32;
1412 435 : int pos = i%32;
1413 435 : msk[word] |= 1U<<pos;
1414 435 : cnt++;
1415 : }
1416 : }
1417 : }
1418 9487 : if (end < seg->end)
1419 : break;
1420 : }
1421 207 : } else if (res == LOG_OK && complex_cand(tids)) {
1422 3 : struct canditer ci;
1423 3 : segment *seg = s->segs->h;
1424 3 : canditer_init(&ci, NULL, tids);
1425 3 : BUN i = 0;
1426 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1427 1033 : oid rid = canditer_next(&ci);
1428 1033 : if (seg->end <= rid)
1429 13 : seg = ATOMIC_PTR_GET(&seg->next);
1430 1020 : else if (seg->start <= rid && seg->end > rid) {
1431 : /* check for delete conflicts */
1432 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1433 0 : res = LOG_CONFLICT;
1434 0 : continue;
1435 : }
1436 :
1437 : /* check for inplace updates */
1438 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1439 0 : if (!ins) {
1440 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1441 0 : if (!ins) {
1442 : res = LOG_ERR;
1443 : break;
1444 : } else {
1445 0 : BATsetcount(ins, ucnt); /* all full updates */
1446 0 : msk = (int*)Tloc(ins, 0);
1447 0 : BUN end = (ucnt+31)/32;
1448 0 : memset(msk, 0, end * sizeof(int));
1449 : }
1450 : }
1451 0 : ptr upd = BUNtail(upi, i);
1452 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1453 0 : res = LOG_ERR;
1454 :
1455 0 : oid word = i/32;
1456 0 : int pos = i%32;
1457 0 : msk[word] |= 1U<<pos;
1458 0 : cnt++;
1459 : }
1460 1020 : i++;
1461 : }
1462 : }
1463 201 : } else if (res == LOG_OK) {
1464 201 : BUN i = 0;
1465 201 : oid *rid = Tloc(tids,0);
1466 201 : segment *seg = s->segs->h;
1467 21958 : while ( seg && res == LOG_OK && i < ucnt) {
1468 21757 : if (seg->end <= rid[i])
1469 8325 : seg = ATOMIC_PTR_GET(&seg->next);
1470 13432 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1471 : /* check for delete conflicts */
1472 13432 : if (seg->ts >= tr->ts && seg->deleted) {
1473 0 : res = LOG_CONFLICT;
1474 0 : continue;
1475 : }
1476 :
1477 : /* check for inplace updates */
1478 13432 : if (seg->ts == tr->tid && !seg->deleted) {
1479 995 : if (!ins) {
1480 80 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1481 80 : if (!ins) {
1482 : res = LOG_ERR;
1483 : break;
1484 : } else {
1485 80 : BATsetcount(ins, ucnt); /* all full updates */
1486 80 : msk = (int*)Tloc(ins, 0);
1487 80 : BUN end = (ucnt+31)/32;
1488 80 : memset(msk, 0, end * sizeof(int));
1489 : }
1490 : }
1491 995 : const void *upd = BUNtail(upi, i);
1492 995 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1493 0 : res = LOG_ERR;
1494 :
1495 995 : oid word = i/32;
1496 995 : int pos = i%32;
1497 995 : msk[word] |= 1U<<pos;
1498 995 : cnt++;
1499 : }
1500 13432 : i++;
1501 : }
1502 : }
1503 : }
1504 :
1505 2784 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1506 2574 : if (cs->ucnt == 0) {
1507 2472 : if (cnt) {
1508 12 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1509 12 : if (nins) {
1510 12 : ui = BATproject(nins, tids);
1511 12 : uv = BATproject(nins, updates);
1512 12 : bat_destroy(nins);
1513 : }
1514 : } else {
1515 2460 : ui = temp_descriptor(tids->batCacheid);
1516 2460 : uv = temp_descriptor(updates->batCacheid);
1517 : }
1518 2472 : if (!ui || !uv) {
1519 : res = LOG_ERR;
1520 : } else {
1521 2472 : temp_destroy(cs->uibid);
1522 2472 : temp_destroy(cs->uvbid);
1523 2472 : ui = transfer_to_systrans(ui);
1524 2472 : uv = transfer_to_systrans(uv);
1525 2472 : if (ui == NULL || uv == NULL) {
1526 0 : BBPreclaim(ui);
1527 0 : BBPreclaim(uv);
1528 : res = LOG_ERR;
1529 : } else {
1530 2472 : cs->uibid = temp_create(ui);
1531 2472 : cs->uvbid = temp_create(uv);
1532 2472 : cs->ucnt = BATcount(ui);
1533 : }
1534 : }
1535 : } else {
1536 102 : BAT *nui = NULL, *nuv = NULL;
1537 :
1538 : /* merge taking msk of inserted into account */
1539 102 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1540 : res = LOG_ERR;
1541 :
1542 102 : if (res == LOG_OK) {
1543 102 : const void *upd = NULL;
1544 102 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1545 102 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1546 :
1547 102 : if (!nui || !nuv) {
1548 : res = LOG_ERR;
1549 : } else {
1550 102 : BATiter ovi = bat_iterator(uv);
1551 :
1552 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1553 102 : BUN uip = 0, uie = BATcount(ui);
1554 102 : BUN nip = 0, nie = BATcount(tids);
1555 102 : oid uiseqb = ui->tseqbase;
1556 102 : oid niseqb = tids->tseqbase;
1557 102 : oid *uipt = NULL, *nipt = NULL;
1558 102 : BATiter uii = bat_iterator(ui);
1559 102 : BATiter tidsi = bat_iterator(tids);
1560 102 : if (!BATtdensebi(&uii))
1561 97 : uipt = uii.base;
1562 102 : if (!BATtdensebi(&tidsi))
1563 93 : nipt = tidsi.base;
1564 16091 : while (uip < uie && nip < nie && res == LOG_OK) {
1565 15989 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1566 15989 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1567 :
1568 15989 : if (uiv < niv) {
1569 8068 : upd = BUNtail(ovi, uip);
1570 16136 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1571 8068 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1572 : res = LOG_ERR;
1573 8068 : uip++;
1574 7921 : } else if (uiv == niv) {
1575 : /* handle == */
1576 1522 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1577 1522 : upd = BUNtail(upi, nip);
1578 3044 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1579 1522 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1580 : res = LOG_ERR;
1581 : } else {
1582 0 : upd = BUNtail(ovi, uip);
1583 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1584 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1585 : res = LOG_ERR;
1586 : }
1587 1522 : uip++;
1588 1522 : nip++;
1589 : } else { /* uiv > niv */
1590 6399 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1591 6255 : upd = BUNtail(upi, nip);
1592 12510 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1593 6255 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1594 : res = LOG_ERR;
1595 : }
1596 6399 : nip++;
1597 : }
1598 : }
1599 923 : while (uip < uie && res == LOG_OK) {
1600 821 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1601 821 : upd = BUNtail(ovi, uip);
1602 1642 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1603 821 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1604 : res = LOG_ERR;
1605 821 : uip++;
1606 : }
1607 703 : while (nip < nie && res == LOG_OK) {
1608 601 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1609 601 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1610 329 : upd = BUNtail(upi, nip);
1611 658 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1612 329 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1613 : res = LOG_ERR;
1614 : }
1615 601 : nip++;
1616 : }
1617 102 : bat_iterator_end(&uii);
1618 102 : bat_iterator_end(&tidsi);
1619 102 : bat_iterator_end(&ovi);
1620 102 : if (res == LOG_OK) {
1621 102 : temp_destroy(cs->uibid);
1622 102 : temp_destroy(cs->uvbid);
1623 102 : nui = transfer_to_systrans(nui);
1624 102 : nuv = transfer_to_systrans(nuv);
1625 102 : if (nui == NULL || nuv == NULL) {
1626 : res = LOG_ERR;
1627 : } else {
1628 102 : cs->uibid = temp_create(nui);
1629 102 : cs->uvbid = temp_create(nuv);
1630 102 : cs->ucnt = BATcount(nui);
1631 : }
1632 : }
1633 : }
1634 102 : bat_destroy(nui);
1635 102 : bat_destroy(nuv);
1636 : }
1637 : }
1638 : }
1639 2784 : bat_iterator_end(&upi);
1640 2784 : bat_destroy(b);
1641 2784 : unlock_table(tr->store, t->base.id);
1642 2784 : bat_destroy(ins);
1643 2784 : bat_destroy(ui);
1644 2784 : bat_destroy(uv);
1645 2784 : if (otids != tids)
1646 10 : bat_destroy(tids);
1647 2784 : if (oupdates != updates)
1648 11 : bat_destroy(updates);
1649 2784 : return res;
1650 : } else if (is_new || cs->cleared) {
1651 323 : BAT *b = temp_descriptor(cs->bid);
1652 :
1653 323 : if (b == NULL) {
1654 : res = LOG_ERR;
1655 : } else {
1656 323 : if (BATcount(b)==0) {
1657 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1658 0 : res = LOG_ERR;
1659 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1660 0 : res = LOG_ERR;
1661 323 : BBPcold(b->batCacheid);
1662 323 : bat_destroy(b);
1663 : }
1664 : }
1665 323 : unlock_table(tr->store, t->base.id);
1666 323 : if (otids != tids)
1667 2 : bat_destroy(tids);
1668 323 : if (oupdates != updates)
1669 41 : bat_destroy(updates);
1670 : return res;
1671 : }
1672 :
1673 : static int
1674 3107 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1675 : {
1676 3107 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1677 : }
1678 :
1679 : static void *
1680 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1681 : {
1682 4 : void *newoffsets = NULL;
1683 4 : sql_delta *bat = *batp;
1684 4 : column_storage *cs = &bat->cs;
1685 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1686 :
1687 4 : if (!u)
1688 : return NULL;
1689 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1690 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1691 0 : bat_destroy(u);
1692 0 : return NULL;
1693 : } else {
1694 4 : int new = 0;
1695 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1696 4 : if (BATcount(u) >= max_cnt) {
1697 0 : if (max_cnt == INT_MAX) { /* decompress */
1698 : if (!(b = temp_descriptor(cs->bid))) {
1699 : bat_destroy(u);
1700 : return NULL;
1701 : }
1702 : n = DICTdecompress_(b, u, PERSISTENT);
1703 : /* TODO decompress updates if any */
1704 : bat_destroy(b);
1705 : assert(newoffsets == NULL);
1706 : if (!n) {
1707 : bat_destroy(u);
1708 : return NULL;
1709 : }
1710 : if (cs->ts != tr->tid) {
1711 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1712 : bat_destroy(n);
1713 : bat_destroy(u);
1714 : return NULL;
1715 : }
1716 : cs = &(*batp)->cs;
1717 : new = 1;
1718 : cs->uibid = cs->uvbid = 0;
1719 : }
1720 : if (cs->bid && !new)
1721 : temp_destroy(cs->bid);
1722 : n = transfer_to_systrans(n);
1723 : if (n == NULL) {
1724 : bat_destroy(u);
1725 : return NULL;
1726 : }
1727 : bat_set_access(n, BAT_READ);
1728 : cs->bid = temp_create(n);
1729 : bat_destroy(n);
1730 : if (cs->ebid && !new)
1731 : temp_destroy(cs->ebid);
1732 : cs->ebid = 0;
1733 : cs->st = ST_DEFAULT;
1734 : /* at append_col the column's storage type is cleared */
1735 : cs->cleared = true;
1736 : } else {
1737 0 : if (!(b = temp_descriptor(cs->bid))) {
1738 0 : GDKfree(newoffsets);
1739 0 : bat_destroy(u);
1740 0 : return NULL;
1741 : }
1742 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1743 0 : bat_destroy(b);
1744 0 : if (!n) {
1745 0 : GDKfree(newoffsets);
1746 0 : bat_destroy(u);
1747 0 : return NULL;
1748 : }
1749 0 : if (cs->ts != tr->tid) {
1750 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1751 0 : bat_destroy(u);
1752 0 : bat_destroy(n);
1753 0 : return NULL;
1754 : }
1755 0 : cs = &(*batp)->cs;
1756 0 : new = 1;
1757 0 : temp_dup(cs->ebid);
1758 0 : if (cs->uibid) {
1759 0 : temp_dup(cs->uibid);
1760 0 : temp_dup(cs->uvbid);
1761 : }
1762 : }
1763 0 : if (cs->bid)
1764 0 : temp_destroy(cs->bid);
1765 0 : n = transfer_to_systrans(n);
1766 0 : if (n == NULL) {
1767 0 : bat_destroy(u);
1768 0 : return NULL;
1769 : }
1770 0 : bat_set_access(n, BAT_READ);
1771 0 : cs->bid = temp_create(n);
1772 0 : bat_destroy(n);
1773 0 : cs->cleared = true;
1774 0 : i = newoffsets;
1775 : }
1776 : } else { /* append */
1777 4 : i = newoffsets;
1778 : }
1779 : }
1780 4 : bat_destroy(u);
1781 4 : return i;
1782 : }
1783 :
1784 : static void *
1785 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1786 : {
1787 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1788 1 : void *newoffsets = NULL;
1789 1 : BAT *b = NULL, *n = NULL;
1790 :
1791 1 : if (!(b = temp_descriptor(cs->bid)))
1792 : return NULL;
1793 :
1794 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1795 0 : bat_destroy(b);
1796 0 : return NULL;
1797 : } else {
1798 : /* returns new offset bat if values within min/max, else decompress */
1799 1 : if (!newoffsets) {
1800 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1801 1 : bat_destroy(b);
1802 1 : if (!n)
1803 : return NULL;
1804 : /* TODO decompress updates if any */
1805 1 : if (cs->bid)
1806 1 : temp_destroy(cs->bid);
1807 1 : n = transfer_to_systrans(n);
1808 1 : if (n == NULL)
1809 : return NULL;
1810 1 : bat_set_access(n, BAT_READ);
1811 1 : cs->bid = temp_create(n);
1812 1 : cs->st = ST_DEFAULT;
1813 : /* at append_col the column's storage type is cleared */
1814 1 : cs->cleared = true;
1815 1 : b = n;
1816 : } else { /* append */
1817 : i = newoffsets;
1818 : }
1819 : }
1820 1 : bat_destroy(b);
1821 1 : return i;
1822 : }
1823 :
1824 : static int
1825 6936 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1826 : {
1827 6936 : void *oupd = upd;
1828 6936 : sql_delta *bat = *batp;
1829 6936 : column_storage *cs = &bat->cs;
1830 6936 : storage *s = ATOMIC_PTR_GET(&t->data);
1831 6936 : assert(!is_oid_nil(rid));
1832 6936 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1833 :
1834 6936 : if (cs->st == ST_DICT) {
1835 : /* possibly a new array is returned */
1836 0 : upd = dict_append_val(tr, batp, upd, 1);
1837 0 : bat = *batp;
1838 0 : cs = &bat->cs;
1839 0 : if (!upd)
1840 : return LOG_ERR;
1841 : }
1842 :
1843 : /* check if rid is insert ? */
1844 6936 : if (!inplace) {
1845 : /* check conflict */
1846 4050 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1847 0 : if (oupd != upd)
1848 0 : GDKfree(upd);
1849 0 : return LOG_CONFLICT;
1850 : }
1851 4050 : BAT *ui, *uv;
1852 :
1853 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1854 : /* currently only one update delta is possible */
1855 4050 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1856 0 : if (oupd != upd)
1857 0 : GDKfree(upd);
1858 0 : return LOG_ERR;
1859 : }
1860 :
1861 4050 : assert(uv->ttype);
1862 4050 : assert(BATcount(ui) == BATcount(uv));
1863 8100 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1864 4050 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1865 0 : if (oupd != upd)
1866 0 : GDKfree(upd);
1867 0 : bat_destroy(ui);
1868 0 : bat_destroy(uv);
1869 0 : return LOG_ERR;
1870 : }
1871 4050 : assert(BATcount(ui) == BATcount(uv));
1872 4050 : bat_destroy(ui);
1873 4050 : bat_destroy(uv);
1874 4050 : cs->ucnt++;
1875 : } else {
1876 2886 : BAT *b = NULL;
1877 :
1878 2886 : if((b = temp_descriptor(cs->bid)) == NULL) {
1879 0 : if (oupd != upd)
1880 0 : GDKfree(upd);
1881 0 : return LOG_ERR;
1882 : }
1883 2886 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1884 0 : if (oupd != upd)
1885 0 : GDKfree(upd);
1886 0 : bat_destroy(b);
1887 0 : return LOG_ERR;
1888 : }
1889 2886 : bat_destroy(b);
1890 : }
1891 6936 : if (oupd != upd)
1892 0 : GDKfree(upd);
1893 : return LOG_OK;
1894 : }
1895 :
1896 : static int
1897 6936 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1898 : {
1899 6936 : int res = LOG_OK;
1900 6936 : lock_table(tr->store, t->base.id);
1901 6936 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1902 6936 : unlock_table(tr->store, t->base.id);
1903 6936 : return res;
1904 : }
1905 :
1906 : static int
1907 158963 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1908 : {
1909 158963 : (void)tr;
1910 158963 : if (!ocs)
1911 : return LOG_OK;
1912 158963 : cs->bid = ocs->bid;
1913 158963 : cs->ebid = ocs->ebid;
1914 158963 : cs->uibid = ocs->uibid;
1915 158963 : cs->uvbid = ocs->uvbid;
1916 158963 : cs->ucnt = ocs->ucnt;
1917 :
1918 158963 : if (temp) {
1919 25987 : cs->bid = temp_copy(cs->bid, true, false);
1920 25964 : if (cs->bid == BID_NIL)
1921 : return LOG_ERR;
1922 : } else {
1923 132976 : temp_dup(cs->bid);
1924 : }
1925 158960 : if (cs->ebid)
1926 6 : temp_dup(cs->ebid);
1927 158960 : cs->ucnt = 0;
1928 158960 : cs->uibid = e_bat(TYPE_oid);
1929 158989 : cs->uvbid = e_bat(type);
1930 158989 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1931 : return LOG_ERR;
1932 158989 : cs->st = ocs->st;
1933 158989 : return LOG_OK;
1934 : }
1935 :
1936 : static void
1937 321098 : destroy_delta(sql_delta *b, bool recursive)
1938 : {
1939 321098 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1940 : return;
1941 299707 : if (recursive && b->next)
1942 128506 : destroy_delta(b->next, true);
1943 299707 : if (b->cs.uibid)
1944 99455 : temp_destroy(b->cs.uibid);
1945 299707 : if (b->cs.uvbid)
1946 99455 : temp_destroy(b->cs.uvbid);
1947 299707 : if (b->cs.bid)
1948 299707 : temp_destroy(b->cs.bid);
1949 299707 : if (b->cs.ebid)
1950 61 : temp_destroy(b->cs.ebid);
1951 299707 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1952 299707 : _DELETE(b);
1953 : }
1954 :
1955 : static sql_delta *
1956 16071986 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1957 : {
1958 16071986 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1959 :
1960 16071986 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1961 15939055 : return obat;
1962 132931 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1963 : /* abort */
1964 12 : if (update_conflict)
1965 4 : *update_conflict = true;
1966 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1967 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1968 4 : return NULL;
1969 : }
1970 132919 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1971 : return NULL;
1972 132917 : sql_delta* bat = ZNEW(sql_delta);
1973 132967 : if (!bat)
1974 : return NULL;
1975 132967 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1976 132967 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1977 0 : destroy_delta(bat, false);
1978 0 : return NULL;
1979 : }
1980 132973 : bat->cs.ts = tr->tid;
1981 : /* only one writer else abort */
1982 132973 : bat->next = obat;
1983 132973 : if (obat)
1984 132973 : bat->nr_updates = obat->nr_updates;
1985 132973 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
1986 0 : bat->next = NULL;
1987 0 : destroy_delta(bat, false);
1988 0 : if (update_conflict)
1989 0 : *update_conflict = true;
1990 0 : return NULL;
1991 : }
1992 : return bat;
1993 : }
1994 :
1995 : static int
1996 10043 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1997 : {
1998 10043 : int ok = LOG_OK;
1999 :
2000 10043 : if (is_bat) {
2001 3107 : BAT *tids = incoming_tids;
2002 3107 : BAT *values = incoming_values;
2003 3107 : if (BATcount(tids) == 0)
2004 : return LOG_OK;
2005 3107 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2006 : } else {
2007 6936 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2008 : }
2009 : return ok;
2010 : }
2011 :
2012 : static int
2013 10069 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2014 : {
2015 10069 : int res = LOG_OK;
2016 10069 : bool update_conflict = false;
2017 10069 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2018 :
2019 10069 : if (isbat) {
2020 3130 : BAT *t = tids;
2021 3130 : if (!BATcount(t))
2022 : return LOG_OK;
2023 : }
2024 :
2025 9768 : if (c == NULL)
2026 : return LOG_ERR;
2027 :
2028 9768 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2029 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2030 :
2031 9764 : assert(delta && delta->cs.ts == tr->tid);
2032 9764 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2033 9764 : if (odelta != delta)
2034 3457 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2035 :
2036 9764 : odelta = delta;
2037 9764 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2038 : return res;
2039 9764 : assert(delta == odelta);
2040 9764 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2041 0 : res = sql_trans_alter_storage(tr, c, NULL);
2042 : return res;
2043 : }
2044 :
2045 : static sql_delta *
2046 2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2047 : {
2048 2457 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2049 :
2050 2457 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2051 2429 : return obat;
2052 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2053 : /* abort */
2054 0 : if (update_conflict)
2055 0 : *update_conflict = true;
2056 0 : return NULL;
2057 : }
2058 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2059 : return NULL;
2060 28 : sql_delta* bat = ZNEW(sql_delta);
2061 28 : if (!bat)
2062 : return NULL;
2063 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2064 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2065 0 : destroy_delta(bat, false);
2066 0 : return NULL;
2067 : }
2068 28 : bat->cs.ts = tr->tid;
2069 : /* only one writer else abort */
2070 28 : bat->next = obat;
2071 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2072 0 : bat->next = NULL;
2073 0 : destroy_delta(bat, false);
2074 0 : if (update_conflict)
2075 0 : *update_conflict = true;
2076 0 : return NULL;
2077 : }
2078 : return bat;
2079 : }
2080 :
2081 : static int
2082 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2083 : {
2084 782 : int res = LOG_OK;
2085 782 : bool update_conflict = false;
2086 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2087 :
2088 782 : if (isbat) {
2089 782 : BAT *t = tids;
2090 782 : if (!BATcount(t))
2091 : return LOG_OK;
2092 : }
2093 :
2094 279 : if (i == NULL)
2095 : return LOG_ERR;
2096 :
2097 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2098 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2099 :
2100 279 : assert(delta && delta->cs.ts == tr->tid);
2101 279 : if (odelta != delta)
2102 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2103 :
2104 279 : odelta = delta;
2105 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2106 279 : assert(delta == odelta);
2107 : return res;
2108 : }
2109 :
2110 : static int
2111 149267 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
2112 : {
2113 149267 : BAT *b, *oi = i;
2114 149267 : int err = 0;
2115 149267 : sql_delta *bat = *batp;
2116 :
2117 149267 : assert(!offsets || BATcount(offsets) == BATcount(i));
2118 149267 : if (!BATcount(i))
2119 : return LOG_OK;
2120 149267 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2121 : return LOG_ERR;
2122 :
2123 149267 : lock_column(tr->store, id);
2124 150145 : if (bat->cs.st == ST_DICT) {
2125 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2126 13 : bat = *batp;
2127 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2128 0 : bat_destroy(oi);
2129 13 : oi = ni;
2130 13 : if (!oi) {
2131 0 : unlock_column(tr->store, id);
2132 0 : return LOG_ERR;
2133 : }
2134 : }
2135 150145 : if (bat->cs.st == ST_FOR) {
2136 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2137 0 : bat = *batp;
2138 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2139 0 : bat_destroy(oi);
2140 0 : oi = ni;
2141 0 : if (!oi) {
2142 0 : unlock_column(tr->store, id);
2143 0 : return LOG_ERR;
2144 : }
2145 : }
2146 :
2147 150145 : b = temp_descriptor(bat->cs.bid);
2148 149694 : if (b == NULL) {
2149 0 : unlock_column(tr->store, id);
2150 0 : if (oi != i)
2151 0 : bat_destroy(oi);
2152 0 : return LOG_ERR;
2153 : }
2154 149694 : if (!offsets && offset == b->hseqbase+BATcount(b)) {
2155 149507 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2156 0 : err = 1;
2157 176 : } else if (!offsets) {
2158 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2159 0 : err = 1;
2160 11 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2161 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2162 0 : err = 1;
2163 11 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2164 0 : err = 1;
2165 : }
2166 149617 : bat_destroy(b);
2167 150551 : unlock_column(tr->store, id);
2168 :
2169 150347 : if (oi != i)
2170 13 : bat_destroy(oi);
2171 150347 : return (err)?LOG_ERR:LOG_OK;
2172 : }
2173 :
2174 : // Look at the offsets and find where the replacements end and the appends begin.
2175 : static BUN
2176 0 : start_of_appends(BAT *offsets, BUN bcnt)
2177 : {
2178 0 : BUN ocnt = BATcount(offsets);
2179 0 : if (ocnt == 0)
2180 : return 0;
2181 :
2182 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2183 0 : if (highest < bcnt)
2184 : // all are replacements
2185 : return ocnt;
2186 :
2187 : // reason backward to find the first append.
2188 : // Suppose offsets has 15 entries, bcnt == 100
2189 : // and the highest offset in offsets is 109.
2190 0 : BUN new_bcnt = highest + 1; // 110
2191 0 : BUN nappends = new_bcnt - bcnt; // 10
2192 0 : BUN nreplacements = ocnt - nappends; // 5
2193 :
2194 : // The first append should be to position bcnt
2195 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2196 :
2197 : return nreplacements;
2198 : }
2199 :
2200 :
2201 : static int
2202 15782213 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
2203 : {
2204 15782213 : void *oi = i;
2205 15782213 : BAT *b;
2206 15782213 : lock_column(tr->store, id);
2207 15794849 : sql_delta *bat = *batp;
2208 :
2209 15794849 : if (bat->cs.st == ST_DICT) {
2210 : /* possibly a new array is returned */
2211 4 : i = dict_append_val(tr, batp, i, cnt);
2212 4 : bat = *batp;
2213 4 : if (!i) {
2214 0 : unlock_column(tr->store, id);
2215 0 : return LOG_ERR;
2216 : }
2217 : }
2218 15794849 : if (bat->cs.st == ST_FOR) {
2219 : /* possibly a new array is returned */
2220 1 : i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
2221 1 : bat = *batp;
2222 1 : if (!i) {
2223 0 : unlock_column(tr->store, id);
2224 0 : return LOG_ERR;
2225 : }
2226 : }
2227 :
2228 15794849 : b = temp_descriptor(bat->cs.bid);
2229 15793616 : if (b == NULL) {
2230 0 : if (i != oi)
2231 0 : GDKfree(i);
2232 0 : unlock_column(tr->store, id);
2233 0 : return LOG_ERR;
2234 : }
2235 15793616 : BUN bcnt = BATcount(b);
2236 :
2237 15793616 : if (offsets) {
2238 : // The first few might be replacements while later items might be appends.
2239 : // Handle the replacements here while leaving the appends to the code below.
2240 0 : BUN nreplacements = start_of_appends(offsets, bcnt);
2241 :
2242 0 : oid *start = Tloc(offsets, 0);
2243 0 : if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
2244 0 : bat_destroy(b);
2245 0 : if (i != oi)
2246 0 : GDKfree(i);
2247 0 : unlock_column(tr->store, id);
2248 0 : return LOG_ERR;
2249 : }
2250 :
2251 : // Replacements have been handled. The rest are appends.
2252 0 : assert(offset == oid_nil);
2253 0 : offset = bcnt;
2254 0 : cnt -= nreplacements;
2255 : }
2256 :
2257 15793616 : if (bcnt > offset){
2258 552753 : size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
2259 552753 : if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
2260 0 : bat_destroy(b);
2261 0 : if (i != oi)
2262 0 : GDKfree(i);
2263 0 : unlock_column(tr->store, id);
2264 0 : return LOG_ERR;
2265 : }
2266 553527 : cnt -= ccnt;
2267 553527 : offset += ccnt;
2268 : }
2269 15794390 : if (cnt) {
2270 15240885 : if (BATcount(b) < offset) { /* add space */
2271 8716 : BUN d = offset - BATcount(b);
2272 8716 : if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
2273 0 : bat_destroy(b);
2274 0 : if (i != oi)
2275 0 : GDKfree(i);
2276 0 : unlock_column(tr->store, id);
2277 0 : return LOG_ERR;
2278 : }
2279 : }
2280 15240886 : if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
2281 0 : bat_destroy(b);
2282 0 : if (i != oi)
2283 0 : GDKfree(i);
2284 0 : unlock_column(tr->store, id);
2285 0 : return LOG_ERR;
2286 : }
2287 : }
2288 15797152 : bat_destroy(b);
2289 15793393 : if (i != oi)
2290 4 : GDKfree(i);
2291 15793393 : unlock_column(tr->store, id);
2292 15793393 : return LOG_OK;
2293 : }
2294 :
2295 : static int
2296 25987 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2297 : {
2298 25987 : if (!(bat->segs = new_segments(tr, 0)))
2299 : return LOG_ERR;
2300 25987 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2301 : }
2302 :
2303 : static int
2304 15931532 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2305 : {
2306 15931532 : int ok = LOG_OK;
2307 :
2308 15931532 : if ((*delta)->cs.merged)
2309 36746 : (*delta)->cs.merged = false; /* TODO needs to move */
2310 15931532 : if (isbat) {
2311 149424 : BAT *bat = incoming_data;
2312 :
2313 149424 : if (BATcount(bat))
2314 149555 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2315 : } else {
2316 15782108 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2317 : }
2318 15951833 : return ok;
2319 : }
2320 :
2321 : static int
2322 15936776 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2323 : {
2324 15936776 : int res = LOG_OK;
2325 15936776 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2326 :
2327 15936776 : if (isbat) {
2328 149511 : BAT *t = data;
2329 149511 : if (!BATcount(t))
2330 : return LOG_OK;
2331 : }
2332 :
2333 15935994 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2334 : return LOG_ERR;
2335 :
2336 15932508 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2337 :
2338 15932508 : odelta = delta;
2339 15932508 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
2340 : return res;
2341 15945590 : if (odelta != delta) {
2342 0 : delta->next = odelta;
2343 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2344 0 : delta->next = NULL;
2345 0 : destroy_delta(delta, false);
2346 0 : return LOG_CONFLICT;
2347 : }
2348 : }
2349 15945590 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2350 1 : res = sql_trans_alter_storage(tr, c, NULL);
2351 : return res;
2352 : }
2353 :
2354 : static int
2355 2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2356 : {
2357 2184 : int res = LOG_OK;
2358 2184 : sql_delta *delta;
2359 :
2360 2184 : if (isbat) {
2361 1010 : BAT *t = data;
2362 1010 : if (!BATcount(t))
2363 : return LOG_OK;
2364 : }
2365 :
2366 2172 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2367 : return LOG_ERR;
2368 :
2369 2172 : assert(delta->cs.st == ST_DEFAULT);
2370 :
2371 2172 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2372 2172 : return res;
2373 : }
2374 :
2375 : static int
2376 79268 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2377 : {
2378 79268 : int err = 0;
2379 :
2380 : /* TODO check for conflicting updates */
2381 79268 : (void)rid;
2382 79268 : (void)cnt;
2383 582938 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2384 503670 : sql_column *c = n->data;
2385 503670 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2386 :
2387 : /* check for active updates */
2388 503670 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2389 : return 1;
2390 : }
2391 : return 0;
2392 : }
2393 :
2394 : static int
2395 75367 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2396 : {
2397 75367 : int in_transaction = segments_in_transaction(tr, t);
2398 :
2399 75367 : lock_table(tr->store, t->base.id);
2400 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2401 75367 : segment *seg = s->segs->h, *p = NULL;
2402 52566107 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2403 52566107 : if (seg->start <= rid && seg->end > rid) {
2404 75367 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2405 4 : unlock_table(tr->store, t->base.id);
2406 4 : return LOG_CONFLICT;
2407 : }
2408 75363 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2409 0 : unlock_table(tr->store, t->base.id);
2410 0 : return LOG_CONFLICT;
2411 : }
2412 75363 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2413 0 : unlock_table(tr->store, t->base.id);
2414 0 : return LOG_ERR;
2415 : }
2416 : break;
2417 : }
2418 : }
2419 75363 : unlock_table(tr->store, t->base.id);
2420 75363 : if (!in_transaction)
2421 13082 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2422 : return LOG_OK;
2423 : }
2424 :
2425 : static int
2426 3903 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2427 : {
2428 3903 : segment *seg = *Seg, *p = NULL;
2429 13355 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2430 13353 : if (seg->start <= start && seg->end > start) {
2431 3953 : size_t lcnt = cnt;
2432 3953 : if (start+lcnt > seg->end)
2433 54 : lcnt = seg->end-start;
2434 3953 : if (SEG_IS_DELETED(seg, tr)) {
2435 47 : start += lcnt;
2436 47 : cnt -= lcnt;
2437 47 : continue;
2438 3906 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2439 1 : return LOG_CONFLICT;
2440 3905 : if (deletes_conflict_updates( tr, t, start, lcnt))
2441 : return LOG_CONFLICT;
2442 3905 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2443 3905 : if (!seg)
2444 : return LOG_ERR;
2445 3905 : start += lcnt;
2446 3905 : cnt -= lcnt;
2447 : }
2448 13305 : if (start+cnt <= seg->end)
2449 : break;
2450 : }
2451 : return LOG_OK;
2452 : }
2453 :
2454 : static int
2455 721 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2456 : {
2457 721 : segment *seg = s->segs->h;
2458 721 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2459 : }
2460 :
2461 : static int
2462 319 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2463 : {
2464 319 : int in_transaction = segments_in_transaction(tr, t);
2465 319 : BAT *oi = i; /* update ids */
2466 319 : int ok = LOG_OK;
2467 :
2468 319 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2469 : return LOG_ERR;
2470 319 : if (BATcount(i)) {
2471 568 : if (BATtdense(i)) {
2472 249 : size_t start = i->tseqbase;
2473 249 : size_t cnt = BATcount(i);
2474 :
2475 249 : lock_table(tr->store, t->base.id);
2476 249 : ok = delete_range(tr, t, s, start, cnt);
2477 249 : unlock_table(tr->store, t->base.id);
2478 70 : } else if (complex_cand(i)) {
2479 0 : struct canditer ci;
2480 0 : oid f = 0, l = 0, cur = 0;
2481 :
2482 0 : canditer_init(&ci, NULL, i);
2483 0 : cur = f = canditer_next(&ci);
2484 :
2485 0 : lock_table(tr->store, t->base.id);
2486 0 : if (!is_oid_nil(f)) {
2487 0 : segment *seg = s->segs->h;
2488 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2489 0 : if (cur+1 == l) {
2490 0 : cur++;
2491 0 : continue;
2492 : }
2493 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2494 0 : f = cur = l;
2495 : }
2496 0 : if (ok == LOG_OK)
2497 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2498 : }
2499 0 : unlock_table(tr->store, t->base.id);
2500 : } else {
2501 70 : if (!i->tsorted) {
2502 0 : assert(oi == i);
2503 0 : BAT *ni = NULL;
2504 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2505 0 : ok = LOG_ERR;
2506 0 : if (ni)
2507 0 : i = ni;
2508 : }
2509 70 : assert(i->tsorted);
2510 70 : BUN icnt = BATcount(i);
2511 70 : BATiter ii = bat_iterator(i);
2512 70 : oid *o = ii.base, n = o[0]+1;
2513 70 : size_t lcnt = 1;
2514 :
2515 70 : lock_table(tr->store, t->base.id);
2516 70 : segment *seg = s->segs->h;
2517 23233 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2518 23163 : if (o[i] == n) {
2519 22803 : lcnt++;
2520 22803 : n++;
2521 : } else {
2522 360 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2523 360 : lcnt = 0;
2524 : }
2525 23163 : if (!lcnt) {
2526 360 : n = o[i]+1;
2527 360 : lcnt = 1;
2528 : }
2529 : }
2530 70 : bat_iterator_end(&ii);
2531 70 : if (lcnt && ok == LOG_OK)
2532 70 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2533 70 : unlock_table(tr->store, t->base.id);
2534 : }
2535 : }
2536 319 : if (i != oi)
2537 25 : bat_destroy(i);
2538 : // assert
2539 319 : if (!in_transaction)
2540 271 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2541 : return ok;
2542 : }
2543 :
2544 : static void
2545 52489 : destroy_segments(segments *s)
2546 : {
2547 52489 : if (!s || sql_ref_dec(&s->r) > 0)
2548 0 : return;
2549 52489 : segment *seg = s->h;
2550 116131 : while(seg) {
2551 63642 : segment *n = ATOMIC_PTR_GET(&seg->next);
2552 63642 : ATOMIC_PTR_DESTROY(&seg->next);
2553 63642 : _DELETE(seg);
2554 63642 : seg = n;
2555 : }
2556 52489 : _DELETE(s);
2557 : }
2558 :
2559 : static void
2560 53951 : destroy_storage(storage *bat)
2561 : {
2562 53951 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2563 : return;
2564 52347 : if (bat->next)
2565 6151 : destroy_storage(bat->next);
2566 52347 : destroy_segments(bat->segs);
2567 52347 : if (bat->cs.uibid)
2568 31162 : temp_destroy(bat->cs.uibid);
2569 52347 : if (bat->cs.uvbid)
2570 31162 : temp_destroy(bat->cs.uvbid);
2571 52347 : if (bat->cs.bid)
2572 52347 : temp_destroy(bat->cs.bid);
2573 52347 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2574 52347 : _DELETE(bat);
2575 : }
2576 :
2577 : static int
2578 174928 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2579 : {
2580 174928 : if (uncommitted) {
2581 446105 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2582 294393 : if (!VALID_4_READ(s->ts,tr))
2583 : return 1;
2584 : } else {
2585 275919 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2586 254039 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2587 : return 1;
2588 : }
2589 :
2590 : return 0;
2591 : }
2592 :
2593 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2594 :
2595 : storage *
2596 2207712 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2597 : {
2598 2207712 : storage *obat;
2599 :
2600 2207712 : obat = ATOMIC_PTR_GET(&t->data);
2601 :
2602 2207712 : if (obat->cs.ts != tr->tid)
2603 1529323 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2604 1529268 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2605 : /* abort */
2606 15360 : if (clear)
2607 15360 : *clear = true;
2608 15360 : return NULL;
2609 : }
2610 :
2611 2192352 : if (!clear)
2612 : return obat;
2613 :
2614 : /* remainder is only to handle clear */
2615 26419 : if (segments_conflict(tr, obat->segs, 1)) {
2616 433 : *clear = true;
2617 433 : return NULL;
2618 : }
2619 25987 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2620 : return NULL;
2621 25987 : storage *bat = ZNEW(storage);
2622 25987 : if (!bat)
2623 : return NULL;
2624 25987 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2625 25987 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2626 0 : destroy_storage(bat);
2627 0 : return NULL;
2628 : }
2629 25988 : bat->cs.cleared = true;
2630 25988 : bat->cs.ts = tr->tid;
2631 : /* only one writer else abort */
2632 25988 : bat->next = obat;
2633 25988 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2634 9 : bat->next = NULL;
2635 9 : destroy_storage(bat);
2636 9 : if (clear)
2637 9 : *clear = true;
2638 9 : return NULL;
2639 : }
2640 : return bat;
2641 : }
2642 :
2643 : static int
2644 75734 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2645 : {
2646 75734 : int ok = LOG_OK;
2647 75734 : BAT *b = ib;
2648 75734 : storage *bat;
2649 :
2650 75734 : if (isbat && !BATcount(b))
2651 : return ok;
2652 :
2653 75686 : if (t == NULL)
2654 : return LOG_ERR;
2655 :
2656 75686 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2657 : return LOG_ERR;
2658 :
2659 75686 : if (isbat)
2660 319 : ok = storage_delete_bat(tr, t, bat, ib);
2661 : else
2662 75367 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2663 : return ok;
2664 : }
2665 :
2666 : static size_t
2667 0 : dcount_col(sql_trans *tr, sql_column *c)
2668 : {
2669 0 : sql_delta *b;
2670 :
2671 0 : if (!isTable(c->t))
2672 : return 0;
2673 0 : b = col_timestamp_delta(tr, c);
2674 0 : if (!b)
2675 : return 1;
2676 :
2677 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2678 0 : if (!s || !s->segs->t)
2679 : return 1;
2680 0 : size_t cnt = s->segs->t->end;
2681 0 : if (cnt) {
2682 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2683 0 : size_t dcnt = 0;
2684 :
2685 0 : if (v)
2686 0 : dcnt = BATguess_uniques(v, NULL);
2687 0 : return dcnt;
2688 : }
2689 : return cnt;
2690 : }
2691 :
2692 : static BAT *
2693 3901965 : bind_no_view(BAT *b, bool quick)
2694 : {
2695 3901965 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2696 3897940 : BAT *nb = BBP_desc(VIEWtparent(b));
2697 3897940 : bat_destroy(b);
2698 3897940 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2699 : return NULL;
2700 : }
2701 : return b;
2702 : }
2703 :
2704 : static int
2705 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2706 : {
2707 0 : int ok = 0;
2708 0 : assert(tr->active);
2709 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2710 0 : return 0;
2711 0 : lock_column(tr->store, c->base.id);
2712 0 : if (unique_est) {
2713 0 : sql_delta *d;
2714 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2715 0 : BAT *b;
2716 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2717 0 : MT_lock_set(&b->theaplock);
2718 0 : b->tunique_est = *unique_est;
2719 0 : MT_lock_unset(&b->theaplock);
2720 0 : bat_destroy(b);
2721 : }
2722 : }
2723 : }
2724 0 : if (min) {
2725 0 : _DELETE(c->min);
2726 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2727 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2728 0 : memcpy(c->min, min, minlen);
2729 0 : ok = 1;
2730 : }
2731 : }
2732 0 : if (max) {
2733 0 : _DELETE(c->max);
2734 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2735 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2736 0 : memcpy(c->max, max, maxlen);
2737 0 : ok = 1;
2738 : }
2739 : }
2740 0 : unlock_column(tr->store, c->base.id);
2741 0 : return ok;
2742 : }
2743 :
2744 : static int
2745 19 : min_max_col(sql_trans *tr, sql_column *c)
2746 : {
2747 19 : int ok = 0;
2748 19 : BAT *b = NULL;
2749 19 : sql_delta *d = NULL;
2750 :
2751 19 : assert(tr->active);
2752 19 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2753 0 : return 0;
2754 19 : if (c->min && c->max)
2755 : return 1;
2756 19 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2757 19 : if (d->cs.st == ST_FOR)
2758 : return 0;
2759 19 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2760 19 : lock_column(tr->store, c->base.id);
2761 19 : if (c->min && c->max) {
2762 0 : unlock_column(tr->store, c->base.id);
2763 0 : return 1;
2764 : }
2765 19 : _DELETE(c->min);
2766 19 : _DELETE(c->max);
2767 19 : if ((b = bind_col(tr, c, access))) {
2768 19 : if (!(b = bind_no_view(b, false))) {
2769 0 : unlock_column(tr->store, c->base.id);
2770 0 : return 0;
2771 : }
2772 19 : BATiter bi = bat_iterator(b);
2773 19 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2774 16 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2775 16 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2776 :
2777 16 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2778 0 : _DELETE(c->min);
2779 0 : _DELETE(c->max);
2780 : } else {
2781 16 : memcpy(c->min, nmin, minlen);
2782 16 : memcpy(c->max, nmax, maxlen);
2783 16 : ok = 1;
2784 : }
2785 : }
2786 19 : bat_iterator_end(&bi);
2787 19 : bat_destroy(b);
2788 : }
2789 19 : unlock_column(tr->store, c->base.id);
2790 : }
2791 : return ok;
2792 : }
2793 :
2794 : static size_t
2795 17 : count_segs(segment *s)
2796 : {
2797 17 : size_t nr = 0;
2798 :
2799 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2800 55 : nr++;
2801 17 : return nr;
2802 : }
2803 :
2804 : static size_t
2805 34 : count_del(sql_trans *tr, sql_table *t, int access)
2806 : {
2807 34 : storage *d;
2808 :
2809 34 : if (!isTable(t))
2810 : return 0;
2811 34 : d = tab_timestamp_storage(tr, t);
2812 34 : if (!d)
2813 : return 0;
2814 34 : if (access == 2)
2815 0 : return d->cs.ucnt;
2816 34 : if (access == 1)
2817 0 : return count_inserts(d->segs->h, tr);
2818 34 : if (access == 10) /* special case for counting the number of segments */
2819 17 : return count_segs(d->segs->h);
2820 17 : return count_deletes(d->segs->h, tr);
2821 : }
2822 :
2823 : static int
2824 23154 : sorted_col(sql_trans *tr, sql_column *col)
2825 : {
2826 23154 : int sorted = 0;
2827 :
2828 23154 : assert(tr->active);
2829 23154 : if (!isTable(col->t) || !col->t->s)
2830 : return 0;
2831 :
2832 23154 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2833 23135 : BAT *b = bind_col(tr, col, QUICK);
2834 :
2835 23135 : if (b)
2836 23135 : sorted = b->tsorted || b->trevsorted;
2837 : }
2838 : return sorted;
2839 : }
2840 :
2841 : static int
2842 7986 : unique_col(sql_trans *tr, sql_column *col)
2843 : {
2844 7986 : int distinct = 0;
2845 :
2846 7986 : assert(tr->active);
2847 7986 : if (!isTable(col->t) || !col->t->s)
2848 : return 0;
2849 :
2850 7986 : if (col && ATOMIC_PTR_GET(&col->data)) {
2851 7986 : BAT *b = bind_col(tr, col, QUICK);
2852 :
2853 7986 : if (b)
2854 7986 : distinct = b->tkey;
2855 : }
2856 : return distinct;
2857 : }
2858 :
2859 : static int
2860 2161 : double_elim_col(sql_trans *tr, sql_column *col)
2861 : {
2862 2161 : int de = 0;
2863 2161 : sql_delta *d;
2864 :
2865 2161 : assert(tr->active);
2866 2161 : if (!isTable(col->t) || !col->t->s)
2867 : return 0;
2868 :
2869 2161 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2870 6 : if (d->cs.st == ST_DICT) {
2871 6 : BAT *b = bind_col(tr, col, QUICK);
2872 6 : if (b && b->ttype == TYPE_bte)
2873 : de = 1;
2874 0 : else if (b && b->ttype == TYPE_sht)
2875 2161 : de = 2;
2876 : }
2877 2155 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2878 2155 : BAT *b = bind_col(tr, col, QUICK);
2879 :
2880 2155 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2881 2155 : de = GDK_ELIMDOUBLES(b->tvheap);
2882 2155 : if (de)
2883 1925 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2884 : }
2885 1925 : assert(de >= 0 && de <= 16);
2886 : }
2887 : return de;
2888 : }
2889 :
2890 : static int
2891 3937068 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
2892 : {
2893 3937068 : int ok = 0;
2894 3937068 : BAT *b = NULL, *off = NULL, *upv = NULL;
2895 3937068 : sql_delta *d = NULL;
2896 :
2897 3937068 : (void) tr;
2898 3937068 : assert(tr->active);
2899 3937068 : *nonil = false;
2900 3937068 : *unique = false;
2901 3937068 : *unique_est = 0.0;
2902 3937068 : if (!c || !isTable(c->t) || !c->t->s)
2903 : return ok;
2904 :
2905 3936830 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2906 3898213 : if (d->cs.st == ST_FOR) {
2907 27 : *nonil = true; /* TODO for min/max. I will do it later */
2908 27 : return ok;
2909 : }
2910 3898186 : int eclass = c->type.type->eclass;
2911 3898186 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2912 3898186 : if ((b = bind_col(tr, c, access))) {
2913 3898751 : if (!(b = bind_no_view(b, false)))
2914 0 : return ok;
2915 3898832 : BATiter bi = bat_iterator(b);
2916 3898484 : *nonil = bi.nonil && !bi.nil;
2917 :
2918 3898484 : if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
2919 3591335 : d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
2920 2311444 : if (c->min && VALinit(min, bi.type, c->min))
2921 : ok |= 1;
2922 2311390 : else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
2923 2301532 : ok |= 1;
2924 2311346 : if (c->max && VALinit(max, bi.type, c->max))
2925 54 : ok |= 2;
2926 2311308 : else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
2927 2295261 : ok |= 2;
2928 : }
2929 3898355 : if (d->cs.ucnt == 0) {
2930 3894265 : if (d->cs.st == ST_DEFAULT) {
2931 3893565 : *unique = bi.key;
2932 3893565 : *unique_est = bi.unique_est;
2933 3893565 : if (*unique_est == 0)
2934 1253700 : *unique_est = (double)BATguess_uniques(b,NULL);
2935 700 : } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
2936 : /* for dict, check the offsets bat for uniqueness */
2937 700 : MT_lock_set(&off->theaplock);
2938 700 : *unique = off->tkey;
2939 700 : *unique_est = off->tunique_est;
2940 700 : MT_lock_unset(&off->theaplock);
2941 : }
2942 : }
2943 3898784 : bat_iterator_end(&bi);
2944 3898709 : bat_destroy(b);
2945 3898878 : if (*nonil && d->cs.ucnt > 0) {
2946 : /* This could use a quick descriptor */
2947 2775 : if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
2948 0 : *nonil = false;
2949 : } else {
2950 2775 : MT_lock_set(&upv->theaplock);
2951 2775 : *nonil &= upv->tnonil && !upv->tnil;
2952 2775 : MT_lock_unset(&upv->theaplock);
2953 2775 : bat_destroy(upv);
2954 : }
2955 : }
2956 : }
2957 : }
2958 : return ok;
2959 : }
2960 :
2961 : static int
2962 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2963 : {
2964 257 : assert(tr->active);
2965 257 : if (!isTable(col->t) || !col->t->s)
2966 : return LOG_OK;
2967 :
2968 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2969 252 : BAT *b = bind_col(tr, col, QUICK);
2970 :
2971 252 : if (b) { /* add props for ranges [min, max> */
2972 252 : MT_lock_set(&b->theaplock);
2973 252 : if (add_range) {
2974 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2975 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2976 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2977 : else
2978 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2979 179 : if (!pt->with_nills || !col->null)
2980 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2981 : } else {
2982 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2983 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2984 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2985 : }
2986 252 : MT_lock_unset(&b->theaplock);
2987 : }
2988 : }
2989 : return LOG_OK;
2990 : }
2991 :
2992 : static int
2993 4737 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
2994 : {
2995 4737 : assert(tr->active);
2996 4737 : if (!isTable(col->t) || !col->t->s)
2997 : return LOG_OK;
2998 :
2999 4707 : if (col && ATOMIC_PTR_GET(&col->data)) {
3000 4707 : BAT *b = bind_col(tr, col, QUICK);
3001 :
3002 4707 : if (b) { /* add props for ranges [min, max> */
3003 4707 : if (not_null) {
3004 4705 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3005 : } else {
3006 2 : BATrmprop(b, GDK_NOT_NULL);
3007 : }
3008 : }
3009 : }
3010 : return LOG_OK;
3011 : }
3012 :
3013 : static int
3014 32528 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3015 : {
3016 32528 : sqlstore *store = tr->store;
3017 32528 : int bid = log_find_bat(store->logger, id);
3018 32528 : if (bid <= 0)
3019 : return LOG_ERR;
3020 32528 : cs->bid = temp_dup(bid);
3021 32528 : cs->ucnt = 0;
3022 32528 : cs->uibid = e_bat(TYPE_oid);
3023 32528 : cs->uvbid = e_bat(type);
3024 32528 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3025 : return LOG_ERR;
3026 : return LOG_OK;
3027 : }
3028 :
3029 : static int
3030 68779 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3031 : {
3032 68779 : int res = LOG_OK;
3033 68779 : gdk_return ok;
3034 68779 : BAT *b = temp_descriptor(bat->cs.bid);
3035 :
3036 68779 : if (b == NULL)
3037 : return LOG_ERR;
3038 :
3039 68779 : if (!bat->cs.uibid)
3040 68771 : bat->cs.uibid = e_bat(TYPE_oid);
3041 68779 : if (!bat->cs.uvbid)
3042 68771 : bat->cs.uvbid = e_bat(b->ttype);
3043 68779 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3044 0 : res = LOG_ERR;
3045 68779 : if (GDKinmemory(0)) {
3046 178 : bat_destroy(b);
3047 178 : return res;
3048 : }
3049 :
3050 68601 : bat_set_access(b, BAT_READ);
3051 68601 : sqlstore *store = tr->store;
3052 68601 : ok = log_bat_persists(store->logger, b, id);
3053 68601 : bat_destroy(b);
3054 68601 : if(res != LOG_OK)
3055 : return res;
3056 68601 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3057 : }
3058 :
3059 : static int
3060 0 : new_persistent_delta( sql_delta *bat)
3061 : {
3062 0 : bat->cs.ucnt = 0;
3063 0 : return LOG_OK;
3064 : }
3065 :
3066 : static void
3067 132293 : create_delta( sql_delta *d, BAT *b)
3068 : {
3069 132293 : bat_set_access(b, BAT_READ);
3070 132293 : d->cs.bid = temp_create(b);
3071 132293 : d->cs.uibid = d->cs.uvbid = 0;
3072 132293 : d->cs.ucnt = 0;
3073 132293 : }
3074 :
3075 : static bat
3076 7313 : copyBat (bat i, int type, oid seq)
3077 : {
3078 7313 : BAT *b, *tb;
3079 7313 : bat res;
3080 :
3081 7313 : if (!i)
3082 : return i;
3083 7313 : tb = quick_descriptor(i);
3084 7313 : if (tb == NULL)
3085 : return 0;
3086 7313 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3087 7313 : if (b == NULL)
3088 : return 0;
3089 :
3090 7313 : bat_set_access(b, BAT_READ);
3091 :
3092 7313 : res = temp_create(b);
3093 7313 : bat_destroy(b);
3094 7313 : return res;
3095 : }
3096 :
3097 : static int
3098 157231 : create_col(sql_trans *tr, sql_column *c)
3099 : {
3100 157231 : int ok = LOG_OK, new = 0;
3101 157231 : int type = c->type.type->localtype;
3102 157231 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3103 :
3104 157231 : if (!bat) {
3105 157231 : new = 1;
3106 157231 : bat = ZNEW(sql_delta);
3107 157231 : if (!bat)
3108 : return LOG_ERR;
3109 157231 : ATOMIC_PTR_SET(&c->data, bat);
3110 157231 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3111 : }
3112 :
3113 157231 : if (new)
3114 157231 : bat->cs.ts = tr->tid;
3115 :
3116 157231 : if (!isNew(c)&& !isTempTable(c->t)){
3117 24916 : bat->cs.ts = tr->ts;
3118 24916 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3119 24916 : if (ok == LOG_OK && c->storage_type) {
3120 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3121 2 : sqlstore *store = tr->store;
3122 2 : int bid = log_find_bat(store->logger, -c->base.id);
3123 2 : if (bid <= 0)
3124 : return LOG_ERR;
3125 2 : bat->cs.ebid = temp_dup(bid);
3126 2 : bat->cs.st = ST_DICT;
3127 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3128 2 : bat->cs.st = ST_FOR;
3129 : }
3130 : }
3131 24916 : return ok;
3132 132315 : } else if (bat && bat->cs.bid) {
3133 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3134 : } else {
3135 132315 : sql_column *fc = NULL;
3136 132315 : size_t cnt = 0;
3137 :
3138 : /* alter ? */
3139 132315 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3140 80325 : storage *s = tab_timestamp_storage(tr, fc->t);
3141 80325 : if (s == NULL)
3142 : return LOG_ERR;
3143 80325 : cnt = segs_end(s->segs, tr, c->t);
3144 : }
3145 132315 : if (cnt && fc != c) {
3146 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3147 :
3148 22 : if (d->cs.bid) {
3149 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3150 22 : if(bat->cs.bid == BID_NIL)
3151 22 : ok = LOG_ERR;
3152 : }
3153 22 : if (d->cs.uibid) {
3154 10 : bat->cs.uibid = e_bat(TYPE_oid);
3155 10 : if (bat->cs.uibid == BID_NIL)
3156 22 : ok = LOG_ERR;
3157 : }
3158 22 : if (d->cs.uvbid) {
3159 10 : bat->cs.uvbid = e_bat(type);
3160 10 : if(bat->cs.uvbid == BID_NIL)
3161 0 : ok = LOG_ERR;
3162 : }
3163 : } else {
3164 132293 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3165 132293 : if (!b) {
3166 : ok = LOG_ERR;
3167 : } else {
3168 132293 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3169 132293 : bat_destroy(b);
3170 : }
3171 :
3172 132293 : if (!new) {
3173 0 : bat->cs.uibid = e_bat(TYPE_oid);
3174 0 : if (bat->cs.uibid == BID_NIL)
3175 0 : ok = LOG_ERR;
3176 0 : bat->cs.uvbid = e_bat(type);
3177 0 : if(bat->cs.uvbid == BID_NIL)
3178 0 : ok = LOG_ERR;
3179 : }
3180 : }
3181 132315 : bat->cs.ucnt = 0;
3182 :
3183 132315 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3184 94 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3185 : }
3186 : return ok;
3187 : }
3188 :
3189 : static int
3190 62391 : log_create_col_(sql_trans *tr, sql_column *c)
3191 : {
3192 62391 : assert(!isTempTable(c->t));
3193 62391 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3194 : }
3195 :
3196 : static int
3197 90 : log_create_col(sql_trans *tr, sql_change *change)
3198 : {
3199 90 : return log_create_col_(tr, (sql_column*)change->obj);
3200 : }
3201 :
3202 : static int
3203 119703 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3204 : {
3205 119703 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3206 119703 : (void)oldest;
3207 119703 : assert(delta->cs.ts == tr->tid);
3208 119703 : delta->cs.ts = commit_ts;
3209 :
3210 119703 : assert(delta->next == NULL);
3211 119703 : if (!delta->cs.merged)
3212 119702 : delta->nr_updates += merge_delta(delta);
3213 119703 : if (!tr->parent)
3214 119699 : base->new = 0;
3215 119703 : return LOG_OK;
3216 : }
3217 :
3218 : static int
3219 94 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3220 : {
3221 94 : sql_column *c = (sql_column*)change->obj;
3222 94 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3223 94 : if (!tr->parent)
3224 93 : c->base.new = 0;
3225 94 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3226 : }
3227 :
3228 : /* will be called for new idx's and when new index columns are created */
3229 : static int
3230 9695 : create_idx(sql_trans *tr, sql_idx *ni)
3231 : {
3232 9695 : int ok = LOG_OK, new = 0;
3233 9695 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3234 9695 : int type = TYPE_lng;
3235 :
3236 9695 : if (oid_index(ni->type))
3237 954 : type = TYPE_oid;
3238 :
3239 9695 : if (!bat) {
3240 9695 : new = 1;
3241 9695 : bat = ZNEW(sql_delta);
3242 9695 : if (!bat)
3243 : return LOG_ERR;
3244 9695 : ATOMIC_PTR_INIT(&ni->data, bat);
3245 9695 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3246 : }
3247 :
3248 9695 : if (new)
3249 9695 : bat->cs.ts = tr->tid;
3250 :
3251 9695 : if (!isNew(ni) && !isTempTable(ni->t)){
3252 2404 : bat->cs.ts = 1;
3253 2404 : return load_cs(tr, &bat->cs, type, ni->base.id);
3254 7291 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3255 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3256 : } else {
3257 7291 : sql_column *c = ol_first_node(ni->t->columns)->data;
3258 7291 : sql_delta *d = col_timestamp_delta(tr, c);
3259 :
3260 7291 : if (d) {
3261 : /* Here we also handle indices created through alter stmts */
3262 : /* These need to be created aligned to the existing data */
3263 7291 : if (d->cs.bid) {
3264 7291 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3265 7291 : if(bat->cs.bid == BID_NIL)
3266 7291 : ok = LOG_ERR;
3267 : }
3268 : } else {
3269 : return LOG_ERR;
3270 : }
3271 :
3272 7291 : bat->cs.ucnt = 0;
3273 :
3274 7291 : if (!new) {
3275 0 : bat->cs.uibid = e_bat(TYPE_oid);
3276 0 : if (bat->cs.uibid == BID_NIL)
3277 0 : ok = LOG_ERR;
3278 0 : bat->cs.uvbid = e_bat(type);
3279 0 : if(bat->cs.uvbid == BID_NIL)
3280 0 : ok = LOG_ERR;
3281 : }
3282 7291 : bat->cs.ucnt = 0;
3283 7291 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3284 632 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3285 : }
3286 : return ok;
3287 : }
3288 :
3289 : static int
3290 6388 : log_create_idx_(sql_trans *tr, sql_idx *i)
3291 : {
3292 6388 : assert(!isTempTable(i->t));
3293 6388 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3294 : }
3295 :
3296 : static int
3297 618 : log_create_idx(sql_trans *tr, sql_change *change)
3298 : {
3299 618 : return log_create_idx_(tr, (sql_idx*)change->obj);
3300 : }
3301 :
3302 : static int
3303 632 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3304 : {
3305 632 : sql_idx *i = (sql_idx*)change->obj;
3306 632 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3307 632 : if (!tr->parent)
3308 632 : i->base.new = 0;
3309 632 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3310 : return LOG_OK;
3311 : }
3312 :
3313 : static int
3314 5208 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3315 : {
3316 5208 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3317 5208 : BAT *b = NULL, *ib = NULL;
3318 :
3319 5208 : if (ok != LOG_OK)
3320 : return ok;
3321 5208 : if (!(b = temp_descriptor(s->cs.bid)))
3322 : return LOG_ERR;
3323 5208 : ib = b;
3324 :
3325 5208 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3326 0 : bat_destroy(ib);
3327 0 : return LOG_ERR;
3328 : }
3329 :
3330 5208 : if (BATcount(b)) {
3331 421 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3332 0 : bat_destroy(ib);
3333 0 : return LOG_ERR;
3334 : }
3335 645 : if (BATtdense(b)) {
3336 224 : size_t start = b->tseqbase;
3337 224 : size_t cnt = BATcount(b);
3338 224 : ok = delete_range(tr, t, s, start, cnt);
3339 : } else {
3340 197 : assert(b->tsorted);
3341 197 : BUN icnt = BATcount(b);
3342 197 : BATiter bi = bat_iterator(b);
3343 197 : size_t lcnt = 1;
3344 197 : oid n;
3345 197 : segment *seg = s->segs->h;
3346 197 : if (complex_cand(b)) {
3347 0 : oid o = * (oid *) Tpos(&bi, 0);
3348 0 : n = o + 1;
3349 0 : for (BUN i = 1; i < icnt; i++) {
3350 0 : o = * (oid *) Tpos(&bi, i);
3351 0 : if (o == n) {
3352 0 : lcnt++;
3353 0 : n++;
3354 : } else {
3355 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3356 : break;
3357 : lcnt = 0;
3358 : }
3359 0 : if (!lcnt) {
3360 0 : n = o + 1;
3361 0 : lcnt = 1;
3362 : }
3363 : }
3364 : } else {
3365 197 : oid *o = bi.base;
3366 197 : n = o[0]+1;
3367 294880 : for (size_t i=1; i<icnt; i++) {
3368 294683 : if (o[i] == n) {
3369 291931 : lcnt++;
3370 291931 : n++;
3371 : } else {
3372 2752 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3373 : break;
3374 : lcnt = 0;
3375 : }
3376 294683 : if (!lcnt) {
3377 2752 : n = o[i]+1;
3378 2752 : lcnt = 1;
3379 : }
3380 : }
3381 : }
3382 197 : if (lcnt && ok == LOG_OK)
3383 197 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3384 197 : bat_iterator_end(&bi);
3385 : }
3386 421 : if (ok == LOG_OK)
3387 7016 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3388 6595 : if (seg->ts == tr->tid)
3389 3429 : seg->ts = 1;
3390 : } else {
3391 4787 : if (ok == LOG_OK) {
3392 4787 : BAT *bb = quick_descriptor(s->cs.bid);
3393 :
3394 4787 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3395 : ok = LOG_ERR;
3396 : } else {
3397 4787 : segment *seg = s->segs->h;
3398 4787 : if (seg->ts == tr->tid)
3399 4787 : seg->ts = 1;
3400 : }
3401 : }
3402 : }
3403 5208 : if (b != ib)
3404 5208 : bat_destroy(b);
3405 5208 : bat_destroy(ib);
3406 :
3407 5208 : return ok;
3408 : }
3409 :
3410 : static int
3411 26399 : create_del(sql_trans *tr, sql_table *t)
3412 : {
3413 26399 : int ok = LOG_OK, new = 0;
3414 26399 : BAT *b;
3415 26399 : storage *bat = ATOMIC_PTR_GET(&t->data);
3416 :
3417 26399 : if (!bat) {
3418 26399 : new = 1;
3419 26399 : bat = ZNEW(storage);
3420 26399 : if(!bat)
3421 : return LOG_ERR;
3422 26399 : ATOMIC_PTR_INIT(&t->data, bat);
3423 26399 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3424 26399 : bat->cs.ts = tr->tid;
3425 : }
3426 :
3427 26399 : if (!isNew(t) && !isTempTable(t)) {
3428 5208 : bat->cs.ts = tr->ts;
3429 5208 : return load_storage(tr, t, bat, t->base.id);
3430 21191 : } else if (bat->cs.bid) {
3431 : return ok;
3432 : } else {
3433 21191 : assert(!bat->segs);
3434 21191 : if (!(bat->segs = new_segments(tr, 0)))
3435 : return LOG_ERR;
3436 :
3437 21191 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3438 21191 : if(b != NULL) {
3439 21191 : bat_set_access(b, BAT_READ);
3440 21191 : bat->cs.bid = temp_create(b);
3441 21191 : bat_destroy(b);
3442 : } else {
3443 : return LOG_ERR;
3444 : }
3445 21191 : if (new)
3446 27988 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3447 : }
3448 : return ok;
3449 : }
3450 :
3451 : static int
3452 199193 : log_segment(sql_trans *tr, segment *s, sqlid id)
3453 : {
3454 199193 : sqlstore *store = tr->store;
3455 199193 : msk m = s->deleted;
3456 199193 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3457 : }
3458 :
3459 : static int
3460 97721 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3461 : {
3462 : /* log segments */
3463 97721 : lock_table(tr->store, id);
3464 495491 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3465 397770 : unlock_table(tr->store, id);
3466 397770 : if (seg->ts == tr->tid && seg->end-seg->start) {
3467 151137 : if (log_segment(tr, seg, id) != LOG_OK) {
3468 : return LOG_ERR;
3469 : }
3470 : }
3471 397770 : lock_table(tr->store, id);
3472 : }
3473 97721 : unlock_table(tr->store, id);
3474 97721 : return LOG_OK;
3475 : }
3476 :
3477 : static int
3478 12630 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3479 : {
3480 12630 : BAT *b;
3481 12630 : int ok = LOG_OK;
3482 :
3483 12630 : if (GDKinmemory(0))
3484 : return LOG_OK;
3485 :
3486 12597 : b = temp_descriptor(bat->cs.bid);
3487 12597 : if (b == NULL)
3488 : return LOG_ERR;
3489 :
3490 12597 : sqlstore *store = tr->store;
3491 12597 : bat_set_access(b, BAT_READ);
3492 12597 : if (ok == LOG_OK)
3493 12597 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3494 12597 : if (ok == LOG_OK)
3495 12597 : ok = log_segments(tr, bat->segs, t->base.id);
3496 12597 : bat_destroy(b);
3497 12597 : return ok;
3498 : }
3499 :
3500 : static int
3501 12644 : log_create_del(sql_trans *tr, sql_change *change)
3502 : {
3503 12644 : int ok = LOG_OK;
3504 12644 : sql_table *t = (sql_table*)change->obj;
3505 :
3506 12644 : if (t->base.deleted)
3507 : return ok;
3508 12630 : assert(!isTempTable(t));
3509 12630 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3510 12630 : if (ok == LOG_OK) {
3511 74931 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3512 62301 : sql_column *c = n->data;
3513 :
3514 62301 : ok = log_create_col_(tr, c);
3515 : }
3516 12630 : if (t->idxs) {
3517 18412 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3518 5782 : sql_idx *i = n->data;
3519 :
3520 5782 : if (ATOMIC_PTR_GET(&i->data))
3521 5770 : ok = log_create_idx_(tr, i);
3522 : }
3523 : }
3524 : }
3525 : return ok;
3526 : }
3527 :
3528 : static int
3529 21193 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3530 : {
3531 21193 : int ok = LOG_OK;
3532 21193 : sql_table *t = (sql_table*)change->obj;
3533 21193 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3534 :
3535 21193 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3536 120 : assert(isTempTable(t));
3537 120 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3538 120 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3539 120 : return ok;
3540 : }
3541 :
3542 21073 : if (!commit_ts) /* rollback handled by ? */
3543 : return ok;
3544 19218 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3545 19218 : assert(ok == LOG_OK);
3546 19218 : if (ok != LOG_OK)
3547 : return ok;
3548 19218 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3549 19218 : assert(dbat->cs.ts == tr->tid);
3550 19218 : dbat->cs.ts = commit_ts;
3551 19218 : if (ok == LOG_OK) {
3552 132412 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3553 113194 : sql_column *c = n->data;
3554 113194 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3555 :
3556 113194 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3557 : }
3558 19218 : if (t->idxs) {
3559 25013 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3560 5795 : sql_idx *i = n->data;
3561 5795 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3562 :
3563 5795 : if (delta)
3564 5783 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3565 : }
3566 : }
3567 19218 : if (!tr->parent)
3568 19216 : t->base.new = 0;
3569 : }
3570 19218 : if (!tr->parent)
3571 19216 : t->base.new = 0;
3572 : return ok;
3573 : }
3574 :
3575 : static int
3576 21280 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3577 : {
3578 21280 : gdk_return ok = GDK_SUCCEED;
3579 :
3580 21280 : sqlstore *store = tr->store;
3581 21280 : if (!GDKinmemory(0) && b && b->cs.bid)
3582 18896 : ok = log_bat_transient(store->logger, id);
3583 21280 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3584 25 : ok = log_bat_transient(store->logger, -id);
3585 21280 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3586 : }
3587 :
3588 : static int
3589 176601 : destroy_col(sqlstore *store, sql_column *c)
3590 : {
3591 176601 : (void)store;
3592 176601 : if (ATOMIC_PTR_GET(&c->data))
3593 176601 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3594 176601 : ATOMIC_PTR_SET(&c->data, NULL);
3595 176601 : return LOG_OK;
3596 : }
3597 :
3598 : static int
3599 19395 : log_destroy_col_(sql_trans *tr, sql_column *c)
3600 : {
3601 19395 : int ok = LOG_OK;
3602 19395 : assert(!isTempTable(c->t));
3603 19395 : if (!tr->parent) /* don't write save point commits */
3604 19395 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3605 19395 : return ok;
3606 : }
3607 :
3608 : static int
3609 19395 : log_destroy_col(sql_trans *tr, sql_change *change)
3610 : {
3611 19395 : sql_column *c = (sql_column*)change->obj;
3612 19395 : int res = log_destroy_col_(tr, c);
3613 19395 : change->obj = NULL;
3614 19395 : column_destroy(tr->store, c);
3615 19395 : return res;
3616 : }
3617 :
3618 : static int
3619 11494 : destroy_idx(sqlstore *store, sql_idx *i)
3620 : {
3621 11494 : (void)store;
3622 11494 : if (ATOMIC_PTR_GET(&i->data))
3623 11494 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3624 11494 : ATOMIC_PTR_SET(&i->data, NULL);
3625 11494 : return LOG_OK;
3626 : }
3627 :
3628 : static int
3629 1961 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3630 : {
3631 1961 : int ok = LOG_OK;
3632 1961 : assert(!isTempTable(i->t));
3633 1961 : if (ATOMIC_PTR_GET(&i->data)) {
3634 1885 : if (!tr->parent) /* don't write save point commits */
3635 1885 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3636 : }
3637 1961 : return ok;
3638 : }
3639 :
3640 : static int
3641 1961 : log_destroy_idx(sql_trans *tr, sql_change *change)
3642 : {
3643 1961 : sql_idx *i = (sql_idx*)change->obj;
3644 1961 : int res = log_destroy_idx_(tr, i);
3645 1961 : change->obj = NULL;
3646 1961 : idx_destroy(tr->store, i);
3647 1961 : return res;
3648 : }
3649 :
3650 : static int
3651 27967 : destroy_del(sqlstore *store, sql_table *t)
3652 : {
3653 27967 : (void)store;
3654 27967 : if (ATOMIC_PTR_GET(&t->data))
3655 27963 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3656 27967 : ATOMIC_PTR_SET(&t->data, NULL);
3657 27967 : return LOG_OK;
3658 : }
3659 :
3660 : static int
3661 3289 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3662 : {
3663 3289 : gdk_return ok = GDK_SUCCEED;
3664 :
3665 3289 : sqlstore *store = tr->store;
3666 3289 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3667 3289 : bat && bat->cs.bid)
3668 3289 : ok = log_bat_transient(store->logger, id);
3669 3289 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3670 : }
3671 :
3672 : static int
3673 3289 : log_destroy_del(sql_trans *tr, sql_change *change)
3674 : {
3675 3289 : int ok = LOG_OK;
3676 3289 : sql_table *t = (sql_table*)change->obj;
3677 :
3678 3289 : assert(!isTempTable(t));
3679 3289 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3680 3289 : return ok;
3681 : }
3682 :
3683 : static int
3684 24733 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3685 : {
3686 24733 : (void)tr;
3687 24733 : (void)change;
3688 24733 : (void)commit_ts;
3689 24733 : (void)oldest;
3690 24733 : if (commit_ts)
3691 24710 : change->handled = true;
3692 24733 : return 0;
3693 : }
3694 :
3695 : static int
3696 3360 : drop_del(sql_trans *tr, sql_table *t)
3697 : {
3698 3360 : int ok = LOG_OK;
3699 :
3700 3360 : if (!isNew(t)) {
3701 3360 : storage *bat = ATOMIC_PTR_GET(&t->data);
3702 3425 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3703 : }
3704 3360 : return ok;
3705 : }
3706 :
3707 : static int
3708 19410 : drop_col(sql_trans *tr, sql_column *c)
3709 : {
3710 19410 : assert(!isNew(c));
3711 19410 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3712 19410 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3713 19410 : return LOG_OK;
3714 : }
3715 :
3716 : static int
3717 1963 : drop_idx(sql_trans *tr, sql_idx *i)
3718 : {
3719 1963 : assert(!isNew(i));
3720 1963 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3721 1963 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3722 1963 : return LOG_OK;
3723 : }
3724 :
3725 :
3726 : static BUN
3727 129592 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3728 : {
3729 129592 : BAT *b;
3730 129592 : BUN sz = 0;
3731 :
3732 129592 : (void)tr;
3733 129592 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3734 129592 : if (cs->bid && renew) {
3735 129588 : b = quick_descriptor(cs->bid);
3736 129554 : if (b) {
3737 129554 : sz += BATcount(b);
3738 129554 : if (cs->st == ST_DICT) {
3739 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3740 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3741 :
3742 2 : if (nebid == BID_NIL || !n) {
3743 0 : temp_destroy(nebid);
3744 0 : bat_destroy(n);
3745 0 : return BUN_NONE;
3746 : }
3747 2 : temp_destroy(cs->ebid);
3748 2 : cs->ebid = nebid;
3749 2 : if (!temp)
3750 2 : bat_set_access(n, BAT_READ);
3751 2 : temp_destroy(cs->bid);
3752 2 : cs->bid = temp_create(n); /* create empty copy */
3753 2 : bat_destroy(n);
3754 : } else {
3755 129552 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3756 :
3757 129483 : if (nbid == BID_NIL)
3758 : return BUN_NONE;
3759 129483 : temp_destroy(cs->bid);
3760 129549 : cs->bid = nbid;
3761 : }
3762 : } else {
3763 : return BUN_NONE;
3764 : }
3765 : }
3766 129555 : if (cs->uibid) {
3767 129412 : temp_destroy(cs->uibid);
3768 129462 : cs->uibid = 0;
3769 : }
3770 129605 : if (cs->uvbid) {
3771 129461 : temp_destroy(cs->uvbid);
3772 129466 : cs->uvbid = 0;
3773 : }
3774 129610 : cs->cleared = true;
3775 129610 : cs->ucnt = 0;
3776 129610 : return sz;
3777 : }
3778 :
3779 : static BUN
3780 129418 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3781 : {
3782 129418 : bool update_conflict = false;
3783 129418 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3784 :
3785 129418 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3786 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3787 129459 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3788 129459 : if (odelta != delta)
3789 129464 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3790 129450 : if (delta)
3791 129450 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3792 : return 0;
3793 : }
3794 :
3795 : static BUN
3796 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3797 : {
3798 21 : bool update_conflict = false;
3799 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3800 :
3801 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3802 15 : return 0;
3803 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3804 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3805 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3806 6 : if (odelta != delta)
3807 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3808 6 : if (delta)
3809 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3810 : return 0;
3811 : }
3812 :
3813 : static int
3814 142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3815 : {
3816 142 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3817 : return LOG_ERR;
3818 142 : if (s->segs)
3819 142 : destroy_segments(s->segs);
3820 142 : if (!(s->segs = new_segments(tr, 0)))
3821 : return LOG_ERR;
3822 : return LOG_OK;
3823 : }
3824 :
3825 :
3826 : /*
3827 : * Clear the table, in general this means replacing the storage,
3828 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3829 : * all segments as deleted.
3830 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3831 : */
3832 : static BUN
3833 41832 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3834 : {
3835 41832 : int clear = !in_transaction, ok = LOG_OK;
3836 41832 : bool conflict = false;
3837 41832 : storage *bat;
3838 :
3839 41883 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3840 15802 : return conflict?BUN_NONE-1:BUN_NONE;
3841 :
3842 26029 : if (!clear) {
3843 51 : lock_table(tr->store, t->base.id);
3844 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3845 51 : unlock_table(tr->store, t->base.id);
3846 : }
3847 26029 : assert(t->persistence != SQL_DECLARED_TABLE);
3848 26029 : if (!in_transaction)
3849 25981 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3850 26026 : if (ok == LOG_ERR)
3851 : return BUN_NONE;
3852 26026 : if (ok == LOG_CONFLICT)
3853 0 : return BUN_NONE - 1;
3854 : return LOG_OK;
3855 : }
3856 :
3857 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3858 : static BUN
3859 41808 : clear_table(sql_trans *tr, sql_table *t)
3860 : {
3861 41808 : node *n = ol_first_node(t->columns);
3862 41808 : sql_column *c = n->data;
3863 41808 : storage *d = tab_timestamp_storage(tr, t);
3864 41829 : int in_transaction, clear;
3865 41829 : BUN sz, clear_ok;
3866 :
3867 41829 : if (!d)
3868 : return BUN_NONE;
3869 41829 : in_transaction = segments_in_transaction(tr, t);
3870 41827 : clear = !in_transaction;
3871 41827 : sz = count_col(tr, c, CNT_ACTIVE);
3872 41832 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3873 : return clear_ok;
3874 :
3875 26027 : if (in_transaction)
3876 : return sz;
3877 :
3878 155435 : for (; n; n = n->next) {
3879 129456 : c = n->data;
3880 :
3881 129456 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3882 0 : return clear_ok;
3883 : }
3884 25979 : if (t->idxs) {
3885 26000 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3886 21 : sql_idx *ci = n->data;
3887 :
3888 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3889 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3890 0 : return clear_ok;
3891 : }
3892 : }
3893 25979 : if (clear)
3894 25979 : d->segs->nr_reused = 0;
3895 25979 : return sz;
3896 : }
3897 :
3898 : static int
3899 158888 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3900 : {
3901 158888 : sqlstore *store = tr->store;
3902 158888 : gdk_return ok = GDK_SUCCEED;
3903 :
3904 158888 : (void) t;
3905 158888 : (void) segs;
3906 158888 : if (GDKinmemory(0))
3907 : return LOG_OK;
3908 :
3909 158881 : if (cs->cleared) {
3910 155496 : assert(cs->ucnt == 0);
3911 155496 : BAT *ins = temp_descriptor(cs->bid);
3912 155496 : if (!ins)
3913 : return LOG_ERR;
3914 155496 : assert(!isEbat(ins));
3915 155496 : bat_set_access(ins, BAT_READ);
3916 155496 : ok = log_bat_persists(store->logger, ins, id);
3917 155496 : bat_destroy(ins);
3918 155496 : if (ok == GDK_SUCCEED && cs->ebid) {
3919 56 : BAT *ins = temp_descriptor(cs->ebid);
3920 56 : if (!ins)
3921 : return LOG_ERR;
3922 56 : assert(!isEbat(ins));
3923 56 : bat_set_access(ins, BAT_READ);
3924 56 : ok = log_bat_persists(store->logger, ins, -id);
3925 56 : bat_destroy(ins);
3926 : }
3927 155496 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3928 : }
3929 :
3930 3385 : assert(!isTempTable(t));
3931 :
3932 3385 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3933 2800 : BAT *ui = temp_descriptor(cs->uibid);
3934 2800 : BAT *uv = temp_descriptor(cs->uvbid);
3935 : /* any updates */
3936 2800 : if (ui == NULL || uv == NULL) {
3937 : ok = GDK_FAIL;
3938 2800 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3939 2800 : ok = log_delta(store->logger, ui, uv, id);
3940 2800 : bat_destroy(ui);
3941 2800 : bat_destroy(uv);
3942 : }
3943 2800 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3944 : }
3945 :
3946 : static inline int
3947 59151 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3948 59151 : sqlstore *store = tr->store;
3949 59151 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3950 : }
3951 :
3952 : static inline int
3953 59151 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3954 59151 : sqlstore *store = tr->store;
3955 59151 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3956 : }
3957 :
3958 : static int
3959 59151 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3960 : {
3961 59151 : sqlstore *store = tr->store;
3962 59151 : gdk_return ok = GDK_SUCCEED;
3963 :
3964 59151 : size_t end = segs_end(segs, tr, t);
3965 :
3966 59151 : if (tr_log_table_start(tr, t) != LOG_OK)
3967 : return LOG_ERR;
3968 :
3969 59151 : size_t nr_appends = 0;
3970 :
3971 59151 : lock_table(tr->store, t->base.id);
3972 418515 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3973 359364 : unlock_table(tr->store, t->base.id);
3974 :
3975 359364 : if (seg->ts == tr->tid && seg->end-seg->start) {
3976 120452 : if (!seg->deleted) {
3977 48056 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
3978 : return LOG_ERR;
3979 :
3980 48056 : nr_appends += (seg->end - seg->start);
3981 : }
3982 : }
3983 359364 : lock_table(tr->store, t->base.id);
3984 : }
3985 59151 : unlock_table(tr->store, t->base.id);
3986 :
3987 411223 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
3988 352072 : sql_column *c = n->data;
3989 352072 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
3990 :
3991 352072 : if (cs->cleared) {
3992 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
3993 3 : continue;
3994 : }
3995 :
3996 352069 : lock_table(tr->store, t->base.id);
3997 352069 : if (!cs->cleared) {
3998 2460206 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
3999 2108137 : unlock_table(tr->store, t->base.id);
4000 2108137 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4001 : /* append col*/
4002 271642 : BAT *ins = temp_descriptor(cs->bid);
4003 271642 : if (ins == NULL)
4004 : return LOG_ERR;
4005 271642 : assert(BATcount(ins) >= cur->end);
4006 271642 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4007 271642 : bat_destroy(ins);
4008 : }
4009 2108137 : lock_table(tr->store, t->base.id);
4010 : }
4011 : }
4012 352069 : unlock_table(tr->store, t->base.id);
4013 :
4014 352069 : if (ok == GDK_SUCCEED && cs->ebid) {
4015 19 : BAT *ins = temp_descriptor(cs->ebid);
4016 19 : if (ins == NULL)
4017 : return LOG_ERR;
4018 19 : if (BATcount(ins) > ins->batInserted)
4019 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4020 19 : BATcommit(ins, BATcount(ins));
4021 19 : bat_destroy(ins);
4022 : }
4023 : }
4024 :
4025 59151 : if (t->idxs) {
4026 64335 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4027 5184 : sql_idx *i = n->data;
4028 :
4029 5184 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4030 4406 : continue;
4031 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4032 :
4033 778 : if (cs) {
4034 778 : if (cs->cleared) {
4035 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4036 0 : continue;
4037 : }
4038 :
4039 778 : lock_table(tr->store, t->base.id);
4040 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4041 1609 : unlock_table(tr->store, t->base.id);
4042 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4043 : /* append idx */
4044 730 : BAT *ins = temp_descriptor(cs->bid);
4045 730 : if (ins == NULL)
4046 : return LOG_ERR;
4047 730 : assert(BATcount(ins) >= cur->end);
4048 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4049 730 : bat_destroy(ins);
4050 : }
4051 1609 : lock_table(tr->store, t->base.id);
4052 : }
4053 778 : unlock_table(tr->store, t->base.id);
4054 : }
4055 : }
4056 : }
4057 :
4058 59151 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4059 0 : return LOG_ERR;
4060 :
4061 : return LOG_OK;
4062 : }
4063 :
4064 : static int
4065 85124 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4066 : {
4067 85124 : int ok = LOG_OK;
4068 85124 : bool cleared = s->cs.cleared;
4069 85124 : if (ok == LOG_OK && cleared)
4070 25973 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4071 25973 : if (ok == LOG_OK)
4072 85124 : ok = log_segments(tr, s->segs, t->base.id);
4073 85124 : if (ok == LOG_OK && !cleared)
4074 59151 : ok = log_table_append(tr, t, s->segs);
4075 85124 : return ok;
4076 : }
4077 :
4078 : static void
4079 424275 : merge_cs( column_storage *cs, const char* caller)
4080 : {
4081 424275 : if (cs->bid && cs->ucnt) {
4082 2809 : BAT *cur = temp_descriptor(cs->bid);
4083 2809 : BAT *ui = temp_descriptor(cs->uibid);
4084 2809 : BAT *uv = temp_descriptor(cs->uvbid);
4085 :
4086 2809 : if (!cur || !ui || !uv) {
4087 0 : bat_destroy(ui);
4088 0 : bat_destroy(uv);
4089 0 : bat_destroy(cur);
4090 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4091 : return;
4092 : }
4093 2809 : assert(BATcount(ui) == BATcount(uv));
4094 :
4095 : /* any updates */
4096 2809 : assert(!isEbat(cur));
4097 2809 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4098 0 : bat_destroy(ui);
4099 0 : bat_destroy(uv);
4100 0 : bat_destroy(cur);
4101 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4102 : return;
4103 : }
4104 : /* cleanup the old deltas */
4105 2809 : temp_destroy(cs->uibid);
4106 2809 : temp_destroy(cs->uvbid);
4107 2809 : cs->uibid = e_bat(TYPE_oid);
4108 2809 : cs->uvbid = e_bat(cur->ttype);
4109 2809 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4110 2809 : cs->ucnt = 0;
4111 2809 : bat_destroy(ui);
4112 2809 : bat_destroy(uv);
4113 2809 : bat_destroy(cur);
4114 : }
4115 424275 : cs->cleared = false;
4116 424275 : cs->merged = true;
4117 424275 : return;
4118 : }
4119 :
4120 : static lng
4121 381910 : merge_delta( sql_delta *obat)
4122 : {
4123 381910 : lng res = 0;
4124 381910 : if (obat && obat->next && !obat->cs.merged)
4125 132922 : res += merge_delta(obat->next);
4126 381910 : res += obat->cs.ucnt;
4127 381910 : merge_cs(&obat->cs, __func__);
4128 381910 : return res;
4129 : }
4130 :
4131 : static void
4132 42365 : merge_storage(storage *tdb)
4133 : {
4134 42365 : merge_cs(&tdb->cs, __func__);
4135 :
4136 42365 : if (tdb->next) {
4137 276 : destroy_storage(tdb->next);
4138 276 : tdb->next = NULL;
4139 : }
4140 42365 : }
4141 :
4142 : static sql_delta *
4143 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4144 : {
4145 : /* commit ie copy back to the parent transaction */
4146 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4147 1 : sql_delta *od = delta->next;
4148 1 : if (od->cs.ts == commit_ts) {
4149 0 : sql_delta t = *od, *n = od->next;
4150 0 : *od = *delta;
4151 0 : od->next = n;
4152 0 : *delta = t;
4153 0 : delta->next = NULL;
4154 0 : destroy_delta(delta, true);
4155 0 : return od;
4156 : }
4157 : }
4158 : return delta;
4159 : }
4160 :
4161 : static int
4162 132886 : log_update_col( sql_trans *tr, sql_change *change)
4163 : {
4164 132886 : sql_column *c = (sql_column*)change->obj;
4165 132886 : assert(!isTempTable(c->t));
4166 :
4167 132886 : if (isDeleted(c->t)) {
4168 0 : change->handled = true;
4169 0 : return LOG_OK;
4170 : }
4171 :
4172 132886 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4173 132886 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4174 132886 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4175 132886 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4176 : }
4177 : return LOG_OK;
4178 : }
4179 :
4180 : static int
4181 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4182 : {
4183 217 : sqlstore *store = Store;
4184 :
4185 217 : sql_delta *d = (sql_delta*)change->data;
4186 217 : if (d->cs.ts < oldest) {
4187 82 : destroy_delta(d, false);
4188 82 : if (change->commit == &commit_update_idx)
4189 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4190 : else
4191 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4192 82 : return 1;
4193 : }
4194 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4195 82 : d->cs.ts = store_get_timestamp(store) + 1;
4196 : return 0;
4197 : }
4198 :
4199 : static int
4200 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4201 : {
4202 9 : sqlstore *store = Store;
4203 :
4204 9 : storage *d = (storage*)change->data;
4205 9 : if (d->cs.ts < oldest) {
4206 3 : destroy_storage(d);
4207 3 : table_destroy(store, (sql_table*)change->obj);
4208 3 : return 1;
4209 : }
4210 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4211 3 : d->cs.ts = store_get_timestamp(store) + 1;
4212 : return 0;
4213 : }
4214 :
4215 : static int
4216 133004 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4217 : {
4218 133004 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4219 :
4220 133004 : sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
4221 :
4222 133004 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4223 3 : int ok = LOG_OK;
4224 3 : assert(isTempTable(t));
4225 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4226 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4227 3 : if (!tr->parent)
4228 0 : t->base.new = base->new = 0;
4229 3 : change->handled = true;
4230 3 : return ok;
4231 : }
4232 :
4233 133001 : if (commit_ts)
4234 132919 : delta->cs.ts = commit_ts;
4235 133001 : if (!commit_ts) { /* rollback */
4236 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4237 :
4238 82 : if (change->ts && t->base.new) /* handled by create col */
4239 : return LOG_OK;
4240 82 : if (o != d) {
4241 0 : while(o && o->next != d)
4242 : o = o->next;
4243 : }
4244 82 : if (o == ATOMIC_PTR_GET(data))
4245 82 : ATOMIC_PTR_SET(data, d->next);
4246 : else
4247 0 : o->next = d->next;
4248 82 : d->next = NULL;
4249 82 : change->cleanup = &tc_gc_rollbacked;
4250 132919 : } else if (!tr->parent) {
4251 : /* merge deltas */
4252 272827 : while (delta && delta->cs.ts > oldest)
4253 139909 : delta = delta->next;
4254 132918 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4255 26551 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4256 26551 : idelta->nr_updates += merge_delta(delta);
4257 26551 : unlock_column(tr->store, base->id);
4258 : }
4259 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4260 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4261 : return LOG_OK;
4262 : }
4263 :
4264 : static int
4265 132976 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4266 : {
4267 :
4268 132976 : sql_column *c = (sql_column*)change->obj;
4269 132976 : sql_base* base = &c->base;
4270 132976 : sql_table* t = c->t;
4271 132976 : ATOMIC_PTR_TYPE* data = &c->data;
4272 132976 : int type = c->type.type->localtype;
4273 :
4274 132976 : if (change->handled || isDeleted(c->t))
4275 : return LOG_OK;
4276 :
4277 132976 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4278 : }
4279 :
4280 : static int
4281 26 : log_update_idx( sql_trans *tr, sql_change *change)
4282 : {
4283 26 : sql_idx *i = (sql_idx*)change->obj;
4284 26 : assert(!isTempTable(i->t));
4285 :
4286 26 : if (isDeleted(i->t)) {
4287 0 : change->handled = true;
4288 0 : return LOG_OK;
4289 : }
4290 :
4291 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4292 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4293 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4294 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4295 : }
4296 : return LOG_OK;
4297 : }
4298 :
4299 : static int
4300 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4301 : {
4302 28 : sql_idx *i = (sql_idx*)change->obj;
4303 28 : sql_base* base = &i->base;
4304 28 : sql_table* t = i->t;
4305 28 : ATOMIC_PTR_TYPE* data = &i->data;
4306 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4307 :
4308 28 : if (change->handled || isDeleted(i->t))
4309 : return LOG_OK;
4310 :
4311 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4312 : }
4313 :
4314 : static storage *
4315 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4316 : {
4317 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4318 0 : storage *od = dbat->next;
4319 0 : if (od->cs.ts == commit_ts) {
4320 0 : storage t = *od, *n = od->next;
4321 0 : *od = *dbat;
4322 0 : od->next = n;
4323 0 : *dbat = t;
4324 0 : dbat->next = NULL;
4325 0 : destroy_storage(dbat);
4326 0 : return od;
4327 : }
4328 : }
4329 : return dbat;
4330 : }
4331 :
4332 : static int
4333 85124 : log_update_del( sql_trans *tr, sql_change *change)
4334 : {
4335 85124 : sql_table *t = (sql_table*)change->obj;
4336 85124 : assert(!isTempTable(t));
4337 :
4338 85124 : if (isDeleted(t)) {
4339 0 : change->handled = true;
4340 0 : return LOG_OK;
4341 : }
4342 :
4343 85124 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4344 85124 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4345 : return LOG_OK;
4346 : }
4347 :
4348 : static int
4349 89649 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4350 : {
4351 89649 : int ok = LOG_OK;
4352 89649 : sql_table *t = (sql_table*)change->obj;
4353 89649 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4354 :
4355 89649 : if (change->handled || isDeleted(t))
4356 : return ok;
4357 :
4358 89649 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4359 22 : assert(isTempTable(t));
4360 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4361 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4362 22 : change->handled = true;
4363 22 : return ok;
4364 : }
4365 :
4366 89627 : lock_table(tr->store, t->base.id);
4367 89627 : if (!commit_ts) { /* rollback */
4368 4203 : if (dbat->cs.ts == tr->tid) {
4369 6 : if (change->ts && t->base.new) { /* handled by the create table */
4370 3 : unlock_table(tr->store, t->base.id);
4371 3 : return ok;
4372 : }
4373 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4374 :
4375 3 : if (o != d) {
4376 0 : while(o && o->next != d)
4377 : o = o->next;
4378 : }
4379 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4380 3 : assert(d->next);
4381 3 : ATOMIC_PTR_SET(&t->data, d->next);
4382 : } else
4383 0 : o->next = d->next;
4384 3 : d->next = NULL;
4385 3 : change->cleanup = &tc_gc_rollbacked_storage;
4386 : } else
4387 4197 : rollback_segments(dbat->segs, tr, change, oldest);
4388 85424 : } else if (ok == LOG_OK && !tr->parent) {
4389 85398 : if (dbat->cs.ts == tr->tid) /* cleared table */
4390 25975 : dbat->cs.ts = commit_ts;
4391 :
4392 85398 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4393 85398 : if (ok == LOG_OK) {
4394 85398 : merge_segments(dbat, tr, change, commit_ts, oldest);
4395 85398 : if (oldest == commit_ts)
4396 42365 : merge_storage(dbat);
4397 : }
4398 85398 : if (dbat)
4399 85398 : dbat->cs.cleared = false;
4400 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4401 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4402 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4403 26 : storage *s = change->data;
4404 26 : if (s->cs.ts == tr->tid)
4405 0 : s->cs.ts = commit_ts;
4406 : }
4407 89624 : unlock_table(tr->store, t->base.id);
4408 89624 : return ok;
4409 : }
4410 :
4411 : /* only rollback (content version) case for now */
4412 : static int
4413 19570 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4414 : {
4415 19570 : sqlstore *store = Store;
4416 19570 : sql_column *c = (sql_column*)change->obj;
4417 :
4418 19570 : if (!c) /* cleaned earlier */
4419 : return 1;
4420 :
4421 175 : if (change->handled || isDeleted(c->t)) {
4422 0 : column_destroy(store, c);
4423 0 : return 1;
4424 : }
4425 :
4426 : /* savepoint commit (did it merge ?) */
4427 175 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4428 0 : column_destroy(store, c);
4429 0 : return 1;
4430 : }
4431 175 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4432 : return 0;
4433 174 : sql_delta *d = (sql_delta*)change->data, *id = d;
4434 174 : if (d && d->next) {
4435 :
4436 66 : if (d->cs.ts > oldest)
4437 : return LOG_OK; /* cannot cleanup yet */
4438 :
4439 : // d is oldest reachable delta
4440 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4441 62 : destroy_delta(d->next, true);
4442 62 : d->next = NULL;
4443 : }
4444 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4445 63 : id->nr_updates += merge_delta(d);
4446 63 : unlock_column(store, c->base.id);
4447 : }
4448 171 : column_destroy(store, c);
4449 171 : return 1;
4450 : }
4451 :
4452 : static int
4453 791741 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4454 : {
4455 791741 : sqlstore *store = Store;
4456 791741 : sql_column *c = (sql_column*)change->obj;
4457 :
4458 791741 : if (!c) /* cleaned earlier */
4459 : return 1;
4460 :
4461 791741 : if (change->handled || isDeleted(c->t)) {
4462 3 : table_destroy(store, c->t);
4463 3 : return 1;
4464 : }
4465 :
4466 : /* savepoint commit (did it merge ?) */
4467 791738 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4468 30183 : table_destroy(store, c->t);
4469 30183 : return 1;
4470 : }
4471 761555 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4472 : return 0;
4473 761554 : sql_delta *d = (sql_delta*)change->data, *id = d;
4474 761554 : if (d && d->next) {
4475 :
4476 761554 : if (d->cs.ts > oldest)
4477 : return LOG_OK; /* cannot cleanup yet */
4478 :
4479 : // d is oldest reachable delta
4480 102646 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4481 4327 : destroy_delta(d->next, true);
4482 4327 : d->next = NULL;
4483 : }
4484 102646 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4485 102646 : id->nr_updates += merge_delta(d);
4486 102646 : unlock_column(store, c->base.id);
4487 : }
4488 102646 : table_destroy(store, c->t);
4489 102646 : return 1;
4490 : }
4491 :
4492 : static int
4493 2595 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4494 : {
4495 2595 : sqlstore *store = Store;
4496 2595 : sql_idx *i = (sql_idx*)change->obj;
4497 :
4498 2595 : if (!i) /* cleaned earlier */
4499 : return 1;
4500 :
4501 634 : if (change->handled || isDeleted(i->t)) {
4502 0 : idx_destroy(store, i);
4503 0 : return 1;
4504 : }
4505 :
4506 : /* savepoint commit (did it merge ?) */
4507 634 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4508 0 : idx_destroy(store, i);
4509 0 : return 1;
4510 : }
4511 634 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4512 : return 0;
4513 634 : sql_delta *d = (sql_delta*)change->data, *id = d;
4514 634 : if (d && d->next) {
4515 0 : if (d->cs.ts > oldest)
4516 : return LOG_OK; /* cannot cleanup yet */
4517 :
4518 : // d is oldest reachable delta
4519 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4520 0 : destroy_delta(d->next, true);
4521 0 : d->next = NULL;
4522 : }
4523 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4524 0 : id->nr_updates += merge_delta(d);
4525 0 : unlock_column(store, i->base.id);
4526 : }
4527 634 : idx_destroy(store, i);
4528 634 : return 1;
4529 : }
4530 :
4531 : static int
4532 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4533 : {
4534 26 : sqlstore *store = Store;
4535 26 : sql_idx *i = (sql_idx*)change->obj;
4536 :
4537 26 : if (!i) /* cleaned earlier */
4538 : return 1;
4539 :
4540 26 : if (change->handled || isDeleted(i->t)) {
4541 0 : table_destroy(store, i->t);
4542 0 : return 1;
4543 : }
4544 :
4545 : /* savepoint commit (did it merge ?) */
4546 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4547 0 : table_destroy(store, i->t);
4548 0 : return 1;
4549 : }
4550 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4551 : return 0;
4552 26 : sql_delta *d = (sql_delta*)change->data, *id = d;
4553 26 : if (d && d->next) {
4554 26 : if (d->cs.ts > oldest)
4555 : return LOG_OK; /* cannot cleanup yet */
4556 :
4557 : // d is oldest reachable delta
4558 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4559 26 : destroy_delta(d->next, true);
4560 26 : d->next = NULL;
4561 : }
4562 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4563 26 : id->nr_updates += merge_delta(d);
4564 26 : unlock_column(store, i->base.id);
4565 : }
4566 26 : table_destroy(store, i->t);
4567 26 : return 1;
4568 : }
4569 :
4570 : static int
4571 232544 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4572 : {
4573 232544 : sqlstore *store = Store;
4574 232544 : sql_table *t = (sql_table*)change->obj;
4575 :
4576 232544 : if (change->handled || isDeleted(t)) {
4577 3534 : table_destroy(store, t);
4578 3534 : return 1;
4579 : }
4580 : /* savepoint commit (did it merge ?) */
4581 229010 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4582 6151 : table_destroy(store, t);
4583 6151 : return 1;
4584 : }
4585 222859 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4586 : return 0;
4587 222831 : storage *d = (storage*)change->data;
4588 222831 : if (d->next) {
4589 150783 : if (d->cs.ts > oldest)
4590 : return LOG_OK; /* cannot cleanup yet */
4591 :
4592 19549 : destroy_storage(d->next);
4593 19549 : d->next = NULL;
4594 : }
4595 91597 : table_destroy(store, t);
4596 91597 : return 1;
4597 : }
4598 :
4599 : static int
4600 30558 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4601 : {
4602 30558 : if (nr == 0)
4603 : return LOG_OK;
4604 30558 : assert (nr > 0);
4605 30558 : if ((!offsets || !*offsets) && nr == total) {
4606 30497 : *offset = slot;
4607 30497 : return LOG_OK;
4608 : }
4609 61 : if (!*offsets) {
4610 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4611 7 : if (!*offsets)
4612 : return LOG_ERR;
4613 : }
4614 61 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4615 16200 : for(size_t i = 0; i < nr; i++)
4616 16139 : dst[i] = slot + i;
4617 61 : (*offsets)->batCount += nr;
4618 61 : (*offsets)->theap->dirty = true;
4619 61 : return LOG_OK;
4620 : }
4621 :
4622 : static int
4623 30402 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4624 : {
4625 30402 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4626 30640 : assert(s->segs);
4627 30640 : ulng oldest = store_oldest(tr->store, NULL);
4628 30398 : BUN slot = 0;
4629 30398 : size_t total = cnt;
4630 :
4631 30398 : if (!locked)
4632 30444 : lock_table(tr->store, t->base.id);
4633 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4634 61358 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4635 30755 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4636 35 : if ((seg->end - seg->start) >= cnt) {
4637 : /* if previous is claimed before we could simply adjust the end/start */
4638 13 : if (p && p->ts == tr->tid && !p->deleted) {
4639 2 : slot = p->end;
4640 2 : p->end += cnt;
4641 2 : seg->start += cnt;
4642 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4643 : ok = LOG_ERR;
4644 : break;
4645 : }
4646 2 : s->segs->nr_reused += cnt;
4647 2 : cnt = 0;
4648 2 : break;
4649 : }
4650 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4651 11 : size_t rcnt = seg->end - seg->start;
4652 11 : if (rcnt > cnt)
4653 : rcnt = cnt;
4654 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4655 : ok = LOG_ERR;
4656 : break;
4657 : }
4658 : }
4659 33 : seg->ts = tr->tid;
4660 33 : seg->deleted = false;
4661 33 : slot = seg->start;
4662 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4663 : ok = LOG_ERR;
4664 : break;
4665 : }
4666 68 : s->segs->nr_reused += (seg->end - seg->start);
4667 68 : cnt -= (seg->end - seg->start);
4668 : }
4669 : }
4670 30605 : if (ok == LOG_OK && cnt) {
4671 30592 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4672 29469 : slot = s->segs->t->end;
4673 29469 : s->segs->t->end += cnt;
4674 : } else {
4675 1123 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4676 : ok = LOG_ERR;
4677 : } else {
4678 1158 : if (!s->segs->h)
4679 0 : s->segs->h = s->segs->t;
4680 1158 : slot = s->segs->t->start;
4681 : }
4682 : }
4683 30627 : if (ok == LOG_OK)
4684 30627 : ok = add_offsets(slot, cnt, total, offset, offsets);
4685 : }
4686 30526 : if (!locked)
4687 30494 : unlock_table(tr->store, t->base.id);
4688 :
4689 30677 : if (ok == LOG_OK) {
4690 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4691 30647 : if (!in_transaction) {
4692 1143 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4693 1143 : in_transaction = true;
4694 : }
4695 30647 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4696 30602 : tr->logchanges += (lng) total;
4697 30596 : if (*offsets) {
4698 0 : BAT *pos = *offsets;
4699 0 : assert(BATcount(pos) == total);
4700 0 : BATsetcount(pos, total); /* set other properties */
4701 7 : pos->tnil = false;
4702 7 : pos->tnonil = true;
4703 7 : pos->tkey = true;
4704 7 : pos->tsorted = true;
4705 7 : pos->trevsorted = false;
4706 : }
4707 : }
4708 30651 : return ok;
4709 : }
4710 :
4711 : static int
4712 2067364 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4713 : {
4714 2067364 : if (cnt > 1 && offsets)
4715 30409 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4716 2036955 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4717 2036974 : assert(s->segs);
4718 2036974 : ulng oldest = store_oldest(tr->store, NULL);
4719 2036960 : BUN slot = 0;
4720 2036960 : int reused = 0;
4721 :
4722 2036960 : if (!locked)
4723 1911235 : lock_table(tr->store, t->base.id);
4724 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4725 : * or create new segment at the end */
4726 9886174 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4727 7921903 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4728 :
4729 72705 : if ((seg->end - seg->start) >= cnt) {
4730 :
4731 : /* if previous is claimed before we could simply adjust the end/start */
4732 72705 : if (p && p->ts == tr->tid && !p->deleted) {
4733 56385 : slot = p->end;
4734 56385 : p->end += cnt;
4735 56385 : seg->start += cnt;
4736 56385 : s->segs->nr_reused += cnt;
4737 56385 : reused = 1;
4738 56385 : break;
4739 : }
4740 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4741 16320 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4742 : ok = LOG_ERR;
4743 : break;
4744 : }
4745 : }
4746 16320 : seg->ts = tr->tid;
4747 16320 : seg->deleted = false;
4748 16320 : slot = seg->start;
4749 16320 : s->segs->nr_reused += cnt;
4750 16320 : reused = 1;
4751 16320 : break;
4752 : }
4753 : }
4754 2036976 : if (ok == LOG_OK && !reused) {
4755 1964272 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4756 1928932 : slot = s->segs->t->end;
4757 1928932 : s->segs->t->end += cnt;
4758 : } else {
4759 35340 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4760 : ok = LOG_ERR;
4761 : } else {
4762 35340 : if (!s->segs->h)
4763 0 : s->segs->h = s->segs->t;
4764 35340 : slot = s->segs->t->start;
4765 : }
4766 : }
4767 : }
4768 2036976 : if (!locked)
4769 1911251 : unlock_table(tr->store, t->base.id);
4770 :
4771 2036976 : if (ok == LOG_OK) {
4772 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4773 2036977 : if (!in_transaction) {
4774 49293 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4775 49293 : in_transaction = true;
4776 : }
4777 2036977 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4778 2036546 : tr->logchanges += (lng) cnt;
4779 2036977 : *offset = slot;
4780 : }
4781 : return ok;
4782 : }
4783 :
4784 : /*
4785 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4786 : * of the global transaction and mark the newly added storage slots unused on the global
4787 : * level but used on the local transaction level. Besides this the local transaction needs
4788 : * to update (and mark unused) any slot in between the old end and new slots.
4789 : * */
4790 : static int
4791 1941588 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4792 : {
4793 1941588 : storage *s;
4794 :
4795 : /* we have a single segment structure for each persistent table
4796 : * for temporary tables each has its own */
4797 1941588 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4798 : return LOG_ERR;
4799 :
4800 1941726 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4801 : }
4802 :
4803 : /* some tables cannot be updated concurrently (user/roles etc) */
4804 : static int
4805 125730 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4806 : {
4807 125730 : storage *s;
4808 125730 : int res = 0;
4809 :
4810 : /* we have a single segment structure for each persistent table
4811 : * for temporary tables each has its own */
4812 125730 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4813 : /* TODO check for other inserts ! */
4814 : return LOG_ERR;
4815 :
4816 125730 : lock_table(tr->store, t->base.id);
4817 125730 : if ((res = segments_conflict(tr, s->segs, 1))) {
4818 4 : unlock_table(tr->store, t->base.id);
4819 4 : return LOG_CONFLICT;
4820 : }
4821 125726 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4822 125726 : unlock_table(tr->store, t->base.id);
4823 125726 : return res;
4824 : }
4825 :
4826 : static int
4827 22779 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4828 : {
4829 22779 : storage *s;
4830 22779 : int res = 0;
4831 :
4832 22779 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4833 : return LOG_ERR;
4834 :
4835 22779 : lock_table(tr->store, t->base.id);
4836 22779 : res = segments_conflict(tr, s->segs, uncommitted);
4837 22779 : unlock_table(tr->store, t->base.id);
4838 22779 : return res ? LOG_CONFLICT : LOG_OK;
4839 : }
4840 :
4841 : static size_t
4842 1598101 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4843 : {
4844 1598101 : size_t cnt = 0;
4845 :
4846 1948621 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4847 : ;
4848 :
4849 3967738 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4850 2369619 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4851 157134 : cnt += s->end - s->start;
4852 : }
4853 1598119 : return cnt;
4854 : }
4855 :
4856 : static BAT *
4857 1598063 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4858 : {
4859 1598063 : lock_table(tr->store, t->base.id);
4860 1598120 : segment *s = S->segs->h;
4861 : /* step one no deletes -> dense range */
4862 1598120 : uint32_t cur = 0;
4863 1598120 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4864 1598118 : if (!dnr) {
4865 1467676 : unlock_table(tr->store, t->base.id);
4866 1467648 : return BATdense(start, start, end-start);
4867 : }
4868 :
4869 130442 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4870 130442 : if (!b) {
4871 0 : unlock_table(tr->store, t->base.id);
4872 0 : return NULL;
4873 : }
4874 :
4875 130442 : uint32_t *restrict dst = Tloc(b, 0);
4876 62268096 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4877 62204459 : if (s->end < start)
4878 263675 : continue;
4879 61940784 : if (s->start >= end)
4880 : break;
4881 61873979 : msk m = (SEG_IS_VALID(s, tr));
4882 61873979 : size_t lnr = s->end-s->start;
4883 61873979 : if (s->start < start)
4884 21533 : lnr -= (start - s->start);
4885 61873979 : if (s->end > end)
4886 15278 : lnr -= s->end - end;
4887 :
4888 61873979 : if (m) {
4889 1389991 : size_t used = pos&31, end = 32;
4890 1389991 : if (used) {
4891 1228113 : if (lnr < (32-used))
4892 745096 : end = used + lnr;
4893 1228113 : assert(end > used);
4894 1228113 : cur |= ((1U << (end - used)) - 1) << used;
4895 1228113 : lnr -= end - used;
4896 1228113 : pos += end - used;
4897 1228113 : if (end == 32) {
4898 483021 : *dst++ = cur;
4899 483021 : cur = 0;
4900 : }
4901 : }
4902 1389991 : size_t full = lnr/32;
4903 1389991 : size_t rest = lnr%32;
4904 1389991 : if (full > 0) {
4905 350642 : memset(dst, ~0, full * sizeof(*dst));
4906 350642 : dst += full;
4907 350642 : lnr -= full * 32;
4908 350642 : pos += full * 32;
4909 : }
4910 1389991 : if (rest > 0) {
4911 610117 : cur |= (1U << rest) - 1;
4912 610117 : lnr -= rest;
4913 610117 : pos += rest;
4914 : }
4915 1389991 : assert(lnr==0);
4916 : } else {
4917 60483988 : size_t used = pos&31, end = 32;
4918 60483988 : if (used) {
4919 58606790 : if (lnr < (32-used))
4920 56625527 : end = used + lnr;
4921 :
4922 58606790 : pos+= (end-used);
4923 58606790 : lnr-= (end-used);
4924 58606790 : if (end == 32) {
4925 1981269 : *dst++ = cur;
4926 1981269 : cur = 0;
4927 : }
4928 : }
4929 60483988 : size_t full = lnr/32;
4930 60483988 : size_t rest = lnr%32;
4931 60483988 : memset(dst, 0, full * sizeof(*dst));
4932 60483988 : dst += full;
4933 60483988 : lnr -= full * 32;
4934 60483988 : pos += full * 32;
4935 60483988 : pos+= rest;
4936 60483988 : lnr-= rest;
4937 60483988 : assert(lnr==0);
4938 : }
4939 : }
4940 :
4941 130442 : unlock_table(tr->store, t->base.id);
4942 130440 : if (pos%32)
4943 128162 : *dst=cur;
4944 130440 : BATsetcount(b, nr);
4945 130440 : bn = BATmaskedcands(start, nr, b, true);
4946 130437 : BBPreclaim(b);
4947 130430 : (void)pos;
4948 130430 : assert (pos == nr);
4949 : return bn;
4950 : }
4951 :
4952 : static void * /* BAT * */
4953 1604161 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4954 : {
4955 : /* with nr_of_parts - part_nr we can adjust parts */
4956 1604161 : storage *s = tab_timestamp_storage(tr, t);
4957 :
4958 1604356 : if (!s)
4959 : return NULL;
4960 1604356 : size_t nr = segs_end(s->segs, tr, t);
4961 :
4962 1605070 : if (!nr)
4963 6968 : return BATdense(0, 0, 0);
4964 :
4965 : /* compute proper part */
4966 1598102 : size_t part_size = nr/nr_of_parts;
4967 1598102 : size_t start = part_size * part_nr;
4968 1598102 : size_t end = start + part_size;
4969 1598102 : if (part_nr == (nr_of_parts-1))
4970 1308444 : end = nr;
4971 1598102 : assert(end <= nr);
4972 1598102 : return segments2cands(s, tr, t, start, end);
4973 : }
4974 :
4975 : static int
4976 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
4977 : {
4978 5 : bool update_conflict = false;
4979 :
4980 5 : if (segments_in_transaction(tr, col->t))
4981 : return LOG_CONFLICT;
4982 :
4983 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
4984 :
4985 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
4986 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
4987 5 : assert(d && d->cs.ts == tr->tid);
4988 5 : if (odelta != d)
4989 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
4990 5 : if (d->cs.bid)
4991 5 : temp_destroy(d->cs.bid);
4992 5 : if (d->cs.uibid)
4993 5 : temp_destroy(d->cs.uibid);
4994 5 : if (d->cs.uvbid)
4995 5 : temp_destroy(d->cs.uvbid);
4996 5 : bat_set_access(bn, BAT_READ);
4997 5 : d->cs.bid = temp_create(bn);
4998 5 : d->cs.uibid = 0;
4999 5 : d->cs.uvbid = 0;
5000 5 : d->cs.ucnt = 0;
5001 5 : d->cs.cleared = true;
5002 5 : d->cs.ts = tr->tid;
5003 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
5004 5 : return LOG_OK;
5005 : }
5006 :
5007 : static int
5008 5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
5009 : {
5010 5 : if (segments_in_transaction(tr, c->t))
5011 : return LOG_CONFLICT;
5012 :
5013 5 : sql_delta *d = NULL;
5014 :
5015 : /* do we have enough to clean */
5016 5 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5017 : return LOG_CONFLICT;
5018 :
5019 : /* do we have enough to clean */
5020 5 : if (!force && (d->nr_updates) < 1024)
5021 : return LOG_OK;
5022 :
5023 5 : BAT *b = NULL, *bn = NULL;;
5024 5 : if ((b = bind_col(tr, c, 0)) == NULL)
5025 : return LOG_ERR;
5026 5 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5027 0 : BBPreclaim(b);
5028 0 : return LOG_ERR;
5029 : }
5030 5 : int res = swap_bats(tr, c, bn);
5031 5 : d->nr_updates = 0;
5032 5 : BBPreclaim(b);
5033 5 : BBPreclaim(bn);
5034 5 : return res;
5035 : }
5036 :
5037 : static int
5038 0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
5039 : {
5040 0 : if (segments_in_transaction(tr, t))
5041 : return LOG_CONFLICT;
5042 :
5043 0 : storage *s;
5044 0 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
5045 : return LOG_ERR;
5046 :
5047 0 : for( node *n = ol_first_node(t->columns); n; n = n->next) {
5048 0 : sql_column *c = n->data;
5049 :
5050 0 : if (!ATOMvarsized(c->type.type->localtype))
5051 0 : continue;
5052 0 : sql_delta *d = NULL;
5053 :
5054 : /* do we have enough to clean */
5055 0 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5056 : return LOG_CONFLICT;
5057 :
5058 : /* do we have enough to clean */
5059 0 : if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
5060 0 : continue;
5061 :
5062 0 : BAT *b = NULL, *bn = NULL;;
5063 0 : if ((b = bind_col(tr, c, 0)) == NULL)
5064 : return LOG_ERR;
5065 0 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5066 0 : BBPreclaim(b);
5067 0 : return LOG_ERR;
5068 : }
5069 0 : int res = swap_bats(tr, c, bn);
5070 0 : d->nr_updates = 0;
5071 0 : BBPreclaim(b);
5072 0 : BBPreclaim(bn);
5073 0 : if (res != LOG_OK)
5074 0 : return res;
5075 : }
5076 0 : s->segs->nr_reused = 0;
5077 0 : return LOG_OK;
5078 : }
5079 :
5080 :
5081 : static int
5082 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5083 : {
5084 58 : bool update_conflict = false;
5085 :
5086 58 : if (segments_in_transaction(tr, col->t))
5087 : return LOG_CONFLICT;
5088 :
5089 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5090 :
5091 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5092 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5093 58 : assert(d && d->cs.ts == tr->tid);
5094 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5095 58 : if (odelta != d)
5096 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5097 :
5098 58 : d->cs.st = st;
5099 58 : d->cs.cleared = true;
5100 58 : if (d->cs.bid)
5101 58 : temp_destroy(d->cs.bid);
5102 58 : o = transfer_to_systrans(o);
5103 58 : if (o == NULL)
5104 : return LOG_ERR;
5105 58 : bat_set_access(o, BAT_READ);
5106 58 : d->cs.bid = temp_create(o);
5107 58 : if (u) {
5108 53 : if (d->cs.ebid)
5109 0 : temp_destroy(d->cs.ebid);
5110 53 : u = transfer_to_systrans(u);
5111 53 : if (u == NULL)
5112 : return LOG_ERR;
5113 53 : d->cs.ebid = temp_create(u);
5114 : }
5115 : return LOG_OK;
5116 : }
5117 :
5118 : void
5119 352 : bat_storage_init( store_functions *sf)
5120 : {
5121 352 : sf->bind_col = &bind_col;
5122 352 : sf->bind_updates = &bind_updates;
5123 352 : sf->bind_updates_idx = &bind_updates_idx;
5124 352 : sf->bind_idx = &bind_idx;
5125 352 : sf->bind_cands = &bind_cands;
5126 :
5127 352 : sf->claim_tab = &claim_tab;
5128 352 : sf->key_claim_tab = &key_claim_tab;
5129 352 : sf->tab_validate = &tab_validate;
5130 :
5131 352 : sf->append_col = &append_col;
5132 352 : sf->append_idx = &append_idx;
5133 :
5134 352 : sf->update_col = &update_col;
5135 352 : sf->update_idx = &update_idx;
5136 :
5137 352 : sf->delete_tab = &delete_tab;
5138 :
5139 352 : sf->count_del = &count_del;
5140 352 : sf->count_col = &count_col;
5141 352 : sf->count_idx = &count_idx;
5142 352 : sf->dcount_col = &dcount_col;
5143 352 : sf->min_max_col = &min_max_col;
5144 352 : sf->set_stats_col = &set_stats_col;
5145 352 : sf->sorted_col = &sorted_col;
5146 352 : sf->unique_col = &unique_col;
5147 352 : sf->double_elim_col = &double_elim_col;
5148 352 : sf->col_stats = &col_stats;
5149 352 : sf->col_set_range = &col_set_range;
5150 352 : sf->col_not_null = &col_not_null;
5151 :
5152 352 : sf->col_dup = &col_dup;
5153 352 : sf->idx_dup = &idx_dup;
5154 352 : sf->del_dup = &del_dup;
5155 :
5156 352 : sf->create_col = &create_col; /* create and add to change list */
5157 352 : sf->create_idx = &create_idx;
5158 352 : sf->create_del = &create_del;
5159 :
5160 352 : sf->destroy_col = &destroy_col; /* free resources */
5161 352 : sf->destroy_idx = &destroy_idx;
5162 352 : sf->destroy_del = &destroy_del;
5163 :
5164 352 : sf->drop_col = &drop_col; /* add drop to change list */
5165 352 : sf->drop_idx = &drop_idx;
5166 352 : sf->drop_del = &drop_del;
5167 :
5168 352 : sf->clear_table = &clear_table;
5169 :
5170 352 : sf->vacuum_col = &vacuum_col;
5171 352 : sf->vacuum_tab = &vacuum_tab;
5172 352 : sf->col_compress = &col_compress;
5173 352 : }
|