Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
/* printf-style message template; the single %s is filled in by the user of the macro */
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
/* true when table t is an unlogged table or a temporary table */
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
42 : static lng merge_delta( sql_delta *obat);
43 :
/* Segment visibility rules, as seen by transaction tr:
 *   !deleted && VALID_4_READ(TS, tr)            existing or newly created segment
 *   deleted && TS > tr->ts && OLDTS < tr->ts    deleted after the current transaction started
 *
 * All macro parameters are parenthesized so that arbitrary expressions
 * (e.g. `&seg`, `a ? b : c`) can be passed safely.  Arguments may still be
 * evaluated more than once: do not pass expressions with side effects.
 */

/* timestamp TS is readable by tr: written by tr itself, by a parent
 * version of tr, or stamped before tr's start timestamp */
#define VALID_4_READ(TS,tr) \
	((TS) == (tr)->tid || ((tr)->parent && tr_version_of_parent((tr), (TS))) || (TS) < (tr)->ts)

/* when changed, check if the old status is still valid */
#define OLD_VALID_4_READ(TS,OLDTS,tr) \
	((OLDTS) && (TS) != (tr)->tid && (TS) > (tr)->ts && (OLDTS) < (tr)->ts)

/* segment is not deleted and readable by tr */
#define SEG_VALID_4_DELETE(seg,tr) \
	(!(seg)->deleted && VALID_4_READ((seg)->ts, tr))

/* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
#define SEG_IS_DELETED(seg,tr) \
	(((seg)->deleted && (VALID_4_READ((seg)->ts, tr) || !OLD_VALID_4_READ((seg)->ts, (seg)->oldts, tr))) || \
	 (!(seg)->deleted && !VALID_4_READ((seg)->ts, tr)))

/* A segment is part of the current transaction in some way, or was deleted by some other transaction but used to be valid */
#define SEG_IS_VALID(seg, tr) \
	((!(seg)->deleted && VALID_4_READ((seg)->ts, tr)) || \
	 ((seg)->deleted && OLD_VALID_4_READ((seg)->ts, (seg)->oldts, tr)))
68 :
69 : static inline BAT *
70 4955 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 4955 : MT_lock_set(&b->theaplock);
74 4955 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 17 : MT_lock_unset(&b->theaplock);
76 17 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
77 17 : BBPreclaim(b);
78 17 : return bn;
79 : }
80 4938 : if (b->theap->farmid == TRANSIENT ||
81 14 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
82 4753 : QryCtx *qc = MT_thread_get_qry_ctx();
83 4753 : if (qc) {
84 2523 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
85 2523 : ATOMIC_SUB(&qc->datasize, b->theap->size);
86 2523 : b->theap->farmid = SYSTRANS;
87 2523 : b->batRole = SYSTRANS;
88 : }
89 2523 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
90 1092 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
91 1092 : b->tvheap->farmid = SYSTRANS;
92 : }
93 : }
94 : }
95 4938 : MT_lock_unset(&b->theaplock);
96 4938 : return b;
97 : }
98 :
99 : static void
100 25641912 : lock_table(sqlstore *store, sqlid id)
101 : {
102 25641912 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
103 25723955 : }
104 :
105 : static void
106 25724036 : unlock_table(sqlstore *store, sqlid id)
107 : {
108 25724036 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
109 25721901 : }
110 :
111 : static void
112 19817211 : lock_column(sqlstore *store, sqlid id)
113 : {
114 19817211 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
115 19822342 : }
116 :
117 : static void
118 19822787 : unlock_column(sqlstore *store, sqlid id)
119 : {
120 19822787 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
121 19825434 : }
122 :
123 : static void
124 113873 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
125 : {
126 113873 : assert(cleanup);
127 113873 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
128 113868 : }
129 :
130 : static void
131 132763 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
132 : {
133 132763 : assert(cleanup);
134 132763 : dup_base(&t->base);
135 132764 : trans_add(tr, b, data, cleanup, commit, log);
136 132745 : }
137 :
138 : static int
139 92672 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
140 : {
141 92672 : segment *s = change->data;
142 :
143 92672 : if (s->ts <= oldest) {
144 34090 : while(s) {
145 21266 : segment *n = s->prev;
146 21266 : ATOMIC_PTR_DESTROY(&s->next);
147 21266 : _DELETE(s);
148 21266 : s = n;
149 : }
150 12824 : sqlstore *store = Store;
151 12824 : table_destroy(store, (sql_table*)change->obj);
152 12824 : return 1;
153 : }
154 : return LOG_OK;
155 : }
156 :
157 : static void
158 21266 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
159 : {
160 : /* we can only be accessed by anything older then commit_ts */
161 21266 : if (c->cleanup == &tc_gc_seg)
162 8442 : s->prev = c->data;
163 : else
164 12824 : c->cleanup = &tc_gc_seg;
165 21266 : c->data = s;
166 21266 : s->ts = commit_ts;
167 16868 : }
168 :
169 : static segment *
170 86647 : new_segment(segment *o, sql_trans *tr, size_t cnt)
171 : {
172 86647 : segment *n = (segment*)GDKmalloc(sizeof(segment));
173 :
174 86645 : assert(tr);
175 86645 : if (n) {
176 86645 : *n = (segment) {
177 86645 : .ts = tr->tid,
178 : .oldts = 0,
179 : .deleted = false,
180 : .start = 0,
181 : .end = cnt,
182 : .next = ATOMIC_PTR_VAR_INIT(NULL),
183 : .prev = NULL,
184 : };
185 86645 : if (o) {
186 36210 : n->start += o->end;
187 36210 : n->end += o->end;
188 36210 : ATOMIC_PTR_SET(&o->next, n);
189 : }
190 : }
191 86645 : return n;
192 : }
193 :
/*
 * Carve the range [start, start+cnt) out of segment o on behalf of
 * transaction tr and return the segment that now covers exactly that range.
 *
 *  segs     list owning o; its head/tail pointers are fixed up when needed
 *  o        segment containing the range
 *  p        predecessor of o (may be NULL); only used when the new segment
 *           is inserted in front of o
 *  deleted  deleted flag to give the carved-out range
 *
 * Returns NULL when a segment allocation fails.
 */
194 : static segment *
195 81065 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
196 : {
197 81065 : assert(tr);
/* exact match: no split needed, re-stamp o itself and remember its previous timestamp */
198 81065 : if (o->start == start && o->end == start+cnt) {
199 10001 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
200 10001 : o->oldts = o->ts;
201 10001 : o->ts = tr->tid;
202 10001 : o->deleted = deleted;
203 10001 : return o;
204 : }
205 71064 : segment *n = (segment*)GDKmalloc(sizeof(segment));
206 :
207 71064 : if (!n)
208 : return NULL;
209 71064 : n->prev = NULL;
210 :
/* o already belongs to this transaction: the new part gets ts 1, no old
 * timestamp and is marked deleted regardless of the `deleted` argument;
 * otherwise it is stamped for tr and remembers o's timestamp in oldts */
211 71064 : if (o->ts == tr->tid) {
212 2240 : n->oldts = 0;
213 2240 : n->ts = 1;
214 2240 : n->deleted = true;
215 : } else {
216 68824 : n->oldts = o->ts;
217 68824 : n->ts = tr->tid;
218 68824 : n->deleted = deleted;
219 : }
220 71064 : if (start == o->start) {
221 : /* 2-way split: o remains latter part of segment, new one is
222 : * inserted before */
223 57917 : n->start = o->start;
224 57917 : n->end = n->start + cnt;
225 57917 : ATOMIC_PTR_INIT(&n->next, o);
226 57917 : if (segs->h == o)
227 418 : segs->h = n;
228 57917 : if (p)
229 57499 : ATOMIC_PTR_SET(&p->next, n);
230 57917 : o->start = n->end;
231 13147 : } else if (start+cnt == o->end) {
232 : /* 2-way split: o remains first part of segment, new one is
233 : * added after */
234 5588 : n->start = o->end - cnt;
235 5588 : n->end = o->end;
236 5588 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
237 5588 : ATOMIC_PTR_SET(&o->next, n);
238 5588 : if (segs->t == o)
239 827 : segs->t = n;
240 5588 : o->end = n->start;
241 : } else {
242 : /* 3-way split: o remains first part of segment, two new ones
243 : * are added after */
244 7559 : segment *n2 = GDKmalloc(sizeof(segment));
245 7559 : if (n2 == NULL) {
246 0 : GDKfree(n);
247 0 : return NULL;
248 : }
249 7559 : ATOMIC_PTR_INIT(&n->next, n2);
250 7559 : n->start = start;
251 7559 : n->end = start + cnt;
/* n2 is a copy of o (same ts/oldts/deleted) covering what follows the carved-out range */
252 7559 : *n2 = *o;
253 7559 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
254 7559 : n2->start = n->end;
255 7559 : n2->prev = NULL;
256 7559 : if (segs->t == o)
257 2453 : segs->t = n2;
/* link n (and n2 behind it) after o only once both are fully initialized */
258 7559 : ATOMIC_PTR_SET(&o->next, n);
259 7559 : o->end = start;
260 : }
261 : return n;
262 : }
263 :
/*
 * Undo the segment changes made by transaction tr and, while walking the
 * list, merge neighbouring segments that have become mergeable.
 *
 * Every segment stamped with tr->tid is reverted: its deleted flag and
 * timestamp are restored from oldts.  Adjacent segments whose timestamp is
 * <= oldest and whose deleted flags agree are merged; the absorbed segment
 * is not freed here but queued through mark4destroy() on `change`.
 */
264 : static void
265 4306 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
266 : {
/* seg is the current merge candidate: the last segment seen with ts <= oldest */
267 4306 : segment *cur = segs->h, *seg = NULL;
268 18334 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
269 14028 : if (cur->ts == tr->tid) { /* revert */
/* flip the deleted flag back; a segment whose oldts equals its ts always ends up deleted */
270 4803 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
/* restore the previous timestamp; an oldts equal to tr->tid is replaced by 0 */
271 4803 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
272 4803 : cur->oldts = 0;
273 : }
274 14028 : if (cur->ts <= oldest) { /* possibly merge range */
275 13113 : if (!seg) { /* skip first */
276 : seg = cur;
277 8807 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
278 : /* merge with previous */
279 4398 : seg->end = cur->end;
280 4398 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
281 4398 : if (cur == segs->t)
282 2861 : segs->t = seg;
283 4398 : mark4destroy(cur, change, store_get_timestamp(tr->store));
/* continue from seg: its next pointer already skips the absorbed segment */
284 4398 : cur = seg;
285 : } else {
286 : seg = cur; /* begin of new merge */
287 : }
288 : }
289 : }
290 4306 : }
291 :
292 : static size_t
293 103438 : segs_end_include_deleted( segments *segs, sql_trans *tr)
294 : {
295 103438 : size_t cnt = 0;
296 103438 : segment *s = segs->h, *l = NULL;
297 :
298 491927 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
299 388489 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
300 : l = s;
301 : }
302 103438 : if (l)
303 103431 : cnt = l->end;
304 103438 : return cnt;
305 : }
306 :
/*
 * Write the deleted/not-deleted state of the segments owned by transaction
 * tr into the BAT behind cs->bid.
 *
 * The BAT's tail is treated as an array of 32-bit words holding one bit per
 * row position; a set bit marks the row as deleted.  Only segments with
 * ts == tr->tid and a non-empty range are written.
 *
 * Returns LOG_OK, or LOG_ERR when the BAT descriptor cannot be obtained or
 * the BAT cannot be extended.
 */
307 : static int
308 103438 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
309 : {
310 : /* set bits correctly */
311 103438 : BAT *b = temp_descriptor(cs->bid);
312 :
313 103438 : if (!b)
314 : return LOG_ERR;
315 103438 : segment *s = segs->h;
316 :
/* nr: number of positions to cover; capacity is requested rounded up to a multiple of 32 */
317 103438 : size_t nr = segs_end_include_deleted(segs, tr);
318 103438 : size_t rounded_nr = ((nr+31)&~31);
319 103438 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
320 0 : bat_destroy(b);
321 0 : return LOG_ERR;
322 : }
323 :
324 : /* disable all properties here */
325 103438 : MT_lock_set(&b->theaplock);
326 103438 : b->tsorted = false;
327 103438 : b->trevsorted = false;
328 103438 : b->tnosorted = 0;
329 103438 : b->tnorevsorted = 0;
330 103438 : b->tseqbase = oid_nil;
331 103438 : b->tkey = false;
332 103438 : b->tnokey[0] = 0;
333 103438 : b->tnokey[1] = 0;
334 103438 : b->theap->dirty = true;
/* cnt tracks how far the bitmap has been written: starts at the current
 * BAT count and advances to the end of each processed segment */
335 103438 : BUN cnt = BATcount(b);
336 :
337 103438 : uint32_t *restrict dst;
338 428207 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
339 368906 : if (s->start >= nr)
340 : break;
341 324769 : if (s->ts == tr->tid && s->end != s->start) {
/* gap between what has been written so far and this segment: set all bits in [cnt, s->start) */
342 139970 : if (cnt < s->start) { /* first mark as deleted ! */
343 3282 : size_t lnr = s->start-cnt;
344 3282 : size_t pos = cnt;
345 3282 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
346 3282 : uint32_t cur = 0;
347 :
/* leading partial word: bits [used, end) of the word containing pos */
348 3282 : size_t used = pos&31, end = 32;
349 3282 : if (used) {
350 3180 : if (lnr < (32-used))
351 3006 : end = used + lnr;
352 3180 : assert(end > used);
353 3180 : cur |= ((1U << (end - used)) - 1) << used;
354 3180 : lnr -= end - used;
355 3180 : *dst++ |= cur;
356 3180 : cur = 0;
357 : }
/* whole words are filled with memset, the trailing partial word gets its low `rest` bits set */
358 3282 : size_t full = lnr/32;
359 3282 : size_t rest = lnr%32;
360 3282 : if (full > 0) {
361 7 : memset(dst, ~0, full * sizeof(*dst));
362 7 : dst += full;
363 7 : lnr -= full * 32;
364 : }
365 3282 : if (rest > 0) {
366 163 : cur |= (1U << rest) - 1;
367 163 : lnr -= rest;
368 163 : *dst |= cur;
369 : }
370 3282 : assert(lnr==0);
371 : }
/* the segment itself: set (deleted) or clear (not deleted) bits [s->start, s->end),
 * using the same partial-word / full-words / partial-word scheme */
372 139970 : size_t lnr = s->end-s->start;
373 139970 : size_t pos = s->start;
374 139970 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
375 139970 : uint32_t cur = 0;
376 139970 : size_t used = pos&31, end = 32;
377 139970 : if (used) {
378 102486 : if (lnr < (32-used))
379 97798 : end = used + lnr;
380 102486 : assert(end > used);
381 102486 : cur |= ((1U << (end - used)) - 1) << used;
382 102486 : lnr -= end - used;
383 102486 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
384 102486 : dst++;
385 102486 : cur = 0;
386 : }
387 139970 : size_t full = lnr/32;
388 139970 : size_t rest = lnr%32;
389 139970 : if (full > 0) {
390 3413 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
391 3413 : dst += full;
392 3413 : lnr -= full * 32;
393 : }
394 139970 : if (rest > 0) {
395 39862 : cur |= (1U << rest) - 1;
396 39862 : lnr -= rest;
397 39862 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
398 : }
399 139970 : assert(lnr==0);
400 139970 : if (cnt < s->end)
401 324769 : cnt = s->end;
402 : }
403 : }
/* the BAT count is only ever grown here, never shrunk */
404 103438 : if (nr > BATcount(b)) {
405 62254 : BATsetcount(b, nr);
406 : }
407 103438 : MT_lock_unset(&b->theaplock);
408 :
409 103438 : bat_destroy(b);
410 103438 : return LOG_OK;
411 : }
412 :
/*
 * Stamp the segments of committing transaction tr with commit_ts and merge
 * neighbouring segments where that is safe.
 *
 * Two neighbours are merged when both carry a timestamp below
 * TRANSACTION_ID_BASE, have the same deleted flag, and no transaction in
 * store->active (other than the exceptions below) has a timestamp lying
 * between the two segment timestamps.  The absorbed segment is freed at
 * once when commit_ts == oldest, otherwise it is queued on `change` via
 * mark4destroy().
 */
413 : /* TODO return LOG_OK/ERR */
414 : static void
415 103464 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
416 : {
417 103464 : sqlstore* store = tr->store;
/* seg is the previous segment in the list, i.e. the merge target for cur */
418 103464 : segment *cur = s->segs->h, *seg = NULL;
419 492025 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
/* segments owned by tr get the commit timestamp; oldts is only kept for deleted ones */
420 388561 : if (cur->ts == tr->tid) {
421 154018 : if (!cur->deleted)
422 90974 : cur->oldts = 0;
423 154018 : cur->ts = commit_ts;
424 : }
425 388561 : if (!seg) {
426 : /* first segment */
427 : seg = cur;
428 : }
429 285097 : else if (seg->ts < TRANSACTION_ID_BASE) {
430 : /* possible merge since both deleted flags are equal */
431 262953 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
432 203134 : int merge = 1;
433 203134 : node *n = store->active->h;
/* scan the active transactions; the scan stops at the first one that is
 * newer than both segments (merge stays allowed) or that lies between them (merge refused) */
434 708086 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
435 614514 : sql_trans* other = ((sql_trans*)n->data);
436 614514 : ulng active = other->ts;
437 614514 : if(other->active == 2)
438 31753 : continue; /* pretend that another recently committed transaction is no longer active */
439 582761 : if (active == tr->ts)
440 127731 : continue; /* pretend that committing transaction has already committed and is no longer active */
441 455030 : if (seg->ts < active && cur->ts < active)
442 : break;
443 442297 : if (seg->ts > active && cur->ts > active)
444 345468 : continue;
445 :
446 96829 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
447 : /* cannot safely merge since there is an active transaction between the segments */
448 : merge = false;
449 : break;
450 : }
451 : /* merge segments */
452 212610 : if (merge) {
453 106305 : seg->end = cur->end;
454 106305 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
455 106305 : if (cur == s->segs->t)
456 27176 : s->segs->t = seg;
457 106305 : if (commit_ts == oldest) {
458 89437 : ATOMIC_PTR_DESTROY(&cur->next);
459 89437 : _DELETE(cur);
460 : } else
461 33736 : mark4destroy(cur, change, commit_ts);
/* resume from seg, whose next pointer already skips the absorbed segment */
462 106305 : cur = seg;
463 106305 : continue;
464 : }
465 : }
466 : }
467 : seg = cur;
468 : }
469 103464 : }
470 :
471 : static int
472 2113749 : segments_in_transaction(sql_trans *tr, sql_table *t)
473 : {
474 2113749 : storage *s = ATOMIC_PTR_GET(&t->data);
475 2113749 : segment *seg = s->segs->h;
476 :
477 2113749 : if (seg && s->segs->t->ts == tr->tid)
478 : return 1;
479 564728 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
480 459850 : if (seg->ts == tr->tid)
481 : return 1;
482 : }
483 : return 0;
484 : }
485 :
486 : static size_t
487 17977373 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
488 : {
489 17977373 : size_t cnt = 0;
490 :
491 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
492 : * keep all of the parts aligned */
493 17977373 : lock_table(tr->store, table->base.id);
494 18040420 : segment *s = segs->h, *l = NULL;
495 :
496 18040420 : if (segs->t && SEG_IS_VALID(segs->t, tr))
497 14344591 : l = s = segs->t;
498 :
499 219471494 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
500 201431245 : if (SEG_IS_VALID(s, tr))
501 : l = s;
502 : }
503 18040249 : if (l)
504 18021848 : cnt = l->end;
505 18040249 : unlock_table(tr->store, table->base.id);
506 18039124 : return cnt;
507 : }
508 :
509 : static segments *
510 50434 : new_segments(sql_trans *tr, size_t cnt)
511 : {
512 50434 : segments *n = (segments*)GDKmalloc(sizeof(segments));
513 :
514 50439 : if (n) {
515 50439 : n->nr_reused = 0;
516 50439 : n->h = n->t = new_segment(NULL, tr, cnt);
517 50435 : if (!n->h) {
518 0 : GDKfree(n);
519 0 : return NULL;
520 : }
521 50435 : sql_ref_init(&n->r);
522 : }
523 : return n;
524 : }
525 :
526 : static sql_delta *
527 29774722 : timestamp_delta( sql_trans *tr, sql_delta *d)
528 : {
529 29827495 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
530 52773 : d = d->next;
531 29780561 : return d;
532 : }
533 :
534 : static sql_delta *
535 29622658 : col_timestamp_delta( sql_trans *tr, sql_column *c)
536 : {
537 29622658 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
538 : }
539 :
540 : static sql_delta *
541 23407 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
542 : {
543 23407 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
544 : }
545 :
546 : static storage *
547 18418545 : timestamp_storage( sql_trans *tr, storage *d)
548 : {
549 18418545 : if (!d)
550 : return NULL;
551 18488610 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
552 70065 : d = d->next;
553 : return d;
554 : }
555 :
556 : static storage *
557 18396931 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
558 : {
559 18396931 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
560 : }
561 :
562 : static sql_delta*
563 19306 : delta_dup(sql_delta *d)
564 : {
565 19306 : ATOMIC_INC(&d->cs.refcnt);
566 19306 : return d;
567 : }
568 :
569 : static void *
570 17940 : col_dup(sql_column *c)
571 : {
572 17940 : return delta_dup(ATOMIC_PTR_GET(&c->data));
573 : }
574 :
575 : static void *
576 2653 : idx_dup(sql_idx *i)
577 : {
578 2653 : if (!ATOMIC_PTR_GET(&i->data))
579 : return NULL;
580 1366 : return delta_dup(ATOMIC_PTR_GET(&i->data));
581 : }
582 :
583 : static storage*
584 1539 : storage_dup(storage *d)
585 : {
586 1539 : ATOMIC_INC(&d->cs.refcnt);
587 1539 : return d;
588 : }
589 :
590 : static void *
591 1539 : del_dup(sql_table *t)
592 : {
593 1539 : return storage_dup(ATOMIC_PTR_GET(&t->data));
594 : }
595 :
596 : static size_t
597 17 : count_inserts( segment *s, sql_trans *tr)
598 : {
599 17 : size_t cnt = 0;
600 :
601 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
602 55 : if (!s->deleted && s->ts == tr->tid)
603 4 : cnt += s->end - s->start;
604 : }
605 17 : return cnt;
606 : }
607 :
608 : static size_t
609 780889 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
610 : {
611 780889 : size_t cnt = 0;
612 :
613 922793 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
614 : ;
615 :
616 1929429 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
617 1148592 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
618 23992 : cnt += s->end - s->start;
619 : }
620 780837 : return cnt;
621 : }
622 :
623 : static size_t
624 17 : count_deletes( segment *s, sql_trans *tr)
625 : {
626 17 : size_t cnt = 0;
627 :
628 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
629 55 : if (SEG_IS_DELETED(s, tr))
630 17 : cnt += s->end - s->start;
631 : }
632 17 : return cnt;
633 : }
634 :
635 : #define CNT_ACTIVE 10
636 :
637 : static size_t
638 16746668 : count_col(sql_trans *tr, sql_column *c, int access)
639 : {
640 16746668 : storage *d;
641 16746668 : sql_delta *ds;
642 :
643 16746668 : if (!isTable(c->t))
644 : return 0;
645 16746668 : d = tab_timestamp_storage(tr, c->t);
646 16756873 : ds = col_timestamp_delta(tr, c);
647 16762303 : if (!d ||!ds)
648 : return 0;
649 16762303 : if (access == 2)
650 433532 : return ds?ds->cs.ucnt:0;
651 16328771 : if (access == 1)
652 17 : return count_inserts(d->segs->h, tr);
653 16328754 : if (access == QUICK)
654 0 : return d->segs->t?d->segs->t->end:0;
655 16328754 : if (access == CNT_ACTIVE) {
656 780416 : size_t cnt = segs_end(d->segs, tr, c->t);
657 780976 : lock_table(tr->store, c->t->base.id);
658 780905 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
659 780852 : unlock_table(tr->store, c->t->base.id);
660 780852 : return cnt;
661 : }
662 15548338 : return segs_end(d->segs, tr, c->t);
663 : }
664 :
665 : static size_t
666 19352 : count_idx(sql_trans *tr, sql_idx *i, int access)
667 : {
668 19352 : storage *d;
669 19352 : sql_delta *ds;
670 :
671 19352 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
672 2543 : return 0;
673 16809 : d = tab_timestamp_storage(tr, i->t);
674 16823 : ds = idx_timestamp_delta(tr, i);
675 16828 : if (!d || !ds)
676 : return 0;
677 16828 : if (access == 2)
678 2871 : return ds?ds->cs.ucnt:0;
679 13957 : if (access == 1)
680 0 : return count_inserts(d->segs->h, tr);
681 13957 : if (access == QUICK)
682 3550 : return d->segs->t?d->segs->t->end:0;
683 10407 : return segs_end(d->segs, tr, i->t);
684 : }
685 :
686 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
687 : static BAT *
688 12754931 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
689 : {
690 12754931 : BAT *b;
691 :
692 12754931 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
693 : /* returns the updates for cs */
694 12754931 : if (cs->uibid && cs->uvbid && cs->ucnt) {
695 12460 : if (access == RD_UPD_ID) {
696 7615 : if (!(b = temp_descriptor(cs->uibid)))
697 : return NULL;
698 7615 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
699 1337 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
700 6278 : oid nil = oid_nil;
701 : /* less then cnt */
702 6278 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
703 6278 : if (!s) {
704 0 : bat_destroy(b);
705 0 : return NULL;
706 : }
707 :
708 6278 : BAT *nb = BATproject(s, b);
709 6278 : bat_destroy(s);
710 6278 : bat_destroy(b);
711 6278 : b = nb;
712 : }
713 : } else {
714 4845 : b = temp_descriptor(cs->uvbid);
715 : }
716 : } else {
717 21238227 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
718 : }
719 : return b;
720 : }
721 :
/*
 * Merge two sets of updates into freshly allocated SYSTRANS BATs.
 *
 *  ui, *UV  ids and values of the first update set (*UV may be NULL when
 *           only ids are wanted)
 *  oi, ov   ids and values of the second update set
 *
 * The id BATs are combined with a two-way ordered merge; when both sets
 * contain the same id, the entry from ui/*UV is kept and the one from
 * oi/ov is dropped.
 * NOTE(review): the merge presumably relies on both id BATs being sorted
 * ascending - confirm with the callers.
 *
 * All four input BATs are released (bat_destroy) on every path.
 * On success the merged id BAT is returned and, when values were merged,
 * *UV is replaced by the merged value BAT.  On an append failure NULL is
 * returned and *UV is set to NULL; on an allocation failure NULL is
 * returned and *UV is left untouched (the BAT it pointed to has already
 * been released).
 */
722 : static BAT *
723 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
724 : {
725 0 : int err = 0;
726 0 : BAT *uv = *UV;
727 0 : BUN cnt = BATcount(ui)+BATcount(oi);
728 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
729 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
730 :
731 0 : if (!ni || (uv && !nv)) {
732 0 : bat_destroy(ni);
733 0 : bat_destroy(nv);
734 0 : bat_destroy(ui);
735 0 : bat_destroy(uv);
736 0 : bat_destroy(oi);
737 0 : bat_destroy(ov);
738 0 : return NULL;
739 : }
740 0 : BATiter uvi;
741 0 : BATiter ovi;
742 :
/* NOTE(review): the value iterators are set up when uv is set, but the
 * appends below are guarded by ov - this looks like it expects uv and ov
 * to be either both set or both NULL; confirm */
743 0 : if (uv) {
744 0 : uvi = bat_iterator(uv);
745 0 : ovi = bat_iterator(ov);
746 : }
747 :
748 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
749 0 : BUN uip = 0, uie = BATcount(ui);
750 0 : BUN oip = 0, oie = BATcount(oi);
751 :
752 0 : oid uiseqb = ui->tseqbase;
753 0 : oid oiseqb = oi->tseqbase;
/* uipt/oipt stay NULL for dense id BATs; ids are then computed as seqbase + position */
754 0 : oid *uipt = NULL, *oipt = NULL;
755 0 : BATiter uii = bat_iterator(ui);
756 0 : BATiter oii = bat_iterator(oi);
757 0 : if (!BATtdensebi(&uii))
758 0 : uipt = uii.base;
759 0 : if (!BATtdensebi(&oii))
760 0 : oipt = oii.base;
/* two-way merge while both inputs have entries left */
761 0 : while (uip < uie && oip < oie && !err) {
762 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
763 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
764 :
765 0 : if (uiid <= oiid) {
766 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
767 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
768 : err = 1;
769 0 : uip++;
/* same id in both sets: the oi/ov entry is skipped */
770 0 : if (uiid == oiid)
771 0 : oip++;
772 : } else { /* uiid > oiid */
773 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
774 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
775 : err = 1;
776 0 : oip++;
777 : }
778 : }
/* copy whatever is left of either input */
779 0 : while (uip < uie && !err) {
780 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
781 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
782 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
783 : err = 1;
784 0 : uip++;
785 : }
786 0 : while (oip < oie && !err) {
787 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
788 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
789 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
790 : err = 1;
791 0 : oip++;
792 : }
793 0 : if (uv) {
794 0 : bat_iterator_end(&uvi);
795 0 : bat_iterator_end(&ovi);
796 : }
797 0 : bat_iterator_end(&uii);
798 0 : bat_iterator_end(&oii);
/* the inputs are consumed whether or not the merge succeeded */
799 0 : bat_destroy(ui);
800 0 : bat_destroy(uv);
801 0 : bat_destroy(oi);
802 0 : bat_destroy(ov);
803 0 : if (!err) {
804 0 : if (nv)
805 0 : *UV = nv;
806 0 : return ni;
807 : }
808 0 : *UV = NULL;
809 0 : bat_destroy(ni);
810 0 : bat_destroy(nv);
811 0 : return NULL;
812 : }
813 :
814 : static sql_delta *
815 8506905 : older_delta( sql_delta *d, sql_trans *tr)
816 : {
817 8506905 : sql_delta *o = d->next;
818 :
819 8514518 : while (o && !o->cs.merged) {
820 7602 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
821 : break;
822 : else
823 7613 : o = o->next;
824 : }
825 8506916 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
826 0 : return o;
827 : return NULL;
828 : }
829 :
830 : static BAT *
831 8503215 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
832 : {
833 8503215 : assert(tr->active);
834 8503215 : sql_delta *o = NULL;
835 8503215 : BAT *ui = NULL, *uv = NULL;
836 :
837 8503215 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
838 : return NULL;
839 8506555 : if (access == RD_UPD_VAL) {
840 4254488 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
841 0 : bat_destroy(ui);
842 0 : return NULL;
843 : }
844 : }
845 8506829 : while ((o = older_delta(d, tr)) != NULL) {
846 0 : BAT *oui = NULL, *ouv = NULL;
847 0 : if (!oui)
848 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
849 0 : if (access == RD_UPD_VAL)
850 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
851 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
852 0 : bat_destroy(ui);
853 0 : bat_destroy(uv);
854 0 : bat_destroy(oui);
855 0 : bat_destroy(ouv);
856 0 : return NULL;
857 : }
858 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
859 : return NULL;
860 : d = o;
861 : }
862 8506925 : if (uv) {
863 4254764 : bat_destroy(ui);
864 4254764 : return uv;
865 : }
866 : return ui;
867 : }
868 :
869 : static BAT *
870 2078 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
871 : {
872 2078 : lock_column(tr->store, c->base.id);
873 2078 : sql_delta *d = col_timestamp_delta(tr, c);
874 2078 : int type = c->type.type->localtype;
875 :
876 2078 : if (!d) {
877 0 : unlock_column(tr->store, c->base.id);
878 0 : return NULL;
879 : }
880 2078 : if (d->cs.st == ST_DICT) {
881 0 : BAT *b = quick_descriptor(d->cs.bid);
882 :
883 0 : type = b->ttype;
884 : }
885 2078 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
886 2078 : unlock_column(tr->store, c->base.id);
887 2078 : return bn;
888 : }
889 :
890 : static BAT *
891 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
892 : {
893 0 : lock_column(tr->store, i->base.id);
894 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
895 0 : sql_delta *d = idx_timestamp_delta(tr, i);
896 :
897 0 : if (!d) {
898 0 : unlock_column(tr->store, i->base.id);
899 0 : return NULL;
900 : }
901 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
902 0 : unlock_column(tr->store, i->base.id);
903 0 : return bn;
904 : }
905 :
906 : static BAT *
907 8646523 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
908 : {
909 8646523 : BAT *b;
910 :
911 8646523 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
912 8646523 : assert(cs != NULL);
913 8646523 : if (access == QUICK)
914 87679 : return quick_descriptor(cs->bid);
915 8558844 : if (access == RD_EXT)
916 857 : return temp_descriptor(cs->ebid);
917 8557987 : assert(cs->bid);
918 8557987 : b = temp_descriptor(cs->bid);
919 8559315 : if (b == NULL)
920 : return NULL;
921 8559315 : assert(b->batRestricted == BAT_READ);
922 : /* return slice */
923 8559315 : BAT *s = BATslice(b, 0, cnt);
924 8544275 : bat_destroy(b);
925 8544275 : return s;
926 : }
927 :
928 : static int
929 4250782 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
930 : {
931 4250782 : lock_column(tr->store, c->base.id);
932 4250956 : size_t cnt = count_col(tr, c, 0);
933 4252573 : sql_delta *d = col_timestamp_delta(tr, c);
934 4252594 : int type = c->type.type->localtype;
935 :
936 4252594 : if (!d) {
937 0 : unlock_column(tr->store, c->base.id);
938 0 : return LOG_ERR;
939 : }
940 4252594 : if (d->cs.st == ST_DICT) {
941 2 : BAT *b = quick_descriptor(d->cs.bid);
942 :
943 2 : type = b->ttype;
944 : }
945 :
946 4252594 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
947 4252609 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
948 :
949 4252521 : unlock_column(tr->store, c->base.id);
950 :
951 4252311 : if (*ui == NULL || *uv == NULL) {
952 0 : bat_destroy(*ui);
953 0 : bat_destroy(*uv);
954 0 : return LOG_ERR;
955 : }
956 : return LOG_OK;
957 : }
958 :
959 : static int
960 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
961 : {
962 23 : lock_column(tr->store, i->base.id);
963 23 : size_t cnt = count_idx(tr, i, 0);
964 23 : sql_delta *d = idx_timestamp_delta(tr, i);
965 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
966 :
967 23 : if (!d) {
968 0 : unlock_column(tr->store, i->base.id);
969 0 : return LOG_ERR;
970 : }
971 :
972 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
973 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
974 :
975 23 : unlock_column(tr->store, i->base.id);
976 :
977 23 : if (*ui == NULL || *uv == NULL) {
978 0 : bat_destroy(*ui);
979 0 : bat_destroy(*uv);
980 0 : return LOG_ERR;
981 : }
982 : return LOG_OK;
983 : }
984 :
985 : static void * /* BAT * */
986 8637377 : bind_col(sql_trans *tr, sql_column *c, int access)
987 : {
988 8637377 : assert(access == QUICK || tr->active);
989 8637377 : if (!isTable(c->t))
990 : return NULL;
991 8637377 : sql_delta *d = col_timestamp_delta(tr, c);
992 8639359 : if (!d)
993 : return NULL;
994 8639359 : size_t cnt = count_col(tr, c, 0);
995 8643299 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
996 2078 : return bind_ucol(tr, c, access, cnt);
997 8641221 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
998 8636261 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
999 : return b;
1000 : }
1001 :
1002 : static void * /* BAT * */
1003 6594 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1004 : {
1005 6594 : assert(access == QUICK || tr->active);
1006 6594 : if (!isTable(i->t))
1007 : return NULL;
1008 6594 : sql_delta *d = idx_timestamp_delta(tr, i);
1009 6598 : if (!d)
1010 : return NULL;
1011 6598 : size_t cnt = count_idx(tr, i, 0);
1012 6599 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1013 0 : return bind_uidx(tr, i, access, cnt);
1014 6599 : return cs_bind_bat( &d->cs, access, cnt);
1015 : }
1016 :
1017 : static int
1018 4052 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1019 : {
1020 4052 : if (!cs->uibid) {
1021 0 : cs->uibid = e_bat(TYPE_oid);
1022 0 : if (cs->uibid == BID_NIL)
1023 : return LOG_ERR;
1024 : }
1025 4052 : if (!cs->uvbid) {
1026 0 : BAT *cur = quick_descriptor(cs->bid);
1027 0 : if (!cur)
1028 : return LOG_ERR;
1029 0 : int type = cur->ttype;
1030 0 : cs->uvbid = e_bat(type);
1031 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1032 : return LOG_ERR;
1033 : }
1034 4052 : BAT *ui = temp_descriptor(cs->uibid);
1035 4052 : BAT *uv = temp_descriptor(cs->uvbid);
1036 :
1037 4052 : if (ui == NULL || uv == NULL) {
1038 0 : bat_destroy(ui);
1039 0 : bat_destroy(uv);
1040 0 : return LOG_ERR;
1041 : }
1042 4052 : assert(ui && uv);
1043 4052 : if (isEbat(ui)){
1044 400 : temp_destroy(cs->uibid);
1045 400 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1046 400 : bat_destroy(ui);
1047 400 : if (cs->uibid == BID_NIL ||
1048 400 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1049 0 : bat_destroy(uv);
1050 0 : return LOG_ERR;
1051 : }
1052 : }
1053 4052 : if (isEbat(uv)){
1054 400 : temp_destroy(cs->uvbid);
1055 400 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1056 400 : bat_destroy(uv);
1057 400 : if (cs->uvbid == BID_NIL ||
1058 400 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1059 0 : bat_destroy(ui);
1060 0 : return LOG_ERR;
1061 : }
1062 : }
1063 4052 : *Ui = ui;
1064 4052 : *Uv = uv;
1065 4052 : return LOG_OK;
1066 : }
1067 :
1068 : static int
1069 6908 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1070 : {
1071 102221 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1072 102221 : if (s->start <= rid && s->end > rid) {
1073 6908 : if (s->ts == tr->tid && !s->deleted) {
1074 2858 : return 1;
1075 : }
1076 : break;
1077 : }
1078 : }
1079 : return 0;
1080 : }
1081 :
1082 : static int
1083 4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1084 : {
1085 96455 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1086 96455 : if (s->start <= rid && s->end > rid) {
1087 4050 : if (s->ts >= tr->ts && s->deleted) {
1088 0 : return 1;
1089 : }
1090 : break;
1091 : }
1092 : }
1093 : return 0;
1094 : }
1095 :
1096 : static sql_delta *
1097 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1098 : {
1099 0 : sql_delta *n = ZNEW(sql_delta);
1100 0 : if (!n)
1101 : return NULL;
1102 0 : *n = *bat;
1103 0 : n->next = NULL;
1104 0 : n->cs.ts = tr->tid;
1105 0 : return n;
1106 : }
1107 :
1108 : static BAT *
1109 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1110 : {
1111 17 : BAT *newoffsets = NULL;
1112 17 : sql_delta *bat = *batp;
1113 17 : column_storage *cs = &bat->cs;
1114 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1115 :
1116 17 : if (!u)
1117 : return NULL;
1118 17 : BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
1119 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1120 0 : bat_destroy(u);
1121 0 : return NULL;
1122 : } else {
1123 17 : int new = 0;
1124 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1125 17 : if (BATcount(u) >= max_cnt) {
1126 1 : if (max_cnt == INT_MAX) { /* decompress */
1127 0 : if (!(b = temp_descriptor(cs->bid))) {
1128 0 : bat_destroy(u);
1129 0 : return NULL;
1130 : }
1131 0 : if (cs->ucnt) {
1132 0 : BAT *ui = NULL, *uv = NULL;
1133 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1134 0 : bat_destroy(b);
1135 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1136 0 : bat_destroy(nb);
1137 0 : bat_destroy(u);
1138 0 : return NULL;
1139 : }
1140 0 : b = nb;
1141 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1142 0 : bat_destroy(ui);
1143 0 : bat_destroy(uv);
1144 0 : bat_destroy(b);
1145 0 : bat_destroy(u);
1146 : }
1147 0 : bat_destroy(ui);
1148 0 : bat_destroy(uv);
1149 : }
1150 0 : n = DICTdecompress_(b, u, PERSISTENT);
1151 0 : bat_destroy(b);
1152 0 : assert(newoffsets == NULL);
1153 0 : if (!n) {
1154 0 : bat_destroy(u);
1155 0 : return NULL;
1156 : }
1157 0 : if (cs->ts != tr->tid) {
1158 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1159 0 : bat_destroy(n);
1160 0 : return NULL;
1161 : }
1162 0 : cs = &(*batp)->cs;
1163 0 : new = 1;
1164 : }
1165 0 : if (cs->bid && !new)
1166 0 : temp_destroy(cs->bid);
1167 0 : n = transfer_to_systrans(n);
1168 0 : if (n == NULL)
1169 : return NULL;
1170 0 : bat_set_access(n, BAT_READ);
1171 0 : cs->bid = temp_create(n);
1172 0 : bat_destroy(n);
1173 0 : if (cs->ebid && !new)
1174 0 : temp_destroy(cs->ebid);
1175 0 : cs->ebid = 0;
1176 0 : cs->ucnt = 0;
1177 0 : if (cs->uibid && !new)
1178 0 : temp_destroy(cs->uibid);
1179 0 : if (cs->uvbid && !new)
1180 0 : temp_destroy(cs->uvbid);
1181 0 : cs->uibid = cs->uvbid = 0;
1182 0 : cs->st = ST_DEFAULT;
1183 0 : cs->cleared = true;
1184 : } else {
1185 1 : if (!(b = temp_descriptor(cs->bid))) {
1186 0 : bat_destroy(newoffsets);
1187 0 : bat_destroy(u);
1188 0 : return NULL;
1189 : }
1190 2 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1191 1 : bat_destroy(b);
1192 1 : if (!n) {
1193 0 : bat_destroy(newoffsets);
1194 0 : bat_destroy(u);
1195 0 : return NULL;
1196 : }
1197 1 : if (cs->ts != tr->tid) {
1198 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1199 0 : bat_destroy(n);
1200 0 : return NULL;
1201 : }
1202 0 : cs = &(*batp)->cs;
1203 0 : new = 1;
1204 0 : temp_dup(cs->ebid);
1205 0 : if (cs->uibid) {
1206 0 : temp_dup(cs->uibid);
1207 0 : temp_dup(cs->uvbid);
1208 : }
1209 : }
1210 1 : if (cs->bid && !new)
1211 1 : temp_destroy(cs->bid);
1212 1 : n = transfer_to_systrans(n);
1213 1 : if (n == NULL)
1214 : return NULL;
1215 1 : bat_set_access(n, BAT_READ);
1216 1 : cs->bid = temp_create(n);
1217 1 : bat_destroy(n);
1218 1 : cs->cleared = true;
1219 1 : i = newoffsets;
1220 : }
1221 : } else { /* append */
1222 16 : i = newoffsets;
1223 : }
1224 : }
1225 17 : bat_destroy(u);
1226 17 : return i;
1227 : }
1228 :
1229 : static BAT *
1230 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1231 : {
1232 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1233 0 : BAT *newoffsets = NULL;
1234 0 : BAT *b = NULL, *n = NULL;
1235 :
1236 0 : if (!(b = temp_descriptor(cs->bid)))
1237 : return NULL;
1238 :
1239 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1240 0 : bat_destroy(b);
1241 0 : return NULL;
1242 : } else {
1243 : /* returns new offset bat if values within min/max, else decompress */
1244 0 : if (!newoffsets) { /* decompress */
1245 0 : if (cs->ucnt) {
1246 0 : BAT *ui = NULL, *uv = NULL;
1247 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1248 0 : bat_destroy(b);
1249 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1250 0 : bat_destroy(nb);
1251 0 : return NULL;
1252 : }
1253 0 : b = nb;
1254 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1255 0 : bat_destroy(ui);
1256 0 : bat_destroy(uv);
1257 0 : bat_destroy(b);
1258 : }
1259 0 : bat_destroy(ui);
1260 0 : bat_destroy(uv);
1261 : }
1262 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1263 0 : bat_destroy(b);
1264 0 : if (!n)
1265 : return NULL;
1266 0 : if (cs->bid)
1267 0 : temp_destroy(cs->bid);
1268 0 : n = transfer_to_systrans(n);
1269 0 : if (n == NULL)
1270 : return NULL;
1271 0 : bat_set_access(n, BAT_READ);
1272 0 : cs->bid = temp_create(n);
1273 0 : cs->ucnt = 0;
1274 0 : if (cs->uibid)
1275 0 : temp_destroy(cs->uibid);
1276 0 : if (cs->uvbid)
1277 0 : temp_destroy(cs->uvbid);
1278 0 : cs->uibid = cs->uvbid = 0;
1279 0 : cs->st = ST_DEFAULT;
1280 0 : cs->cleared = true;
1281 0 : b = n;
1282 : } else { /* append */
1283 : i = newoffsets;
1284 : }
1285 : }
1286 0 : bat_destroy(b);
1287 0 : return i;
1288 : }
1289 :
1290 : /*
1291 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1292 : */
1293 : static int
1294 2756 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1295 : {
1296 2756 : int res = LOG_OK;
1297 2756 : sql_delta *bat = *batp;
1298 2756 : column_storage *cs = &bat->cs;
1299 2756 : BAT *otids = tids, *oupdates = updates;
1300 :
1301 2756 : if (!BATcount(tids))
1302 : return LOG_OK;
1303 :
1304 2756 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1305 6 : tids = BATunmask(tids);
1306 6 : if (!tids)
1307 : return LOG_ERR;
1308 : }
1309 2756 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1310 0 : updates = BATunmask(updates);
1311 0 : if (!updates) {
1312 0 : if (otids != tids)
1313 0 : bat_destroy(tids);
1314 0 : return LOG_ERR;
1315 : }
1316 2756 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1317 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1318 42 : if (!updates) {
1319 0 : if (otids != tids)
1320 0 : bat_destroy(tids);
1321 0 : return LOG_ERR;
1322 : }
1323 : }
1324 :
1325 2756 : if (cs->st == ST_DICT) {
1326 : /* possibly a new array is returned */
1327 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1328 4 : bat = *batp;
1329 4 : cs = &bat->cs;
1330 4 : if (oupdates != updates)
1331 0 : bat_destroy(updates);
1332 4 : updates = nupdates;
1333 4 : if (!updates) {
1334 0 : if (otids != tids)
1335 0 : bat_destroy(tids);
1336 0 : return LOG_ERR;
1337 : }
1338 : }
1339 :
1340 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1341 : /* currently only one update delta is possible */
1342 2756 : lock_table(tr->store, t->base.id);
1343 2756 : storage *s = ATOMIC_PTR_GET(&t->data);
1344 2756 : if (!is_new && !cs->cleared) {
1345 2433 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1346 6 : BAT *sorted, *order;
1347 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1348 0 : if (otids != tids)
1349 0 : bat_destroy(tids);
1350 0 : if (oupdates != updates)
1351 0 : bat_destroy(updates);
1352 0 : unlock_table(tr->store, t->base.id);
1353 0 : return LOG_ERR;
1354 : }
1355 6 : if (otids != tids)
1356 0 : bat_destroy(tids);
1357 6 : tids = sorted;
1358 6 : BAT *nupdates = BATproject(order, updates);
1359 6 : bat_destroy(order);
1360 6 : if (oupdates != updates)
1361 0 : bat_destroy(updates);
1362 6 : updates = nupdates;
1363 6 : if (!updates) {
1364 0 : bat_destroy(tids);
1365 0 : unlock_table(tr->store, t->base.id);
1366 0 : return LOG_ERR;
1367 : }
1368 : }
1369 2433 : assert(tids->tsorted);
1370 2433 : BAT *ui = NULL, *uv = NULL;
1371 :
1372 : /* handle updates on just inserted bits */
1373 : /* handle updates on updates (within one transaction) */
1374 2433 : BATiter upi = bat_iterator(updates);
1375 2433 : BUN cnt = 0, ucnt = BATcount(tids);
1376 2433 : BAT *b, *ins = NULL;
1377 2433 : int *msk = NULL;
1378 :
1379 2433 : if((b = temp_descriptor(cs->bid)) == NULL)
1380 : res = LOG_ERR;
1381 :
1382 2433 : if (res == LOG_OK && BATtdense(tids)) {
1383 2412 : oid start = tids->tseqbase, offset = start;
1384 2412 : oid end = start + ucnt;
1385 :
1386 5184 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1387 3332 : if (seg->start <= start && seg->end > start) {
1388 : /* check for delete conflicts */
1389 2412 : if (seg->ts >= tr->ts && seg->deleted) {
1390 0 : res = LOG_CONFLICT;
1391 0 : continue;
1392 : }
1393 :
1394 : /* check for inplace updates */
1395 2412 : BUN lend = end < seg->end?end:seg->end;
1396 2412 : if (seg->ts == tr->tid && !seg->deleted) {
1397 11 : if (!ins) {
1398 11 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1399 11 : if (!ins)
1400 : res = LOG_ERR;
1401 : else {
1402 11 : BATsetcount(ins, ucnt); /* all full updates */
1403 11 : msk = (int*)Tloc(ins, 0);
1404 11 : BUN end = (ucnt+31)/32;
1405 11 : memset(msk, 0, end * sizeof(int));
1406 : }
1407 : }
1408 26 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1409 15 : const void *upd = BUNtail(upi, rid-offset);
1410 15 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1411 0 : res = LOG_ERR;
1412 :
1413 15 : oid word = i/32;
1414 15 : int pos = i%32;
1415 15 : msk[word] |= 1U<<pos;
1416 15 : cnt++;
1417 : }
1418 : }
1419 : }
1420 3332 : if (end < seg->end)
1421 : break;
1422 : }
1423 24 : } else if (res == LOG_OK && complex_cand(tids)) {
1424 3 : struct canditer ci;
1425 3 : segment *seg = s->segs->h;
1426 3 : canditer_init(&ci, NULL, tids);
1427 3 : BUN i = 0;
1428 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1429 1033 : oid rid = canditer_next(&ci);
1430 1033 : if (seg->end <= rid)
1431 13 : seg = ATOMIC_PTR_GET(&seg->next);
1432 1020 : else if (seg->start <= rid && seg->end > rid) {
1433 : /* check for delete conflicts */
1434 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1435 0 : res = LOG_CONFLICT;
1436 0 : continue;
1437 : }
1438 :
1439 : /* check for inplace updates */
1440 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1441 0 : if (!ins) {
1442 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1443 0 : if (!ins) {
1444 : res = LOG_ERR;
1445 : break;
1446 : } else {
1447 0 : BATsetcount(ins, ucnt); /* all full updates */
1448 0 : msk = (int*)Tloc(ins, 0);
1449 0 : BUN end = (ucnt+31)/32;
1450 0 : memset(msk, 0, end * sizeof(int));
1451 : }
1452 : }
1453 0 : ptr upd = BUNtail(upi, i);
1454 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1455 0 : res = LOG_ERR;
1456 :
1457 0 : oid word = i/32;
1458 0 : int pos = i%32;
1459 0 : msk[word] |= 1U<<pos;
1460 0 : cnt++;
1461 : }
1462 1020 : i++;
1463 : }
1464 : }
1465 18 : } else if (res == LOG_OK) {
1466 18 : BUN i = 0;
1467 18 : oid *rid = Tloc(tids,0);
1468 18 : segment *seg = s->segs->h;
1469 92 : while ( seg && res == LOG_OK && i < ucnt) {
1470 74 : if (seg->end <= rid[i])
1471 18 : seg = ATOMIC_PTR_GET(&seg->next);
1472 56 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1473 : /* check for delete conflicts */
1474 56 : if (seg->ts >= tr->ts && seg->deleted) {
1475 0 : res = LOG_CONFLICT;
1476 0 : continue;
1477 : }
1478 :
1479 : /* check for inplace updates */
1480 56 : if (seg->ts == tr->tid && !seg->deleted) {
1481 3 : if (!ins) {
1482 1 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1483 1 : if (!ins) {
1484 : res = LOG_ERR;
1485 : break;
1486 : } else {
1487 1 : BATsetcount(ins, ucnt); /* all full updates */
1488 1 : msk = (int*)Tloc(ins, 0);
1489 1 : BUN end = (ucnt+31)/32;
1490 1 : memset(msk, 0, end * sizeof(int));
1491 : }
1492 : }
1493 3 : const void *upd = BUNtail(upi, i);
1494 3 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1495 0 : res = LOG_ERR;
1496 :
1497 3 : oid word = i/32;
1498 3 : int pos = i%32;
1499 3 : msk[word] |= 1U<<pos;
1500 3 : cnt++;
1501 : }
1502 56 : i++;
1503 : }
1504 : }
1505 : }
1506 :
1507 2433 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1508 2421 : if (cs->ucnt == 0) {
1509 2419 : if (cnt) {
1510 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1511 0 : if (nins) {
1512 0 : ui = BATproject(nins, tids);
1513 0 : uv = BATproject(nins, updates);
1514 0 : bat_destroy(nins);
1515 : }
1516 : } else {
1517 2419 : ui = temp_descriptor(tids->batCacheid);
1518 2419 : uv = temp_descriptor(updates->batCacheid);
1519 : }
1520 2419 : if (!ui || !uv) {
1521 : res = LOG_ERR;
1522 : } else {
1523 2419 : temp_destroy(cs->uibid);
1524 2419 : temp_destroy(cs->uvbid);
1525 2419 : ui = transfer_to_systrans(ui);
1526 2419 : uv = transfer_to_systrans(uv);
1527 2419 : if (ui == NULL || uv == NULL) {
1528 0 : BBPreclaim(ui);
1529 0 : BBPreclaim(uv);
1530 : res = LOG_ERR;
1531 : } else {
1532 2419 : cs->uibid = temp_create(ui);
1533 2419 : cs->uvbid = temp_create(uv);
1534 2419 : cs->ucnt = BATcount(ui);
1535 : }
1536 : }
1537 : } else {
1538 2 : BAT *nui = NULL, *nuv = NULL;
1539 :
1540 : /* merge taking msk of inserted into account */
1541 2 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1542 : res = LOG_ERR;
1543 :
1544 2 : if (res == LOG_OK) {
1545 2 : const void *upd = NULL;
1546 2 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1547 2 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1548 :
1549 2 : if (!nui || !nuv) {
1550 : res = LOG_ERR;
1551 : } else {
1552 2 : BATiter ovi = bat_iterator(uv);
1553 :
1554 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1555 2 : BUN uip = 0, uie = BATcount(ui);
1556 2 : BUN nip = 0, nie = BATcount(tids);
1557 2 : oid uiseqb = ui->tseqbase;
1558 2 : oid niseqb = tids->tseqbase;
1559 2 : oid *uipt = NULL, *nipt = NULL;
1560 2 : BATiter uii = bat_iterator(ui);
1561 2 : BATiter tidsi = bat_iterator(tids);
1562 2 : if (!BATtdensebi(&uii))
1563 1 : uipt = uii.base;
1564 2 : if (!BATtdensebi(&tidsi))
1565 1 : nipt = tidsi.base;
1566 20 : while (uip < uie && nip < nie && res == LOG_OK) {
1567 18 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1568 18 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1569 :
1570 18 : if (uiv < niv) {
1571 0 : upd = BUNtail(ovi, uip);
1572 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1573 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1574 : res = LOG_ERR;
1575 0 : uip++;
1576 18 : } else if (uiv == niv) {
1577 : /* handle == */
1578 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1579 18 : upd = BUNtail(upi, nip);
1580 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1581 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1582 : res = LOG_ERR;
1583 : } else {
1584 0 : upd = BUNtail(ovi, uip);
1585 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1586 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1587 : res = LOG_ERR;
1588 : }
1589 18 : uip++;
1590 18 : nip++;
1591 : } else { /* uiv > niv */
1592 0 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1593 0 : upd = BUNtail(upi, nip);
1594 0 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1595 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1596 : res = LOG_ERR;
1597 : }
1598 0 : nip++;
1599 : }
1600 : }
1601 2 : while (uip < uie && res == LOG_OK) {
1602 0 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1603 0 : upd = BUNtail(ovi, uip);
1604 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1605 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1606 : res = LOG_ERR;
1607 0 : uip++;
1608 : }
1609 2 : while (nip < nie && res == LOG_OK) {
1610 0 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1611 0 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1612 0 : upd = BUNtail(upi, nip);
1613 0 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1614 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1615 : res = LOG_ERR;
1616 : }
1617 0 : nip++;
1618 : }
1619 2 : bat_iterator_end(&uii);
1620 2 : bat_iterator_end(&tidsi);
1621 2 : bat_iterator_end(&ovi);
1622 2 : if (res == LOG_OK) {
1623 2 : temp_destroy(cs->uibid);
1624 2 : temp_destroy(cs->uvbid);
1625 2 : nui = transfer_to_systrans(nui);
1626 2 : nuv = transfer_to_systrans(nuv);
1627 2 : if (nui == NULL || nuv == NULL) {
1628 : res = LOG_ERR;
1629 : } else {
1630 2 : cs->uibid = temp_create(nui);
1631 2 : cs->uvbid = temp_create(nuv);
1632 2 : cs->ucnt = BATcount(nui);
1633 : }
1634 : }
1635 : }
1636 2 : bat_destroy(nui);
1637 2 : bat_destroy(nuv);
1638 : }
1639 : }
1640 : }
1641 2433 : bat_iterator_end(&upi);
1642 2433 : bat_destroy(b);
1643 2433 : unlock_table(tr->store, t->base.id);
1644 2433 : bat_destroy(ins);
1645 2433 : bat_destroy(ui);
1646 2433 : bat_destroy(uv);
1647 2433 : if (otids != tids)
1648 10 : bat_destroy(tids);
1649 2433 : if (oupdates != updates)
1650 11 : bat_destroy(updates);
1651 2433 : return res;
1652 : } else if (is_new || cs->cleared) {
1653 323 : BAT *b = temp_descriptor(cs->bid);
1654 :
1655 323 : if (b == NULL) {
1656 : res = LOG_ERR;
1657 : } else {
1658 323 : if (BATcount(b)==0) {
1659 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1660 0 : res = LOG_ERR;
1661 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1662 0 : res = LOG_ERR;
1663 323 : BBPcold(b->batCacheid);
1664 323 : bat_destroy(b);
1665 : }
1666 : }
1667 323 : unlock_table(tr->store, t->base.id);
1668 323 : if (otids != tids)
1669 2 : bat_destroy(tids);
1670 323 : if (oupdates != updates)
1671 41 : bat_destroy(updates);
1672 : return res;
1673 : }
1674 :
1675 : static int
1676 2756 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1677 : {
1678 2756 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1679 : }
1680 :
1681 : static void *
1682 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1683 : {
1684 4 : void *newoffsets = NULL;
1685 4 : sql_delta *bat = *batp;
1686 4 : column_storage *cs = &bat->cs;
1687 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1688 :
1689 4 : if (!u)
1690 : return NULL;
1691 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1692 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1693 0 : bat_destroy(u);
1694 0 : return NULL;
1695 : } else {
1696 4 : int new = 0;
1697 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1698 4 : if (BATcount(u) >= max_cnt) {
1699 0 : if (max_cnt == INT_MAX) { /* decompress */
1700 : if (!(b = temp_descriptor(cs->bid))) {
1701 : bat_destroy(u);
1702 : return NULL;
1703 : }
1704 : n = DICTdecompress_(b, u, PERSISTENT);
1705 : /* TODO decompress updates if any */
1706 : bat_destroy(b);
1707 : assert(newoffsets == NULL);
1708 : if (!n) {
1709 : bat_destroy(u);
1710 : return NULL;
1711 : }
1712 : if (cs->ts != tr->tid) {
1713 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1714 : bat_destroy(n);
1715 : bat_destroy(u);
1716 : return NULL;
1717 : }
1718 : cs = &(*batp)->cs;
1719 : new = 1;
1720 : cs->uibid = cs->uvbid = 0;
1721 : }
1722 : if (cs->bid && !new)
1723 : temp_destroy(cs->bid);
1724 : n = transfer_to_systrans(n);
1725 : if (n == NULL) {
1726 : bat_destroy(u);
1727 : return NULL;
1728 : }
1729 : bat_set_access(n, BAT_READ);
1730 : cs->bid = temp_create(n);
1731 : bat_destroy(n);
1732 : if (cs->ebid && !new)
1733 : temp_destroy(cs->ebid);
1734 : cs->ebid = 0;
1735 : cs->st = ST_DEFAULT;
1736 : /* at append_col the column's storage type is cleared */
1737 : cs->cleared = true;
1738 : } else {
1739 0 : if (!(b = temp_descriptor(cs->bid))) {
1740 0 : GDKfree(newoffsets);
1741 0 : bat_destroy(u);
1742 0 : return NULL;
1743 : }
1744 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1745 0 : bat_destroy(b);
1746 0 : if (!n) {
1747 0 : GDKfree(newoffsets);
1748 0 : bat_destroy(u);
1749 0 : return NULL;
1750 : }
1751 0 : if (cs->ts != tr->tid) {
1752 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1753 0 : bat_destroy(u);
1754 0 : bat_destroy(n);
1755 0 : return NULL;
1756 : }
1757 0 : cs = &(*batp)->cs;
1758 0 : new = 1;
1759 0 : temp_dup(cs->ebid);
1760 0 : if (cs->uibid) {
1761 0 : temp_dup(cs->uibid);
1762 0 : temp_dup(cs->uvbid);
1763 : }
1764 : }
1765 0 : if (cs->bid)
1766 0 : temp_destroy(cs->bid);
1767 0 : n = transfer_to_systrans(n);
1768 0 : if (n == NULL) {
1769 0 : bat_destroy(u);
1770 0 : return NULL;
1771 : }
1772 0 : bat_set_access(n, BAT_READ);
1773 0 : cs->bid = temp_create(n);
1774 0 : bat_destroy(n);
1775 0 : cs->cleared = true;
1776 0 : i = newoffsets;
1777 : }
1778 : } else { /* append */
1779 4 : i = newoffsets;
1780 : }
1781 : }
1782 4 : bat_destroy(u);
1783 4 : return i;
1784 : }
1785 :
1786 : static void *
1787 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1788 : {
1789 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1790 1 : void *newoffsets = NULL;
1791 1 : BAT *b = NULL, *n = NULL;
1792 :
1793 1 : if (!(b = temp_descriptor(cs->bid)))
1794 : return NULL;
1795 :
1796 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1797 0 : bat_destroy(b);
1798 0 : return NULL;
1799 : } else {
1800 : /* returns new offset bat if values within min/max, else decompress */
1801 1 : if (!newoffsets) {
1802 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1803 1 : bat_destroy(b);
1804 1 : if (!n)
1805 : return NULL;
1806 : /* TODO decompress updates if any */
1807 1 : if (cs->bid)
1808 1 : temp_destroy(cs->bid);
1809 1 : n = transfer_to_systrans(n);
1810 1 : if (n == NULL)
1811 : return NULL;
1812 1 : bat_set_access(n, BAT_READ);
1813 1 : cs->bid = temp_create(n);
1814 1 : cs->st = ST_DEFAULT;
1815 : /* at append_col the column's storage type is cleared */
1816 1 : cs->cleared = true;
1817 1 : b = n;
1818 : } else { /* append */
1819 : i = newoffsets;
1820 : }
1821 : }
1822 1 : bat_destroy(b);
1823 1 : return i;
1824 : }
1825 :
1826 : static int
1827 6908 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1828 : {
1829 6908 : void *oupd = upd;
1830 6908 : sql_delta *bat = *batp;
1831 6908 : column_storage *cs = &bat->cs;
1832 6908 : storage *s = ATOMIC_PTR_GET(&t->data);
1833 6908 : assert(!is_oid_nil(rid));
1834 6908 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1835 :
1836 6908 : if (cs->st == ST_DICT) {
1837 : /* possibly a new array is returned */
1838 0 : upd = dict_append_val(tr, batp, upd, 1);
1839 0 : bat = *batp;
1840 0 : cs = &bat->cs;
1841 0 : if (!upd)
1842 : return LOG_ERR;
1843 : }
1844 :
1845 : /* check if rid is insert ? */
1846 6908 : if (!inplace) {
1847 : /* check conflict */
1848 4050 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1849 0 : if (oupd != upd)
1850 0 : GDKfree(upd);
1851 0 : return LOG_CONFLICT;
1852 : }
1853 4050 : BAT *ui, *uv;
1854 :
1855 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1856 : /* currently only one update delta is possible */
1857 4050 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1858 0 : if (oupd != upd)
1859 0 : GDKfree(upd);
1860 0 : return LOG_ERR;
1861 : }
1862 :
1863 4050 : assert(uv->ttype);
1864 4050 : assert(BATcount(ui) == BATcount(uv));
1865 8100 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1866 4050 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1867 0 : if (oupd != upd)
1868 0 : GDKfree(upd);
1869 0 : bat_destroy(ui);
1870 0 : bat_destroy(uv);
1871 0 : return LOG_ERR;
1872 : }
1873 4050 : assert(BATcount(ui) == BATcount(uv));
1874 4050 : bat_destroy(ui);
1875 4050 : bat_destroy(uv);
1876 4050 : cs->ucnt++;
1877 : } else {
1878 2858 : BAT *b = NULL;
1879 :
1880 2858 : if((b = temp_descriptor(cs->bid)) == NULL) {
1881 0 : if (oupd != upd)
1882 0 : GDKfree(upd);
1883 0 : return LOG_ERR;
1884 : }
1885 2858 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1886 0 : if (oupd != upd)
1887 0 : GDKfree(upd);
1888 0 : bat_destroy(b);
1889 0 : return LOG_ERR;
1890 : }
1891 2858 : bat_destroy(b);
1892 : }
1893 6908 : if (oupd != upd)
1894 0 : GDKfree(upd);
1895 : return LOG_OK;
1896 : }
1897 :
1898 : static int
1899 6908 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1900 : {
1901 6908 : int res = LOG_OK;
1902 6908 : lock_table(tr->store, t->base.id);
1903 6908 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1904 6908 : unlock_table(tr->store, t->base.id);
1905 6908 : return res;
1906 : }
1907 :
1908 : static int
1909 158761 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1910 : {
1911 158761 : (void)tr;
1912 158761 : if (!ocs)
1913 : return LOG_OK;
1914 158761 : cs->bid = ocs->bid;
1915 158761 : cs->ebid = ocs->ebid;
1916 158761 : cs->uibid = ocs->uibid;
1917 158761 : cs->uvbid = ocs->uvbid;
1918 158761 : cs->ucnt = ocs->ucnt;
1919 :
1920 158761 : if (temp) {
1921 25967 : cs->bid = temp_copy(cs->bid, true, false);
1922 25954 : if (cs->bid == BID_NIL)
1923 : return LOG_ERR;
1924 : } else {
1925 132794 : temp_dup(cs->bid);
1926 : }
1927 158770 : if (cs->ebid)
1928 6 : temp_dup(cs->ebid);
1929 158770 : cs->ucnt = 0;
1930 158770 : cs->uibid = e_bat(TYPE_oid);
1931 158798 : cs->uvbid = e_bat(type);
1932 158795 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1933 : return LOG_ERR;
1934 158795 : cs->st = ocs->st;
1935 158795 : return LOG_OK;
1936 : }
1937 :
1938 : static void
1939 302284 : destroy_delta(sql_delta *b, bool recursive)
1940 : {
1941 302284 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1942 : return;
1943 282978 : if (recursive && b->next)
1944 128416 : destroy_delta(b->next, true);
1945 282978 : if (b->cs.uibid)
1946 86894 : temp_destroy(b->cs.uibid);
1947 282978 : if (b->cs.uvbid)
1948 86894 : temp_destroy(b->cs.uvbid);
1949 282978 : if (b->cs.bid)
1950 282978 : temp_destroy(b->cs.bid);
1951 282978 : if (b->cs.ebid)
1952 61 : temp_destroy(b->cs.ebid);
1953 282978 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1954 282978 : _DELETE(b);
1955 : }
1956 :
1957 : static sql_delta *
1958 15576026 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1959 : {
1960 15576026 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1961 :
1962 15576026 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1963 15443288 : return obat;
1964 132738 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1965 : /* abort */
1966 12 : if (update_conflict)
1967 4 : *update_conflict = true;
1968 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1969 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1970 4 : return NULL;
1971 : }
1972 132726 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1973 : return NULL;
1974 132744 : sql_delta* bat = ZNEW(sql_delta);
1975 132779 : if (!bat)
1976 : return NULL;
1977 132779 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1978 132779 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1979 0 : destroy_delta(bat, false);
1980 0 : return NULL;
1981 : }
1982 132797 : bat->cs.ts = tr->tid;
1983 : /* only one writer else abort */
1984 132797 : bat->next = obat;
1985 132797 : if (obat)
1986 132797 : bat->nr_updates = obat->nr_updates;
1987 132797 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
1988 0 : bat->next = NULL;
1989 0 : destroy_delta(bat, false);
1990 0 : if (update_conflict)
1991 0 : *update_conflict = true;
1992 0 : return NULL;
1993 : }
1994 : return bat;
1995 : }
1996 :
1997 : static int
1998 9664 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1999 : {
2000 9664 : int ok = LOG_OK;
2001 :
2002 9664 : if (is_bat) {
2003 2756 : BAT *tids = incoming_tids;
2004 2756 : BAT *values = incoming_values;
2005 2756 : if (BATcount(tids) == 0)
2006 : return LOG_OK;
2007 2756 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2008 : } else {
2009 6908 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2010 : }
2011 : return ok;
2012 : }
2013 :
2014 : static int
2015 9675 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2016 : {
2017 9675 : int res = LOG_OK;
2018 9675 : bool update_conflict = false;
2019 9675 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2020 :
2021 9675 : if (isbat) {
2022 2764 : BAT *t = tids;
2023 2764 : if (!BATcount(t))
2024 : return LOG_OK;
2025 : }
2026 :
2027 9389 : if (c == NULL)
2028 : return LOG_ERR;
2029 :
2030 9389 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2031 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2032 :
2033 9385 : assert(delta && delta->cs.ts == tr->tid);
2034 9385 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2035 9385 : if (odelta != delta)
2036 3362 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2037 :
2038 9385 : odelta = delta;
2039 9385 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2040 : return res;
2041 9385 : assert(delta == odelta);
2042 9385 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2043 0 : res = sql_trans_alter_storage(tr, c, NULL);
2044 : return res;
2045 : }
2046 :
2047 : static sql_delta *
2048 2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2049 : {
2050 2457 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2051 :
2052 2457 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2053 2429 : return obat;
2054 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2055 : /* abort */
2056 0 : if (update_conflict)
2057 0 : *update_conflict = true;
2058 0 : return NULL;
2059 : }
2060 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2061 : return NULL;
2062 28 : sql_delta* bat = ZNEW(sql_delta);
2063 28 : if (!bat)
2064 : return NULL;
2065 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2066 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2067 0 : destroy_delta(bat, false);
2068 0 : return NULL;
2069 : }
2070 28 : bat->cs.ts = tr->tid;
2071 : /* only one writer else abort */
2072 28 : bat->next = obat;
2073 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2074 0 : bat->next = NULL;
2075 0 : destroy_delta(bat, false);
2076 0 : if (update_conflict)
2077 0 : *update_conflict = true;
2078 0 : return NULL;
2079 : }
2080 : return bat;
2081 : }
2082 :
2083 : static int
2084 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2085 : {
2086 782 : int res = LOG_OK;
2087 782 : bool update_conflict = false;
2088 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2089 :
2090 782 : if (isbat) {
2091 782 : BAT *t = tids;
2092 782 : if (!BATcount(t))
2093 : return LOG_OK;
2094 : }
2095 :
2096 279 : if (i == NULL)
2097 : return LOG_ERR;
2098 :
2099 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2100 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2101 :
2102 279 : assert(delta && delta->cs.ts == tr->tid);
2103 279 : if (odelta != delta)
2104 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2105 :
2106 279 : odelta = delta;
2107 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2108 279 : assert(delta == odelta);
2109 : return res;
2110 : }
2111 :
2112 : static int
2113 149187 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
2114 : {
2115 149187 : BAT *b, *oi = i;
2116 149187 : int err = 0;
2117 149187 : sql_delta *bat = *batp;
2118 :
2119 149187 : assert(!offsets || BATcount(offsets) == BATcount(i));
2120 149187 : if (!BATcount(i))
2121 : return LOG_OK;
2122 149187 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2123 : return LOG_ERR;
2124 :
2125 149187 : lock_column(tr->store, id);
2126 150003 : if (bat->cs.st == ST_DICT) {
2127 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2128 13 : bat = *batp;
2129 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2130 0 : bat_destroy(oi);
2131 13 : oi = ni;
2132 13 : if (!oi) {
2133 0 : unlock_column(tr->store, id);
2134 0 : return LOG_ERR;
2135 : }
2136 : }
2137 150003 : if (bat->cs.st == ST_FOR) {
2138 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2139 0 : bat = *batp;
2140 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2141 0 : bat_destroy(oi);
2142 0 : oi = ni;
2143 0 : if (!oi) {
2144 0 : unlock_column(tr->store, id);
2145 0 : return LOG_ERR;
2146 : }
2147 : }
2148 :
2149 150003 : b = temp_descriptor(bat->cs.bid);
2150 149863 : if (b == NULL) {
2151 0 : unlock_column(tr->store, id);
2152 0 : if (oi != i)
2153 0 : bat_destroy(oi);
2154 0 : return LOG_ERR;
2155 : }
2156 149863 : if (!offsets && offset == b->hseqbase+BATcount(b)) {
2157 149675 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2158 785 : err = 1;
2159 176 : } else if (!offsets) {
2160 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2161 785 : err = 1;
2162 12 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2163 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2164 785 : err = 1;
2165 12 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2166 785 : err = 1;
2167 : }
2168 149966 : bat_destroy(b);
2169 150213 : unlock_column(tr->store, id);
2170 :
2171 150467 : if (oi != i)
2172 13 : bat_destroy(oi);
2173 150467 : return (err)?LOG_ERR:LOG_OK;
2174 : }
2175 :
2176 : // Look at the offsets and find where the replacements end and the appends begin.
2177 : static BUN
2178 0 : start_of_appends(BAT *offsets, BUN bcnt)
2179 : {
2180 0 : BUN ocnt = BATcount(offsets);
2181 0 : if (ocnt == 0)
2182 : return 0;
2183 :
2184 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2185 0 : if (highest < bcnt)
2186 : // all are replacements
2187 : return ocnt;
2188 :
2189 : // reason backward to find the first append.
2190 : // Suppose offsets has 15 entries, bcnt == 100
2191 : // and the highest offset in offsets is 109.
2192 0 : BUN new_bcnt = highest + 1; // 110
2193 0 : BUN nappends = new_bcnt - bcnt; // 10
2194 0 : BUN nreplacements = ocnt - nappends; // 5
2195 :
2196 : // The first append should be to position bcnt
2197 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2198 :
2199 : return nreplacements;
2200 : }
2201 :
2202 :
2203 : static int
2204 15288589 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
2205 : {
2206 15288589 : void *oi = i;
2207 15288589 : BAT *b;
2208 15288589 : lock_column(tr->store, id);
2209 15291548 : sql_delta *bat = *batp;
2210 :
2211 15291548 : if (bat->cs.st == ST_DICT) {
2212 : /* possibly a new array is returned */
2213 4 : i = dict_append_val(tr, batp, i, cnt);
2214 4 : bat = *batp;
2215 4 : if (!i) {
2216 0 : unlock_column(tr->store, id);
2217 0 : return LOG_ERR;
2218 : }
2219 : }
2220 15291548 : if (bat->cs.st == ST_FOR) {
2221 : /* possibly a new array is returned */
2222 1 : i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
2223 1 : bat = *batp;
2224 1 : if (!i) {
2225 0 : unlock_column(tr->store, id);
2226 0 : return LOG_ERR;
2227 : }
2228 : }
2229 :
2230 15291548 : b = temp_descriptor(bat->cs.bid);
2231 15289813 : if (b == NULL) {
2232 0 : if (i != oi)
2233 0 : GDKfree(i);
2234 0 : unlock_column(tr->store, id);
2235 0 : return LOG_ERR;
2236 : }
2237 15289813 : BUN bcnt = BATcount(b);
2238 :
2239 15289813 : if (offsets) {
2240 : // The first few might be replacements while later items might be appends.
2241 : // Handle the replacements here while leaving the appends to the code below.
2242 0 : BUN nreplacements = start_of_appends(offsets, bcnt);
2243 :
2244 0 : oid *start = Tloc(offsets, 0);
2245 0 : if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
2246 0 : bat_destroy(b);
2247 0 : if (i != oi)
2248 0 : GDKfree(i);
2249 0 : unlock_column(tr->store, id);
2250 0 : return LOG_ERR;
2251 : }
2252 :
2253 : // Replacements have been handled. The rest are appends.
2254 0 : assert(offset == oid_nil);
2255 0 : offset = bcnt;
2256 0 : cnt -= nreplacements;
2257 : }
2258 :
2259 15289813 : if (bcnt > offset){
2260 329290 : size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
2261 329290 : if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
2262 0 : bat_destroy(b);
2263 0 : if (i != oi)
2264 0 : GDKfree(i);
2265 0 : unlock_column(tr->store, id);
2266 0 : return LOG_ERR;
2267 : }
2268 329439 : cnt -= ccnt;
2269 329439 : offset += ccnt;
2270 : }
2271 15289962 : if (cnt) {
2272 14960505 : if (BATcount(b) < offset) { /* add space */
2273 8308 : BUN d = offset - BATcount(b);
2274 8308 : if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
2275 0 : bat_destroy(b);
2276 0 : if (i != oi)
2277 0 : GDKfree(i);
2278 0 : unlock_column(tr->store, id);
2279 0 : return LOG_ERR;
2280 : }
2281 : }
2282 14960503 : if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
2283 0 : bat_destroy(b);
2284 0 : if (i != oi)
2285 0 : GDKfree(i);
2286 0 : unlock_column(tr->store, id);
2287 0 : return LOG_ERR;
2288 : }
2289 : }
2290 15290825 : bat_destroy(b);
2291 15290355 : if (i != oi)
2292 4 : GDKfree(i);
2293 15290355 : unlock_column(tr->store, id);
2294 15290355 : return LOG_OK;
2295 : }
2296 :
2297 : static int
2298 25970 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2299 : {
2300 25970 : if (!(bat->segs = new_segments(tr, 0)))
2301 : return LOG_ERR;
2302 25967 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2303 : }
2304 :
2305 : static int
2306 15437837 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2307 : {
2308 15437837 : int ok = LOG_OK;
2309 :
2310 15437837 : if ((*delta)->cs.merged)
2311 36526 : (*delta)->cs.merged = false; /* TODO needs to move */
2312 15437837 : if (isbat) {
2313 149127 : BAT *bat = incoming_data;
2314 :
2315 149127 : if (BATcount(bat))
2316 149255 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2317 : } else {
2318 15288710 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2319 : }
2320 15442435 : return ok;
2321 : }
2322 :
2323 : static int
2324 15437593 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2325 : {
2326 15437593 : int res = LOG_OK;
2327 15437593 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2328 :
2329 15437593 : if (isbat) {
2330 149272 : BAT *t = data;
2331 149272 : if (!BATcount(t))
2332 : return LOG_OK;
2333 : }
2334 :
2335 15436816 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2336 : return LOG_ERR;
2337 :
2338 15437466 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2339 :
2340 15437466 : odelta = delta;
2341 15437466 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
2342 : return res;
2343 15440418 : if (odelta != delta) {
2344 0 : delta->next = odelta;
2345 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2346 0 : delta->next = NULL;
2347 0 : destroy_delta(delta, false);
2348 0 : return LOG_CONFLICT;
2349 : }
2350 : }
2351 15440418 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2352 1 : res = sql_trans_alter_storage(tr, c, NULL);
2353 : return res;
2354 : }
2355 :
2356 : static int
2357 2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2358 : {
2359 2184 : int res = LOG_OK;
2360 2184 : sql_delta *delta;
2361 :
2362 2184 : if (isbat) {
2363 1010 : BAT *t = data;
2364 1010 : if (!BATcount(t))
2365 : return LOG_OK;
2366 : }
2367 :
2368 2172 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2369 : return LOG_ERR;
2370 :
2371 2172 : assert(delta->cs.st == ST_DEFAULT);
2372 :
2373 2172 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2374 2172 : return res;
2375 : }
2376 :
2377 : static int
2378 66259 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2379 : {
2380 66259 : int err = 0;
2381 :
2382 : /* TODO check for conflicting updates */
2383 66259 : (void)rid;
2384 66259 : (void)cnt;
2385 516677 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2386 450418 : sql_column *c = n->data;
2387 450418 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2388 :
2389 : /* check for active updates */
2390 450418 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2391 : return 1;
2392 : }
2393 : return 0;
2394 : }
2395 :
2396 : static int
2397 64311 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2398 : {
2399 64311 : int in_transaction = segments_in_transaction(tr, t);
2400 :
2401 64311 : lock_table(tr->store, t->base.id);
2402 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2403 64311 : segment *seg = s->segs->h, *p = NULL;
2404 51205442 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2405 51205442 : if (seg->start <= rid && seg->end > rid) {
2406 64311 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2407 4 : unlock_table(tr->store, t->base.id);
2408 4 : return LOG_CONFLICT;
2409 : }
2410 64307 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2411 0 : unlock_table(tr->store, t->base.id);
2412 0 : return LOG_CONFLICT;
2413 : }
2414 64307 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2415 0 : unlock_table(tr->store, t->base.id);
2416 0 : return LOG_ERR;
2417 : }
2418 : break;
2419 : }
2420 : }
2421 64307 : unlock_table(tr->store, t->base.id);
2422 64307 : if (!in_transaction)
2423 12783 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2424 : return LOG_OK;
2425 : }
2426 :
2427 : static int
2428 1950 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2429 : {
2430 1950 : segment *seg = *Seg, *p = NULL;
2431 6143 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2432 6141 : if (seg->start <= start && seg->end > start) {
2433 2000 : size_t lcnt = cnt;
2434 2000 : if (start+lcnt > seg->end)
2435 54 : lcnt = seg->end-start;
2436 2000 : if (SEG_IS_DELETED(seg, tr)) {
2437 47 : start += lcnt;
2438 47 : cnt -= lcnt;
2439 47 : continue;
2440 1953 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2441 1 : return LOG_CONFLICT;
2442 1952 : if (deletes_conflict_updates( tr, t, start, lcnt))
2443 : return LOG_CONFLICT;
2444 1952 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2445 1952 : if (!seg)
2446 : return LOG_ERR;
2447 1952 : start += lcnt;
2448 1952 : cnt -= lcnt;
2449 : }
2450 6093 : if (start+cnt <= seg->end)
2451 : break;
2452 : }
2453 : return LOG_OK;
2454 : }
2455 :
2456 : static int
2457 486 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2458 : {
2459 486 : segment *seg = s->segs->h;
2460 486 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2461 : }
2462 :
2463 : static int
2464 302 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2465 : {
2466 302 : int in_transaction = segments_in_transaction(tr, t);
2467 302 : BAT *oi = i; /* update ids */
2468 302 : int ok = LOG_OK;
2469 :
2470 302 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2471 : return LOG_ERR;
2472 302 : if (BATcount(i)) {
2473 539 : if (BATtdense(i)) {
2474 237 : size_t start = i->tseqbase;
2475 237 : size_t cnt = BATcount(i);
2476 :
2477 237 : lock_table(tr->store, t->base.id);
2478 237 : ok = delete_range(tr, t, s, start, cnt);
2479 237 : unlock_table(tr->store, t->base.id);
2480 65 : } else if (complex_cand(i)) {
2481 0 : struct canditer ci;
2482 0 : oid f = 0, l = 0, cur = 0;
2483 :
2484 0 : canditer_init(&ci, NULL, i);
2485 0 : cur = f = canditer_next(&ci);
2486 :
2487 0 : lock_table(tr->store, t->base.id);
2488 0 : if (!is_oid_nil(f)) {
2489 0 : segment *seg = s->segs->h;
2490 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2491 0 : if (cur+1 == l) {
2492 0 : cur++;
2493 0 : continue;
2494 : }
2495 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2496 0 : f = cur = l;
2497 : }
2498 0 : if (ok == LOG_OK)
2499 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2500 : }
2501 0 : unlock_table(tr->store, t->base.id);
2502 : } else {
2503 65 : if (!i->tsorted) {
2504 0 : assert(oi == i);
2505 0 : BAT *ni = NULL;
2506 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2507 0 : ok = LOG_ERR;
2508 0 : if (ni)
2509 0 : i = ni;
2510 : }
2511 65 : assert(i->tsorted);
2512 65 : BUN icnt = BATcount(i);
2513 65 : BATiter ii = bat_iterator(i);
2514 65 : oid *o = ii.base, n = o[0]+1;
2515 65 : size_t lcnt = 1;
2516 :
2517 65 : lock_table(tr->store, t->base.id);
2518 65 : segment *seg = s->segs->h;
2519 22227 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2520 22162 : if (o[i] == n) {
2521 21807 : lcnt++;
2522 21807 : n++;
2523 : } else {
2524 355 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2525 355 : lcnt = 0;
2526 : }
2527 22162 : if (!lcnt) {
2528 355 : n = o[i]+1;
2529 355 : lcnt = 1;
2530 : }
2531 : }
2532 65 : bat_iterator_end(&ii);
2533 65 : if (lcnt && ok == LOG_OK)
2534 65 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2535 65 : unlock_table(tr->store, t->base.id);
2536 : }
2537 : }
2538 302 : if (i != oi)
2539 25 : bat_destroy(i);
2540 : // assert
2541 302 : if (!in_transaction)
2542 270 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2543 : return ok;
2544 : }
2545 :
2546 : static void
2547 49565 : destroy_segments(segments *s)
2548 : {
2549 49565 : if (!s || sql_ref_dec(&s->r) > 0)
2550 0 : return;
2551 49565 : segment *seg = s->h;
2552 100326 : while(seg) {
2553 50761 : segment *n = ATOMIC_PTR_GET(&seg->next);
2554 50761 : ATOMIC_PTR_DESTROY(&seg->next);
2555 50761 : _DELETE(seg);
2556 50761 : seg = n;
2557 : }
2558 49565 : _DELETE(s);
2559 : }
2560 :
2561 : static void
2562 50962 : destroy_storage(storage *bat)
2563 : {
2564 50962 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2565 : return;
2566 49423 : if (bat->next)
2567 6587 : destroy_storage(bat->next);
2568 49423 : destroy_segments(bat->segs);
2569 49423 : if (bat->cs.uibid)
2570 28863 : temp_destroy(bat->cs.uibid);
2571 49423 : if (bat->cs.uvbid)
2572 28863 : temp_destroy(bat->cs.uvbid);
2573 49423 : if (bat->cs.bid)
2574 49423 : temp_destroy(bat->cs.bid);
2575 49423 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2576 49423 : _DELETE(bat);
2577 : }
2578 :
2579 : static int
2580 171151 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2581 : {
2582 171151 : if (uncommitted) {
2583 419396 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2584 268765 : if (!VALID_4_READ(s->ts,tr))
2585 : return 1;
2586 : } else {
2587 55221 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2588 36121 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2589 : return 1;
2590 : }
2591 :
2592 : return 0;
2593 : }
2594 :
2595 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2596 :
2597 : storage *
2598 2133702 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2599 : {
2600 2133702 : storage *obat;
2601 :
2602 2133702 : obat = ATOMIC_PTR_GET(&t->data);
2603 :
2604 2133702 : if (obat->cs.ts != tr->tid)
2605 1457954 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2606 1457899 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2607 : /* abort */
2608 15392 : if (clear)
2609 15392 : *clear = true;
2610 15392 : return NULL;
2611 : }
2612 :
2613 2118310 : if (!clear)
2614 : return obat;
2615 :
2616 : /* remainder is only to handle clear */
2617 26384 : if (segments_conflict(tr, obat->segs, 1)) {
2618 417 : *clear = true;
2619 417 : return NULL;
2620 : }
2621 25966 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2622 : return NULL;
2623 25967 : storage *bat = ZNEW(storage);
2624 25972 : if (!bat)
2625 : return NULL;
2626 25972 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2627 25972 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2628 0 : destroy_storage(bat);
2629 0 : return NULL;
2630 : }
2631 25972 : bat->cs.cleared = true;
2632 25972 : bat->cs.ts = tr->tid;
2633 : /* only one writer else abort */
2634 25972 : bat->next = obat;
2635 25972 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2636 9 : bat->next = NULL;
2637 9 : destroy_storage(bat);
2638 9 : if (clear)
2639 9 : *clear = true;
2640 9 : return NULL;
2641 : }
2642 : return bat;
2643 : }
2644 :
2645 : static int
2646 64662 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2647 : {
2648 64662 : int ok = LOG_OK;
2649 64662 : BAT *b = ib;
2650 64662 : storage *bat;
2651 :
2652 64662 : if (isbat && !BATcount(b))
2653 : return ok;
2654 :
2655 64613 : if (t == NULL)
2656 : return LOG_ERR;
2657 :
2658 64613 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2659 : return LOG_ERR;
2660 :
2661 64613 : if (isbat)
2662 302 : ok = storage_delete_bat(tr, t, bat, ib);
2663 : else
2664 64311 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2665 : return ok;
2666 : }
2667 :
2668 : static size_t
2669 0 : dcount_col(sql_trans *tr, sql_column *c)
2670 : {
2671 0 : sql_delta *b;
2672 :
2673 0 : if (!isTable(c->t))
2674 : return 0;
2675 0 : b = col_timestamp_delta(tr, c);
2676 0 : if (!b)
2677 : return 1;
2678 :
2679 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2680 0 : if (!s || !s->segs->t)
2681 : return 1;
2682 0 : size_t cnt = s->segs->t->end;
2683 0 : if (cnt) {
2684 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2685 0 : size_t dcnt = 0;
2686 :
2687 0 : if (v)
2688 0 : dcnt = BATguess_uniques(v, NULL);
2689 0 : return dcnt;
2690 : }
2691 : return cnt;
2692 : }
2693 :
2694 : static BAT *
2695 3255267 : bind_no_view(BAT *b, bool quick)
2696 : {
2697 3255267 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2698 3251865 : BAT *nb = BBP_desc(VIEWtparent(b));
2699 3251865 : bat_destroy(b);
2700 3251869 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2701 : return NULL;
2702 : }
2703 : return b;
2704 : }
2705 :
2706 : static int
2707 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2708 : {
2709 0 : int ok = 0;
2710 0 : assert(tr->active);
2711 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2712 0 : return 0;
2713 0 : lock_column(tr->store, c->base.id);
2714 0 : if (unique_est) {
2715 0 : sql_delta *d;
2716 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2717 0 : BAT *b;
2718 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2719 0 : MT_lock_set(&b->theaplock);
2720 0 : b->tunique_est = *unique_est;
2721 0 : MT_lock_unset(&b->theaplock);
2722 0 : bat_destroy(b);
2723 : }
2724 : }
2725 : }
2726 0 : if (min) {
2727 0 : _DELETE(c->min);
2728 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2729 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2730 0 : memcpy(c->min, min, minlen);
2731 0 : ok = 1;
2732 : }
2733 : }
2734 0 : if (max) {
2735 0 : _DELETE(c->max);
2736 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2737 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2738 0 : memcpy(c->max, max, maxlen);
2739 0 : ok = 1;
2740 : }
2741 : }
2742 0 : unlock_column(tr->store, c->base.id);
2743 0 : return ok;
2744 : }
2745 :
2746 : static int
2747 19 : min_max_col(sql_trans *tr, sql_column *c)
2748 : {
2749 19 : int ok = 0;
2750 19 : BAT *b = NULL;
2751 19 : sql_delta *d = NULL;
2752 :
2753 19 : assert(tr->active);
2754 19 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2755 0 : return 0;
2756 19 : if (c->min && c->max)
2757 : return 1;
2758 19 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2759 19 : if (d->cs.st == ST_FOR)
2760 : return 0;
2761 19 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2762 19 : lock_column(tr->store, c->base.id);
2763 19 : if (c->min && c->max) {
2764 0 : unlock_column(tr->store, c->base.id);
2765 0 : return 1;
2766 : }
2767 19 : _DELETE(c->min);
2768 19 : _DELETE(c->max);
2769 19 : if ((b = bind_col(tr, c, access))) {
2770 19 : if (!(b = bind_no_view(b, false))) {
2771 0 : unlock_column(tr->store, c->base.id);
2772 0 : return 0;
2773 : }
2774 19 : BATiter bi = bat_iterator(b);
2775 19 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2776 16 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2777 16 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2778 :
2779 16 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2780 0 : _DELETE(c->min);
2781 0 : _DELETE(c->max);
2782 : } else {
2783 16 : memcpy(c->min, nmin, minlen);
2784 16 : memcpy(c->max, nmax, maxlen);
2785 16 : ok = 1;
2786 : }
2787 : }
2788 19 : bat_iterator_end(&bi);
2789 19 : bat_destroy(b);
2790 : }
2791 19 : unlock_column(tr->store, c->base.id);
2792 : }
2793 : return ok;
2794 : }
2795 :
2796 : static size_t
2797 17 : count_segs(segment *s)
2798 : {
2799 17 : size_t nr = 0;
2800 :
2801 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2802 55 : nr++;
2803 17 : return nr;
2804 : }
2805 :
2806 : static size_t
2807 34 : count_del(sql_trans *tr, sql_table *t, int access)
2808 : {
2809 34 : storage *d;
2810 :
2811 34 : if (!isTable(t))
2812 : return 0;
2813 34 : d = tab_timestamp_storage(tr, t);
2814 34 : if (!d)
2815 : return 0;
2816 34 : if (access == 2)
2817 0 : return d->cs.ucnt;
2818 34 : if (access == 1)
2819 0 : return count_inserts(d->segs->h, tr);
2820 34 : if (access == 10) /* special case for counting the number of segments */
2821 17 : return count_segs(d->segs->h);
2822 17 : return count_deletes(d->segs->h, tr);
2823 : }
2824 :
2825 : static int
2826 19380 : sorted_col(sql_trans *tr, sql_column *col)
2827 : {
2828 19380 : int sorted = 0;
2829 :
2830 19380 : assert(tr->active);
2831 19380 : if (!isTable(col->t) || !col->t->s)
2832 : return 0;
2833 :
2834 19380 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2835 19361 : BAT *b = bind_col(tr, col, QUICK);
2836 :
2837 19361 : if (b)
2838 19361 : sorted = b->tsorted || b->trevsorted;
2839 : }
2840 : return sorted;
2841 : }
2842 :
2843 : static int
2844 7468 : unique_col(sql_trans *tr, sql_column *col)
2845 : {
2846 7468 : int distinct = 0;
2847 :
2848 7468 : assert(tr->active);
2849 7468 : if (!isTable(col->t) || !col->t->s)
2850 : return 0;
2851 :
2852 7468 : if (col && ATOMIC_PTR_GET(&col->data)) {
2853 7468 : BAT *b = bind_col(tr, col, QUICK);
2854 :
2855 7468 : if (b)
2856 7468 : distinct = b->tkey;
2857 : }
2858 : return distinct;
2859 : }
2860 :
2861 : static int
2862 1849 : double_elim_col(sql_trans *tr, sql_column *col)
2863 : {
2864 1849 : int de = 0;
2865 1849 : sql_delta *d;
2866 :
2867 1849 : assert(tr->active);
2868 1849 : if (!isTable(col->t) || !col->t->s)
2869 : return 0;
2870 :
2871 1849 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2872 6 : if (d->cs.st == ST_DICT) {
2873 6 : BAT *b = bind_col(tr, col, QUICK);
2874 6 : if (b && b->ttype == TYPE_bte)
2875 : de = 1;
2876 0 : else if (b && b->ttype == TYPE_sht)
2877 1849 : de = 2;
2878 : }
2879 1843 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2880 1843 : BAT *b = bind_col(tr, col, QUICK);
2881 :
2882 1843 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2883 1843 : de = GDK_ELIMDOUBLES(b->tvheap);
2884 1843 : if (de)
2885 1621 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2886 : }
2887 1621 : assert(de >= 0 && de <= 16);
2888 : }
2889 : return de;
2890 : }
2891 :
2892 : static int
2893 3290926 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
2894 : {
2895 3290926 : int ok = 0;
2896 3290926 : BAT *b = NULL, *off = NULL, *upv = NULL;
2897 3290926 : sql_delta *d = NULL;
2898 :
2899 3290926 : (void) tr;
2900 3290926 : assert(tr->active);
2901 3290926 : *nonil = false;
2902 3290926 : *unique = false;
2903 3290926 : *unique_est = 0.0;
2904 3290926 : if (!c || !isTable(c->t) || !c->t->s)
2905 : return ok;
2906 :
2907 3290688 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2908 3252263 : if (d->cs.st == ST_FOR) {
2909 27 : *nonil = true; /* TODO for min/max. I will do it later */
2910 27 : return ok;
2911 : }
2912 3252236 : int eclass = c->type.type->eclass;
2913 3252236 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2914 3252236 : if ((b = bind_col(tr, c, access))) {
2915 3252660 : if (!(b = bind_no_view(b, false)))
2916 0 : return ok;
2917 3253111 : BATiter bi = bat_iterator(b);
2918 3252838 : *nonil = bi.nonil && !bi.nil;
2919 :
2920 3252838 : if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
2921 3024495 : d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
2922 2053842 : if (c->min && VALinit(min, bi.type, c->min))
2923 : ok |= 1;
2924 2053788 : else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
2925 2045837 : ok |= 1;
2926 2053815 : if (c->max && VALinit(max, bi.type, c->max))
2927 54 : ok |= 2;
2928 2053772 : else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
2929 2038367 : ok |= 2;
2930 : }
2931 3252772 : if (d->cs.ucnt == 0) {
2932 3250208 : if (d->cs.st == ST_DEFAULT) {
2933 3249508 : *unique = bi.key;
2934 3249508 : *unique_est = bi.unique_est;
2935 3249508 : if (*unique_est == 0)
2936 1172283 : *unique_est = (double)BATguess_uniques(b,NULL);
2937 700 : } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
2938 : /* for dict, check the offsets bat for uniqueness */
2939 700 : MT_lock_set(&off->theaplock);
2940 700 : *unique = off->tkey;
2941 700 : *unique_est = off->tunique_est;
2942 700 : MT_lock_unset(&off->theaplock);
2943 : }
2944 : }
2945 3253454 : bat_iterator_end(&bi);
2946 3252667 : bat_destroy(b);
2947 3252808 : if (*nonil && d->cs.ucnt > 0) {
2948 : /* This could use a quick descriptor */
2949 2078 : if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
2950 0 : *nonil = false;
2951 : } else {
2952 2078 : MT_lock_set(&upv->theaplock);
2953 2078 : *nonil &= upv->tnonil && !upv->tnil;
2954 2078 : MT_lock_unset(&upv->theaplock);
2955 2078 : bat_destroy(upv);
2956 : }
2957 : }
2958 : }
2959 : }
2960 : return ok;
2961 : }
2962 :
2963 : static int
2964 233 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2965 : {
2966 233 : assert(tr->active);
2967 233 : if (!isTable(col->t) || !col->t->s)
2968 : return LOG_OK;
2969 :
2970 228 : if (col && ATOMIC_PTR_GET(&col->data)) {
2971 228 : BAT *b = bind_col(tr, col, QUICK);
2972 :
2973 228 : if (b) { /* add props for ranges [min, max> */
2974 228 : MT_lock_set(&b->theaplock);
2975 228 : if (add_range) {
2976 155 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2977 155 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2978 95 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2979 : else
2980 60 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2981 155 : if (!pt->with_nills || !col->null)
2982 105 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2983 : } else {
2984 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2985 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2986 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2987 : }
2988 228 : MT_lock_unset(&b->theaplock);
2989 : }
2990 : }
2991 : return LOG_OK;
2992 : }
2993 :
2994 : static int
2995 3492 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
2996 : {
2997 3492 : assert(tr->active);
2998 3492 : if (!isTable(col->t) || !col->t->s)
2999 : return LOG_OK;
3000 :
3001 3474 : if (col && ATOMIC_PTR_GET(&col->data)) {
3002 3474 : BAT *b = bind_col(tr, col, QUICK);
3003 :
3004 3474 : if (b) { /* add props for ranges [min, max> */
3005 3474 : if (not_null) {
3006 3472 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3007 : } else {
3008 2 : BATrmprop(b, GDK_NOT_NULL);
3009 : }
3010 : }
3011 : }
3012 : return LOG_OK;
3013 : }
3014 :
3015 : static int
3016 22819 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3017 : {
3018 22819 : sqlstore *store = tr->store;
3019 22819 : int bid = log_find_bat(store->logger, id);
3020 22819 : if (bid <= 0)
3021 : return LOG_ERR;
3022 22819 : cs->bid = temp_dup(bid);
3023 22819 : cs->ucnt = 0;
3024 22819 : cs->uibid = e_bat(TYPE_oid);
3025 22819 : cs->uvbid = e_bat(type);
3026 22819 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3027 : return LOG_ERR;
3028 : return LOG_OK;
3029 : }
3030 :
3031 : static int
3032 68467 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3033 : {
3034 68467 : int res = LOG_OK;
3035 68467 : gdk_return ok;
3036 68467 : BAT *b = temp_descriptor(bat->cs.bid);
3037 :
3038 68467 : if (b == NULL)
3039 : return LOG_ERR;
3040 :
3041 68467 : if (!bat->cs.uibid)
3042 68459 : bat->cs.uibid = e_bat(TYPE_oid);
3043 68467 : if (!bat->cs.uvbid)
3044 68459 : bat->cs.uvbid = e_bat(b->ttype);
3045 68467 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3046 0 : res = LOG_ERR;
3047 68467 : if (GDKinmemory(0)) {
3048 178 : bat_destroy(b);
3049 178 : return res;
3050 : }
3051 :
3052 68289 : bat_set_access(b, BAT_READ);
3053 68289 : sqlstore *store = tr->store;
3054 68289 : ok = log_bat_persists(store->logger, b, id);
3055 68289 : bat_destroy(b);
3056 68289 : if(res != LOG_OK)
3057 : return res;
3058 68289 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3059 : }
3060 :
3061 : static int
3062 0 : new_persistent_delta( sql_delta *bat)
3063 : {
3064 0 : bat->cs.ucnt = 0;
3065 0 : return LOG_OK;
3066 : }
3067 :
3068 : static void
3069 128617 : create_delta( sql_delta *d, BAT *b)
3070 : {
3071 128617 : bat_set_access(b, BAT_READ);
3072 128617 : d->cs.bid = temp_create(b);
3073 128617 : d->cs.uibid = d->cs.uvbid = 0;
3074 128617 : d->cs.ucnt = 0;
3075 128617 : }
3076 :
3077 : static bat
3078 7277 : copyBat (bat i, int type, oid seq)
3079 : {
3080 7277 : BAT *b, *tb;
3081 7277 : bat res;
3082 :
3083 7277 : if (!i)
3084 : return i;
3085 7277 : tb = quick_descriptor(i);
3086 7277 : if (tb == NULL)
3087 : return 0;
3088 7277 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3089 7277 : if (b == NULL)
3090 : return 0;
3091 :
3092 7277 : bat_set_access(b, BAT_READ);
3093 :
3094 7277 : res = temp_create(b);
3095 7277 : bat_destroy(b);
3096 7277 : return res;
3097 : }
3098 :
3099 : static int
3100 146106 : create_col(sql_trans *tr, sql_column *c)
3101 : {
3102 146106 : int ok = LOG_OK, new = 0;
3103 146106 : int type = c->type.type->localtype;
3104 146106 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3105 :
3106 146106 : if (!bat) {
3107 146106 : new = 1;
3108 146106 : bat = ZNEW(sql_delta);
3109 146106 : if (!bat)
3110 : return LOG_ERR;
3111 146106 : ATOMIC_PTR_SET(&c->data, bat);
3112 146106 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3113 : }
3114 :
3115 146106 : if (new)
3116 146106 : bat->cs.ts = tr->tid;
3117 :
3118 146106 : if (!isNew(c)&& !isTempTable(c->t)){
3119 17467 : bat->cs.ts = tr->ts;
3120 17467 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3121 17467 : if (ok == LOG_OK && c->storage_type) {
3122 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3123 2 : sqlstore *store = tr->store;
3124 2 : int bid = log_find_bat(store->logger, -c->base.id);
3125 2 : if (bid <= 0)
3126 : return LOG_ERR;
3127 2 : bat->cs.ebid = temp_dup(bid);
3128 2 : bat->cs.st = ST_DICT;
3129 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3130 2 : bat->cs.st = ST_FOR;
3131 : }
3132 : }
3133 17467 : return ok;
3134 128639 : } else if (bat && bat->cs.bid) {
3135 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3136 : } else {
3137 128639 : sql_column *fc = NULL;
3138 128639 : size_t cnt = 0;
3139 :
3140 : /* alter ? */
3141 128639 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3142 80049 : storage *s = tab_timestamp_storage(tr, fc->t);
3143 80049 : if (s == NULL)
3144 : return LOG_ERR;
3145 80049 : cnt = segs_end(s->segs, tr, c->t);
3146 : }
3147 128639 : if (cnt && fc != c) {
3148 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3149 :
3150 22 : if (d->cs.bid) {
3151 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3152 22 : if(bat->cs.bid == BID_NIL)
3153 22 : ok = LOG_ERR;
3154 : }
3155 22 : if (d->cs.uibid) {
3156 10 : bat->cs.uibid = e_bat(TYPE_oid);
3157 10 : if (bat->cs.uibid == BID_NIL)
3158 22 : ok = LOG_ERR;
3159 : }
3160 22 : if (d->cs.uvbid) {
3161 10 : bat->cs.uvbid = e_bat(type);
3162 10 : if(bat->cs.uvbid == BID_NIL)
3163 0 : ok = LOG_ERR;
3164 : }
3165 : } else {
3166 128617 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3167 128617 : if (!b) {
3168 : ok = LOG_ERR;
3169 : } else {
3170 128617 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3171 128617 : bat_destroy(b);
3172 : }
3173 :
3174 128617 : if (!new) {
3175 0 : bat->cs.uibid = e_bat(TYPE_oid);
3176 0 : if (bat->cs.uibid == BID_NIL)
3177 0 : ok = LOG_ERR;
3178 0 : bat->cs.uvbid = e_bat(type);
3179 0 : if(bat->cs.uvbid == BID_NIL)
3180 0 : ok = LOG_ERR;
3181 : }
3182 : }
3183 128639 : bat->cs.ucnt = 0;
3184 :
3185 128639 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3186 91 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3187 : }
3188 : return ok;
3189 : }
3190 :
3191 : static int
3192 62115 : log_create_col_(sql_trans *tr, sql_column *c)
3193 : {
3194 62115 : assert(!isTempTable(c->t));
3195 62115 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3196 : }
3197 :
3198 : static int
3199 87 : log_create_col(sql_trans *tr, sql_change *change)
3200 : {
3201 87 : return log_create_col_(tr, (sql_column*)change->obj);
3202 : }
3203 :
3204 : static int
3205 116279 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3206 : {
3207 116279 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3208 116279 : (void)oldest;
3209 116279 : assert(delta->cs.ts == tr->tid);
3210 116279 : delta->cs.ts = commit_ts;
3211 :
3212 116279 : assert(delta->next == NULL);
3213 116279 : if (!delta->cs.merged)
3214 116278 : delta->nr_updates += merge_delta(delta);
3215 116279 : if (!tr->parent)
3216 116275 : base->new = 0;
3217 116279 : return LOG_OK;
3218 : }
3219 :
3220 : static int
3221 91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3222 : {
3223 91 : sql_column *c = (sql_column*)change->obj;
3224 91 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3225 91 : if (!tr->parent)
3226 90 : c->base.new = 0;
3227 91 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3228 : }
3229 :
3230 : /* will be called for new idx's and when new index columns are created */
3231 : static int
3232 8944 : create_idx(sql_trans *tr, sql_idx *ni)
3233 : {
3234 8944 : int ok = LOG_OK, new = 0;
3235 8944 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3236 8944 : int type = TYPE_lng;
3237 :
3238 8944 : if (oid_index(ni->type))
3239 938 : type = TYPE_oid;
3240 :
3241 8944 : if (!bat) {
3242 8944 : new = 1;
3243 8944 : bat = ZNEW(sql_delta);
3244 8944 : if (!bat)
3245 : return LOG_ERR;
3246 8944 : ATOMIC_PTR_INIT(&ni->data, bat);
3247 8944 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3248 : }
3249 :
3250 8944 : if (new)
3251 8944 : bat->cs.ts = tr->tid;
3252 :
3253 8944 : if (!isNew(ni) && !isTempTable(ni->t)){
3254 1689 : bat->cs.ts = 1;
3255 1689 : return load_cs(tr, &bat->cs, type, ni->base.id);
3256 7255 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3257 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3258 : } else {
3259 7255 : sql_column *c = ol_first_node(ni->t->columns)->data;
3260 7255 : sql_delta *d = col_timestamp_delta(tr, c);
3261 :
3262 7255 : if (d) {
3263 : /* Here we also handle indices created through alter stmts */
3264 : /* These need to be created aligned to the existing data */
3265 7255 : if (d->cs.bid) {
3266 7255 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3267 7255 : if(bat->cs.bid == BID_NIL)
3268 7255 : ok = LOG_ERR;
3269 : }
3270 : } else {
3271 : return LOG_ERR;
3272 : }
3273 :
3274 7255 : bat->cs.ucnt = 0;
3275 :
3276 7255 : if (!new) {
3277 0 : bat->cs.uibid = e_bat(TYPE_oid);
3278 0 : if (bat->cs.uibid == BID_NIL)
3279 0 : ok = LOG_ERR;
3280 0 : bat->cs.uvbid = e_bat(type);
3281 0 : if(bat->cs.uvbid == BID_NIL)
3282 0 : ok = LOG_ERR;
3283 : }
3284 7255 : bat->cs.ucnt = 0;
3285 7255 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3286 631 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3287 : }
3288 : return ok;
3289 : }
3290 :
3291 : static int
3292 6352 : log_create_idx_(sql_trans *tr, sql_idx *i)
3293 : {
3294 6352 : assert(!isTempTable(i->t));
3295 6352 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3296 : }
3297 :
3298 : static int
3299 617 : log_create_idx(sql_trans *tr, sql_change *change)
3300 : {
3301 617 : return log_create_idx_(tr, (sql_idx*)change->obj);
3302 : }
3303 :
3304 : static int
3305 631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3306 : {
3307 631 : sql_idx *i = (sql_idx*)change->obj;
3308 631 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3309 631 : if (!tr->parent)
3310 631 : i->base.new = 0;
3311 631 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3312 : return LOG_OK;
3313 : }
3314 :
3315 : static int
3316 3663 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3317 : {
3318 3663 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3319 3663 : BAT *b = NULL, *ib = NULL;
3320 :
3321 3663 : if (ok != LOG_OK)
3322 : return ok;
3323 3663 : if (!(b = temp_descriptor(s->cs.bid)))
3324 : return LOG_ERR;
3325 3663 : ib = b;
3326 :
3327 3663 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3328 0 : bat_destroy(ib);
3329 0 : return LOG_ERR;
3330 : }
3331 :
3332 3663 : if (BATcount(b)) {
3333 198 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3334 0 : bat_destroy(ib);
3335 0 : return LOG_ERR;
3336 : }
3337 327 : if (BATtdense(b)) {
3338 129 : size_t start = b->tseqbase;
3339 129 : size_t cnt = BATcount(b);
3340 129 : ok = delete_range(tr, t, s, start, cnt);
3341 : } else {
3342 69 : assert(b->tsorted);
3343 69 : BUN icnt = BATcount(b);
3344 69 : BATiter bi = bat_iterator(b);
3345 69 : size_t lcnt = 1;
3346 69 : oid n;
3347 69 : segment *seg = s->segs->h;
3348 69 : if (complex_cand(b)) {
3349 0 : oid o = * (oid *) Tpos(&bi, 0);
3350 0 : n = o + 1;
3351 0 : for (BUN i = 1; i < icnt; i++) {
3352 0 : o = * (oid *) Tpos(&bi, i);
3353 0 : if (o == n) {
3354 0 : lcnt++;
3355 0 : n++;
3356 : } else {
3357 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3358 : break;
3359 : lcnt = 0;
3360 : }
3361 0 : if (!lcnt) {
3362 0 : n = o + 1;
3363 0 : lcnt = 1;
3364 : }
3365 : }
3366 : } else {
3367 69 : oid *o = bi.base;
3368 69 : n = o[0]+1;
3369 133331 : for (size_t i=1; i<icnt; i++) {
3370 133262 : if (o[i] == n) {
3371 132218 : lcnt++;
3372 132218 : n++;
3373 : } else {
3374 1044 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3375 : break;
3376 : lcnt = 0;
3377 : }
3378 133262 : if (!lcnt) {
3379 1044 : n = o[i]+1;
3380 1044 : lcnt = 1;
3381 : }
3382 : }
3383 : }
3384 69 : if (lcnt && ok == LOG_OK)
3385 69 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3386 69 : bat_iterator_end(&bi);
3387 : }
3388 198 : if (ok == LOG_OK)
3389 2775 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3390 2577 : if (seg->ts == tr->tid)
3391 1342 : seg->ts = 1;
3392 : } else {
3393 3465 : if (ok == LOG_OK) {
3394 3465 : BAT *bb = quick_descriptor(s->cs.bid);
3395 :
3396 3465 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3397 : ok = LOG_ERR;
3398 : } else {
3399 3465 : segment *seg = s->segs->h;
3400 3465 : if (seg->ts == tr->tid)
3401 3465 : seg->ts = 1;
3402 : }
3403 : }
3404 : }
3405 3663 : if (b != ib)
3406 3663 : bat_destroy(b);
3407 3663 : bat_destroy(ib);
3408 :
3409 3663 : return ok;
3410 : }
3411 :
3412 : static int
3413 24325 : create_del(sql_trans *tr, sql_table *t)
3414 : {
3415 24325 : int ok = LOG_OK, new = 0;
3416 24325 : BAT *b;
3417 24325 : storage *bat = ATOMIC_PTR_GET(&t->data);
3418 :
3419 24325 : if (!bat) {
3420 24325 : new = 1;
3421 24325 : bat = ZNEW(storage);
3422 24325 : if(!bat)
3423 : return LOG_ERR;
3424 24325 : ATOMIC_PTR_INIT(&t->data, bat);
3425 24325 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3426 24325 : bat->cs.ts = tr->tid;
3427 : }
3428 :
3429 24325 : if (!isNew(t) && !isTempTable(t)) {
3430 3663 : bat->cs.ts = tr->ts;
3431 3663 : return load_storage(tr, t, bat, t->base.id);
3432 20662 : } else if (bat->cs.bid) {
3433 : return ok;
3434 : } else {
3435 20662 : assert(!bat->segs);
3436 20662 : if (!(bat->segs = new_segments(tr, 0)))
3437 : return LOG_ERR;
3438 :
3439 20662 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3440 20662 : if(b != NULL) {
3441 20662 : bat_set_access(b, BAT_READ);
3442 20662 : bat->cs.bid = temp_create(b);
3443 20662 : bat_destroy(b);
3444 : } else {
3445 : return LOG_ERR;
3446 : }
3447 20662 : if (new)
3448 27012 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3449 : }
3450 : return ok;
3451 : }
3452 :
3453 : static int
3454 185529 : log_segment(sql_trans *tr, segment *s, sqlid id)
3455 : {
3456 185529 : sqlstore *store = tr->store;
3457 185529 : msk m = s->deleted;
3458 185529 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3459 : }
3460 :
3461 : static int
3462 96958 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3463 : {
3464 : /* log segments */
3465 96958 : lock_table(tr->store, id);
3466 474416 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3467 377458 : unlock_table(tr->store, id);
3468 377458 : if (seg->ts == tr->tid && seg->end-seg->start) {
3469 139461 : if (log_segment(tr, seg, id) != LOG_OK) {
3470 : return LOG_ERR;
3471 : }
3472 : }
3473 377458 : lock_table(tr->store, id);
3474 : }
3475 96958 : unlock_table(tr->store, id);
3476 96958 : return LOG_OK;
3477 : }
3478 :
3479 : static int
3480 12548 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3481 : {
3482 12548 : BAT *b;
3483 12548 : int ok = LOG_OK;
3484 :
3485 12548 : if (GDKinmemory(0))
3486 : return LOG_OK;
3487 :
3488 12515 : b = temp_descriptor(bat->cs.bid);
3489 12515 : if (b == NULL)
3490 : return LOG_ERR;
3491 :
3492 12515 : sqlstore *store = tr->store;
3493 12515 : bat_set_access(b, BAT_READ);
3494 12515 : if (ok == LOG_OK)
3495 12515 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3496 12515 : if (ok == LOG_OK)
3497 12515 : ok = log_segments(tr, bat->segs, t->base.id);
3498 12515 : bat_destroy(b);
3499 12515 : return ok;
3500 : }
3501 :
3502 : static int
3503 12562 : log_create_del(sql_trans *tr, sql_change *change)
3504 : {
3505 12562 : int ok = LOG_OK;
3506 12562 : sql_table *t = (sql_table*)change->obj;
3507 :
3508 12562 : if (t->base.deleted)
3509 : return ok;
3510 12548 : assert(!isTempTable(t));
3511 12548 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3512 12548 : if (ok == LOG_OK) {
3513 74576 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3514 62028 : sql_column *c = n->data;
3515 :
3516 62028 : ok = log_create_col_(tr, c);
3517 : }
3518 12548 : if (t->idxs) {
3519 18295 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3520 5747 : sql_idx *i = n->data;
3521 :
3522 5747 : if (ATOMIC_PTR_GET(&i->data))
3523 5735 : ok = log_create_idx_(tr, i);
3524 : }
3525 : }
3526 : }
3527 : return ok;
3528 : }
3529 :
3530 : static int
3531 20664 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3532 : {
3533 20664 : int ok = LOG_OK;
3534 20664 : sql_table *t = (sql_table*)change->obj;
3535 20664 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3536 :
3537 20664 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3538 120 : assert(isTempTable(t));
3539 120 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3540 120 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3541 120 : return ok;
3542 : }
3543 :
3544 20544 : if (!commit_ts) /* rollback handled by ? */
3545 : return ok;
3546 18721 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3547 18721 : assert(ok == LOG_OK);
3548 18721 : if (ok != LOG_OK)
3549 : return ok;
3550 18721 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3551 18721 : assert(dbat->cs.ts == tr->tid);
3552 18721 : dbat->cs.ts = commit_ts;
3553 18721 : if (ok == LOG_OK) {
3554 128530 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3555 109809 : sql_column *c = n->data;
3556 109809 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3557 :
3558 109809 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3559 : }
3560 18721 : if (t->idxs) {
3561 24481 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3562 5760 : sql_idx *i = n->data;
3563 5760 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3564 :
3565 5760 : if (delta)
3566 5748 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3567 : }
3568 : }
3569 18721 : if (!tr->parent)
3570 18719 : t->base.new = 0;
3571 : }
3572 18721 : if (!tr->parent)
3573 18719 : t->base.new = 0;
3574 : return ok;
3575 : }
3576 :
3577 : static int
3578 19376 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3579 : {
3580 19376 : gdk_return ok = GDK_SUCCEED;
3581 :
3582 19376 : sqlstore *store = tr->store;
3583 19376 : if (!GDKinmemory(0) && b && b->cs.bid)
3584 18811 : ok = log_bat_transient(store->logger, id);
3585 19376 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3586 25 : ok = log_bat_transient(store->logger, -id);
3587 19376 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3588 : }
3589 :
3590 : static int
3591 159517 : destroy_col(sqlstore *store, sql_column *c)
3592 : {
3593 159517 : (void)store;
3594 159517 : if (ATOMIC_PTR_GET(&c->data))
3595 159517 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3596 159517 : ATOMIC_PTR_SET(&c->data, NULL);
3597 159517 : return LOG_OK;
3598 : }
3599 :
3600 : static int
3601 17509 : log_destroy_col_(sql_trans *tr, sql_column *c)
3602 : {
3603 17509 : int ok = LOG_OK;
3604 17509 : assert(!isTempTable(c->t));
3605 17509 : if (!tr->parent) /* don't write save point commits */
3606 17509 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3607 17509 : return ok;
3608 : }
3609 :
3610 : static int
3611 17509 : log_destroy_col(sql_trans *tr, sql_change *change)
3612 : {
3613 17509 : sql_column *c = (sql_column*)change->obj;
3614 17509 : int res = log_destroy_col_(tr, c);
3615 17509 : change->obj = NULL;
3616 17509 : column_destroy(tr->store, c);
3617 17509 : return res;
3618 : }
3619 :
3620 : static int
3621 9939 : destroy_idx(sqlstore *store, sql_idx *i)
3622 : {
3623 9939 : (void)store;
3624 9939 : if (ATOMIC_PTR_GET(&i->data))
3625 9939 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3626 9939 : ATOMIC_PTR_SET(&i->data, NULL);
3627 9939 : return LOG_OK;
3628 : }
3629 :
3630 : static int
3631 1943 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3632 : {
3633 1943 : int ok = LOG_OK;
3634 1943 : assert(!isTempTable(i->t));
3635 1943 : if (ATOMIC_PTR_GET(&i->data)) {
3636 1867 : if (!tr->parent) /* don't write save point commits */
3637 1867 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3638 : }
3639 1943 : return ok;
3640 : }
3641 :
3642 : static int
3643 1943 : log_destroy_idx(sql_trans *tr, sql_change *change)
3644 : {
3645 1943 : sql_idx *i = (sql_idx*)change->obj;
3646 1943 : int res = log_destroy_idx_(tr, i);
3647 1943 : change->obj = NULL;
3648 1943 : idx_destroy(tr->store, i);
3649 1943 : return res;
3650 : }
3651 :
3652 : static int
3653 24994 : destroy_del(sqlstore *store, sql_table *t)
3654 : {
3655 24994 : (void)store;
3656 24994 : if (ATOMIC_PTR_GET(&t->data))
3657 24990 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3658 24994 : ATOMIC_PTR_SET(&t->data, NULL);
3659 24994 : return LOG_OK;
3660 : }
3661 :
3662 : static int
3663 3256 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3664 : {
3665 3256 : gdk_return ok = GDK_SUCCEED;
3666 :
3667 3256 : sqlstore *store = tr->store;
3668 3256 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3669 3256 : bat && bat->cs.bid)
3670 3256 : ok = log_bat_transient(store->logger, id);
3671 3256 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3672 : }
3673 :
3674 : static int
3675 3256 : log_destroy_del(sql_trans *tr, sql_change *change)
3676 : {
3677 3256 : int ok = LOG_OK;
3678 3256 : sql_table *t = (sql_table*)change->obj;
3679 :
3680 3256 : assert(!isTempTable(t));
3681 3256 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3682 3256 : return ok;
3683 : }
3684 :
3685 : static int
3686 22796 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3687 : {
3688 22796 : (void)tr;
3689 22796 : (void)change;
3690 22796 : (void)commit_ts;
3691 22796 : (void)oldest;
3692 22796 : if (commit_ts)
3693 22773 : change->handled = true;
3694 22796 : return 0;
3695 : }
3696 :
3697 : static int
3698 3327 : drop_del(sql_trans *tr, sql_table *t)
3699 : {
3700 3327 : int ok = LOG_OK;
3701 :
3702 3327 : if (!isNew(t)) {
3703 3327 : storage *bat = ATOMIC_PTR_GET(&t->data);
3704 3392 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3705 : }
3706 3327 : return ok;
3707 : }
3708 :
3709 : static int
3710 17524 : drop_col(sql_trans *tr, sql_column *c)
3711 : {
3712 17524 : assert(!isNew(c));
3713 17524 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3714 17524 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3715 17524 : return LOG_OK;
3716 : }
3717 :
3718 : static int
3719 1945 : drop_idx(sql_trans *tr, sql_idx *i)
3720 : {
3721 1945 : assert(!isNew(i));
3722 1945 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3723 1945 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3724 1945 : return LOG_OK;
3725 : }
3726 :
3727 :
3728 : static BUN
3729 129457 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3730 : {
3731 129457 : BAT *b;
3732 129457 : BUN sz = 0;
3733 :
3734 129457 : (void)tr;
3735 129457 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3736 129457 : if (cs->bid && renew) {
3737 129500 : b = quick_descriptor(cs->bid);
3738 129463 : if (b) {
3739 129463 : sz += BATcount(b);
3740 129463 : if (cs->st == ST_DICT) {
3741 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3742 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3743 :
3744 2 : if (nebid == BID_NIL || !n) {
3745 0 : temp_destroy(nebid);
3746 0 : bat_destroy(n);
3747 0 : return BUN_NONE;
3748 : }
3749 2 : temp_destroy(cs->ebid);
3750 2 : cs->ebid = nebid;
3751 2 : if (!temp)
3752 2 : bat_set_access(n, BAT_READ);
3753 2 : temp_destroy(cs->bid);
3754 2 : cs->bid = temp_create(n); /* create empty copy */
3755 2 : bat_destroy(n);
3756 : } else {
3757 129461 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3758 :
3759 129380 : if (nbid == BID_NIL)
3760 : return BUN_NONE;
3761 129380 : temp_destroy(cs->bid);
3762 129423 : cs->bid = nbid;
3763 : }
3764 : } else {
3765 : return BUN_NONE;
3766 : }
3767 : }
3768 129382 : if (cs->uibid) {
3769 129239 : temp_destroy(cs->uibid);
3770 129385 : cs->uibid = 0;
3771 : }
3772 129528 : if (cs->uvbid) {
3773 129384 : temp_destroy(cs->uvbid);
3774 129386 : cs->uvbid = 0;
3775 : }
3776 129530 : cs->cleared = true;
3777 129530 : cs->ucnt = 0;
3778 129530 : return sz;
3779 : }
3780 :
3781 : static BUN
3782 129342 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3783 : {
3784 129342 : bool update_conflict = false;
3785 129342 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3786 :
3787 129342 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3788 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3789 129377 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3790 129377 : if (odelta != delta)
3791 129382 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3792 129359 : if (delta)
3793 129359 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3794 : return 0;
3795 : }
3796 :
3797 : static BUN
3798 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3799 : {
3800 21 : bool update_conflict = false;
3801 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3802 :
3803 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3804 15 : return 0;
3805 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3806 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3807 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3808 6 : if (odelta != delta)
3809 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3810 6 : if (delta)
3811 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3812 : return 0;
3813 : }
3814 :
3815 : static int
3816 142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3817 : {
3818 142 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3819 : return LOG_ERR;
3820 142 : if (s->segs)
3821 142 : destroy_segments(s->segs);
3822 142 : if (!(s->segs = new_segments(tr, 0)))
3823 : return LOG_ERR;
3824 : return LOG_OK;
3825 : }
3826 :
3827 :
3828 : /*
3829 : * Clear the table, in general this means replacing the storage,
3830 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3831 : * all segments as deleted.
3832 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3833 : */
3834 : static BUN
3835 41831 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3836 : {
3837 41831 : int clear = !in_transaction, ok = LOG_OK;
3838 41831 : bool conflict = false;
3839 41831 : storage *bat;
3840 :
3841 41882 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3842 15818 : return conflict?BUN_NONE-1:BUN_NONE;
3843 :
3844 26014 : if (!clear) {
3845 51 : lock_table(tr->store, t->base.id);
3846 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3847 51 : unlock_table(tr->store, t->base.id);
3848 : }
3849 26014 : assert(t->persistence != SQL_DECLARED_TABLE);
3850 26014 : if (!in_transaction)
3851 25966 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3852 26009 : if (ok == LOG_ERR)
3853 : return BUN_NONE;
3854 26009 : if (ok == LOG_CONFLICT)
3855 0 : return BUN_NONE - 1;
3856 : return LOG_OK;
3857 : }
3858 :
3859 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3860 : static BUN
3861 41799 : clear_table(sql_trans *tr, sql_table *t)
3862 : {
3863 41799 : node *n = ol_first_node(t->columns);
3864 41799 : sql_column *c = n->data;
3865 41799 : storage *d = tab_timestamp_storage(tr, t);
3866 41831 : int in_transaction, clear;
3867 41831 : BUN sz, clear_ok;
3868 :
3869 41831 : if (!d)
3870 : return BUN_NONE;
3871 41831 : in_transaction = segments_in_transaction(tr, t);
3872 41827 : clear = !in_transaction;
3873 41827 : sz = count_col(tr, c, CNT_ACTIVE);
3874 41832 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3875 : return clear_ok;
3876 :
3877 26011 : if (in_transaction)
3878 : return sz;
3879 :
3880 155339 : for (; n; n = n->next) {
3881 129376 : c = n->data;
3882 :
3883 129376 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3884 0 : return clear_ok;
3885 : }
3886 25963 : if (t->idxs) {
3887 25984 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3888 21 : sql_idx *ci = n->data;
3889 :
3890 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3891 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3892 0 : return clear_ok;
3893 : }
3894 : }
3895 25963 : if (clear)
3896 25963 : d->segs->nr_reused = 0;
3897 25963 : return sz;
3898 : }
3899 :
3900 : static int
3901 158697 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3902 : {
3903 158697 : sqlstore *store = tr->store;
3904 158697 : gdk_return ok = GDK_SUCCEED;
3905 :
3906 158697 : (void) t;
3907 158697 : (void) segs;
3908 158697 : if (GDKinmemory(0))
3909 : return LOG_OK;
3910 :
3911 158690 : if (cs->cleared) {
3912 155400 : assert(cs->ucnt == 0);
3913 155400 : BAT *ins = temp_descriptor(cs->bid);
3914 155400 : if (!ins)
3915 : return LOG_ERR;
3916 155400 : assert(!isEbat(ins));
3917 155400 : bat_set_access(ins, BAT_READ);
3918 155400 : ok = log_bat_persists(store->logger, ins, id);
3919 155400 : bat_destroy(ins);
3920 155400 : if (ok == GDK_SUCCEED && cs->ebid) {
3921 56 : BAT *ins = temp_descriptor(cs->ebid);
3922 56 : if (!ins)
3923 : return LOG_ERR;
3924 56 : assert(!isEbat(ins));
3925 56 : bat_set_access(ins, BAT_READ);
3926 56 : ok = log_bat_persists(store->logger, ins, -id);
3927 56 : bat_destroy(ins);
3928 : }
3929 155400 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3930 : }
3931 :
3932 3290 : assert(!isTempTable(t));
3933 :
3934 3290 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3935 2747 : BAT *ui = temp_descriptor(cs->uibid);
3936 2747 : BAT *uv = temp_descriptor(cs->uvbid);
3937 : /* any updates */
3938 2747 : if (ui == NULL || uv == NULL) {
3939 : ok = GDK_FAIL;
3940 2747 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3941 2747 : ok = log_delta(store->logger, ui, uv, id);
3942 2747 : bat_destroy(ui);
3943 2747 : bat_destroy(uv);
3944 : }
3945 2747 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3946 : }
3947 :
3948 : static inline int
3949 58486 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3950 58486 : sqlstore *store = tr->store;
3951 58486 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3952 : }
3953 :
3954 : static inline int
3955 58486 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3956 58486 : sqlstore *store = tr->store;
3957 58486 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3958 : }
3959 :
/* Write the rows appended to table t by transaction tr to the logger.
 *
 * The work happens between tr_log_table_start() and tr_log_table_end():
 *  1. Walk the segment list; every non-empty, non-deleted segment stamped
 *     with tr->tid is passed to log_segment() and its size is added to
 *     nr_appends.
 *  2. For every column: when its storage is flagged `cleared` it is logged
 *     through tr_log_cs(); otherwise log_bat() is called for the range
 *     [cur->start, cur->end) of each segment that belongs to tr, is not
 *     deleted and starts before `end` (the value of segs_end()).  If the
 *     column storage has an `ebid` BAT, its entries from batInserted up to
 *     BATcount are logged under the negated column id and the BAT is
 *     BATcommit()ed.
 *  3. For every index that is not skipped by the hash_index/idx_has_column
 *     test and that has storage, the same per-segment log_bat() is done.
 *
 * Locking: the table lock is taken before each segment walk, released at the
 * top of every iteration and re-taken at the bottom, so it is held while
 * the next pointer is followed but not while a segment is being logged.
 * All early returns inside the loops happen in the unlocked part.
 *
 * Returns LOG_OK on success, LOG_ERR on any failure.
 * NOTE(review): the early `return LOG_ERR` paths leave without calling
 * tr_log_table_end(); confirm that the caller discards the open log group. */
3960 : static int
3961 58486 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3962 : {
3963 58486 : sqlstore *store = tr->store;
3964 58486 : gdk_return ok = GDK_SUCCEED;
3965 :
    /* upper bound used below to filter segments (cur->start < end) */
3966 58486 : size_t end = segs_end(segs, tr, t);
3967 :
3968 58486 : if (tr_log_table_start(tr, t) != LOG_OK)
3969 : return LOG_ERR;
3970 :
3971 58486 : size_t nr_appends = 0;
3972 :
    /* pass 1: log the segments themselves and count the appended rows */
3973 58486 : lock_table(tr->store, t->base.id);
3974 397664 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3975 339178 : unlock_table(tr->store, t->base.id);
3976 :
3977 339178 : if (seg->ts == tr->tid && seg->end-seg->start) {
3978 108835 : if (!seg->deleted) {
3979 46068 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
3980 : return LOG_ERR;
3981 :
3982 46068 : nr_appends += (seg->end - seg->start);
3983 : }
3984 : }
3985 339178 : lock_table(tr->store, t->base.id);
3986 : }
3987 58486 : unlock_table(tr->store, t->base.id);
3988 :
    /* pass 2: per column, log the appended ranges */
3989 406107 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
3990 347621 : sql_column *c = n->data;
3991 347621 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
3992 :
3993 347621 : if (cs->cleared) {
3994 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
3995 3 : continue;
3996 : }
3997 :
3998 347618 : lock_table(tr->store, t->base.id);
    /* cleared is tested again, this time with the table lock held */
3999 347618 : if (!cs->cleared) {
4000 2326848 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4001 1979230 : unlock_table(tr->store, t->base.id);
4002 1979230 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4003 : /* append col*/
4004 259800 : BAT *ins = temp_descriptor(cs->bid);
4005 259800 : if (ins == NULL)
4006 : return LOG_ERR;
4007 259800 : assert(BATcount(ins) >= cur->end);
4008 259800 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4009 259800 : bat_destroy(ins);
4010 : }
4011 1979230 : lock_table(tr->store, t->base.id);
4012 : }
4013 : }
4014 347618 : unlock_table(tr->store, t->base.id);
4015 :
    /* ebid BAT: log only the entries added since batInserted, under the
     * negated column id, then commit the BAT at its current count */
4016 347618 : if (ok == GDK_SUCCEED && cs->ebid) {
4017 19 : BAT *ins = temp_descriptor(cs->ebid);
4018 19 : if (ins == NULL)
4019 : return LOG_ERR;
4020 19 : if (BATcount(ins) > ins->batInserted)
4021 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4022 19 : BATcommit(ins, BATcount(ins));
4023 19 : bat_destroy(ins);
4024 : }
4025 : }
4026 :
    /* pass 3: same per-segment logging for the indices that carry storage */
4027 58486 : if (t->idxs) {
4028 63668 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4029 5182 : sql_idx *i = n->data;
4030 :
4031 5182 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4032 4404 : continue;
4033 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4034 :
4035 778 : if (cs) {
4036 778 : if (cs->cleared) {
4037 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4038 0 : continue;
4039 : }
4040 :
4041 778 : lock_table(tr->store, t->base.id);
4042 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4043 1609 : unlock_table(tr->store, t->base.id);
4044 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4045 : /* append idx */
4046 730 : BAT *ins = temp_descriptor(cs->bid);
4047 730 : if (ins == NULL)
4048 : return LOG_ERR;
4049 730 : assert(BATcount(ins) >= cur->end);
4050 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4051 730 : bat_destroy(ins);
4052 : }
4053 1609 : lock_table(tr->store, t->base.id);
4054 : }
4055 778 : unlock_table(tr->store, t->base.id);
4056 : }
4057 : }
4058 : }
4059 :
4060 58486 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4061 0 : return LOG_ERR;
4062 :
4063 : return LOG_OK;
4064 : }
4065 :
/* Log the table-level storage s of table t for transaction tr.
 * The `cleared` flag of s->cs is sampled once up front:
 *  - cleared:     tr_log_cs() is called first, then log_segments();
 *  - not cleared: log_segments() is called, then log_table_append().
 * Each step only runs when the previous ones returned LOG_OK; the first
 * non-OK result is returned. */
4066 : static int
4067 84443 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4068 : {
4069 84443 : int ok = LOG_OK;
4070 84443 : bool cleared = s->cs.cleared;
4071 84443 : if (ok == LOG_OK && cleared)
4072 25957 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4073 25957 : if (ok == LOG_OK)
4074 84443 : ok = log_segments(tr, s->segs, t->base.id);
4075 84443 : if (ok == LOG_OK && !cleared)
4076 58486 : ok = log_table_append(tr, t, s->segs);
4077 84443 : return ok;
4078 : }
4079 :
/* Fold the pending updates of a column_storage into its base BAT.
 *
 * When cs has a base BAT (bid) and a non-zero update count (ucnt), the
 * update BATs uibid/uvbid are applied to the base BAT with BATreplace()
 * (presumably ui holds the positions and uv the new values - confirm
 * against the BATreplace() contract), after which both update BATs are
 * released and replaced by empty ones from e_bat() and ucnt is reset.
 * Regardless of whether there were updates, `cleared` is reset and
 * `merged` is set.
 *
 * caller: name of the calling function, only used in the fatal message.
 * Failure to obtain a descriptor or to apply the updates ends in
 * GDKfatal(FATAL_MERGE_FAILURE, caller). */
4080 : static void
4081 418846 : merge_cs( column_storage *cs, const char* caller)
4082 : {
4083 418846 : if (cs->bid && cs->ucnt) {
4084 2756 : BAT *cur = temp_descriptor(cs->bid);
4085 2756 : BAT *ui = temp_descriptor(cs->uibid);
4086 2756 : BAT *uv = temp_descriptor(cs->uvbid);
4087 :
4088 2756 : if (!cur || !ui || !uv) {
4089 0 : bat_destroy(ui);
4090 0 : bat_destroy(uv);
4091 0 : bat_destroy(cur);
4092 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4093 : return;
4094 : }
4095 2756 : assert(BATcount(ui) == BATcount(uv));
4096 :
4097 : /* any updates */
4098 2756 : assert(!isEbat(cur));
4099 2756 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4100 0 : bat_destroy(ui);
4101 0 : bat_destroy(uv);
4102 0 : bat_destroy(cur);
4103 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4104 : return;
4105 : }
4106 : /* cleanup the old deltas */
4107 2756 : temp_destroy(cs->uibid);
4108 2756 : temp_destroy(cs->uvbid);
    /* fresh empty update BATs: oid for positions, base type for values */
4109 2756 : cs->uibid = e_bat(TYPE_oid);
4110 2756 : cs->uvbid = e_bat(cur->ttype);
4111 2756 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4112 2756 : cs->ucnt = 0;
4113 2756 : bat_destroy(ui);
4114 2756 : bat_destroy(uv);
4115 2756 : bat_destroy(cur);
4116 : }
4117 418846 : cs->cleared = false;
4118 418846 : cs->merged = true;
4119 418846 : return;
4120 : }
4121 :
/* Merge a chain of deltas, oldest first.
 * If obat is not yet marked merged and has an older delta (obat->next), that
 * older delta is merged recursively before obat itself is merged with
 * merge_cs().
 * Returns the sum of the cs.ucnt values (taken before merging) of every
 * delta visited by this call.
 * NOTE(review): obat is NULL-checked in the recursion condition but is
 * dereferenced unconditionally afterwards, so callers must pass non-NULL. */
4122 : static lng
4123 377091 : merge_delta( sql_delta *obat)
4124 : {
4125 377091 : lng res = 0;
4126 377091 : if (obat && obat->next && !obat->cs.merged)
4127 132747 : res += merge_delta(obat->next);
    /* read ucnt before merge_cs() resets it */
4128 377091 : res += obat->cs.ucnt;
4129 377091 : merge_cs(&obat->cs, __func__);
4130 377091 : return res;
4131 : }
4132 :
/* Merge the column_storage of table storage tdb with merge_cs() and then
 * destroy the older storage version(s) hanging off tdb->next, if any. */
4133 : static void
4134 41755 : merge_storage(storage *tdb)
4135 : {
4136 41755 : merge_cs(&tdb->cs, __func__);
4137 :
4138 41755 : if (tdb->next) {
4139 278 : destroy_storage(tdb->next);
4140 278 : tdb->next = NULL;
4141 : }
4142 41755 : }
4143 :
/* Savepoint commit of a delta.
 * When both `delta` and its direct predecessor carry timestamp commit_ts,
 * the contents of the two nodes are swapped so that the predecessor node
 * (od) ends up holding the newer contents while keeping its own next link;
 * the `delta` node, now holding the old contents and detached from the
 * chain, is destroyed and od is returned.
 * In every other case `delta` is returned unchanged. */
4144 : static sql_delta *
4145 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4146 : {
4147 : /* commit ie copy back to the parent transaction */
4148 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4149 1 : sql_delta *od = delta->next;
4150 1 : if (od->cs.ts == commit_ts) {
    /* t: copy of the old contents, n: od's original successor */
4151 0 : sql_delta t = *od, *n = od->next;
4152 0 : *od = *delta;
    /* the struct copy also copied delta->next (== od); restore the link */
4153 0 : od->next = n;
4154 0 : *delta = t;
4155 0 : delta->next = NULL;
4156 0 : destroy_delta(delta, true);
4157 0 : return od;
4158 : }
4159 : }
4160 : return delta;
4161 : }
4162 :
/* Log callback for a column update (change->obj is a sql_column).
 * - Table deleted: the change is marked handled and LOG_OK is returned.
 * - Top-level transaction (no tr->parent): the column's current delta is
 *   logged with tr_log_cs(), passing the head of the table's segment list.
 * - Savepoint commit (tr->parent set): nothing is written, LOG_OK.
 * Temporary tables must not reach this function (asserted). */
4163 : static int
4164 132711 : log_update_col( sql_trans *tr, sql_change *change)
4165 : {
4166 132711 : sql_column *c = (sql_column*)change->obj;
4167 132711 : assert(!isTempTable(c->t));
4168 :
4169 132711 : if (isDeleted(c->t)) {
4170 0 : change->handled = true;
4171 0 : return LOG_OK;
4172 : }
4173 :
4174 132711 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4175 132711 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4176 132711 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4177 132711 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4178 : }
4179 : return LOG_OK;
4180 : }
4181 :
/* Cleanup callback installed by commit_update_delta() for a rolled-back
 * delta (change->data).
 * - If the delta's timestamp is older than `oldest`, the delta is destroyed,
 *   table_destroy() is called on the owning table (reached through the
 *   index when change->commit is commit_update_idx, otherwise through the
 *   column) and 1 is returned.
 * - Otherwise 0 is returned; if the timestamp is still above
 *   TRANSACTION_ID_BASE it is first replaced by store_get_timestamp() + 1
 *   (presumably so a later call can compare it against `oldest` - confirm). */
4182 : static int
4183 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4184 : {
4185 217 : sqlstore *store = Store;
4186 :
4187 217 : sql_delta *d = (sql_delta*)change->data;
4188 217 : if (d->cs.ts < oldest) {
4189 82 : destroy_delta(d, false);
4190 82 : if (change->commit == &commit_update_idx)
4191 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4192 : else
4193 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4194 82 : return 1;
4195 : }
4196 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4197 82 : d->cs.ts = store_get_timestamp(store) + 1;
4198 : return 0;
4199 : }
4200 :
/* Cleanup callback installed by commit_update_del() for a rolled-back table
 * storage (change->data).
 * - If the storage's timestamp is older than `oldest`, the storage is
 *   destroyed, table_destroy() is called on the table and 1 is returned.
 * - Otherwise 0 is returned; if the timestamp is still above
 *   TRANSACTION_ID_BASE it is first replaced by store_get_timestamp() + 1. */
4201 : static int
4202 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4203 : {
4204 9 : sqlstore *store = Store;
4205 :
4206 9 : storage *d = (storage*)change->data;
4207 9 : if (d->cs.ts < oldest) {
4208 3 : destroy_storage(d);
4209 3 : table_destroy(store, (sql_table*)change->obj);
4210 3 : return 1;
4211 : }
4212 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4213 3 : d->cs.ts = store_get_timestamp(store) + 1;
4214 : return 0;
4215 : }
4216 :
/* Shared commit/rollback logic for column and index deltas.
 *
 * t/base/data identify the owning table, the column-or-index base object and
 * the atomic pointer holding its delta chain; `type` is currently unused.
 * commit_ts == 0 means rollback, non-zero means commit.
 *
 * Paths:
 *  - Table with commit action CA_DELETE or CA_DROP (asserted to be a temp
 *    table): the delta is cleared with clear_cs(); for a top-level
 *    transaction the `new` flags of table and object are reset; the change
 *    is marked handled.  Returns LOG_ERR when clear_cs() yields BUN_NONE.
 *  - Rollback: unless the object belongs to a table created in this
 *    transaction (change->ts && t->base.new), change->data is unlinked from
 *    the chain at *data and tc_gc_rollbacked is installed as cleanup.
 *  - Top-level commit: the head delta is stamped with commit_ts; the first
 *    delta in the chain with ts <= oldest, if not yet merged, is merged
 *    under the column lock and the merged update count is added to the
 *    head delta's nr_updates.
 *  - Savepoint commit (tr->parent): *data is replaced by the result of
 *    savepoint_commit_delta().
 * Returns LOG_OK except for the clear_cs() failure mentioned above. */
4217 : static int
4218 132829 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4219 : {
4220 132829 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4221 :
    /* idelta keeps the head of the chain; delta is advanced further down */
4222 132829 : sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
4223 :
4224 132829 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4225 3 : int ok = LOG_OK;
4226 3 : assert(isTempTable(t));
4227 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4228 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4229 3 : if (!tr->parent)
4230 0 : t->base.new = base->new = 0;
4231 3 : change->handled = true;
4232 3 : return ok;
4233 : }
4234 :
4235 132826 : if (commit_ts)
4236 132744 : delta->cs.ts = commit_ts;
4237 132826 : if (!commit_ts) { /* rollback */
4238 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4239 :
4240 82 : if (change->ts && t->base.new) /* handled by create col */
4241 : return LOG_OK;
    /* find the predecessor o of d (o stays the head when d is the head).
     * NOTE(review): if d is not in the chain, o ends up NULL and the
     * `o->next = d->next` below would dereference it - confirm d is
     * always reachable from *data. */
4242 82 : if (o != d) {
4243 0 : while(o && o->next != d)
4244 : o = o->next;
4245 : }
4246 82 : if (o == ATOMIC_PTR_GET(data))
4247 82 : ATOMIC_PTR_SET(data, d->next);
4248 : else
4249 0 : o->next = d->next;
4250 82 : d->next = NULL;
4251 82 : change->cleanup = &tc_gc_rollbacked;
4252 132744 : } else if (!tr->parent) {
4253 : /* merge deltas */
    /* skip the versions newer than `oldest` */
4254 274662 : while (delta && delta->cs.ts > oldest)
4255 141919 : delta = delta->next;
4256 132743 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4257 27678 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4258 27678 : idelta->nr_updates += merge_delta(delta);
4259 27678 : unlock_column(tr->store, base->id);
4260 : }
4261 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4262 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4263 : return LOG_OK;
4264 : }
4265 :
/* Commit/rollback callback for a column update (change->obj is a
 * sql_column).  Returns LOG_OK immediately when the change was already
 * handled or the table is deleted; otherwise forwards the column's base,
 * table, data pointer and local type to commit_update_delta(). */
4266 : static int
4267 132801 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4268 : {
4269 :
4270 132801 : sql_column *c = (sql_column*)change->obj;
4271 132801 : sql_base* base = &c->base;
4272 132801 : sql_table* t = c->t;
4273 132801 : ATOMIC_PTR_TYPE* data = &c->data;
4274 132801 : int type = c->type.type->localtype;
4275 :
4276 132801 : if (change->handled || isDeleted(c->t))
4277 : return LOG_OK;
4278 :
4279 132801 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4280 : }
4281 :
/* Log callback for an index update (change->obj is a sql_idx).
 * - Table deleted: the change is marked handled and LOG_OK is returned.
 * - Top-level transaction (no tr->parent): the index's current delta is
 *   logged with tr_log_cs(), passing the head of the table's segment list.
 * - Savepoint commit (tr->parent set): nothing is written, LOG_OK.
 * Temporary tables must not reach this function (asserted). */
4282 : static int
4283 26 : log_update_idx( sql_trans *tr, sql_change *change)
4284 : {
4285 26 : sql_idx *i = (sql_idx*)change->obj;
4286 26 : assert(!isTempTable(i->t));
4287 :
4288 26 : if (isDeleted(i->t)) {
4289 0 : change->handled = true;
4290 0 : return LOG_OK;
4291 : }
4292 :
4293 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4294 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4295 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4296 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4297 : }
4298 : return LOG_OK;
4299 : }
4300 :
/* Commit/rollback callback for an index update (change->obj is a sql_idx).
 * Returns LOG_OK immediately when the change was already handled or the
 * table is deleted; otherwise forwards to commit_update_delta().  The type
 * passed along is TYPE_oid for oid_index() index types and TYPE_lng for the
 * rest. */
4301 : static int
4302 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4303 : {
4304 28 : sql_idx *i = (sql_idx*)change->obj;
4305 28 : sql_base* base = &i->base;
4306 28 : sql_table* t = i->t;
4307 28 : ATOMIC_PTR_TYPE* data = &i->data;
4308 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4309 :
4310 28 : if (change->handled || isDeleted(i->t))
4311 : return LOG_OK;
4312 :
4313 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4314 : }
4315 :
/* Savepoint commit of a table storage; the storage counterpart of
 * savepoint_commit_delta().
 * When both `dbat` and its direct predecessor carry timestamp commit_ts, the
 * contents of the two nodes are swapped so that the predecessor node (od)
 * holds the newer contents while keeping its own next link; the `dbat`
 * node, now holding the old contents and detached, is destroyed and od is
 * returned.  In every other case `dbat` is returned unchanged. */
4316 : static storage *
4317 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4318 : {
4319 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4320 0 : storage *od = dbat->next;
4321 0 : if (od->cs.ts == commit_ts) {
4322 0 : storage t = *od, *n = od->next;
4323 0 : *od = *dbat;
    /* the struct copy also copied dbat->next (== od); restore the link */
4324 0 : od->next = n;
4325 0 : *dbat = t;
4326 0 : dbat->next = NULL;
4327 0 : destroy_storage(dbat);
4328 0 : return od;
4329 : }
4330 : }
4331 : return dbat;
4332 : }
4333 :
/* Log callback for table-level changes (change->obj is a sql_table).
 * - Table deleted: the change is marked handled and LOG_OK is returned.
 * - Top-level transaction (no tr->parent): the table's current storage is
 *   logged with log_storage().
 * - Savepoint commit (tr->parent set): nothing is written, LOG_OK.
 * Temporary tables must not reach this function (asserted). */
4334 : static int
4335 84443 : log_update_del( sql_trans *tr, sql_change *change)
4336 : {
4337 84443 : sql_table *t = (sql_table*)change->obj;
4338 84443 : assert(!isTempTable(t));
4339 :
4340 84443 : if (isDeleted(t)) {
4341 0 : change->handled = true;
4342 0 : return LOG_OK;
4343 : }
4344 :
4345 84443 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4346 84443 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4347 : return LOG_OK;
4348 : }
4349 :
/* Commit/rollback callback for table-level changes (segments / storage).
 * commit_ts == 0 means rollback, non-zero means commit.
 *
 * Paths:
 *  - Change already handled or table deleted: return LOG_OK.
 *  - Table with commit action CA_DELETE or CA_DROP (asserted to be a temp
 *    table): clear_storage(); on success and when committing, the head
 *    segment is stamped with commit_ts; the change is marked handled.
 *  - Everything below runs with the table lock held:
 *    - Rollback, storage stamped with tr->tid: unless the table was
 *      created in this transaction (change->ts && t->base.new), change->data
 *      is unlinked from the storage chain and tc_gc_rollbacked_storage is
 *      installed as cleanup.
 *    - Rollback, otherwise: rollback_segments().
 *    - Top-level commit: a storage still stamped with tr->tid gets commit_ts;
 *      segments2cs() is run and, on success, merge_segments() and - when
 *      oldest == commit_ts - merge_storage(); the cleared flag is reset.
 *    - Savepoint commit: merge_segments(), then t->data is replaced by the
 *      result of savepoint_commit_storage(), and change->data is re-stamped
 *      with commit_ts if it still carries tr->tid.
 * Returns LOG_OK or the error from clear_storage()/segments2cs(). */
4350 : static int
4351 89077 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4352 : {
4353 89077 : int ok = LOG_OK;
4354 89077 : sql_table *t = (sql_table*)change->obj;
4355 89077 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4356 :
4357 89077 : if (change->handled || isDeleted(t))
4358 : return ok;
4359 :
4360 89077 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4361 22 : assert(isTempTable(t));
4362 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4363 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4364 22 : change->handled = true;
4365 22 : return ok;
4366 : }
4367 :
4368 89055 : lock_table(tr->store, t->base.id);
4369 89055 : if (!commit_ts) { /* rollback */
4370 4312 : if (dbat->cs.ts == tr->tid) {
4371 6 : if (change->ts && t->base.new) { /* handled by the create table */
4372 3 : unlock_table(tr->store, t->base.id);
4373 3 : return ok;
4374 : }
4375 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4376 :
    /* find the predecessor o of d (o stays the head when d is the head) */
4377 3 : if (o != d) {
4378 0 : while(o && o->next != d)
4379 : o = o->next;
4380 : }
4381 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4382 3 : assert(d->next);
4383 3 : ATOMIC_PTR_SET(&t->data, d->next);
4384 : } else
4385 0 : o->next = d->next;
4386 3 : d->next = NULL;
4387 3 : change->cleanup = &tc_gc_rollbacked_storage;
4388 : } else
4389 4306 : rollback_segments(dbat->segs, tr, change, oldest);
4390 84743 : } else if (ok == LOG_OK && !tr->parent) {
4391 84717 : if (dbat->cs.ts == tr->tid) /* cleared table */
4392 25959 : dbat->cs.ts = commit_ts;
4393 :
4394 84717 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4395 84717 : if (ok == LOG_OK) {
4396 84717 : merge_segments(dbat, tr, change, commit_ts, oldest);
4397 84717 : if (oldest == commit_ts)
4398 41755 : merge_storage(dbat);
4399 : }
    /* NOTE(review): dbat was already dereferenced above, so this test
     * cannot be what protects against NULL */
4400 84717 : if (dbat)
4401 84717 : dbat->cs.cleared = false;
4402 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4403 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4404 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4405 26 : storage *s = change->data;
4406 26 : if (s->cs.ts == tr->tid)
4407 0 : s->cs.ts = commit_ts;
4408 : }
4409 89052 : unlock_table(tr->store, t->base.id);
4410 89052 : return ok;
4411 : }
4412 :
4413 : /* only rollback (content version) case for now */
/* Cleanup callback for a column change; releases with column_destroy().
 * Returns 1 when nothing is left to do for this change and 0 when it has to
 * be looked at again later (presumably the caller keeps the change queued
 * on 0 - confirm against the caller).
 *  - change->obj already NULL: 1.
 *  - change handled, table deleted, or the column's current data is no
 *    longer change->data: column_destroy(), 1.
 *  - `oldest` is at or above TRANSACTION_ID_BASE: 0.
 *  - delta has an older version: 0 when the delta's ts is still newer than
 *    `oldest`; otherwise the older chain is destroyed if the delta is
 *    already merged, and merge_delta() is run under the column lock with the
 *    merged update count added to nr_updates.
 *  - finally column_destroy(), 1.
 * NOTE(review): `return LOG_OK` below is used as the "not yet" result next
 * to the literal `return 0`; presumably LOG_OK == 0 - confirm. */
4414 : static int
4415 17681 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4416 : {
4417 17681 : sqlstore *store = Store;
4418 17681 : sql_column *c = (sql_column*)change->obj;
4419 :
4420 17681 : if (!c) /* cleaned earlier */
4421 : return 1;
4422 :
4423 172 : if (change->handled || isDeleted(c->t)) {
4424 0 : column_destroy(store, c);
4425 0 : return 1;
4426 : }
4427 :
4428 : /* savepoint commit (did it merge ?) */
4429 172 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4430 0 : column_destroy(store, c);
4431 0 : return 1;
4432 : }
4433 172 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4434 : return 0;
4435 171 : sql_delta *d = (sql_delta*)change->data, *id = d;
4436 171 : if (d && d->next) {
4437 :
4438 66 : if (d->cs.ts > oldest)
4439 : return LOG_OK; /* cannot cleanup yet */
4440 :
4441 : // d is oldest reachable delta
4442 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4443 62 : destroy_delta(d->next, true);
4444 62 : d->next = NULL;
4445 : }
4446 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4447 63 : id->nr_updates += merge_delta(d);
4448 63 : unlock_column(store, c->base.id);
4449 : }
4450 168 : column_destroy(store, c);
4451 168 : return 1;
4452 : }
4453 :
/* Cleanup callback for a column update change.  Same decision logic as
 * tc_gc_col(), but every exit that finishes the change calls
 * table_destroy() on the column's table instead of column_destroy().
 * Returns 1 when nothing is left to do for this change, 0 (also spelled
 * LOG_OK below) when it cannot be cleaned up yet:
 *  - change->obj already NULL: 1.
 *  - change handled, table deleted, or the column's current data is no
 *    longer change->data: table_destroy(), 1.
 *  - `oldest` is at or above TRANSACTION_ID_BASE: 0.
 *  - delta has an older version: 0 when the delta's ts is still newer than
 *    `oldest`; otherwise destroy the older chain if already merged and run
 *    merge_delta() under the column lock.
 *  - finally table_destroy(), 1. */
4454 : static int
4455 790309 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4456 : {
4457 790309 : sqlstore *store = Store;
4458 790309 : sql_column *c = (sql_column*)change->obj;
4459 :
4460 790309 : if (!c) /* cleaned earlier */
4461 : return 1;
4462 :
4463 790309 : if (change->handled || isDeleted(c->t)) {
4464 3 : table_destroy(store, c->t);
4465 3 : return 1;
4466 : }
4467 :
4468 : /* savepoint commit (did it merge ?) */
4469 790306 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4470 32355 : table_destroy(store, c->t);
4471 32355 : return 1;
4472 : }
4473 757951 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4474 : return 0;
4475 757950 : sql_delta *d = (sql_delta*)change->data, *id = d;
4476 757950 : if (d && d->next) {
4477 :
4478 757950 : if (d->cs.ts > oldest)
4479 : return LOG_OK; /* cannot cleanup yet */
4480 :
4481 : // d is oldest reachable delta
4482 100299 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4483 4242 : destroy_delta(d->next, true);
4484 4242 : d->next = NULL;
4485 : }
4486 100299 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4487 100299 : id->nr_updates += merge_delta(d);
4488 100299 : unlock_column(store, c->base.id);
4489 : }
4490 100299 : table_destroy(store, c->t);
4491 100299 : return 1;
4492 : }
4493 :
/* Cleanup callback for an index change; the index counterpart of
 * tc_gc_col(), releasing with idx_destroy().
 * Returns 1 when nothing is left to do for this change, 0 (also spelled
 * LOG_OK below) when it cannot be cleaned up yet:
 *  - change->obj already NULL: 1.
 *  - change handled, table deleted, or the index's current data is no
 *    longer change->data: idx_destroy(), 1.
 *  - `oldest` is at or above TRANSACTION_ID_BASE: 0.
 *  - delta has an older version: 0 when the delta's ts is still newer than
 *    `oldest`; otherwise destroy the older chain if already merged and run
 *    merge_delta() under the column lock keyed by the index id.
 *  - finally idx_destroy(), 1. */
4494 : static int
4495 2576 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4496 : {
4497 2576 : sqlstore *store = Store;
4498 2576 : sql_idx *i = (sql_idx*)change->obj;
4499 :
4500 2576 : if (!i) /* cleaned earlier */
4501 : return 1;
4502 :
4503 633 : if (change->handled || isDeleted(i->t)) {
4504 0 : idx_destroy(store, i);
4505 0 : return 1;
4506 : }
4507 :
4508 : /* savepoint commit (did it merge ?) */
4509 633 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4510 0 : idx_destroy(store, i);
4511 0 : return 1;
4512 : }
4513 633 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4514 : return 0;
4515 633 : sql_delta *d = (sql_delta*)change->data, *id = d;
4516 633 : if (d && d->next) {
4517 0 : if (d->cs.ts > oldest)
4518 : return LOG_OK; /* cannot cleanup yet */
4519 :
4520 : // d is oldest reachable delta
4521 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4522 0 : destroy_delta(d->next, true);
4523 0 : d->next = NULL;
4524 : }
4525 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4526 0 : id->nr_updates += merge_delta(d);
4527 0 : unlock_column(store, i->base.id);
4528 : }
4529 633 : idx_destroy(store, i);
4530 633 : return 1;
4531 : }
4532 :
/* Cleanup callback for an index update change.  Same decision logic as
 * tc_gc_idx(), but every exit that finishes the change calls
 * table_destroy() on the index's table instead of idx_destroy().
 * Returns 1 when nothing is left to do for this change, 0 (also spelled
 * LOG_OK below) when it cannot be cleaned up yet:
 *  - change->obj already NULL: 1.
 *  - change handled, table deleted, or the index's current data is no
 *    longer change->data: table_destroy(), 1.
 *  - `oldest` is at or above TRANSACTION_ID_BASE: 0.
 *  - delta has an older version: 0 when the delta's ts is still newer than
 *    `oldest`; otherwise destroy the older chain if already merged and run
 *    merge_delta() under the column lock keyed by the index id.
 *  - finally table_destroy(), 1. */
4533 : static int
4534 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4535 : {
4536 26 : sqlstore *store = Store;
4537 26 : sql_idx *i = (sql_idx*)change->obj;
4538 :
4539 26 : if (!i) /* cleaned earlier */
4540 : return 1;
4541 :
4542 26 : if (change->handled || isDeleted(i->t)) {
4543 0 : table_destroy(store, i->t);
4544 0 : return 1;
4545 : }
4546 :
4547 : /* savepoint commit (did it merge ?) */
4548 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4549 0 : table_destroy(store, i->t);
4550 0 : return 1;
4551 : }
4552 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4553 : return 0;
4554 26 : sql_delta *d = (sql_delta*)change->data, *id = d;
4555 26 : if (d && d->next) {
4556 26 : if (d->cs.ts > oldest)
4557 : return LOG_OK; /* cannot cleanup yet */
4558 :
4559 : // d is oldest reachable delta
4560 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4561 26 : destroy_delta(d->next, true);
4562 26 : d->next = NULL;
4563 : }
4564 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4565 26 : id->nr_updates += merge_delta(d);
4566 26 : unlock_column(store, i->base.id);
4567 : }
4568 26 : table_destroy(store, i->t);
4569 26 : return 1;
4570 : }
4571 :
/* Cleanup callback for table-level changes (registered by the claim_*
 * functions through trans_add_obj()).
 * Returns 1 when nothing is left to do for this change, 0 (also spelled
 * LOG_OK below) when it cannot be cleaned up yet:
 *  - change handled, table deleted, or the table's current storage is no
 *    longer change->data: table_destroy(), 1.
 *  - `oldest` is at or above TRANSACTION_ID_BASE: 0.
 *  - storage has an older version: 0 when the storage's ts is still newer
 *    than `oldest`; otherwise the older chain is destroyed.
 *  - finally table_destroy(), 1. */
4572 : static int
4573 231174 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4574 : {
4575 231174 : sqlstore *store = Store;
4576 231174 : sql_table *t = (sql_table*)change->obj;
4577 :
4578 231174 : if (change->handled || isDeleted(t)) {
4579 3501 : table_destroy(store, t);
4580 3501 : return 1;
4581 : }
4582 : /* savepoint commit (did it merge ?) */
4583 227673 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4584 6587 : table_destroy(store, t);
4585 6587 : return 1;
4586 : }
4587 221086 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4588 : return 0;
4589 221058 : storage *d = (storage*)change->data;
4590 221058 : if (d->next) {
4591 150028 : if (d->cs.ts > oldest)
4592 : return LOG_OK; /* cannot cleanup yet */
4593 :
4594 19095 : destroy_storage(d->next);
4595 19095 : d->next = NULL;
4596 : }
4597 90125 : table_destroy(store, t);
4598 90125 : return 1;
4599 : }
4600 :
/* Record that the `nr` consecutive slots starting at `slot` were claimed.
 *
 *  slot    first claimed position
 *  nr      number of consecutive positions claimed in this step
 *  total   number of positions requested by the whole claim
 *  offset  out: start position, used when the whole claim is one range
 *  offsets in/out: BAT of individual positions, created here on demand
 *
 * If no offsets BAT exists yet and this step covers the complete request
 * (nr == total), only *offset is set.  Otherwise the positions slot ..
 * slot+nr-1 are appended to *offsets, which is first created as an oid BAT
 * with capacity `total` when it does not exist.
 * Returns LOG_OK, or LOG_ERR when the BAT cannot be allocated.
 * NOTE(review): with offsets == NULL and nr != total the `!*offsets` test
 * below would dereference NULL; the callers visible here always pass a
 * non-NULL offsets. */
4601 : static int
4602 30467 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4603 : {
4604 30467 : if (nr == 0)
4605 : return LOG_OK;
4606 30467 : assert (nr > 0);
4607 30467 : if ((!offsets || !*offsets) && nr == total) {
4608 30471 : *offset = slot;
4609 30471 : return LOG_OK;
4610 : }
4611 0 : if (!*offsets) {
4612 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4613 7 : if (!*offsets)
4614 : return LOG_ERR;
4615 : }
    /* write directly behind the current last element; no capacity check is
     * done here (the BAT was created with room for `total` entries) */
4616 0 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4617 16135 : for(size_t i = 0; i < nr; i++)
4618 16139 : dst[i] = slot + i;
4619 0 : (*offsets)->batCount += nr;
4620 0 : (*offsets)->theap->dirty = true;
4621 0 : return LOG_OK;
4622 : }
4623 :
/* Claim `cnt` slots in table t for transaction tr; unlike claim_segments()
 * the slots may be gathered from several separate ranges.
 *
 *  s        table storage whose segment list is searched/extended
 *  offset   out: start position when everything fits one contiguous range
 *  offsets  in/out: BAT with the individual positions when it does not
 *           (filled by add_offsets())
 *  locked   true when the caller already holds the table lock
 *
 * Phase 1 walks the segment list and reuses non-empty deleted segments whose
 * timestamp is older than store_oldest(): a segment at least as large as the
 * remaining count is either absorbed into the preceding segment (when that
 * one belongs to tr and is not deleted) or split with split_segment();
 * a smaller segment is taken over completely.
 * Phase 2 takes whatever is still missing from the tail: the tail segment is
 * extended when it belongs to tr and is not deleted, otherwise a new segment
 * is appended with new_segment().
 * On success the table is registered with the transaction (trans_add_obj)
 * if segments_in_transaction() did not report it yet, tr->logchanges is
 * increased for logged tables, and the properties of a created offsets BAT
 * are set.
 * Returns LOG_OK or LOG_ERR. */
4624 : static int
4625 30336 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4626 : {
4627 30336 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4628 30593 : assert(s->segs);
4629 30593 : ulng oldest = store_oldest(tr->store, NULL);
4630 30395 : BUN slot = 0;
    /* cnt is counted down below; total keeps the size of the full request */
4631 30395 : size_t total = cnt;
4632 :
4633 30395 : if (!locked)
4634 30404 : lock_table(tr->store, t->base.id);
4635 : /* naive vacuum approach, iterate through segments, use deleted segments or create new segment at the end */
4636 61224 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4637 30694 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4638 35 : if ((seg->end - seg->start) >= cnt) {
4639 : /* if previous is claimed before we could simply adjust the end/start */
4640 13 : if (p && p->ts == tr->tid && !p->deleted) {
4641 2 : slot = p->end;
4642 2 : p->end += cnt;
4643 2 : seg->start += cnt;
4644 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4645 : ok = LOG_ERR;
4646 : break;
4647 : }
4648 2 : s->segs->nr_reused += cnt;
4649 2 : cnt = 0;
4650 2 : break;
4651 : }
4652 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4653 11 : size_t rcnt = seg->end - seg->start;
4654 11 : if (rcnt > cnt)
4655 : rcnt = cnt;
4656 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4657 : ok = LOG_ERR;
4658 : break;
4659 : }
4660 : }
    /* seg (whole segment or the part returned by split_segment) now
     * belongs to this transaction */
4661 33 : seg->ts = tr->tid;
4662 33 : seg->deleted = false;
4663 33 : slot = seg->start;
4664 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4665 : ok = LOG_ERR;
4666 : break;
4667 : }
4668 10 : s->segs->nr_reused += (seg->end - seg->start);
4669 10 : cnt -= (seg->end - seg->start);
4670 : }
4671 : }
    /* anything not covered by reuse is taken at the tail */
4672 30532 : if (ok == LOG_OK && cnt) {
4673 30519 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4674 29410 : slot = s->segs->t->end;
4675 29410 : s->segs->t->end += cnt;
4676 : } else {
4677 1109 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4678 : ok = LOG_ERR;
4679 : } else {
4680 1150 : if (!s->segs->h)
4681 0 : s->segs->h = s->segs->t;
4682 1150 : slot = s->segs->t->start;
4683 : }
4684 : }
4685 30560 : if (ok == LOG_OK)
4686 30560 : ok = add_offsets(slot, cnt, total, offset, offsets);
4687 : }
4688 30479 : if (!locked)
4689 30426 : unlock_table(tr->store, t->base.id);
4690 :
4691 30644 : if (ok == LOG_OK) {
4692 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4693 30598 : if (!in_transaction) {
4694 1135 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4695 1135 : in_transaction = true;
4696 : }
4697 30598 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4698 30586 : tr->logchanges += (lng) total;
4699 30598 : if (*offsets) {
4700 15 : BAT *pos = *offsets;
4701 15 : assert(BATcount(pos) == total);
4702 15 : BATsetcount(pos, total); /* set other properties */
4703 7 : pos->tnil = false;
4704 7 : pos->tnonil = true;
4705 7 : pos->tkey = true;
4706 7 : pos->tsorted = true;
4707 7 : pos->trevsorted = false;
4708 : }
4709 : }
4710 30636 : return ok;
4711 : }
4712 :
/* Claim `cnt` slots in table t for transaction tr as one contiguous range.
 *
 * Requests for more than one slot with a non-NULL `offsets` are delegated to
 * claim_segmentsV2(), which may return scattered positions.  Otherwise:
 *  - the segment list is searched for the first deleted segment older than
 *    store_oldest() that is at least `cnt` slots large; it is either
 *    absorbed into the preceding segment (when that one belongs to tr and is
 *    not deleted) or split with split_segment() and taken over;
 *  - if none is found, the tail segment is extended when it belongs to tr
 *    and is not deleted, otherwise a new segment is appended.
 * On success the table is registered with the transaction (trans_add_obj)
 * if segments_in_transaction() did not report it yet, tr->logchanges is
 * increased for logged tables and *offset receives the first claimed slot.
 *
 *  locked   true when the caller already holds the table lock
 * Returns LOG_OK or LOG_ERR. */
4713 : static int
4714 2007044 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4715 : {
4716 2007044 : if (cnt > 1 && offsets)
4717 30336 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4718 1976708 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4719 1976743 : assert(s->segs);
4720 1976743 : ulng oldest = store_oldest(tr->store, NULL);
4721 1976712 : BUN slot = 0;
4722 1976712 : int reused = 0;
4723 :
4724 1976712 : if (!locked)
4725 1852051 : lock_table(tr->store, t->base.id);
4726 : /* naive vacuum approach, iterate through segments, check for large enough deleted segments
4727 : * or create new segment at the end */
4728 5469595 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4729 3539076 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4730 :
    /* same size test as in the enclosing condition, so always true here */
4731 46234 : if ((seg->end - seg->start) >= cnt) {
4732 :
4733 : /* if previous is claimed before we could simply adjust the end/start */
4734 46234 : if (p && p->ts == tr->tid && !p->deleted) {
4735 31439 : slot = p->end;
4736 31439 : p->end += cnt;
4737 31439 : seg->start += cnt;
4738 31439 : s->segs->nr_reused += cnt;
4739 31439 : reused = 1;
4740 31439 : break;
4741 : }
4742 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4743 14795 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4744 : ok = LOG_ERR;
4745 : break;
4746 : }
4747 : }
4748 14795 : seg->ts = tr->tid;
4749 14795 : seg->deleted = false;
4750 14795 : slot = seg->start;
4751 14795 : s->segs->nr_reused += cnt;
4752 14795 : reused = 1;
4753 14795 : break;
4754 : }
4755 : }
    /* nothing reusable found: claim at the tail */
4756 1976753 : if (ok == LOG_OK && !reused) {
4757 1930520 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4758 1895460 : slot = s->segs->t->end;
4759 1895460 : s->segs->t->end += cnt;
4760 : } else {
4761 35060 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4762 : ok = LOG_ERR;
4763 : } else {
4764 35060 : if (!s->segs->h)
4765 0 : s->segs->h = s->segs->t;
4766 35060 : slot = s->segs->t->start;
4767 : }
4768 : }
4769 : }
4770 1976753 : if (!locked)
4771 1852092 : unlock_table(tr->store, t->base.id);
4772 :
4773 1976753 : if (ok == LOG_OK) {
4774 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4775 1976754 : if (!in_transaction) {
4776 49093 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4777 49093 : in_transaction = true;
4778 : }
4779 1976754 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4780 1976323 : tr->logchanges += (lng) cnt;
4781 1976754 : *offset = slot;
4782 : }
4783 : return ok;
4784 : }
4785 :
4786 : /*
4787 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4788 : * of the global transaction and mark the newly added storage slots unused on the global
4789 : * level but used on the local transaction level. Besides this the local transaction needs
4790 : * to update (and mark unused) any slot in between the old end and new slots.
4791 : * */
/* Entry point for claiming `cnt` slots in table t: looks up the table's
 * storage with bind_del_data() and passes it to claim_segments() with
 * locked == false, so claim_segments() takes the table lock itself.
 * Returns LOG_ERR when no storage can be bound, otherwise the result of
 * claim_segments(). */
4792 : static int
4793 1882391 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4794 : {
4795 1882391 : storage *s;
4796 :
4797 : /* we have a single segment structure for each persistent table
4798 : * for temporary tables each has its own */
4799 1882391 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4800 : return LOG_ERR;
4801 :
4802 1882505 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4803 : }
4804 :
4805 : /* some tables cannot be updated concurrently (user/roles etc) */
/* Variant of claim_tab() that refuses to claim when the table has
 * conflicting segments.  With the table lock held it first calls
 * segments_conflict(tr, segs, 1); a non-zero result yields LOG_CONFLICT.
 * Otherwise claim_segments() is called with locked == true, still under the
 * same lock, and its result is returned.
 * Returns LOG_ERR when no storage can be bound. */
4806 : static int
4807 124666 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4808 : {
4809 124666 : storage *s;
4810 124666 : int res = 0;
4811 :
4812 : /* we have a single segment structure for each persistent table
4813 : * for temporary tables each has its own */
4814 124666 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4815 : /* TODO check for other inserts ! */
4816 : return LOG_ERR;
4817 :
4818 124666 : lock_table(tr->store, t->base.id);
4819 124666 : if ((res = segments_conflict(tr, s->segs, 1))) {
4820 4 : unlock_table(tr->store, t->base.id);
4821 4 : return LOG_CONFLICT;
4822 : }
4823 124662 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4824 124662 : unlock_table(tr->store, t->base.id);
4825 124662 : return res;
4826 : }
4827 :
/* Check table t for conflicting segments from the point of view of tr.
 * Runs segments_conflict() with the given `uncommitted` flag under the table
 * lock.  Returns LOG_CONFLICT when it reports a conflict, LOG_OK when it
 * does not, and LOG_ERR when no storage can be bound. */
4828 : static int
4829 20103 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4830 : {
4831 20103 : storage *s;
4832 20103 : int res = 0;
4833 :
4834 20103 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4835 : return LOG_ERR;
4836 :
4837 20103 : lock_table(tr->store, t->base.id);
4838 20103 : res = segments_conflict(tr, s->segs, uncommitted);
4839 20103 : unlock_table(tr->store, t->base.id);
4840 20103 : return res ? LOG_CONFLICT : LOG_OK;
4841 : }
4842 :
/* Tell whether the position range [start, end) contains a segment that
 * counts as deleted for transaction tr (SEG_IS_DELETED).
 * Segments ending at or before `start` are skipped; the scan then stops at
 * the first deleted segment or at the first segment starting at or after
 * `end`.  Returns the full size of that first deleted segment (not clipped
 * to the range), or 0 when there is none, so the result is only meaningful
 * as zero / non-zero.  No locking is done here. */
4843 : static size_t
4844 1513024 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4845 : {
4846 1513024 : size_t cnt = 0;
4847 :
4848 1524368 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4849 : ;
4850 :
4851 3700702 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4852 2187667 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4853 84830 : cnt += s->end - s->start;
4854 : }
4855 1513035 : return cnt;
4856 : }
4857 :
/* Build a candidate list for the positions [start, end) of table t as seen
 * by transaction tr.
 *
 * With the table lock held:
 *  - if has_deletes_in_range() finds nothing deleted, a dense candidate
 *    BAT covering [start, end) is returned;
 *  - otherwise a TYPE_msk BAT of end-start bits is filled, one bit per
 *    position, with the bit set for positions in segments that are valid
 *    for tr (SEG_IS_VALID) and clear for the others.
 * The mask is written in 32-bit words: `pos` is the number of bits produced
 * so far, `cur` accumulates the word that is only partly filled, and a word
 * is stored through `dst` once its 32 bits are complete; a trailing partial
 * word is stored after the loop.
 * The mask BAT is turned into the result with BATmaskedcands() and then
 * released.  Returns NULL when the mask BAT cannot be allocated (and
 * whatever BATmaskedcands()/BATdense() return otherwise). */
4858 : static BAT *
4859 1512982 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4860 : {
4861 1512982 : lock_table(tr->store, t->base.id);
4862 1513041 : segment *s = S->segs->h;
4863 : /* step one no deletes -> dense range */
4864 1513041 : uint32_t cur = 0;
4865 1513041 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4866 1513046 : if (!dnr) {
4867 1437068 : unlock_table(tr->store, t->base.id);
4868 1437064 : return BATdense(start, start, end-start);
4869 : }
4870 :
4871 75978 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4872 75978 : if (!b) {
4873 0 : unlock_table(tr->store, t->base.id);
4874 0 : return NULL;
4875 : }
4876 :
4877 75978 : uint32_t *restrict dst = Tloc(b, 0);
4878 56749670 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    /* NOTE(review): has_deletes_in_range() skips with `<=`; with `<` a
     * segment ending exactly at start gets through with a clipped
     * length of 0 - looks harmless, confirm */
4879 56724826 : if (s->end < start)
4880 143 : continue;
4881 56724683 : if (s->start >= end)
4882 : break;
4883 56673549 : msk m = (SEG_IS_VALID(s, tr));
    /* lnr: number of positions of this segment that fall inside
     * [start, end) */
4884 56673549 : size_t lnr = s->end-s->start;
4885 56673549 : if (s->start < start)
4886 3716 : lnr -= (start - s->start);
4887 56673549 : if (s->end > end)
4888 51 : lnr -= s->end - end;
4889 :
4890 56673549 : if (m) {
    /* valid segment: emit lnr one-bits.  Note that this local `end`
     * shadows the function parameter of the same name. */
4891 475045 : size_t used = pos&31, end = 32;
4892 475045 : if (used) {
    /* first top up the partly filled word */
4893 386461 : if (lnr < (32-used))
4894 245645 : end = used + lnr;
4895 386461 : assert(end > used);
4896 386461 : cur |= ((1U << (end - used)) - 1) << used;
4897 386461 : lnr -= end - used;
4898 386461 : pos += end - used;
4899 386461 : if (end == 32) {
4900 140816 : *dst++ = cur;
4901 140816 : cur = 0;
4902 : }
4903 : }
4904 475045 : size_t full = lnr/32;
4905 475045 : size_t rest = lnr%32;
4906 475045 : if (full > 0) {
    /* whole words of ones */
4907 151280 : memset(dst, ~0, full * sizeof(*dst));
4908 151280 : dst += full;
4909 151280 : lnr -= full * 32;
4910 151280 : pos += full * 32;
4911 : }
4912 475045 : if (rest > 0) {
    /* remaining bits start a new partial word */
4913 218147 : cur |= (1U << rest) - 1;
4914 218147 : lnr -= rest;
4915 218147 : pos += rest;
4916 : }
4917 475045 : assert(lnr==0);
4918 : } else {
    /* segment not valid for tr: emit lnr zero-bits, i.e. only advance
     * pos/dst and zero the whole words */
4919 56198504 : size_t used = pos&31, end = 32;
4920 56198504 : if (used) {
4921 54447512 : if (lnr < (32-used))
4922 52647348 : end = used + lnr;
4923 :
4924 54447512 : pos+= (end-used);
4925 54447512 : lnr-= (end-used);
4926 54447512 : if (end == 32) {
4927 1800164 : *dst++ = cur;
4928 1800164 : cur = 0;
4929 : }
4930 : }
4931 56198504 : size_t full = lnr/32;
4932 56198504 : size_t rest = lnr%32;
4933 56198504 : memset(dst, 0, full * sizeof(*dst));
4934 56198504 : dst += full;
4935 56198504 : lnr -= full * 32;
4936 56198504 : pos += full * 32;
4937 56198504 : pos+= rest;
4938 56198504 : lnr-= rest;
4939 56198504 : assert(lnr==0);
4940 : }
4941 : }
4942 :
4943 75978 : unlock_table(tr->store, t->base.id);
    /* flush the last, partly filled word */
4944 75978 : if (pos%32)
4945 74103 : *dst=cur;
4946 75978 : BATsetcount(b, nr);
4947 75978 : bn = BATmaskedcands(start, nr, b, true);
4948 75975 : BBPreclaim(b);
4949 75977 : (void)pos;
4950 75977 : assert (pos == nr);
4951 : return bn;
4952 : }
4953 :
4954 : static void * /* BAT * */
4955 1518285 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4956 : {
4957 : /* with nr_of_parts - part_nr we can adjust parts */
4958 1518285 : storage *s = tab_timestamp_storage(tr, t);
4959 :
4960 1518464 : if (!s)
4961 : return NULL;
4962 1518464 : size_t nr = segs_end(s->segs, tr, t);
4963 :
4964 1519178 : if (!nr)
4965 6186 : return BATdense(0, 0, 0);
4966 :
4967 : /* compute proper part */
4968 1512992 : size_t part_size = nr/nr_of_parts;
4969 1512992 : size_t start = part_size * part_nr;
4970 1512992 : size_t end = start + part_size;
4971 1512992 : if (part_nr == (nr_of_parts-1))
4972 1249800 : end = nr;
4973 1512992 : assert(end <= nr);
4974 1512992 : return segments2cands(s, tr, t, start, end);
4975 : }
4976 :
4977 : static int
4978 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
4979 : {
4980 5 : bool update_conflict = false;
4981 :
4982 5 : if (segments_in_transaction(tr, col->t))
4983 : return LOG_CONFLICT;
4984 :
4985 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
4986 :
4987 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
4988 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
4989 5 : assert(d && d->cs.ts == tr->tid);
4990 5 : if (odelta != d)
4991 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
4992 5 : if (d->cs.bid)
4993 5 : temp_destroy(d->cs.bid);
4994 5 : if (d->cs.uibid)
4995 5 : temp_destroy(d->cs.uibid);
4996 5 : if (d->cs.uvbid)
4997 5 : temp_destroy(d->cs.uvbid);
4998 5 : bat_set_access(bn, BAT_READ);
4999 5 : d->cs.bid = temp_create(bn);
5000 5 : d->cs.uibid = 0;
5001 5 : d->cs.uvbid = 0;
5002 5 : d->cs.ucnt = 0;
5003 5 : d->cs.cleared = true;
5004 5 : d->cs.ts = tr->tid;
5005 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
5006 5 : return LOG_OK;
5007 : }
5008 :
5009 : static int
5010 5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
5011 : {
5012 5 : if (segments_in_transaction(tr, c->t))
5013 : return LOG_CONFLICT;
5014 :
5015 5 : sql_delta *d = NULL;
5016 :
5017 : /* do we have enough to clean */
5018 5 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5019 : return LOG_CONFLICT;
5020 :
5021 : /* do we have enough to clean */
5022 5 : if (!force && (d->nr_updates) < 1024)
5023 : return LOG_OK;
5024 :
5025 5 : BAT *b = NULL, *bn = NULL;;
5026 5 : if ((b = bind_col(tr, c, 0)) == NULL)
5027 : return LOG_ERR;
5028 5 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5029 0 : BBPreclaim(b);
5030 0 : return LOG_ERR;
5031 : }
5032 5 : int res = swap_bats(tr, c, bn);
5033 5 : d->nr_updates = 0;
5034 5 : BBPreclaim(b);
5035 5 : BBPreclaim(bn);
5036 5 : return res;
5037 : }
5038 :
5039 : static int
5040 0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
5041 : {
5042 0 : if (segments_in_transaction(tr, t))
5043 : return LOG_CONFLICT;
5044 :
5045 0 : storage *s;
5046 0 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
5047 : return LOG_ERR;
5048 :
5049 0 : for( node *n = ol_first_node(t->columns); n; n = n->next) {
5050 0 : sql_column *c = n->data;
5051 :
5052 0 : if (!ATOMvarsized(c->type.type->localtype))
5053 0 : continue;
5054 0 : sql_delta *d = NULL;
5055 :
5056 : /* do we have enough to clean */
5057 0 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5058 : return LOG_CONFLICT;
5059 :
5060 : /* do we have enough to clean */
5061 0 : if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
5062 0 : continue;
5063 :
5064 0 : BAT *b = NULL, *bn = NULL;;
5065 0 : if ((b = bind_col(tr, c, 0)) == NULL)
5066 : return LOG_ERR;
5067 0 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5068 0 : BBPreclaim(b);
5069 0 : return LOG_ERR;
5070 : }
5071 0 : int res = swap_bats(tr, c, bn);
5072 0 : d->nr_updates = 0;
5073 0 : BBPreclaim(b);
5074 0 : BBPreclaim(bn);
5075 0 : if (res != LOG_OK)
5076 0 : return res;
5077 : }
5078 0 : s->segs->nr_reused = 0;
5079 0 : return LOG_OK;
5080 : }
5081 :
5082 :
5083 : static int
5084 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5085 : {
5086 58 : bool update_conflict = false;
5087 :
5088 58 : if (segments_in_transaction(tr, col->t))
5089 : return LOG_CONFLICT;
5090 :
5091 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5092 :
5093 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5094 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5095 58 : assert(d && d->cs.ts == tr->tid);
5096 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5097 58 : if (odelta != d)
5098 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5099 :
5100 58 : d->cs.st = st;
5101 58 : d->cs.cleared = true;
5102 58 : if (d->cs.bid)
5103 58 : temp_destroy(d->cs.bid);
5104 58 : o = transfer_to_systrans(o);
5105 58 : if (o == NULL)
5106 : return LOG_ERR;
5107 58 : bat_set_access(o, BAT_READ);
5108 58 : d->cs.bid = temp_create(o);
5109 58 : if (u) {
5110 53 : if (d->cs.ebid)
5111 0 : temp_destroy(d->cs.ebid);
5112 53 : u = transfer_to_systrans(u);
5113 53 : if (u == NULL)
5114 : return LOG_ERR;
5115 53 : d->cs.ebid = temp_create(u);
5116 : }
5117 : return LOG_OK;
5118 : }
5119 :
5120 : void
5121 315 : bat_storage_init( store_functions *sf)
5122 : {
5123 315 : sf->bind_col = &bind_col;
5124 315 : sf->bind_updates = &bind_updates;
5125 315 : sf->bind_updates_idx = &bind_updates_idx;
5126 315 : sf->bind_idx = &bind_idx;
5127 315 : sf->bind_cands = &bind_cands;
5128 :
5129 315 : sf->claim_tab = &claim_tab;
5130 315 : sf->key_claim_tab = &key_claim_tab;
5131 315 : sf->tab_validate = &tab_validate;
5132 :
5133 315 : sf->append_col = &append_col;
5134 315 : sf->append_idx = &append_idx;
5135 :
5136 315 : sf->update_col = &update_col;
5137 315 : sf->update_idx = &update_idx;
5138 :
5139 315 : sf->delete_tab = &delete_tab;
5140 :
5141 315 : sf->count_del = &count_del;
5142 315 : sf->count_col = &count_col;
5143 315 : sf->count_idx = &count_idx;
5144 315 : sf->dcount_col = &dcount_col;
5145 315 : sf->min_max_col = &min_max_col;
5146 315 : sf->set_stats_col = &set_stats_col;
5147 315 : sf->sorted_col = &sorted_col;
5148 315 : sf->unique_col = &unique_col;
5149 315 : sf->double_elim_col = &double_elim_col;
5150 315 : sf->col_stats = &col_stats;
5151 315 : sf->col_set_range = &col_set_range;
5152 315 : sf->col_not_null = &col_not_null;
5153 :
5154 315 : sf->col_dup = &col_dup;
5155 315 : sf->idx_dup = &idx_dup;
5156 315 : sf->del_dup = &del_dup;
5157 :
5158 315 : sf->create_col = &create_col; /* create and add to change list */
5159 315 : sf->create_idx = &create_idx;
5160 315 : sf->create_del = &create_del;
5161 :
5162 315 : sf->destroy_col = &destroy_col; /* free resources */
5163 315 : sf->destroy_idx = &destroy_idx;
5164 315 : sf->destroy_del = &destroy_del;
5165 :
5166 315 : sf->drop_col = &drop_col; /* add drop to change list */
5167 315 : sf->drop_idx = &drop_idx;
5168 315 : sf->drop_del = &drop_del;
5169 :
5170 315 : sf->clear_table = &clear_table;
5171 :
5172 315 : sf->vacuum_col = &vacuum_col;
5173 315 : sf->vacuum_tab = &vacuum_tab;
5174 315 : sf->col_compress = &col_compress;
5175 315 : }
|