Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
42 : static void merge_delta( sql_delta *obat);
43 :
44 : /* valid
45 : * !deleted && VALID_4_READ(TS, tr) existing or newly created segment
46 : * deleted && TS > tr->ts && OLDTS < tr->ts deleted after current transaction
47 : */
48 :
49 : #define VALID_4_READ(TS,tr) \
50 : (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
51 :
52 : /* when changed, check if the old status is still valid */
53 : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
54 : (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
55 :
56 : #define SEG_VALID_4_DELETE(seg,tr) \
57 : (!seg->deleted && VALID_4_READ(seg->ts, tr))
58 :
59 : /* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
60 : #define SEG_IS_DELETED(seg,tr) \
61 : ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
62 : (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
63 :
64 : /* A segment is part of the current transaction in some way, or was deleted by some other transaction but used to be valid */
65 : #define SEG_IS_VALID(seg, tr) \
66 : ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
67 : (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
68 :
69 : static inline BAT *
70 5063 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 5063 : MT_lock_set(&b->theaplock);
74 5063 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 17 : MT_lock_unset(&b->theaplock);
76 17 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
77 17 : BBPreclaim(b);
78 17 : return bn;
79 : }
80 5046 : if (b->theap->farmid == TRANSIENT ||
81 14 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
82 4757 : QryCtx *qc = MT_thread_get_qry_ctx();
83 4757 : if (qc) {
84 2463 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
85 2463 : ATOMIC_SUB(&qc->datasize, b->theap->size);
86 2463 : b->theap->farmid = SYSTRANS;
87 2463 : b->batRole = SYSTRANS;
88 : }
89 2463 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
90 1067 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
91 1067 : b->tvheap->farmid = SYSTRANS;
92 : }
93 : }
94 : }
95 5046 : MT_lock_unset(&b->theaplock);
96 5046 : return b;
97 : }
98 :
99 : static void
100 24836931 : lock_table(sqlstore *store, sqlid id)
101 : {
102 24836931 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
103 24839332 : }
104 :
105 : static void
106 24839193 : unlock_table(sqlstore *store, sqlid id)
107 : {
108 24839193 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
109 24840307 : }
110 :
111 : static void
112 19851025 : lock_column(sqlstore *store, sqlid id)
113 : {
114 19851025 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
115 19850766 : }
116 :
117 : static void
118 19850818 : unlock_column(sqlstore *store, sqlid id)
119 : {
120 19850818 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
121 19850530 : }
122 :
123 : static void
124 113802 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
125 : {
126 113802 : assert(cleanup);
127 113802 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
128 113802 : }
129 :
130 : static void
131 132515 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
132 : {
133 132515 : assert(cleanup);
134 132515 : dup_base(&t->base);
135 132515 : trans_add(tr, b, data, cleanup, commit, log);
136 132507 : }
137 :
138 : static int
139 80689 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
140 : {
141 80689 : segment *s = change->data;
142 :
143 80689 : if (s->ts <= oldest) {
144 33242 : while(s) {
145 21201 : segment *n = s->prev;
146 21201 : ATOMIC_PTR_DESTROY(&s->next);
147 21201 : _DELETE(s);
148 21201 : s = n;
149 : }
150 12041 : sqlstore *store = Store;
151 12041 : table_destroy(store, (sql_table*)change->obj);
152 12041 : return 1;
153 : }
154 : return LOG_OK;
155 : }
156 :
157 : static void
158 21201 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
159 : {
160 : /* we can only be accessed by anything older then commit_ts */
161 21201 : if (c->cleanup == &tc_gc_seg)
162 9160 : s->prev = c->data;
163 : else
164 12041 : c->cleanup = &tc_gc_seg;
165 21201 : c->data = s;
166 21201 : s->ts = commit_ts;
167 16792 : }
168 :
169 : static segment *
170 87344 : new_segment(segment *o, sql_trans *tr, size_t cnt)
171 : {
172 87344 : segment *n = (segment*)GDKmalloc(sizeof(segment));
173 :
174 87344 : assert(tr);
175 87344 : if (n) {
176 87344 : *n = (segment) {
177 87344 : .ts = tr->tid,
178 : .oldts = 0,
179 : .deleted = false,
180 : .start = 0,
181 : .end = cnt,
182 : .next = ATOMIC_PTR_VAR_INIT(NULL),
183 : .prev = NULL,
184 : };
185 87344 : if (o) {
186 36237 : n->start += o->end;
187 36237 : n->end += o->end;
188 36237 : ATOMIC_PTR_SET(&o->next, n);
189 : }
190 : }
191 87344 : return n;
192 : }
193 :
194 : static segment *
195 86018 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
196 : {
197 86018 : assert(tr);
198 86018 : if (o->start == start && o->end == start+cnt) {
199 10059 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
200 10059 : o->oldts = o->ts;
201 10059 : o->ts = tr->tid;
202 10059 : o->deleted = deleted;
203 10059 : return o;
204 : }
205 75959 : segment *n = (segment*)GDKmalloc(sizeof(segment));
206 :
207 75959 : if (!n)
208 : return NULL;
209 75959 : n->prev = NULL;
210 :
211 75959 : if (o->ts == tr->tid) {
212 3820 : n->oldts = 0;
213 3820 : n->ts = 1;
214 3820 : n->deleted = true;
215 : } else {
216 72139 : n->oldts = o->ts;
217 72139 : n->ts = tr->tid;
218 72139 : n->deleted = deleted;
219 : }
220 75959 : if (start == o->start) {
221 : /* 2-way split: o remains latter part of segment, new one is
222 : * inserted before */
223 60729 : n->start = o->start;
224 60729 : n->end = n->start + cnt;
225 60729 : ATOMIC_PTR_INIT(&n->next, o);
226 60729 : if (segs->h == o)
227 439 : segs->h = n;
228 60729 : if (p)
229 60290 : ATOMIC_PTR_SET(&p->next, n);
230 60729 : o->start = n->end;
231 15230 : } else if (start+cnt == o->end) {
232 : /* 2-way split: o remains first part of segment, new one is
233 : * added after */
234 5579 : n->start = o->end - cnt;
235 5579 : n->end = o->end;
236 5579 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
237 5579 : ATOMIC_PTR_SET(&o->next, n);
238 5579 : if (segs->t == o)
239 822 : segs->t = n;
240 5579 : o->end = n->start;
241 : } else {
242 : /* 3-way split: o remains first part of segment, two new ones
243 : * are added after */
244 9651 : segment *n2 = GDKmalloc(sizeof(segment));
245 9651 : if (n2 == NULL) {
246 0 : GDKfree(n);
247 0 : return NULL;
248 : }
249 9651 : ATOMIC_PTR_INIT(&n->next, n2);
250 9651 : n->start = start;
251 9651 : n->end = start + cnt;
252 9651 : *n2 = *o;
253 9651 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
254 9651 : n2->start = n->end;
255 9651 : n2->prev = NULL;
256 9651 : if (segs->t == o)
257 4035 : segs->t = n2;
258 9651 : ATOMIC_PTR_SET(&o->next, n);
259 9651 : o->end = start;
260 : }
261 : return n;
262 : }
263 :
264 : static void
265 4148 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
266 : {
267 4148 : segment *cur = segs->h, *seg = NULL;
268 17949 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
269 13801 : if (cur->ts == tr->tid) { /* revert */
270 4645 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
271 4645 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
272 4645 : cur->oldts = 0;
273 : }
274 13801 : if (cur->ts <= oldest) { /* possibly merge range */
275 12949 : if (!seg) { /* skip first */
276 : seg = cur;
277 8801 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
278 : /* merge with previous */
279 4409 : seg->end = cur->end;
280 4409 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
281 4409 : if (cur == segs->t)
282 2857 : segs->t = seg;
283 4409 : mark4destroy(cur, change, store_get_timestamp(tr->store));
284 4409 : cur = seg;
285 : } else {
286 : seg = cur; /* begin of new merge */
287 : }
288 : }
289 : }
290 4148 : }
291 :
292 : static size_t
293 103547 : segs_end_include_deleted( segments *segs, sql_trans *tr)
294 : {
295 103547 : size_t cnt = 0;
296 103547 : segment *s = segs->h, *l = NULL;
297 :
298 509551 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
299 406004 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
300 : l = s;
301 : }
302 103547 : if (l)
303 103540 : cnt = l->end;
304 103547 : return cnt;
305 : }
306 :
307 : static int
308 103547 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
309 : {
310 : /* set bits correctly */
311 103547 : BAT *b = temp_descriptor(cs->bid);
312 :
313 103547 : if (!b)
314 : return LOG_ERR;
315 103547 : segment *s = segs->h;
316 :
317 103547 : size_t nr = segs_end_include_deleted(segs, tr);
318 103547 : size_t rounded_nr = ((nr+31)&~31);
319 103547 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
320 0 : bat_destroy(b);
321 0 : return LOG_ERR;
322 : }
323 :
324 : /* disable all properties here */
325 103547 : MT_lock_set(&b->theaplock);
326 103547 : b->tsorted = false;
327 103547 : b->trevsorted = false;
328 103547 : b->tnosorted = 0;
329 103547 : b->tnorevsorted = 0;
330 103547 : b->tseqbase = oid_nil;
331 103547 : b->tkey = false;
332 103547 : b->tnokey[0] = 0;
333 103547 : b->tnokey[1] = 0;
334 103547 : b->theap->dirty = true;
335 103547 : BUN cnt = BATcount(b);
336 :
337 103547 : uint32_t *restrict dst;
338 436650 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
339 378477 : if (s->start >= nr)
340 : break;
341 333103 : if (s->ts == tr->tid && s->end != s->start) {
342 143807 : if (cnt < s->start) { /* first mark as deleted ! */
343 3900 : size_t lnr = s->start-cnt;
344 3900 : size_t pos = cnt;
345 3900 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
346 3900 : uint32_t cur = 0;
347 :
348 3900 : size_t used = pos&31, end = 32;
349 3900 : if (used) {
350 3775 : if (lnr < (32-used))
351 3577 : end = used + lnr;
352 3775 : assert(end > used);
353 3775 : cur |= ((1U << (end - used)) - 1) << used;
354 3775 : lnr -= end - used;
355 3775 : *dst++ |= cur;
356 3775 : cur = 0;
357 : }
358 3900 : size_t full = lnr/32;
359 3900 : size_t rest = lnr%32;
360 3900 : if (full > 0) {
361 7 : memset(dst, ~0, full * sizeof(*dst));
362 7 : dst += full;
363 7 : lnr -= full * 32;
364 : }
365 3900 : if (rest > 0) {
366 229 : cur |= (1U << rest) - 1;
367 229 : lnr -= rest;
368 229 : *dst |= cur;
369 : }
370 3900 : assert(lnr==0);
371 : }
372 143807 : size_t lnr = s->end-s->start;
373 143807 : size_t pos = s->start;
374 143807 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
375 143807 : uint32_t cur = 0;
376 143807 : size_t used = pos&31, end = 32;
377 143807 : if (used) {
378 106076 : if (lnr < (32-used))
379 100917 : end = used + lnr;
380 106076 : assert(end > used);
381 106076 : cur |= ((1U << (end - used)) - 1) << used;
382 106076 : lnr -= end - used;
383 106076 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
384 106076 : dst++;
385 106076 : cur = 0;
386 : }
387 143807 : size_t full = lnr/32;
388 143807 : size_t rest = lnr%32;
389 143807 : if (full > 0) {
390 3522 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
391 3522 : dst += full;
392 3522 : lnr -= full * 32;
393 : }
394 143807 : if (rest > 0) {
395 40139 : cur |= (1U << rest) - 1;
396 40139 : lnr -= rest;
397 40139 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
398 : }
399 143807 : assert(lnr==0);
400 143807 : if (cnt < s->end)
401 333103 : cnt = s->end;
402 : }
403 : }
404 103547 : if (nr > BATcount(b)) {
405 60914 : BATsetcount(b, nr);
406 : }
407 103547 : MT_lock_unset(&b->theaplock);
408 :
409 103547 : bat_destroy(b);
410 103547 : return LOG_OK;
411 : }
412 :
413 : /* TODO return LOG_OK/ERR */
414 : static void
415 103573 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
416 : {
417 103573 : sqlstore* store = tr->store;
418 103573 : segment *cur = s->segs->h, *seg = NULL;
419 509649 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
420 406076 : if (cur->ts == tr->tid) {
421 157965 : if (!cur->deleted)
422 91450 : cur->oldts = 0;
423 157965 : cur->ts = commit_ts;
424 : }
425 406076 : if (!seg) {
426 : /* first segment */
427 : seg = cur;
428 : }
429 302503 : else if (seg->ts < TRANSACTION_ID_BASE) {
430 : /* possible merge since both deleted flags are equal */
431 271869 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
432 203549 : int merge = 1;
433 203549 : node *n = store->active->h;
434 665059 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
435 568543 : sql_trans* other = ((sql_trans*)n->data);
436 568543 : ulng active = other->ts;
437 568543 : if(other->active == 2)
438 22893 : continue; /* pretend that another recently committed transaction is no longer active */
439 545650 : if (active == tr->ts)
440 134513 : continue; /* pretend that committing transaction has already committed and is no longer active */
441 411137 : if (seg->ts < active && cur->ts < active)
442 : break;
443 398133 : if (seg->ts > active && cur->ts > active)
444 304104 : continue;
445 :
446 94029 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
447 : /* cannot safely merge since there is an active transaction between the segments */
448 : merge = false;
449 : break;
450 : }
451 : /* merge segments */
452 219040 : if (merge) {
453 109520 : seg->end = cur->end;
454 109520 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
455 109520 : if (cur == s->segs->t)
456 26762 : s->segs->t = seg;
457 109520 : if (commit_ts == oldest) {
458 92728 : ATOMIC_PTR_DESTROY(&cur->next);
459 92728 : _DELETE(cur);
460 : } else
461 33584 : mark4destroy(cur, change, commit_ts);
462 109520 : cur = seg;
463 109520 : continue;
464 : }
465 : }
466 : }
467 : seg = cur;
468 : }
469 103573 : }
470 :
471 : static int
472 2151368 : segments_in_transaction(sql_trans *tr, sql_table *t)
473 : {
474 2151368 : storage *s = ATOMIC_PTR_GET(&t->data);
475 2151368 : segment *seg = s->segs->h;
476 :
477 2151368 : if (seg && s->segs->t->ts == tr->tid)
478 : return 1;
479 606417 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
480 501610 : if (seg->ts == tr->tid)
481 : return 1;
482 : }
483 : return 0;
484 : }
485 :
486 : static size_t
487 17070664 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
488 : {
489 17070664 : size_t cnt = 0;
490 :
491 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
492 : * keep all of the parts aligned */
493 17070664 : lock_table(tr->store, table->base.id);
494 17073007 : segment *s = segs->h, *l = NULL;
495 :
496 17073007 : if (segs->t && SEG_IS_VALID(segs->t, tr))
497 14777268 : l = s = segs->t;
498 :
499 210246439 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
500 193173610 : if (SEG_IS_VALID(s, tr))
501 : l = s;
502 : }
503 17072829 : if (l)
504 17054645 : cnt = l->end;
505 17072829 : unlock_table(tr->store, table->base.id);
506 17073402 : return cnt;
507 : }
508 :
509 : static segments *
510 51107 : new_segments(sql_trans *tr, size_t cnt)
511 : {
512 51107 : segments *n = (segments*)GDKmalloc(sizeof(segments));
513 :
514 51107 : if (n) {
515 51107 : n->h = n->t = new_segment(NULL, tr, cnt);
516 51107 : if (!n->h) {
517 0 : GDKfree(n);
518 0 : return NULL;
519 : }
520 51107 : sql_ref_init(&n->r);
521 : }
522 : return n;
523 : }
524 :
525 : static sql_delta *
526 29880685 : timestamp_delta( sql_trans *tr, sql_delta *d)
527 : {
528 29928618 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
529 47933 : d = d->next;
530 29880852 : return d;
531 : }
532 :
533 : static sql_delta *
534 29725205 : col_timestamp_delta( sql_trans *tr, sql_column *c)
535 : {
536 29725205 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
537 : }
538 :
539 : static sql_delta *
540 23602 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
541 : {
542 23602 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
543 : }
544 :
545 : static storage *
546 18111712 : timestamp_storage( sql_trans *tr, storage *d)
547 : {
548 18111712 : if (!d)
549 : return NULL;
550 18175638 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
551 63926 : d = d->next;
552 : return d;
553 : }
554 :
555 : static storage *
556 18086677 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
557 : {
558 18086677 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
559 : }
560 :
561 : static sql_delta*
562 19306 : delta_dup(sql_delta *d)
563 : {
564 19306 : ATOMIC_INC(&d->cs.refcnt);
565 19306 : return d;
566 : }
567 :
568 : static void *
569 17940 : col_dup(sql_column *c)
570 : {
571 17940 : return delta_dup(ATOMIC_PTR_GET(&c->data));
572 : }
573 :
574 : static void *
575 2653 : idx_dup(sql_idx *i)
576 : {
577 2653 : if (!ATOMIC_PTR_GET(&i->data))
578 : return NULL;
579 1366 : return delta_dup(ATOMIC_PTR_GET(&i->data));
580 : }
581 :
582 : static storage*
583 1539 : storage_dup(storage *d)
584 : {
585 1539 : ATOMIC_INC(&d->cs.refcnt);
586 1539 : return d;
587 : }
588 :
589 : static void *
590 1539 : del_dup(sql_table *t)
591 : {
592 1539 : return storage_dup(ATOMIC_PTR_GET(&t->data));
593 : }
594 :
595 : static size_t
596 17 : count_inserts( segment *s, sql_trans *tr)
597 : {
598 17 : size_t cnt = 0;
599 :
600 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
601 55 : if (!s->deleted && s->ts == tr->tid)
602 4 : cnt += s->end - s->start;
603 : }
604 17 : return cnt;
605 : }
606 :
607 : static size_t
608 840298 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
609 : {
610 840298 : size_t cnt = 0;
611 :
612 987071 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
613 : ;
614 :
615 4371072 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
616 3530775 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
617 1080958 : cnt += s->end - s->start;
618 : }
619 840297 : return cnt;
620 : }
621 :
622 : static size_t
623 17 : count_deletes( segment *s, sql_trans *tr)
624 : {
625 17 : size_t cnt = 0;
626 :
627 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
628 55 : if (SEG_IS_DELETED(s, tr))
629 17 : cnt += s->end - s->start;
630 : }
631 17 : return cnt;
632 : }
633 :
634 : #define CNT_ACTIVE 10
635 :
636 : static size_t
637 16588044 : count_col(sql_trans *tr, sql_column *c, int access)
638 : {
639 16588044 : storage *d;
640 16588044 : sql_delta *ds;
641 :
642 16588044 : if (!isTable(c->t))
643 : return 0;
644 16588044 : d = tab_timestamp_storage(tr, c->t);
645 16586095 : ds = col_timestamp_delta(tr, c);
646 16586648 : if (!d ||!ds)
647 : return 0;
648 16586648 : if (access == 2)
649 480344 : return ds?ds->cs.ucnt:0;
650 16106304 : if (access == 1)
651 17 : return count_inserts(d->segs->h, tr);
652 16106287 : if (access == QUICK)
653 544068 : return d->segs->t?d->segs->t->end:0;
654 15562219 : if (access == CNT_ACTIVE) {
655 840305 : size_t cnt = segs_end(d->segs, tr, c->t);
656 840318 : lock_table(tr->store, c->t->base.id);
657 840299 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
658 840296 : unlock_table(tr->store, c->t->base.id);
659 840296 : return cnt;
660 : }
661 14721914 : return segs_end(d->segs, tr, c->t);
662 : }
663 :
664 : static size_t
665 20899 : count_idx(sql_trans *tr, sql_idx *i, int access)
666 : {
667 20899 : storage *d;
668 20899 : sql_delta *ds;
669 :
670 20899 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
671 6719 : return 0;
672 14180 : d = tab_timestamp_storage(tr, i->t);
673 14178 : ds = idx_timestamp_delta(tr, i);
674 14178 : if (!d || !ds)
675 : return 0;
676 14178 : if (access == 2)
677 2871 : return ds?ds->cs.ucnt:0;
678 11307 : if (access == 1)
679 0 : return count_inserts(d->segs->h, tr);
680 11307 : if (access == QUICK)
681 3550 : return d->segs->t?d->segs->t->end:0;
682 7757 : return segs_end(d->segs, tr, i->t);
683 : }
684 :
685 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
686 : static BAT *
687 12960326 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
688 : {
689 12960326 : BAT *b;
690 :
691 12960326 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
692 : /* returns the updates for cs */
693 12960326 : if (cs->uibid && cs->uvbid && cs->ucnt) {
694 12070 : if (access == RD_UPD_ID) {
695 7277 : if (!(b = temp_descriptor(cs->uibid)))
696 : return NULL;
697 7277 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
698 1657 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
699 5620 : oid nil = oid_nil;
700 : /* less then cnt */
701 5620 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
702 5620 : if (!s) {
703 0 : bat_destroy(b);
704 0 : return NULL;
705 : }
706 :
707 5620 : BAT *nb = BATproject(s, b);
708 5620 : bat_destroy(s);
709 5620 : bat_destroy(b);
710 5620 : b = nb;
711 : }
712 : } else {
713 4793 : b = temp_descriptor(cs->uvbid);
714 : }
715 : } else {
716 21580496 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
717 : }
718 : return b;
719 : }
720 :
721 : static BAT *
722 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
723 : {
724 0 : int err = 0;
725 0 : BAT *uv = *UV;
726 0 : BUN cnt = BATcount(ui)+BATcount(oi);
727 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
728 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
729 :
730 0 : if (!ni || (uv && !nv)) {
731 0 : bat_destroy(ni);
732 0 : bat_destroy(nv);
733 0 : bat_destroy(ui);
734 0 : bat_destroy(uv);
735 0 : bat_destroy(oi);
736 0 : bat_destroy(ov);
737 0 : return NULL;
738 : }
739 0 : BATiter uvi;
740 0 : BATiter ovi;
741 :
742 0 : if (uv) {
743 0 : uvi = bat_iterator(uv);
744 0 : ovi = bat_iterator(ov);
745 : }
746 :
747 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
748 0 : BUN uip = 0, uie = BATcount(ui);
749 0 : BUN oip = 0, oie = BATcount(oi);
750 :
751 0 : oid uiseqb = ui->tseqbase;
752 0 : oid oiseqb = oi->tseqbase;
753 0 : oid *uipt = NULL, *oipt = NULL;
754 0 : BATiter uii = bat_iterator(ui);
755 0 : BATiter oii = bat_iterator(oi);
756 0 : if (!BATtdensebi(&uii))
757 0 : uipt = uii.base;
758 0 : if (!BATtdensebi(&oii))
759 0 : oipt = oii.base;
760 0 : while (uip < uie && oip < oie && !err) {
761 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
762 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
763 :
764 0 : if (uiid <= oiid) {
765 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
766 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
767 : err = 1;
768 0 : uip++;
769 0 : if (uiid == oiid)
770 0 : oip++;
771 : } else { /* uiid > oiid */
772 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
773 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
774 : err = 1;
775 0 : oip++;
776 : }
777 : }
778 0 : while (uip < uie && !err) {
779 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
780 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
781 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
782 : err = 1;
783 0 : uip++;
784 : }
785 0 : while (oip < oie && !err) {
786 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
787 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
788 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
789 : err = 1;
790 0 : oip++;
791 : }
792 0 : if (uv) {
793 0 : bat_iterator_end(&uvi);
794 0 : bat_iterator_end(&ovi);
795 : }
796 0 : bat_iterator_end(&uii);
797 0 : bat_iterator_end(&oii);
798 0 : bat_destroy(ui);
799 0 : bat_destroy(uv);
800 0 : bat_destroy(oi);
801 0 : bat_destroy(ov);
802 0 : if (!err) {
803 0 : if (nv)
804 0 : *UV = nv;
805 0 : return ni;
806 : }
807 0 : *UV = NULL;
808 0 : bat_destroy(ni);
809 0 : bat_destroy(nv);
810 0 : return NULL;
811 : }
812 :
813 : static sql_delta *
814 8639584 : older_delta( sql_delta *d, sql_trans *tr)
815 : {
816 8639584 : sql_delta *o = d->next;
817 :
818 8646983 : while (o && !o->cs.merged) {
819 7395 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
820 : break;
821 : else
822 7399 : o = o->next;
823 : }
824 8639588 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
825 0 : return o;
826 : return NULL;
827 : }
828 :
829 : static BAT *
830 8639691 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
831 : {
832 8639691 : assert(tr->active);
833 8639691 : sql_delta *o = NULL;
834 8639691 : BAT *ui = NULL, *uv = NULL;
835 :
836 8639691 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
837 : return NULL;
838 8639510 : if (access == RD_UPD_VAL) {
839 4321092 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
840 0 : bat_destroy(ui);
841 0 : return NULL;
842 : }
843 : }
844 8639584 : while ((o = older_delta(d, tr)) != NULL) {
845 0 : BAT *oui = NULL, *ouv = NULL;
846 0 : if (!oui)
847 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
848 0 : if (access == RD_UPD_VAL)
849 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
850 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
851 0 : bat_destroy(ui);
852 0 : bat_destroy(uv);
853 0 : bat_destroy(oui);
854 0 : bat_destroy(ouv);
855 0 : return NULL;
856 : }
857 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
858 : return NULL;
859 : d = o;
860 : }
861 8639552 : if (uv) {
862 4321158 : bat_destroy(ui);
863 4321158 : return uv;
864 : }
865 : return ui;
866 : }
867 :
868 : static BAT *
869 2312 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
870 : {
871 2312 : lock_column(tr->store, c->base.id);
872 2312 : sql_delta *d = col_timestamp_delta(tr, c);
873 2312 : int type = c->type.type->localtype;
874 :
875 2312 : if (!d) {
876 0 : unlock_column(tr->store, c->base.id);
877 0 : return NULL;
878 : }
879 2312 : if (d->cs.st == ST_DICT) {
880 0 : BAT *b = quick_descriptor(d->cs.bid);
881 :
882 0 : type = b->ttype;
883 : }
884 2312 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
885 2312 : unlock_column(tr->store, c->base.id);
886 2312 : return bn;
887 : }
888 :
889 : static BAT *
890 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
891 : {
892 0 : lock_column(tr->store, i->base.id);
893 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
894 0 : sql_delta *d = idx_timestamp_delta(tr, i);
895 :
896 0 : if (!d) {
897 0 : unlock_column(tr->store, i->base.id);
898 0 : return NULL;
899 : }
900 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
901 0 : unlock_column(tr->store, i->base.id);
902 0 : return bn;
903 : }
904 :
905 : static BAT *
906 8819854 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
907 : {
908 8819854 : BAT *b;
909 :
910 8819854 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
911 8819854 : assert(cs != NULL);
912 8819854 : if (access == QUICK)
913 175820 : return quick_descriptor(cs->bid);
914 8644034 : if (access == RD_EXT)
915 857 : return temp_descriptor(cs->ebid);
916 8643177 : assert(cs->bid);
917 8643177 : b = temp_descriptor(cs->bid);
918 8643041 : if (b == NULL)
919 : return NULL;
920 8643041 : assert(b->batRestricted == BAT_READ);
921 : /* return slice */
922 8643041 : BAT *s = BATslice(b, 0, cnt);
923 8643194 : bat_destroy(b);
924 8643194 : return s;
925 : }
926 :
927 : static int
928 4318588 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
929 : {
930 4318588 : lock_column(tr->store, c->base.id);
931 4318528 : size_t cnt = count_col(tr, c, 0);
932 4318711 : sql_delta *d = col_timestamp_delta(tr, c);
933 4318682 : int type = c->type.type->localtype;
934 :
935 4318682 : if (!d) {
936 0 : unlock_column(tr->store, c->base.id);
937 0 : return LOG_ERR;
938 : }
939 4318682 : if (d->cs.st == ST_DICT) {
940 2 : BAT *b = quick_descriptor(d->cs.bid);
941 :
942 2 : type = b->ttype;
943 : }
944 :
945 4318682 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
946 4318831 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
947 :
948 4318832 : unlock_column(tr->store, c->base.id);
949 :
950 4318800 : if (*ui == NULL || *uv == NULL) {
951 0 : bat_destroy(*ui);
952 0 : bat_destroy(*uv);
953 0 : return LOG_ERR;
954 : }
955 : return LOG_OK;
956 : }
957 :
958 : static int
959 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
960 : {
961 23 : lock_column(tr->store, i->base.id);
962 23 : size_t cnt = count_idx(tr, i, 0);
963 23 : sql_delta *d = idx_timestamp_delta(tr, i);
964 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
965 :
966 23 : if (!d) {
967 0 : unlock_column(tr->store, i->base.id);
968 0 : return LOG_ERR;
969 : }
970 :
971 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
972 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
973 :
974 23 : unlock_column(tr->store, i->base.id);
975 :
976 23 : if (*ui == NULL || *uv == NULL) {
977 0 : bat_destroy(*ui);
978 0 : bat_destroy(*uv);
979 0 : return LOG_ERR;
980 : }
981 : return LOG_OK;
982 : }
983 :
984 : static void * /* BAT * */
985 8812732 : bind_col(sql_trans *tr, sql_column *c, int access)
986 : {
987 8812732 : assert(access == QUICK || tr->active);
988 8812732 : if (!isTable(c->t))
989 : return NULL;
990 8812732 : sql_delta *d = col_timestamp_delta(tr, c);
991 8812563 : if (!d)
992 : return NULL;
993 8812563 : size_t cnt = count_col(tr, c, 0);
994 8812342 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
995 2312 : return bind_ucol(tr, c, access, cnt);
996 8810030 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
997 8810080 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
998 : return b;
999 : }
1000 :
1001 : static void * /* BAT * */
1002 9407 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1003 : {
1004 9407 : assert(access == QUICK || tr->active);
1005 9407 : if (!isTable(i->t))
1006 : return NULL;
1007 9407 : sql_delta *d = idx_timestamp_delta(tr, i);
1008 9408 : if (!d)
1009 : return NULL;
1010 9408 : size_t cnt = count_idx(tr, i, 0);
1011 9409 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1012 0 : return bind_uidx(tr, i, access, cnt);
1013 9409 : return cs_bind_bat( &d->cs, access, cnt);
1014 : }
1015 :
1016 : static int
1017 2296 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1018 : {
1019 2296 : if (!cs->uibid) {
1020 0 : cs->uibid = e_bat(TYPE_oid);
1021 0 : if (cs->uibid == BID_NIL)
1022 : return LOG_ERR;
1023 : }
1024 2296 : if (!cs->uvbid) {
1025 0 : BAT *cur = quick_descriptor(cs->bid);
1026 0 : if (!cur)
1027 : return LOG_ERR;
1028 0 : int type = cur->ttype;
1029 0 : cs->uvbid = e_bat(type);
1030 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1031 : return LOG_ERR;
1032 : }
1033 2296 : BAT *ui = temp_descriptor(cs->uibid);
1034 2296 : BAT *uv = temp_descriptor(cs->uvbid);
1035 :
1036 2296 : if (ui == NULL || uv == NULL) {
1037 0 : bat_destroy(ui);
1038 0 : bat_destroy(uv);
1039 0 : return LOG_ERR;
1040 : }
1041 2296 : assert(ui && uv);
1042 2296 : if (isEbat(ui)){
1043 392 : temp_destroy(cs->uibid);
1044 392 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1045 392 : bat_destroy(ui);
1046 392 : if (cs->uibid == BID_NIL ||
1047 392 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1048 0 : bat_destroy(uv);
1049 0 : return LOG_ERR;
1050 : }
1051 : }
1052 2296 : if (isEbat(uv)){
1053 392 : temp_destroy(cs->uvbid);
1054 392 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1055 392 : bat_destroy(uv);
1056 392 : if (cs->uvbid == BID_NIL ||
1057 392 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1058 0 : bat_destroy(ui);
1059 0 : return LOG_ERR;
1060 : }
1061 : }
1062 2296 : *Ui = ui;
1063 2296 : *Uv = uv;
1064 2296 : return LOG_OK;
1065 : }
1066 :
1067 : static int
1068 5116 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1069 : {
1070 44691 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1071 44691 : if (s->start <= rid && s->end > rid) {
1072 5116 : if (s->ts == tr->tid && !s->deleted) {
1073 2874 : return 1;
1074 : }
1075 : break;
1076 : }
1077 : }
1078 : return 0;
1079 : }
1080 :
1081 : static int
1082 2242 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1083 : {
1084 38593 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1085 38593 : if (s->start <= rid && s->end > rid) {
1086 2242 : if (s->ts >= tr->ts && s->deleted) {
1087 0 : return 1;
1088 : }
1089 : break;
1090 : }
1091 : }
1092 : return 0;
1093 : }
1094 :
1095 : static sql_delta *
1096 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1097 : {
1098 0 : sql_delta *n = ZNEW(sql_delta);
1099 0 : if (!n)
1100 : return NULL;
1101 0 : *n = *bat;
1102 0 : n->next = NULL;
1103 0 : n->cs.ts = tr->tid;
1104 0 : return n;
1105 : }
1106 :
1107 : static BAT *
1108 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1109 : {
1110 17 : BAT *newoffsets = NULL;
1111 17 : sql_delta *bat = *batp;
1112 17 : column_storage *cs = &bat->cs;
1113 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1114 :
1115 17 : if (!u)
1116 : return NULL;
1117 17 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1118 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1119 0 : bat_destroy(u);
1120 0 : return NULL;
1121 : } else {
1122 17 : int new = 0;
1123 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1124 17 : if (BATcount(u) >= max_cnt) {
1125 1 : if (max_cnt == 64*1024) { /* decompress */
1126 0 : if (!(b = temp_descriptor(cs->bid))) {
1127 0 : bat_destroy(u);
1128 0 : return NULL;
1129 : }
1130 0 : if (cs->ucnt) {
1131 0 : BAT *ui = NULL, *uv = NULL;
1132 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1133 0 : bat_destroy(b);
1134 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1135 0 : bat_destroy(nb);
1136 0 : bat_destroy(u);
1137 0 : return NULL;
1138 : }
1139 0 : b = nb;
1140 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1141 0 : bat_destroy(ui);
1142 0 : bat_destroy(uv);
1143 0 : bat_destroy(b);
1144 0 : bat_destroy(u);
1145 : }
1146 0 : bat_destroy(ui);
1147 0 : bat_destroy(uv);
1148 : }
1149 0 : n = DICTdecompress_(b, u, PERSISTENT);
1150 0 : bat_destroy(b);
1151 0 : assert(newoffsets == NULL);
1152 0 : if (!n) {
1153 0 : bat_destroy(u);
1154 0 : return NULL;
1155 : }
1156 0 : if (cs->ts != tr->tid) {
1157 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1158 0 : bat_destroy(n);
1159 0 : return NULL;
1160 : }
1161 0 : cs = &(*batp)->cs;
1162 0 : new = 1;
1163 : }
1164 0 : if (cs->bid && !new)
1165 0 : temp_destroy(cs->bid);
1166 0 : n = transfer_to_systrans(n);
1167 0 : if (n == NULL)
1168 : return NULL;
1169 0 : bat_set_access(n, BAT_READ);
1170 0 : cs->bid = temp_create(n);
1171 0 : bat_destroy(n);
1172 0 : if (cs->ebid && !new)
1173 0 : temp_destroy(cs->ebid);
1174 0 : cs->ebid = 0;
1175 0 : cs->ucnt = 0;
1176 0 : if (cs->uibid && !new)
1177 0 : temp_destroy(cs->uibid);
1178 0 : if (cs->uvbid && !new)
1179 0 : temp_destroy(cs->uvbid);
1180 0 : cs->uibid = cs->uvbid = 0;
1181 0 : cs->st = ST_DEFAULT;
1182 0 : cs->cleared = true;
1183 : } else {
1184 1 : if (!(b = temp_descriptor(cs->bid))) {
1185 0 : bat_destroy(newoffsets);
1186 0 : bat_destroy(u);
1187 0 : return NULL;
1188 : }
1189 1 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
1190 1 : bat_destroy(b);
1191 1 : if (!n) {
1192 0 : bat_destroy(newoffsets);
1193 0 : bat_destroy(u);
1194 0 : return NULL;
1195 : }
1196 1 : if (cs->ts != tr->tid) {
1197 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1198 0 : bat_destroy(n);
1199 0 : return NULL;
1200 : }
1201 0 : cs = &(*batp)->cs;
1202 0 : new = 1;
1203 0 : temp_dup(cs->ebid);
1204 0 : if (cs->uibid) {
1205 0 : temp_dup(cs->uibid);
1206 0 : temp_dup(cs->uvbid);
1207 : }
1208 : }
1209 1 : if (cs->bid && !new)
1210 1 : temp_destroy(cs->bid);
1211 1 : n = transfer_to_systrans(n);
1212 1 : if (n == NULL)
1213 : return NULL;
1214 1 : bat_set_access(n, BAT_READ);
1215 1 : cs->bid = temp_create(n);
1216 1 : bat_destroy(n);
1217 1 : cs->cleared = true;
1218 1 : i = newoffsets;
1219 : }
1220 : } else { /* append */
1221 16 : i = newoffsets;
1222 : }
1223 : }
1224 17 : bat_destroy(u);
1225 17 : return i;
1226 : }
1227 :
1228 : static BAT *
1229 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1230 : {
1231 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1232 0 : BAT *newoffsets = NULL;
1233 0 : BAT *b = NULL, *n = NULL;
1234 :
1235 0 : if (!(b = temp_descriptor(cs->bid)))
1236 : return NULL;
1237 :
1238 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1239 0 : bat_destroy(b);
1240 0 : return NULL;
1241 : } else {
1242 : /* returns new offset bat if values within min/max, else decompress */
1243 0 : if (!newoffsets) { /* decompress */
1244 0 : if (cs->ucnt) {
1245 0 : BAT *ui = NULL, *uv = NULL;
1246 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1247 0 : bat_destroy(b);
1248 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1249 0 : bat_destroy(nb);
1250 0 : return NULL;
1251 : }
1252 0 : b = nb;
1253 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1254 0 : bat_destroy(ui);
1255 0 : bat_destroy(uv);
1256 0 : bat_destroy(b);
1257 : }
1258 0 : bat_destroy(ui);
1259 0 : bat_destroy(uv);
1260 : }
1261 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1262 0 : bat_destroy(b);
1263 0 : if (!n)
1264 : return NULL;
1265 0 : if (cs->bid)
1266 0 : temp_destroy(cs->bid);
1267 0 : n = transfer_to_systrans(n);
1268 0 : if (n == NULL)
1269 : return NULL;
1270 0 : bat_set_access(n, BAT_READ);
1271 0 : cs->bid = temp_create(n);
1272 0 : cs->ucnt = 0;
1273 0 : if (cs->uibid)
1274 0 : temp_destroy(cs->uibid);
1275 0 : if (cs->uvbid)
1276 0 : temp_destroy(cs->uvbid);
1277 0 : cs->uibid = cs->uvbid = 0;
1278 0 : cs->st = ST_DEFAULT;
1279 0 : cs->cleared = true;
1280 0 : b = n;
1281 : } else { /* append */
1282 : i = newoffsets;
1283 : }
1284 : }
1285 0 : bat_destroy(b);
1286 0 : return i;
1287 : }
1288 :
1289 : /*
1290 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1291 : */
1292 : static int
1293 2912 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1294 : {
1295 2912 : int res = LOG_OK;
1296 2912 : sql_delta *bat = *batp;
1297 2912 : column_storage *cs = &bat->cs;
1298 2912 : BAT *otids = tids, *oupdates = updates;
1299 :
1300 2912 : if (!BATcount(tids))
1301 : return LOG_OK;
1302 :
1303 2912 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1304 6 : tids = BATunmask(tids);
1305 6 : if (!tids)
1306 : return LOG_ERR;
1307 : }
1308 2912 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1309 0 : updates = BATunmask(updates);
1310 0 : if (!updates) {
1311 0 : if (otids != tids)
1312 0 : bat_destroy(tids);
1313 0 : return LOG_ERR;
1314 : }
1315 2912 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1316 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1317 42 : if (!updates) {
1318 0 : if (otids != tids)
1319 0 : bat_destroy(tids);
1320 0 : return LOG_ERR;
1321 : }
1322 : }
1323 :
1324 2912 : if (cs->st == ST_DICT) {
1325 : /* possibly a new array is returned */
1326 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1327 4 : bat = *batp;
1328 4 : cs = &bat->cs;
1329 4 : if (oupdates != updates)
1330 0 : bat_destroy(updates);
1331 4 : updates = nupdates;
1332 4 : if (!updates) {
1333 0 : if (otids != tids)
1334 0 : bat_destroy(tids);
1335 0 : return LOG_ERR;
1336 : }
1337 : }
1338 :
1339 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1340 : /* currently only one update delta is possible */
1341 2912 : lock_table(tr->store, t->base.id);
1342 2912 : storage *s = ATOMIC_PTR_GET(&t->data);
1343 2912 : if (!is_new && !cs->cleared) {
1344 2589 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1345 6 : BAT *sorted, *order;
1346 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1347 0 : if (otids != tids)
1348 0 : bat_destroy(tids);
1349 0 : if (oupdates != updates)
1350 0 : bat_destroy(updates);
1351 0 : unlock_table(tr->store, t->base.id);
1352 0 : return LOG_ERR;
1353 : }
1354 6 : if (otids != tids)
1355 0 : bat_destroy(tids);
1356 6 : tids = sorted;
1357 6 : BAT *nupdates = BATproject(order, updates);
1358 6 : bat_destroy(order);
1359 6 : if (oupdates != updates)
1360 0 : bat_destroy(updates);
1361 6 : updates = nupdates;
1362 6 : if (!updates) {
1363 0 : bat_destroy(tids);
1364 0 : unlock_table(tr->store, t->base.id);
1365 0 : return LOG_ERR;
1366 : }
1367 : }
1368 2589 : assert(tids->tsorted);
1369 2589 : BAT *ui = NULL, *uv = NULL;
1370 :
1371 : /* handle updates on just inserted bits */
1372 : /* handle updates on updates (within one transaction) */
1373 2589 : BATiter upi = bat_iterator(updates);
1374 2589 : BUN cnt = 0, ucnt = BATcount(tids);
1375 2589 : BAT *b, *ins = NULL;
1376 2589 : int *msk = NULL;
1377 :
1378 2589 : if((b = temp_descriptor(cs->bid)) == NULL)
1379 : res = LOG_ERR;
1380 :
1381 2589 : if (res == LOG_OK && BATtdense(tids)) {
1382 2457 : oid start = tids->tseqbase, offset = start;
1383 2457 : oid end = start + ucnt;
1384 :
1385 8121 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1386 6264 : if (seg->start <= start && seg->end > start) {
1387 : /* check for delete conflicts */
1388 2457 : if (seg->ts >= tr->ts && seg->deleted) {
1389 0 : res = LOG_CONFLICT;
1390 0 : continue;
1391 : }
1392 :
1393 : /* check for inplace updates */
1394 2457 : BUN lend = end < seg->end?end:seg->end;
1395 2457 : if (seg->ts == tr->tid && !seg->deleted) {
1396 86 : if (!ins) {
1397 86 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1398 86 : if (!ins)
1399 : res = LOG_ERR;
1400 : else {
1401 86 : BATsetcount(ins, ucnt); /* all full updates */
1402 86 : msk = (int*)Tloc(ins, 0);
1403 86 : BUN end = (ucnt+31)/32;
1404 86 : memset(msk, 0, end * sizeof(int));
1405 : }
1406 : }
1407 361 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1408 275 : const void *upd = BUNtail(upi, rid-offset);
1409 275 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1410 0 : res = LOG_ERR;
1411 :
1412 275 : oid word = i/32;
1413 275 : int pos = i%32;
1414 275 : msk[word] |= 1U<<pos;
1415 275 : cnt++;
1416 : }
1417 : }
1418 : }
1419 6264 : if (end < seg->end)
1420 : break;
1421 : }
1422 135 : } else if (res == LOG_OK && complex_cand(tids)) {
1423 3 : struct canditer ci;
1424 3 : segment *seg = s->segs->h;
1425 3 : canditer_init(&ci, NULL, tids);
1426 3 : BUN i = 0;
1427 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1428 1033 : oid rid = canditer_next(&ci);
1429 1033 : if (seg->end <= rid)
1430 13 : seg = ATOMIC_PTR_GET(&seg->next);
1431 1020 : else if (seg->start <= rid && seg->end > rid) {
1432 : /* check for delete conflicts */
1433 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1434 0 : res = LOG_CONFLICT;
1435 0 : continue;
1436 : }
1437 :
1438 : /* check for inplace updates */
1439 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1440 0 : if (!ins) {
1441 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1442 0 : if (!ins) {
1443 : res = LOG_ERR;
1444 : break;
1445 : } else {
1446 0 : BATsetcount(ins, ucnt); /* all full updates */
1447 0 : msk = (int*)Tloc(ins, 0);
1448 0 : BUN end = (ucnt+31)/32;
1449 0 : memset(msk, 0, end * sizeof(int));
1450 : }
1451 : }
1452 0 : ptr upd = BUNtail(upi, i);
1453 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1454 0 : res = LOG_ERR;
1455 :
1456 0 : oid word = i/32;
1457 0 : int pos = i%32;
1458 0 : msk[word] |= 1U<<pos;
1459 0 : cnt++;
1460 : }
1461 1020 : i++;
1462 : }
1463 : }
1464 129 : } else if (res == LOG_OK) {
1465 129 : BUN i = 0;
1466 129 : oid *rid = Tloc(tids,0);
1467 129 : segment *seg = s->segs->h;
1468 14304 : while ( seg && res == LOG_OK && i < ucnt) {
1469 14175 : if (seg->end <= rid[i])
1470 3719 : seg = ATOMIC_PTR_GET(&seg->next);
1471 10456 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1472 : /* check for delete conflicts */
1473 10456 : if (seg->ts >= tr->ts && seg->deleted) {
1474 0 : res = LOG_CONFLICT;
1475 0 : continue;
1476 : }
1477 :
1478 : /* check for inplace updates */
1479 10456 : if (seg->ts == tr->tid && !seg->deleted) {
1480 291 : if (!ins) {
1481 28 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1482 28 : if (!ins) {
1483 : res = LOG_ERR;
1484 : break;
1485 : } else {
1486 28 : BATsetcount(ins, ucnt); /* all full updates */
1487 28 : msk = (int*)Tloc(ins, 0);
1488 28 : BUN end = (ucnt+31)/32;
1489 28 : memset(msk, 0, end * sizeof(int));
1490 : }
1491 : }
1492 291 : const void *upd = BUNtail(upi, i);
1493 291 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1494 0 : res = LOG_ERR;
1495 :
1496 291 : oid word = i/32;
1497 291 : int pos = i%32;
1498 291 : msk[word] |= 1U<<pos;
1499 291 : cnt++;
1500 : }
1501 10456 : i++;
1502 : }
1503 : }
1504 : }
1505 :
1506 2589 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1507 2475 : if (cs->ucnt == 0) {
1508 2421 : if (cnt) {
1509 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1510 0 : if (nins) {
1511 0 : ui = BATproject(nins, tids);
1512 0 : uv = BATproject(nins, updates);
1513 0 : bat_destroy(nins);
1514 : }
1515 : } else {
1516 2421 : ui = temp_descriptor(tids->batCacheid);
1517 2421 : uv = temp_descriptor(updates->batCacheid);
1518 : }
1519 2421 : if (!ui || !uv) {
1520 : res = LOG_ERR;
1521 : } else {
1522 2421 : temp_destroy(cs->uibid);
1523 2421 : temp_destroy(cs->uvbid);
1524 2421 : ui = transfer_to_systrans(ui);
1525 2421 : uv = transfer_to_systrans(uv);
1526 2421 : if (ui == NULL || uv == NULL) {
1527 0 : BBPreclaim(ui);
1528 0 : BBPreclaim(uv);
1529 : res = LOG_ERR;
1530 : } else {
1531 2421 : cs->uibid = temp_create(ui);
1532 2421 : cs->uvbid = temp_create(uv);
1533 2421 : cs->ucnt = BATcount(ui);
1534 : }
1535 : }
1536 : } else {
1537 54 : BAT *nui = NULL, *nuv = NULL;
1538 :
1539 : /* merge taking msk of inserted into account */
1540 54 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1541 : res = LOG_ERR;
1542 :
1543 54 : if (res == LOG_OK) {
1544 54 : const void *upd = NULL;
1545 54 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1546 54 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1547 :
1548 54 : if (!nui || !nuv) {
1549 : res = LOG_ERR;
1550 : } else {
1551 54 : BATiter ovi = bat_iterator(uv);
1552 :
1553 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1554 54 : BUN uip = 0, uie = BATcount(ui);
1555 54 : BUN nip = 0, nie = BATcount(tids);
1556 54 : oid uiseqb = ui->tseqbase;
1557 54 : oid niseqb = tids->tseqbase;
1558 54 : oid *uipt = NULL, *nipt = NULL;
1559 54 : BATiter uii = bat_iterator(ui);
1560 54 : BATiter tidsi = bat_iterator(tids);
1561 54 : if (!BATtdensebi(&uii))
1562 53 : uipt = uii.base;
1563 54 : if (!BATtdensebi(&tidsi))
1564 53 : nipt = tidsi.base;
1565 14211 : while (uip < uie && nip < nie && res == LOG_OK) {
1566 14157 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1567 14157 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1568 :
1569 14157 : if (uiv < niv) {
1570 7916 : upd = BUNtail(ovi, uip);
1571 15832 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1572 7916 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1573 : res = LOG_ERR;
1574 7916 : uip++;
1575 6241 : } else if (uiv == niv) {
1576 : /* handle == */
1577 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1578 18 : upd = BUNtail(upi, nip);
1579 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1580 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1581 : res = LOG_ERR;
1582 : } else {
1583 0 : upd = BUNtail(ovi, uip);
1584 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1585 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1586 : res = LOG_ERR;
1587 : }
1588 18 : uip++;
1589 18 : nip++;
1590 : } else { /* uiv > niv */
1591 6223 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1592 6223 : upd = BUNtail(upi, nip);
1593 12446 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1594 6223 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1595 : res = LOG_ERR;
1596 : }
1597 6223 : nip++;
1598 : }
1599 : }
1600 275 : while (uip < uie && res == LOG_OK) {
1601 221 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1602 221 : upd = BUNtail(ovi, uip);
1603 442 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1604 221 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1605 : res = LOG_ERR;
1606 221 : uip++;
1607 : }
1608 383 : while (nip < nie && res == LOG_OK) {
1609 329 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1610 329 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1611 329 : upd = BUNtail(upi, nip);
1612 658 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1613 329 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1614 : res = LOG_ERR;
1615 : }
1616 329 : nip++;
1617 : }
1618 54 : bat_iterator_end(&uii);
1619 54 : bat_iterator_end(&tidsi);
1620 54 : bat_iterator_end(&ovi);
1621 54 : if (res == LOG_OK) {
1622 54 : temp_destroy(cs->uibid);
1623 54 : temp_destroy(cs->uvbid);
1624 54 : nui = transfer_to_systrans(nui);
1625 54 : nuv = transfer_to_systrans(nuv);
1626 54 : if (nui == NULL || nuv == NULL) {
1627 : res = LOG_ERR;
1628 : } else {
1629 54 : cs->uibid = temp_create(nui);
1630 54 : cs->uvbid = temp_create(nuv);
1631 54 : cs->ucnt = BATcount(nui);
1632 : }
1633 : }
1634 : }
1635 54 : bat_destroy(nui);
1636 54 : bat_destroy(nuv);
1637 : }
1638 : }
1639 : }
1640 2589 : bat_iterator_end(&upi);
1641 2589 : bat_destroy(b);
1642 2589 : unlock_table(tr->store, t->base.id);
1643 2589 : bat_destroy(ins);
1644 2589 : bat_destroy(ui);
1645 2589 : bat_destroy(uv);
1646 2589 : if (otids != tids)
1647 10 : bat_destroy(tids);
1648 2589 : if (oupdates != updates)
1649 11 : bat_destroy(updates);
1650 2589 : return res;
1651 : } else if (is_new || cs->cleared) {
1652 323 : BAT *b = temp_descriptor(cs->bid);
1653 :
1654 323 : if (b == NULL) {
1655 : res = LOG_ERR;
1656 : } else {
1657 323 : if (BATcount(b)==0) {
1658 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1659 0 : res = LOG_ERR;
1660 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1661 0 : res = LOG_ERR;
1662 323 : BBPcold(b->batCacheid);
1663 323 : bat_destroy(b);
1664 : }
1665 : }
1666 323 : unlock_table(tr->store, t->base.id);
1667 323 : if (otids != tids)
1668 2 : bat_destroy(tids);
1669 323 : if (oupdates != updates)
1670 41 : bat_destroy(updates);
1671 : return res;
1672 : }
1673 :
1674 : static int
1675 2912 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1676 : {
1677 2912 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1678 : }
1679 :
1680 : static void *
1681 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1682 : {
1683 4 : void *newoffsets = NULL;
1684 4 : sql_delta *bat = *batp;
1685 4 : column_storage *cs = &bat->cs;
1686 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1687 :
1688 4 : if (!u)
1689 : return NULL;
1690 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1691 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1692 0 : bat_destroy(u);
1693 0 : return NULL;
1694 : } else {
1695 4 : int new = 0;
1696 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1697 4 : if (BATcount(u) >= max_cnt) {
1698 0 : if (max_cnt == 64*1024) { /* decompress */
1699 0 : if (!(b = temp_descriptor(cs->bid))) {
1700 0 : bat_destroy(u);
1701 0 : return NULL;
1702 : }
1703 0 : n = DICTdecompress_(b, u, PERSISTENT);
1704 : /* TODO decompress updates if any */
1705 0 : bat_destroy(b);
1706 0 : assert(newoffsets == NULL);
1707 0 : if (!n) {
1708 0 : bat_destroy(u);
1709 0 : return NULL;
1710 : }
1711 0 : if (cs->ts != tr->tid) {
1712 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1713 0 : bat_destroy(n);
1714 0 : bat_destroy(u);
1715 0 : return NULL;
1716 : }
1717 0 : cs = &(*batp)->cs;
1718 0 : new = 1;
1719 0 : cs->uibid = cs->uvbid = 0;
1720 : }
1721 0 : if (cs->bid && !new)
1722 0 : temp_destroy(cs->bid);
1723 0 : n = transfer_to_systrans(n);
1724 0 : if (n == NULL) {
1725 0 : bat_destroy(u);
1726 0 : return NULL;
1727 : }
1728 0 : bat_set_access(n, BAT_READ);
1729 0 : cs->bid = temp_create(n);
1730 0 : bat_destroy(n);
1731 0 : if (cs->ebid && !new)
1732 0 : temp_destroy(cs->ebid);
1733 0 : cs->ebid = 0;
1734 0 : cs->st = ST_DEFAULT;
1735 : /* at append_col the column's storage type is cleared */
1736 0 : cs->cleared = true;
1737 : } else {
1738 0 : if (!(b = temp_descriptor(cs->bid))) {
1739 0 : GDKfree(newoffsets);
1740 0 : bat_destroy(u);
1741 0 : return NULL;
1742 : }
1743 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
1744 0 : bat_destroy(b);
1745 0 : if (!n) {
1746 0 : GDKfree(newoffsets);
1747 0 : bat_destroy(u);
1748 0 : return NULL;
1749 : }
1750 0 : if (cs->ts != tr->tid) {
1751 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1752 0 : bat_destroy(u);
1753 0 : bat_destroy(n);
1754 0 : return NULL;
1755 : }
1756 0 : cs = &(*batp)->cs;
1757 0 : new = 1;
1758 0 : temp_dup(cs->ebid);
1759 0 : if (cs->uibid) {
1760 0 : temp_dup(cs->uibid);
1761 0 : temp_dup(cs->uvbid);
1762 : }
1763 : }
1764 0 : if (cs->bid)
1765 0 : temp_destroy(cs->bid);
1766 0 : n = transfer_to_systrans(n);
1767 0 : if (n == NULL) {
1768 0 : bat_destroy(u);
1769 0 : return NULL;
1770 : }
1771 0 : bat_set_access(n, BAT_READ);
1772 0 : cs->bid = temp_create(n);
1773 0 : bat_destroy(n);
1774 0 : cs->cleared = true;
1775 0 : i = newoffsets;
1776 : }
1777 : } else { /* append */
1778 4 : i = newoffsets;
1779 : }
1780 : }
1781 4 : bat_destroy(u);
1782 4 : return i;
1783 : }
1784 :
1785 : static void *
1786 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1787 : {
1788 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1789 1 : void *newoffsets = NULL;
1790 1 : BAT *b = NULL, *n = NULL;
1791 :
1792 1 : if (!(b = temp_descriptor(cs->bid)))
1793 : return NULL;
1794 :
1795 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1796 0 : bat_destroy(b);
1797 0 : return NULL;
1798 : } else {
1799 : /* returns new offset bat if values within min/max, else decompress */
1800 1 : if (!newoffsets) {
1801 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1802 1 : bat_destroy(b);
1803 1 : if (!n)
1804 : return NULL;
1805 : /* TODO decompress updates if any */
1806 1 : if (cs->bid)
1807 1 : temp_destroy(cs->bid);
1808 1 : n = transfer_to_systrans(n);
1809 1 : if (n == NULL)
1810 : return NULL;
1811 1 : bat_set_access(n, BAT_READ);
1812 1 : cs->bid = temp_create(n);
1813 1 : cs->st = ST_DEFAULT;
1814 : /* at append_col the column's storage type is cleared */
1815 1 : cs->cleared = true;
1816 1 : b = n;
1817 : } else { /* append */
1818 : i = newoffsets;
1819 : }
1820 : }
1821 1 : bat_destroy(b);
1822 1 : return i;
1823 : }
1824 :
1825 : static int
1826 5116 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1827 : {
1828 5116 : void *oupd = upd;
1829 5116 : sql_delta *bat = *batp;
1830 5116 : column_storage *cs = &bat->cs;
1831 5116 : storage *s = ATOMIC_PTR_GET(&t->data);
1832 5116 : assert(!is_oid_nil(rid));
1833 5116 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1834 :
1835 5116 : if (cs->st == ST_DICT) {
1836 : /* possibly a new array is returned */
1837 0 : upd = dict_append_val(tr, batp, upd, 1);
1838 0 : bat = *batp;
1839 0 : cs = &bat->cs;
1840 0 : if (!upd)
1841 : return LOG_ERR;
1842 : }
1843 :
1844 : /* check if rid is insert ? */
1845 5116 : if (!inplace) {
1846 : /* check conflict */
1847 2242 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1848 0 : if (oupd != upd)
1849 0 : GDKfree(upd);
1850 0 : return LOG_CONFLICT;
1851 : }
1852 2242 : BAT *ui, *uv;
1853 :
1854 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1855 : /* currently only one update delta is possible */
1856 2242 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1857 0 : if (oupd != upd)
1858 0 : GDKfree(upd);
1859 0 : return LOG_ERR;
1860 : }
1861 :
1862 2242 : assert(uv->ttype);
1863 2242 : assert(BATcount(ui) == BATcount(uv));
1864 4484 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1865 2242 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1866 0 : if (oupd != upd)
1867 0 : GDKfree(upd);
1868 0 : bat_destroy(ui);
1869 0 : bat_destroy(uv);
1870 0 : return LOG_ERR;
1871 : }
1872 2242 : assert(BATcount(ui) == BATcount(uv));
1873 2242 : bat_destroy(ui);
1874 2242 : bat_destroy(uv);
1875 2242 : cs->ucnt++;
1876 : } else {
1877 2874 : BAT *b = NULL;
1878 :
1879 2874 : if((b = temp_descriptor(cs->bid)) == NULL) {
1880 0 : if (oupd != upd)
1881 0 : GDKfree(upd);
1882 0 : return LOG_ERR;
1883 : }
1884 2874 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1885 0 : if (oupd != upd)
1886 0 : GDKfree(upd);
1887 0 : bat_destroy(b);
1888 0 : return LOG_ERR;
1889 : }
1890 2874 : bat_destroy(b);
1891 : }
1892 5116 : if (oupd != upd)
1893 0 : GDKfree(upd);
1894 : return LOG_OK;
1895 : }
1896 :
1897 : static int
1898 5116 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1899 : {
1900 5116 : int res = LOG_OK;
1901 5116 : lock_table(tr->store, t->base.id);
1902 5116 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1903 5116 : unlock_table(tr->store, t->base.id);
1904 5116 : return res;
1905 : }
1906 :
1907 : static int
1908 158473 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1909 : {
1910 158473 : (void)tr;
1911 158473 : if (!ocs)
1912 : return LOG_OK;
1913 158473 : cs->bid = ocs->bid;
1914 158473 : cs->ebid = ocs->ebid;
1915 158473 : cs->uibid = ocs->uibid;
1916 158473 : cs->uvbid = ocs->uvbid;
1917 158473 : cs->ucnt = ocs->ucnt;
1918 :
1919 158473 : if (temp) {
1920 25910 : cs->bid = temp_copy(cs->bid, true, false);
1921 25910 : if (cs->bid == BID_NIL)
1922 : return LOG_ERR;
1923 : } else {
1924 132563 : temp_dup(cs->bid);
1925 : }
1926 158485 : if (cs->ebid)
1927 6 : temp_dup(cs->ebid);
1928 158485 : cs->ucnt = 0;
1929 158485 : cs->uibid = e_bat(TYPE_oid);
1930 158484 : cs->uvbid = e_bat(type);
1931 158488 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1932 : return LOG_ERR;
1933 158488 : cs->st = ocs->st;
1934 158488 : return LOG_OK;
1935 : }
1936 :
1937 : static void
1938 310838 : destroy_delta(sql_delta *b, bool recursive)
1939 : {
1940 310838 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1941 : return;
1942 291532 : if (recursive && b->next)
1943 127301 : destroy_delta(b->next, true);
1944 291532 : if (b->cs.uibid)
1945 93735 : temp_destroy(b->cs.uibid);
1946 291532 : if (b->cs.uvbid)
1947 93735 : temp_destroy(b->cs.uvbid);
1948 291532 : if (b->cs.bid)
1949 291532 : temp_destroy(b->cs.bid);
1950 291532 : if (b->cs.ebid)
1951 61 : temp_destroy(b->cs.ebid);
1952 291532 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1953 291532 : _DELETE(b);
1954 : }
1955 :
1956 : static sql_delta *
1957 15573619 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1958 : {
1959 15573619 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1960 :
1961 15573619 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1962 15441059 : return obat;
1963 132560 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1964 : /* abort */
1965 12 : if (update_conflict)
1966 4 : *update_conflict = true;
1967 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1968 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1969 4 : return NULL;
1970 : }
1971 132548 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1972 : return NULL;
1973 132532 : sql_delta* bat = ZNEW(sql_delta);
1974 132542 : if (!bat)
1975 : return NULL;
1976 132542 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1977 132542 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1978 0 : destroy_delta(bat, false);
1979 0 : return NULL;
1980 : }
1981 132550 : bat->cs.ts = tr->tid;
1982 : /* only one writer else abort */
1983 132550 : bat->next = obat;
1984 132550 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
1985 0 : bat->next = NULL;
1986 0 : destroy_delta(bat, false);
1987 0 : if (update_conflict)
1988 0 : *update_conflict = true;
1989 0 : return NULL;
1990 : }
1991 : return bat;
1992 : }
1993 :
1994 : static int
1995 8028 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1996 : {
1997 8028 : int ok = LOG_OK;
1998 :
1999 8028 : if (is_bat) {
2000 2912 : BAT *tids = incoming_tids;
2001 2912 : BAT *values = incoming_values;
2002 2912 : if (BATcount(tids) == 0)
2003 : return LOG_OK;
2004 2912 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2005 : } else {
2006 5116 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2007 : }
2008 : return ok;
2009 : }
2010 :
2011 : static int
2012 8051 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2013 : {
2014 8051 : int res = LOG_OK;
2015 8051 : bool update_conflict = false;
2016 8051 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2017 :
2018 8051 : if (isbat) {
2019 2932 : BAT *t = tids;
2020 2932 : if (!BATcount(t))
2021 : return LOG_OK;
2022 : }
2023 :
2024 7753 : if (c == NULL)
2025 : return LOG_ERR;
2026 :
2027 7753 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2028 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2029 :
2030 7749 : assert(delta && delta->cs.ts == tr->tid);
2031 7749 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2032 7749 : if (odelta != delta)
2033 3384 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2034 :
2035 7749 : odelta = delta;
2036 7749 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2037 : return res;
2038 7749 : assert(delta == odelta);
2039 7749 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2040 0 : res = sql_trans_alter_storage(tr, c, NULL);
2041 : return res;
2042 : }
2043 :
2044 : static sql_delta *
2045 2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2046 : {
2047 2457 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2048 :
2049 2457 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2050 2429 : return obat;
2051 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2052 : /* abort */
2053 0 : if (update_conflict)
2054 0 : *update_conflict = true;
2055 0 : return NULL;
2056 : }
2057 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2058 : return NULL;
2059 28 : sql_delta* bat = ZNEW(sql_delta);
2060 28 : if (!bat)
2061 : return NULL;
2062 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2063 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2064 0 : destroy_delta(bat, false);
2065 0 : return NULL;
2066 : }
2067 28 : bat->cs.ts = tr->tid;
2068 : /* only one writer else abort */
2069 28 : bat->next = obat;
2070 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2071 0 : bat->next = NULL;
2072 0 : destroy_delta(bat, false);
2073 0 : if (update_conflict)
2074 0 : *update_conflict = true;
2075 0 : return NULL;
2076 : }
2077 : return bat;
2078 : }
2079 :
2080 : static int
2081 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2082 : {
2083 782 : int res = LOG_OK;
2084 782 : bool update_conflict = false;
2085 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2086 :
2087 782 : if (isbat) {
2088 782 : BAT *t = tids;
2089 782 : if (!BATcount(t))
2090 : return LOG_OK;
2091 : }
2092 :
2093 279 : if (i == NULL)
2094 : return LOG_ERR;
2095 :
2096 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2097 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2098 :
2099 279 : assert(delta && delta->cs.ts == tr->tid);
2100 279 : if (odelta != delta)
2101 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2102 :
2103 279 : odelta = delta;
2104 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2105 279 : assert(delta == odelta);
2106 : return res;
2107 : }
2108 :
2109 : static int
2110 150314 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
2111 : {
2112 150314 : BAT *b, *oi = i;
2113 150314 : int err = 0;
2114 150314 : sql_delta *bat = *batp;
2115 :
2116 150314 : assert(!offsets || BATcount(offsets) == BATcount(i));
2117 150314 : if (!BATcount(i))
2118 : return LOG_OK;
2119 150314 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2120 : return LOG_ERR;
2121 :
2122 150314 : lock_column(tr->store, id);
2123 150298 : if (bat->cs.st == ST_DICT) {
2124 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2125 13 : bat = *batp;
2126 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2127 0 : bat_destroy(oi);
2128 13 : oi = ni;
2129 13 : if (!oi) {
2130 0 : unlock_column(tr->store, id);
2131 0 : return LOG_ERR;
2132 : }
2133 : }
2134 150298 : if (bat->cs.st == ST_FOR) {
2135 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2136 0 : bat = *batp;
2137 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2138 0 : bat_destroy(oi);
2139 0 : oi = ni;
2140 0 : if (!oi) {
2141 0 : unlock_column(tr->store, id);
2142 0 : return LOG_ERR;
2143 : }
2144 : }
2145 :
2146 150298 : b = temp_descriptor(bat->cs.bid);
2147 150256 : if (b == NULL) {
2148 0 : unlock_column(tr->store, id);
2149 0 : if (oi != i)
2150 0 : bat_destroy(oi);
2151 0 : return LOG_ERR;
2152 : }
2153 150256 : if (!offsets && offset == b->hseqbase+BATcount(b)) {
2154 150068 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2155 0 : err = 1;
2156 176 : } else if (!offsets) {
2157 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2158 0 : err = 1;
2159 12 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2160 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2161 0 : err = 1;
2162 12 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2163 0 : err = 1;
2164 : }
2165 150288 : bat_destroy(b);
2166 150293 : unlock_column(tr->store, id);
2167 :
2168 150285 : if (oi != i)
2169 13 : bat_destroy(oi);
2170 150285 : return (err)?LOG_ERR:LOG_OK;
2171 : }
2172 :
2173 : // Look at the offsets and find where the replacements end and the appends begin.
2174 : static BUN
2175 0 : start_of_appends(BAT *offsets, BUN bcnt)
2176 : {
2177 0 : BUN ocnt = BATcount(offsets);
2178 0 : if (ocnt == 0)
2179 : return 0;
2180 :
2181 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2182 0 : if (highest < bcnt)
2183 : // all are replacements
2184 : return ocnt;
2185 :
2186 : // reason backward to find the first append.
2187 : // Suppose offsets has 15 entries, bcnt == 100
2188 : // and the highest offset in offsets is 109.
2189 0 : BUN new_bcnt = highest + 1; // 110
2190 0 : BUN nappends = new_bcnt - bcnt; // 10
2191 0 : BUN nreplacements = ocnt - nappends; // 5
2192 :
2193 : // The first append should be to position bcnt
2194 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2195 :
2196 : return nreplacements;
2197 : }
2198 :
2199 :
2200 : static int
2201 15288465 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
2202 : {
2203 15288465 : void *oi = i;
2204 15288465 : BAT *b;
2205 15288465 : lock_column(tr->store, id);
2206 15288513 : sql_delta *bat = *batp;
2207 :
2208 15288513 : if (bat->cs.st == ST_DICT) {
2209 : /* possibly a new array is returned */
2210 4 : i = dict_append_val(tr, batp, i, cnt);
2211 4 : bat = *batp;
2212 4 : if (!i) {
2213 0 : unlock_column(tr->store, id);
2214 0 : return LOG_ERR;
2215 : }
2216 : }
2217 15288513 : if (bat->cs.st == ST_FOR) {
2218 : /* possibly a new array is returned */
2219 1 : i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
2220 1 : bat = *batp;
2221 1 : if (!i) {
2222 0 : unlock_column(tr->store, id);
2223 0 : return LOG_ERR;
2224 : }
2225 : }
2226 :
2227 15288513 : b = temp_descriptor(bat->cs.bid);
2228 15288182 : if (b == NULL) {
2229 0 : if (i != oi)
2230 0 : GDKfree(i);
2231 0 : unlock_column(tr->store, id);
2232 0 : return LOG_ERR;
2233 : }
2234 15288182 : BUN bcnt = BATcount(b);
2235 :
2236 15288182 : if (offsets) {
2237 : // The first few might be replacements while later items might be appends.
2238 : // Handle the replacements here while leaving the appends to the code below.
2239 0 : BUN nreplacements = start_of_appends(offsets, bcnt);
2240 :
2241 0 : oid *start = Tloc(offsets, 0);
2242 0 : if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
2243 0 : bat_destroy(b);
2244 0 : if (i != oi)
2245 0 : GDKfree(i);
2246 0 : unlock_column(tr->store, id);
2247 0 : return LOG_ERR;
2248 : }
2249 :
2250 : // Replacements have been handled. The rest are appends.
2251 0 : assert(offset == oid_nil);
2252 0 : offset = bcnt;
2253 0 : cnt -= nreplacements;
2254 : }
2255 :
2256 15288182 : if (bcnt > offset){
2257 487946 : size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
2258 487946 : if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
2259 0 : bat_destroy(b);
2260 0 : if (i != oi)
2261 0 : GDKfree(i);
2262 0 : unlock_column(tr->store, id);
2263 0 : return LOG_ERR;
2264 : }
2265 488039 : cnt -= ccnt;
2266 488039 : offset += ccnt;
2267 : }
2268 15288275 : if (cnt) {
2269 14800225 : if (BATcount(b) < offset) { /* add space */
2270 10920 : BUN d = offset - BATcount(b);
2271 10920 : if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
2272 0 : bat_destroy(b);
2273 0 : if (i != oi)
2274 0 : GDKfree(i);
2275 0 : unlock_column(tr->store, id);
2276 0 : return LOG_ERR;
2277 : }
2278 : }
2279 14800225 : if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
2280 0 : bat_destroy(b);
2281 0 : if (i != oi)
2282 0 : GDKfree(i);
2283 0 : unlock_column(tr->store, id);
2284 0 : return LOG_ERR;
2285 : }
2286 : }
2287 15288485 : bat_destroy(b);
2288 15288326 : if (i != oi)
2289 4 : GDKfree(i);
2290 15288326 : unlock_column(tr->store, id);
2291 15288326 : return LOG_OK;
2292 : }
2293 :
2294 : static int
2295 25910 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2296 : {
2297 25910 : if (!(bat->segs = new_segments(tr, 0)))
2298 : return LOG_ERR;
2299 25910 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2300 : }
2301 :
2302 : static int
2303 15438805 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2304 : {
2305 15438805 : int ok = LOG_OK;
2306 :
2307 15438805 : if ((*delta)->cs.merged)
2308 36734 : (*delta)->cs.merged = false; /* TODO needs to move */
2309 15438805 : if (isbat) {
2310 150312 : BAT *bat = incoming_data;
2311 :
2312 150312 : if (BATcount(bat))
2313 150309 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2314 : } else {
2315 15288493 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2316 : }
2317 15438603 : return ok;
2318 : }
2319 :
2320 : static int
2321 15437544 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2322 : {
2323 15437544 : int res = LOG_OK;
2324 15437544 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2325 :
2326 15437544 : if (isbat) {
2327 150107 : BAT *t = data;
2328 150107 : if (!BATcount(t))
2329 : return LOG_OK;
2330 : }
2331 :
2332 15436749 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2333 : return LOG_ERR;
2334 :
2335 15436727 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2336 :
2337 15436727 : odelta = delta;
2338 15436727 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
2339 : return res;
2340 15436441 : if (odelta != delta) {
2341 0 : delta->next = odelta;
2342 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2343 0 : delta->next = NULL;
2344 0 : destroy_delta(delta, false);
2345 0 : return LOG_CONFLICT;
2346 : }
2347 : }
2348 15436441 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2349 1 : res = sql_trans_alter_storage(tr, c, NULL);
2350 : return res;
2351 : }
2352 :
2353 : static int
2354 2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2355 : {
2356 2184 : int res = LOG_OK;
2357 2184 : sql_delta *delta;
2358 :
2359 2184 : if (isbat) {
2360 1010 : BAT *t = data;
2361 1010 : if (!BATcount(t))
2362 : return LOG_OK;
2363 : }
2364 :
2365 2172 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2366 : return LOG_ERR;
2367 :
2368 2172 : assert(delta->cs.st == ST_DEFAULT);
2369 :
2370 2172 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2371 2172 : return res;
2372 : }
2373 :
2374 : static int
2375 71011 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2376 : {
2377 71011 : int err = 0;
2378 :
2379 : /* TODO check for conflicting updates */
2380 71011 : (void)rid;
2381 71011 : (void)cnt;
2382 535831 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2383 464820 : sql_column *c = n->data;
2384 464820 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2385 :
2386 : /* check for active updates */
2387 464820 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2388 : return 1;
2389 : }
2390 : return 0;
2391 : }
2392 :
2393 : static int
2394 67545 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2395 : {
2396 67545 : int in_transaction = segments_in_transaction(tr, t);
2397 :
2398 67545 : lock_table(tr->store, t->base.id);
2399 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2400 67545 : segment *seg = s->segs->h, *p = NULL;
2401 51386397 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2402 51386397 : if (seg->start <= rid && seg->end > rid) {
2403 67545 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2404 4 : unlock_table(tr->store, t->base.id);
2405 4 : return LOG_CONFLICT;
2406 : }
2407 67541 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2408 0 : unlock_table(tr->store, t->base.id);
2409 0 : return LOG_CONFLICT;
2410 : }
2411 67541 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2412 0 : unlock_table(tr->store, t->base.id);
2413 0 : return LOG_ERR;
2414 : }
2415 : break;
2416 : }
2417 : }
2418 67541 : unlock_table(tr->store, t->base.id);
2419 67541 : if (!in_transaction)
2420 12763 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2421 : return LOG_OK;
2422 : }
2423 :
2424 : static int
2425 3468 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2426 : {
2427 3468 : segment *seg = *Seg, *p = NULL;
2428 11633 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2429 11631 : if (seg->start <= start && seg->end > start) {
2430 3518 : size_t lcnt = cnt;
2431 3518 : if (start+lcnt > seg->end)
2432 54 : lcnt = seg->end-start;
2433 3518 : if (SEG_IS_DELETED(seg, tr)) {
2434 47 : start += lcnt;
2435 47 : cnt -= lcnt;
2436 47 : continue;
2437 3471 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2438 1 : return LOG_CONFLICT;
2439 3470 : if (deletes_conflict_updates( tr, t, start, lcnt))
2440 : return LOG_CONFLICT;
2441 3470 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2442 3470 : if (!seg)
2443 : return LOG_ERR;
2444 3470 : start += lcnt;
2445 3470 : cnt -= lcnt;
2446 : }
2447 11583 : if (start+cnt <= seg->end)
2448 : break;
2449 : }
2450 : return LOG_OK;
2451 : }
2452 :
2453 : static int
2454 612 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2455 : {
2456 612 : segment *seg = s->segs->h;
2457 612 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2458 : }
2459 :
2460 : static int
2461 288 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2462 : {
2463 288 : int in_transaction = segments_in_transaction(tr, t);
2464 288 : BAT *oi = i; /* update ids */
2465 288 : int ok = LOG_OK;
2466 :
2467 288 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2468 : return LOG_ERR;
2469 288 : if (BATcount(i)) {
2470 515 : if (BATtdense(i)) {
2471 227 : size_t start = i->tseqbase;
2472 227 : size_t cnt = BATcount(i);
2473 :
2474 227 : lock_table(tr->store, t->base.id);
2475 227 : ok = delete_range(tr, t, s, start, cnt);
2476 227 : unlock_table(tr->store, t->base.id);
2477 61 : } else if (complex_cand(i)) {
2478 0 : struct canditer ci;
2479 0 : oid f = 0, l = 0, cur = 0;
2480 :
2481 0 : canditer_init(&ci, NULL, i);
2482 0 : cur = f = canditer_next(&ci);
2483 :
2484 0 : lock_table(tr->store, t->base.id);
2485 0 : if (!is_oid_nil(f)) {
2486 0 : segment *seg = s->segs->h;
2487 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2488 0 : if (cur+1 == l) {
2489 0 : cur++;
2490 0 : continue;
2491 : }
2492 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2493 0 : f = cur = l;
2494 : }
2495 0 : if (ok == LOG_OK)
2496 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2497 : }
2498 0 : unlock_table(tr->store, t->base.id);
2499 : } else {
2500 61 : if (!i->tsorted) {
2501 0 : assert(oi == i);
2502 0 : BAT *ni = NULL;
2503 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2504 0 : ok = LOG_ERR;
2505 0 : if (ni)
2506 0 : i = ni;
2507 : }
2508 61 : assert(i->tsorted);
2509 61 : BUN icnt = BATcount(i);
2510 61 : BATiter ii = bat_iterator(i);
2511 61 : oid *o = ii.base, n = o[0]+1;
2512 61 : size_t lcnt = 1;
2513 :
2514 61 : lock_table(tr->store, t->base.id);
2515 61 : segment *seg = s->segs->h;
2516 17005 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2517 16944 : if (o[i] == n) {
2518 16634 : lcnt++;
2519 16634 : n++;
2520 : } else {
2521 310 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2522 310 : lcnt = 0;
2523 : }
2524 16944 : if (!lcnt) {
2525 310 : n = o[i]+1;
2526 310 : lcnt = 1;
2527 : }
2528 : }
2529 61 : bat_iterator_end(&ii);
2530 61 : if (lcnt && ok == LOG_OK)
2531 61 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2532 61 : unlock_table(tr->store, t->base.id);
2533 : }
2534 : }
2535 288 : if (i != oi)
2536 25 : bat_destroy(i);
2537 : // assert
2538 288 : if (!in_transaction)
2539 256 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2540 : return ok;
2541 : }
2542 :
2543 : static void
2544 51067 : destroy_segments(segments *s)
2545 : {
2546 51067 : if (!s || sql_ref_dec(&s->r) > 0)
2547 0 : return;
2548 51067 : segment *seg = s->h;
2549 110050 : while(seg) {
2550 58983 : segment *n = ATOMIC_PTR_GET(&seg->next);
2551 58983 : ATOMIC_PTR_DESTROY(&seg->next);
2552 58983 : _DELETE(seg);
2553 58983 : seg = n;
2554 : }
2555 51067 : _DELETE(s);
2556 : }
2557 :
2558 : static void
2559 52464 : destroy_storage(storage *bat)
2560 : {
2561 52464 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2562 : return;
2563 50925 : if (bat->next)
2564 14670 : destroy_storage(bat->next);
2565 50925 : destroy_segments(bat->segs);
2566 50925 : if (bat->cs.uibid)
2567 30108 : temp_destroy(bat->cs.uibid);
2568 50925 : if (bat->cs.uvbid)
2569 30108 : temp_destroy(bat->cs.uvbid);
2570 50925 : if (bat->cs.bid)
2571 50925 : temp_destroy(bat->cs.bid);
2572 50925 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2573 50925 : _DELETE(bat);
2574 : }
2575 :
2576 : static int
2577 172077 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2578 : {
2579 172077 : if (uncommitted) {
2580 437454 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2581 286531 : if (!VALID_4_READ(s->ts,tr))
2582 : return 1;
2583 : } else {
2584 148106 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2585 127990 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2586 : return 1;
2587 : }
2588 :
2589 : return 0;
2590 : }
2591 :
2592 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2593 :
2594 : storage *
2595 2172279 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2596 : {
2597 2172279 : storage *obat;
2598 :
2599 2172279 : obat = ATOMIC_PTR_GET(&t->data);
2600 :
2601 2172279 : if (obat->cs.ts != tr->tid)
2602 1498419 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2603 1498364 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2604 : /* abort */
2605 15681 : if (clear)
2606 15681 : *clear = true;
2607 15681 : return NULL;
2608 : }
2609 :
2610 2156598 : if (!clear)
2611 : return obat;
2612 :
2613 : /* remainder is only to handle clear */
2614 26094 : if (segments_conflict(tr, obat->segs, 1)) {
2615 184 : *clear = true;
2616 184 : return NULL;
2617 : }
2618 25910 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2619 : return NULL;
2620 25910 : storage *bat = ZNEW(storage);
2621 25910 : if (!bat)
2622 : return NULL;
2623 25910 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2624 25910 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2625 0 : destroy_storage(bat);
2626 0 : return NULL;
2627 : }
2628 25910 : bat->cs.cleared = true;
2629 25910 : bat->cs.ts = tr->tid;
2630 : /* only one writer else abort */
2631 25910 : bat->next = obat;
2632 25910 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2633 5 : bat->next = NULL;
2634 5 : destroy_storage(bat);
2635 5 : if (clear)
2636 5 : *clear = true;
2637 5 : return NULL;
2638 : }
2639 : return bat;
2640 : }
2641 :
2642 : static int
2643 67878 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2644 : {
2645 67878 : int ok = LOG_OK;
2646 67878 : BAT *b = ib;
2647 67878 : storage *bat;
2648 :
2649 67878 : if (isbat && !BATcount(b))
2650 : return ok;
2651 :
2652 67833 : if (t == NULL)
2653 : return LOG_ERR;
2654 :
2655 67833 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2656 : return LOG_ERR;
2657 :
2658 67833 : if (isbat)
2659 288 : ok = storage_delete_bat(tr, t, bat, ib);
2660 : else
2661 67545 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2662 : return ok;
2663 : }
2664 :
2665 : static size_t
2666 0 : dcount_col(sql_trans *tr, sql_column *c)
2667 : {
2668 0 : sql_delta *b;
2669 :
2670 0 : if (!isTable(c->t))
2671 : return 0;
2672 0 : b = col_timestamp_delta(tr, c);
2673 0 : if (!b)
2674 : return 1;
2675 :
2676 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2677 0 : if (!s || !s->segs->t)
2678 : return 1;
2679 0 : size_t cnt = s->segs->t->end;
2680 0 : if (cnt) {
2681 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2682 0 : size_t dcnt = 0;
2683 :
2684 0 : if (v)
2685 0 : dcnt = BATguess_uniques(v, NULL);
2686 0 : return dcnt;
2687 : }
2688 : return cnt;
2689 : }
2690 :
2691 : static BAT *
2692 3713577 : bind_no_view(BAT *b, bool quick)
2693 : {
2694 3713577 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2695 3709862 : BAT *nb = BBP_desc(VIEWtparent(b));
2696 3709862 : bat_destroy(b);
2697 3709901 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2698 : return NULL;
2699 : }
2700 : return b;
2701 : }
2702 :
2703 : static int
2704 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2705 : {
2706 0 : int ok = 0;
2707 0 : assert(tr->active);
2708 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2709 0 : return 0;
2710 0 : lock_column(tr->store, c->base.id);
2711 0 : if (unique_est) {
2712 0 : sql_delta *d;
2713 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2714 0 : BAT *b;
2715 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2716 0 : MT_lock_set(&b->theaplock);
2717 0 : b->tunique_est = *unique_est;
2718 0 : MT_lock_unset(&b->theaplock);
2719 0 : bat_destroy(b);
2720 : }
2721 : }
2722 : }
2723 0 : if (min) {
2724 0 : _DELETE(c->min);
2725 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2726 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2727 0 : memcpy(c->min, min, minlen);
2728 0 : ok = 1;
2729 : }
2730 : }
2731 0 : if (max) {
2732 0 : _DELETE(c->max);
2733 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2734 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2735 0 : memcpy(c->max, max, maxlen);
2736 0 : ok = 1;
2737 : }
2738 : }
2739 0 : unlock_column(tr->store, c->base.id);
2740 0 : return ok;
2741 : }
2742 :
2743 : static int
2744 19 : min_max_col(sql_trans *tr, sql_column *c)
2745 : {
2746 19 : int ok = 0;
2747 19 : BAT *b = NULL;
2748 19 : sql_delta *d = NULL;
2749 :
2750 19 : assert(tr->active);
2751 19 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2752 0 : return 0;
2753 19 : if (c->min && c->max)
2754 : return 1;
2755 19 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2756 19 : if (d->cs.st == ST_FOR)
2757 : return 0;
2758 19 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2759 19 : lock_column(tr->store, c->base.id);
2760 19 : if (c->min && c->max) {
2761 0 : unlock_column(tr->store, c->base.id);
2762 0 : return 1;
2763 : }
2764 19 : _DELETE(c->min);
2765 19 : _DELETE(c->max);
2766 19 : if ((b = bind_col(tr, c, access))) {
2767 19 : if (!(b = bind_no_view(b, false))) {
2768 0 : unlock_column(tr->store, c->base.id);
2769 0 : return 0;
2770 : }
2771 19 : BATiter bi = bat_iterator(b);
2772 19 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2773 16 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2774 16 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2775 :
2776 16 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2777 0 : _DELETE(c->min);
2778 0 : _DELETE(c->max);
2779 : } else {
2780 16 : memcpy(c->min, nmin, minlen);
2781 16 : memcpy(c->max, nmax, maxlen);
2782 16 : ok = 1;
2783 : }
2784 : }
2785 19 : bat_iterator_end(&bi);
2786 19 : bat_destroy(b);
2787 : }
2788 19 : unlock_column(tr->store, c->base.id);
2789 : }
2790 : return ok;
2791 : }
2792 :
2793 : static size_t
2794 17 : count_segs(segment *s)
2795 : {
2796 17 : size_t nr = 0;
2797 :
2798 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2799 55 : nr++;
2800 17 : return nr;
2801 : }
2802 :
2803 : static size_t
2804 34 : count_del(sql_trans *tr, sql_table *t, int access)
2805 : {
2806 34 : storage *d;
2807 :
2808 34 : if (!isTable(t))
2809 : return 0;
2810 34 : d = tab_timestamp_storage(tr, t);
2811 34 : if (!d)
2812 : return 0;
2813 34 : if (access == 2)
2814 0 : return d->cs.ucnt;
2815 34 : if (access == 1)
2816 0 : return count_inserts(d->segs->h, tr);
2817 34 : if (access == 10) /* special case for counting the number of segments */
2818 17 : return count_segs(d->segs->h);
2819 17 : return count_deletes(d->segs->h, tr);
2820 : }
2821 :
2822 : static int
2823 21598 : sorted_col(sql_trans *tr, sql_column *col)
2824 : {
2825 21598 : int sorted = 0;
2826 :
2827 21598 : assert(tr->active);
2828 21598 : if (!isTable(col->t) || !col->t->s)
2829 : return 0;
2830 :
2831 21598 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2832 21579 : BAT *b = bind_col(tr, col, QUICK);
2833 :
2834 21579 : if (b)
2835 21579 : sorted = b->tsorted || b->trevsorted;
2836 : }
2837 : return sorted;
2838 : }
2839 :
2840 : static int
2841 7814 : unique_col(sql_trans *tr, sql_column *col)
2842 : {
2843 7814 : int distinct = 0;
2844 :
2845 7814 : assert(tr->active);
2846 7814 : if (!isTable(col->t) || !col->t->s)
2847 : return 0;
2848 :
2849 7814 : if (col && ATOMIC_PTR_GET(&col->data)) {
2850 7814 : BAT *b = bind_col(tr, col, QUICK);
2851 :
2852 7814 : if (b)
2853 7814 : distinct = b->tkey;
2854 : }
2855 : return distinct;
2856 : }
2857 :
2858 : static int
2859 2048 : double_elim_col(sql_trans *tr, sql_column *col)
2860 : {
2861 2048 : int de = 0;
2862 2048 : sql_delta *d;
2863 :
2864 2048 : assert(tr->active);
2865 2048 : if (!isTable(col->t) || !col->t->s)
2866 : return 0;
2867 :
2868 2048 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2869 6 : if (d->cs.st == ST_DICT) {
2870 6 : BAT *b = bind_col(tr, col, QUICK);
2871 6 : if (b && b->ttype == TYPE_bte)
2872 : de = 1;
2873 0 : else if (b && b->ttype == TYPE_sht)
2874 2048 : de = 2;
2875 : }
2876 2042 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2877 2042 : BAT *b = bind_col(tr, col, QUICK);
2878 :
2879 2042 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2880 2042 : de = GDK_ELIMDOUBLES(b->tvheap);
2881 2042 : if (de)
2882 1812 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2883 : }
2884 1812 : assert(de >= 0 && de <= 16);
2885 : }
2886 : return de;
2887 : }
2888 :
2889 : static int
2890 3749948 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
2891 : {
2892 3749948 : int ok = 0;
2893 3749948 : BAT *b = NULL, *off = NULL, *upv = NULL;
2894 3749948 : sql_delta *d = NULL;
2895 :
2896 3749948 : (void) tr;
2897 3749948 : assert(tr->active);
2898 3749948 : *nonil = false;
2899 3749948 : *unique = false;
2900 3749948 : *unique_est = 0.0;
2901 3749948 : if (!c || !isTable(c->t) || !c->t->s)
2902 : return ok;
2903 :
2904 3749710 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2905 3710512 : if (d->cs.st == ST_FOR) {
2906 27 : *nonil = true; /* TODO for min/max. I will do it later */
2907 27 : return ok;
2908 : }
2909 3710485 : int eclass = c->type.type->eclass;
2910 3710485 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2911 3710485 : if ((b = bind_col(tr, c, access))) {
2912 3710549 : if (!(b = bind_no_view(b, false)))
2913 0 : return ok;
2914 3710551 : BATiter bi = bat_iterator(b);
2915 3710590 : *nonil = bi.nonil && !bi.nil;
2916 :
2917 3710590 : if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
2918 3421905 : d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
2919 2232680 : if (c->min && VALinit(min, bi.type, c->min))
2920 : ok |= 1;
2921 2232626 : else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
2922 2223771 : ok |= 1;
2923 2232667 : if (c->max && VALinit(max, bi.type, c->max))
2924 54 : ok |= 2;
2925 2232628 : else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
2926 2217321 : ok |= 2;
2927 : }
2928 3710573 : if (d->cs.ucnt == 0) {
2929 3707492 : if (d->cs.st == ST_DEFAULT) {
2930 3706792 : *unique = bi.key;
2931 3706792 : *unique_est = bi.unique_est;
2932 3706792 : if (*unique_est == 0)
2933 1226710 : *unique_est = (double)BATguess_uniques(b,NULL);
2934 700 : } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
2935 : /* for dict, check the offsets bat for uniqueness */
2936 700 : MT_lock_set(&off->theaplock);
2937 700 : *unique = off->tkey;
2938 700 : *unique_est = off->tunique_est;
2939 700 : MT_lock_unset(&off->theaplock);
2940 : }
2941 : }
2942 3710565 : bat_iterator_end(&bi);
2943 3710597 : bat_destroy(b);
2944 3710543 : if (*nonil && d->cs.ucnt > 0) {
2945 : /* This could use a quick descriptor */
2946 2312 : if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
2947 0 : *nonil = false;
2948 : } else {
2949 2312 : MT_lock_set(&upv->theaplock);
2950 2312 : *nonil &= upv->tnonil && !upv->tnil;
2951 2312 : MT_lock_unset(&upv->theaplock);
2952 2312 : bat_destroy(upv);
2953 : }
2954 : }
2955 : }
2956 : }
2957 : return ok;
2958 : }
2959 :
2960 : static int
2961 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2962 : {
2963 257 : assert(tr->active);
2964 257 : if (!isTable(col->t) || !col->t->s)
2965 : return LOG_OK;
2966 :
2967 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2968 252 : BAT *b = bind_col(tr, col, QUICK);
2969 :
2970 252 : if (b) { /* add props for ranges [min, max> */
2971 252 : MT_lock_set(&b->theaplock);
2972 252 : if (add_range) {
2973 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2974 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2975 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2976 : else
2977 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2978 179 : if (!pt->with_nills || !col->null)
2979 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2980 : } else {
2981 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2982 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2983 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2984 : }
2985 252 : MT_lock_unset(&b->theaplock);
2986 : }
2987 : }
2988 : return LOG_OK;
2989 : }
2990 :
2991 : static int
2992 3942 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
2993 : {
2994 3942 : assert(tr->active);
2995 3942 : if (!isTable(col->t) || !col->t->s)
2996 : return LOG_OK;
2997 :
2998 3912 : if (col && ATOMIC_PTR_GET(&col->data)) {
2999 3912 : BAT *b = bind_col(tr, col, QUICK);
3000 :
3001 3912 : if (b) { /* add props for ranges [min, max> */
3002 3912 : if (not_null) {
3003 3910 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3004 : } else {
3005 2 : BATrmprop(b, GDK_NOT_NULL);
3006 : }
3007 : }
3008 : }
3009 : return LOG_OK;
3010 : }
3011 :
3012 : static int
3013 26430 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3014 : {
3015 26430 : sqlstore *store = tr->store;
3016 26430 : int bid = log_find_bat(store->logger, id);
3017 26430 : if (bid <= 0)
3018 : return LOG_ERR;
3019 26430 : cs->bid = temp_dup(bid);
3020 26430 : cs->ucnt = 0;
3021 26430 : cs->uibid = e_bat(TYPE_oid);
3022 26430 : cs->uvbid = e_bat(type);
3023 26430 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3024 : return LOG_ERR;
3025 : return LOG_OK;
3026 : }
3027 :
3028 : static int
3029 68253 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3030 : {
3031 68253 : int res = LOG_OK;
3032 68253 : gdk_return ok;
3033 68253 : BAT *b = temp_descriptor(bat->cs.bid);
3034 :
3035 68253 : if (b == NULL)
3036 : return LOG_ERR;
3037 :
3038 68253 : if (!bat->cs.uibid)
3039 68245 : bat->cs.uibid = e_bat(TYPE_oid);
3040 68253 : if (!bat->cs.uvbid)
3041 68245 : bat->cs.uvbid = e_bat(b->ttype);
3042 68253 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3043 0 : res = LOG_ERR;
3044 68253 : if (GDKinmemory(0)) {
3045 177 : bat_destroy(b);
3046 177 : return res;
3047 : }
3048 :
3049 68076 : bat_set_access(b, BAT_READ);
3050 68076 : sqlstore *store = tr->store;
3051 68076 : ok = log_bat_persists(store->logger, b, id);
3052 68076 : bat_destroy(b);
3053 68076 : if(res != LOG_OK)
3054 : return res;
3055 68076 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3056 : }
3057 :
3058 : static int
3059 0 : new_persistent_delta( sql_delta *bat)
3060 : {
3061 0 : bat->cs.ucnt = 0;
3062 0 : return LOG_OK;
3063 : }
3064 :
3065 : static void
3066 129684 : create_delta( sql_delta *d, BAT *b)
3067 : {
3068 129684 : bat_set_access(b, BAT_READ);
3069 129684 : d->cs.bid = temp_create(b);
3070 129684 : d->cs.uibid = d->cs.uvbid = 0;
3071 129684 : d->cs.ucnt = 0;
3072 129684 : }
3073 :
3074 : static bat
3075 7293 : copyBat (bat i, int type, oid seq)
3076 : {
3077 7293 : BAT *b, *tb;
3078 7293 : bat res;
3079 :
3080 7293 : if (!i)
3081 : return i;
3082 7293 : tb = quick_descriptor(i);
3083 7293 : if (tb == NULL)
3084 : return 0;
3085 7293 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3086 7293 : if (b == NULL)
3087 : return 0;
3088 :
3089 7293 : bat_set_access(b, BAT_READ);
3090 :
3091 7293 : res = temp_create(b);
3092 7293 : bat_destroy(b);
3093 7293 : return res;
3094 : }
3095 :
3096 : static int
3097 149933 : create_col(sql_trans *tr, sql_column *c)
3098 : {
3099 149933 : int ok = LOG_OK, new = 0;
3100 149933 : int type = c->type.type->localtype;
3101 149933 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3102 :
3103 149933 : if (!bat) {
3104 149933 : new = 1;
3105 149933 : bat = ZNEW(sql_delta);
3106 149933 : if (!bat)
3107 : return LOG_ERR;
3108 149933 : ATOMIC_PTR_SET(&c->data, bat);
3109 149933 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3110 : }
3111 :
3112 149933 : if (new)
3113 149933 : bat->cs.ts = tr->tid;
3114 :
3115 149933 : if (!isNew(c)&& !isTempTable(c->t)){
3116 20227 : bat->cs.ts = tr->ts;
3117 20227 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3118 20227 : if (ok == LOG_OK && c->storage_type) {
3119 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3120 2 : sqlstore *store = tr->store;
3121 2 : int bid = log_find_bat(store->logger, -c->base.id);
3122 2 : if (bid <= 0)
3123 : return LOG_ERR;
3124 2 : bat->cs.ebid = temp_dup(bid);
3125 2 : bat->cs.st = ST_DICT;
3126 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3127 2 : bat->cs.st = ST_FOR;
3128 : }
3129 : }
3130 20227 : return ok;
3131 129706 : } else if (bat && bat->cs.bid) {
3132 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3133 : } else {
3134 129706 : sql_column *fc = NULL;
3135 129706 : size_t cnt = 0;
3136 :
3137 : /* alter ? */
3138 129706 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3139 79816 : storage *s = tab_timestamp_storage(tr, fc->t);
3140 79816 : if (s == NULL)
3141 : return LOG_ERR;
3142 79816 : cnt = segs_end(s->segs, tr, c->t);
3143 : }
3144 129706 : if (cnt && fc != c) {
3145 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3146 :
3147 22 : if (d->cs.bid) {
3148 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3149 22 : if(bat->cs.bid == BID_NIL)
3150 22 : ok = LOG_ERR;
3151 : }
3152 22 : if (d->cs.uibid) {
3153 10 : bat->cs.uibid = e_bat(TYPE_oid);
3154 10 : if (bat->cs.uibid == BID_NIL)
3155 22 : ok = LOG_ERR;
3156 : }
3157 22 : if (d->cs.uvbid) {
3158 10 : bat->cs.uvbid = e_bat(type);
3159 10 : if(bat->cs.uvbid == BID_NIL)
3160 0 : ok = LOG_ERR;
3161 : }
3162 : } else {
3163 129684 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3164 129684 : if (!b) {
3165 : ok = LOG_ERR;
3166 : } else {
3167 129684 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3168 129684 : bat_destroy(b);
3169 : }
3170 :
3171 129684 : if (!new) {
3172 0 : bat->cs.uibid = e_bat(TYPE_oid);
3173 0 : if (bat->cs.uibid == BID_NIL)
3174 0 : ok = LOG_ERR;
3175 0 : bat->cs.uvbid = e_bat(type);
3176 0 : if(bat->cs.uvbid == BID_NIL)
3177 0 : ok = LOG_ERR;
3178 : }
3179 : }
3180 129706 : bat->cs.ucnt = 0;
3181 :
3182 129706 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3183 91 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3184 : }
3185 : return ok;
3186 : }
3187 :
3188 : static int
3189 61885 : log_create_col_(sql_trans *tr, sql_column *c)
3190 : {
3191 61885 : assert(!isTempTable(c->t));
3192 61885 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3193 : }
3194 :
3195 : static int
3196 87 : log_create_col(sql_trans *tr, sql_change *change)
3197 : {
3198 87 : return log_create_col_(tr, (sql_column*)change->obj);
3199 : }
3200 :
3201 : static int
3202 117077 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3203 : {
3204 117077 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3205 117077 : (void)oldest;
3206 117077 : assert(delta->cs.ts == tr->tid);
3207 117077 : delta->cs.ts = commit_ts;
3208 :
3209 117077 : assert(delta->next == NULL);
3210 117077 : if (!delta->cs.merged)
3211 117076 : merge_delta(delta);
3212 117077 : if (!tr->parent)
3213 117073 : base->new = 0;
3214 117077 : return LOG_OK;
3215 : }
3216 :
3217 : static int
3218 91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3219 : {
3220 91 : sql_column *c = (sql_column*)change->obj;
3221 91 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3222 91 : if (!tr->parent)
3223 90 : c->base.new = 0;
3224 91 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3225 : }
3226 :
3227 : /* will be called for new idx's and when new index columns are created */
3228 : static int
3229 9242 : create_idx(sql_trans *tr, sql_idx *ni)
3230 : {
3231 9242 : int ok = LOG_OK, new = 0;
3232 9242 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3233 9242 : int type = TYPE_lng;
3234 :
3235 9242 : if (oid_index(ni->type))
3236 954 : type = TYPE_oid;
3237 :
3238 9242 : if (!bat) {
3239 9242 : new = 1;
3240 9242 : bat = ZNEW(sql_delta);
3241 9242 : if (!bat)
3242 : return LOG_ERR;
3243 9242 : ATOMIC_PTR_INIT(&ni->data, bat);
3244 9242 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3245 : }
3246 :
3247 9242 : if (new)
3248 9242 : bat->cs.ts = tr->tid;
3249 :
3250 9242 : if (!isNew(ni) && !isTempTable(ni->t)){
3251 1971 : bat->cs.ts = 1;
3252 1971 : return load_cs(tr, &bat->cs, type, ni->base.id);
3253 7271 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3254 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3255 : } else {
3256 7271 : sql_column *c = ol_first_node(ni->t->columns)->data;
3257 7271 : sql_delta *d = col_timestamp_delta(tr, c);
3258 :
3259 7271 : if (d) {
3260 : /* Here we also handle indices created through alter stmts */
3261 : /* These need to be created aligned to the existing data */
3262 7271 : if (d->cs.bid) {
3263 7271 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3264 7271 : if(bat->cs.bid == BID_NIL)
3265 7271 : ok = LOG_ERR;
3266 : }
3267 : } else {
3268 : return LOG_ERR;
3269 : }
3270 :
3271 7271 : bat->cs.ucnt = 0;
3272 :
3273 7271 : if (!new) {
3274 0 : bat->cs.uibid = e_bat(TYPE_oid);
3275 0 : if (bat->cs.uibid == BID_NIL)
3276 0 : ok = LOG_ERR;
3277 0 : bat->cs.uvbid = e_bat(type);
3278 0 : if(bat->cs.uvbid == BID_NIL)
3279 0 : ok = LOG_ERR;
3280 : }
3281 7271 : bat->cs.ucnt = 0;
3282 7271 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3283 631 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3284 : }
3285 : return ok;
3286 : }
3287 :
3288 : static int
3289 6368 : log_create_idx_(sql_trans *tr, sql_idx *i)
3290 : {
3291 6368 : assert(!isTempTable(i->t));
3292 6368 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3293 : }
3294 :
3295 : static int
3296 617 : log_create_idx(sql_trans *tr, sql_change *change)
3297 : {
3298 617 : return log_create_idx_(tr, (sql_idx*)change->obj);
3299 : }
3300 :
3301 : static int
3302 631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3303 : {
3304 631 : sql_idx *i = (sql_idx*)change->obj;
3305 631 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3306 631 : if (!tr->parent)
3307 631 : i->base.new = 0;
3308 631 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3309 : return LOG_OK;
3310 : }
3311 :
3312 : static int
3313 4232 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3314 : {
3315 4232 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3316 4232 : BAT *b = NULL, *ib = NULL;
3317 :
3318 4232 : if (ok != LOG_OK)
3319 : return ok;
3320 4232 : if (!(b = temp_descriptor(s->cs.bid)))
3321 : return LOG_ERR;
3322 4232 : ib = b;
3323 :
3324 4232 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3325 0 : bat_destroy(ib);
3326 0 : return LOG_ERR;
3327 : }
3328 :
3329 4232 : if (BATcount(b)) {
3330 334 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3331 0 : bat_destroy(ib);
3332 0 : return LOG_ERR;
3333 : }
3334 495 : if (BATtdense(b)) {
3335 161 : size_t start = b->tseqbase;
3336 161 : size_t cnt = BATcount(b);
3337 161 : ok = delete_range(tr, t, s, start, cnt);
3338 : } else {
3339 173 : assert(b->tsorted);
3340 173 : BUN icnt = BATcount(b);
3341 173 : BATiter bi = bat_iterator(b);
3342 173 : size_t lcnt = 1;
3343 173 : oid n;
3344 173 : segment *seg = s->segs->h;
3345 173 : if (complex_cand(b)) {
3346 0 : oid o = * (oid *) Tpos(&bi, 0);
3347 0 : n = o + 1;
3348 0 : for (BUN i = 1; i < icnt; i++) {
3349 0 : o = * (oid *) Tpos(&bi, i);
3350 0 : if (o == n) {
3351 0 : lcnt++;
3352 0 : n++;
3353 : } else {
3354 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3355 : break;
3356 : lcnt = 0;
3357 : }
3358 0 : if (!lcnt) {
3359 0 : n = o + 1;
3360 0 : lcnt = 1;
3361 : }
3362 : }
3363 : } else {
3364 173 : oid *o = bi.base;
3365 173 : n = o[0]+1;
3366 281583 : for (size_t i=1; i<icnt; i++) {
3367 281410 : if (o[i] == n) {
3368 278925 : lcnt++;
3369 278925 : n++;
3370 : } else {
3371 2485 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3372 : break;
3373 : lcnt = 0;
3374 : }
3375 281410 : if (!lcnt) {
3376 2485 : n = o[i]+1;
3377 2485 : lcnt = 1;
3378 : }
3379 : }
3380 : }
3381 173 : if (lcnt && ok == LOG_OK)
3382 173 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3383 173 : bat_iterator_end(&bi);
3384 : }
3385 334 : if (ok == LOG_OK)
3386 6192 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3387 5858 : if (seg->ts == tr->tid)
3388 3043 : seg->ts = 1;
3389 : } else {
3390 3898 : if (ok == LOG_OK) {
3391 3898 : BAT *bb = quick_descriptor(s->cs.bid);
3392 :
3393 3898 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3394 : ok = LOG_ERR;
3395 : } else {
3396 3898 : segment *seg = s->segs->h;
3397 3898 : if (seg->ts == tr->tid)
3398 3898 : seg->ts = 1;
3399 : }
3400 : }
3401 : }
3402 4232 : if (b != ib)
3403 4232 : bat_destroy(b);
3404 4232 : bat_destroy(ib);
3405 :
3406 4232 : return ok;
3407 : }
3408 :
3409 : static int
3410 25055 : create_del(sql_trans *tr, sql_table *t)
3411 : {
3412 25055 : int ok = LOG_OK, new = 0;
3413 25055 : BAT *b;
3414 25055 : storage *bat = ATOMIC_PTR_GET(&t->data);
3415 :
3416 25055 : if (!bat) {
3417 25055 : new = 1;
3418 25055 : bat = ZNEW(storage);
3419 25055 : if(!bat)
3420 : return LOG_ERR;
3421 25055 : ATOMIC_PTR_INIT(&t->data, bat);
3422 25055 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3423 25055 : bat->cs.ts = tr->tid;
3424 : }
3425 :
3426 25055 : if (!isNew(t) && !isTempTable(t)) {
3427 4232 : bat->cs.ts = tr->ts;
3428 4232 : return load_storage(tr, t, bat, t->base.id);
3429 20823 : } else if (bat->cs.bid) {
3430 : return ok;
3431 : } else {
3432 20823 : assert(!bat->segs);
3433 20823 : if (!(bat->segs = new_segments(tr, 0)))
3434 : return LOG_ERR;
3435 :
3436 20823 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3437 20823 : if(b != NULL) {
3438 20823 : bat_set_access(b, BAT_READ);
3439 20823 : bat->cs.bid = temp_create(b);
3440 20823 : bat_destroy(b);
3441 : } else {
3442 : return LOG_ERR;
3443 : }
3444 20823 : if (new)
3445 27341 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3446 : }
3447 : return ok;
3448 : }
3449 :
3450 : static int
3451 189770 : log_segment(sql_trans *tr, segment *s, sqlid id)
3452 : {
3453 189770 : sqlstore *store = tr->store;
3454 189770 : msk m = s->deleted;
3455 189770 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3456 : }
3457 :
3458 : static int
3459 96931 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3460 : {
3461 : /* log segments */
3462 96931 : lock_table(tr->store, id);
3463 493175 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3464 396244 : unlock_table(tr->store, id);
3465 396244 : if (seg->ts == tr->tid && seg->end-seg->start) {
3466 143298 : if (log_segment(tr, seg, id) != LOG_OK) {
3467 : return LOG_ERR;
3468 : }
3469 : }
3470 396244 : lock_table(tr->store, id);
3471 : }
3472 96931 : unlock_table(tr->store, id);
3473 96931 : return LOG_OK;
3474 : }
3475 :
3476 : static int
3477 12542 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3478 : {
3479 12542 : BAT *b;
3480 12542 : int ok = LOG_OK;
3481 :
3482 12542 : if (GDKinmemory(0))
3483 : return LOG_OK;
3484 :
3485 12509 : b = temp_descriptor(bat->cs.bid);
3486 12509 : if (b == NULL)
3487 : return LOG_ERR;
3488 :
3489 12509 : sqlstore *store = tr->store;
3490 12509 : bat_set_access(b, BAT_READ);
3491 12509 : if (ok == LOG_OK)
3492 12509 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3493 12509 : if (ok == LOG_OK)
3494 12509 : ok = log_segments(tr, bat->segs, t->base.id);
3495 12509 : bat_destroy(b);
3496 12509 : return ok;
3497 : }
3498 :
3499 : static int
3500 12556 : log_create_del(sql_trans *tr, sql_change *change)
3501 : {
3502 12556 : int ok = LOG_OK;
3503 12556 : sql_table *t = (sql_table*)change->obj;
3504 :
3505 12556 : if (t->base.deleted)
3506 : return ok;
3507 12542 : assert(!isTempTable(t));
3508 12542 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3509 12542 : if (ok == LOG_OK) {
3510 74340 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3511 61798 : sql_column *c = n->data;
3512 :
3513 61798 : ok = log_create_col_(tr, c);
3514 : }
3515 12542 : if (t->idxs) {
3516 18305 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3517 5763 : sql_idx *i = n->data;
3518 :
3519 5763 : if (ATOMIC_PTR_GET(&i->data))
3520 5751 : ok = log_create_idx_(tr, i);
3521 : }
3522 : }
3523 : }
3524 : return ok;
3525 : }
3526 :
3527 : static int
3528 20825 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3529 : {
3530 20825 : int ok = LOG_OK;
3531 20825 : sql_table *t = (sql_table*)change->obj;
3532 20825 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3533 :
3534 20825 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3535 120 : assert(isTempTable(t));
3536 120 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3537 120 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3538 120 : return ok;
3539 : }
3540 :
3541 20705 : if (!commit_ts) /* rollback handled by ? */
3542 : return ok;
3543 18851 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3544 18851 : assert(ok == LOG_OK);
3545 18851 : if (ok != LOG_OK)
3546 : return ok;
3547 18851 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3548 18851 : assert(dbat->cs.ts == tr->tid);
3549 18851 : dbat->cs.ts = commit_ts;
3550 18851 : if (ok == LOG_OK) {
3551 129442 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3552 110591 : sql_column *c = n->data;
3553 110591 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3554 :
3555 110591 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3556 : }
3557 18851 : if (t->idxs) {
3558 24627 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3559 5776 : sql_idx *i = n->data;
3560 5776 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3561 :
3562 5776 : if (delta)
3563 5764 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3564 : }
3565 : }
3566 18851 : if (!tr->parent)
3567 18849 : t->base.new = 0;
3568 : }
3569 18851 : if (!tr->parent)
3570 18849 : t->base.new = 0;
3571 : return ok;
3572 : }
3573 :
3574 : static int
3575 19748 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3576 : {
3577 19748 : gdk_return ok = GDK_SUCCEED;
3578 :
3579 19748 : sqlstore *store = tr->store;
3580 19748 : if (!GDKinmemory(0) && b && b->cs.bid)
3581 18828 : ok = log_bat_transient(store->logger, id);
3582 19748 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3583 25 : ok = log_bat_transient(store->logger, -id);
3584 19748 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3585 : }
3586 :
3587 : static int
3588 167669 : destroy_col(sqlstore *store, sql_column *c)
3589 : {
3590 167669 : (void)store;
3591 167669 : if (ATOMIC_PTR_GET(&c->data))
3592 167669 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3593 167669 : ATOMIC_PTR_SET(&c->data, NULL);
3594 167669 : return LOG_OK;
3595 : }
3596 :
3597 : static int
3598 17865 : log_destroy_col_(sql_trans *tr, sql_column *c)
3599 : {
3600 17865 : int ok = LOG_OK;
3601 17865 : assert(!isTempTable(c->t));
3602 17865 : if (!tr->parent) /* don't write save point commits */
3603 17865 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3604 17865 : return ok;
3605 : }
3606 :
3607 : static int
3608 17865 : log_destroy_col(sql_trans *tr, sql_change *change)
3609 : {
3610 17865 : sql_column *c = (sql_column*)change->obj;
3611 17865 : int res = log_destroy_col_(tr, c);
3612 17865 : change->obj = NULL;
3613 17865 : column_destroy(tr->store, c);
3614 17865 : return res;
3615 : }
3616 :
3617 : static int
3618 10591 : destroy_idx(sqlstore *store, sql_idx *i)
3619 : {
3620 10591 : (void)store;
3621 10591 : if (ATOMIC_PTR_GET(&i->data))
3622 10591 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3623 10591 : ATOMIC_PTR_SET(&i->data, NULL);
3624 10591 : return LOG_OK;
3625 : }
3626 :
3627 : static int
3628 1959 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3629 : {
3630 1959 : int ok = LOG_OK;
3631 1959 : assert(!isTempTable(i->t));
3632 1959 : if (ATOMIC_PTR_GET(&i->data)) {
3633 1883 : if (!tr->parent) /* don't write save point commits */
3634 1883 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3635 : }
3636 1959 : return ok;
3637 : }
3638 :
3639 : static int
3640 1959 : log_destroy_idx(sql_trans *tr, sql_change *change)
3641 : {
3642 1959 : sql_idx *i = (sql_idx*)change->obj;
3643 1959 : int res = log_destroy_idx_(tr, i);
3644 1959 : change->obj = NULL;
3645 1959 : idx_destroy(tr->store, i);
3646 1959 : return res;
3647 : }
3648 :
3649 : static int
3650 26558 : destroy_del(sqlstore *store, sql_table *t)
3651 : {
3652 26558 : (void)store;
3653 26558 : if (ATOMIC_PTR_GET(&t->data))
3654 26554 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3655 26558 : ATOMIC_PTR_SET(&t->data, NULL);
3656 26558 : return LOG_OK;
3657 : }
3658 :
3659 : static int
3660 3251 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3661 : {
3662 3251 : gdk_return ok = GDK_SUCCEED;
3663 :
3664 3251 : sqlstore *store = tr->store;
3665 3251 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3666 3251 : bat && bat->cs.bid)
3667 3251 : ok = log_bat_transient(store->logger, id);
3668 3251 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3669 : }
3670 :
3671 : static int
3672 3251 : log_destroy_del(sql_trans *tr, sql_change *change)
3673 : {
3674 3251 : int ok = LOG_OK;
3675 3251 : sql_table *t = (sql_table*)change->obj;
3676 :
3677 3251 : assert(!isTempTable(t));
3678 3251 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3679 3251 : return ok;
3680 : }
3681 :
3682 : static int
3683 23163 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3684 : {
3685 23163 : (void)tr;
3686 23163 : (void)change;
3687 23163 : (void)commit_ts;
3688 23163 : (void)oldest;
3689 23163 : if (commit_ts)
3690 23140 : change->handled = true;
3691 23163 : return 0;
3692 : }
3693 :
3694 : static int
3695 3322 : drop_del(sql_trans *tr, sql_table *t)
3696 : {
3697 3322 : int ok = LOG_OK;
3698 :
3699 3322 : if (!isNew(t)) {
3700 3322 : storage *bat = ATOMIC_PTR_GET(&t->data);
3701 3387 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3702 : }
3703 3322 : return ok;
3704 : }
3705 :
3706 : static int
3707 17880 : drop_col(sql_trans *tr, sql_column *c)
3708 : {
3709 17880 : assert(!isNew(c));
3710 17880 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3711 17880 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3712 17880 : return LOG_OK;
3713 : }
3714 :
3715 : static int
3716 1961 : drop_idx(sql_trans *tr, sql_idx *i)
3717 : {
3718 1961 : assert(!isNew(i));
3719 1961 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3720 1961 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3721 1961 : return LOG_OK;
3722 : }
3723 :
3724 :
3725 : static BUN
3726 129251 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3727 : {
3728 129251 : BAT *b;
3729 129251 : BUN sz = 0;
3730 :
3731 129251 : (void)tr;
3732 129251 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3733 129251 : if (cs->bid && renew) {
3734 129251 : b = quick_descriptor(cs->bid);
3735 129235 : if (b) {
3736 129235 : sz += BATcount(b);
3737 129235 : if (cs->st == ST_DICT) {
3738 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3739 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3740 :
3741 2 : if (nebid == BID_NIL || !n) {
3742 0 : temp_destroy(nebid);
3743 0 : bat_destroy(n);
3744 0 : return BUN_NONE;
3745 : }
3746 2 : temp_destroy(cs->ebid);
3747 2 : cs->ebid = nebid;
3748 2 : if (!temp)
3749 2 : bat_set_access(n, BAT_READ);
3750 2 : temp_destroy(cs->bid);
3751 2 : cs->bid = temp_create(n); /* create empty copy */
3752 2 : bat_destroy(n);
3753 : } else {
3754 129233 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3755 :
3756 129244 : if (nbid == BID_NIL)
3757 : return BUN_NONE;
3758 129244 : temp_destroy(cs->bid);
3759 129229 : cs->bid = nbid;
3760 : }
3761 : } else {
3762 : return BUN_NONE;
3763 : }
3764 : }
3765 129231 : if (cs->uibid) {
3766 129087 : temp_destroy(cs->uibid);
3767 129114 : cs->uibid = 0;
3768 : }
3769 129258 : if (cs->uvbid) {
3770 129114 : temp_destroy(cs->uvbid);
3771 129114 : cs->uvbid = 0;
3772 : }
3773 129258 : cs->cleared = true;
3774 129258 : cs->ucnt = 0;
3775 129258 : return sz;
3776 : }
3777 :
3778 : static BUN
3779 129105 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3780 : {
3781 129105 : bool update_conflict = false;
3782 129105 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3783 :
3784 129105 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3785 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3786 129107 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3787 129107 : if (odelta != delta)
3788 129112 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3789 129099 : if (delta)
3790 129099 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3791 : return 0;
3792 : }
3793 :
3794 : static BUN
3795 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3796 : {
3797 21 : bool update_conflict = false;
3798 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3799 :
3800 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3801 15 : return 0;
3802 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3803 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3804 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3805 6 : if (odelta != delta)
3806 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3807 6 : if (delta)
3808 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3809 : return 0;
3810 : }
3811 :
3812 : static int
3813 142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3814 : {
3815 142 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3816 : return LOG_ERR;
3817 142 : if (s->segs)
3818 142 : destroy_segments(s->segs);
3819 142 : if (!(s->segs = new_segments(tr, 0)))
3820 : return LOG_ERR;
3821 : return LOG_OK;
3822 : }
3823 :
3824 :
3825 : /*
3826 : * Clear the table, in general this means replacing the storage,
3827 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3828 : * all segments as deleted.
3829 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3830 : */
3831 : static BUN
3832 41826 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3833 : {
3834 41826 : int clear = !in_transaction, ok = LOG_OK;
3835 41826 : bool conflict = false;
3836 41826 : storage *bat;
3837 :
3838 41877 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3839 15870 : return conflict?BUN_NONE-1:BUN_NONE;
3840 :
3841 25956 : if (!clear) {
3842 51 : lock_table(tr->store, t->base.id);
3843 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3844 51 : unlock_table(tr->store, t->base.id);
3845 : }
3846 25956 : assert(t->persistence != SQL_DECLARED_TABLE);
3847 25956 : if (!in_transaction)
3848 25908 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3849 25956 : if (ok == LOG_ERR)
3850 : return BUN_NONE;
3851 25956 : if (ok == LOG_CONFLICT)
3852 0 : return BUN_NONE - 1;
3853 : return LOG_OK;
3854 : }
3855 :
3856 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3857 : static BUN
3858 41826 : clear_table(sql_trans *tr, sql_table *t)
3859 : {
3860 41826 : node *n = ol_first_node(t->columns);
3861 41826 : sql_column *c = n->data;
3862 41826 : storage *d = tab_timestamp_storage(tr, t);
3863 41826 : int in_transaction, clear;
3864 41826 : BUN sz, clear_ok;
3865 :
3866 41826 : if (!d)
3867 : return BUN_NONE;
3868 41826 : in_transaction = segments_in_transaction(tr, t);
3869 41825 : clear = !in_transaction;
3870 41825 : sz = count_col(tr, c, CNT_ACTIVE);
3871 41826 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3872 : return clear_ok;
3873 :
3874 25956 : if (in_transaction)
3875 : return sz;
3876 :
3877 155012 : for (; n; n = n->next) {
3878 129107 : c = n->data;
3879 :
3880 129107 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3881 0 : return clear_ok;
3882 : }
3883 25905 : if (t->idxs) {
3884 25926 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3885 21 : sql_idx *ci = n->data;
3886 :
3887 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3888 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3889 0 : return clear_ok;
3890 : }
3891 : }
3892 : return sz;
3893 : }
3894 :
3895 : static int
3896 158389 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3897 : {
3898 158389 : sqlstore *store = tr->store;
3899 158389 : gdk_return ok = GDK_SUCCEED;
3900 :
3901 158389 : (void) t;
3902 158389 : (void) segs;
3903 158389 : if (GDKinmemory(0))
3904 : return LOG_OK;
3905 :
3906 158382 : if (cs->cleared) {
3907 155070 : assert(cs->ucnt == 0);
3908 155070 : BAT *ins = temp_descriptor(cs->bid);
3909 155070 : if (!ins)
3910 : return LOG_ERR;
3911 155070 : assert(!isEbat(ins));
3912 155070 : bat_set_access(ins, BAT_READ);
3913 155070 : ok = log_bat_persists(store->logger, ins, id);
3914 155070 : bat_destroy(ins);
3915 155070 : if (ok == GDK_SUCCEED && cs->ebid) {
3916 56 : BAT *ins = temp_descriptor(cs->ebid);
3917 56 : if (!ins)
3918 : return LOG_ERR;
3919 56 : assert(!isEbat(ins));
3920 56 : bat_set_access(ins, BAT_READ);
3921 56 : ok = log_bat_persists(store->logger, ins, -id);
3922 56 : bat_destroy(ins);
3923 : }
3924 155070 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3925 : }
3926 :
3927 3312 : assert(!isTempTable(t));
3928 :
3929 3312 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3930 2741 : BAT *ui = temp_descriptor(cs->uibid);
3931 2741 : BAT *uv = temp_descriptor(cs->uvbid);
3932 : /* any updates */
3933 2741 : if (ui == NULL || uv == NULL) {
3934 : ok = GDK_FAIL;
3935 2741 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3936 2741 : ok = log_delta(store->logger, ui, uv, id);
3937 2741 : bat_destroy(ui);
3938 2741 : bat_destroy(uv);
3939 : }
3940 2741 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3941 : }
3942 :
3943 : static inline int
3944 58523 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3945 58523 : sqlstore *store = tr->store;
3946 58523 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3947 : }
3948 :
3949 : static inline int
3950 58523 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3951 58523 : sqlstore *store = tr->store;
3952 58523 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3953 : }
3954 :
/*
 * Write the rows appended by transaction tr to table t into the logger.
 *
 * The work is bracketed by tr_log_table_start()/tr_log_table_end() and
 * consists of three passes:
 *   1. every non-empty, non-deleted segment owned by tr (seg->ts == tr->tid)
 *      is logged with log_segment() and its size is added to nr_appends;
 *   2. for every column the ranges covered by those segments are logged with
 *      log_bat(); a column whose storage is flagged 'cleared' is logged
 *      through tr_log_cs() instead;
 *   3. the same is done for every index that has its own storage.
 *
 * The table lock is only held while stepping from one segment to the next:
 * each loop body starts by unlocking and ends by re-locking, so the early
 * 'return LOG_ERR' statements inside the bodies leave with the lock released.
 * NOTE(review): those early returns also skip tr_log_table_end(); presumably
 * the caller aborts the log transaction on LOG_ERR -- confirm.
 *
 * Returns LOG_OK on success, LOG_ERR on any failure.
 */
static int
log_table_append(sql_trans *tr, sql_table *t, segments *segs)
{
	sqlstore *store = tr->store;
	gdk_return ok = GDK_SUCCEED;

	/* upper bound for the row positions considered in the column/index passes */
	size_t end = segs_end(segs, tr, t);

	if (tr_log_table_start(tr, t) != LOG_OK)
		return LOG_ERR;

	/* total number of rows in the segments logged by pass 1 */
	size_t nr_appends = 0;

	/* pass 1: log the segments claimed by this transaction */
	lock_table(tr->store, t->base.id);
	for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
		unlock_table(tr->store, t->base.id);

		if (seg->ts == tr->tid && seg->end-seg->start) {
			if (!seg->deleted) {
				if (log_segment(tr, seg, t->base.id) != LOG_OK)
					return LOG_ERR;

				nr_appends += (seg->end - seg->start);
			}
		}
		lock_table(tr->store, t->base.id);
	}
	unlock_table(tr->store, t->base.id);

	/* pass 2: log the appended ranges of every column */
	for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
		sql_column *c = n->data;
		column_storage *cs = ATOMIC_PTR_GET(&c->data);

		if (cs->cleared) {
			ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
			continue;
		}

		lock_table(tr->store, t->base.id);
		/* the cleared flag is checked once more now that the lock is held */
		if (!cs->cleared) {
			for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
				unlock_table(tr->store, t->base.id);
				if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
					/* append col*/
					BAT *ins = temp_descriptor(cs->bid);
					if (ins == NULL)
						return LOG_ERR;
					assert(BATcount(ins) >= cur->end);
					ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
					bat_destroy(ins);
				}
				lock_table(tr->store, t->base.id);
			}
		}
		unlock_table(tr->store, t->base.id);

		/* the BAT behind cs->ebid is logged under the negated column id;
		 * only the entries beyond batInserted are written, after which
		 * BATcommit() is called with the current count */
		if (ok == GDK_SUCCEED && cs->ebid) {
			BAT *ins = temp_descriptor(cs->ebid);
			if (ins == NULL)
				return LOG_ERR;
			if (BATcount(ins) > ins->batInserted)
				ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
			BATcommit(ins, BATcount(ins));
			bat_destroy(ins);
		}
	}

	/* pass 3: log the appended ranges of every index that has storage */
	if (t->idxs) {
		for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
			sql_idx *i = n->data;

			/* skip single-column hash indexes and index types without a column */
			if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
				continue;
			column_storage *cs = ATOMIC_PTR_GET(&i->data);

			if (cs) {
				if (cs->cleared) {
					ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
					continue;
				}

				lock_table(tr->store, t->base.id);
				for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
					unlock_table(tr->store, t->base.id);
					if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
						/* append idx */
						BAT *ins = temp_descriptor(cs->bid);
						if (ins == NULL)
							return LOG_ERR;
						assert(BATcount(ins) >= cur->end);
						ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
						bat_destroy(ins);
					}
					lock_table(tr->store, t->base.id);
				}
				unlock_table(tr->store, t->base.id);
			}
		}
	}

	if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
		return LOG_ERR;

	return LOG_OK;
}
4060 :
4061 : static int
4062 84422 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4063 : {
4064 84422 : int ok = LOG_OK;
4065 84422 : bool cleared = s->cs.cleared;
4066 84422 : if (ok == LOG_OK && cleared)
4067 25899 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4068 25899 : if (ok == LOG_OK)
4069 84422 : ok = log_segments(tr, s->segs, t->base.id);
4070 84422 : if (ok == LOG_OK && !cleared)
4071 58523 : ok = log_table_append(tr, t, s->segs);
4072 84422 : return ok;
4073 : }
4074 :
4075 : static void
4076 382673 : merge_cs( column_storage *cs, const char* caller)
4077 : {
4078 382673 : if (cs->bid && cs->ucnt) {
4079 2750 : BAT *cur = temp_descriptor(cs->bid);
4080 2750 : BAT *ui = temp_descriptor(cs->uibid);
4081 2750 : BAT *uv = temp_descriptor(cs->uvbid);
4082 :
4083 2750 : if (!cur || !ui || !uv) {
4084 0 : bat_destroy(ui);
4085 0 : bat_destroy(uv);
4086 0 : bat_destroy(cur);
4087 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4088 : return;
4089 : }
4090 2750 : assert(BATcount(ui) == BATcount(uv));
4091 :
4092 : /* any updates */
4093 2750 : assert(!isEbat(cur));
4094 2750 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4095 0 : bat_destroy(ui);
4096 0 : bat_destroy(uv);
4097 0 : bat_destroy(cur);
4098 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4099 : return;
4100 : }
4101 : /* cleanup the old deltas */
4102 2750 : temp_destroy(cs->uibid);
4103 2750 : temp_destroy(cs->uvbid);
4104 2750 : cs->uibid = e_bat(TYPE_oid);
4105 2750 : cs->uvbid = e_bat(cur->ttype);
4106 2750 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4107 2750 : cs->ucnt = 0;
4108 2750 : bat_destroy(ui);
4109 2750 : bat_destroy(uv);
4110 2750 : bat_destroy(cur);
4111 : }
4112 382673 : cs->cleared = false;
4113 382673 : cs->merged = true;
4114 382673 : return;
4115 : }
4116 :
4117 : static void
4118 340702 : merge_delta( sql_delta *obat)
4119 : {
4120 340702 : if (obat && obat->next && !obat->cs.merged)
4121 132497 : merge_delta(obat->next);
4122 340702 : merge_cs(&obat->cs, __func__);
4123 340702 : }
4124 :
4125 : static void
4126 41971 : merge_storage(storage *tdb)
4127 : {
4128 41971 : merge_cs(&tdb->cs, __func__);
4129 :
4130 41971 : if (tdb->next) {
4131 443 : destroy_storage(tdb->next);
4132 443 : tdb->next = NULL;
4133 : }
4134 41971 : }
4135 :
4136 : static sql_delta *
4137 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4138 : {
4139 : /* commit ie copy back to the parent transaction */
4140 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4141 1 : sql_delta *od = delta->next;
4142 1 : if (od->cs.ts == commit_ts) {
4143 0 : sql_delta t = *od, *n = od->next;
4144 0 : *od = *delta;
4145 0 : od->next = n;
4146 0 : *delta = t;
4147 0 : delta->next = NULL;
4148 0 : destroy_delta(delta, true);
4149 0 : return od;
4150 : }
4151 : }
4152 : return delta;
4153 : }
4154 :
4155 : static int
4156 132461 : log_update_col( sql_trans *tr, sql_change *change)
4157 : {
4158 132461 : sql_column *c = (sql_column*)change->obj;
4159 132461 : assert(!isTempTable(c->t));
4160 :
4161 132461 : if (isDeleted(c->t)) {
4162 0 : change->handled = true;
4163 0 : return LOG_OK;
4164 : }
4165 :
4166 132461 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4167 132461 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4168 132461 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4169 132461 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4170 : }
4171 : return LOG_OK;
4172 : }
4173 :
4174 : static int
4175 216 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4176 : {
4177 216 : sqlstore *store = Store;
4178 :
4179 216 : sql_delta *d = (sql_delta*)change->data;
4180 216 : if (d->cs.ts < oldest) {
4181 82 : destroy_delta(d, false);
4182 82 : if (change->commit == &commit_update_idx)
4183 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4184 : else
4185 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4186 82 : return 1;
4187 : }
4188 134 : if (d->cs.ts > TRANSACTION_ID_BASE)
4189 82 : d->cs.ts = store_get_timestamp(store) + 1;
4190 : return 0;
4191 : }
4192 :
4193 : static int
4194 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4195 : {
4196 9 : sqlstore *store = Store;
4197 :
4198 9 : storage *d = (storage*)change->data;
4199 9 : if (d->cs.ts < oldest) {
4200 3 : destroy_storage(d);
4201 3 : table_destroy(store, (sql_table*)change->obj);
4202 3 : return 1;
4203 : }
4204 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4205 3 : d->cs.ts = store_get_timestamp(store) + 1;
4206 : return 0;
4207 : }
4208 :
4209 : static int
4210 132579 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4211 : {
4212 132579 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4213 :
4214 132579 : sql_delta *delta = ATOMIC_PTR_GET(data);
4215 :
4216 132579 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4217 3 : int ok = LOG_OK;
4218 3 : assert(isTempTable(t));
4219 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4220 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4221 3 : if (!tr->parent)
4222 0 : t->base.new = base->new = 0;
4223 3 : change->handled = true;
4224 3 : return ok;
4225 : }
4226 :
4227 132576 : if (commit_ts)
4228 132494 : delta->cs.ts = commit_ts;
4229 132576 : if (!commit_ts) { /* rollback */
4230 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4231 :
4232 82 : if (change->ts && t->base.new) /* handled by create col */
4233 : return LOG_OK;
4234 82 : if (o != d) {
4235 0 : while(o && o->next != d)
4236 : o = o->next;
4237 : }
4238 82 : if (o == ATOMIC_PTR_GET(data))
4239 82 : ATOMIC_PTR_SET(data, d->next);
4240 : else
4241 0 : o->next = d->next;
4242 82 : d->next = NULL;
4243 82 : change->cleanup = &tc_gc_rollbacked;
4244 132494 : } else if (!tr->parent) {
4245 : /* merge deltas */
4246 363422 : while (delta && delta->cs.ts > oldest)
4247 230929 : delta = delta->next;
4248 132493 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4249 31926 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4250 31926 : merge_delta(delta);
4251 31926 : unlock_column(tr->store, base->id);
4252 : }
4253 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4254 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4255 : return LOG_OK;
4256 : }
4257 :
4258 : static int
4259 132551 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4260 : {
4261 :
4262 132551 : sql_column *c = (sql_column*)change->obj;
4263 132551 : sql_base* base = &c->base;
4264 132551 : sql_table* t = c->t;
4265 132551 : ATOMIC_PTR_TYPE* data = &c->data;
4266 132551 : int type = c->type.type->localtype;
4267 :
4268 132551 : if (change->handled || isDeleted(c->t))
4269 : return LOG_OK;
4270 :
4271 132551 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4272 : }
4273 :
4274 : static int
4275 26 : log_update_idx( sql_trans *tr, sql_change *change)
4276 : {
4277 26 : sql_idx *i = (sql_idx*)change->obj;
4278 26 : assert(!isTempTable(i->t));
4279 :
4280 26 : if (isDeleted(i->t)) {
4281 0 : change->handled = true;
4282 0 : return LOG_OK;
4283 : }
4284 :
4285 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4286 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4287 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4288 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4289 : }
4290 : return LOG_OK;
4291 : }
4292 :
4293 : static int
4294 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4295 : {
4296 28 : sql_idx *i = (sql_idx*)change->obj;
4297 28 : sql_base* base = &i->base;
4298 28 : sql_table* t = i->t;
4299 28 : ATOMIC_PTR_TYPE* data = &i->data;
4300 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4301 :
4302 28 : if (change->handled || isDeleted(i->t))
4303 : return LOG_OK;
4304 :
4305 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4306 : }
4307 :
4308 : static storage *
4309 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4310 : {
4311 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4312 0 : storage *od = dbat->next;
4313 0 : if (od->cs.ts == commit_ts) {
4314 0 : storage t = *od, *n = od->next;
4315 0 : *od = *dbat;
4316 0 : od->next = n;
4317 0 : *dbat = t;
4318 0 : dbat->next = NULL;
4319 0 : destroy_storage(dbat);
4320 0 : return od;
4321 : }
4322 : }
4323 : return dbat;
4324 : }
4325 :
4326 : static int
4327 84422 : log_update_del( sql_trans *tr, sql_change *change)
4328 : {
4329 84422 : sql_table *t = (sql_table*)change->obj;
4330 84422 : assert(!isTempTable(t));
4331 :
4332 84422 : if (isDeleted(t)) {
4333 0 : change->handled = true;
4334 0 : return LOG_OK;
4335 : }
4336 :
4337 84422 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4338 84422 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4339 : return LOG_OK;
4340 : }
4341 :
4342 : static int
4343 88898 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4344 : {
4345 88898 : int ok = LOG_OK;
4346 88898 : sql_table *t = (sql_table*)change->obj;
4347 88898 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4348 :
4349 88898 : if (change->handled || isDeleted(t))
4350 : return ok;
4351 :
4352 88898 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4353 22 : assert(isTempTable(t));
4354 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4355 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4356 22 : change->handled = true;
4357 22 : return ok;
4358 : }
4359 :
4360 88876 : lock_table(tr->store, t->base.id);
4361 88876 : if (!commit_ts) { /* rollback */
4362 4154 : if (dbat->cs.ts == tr->tid) {
4363 6 : if (change->ts && t->base.new) { /* handled by the create table */
4364 3 : unlock_table(tr->store, t->base.id);
4365 3 : return ok;
4366 : }
4367 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4368 :
4369 3 : if (o != d) {
4370 0 : while(o && o->next != d)
4371 : o = o->next;
4372 : }
4373 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4374 3 : assert(d->next);
4375 3 : ATOMIC_PTR_SET(&t->data, d->next);
4376 : } else
4377 0 : o->next = d->next;
4378 3 : d->next = NULL;
4379 3 : change->cleanup = &tc_gc_rollbacked_storage;
4380 : } else
4381 4148 : rollback_segments(dbat->segs, tr, change, oldest);
4382 84722 : } else if (ok == LOG_OK && !tr->parent) {
4383 84696 : if (dbat->cs.ts == tr->tid) /* cleared table */
4384 25901 : dbat->cs.ts = commit_ts;
4385 :
4386 84696 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4387 84696 : if (ok == LOG_OK) {
4388 84696 : merge_segments(dbat, tr, change, commit_ts, oldest);
4389 84696 : if (oldest == commit_ts)
4390 41971 : merge_storage(dbat);
4391 : }
4392 84696 : if (dbat)
4393 84696 : dbat->cs.cleared = false;
4394 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4395 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4396 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4397 26 : storage *s = change->data;
4398 26 : if (s->cs.ts == tr->tid)
4399 0 : s->cs.ts = commit_ts;
4400 : }
4401 88873 : unlock_table(tr->store, t->base.id);
4402 88873 : return ok;
4403 : }
4404 :
4405 : /* only rollback (content version) case for now */
4406 : static int
4407 18037 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4408 : {
4409 18037 : sqlstore *store = Store;
4410 18037 : sql_column *c = (sql_column*)change->obj;
4411 :
4412 18037 : if (!c) /* cleaned earlier */
4413 : return 1;
4414 :
4415 172 : if (change->handled || isDeleted(c->t)) {
4416 0 : column_destroy(store, c);
4417 0 : return 1;
4418 : }
4419 :
4420 : /* savepoint commit (did it merge ?) */
4421 172 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4422 0 : column_destroy(store, c);
4423 0 : return 1;
4424 : }
4425 172 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4426 : return 0;
4427 171 : sql_delta *d = (sql_delta*)change->data;
4428 171 : if (d && d->next) {
4429 :
4430 66 : if (d->cs.ts > oldest)
4431 : return LOG_OK; /* cannot cleanup yet */
4432 :
4433 : // d is oldest reachable delta
4434 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4435 62 : destroy_delta(d->next, true);
4436 62 : d->next = NULL;
4437 : }
4438 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4439 63 : merge_delta(d);
4440 63 : unlock_column(store, c->base.id);
4441 : }
4442 168 : column_destroy(store, c);
4443 168 : return 1;
4444 : }
4445 :
4446 : static int
4447 856479 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4448 : {
4449 856479 : sqlstore *store = Store;
4450 856479 : sql_column *c = (sql_column*)change->obj;
4451 :
4452 856479 : if (!c) /* cleaned earlier */
4453 : return 1;
4454 :
4455 856479 : if (change->handled || isDeleted(c->t)) {
4456 3 : table_destroy(store, c->t);
4457 3 : return 1;
4458 : }
4459 :
4460 : /* savepoint commit (did it merge ?) */
4461 856476 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4462 73290 : table_destroy(store, c->t);
4463 73290 : return 1;
4464 : }
4465 783186 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4466 : return 0;
4467 783185 : sql_delta *d = (sql_delta*)change->data;
4468 783185 : if (d && d->next) {
4469 :
4470 783185 : if (d->cs.ts > oldest)
4471 : return LOG_OK; /* cannot cleanup yet */
4472 :
4473 : // d is oldest reachable delta
4474 59114 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4475 5107 : destroy_delta(d->next, true);
4476 5107 : d->next = NULL;
4477 : }
4478 59114 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4479 59114 : merge_delta(d);
4480 59114 : unlock_column(store, c->base.id);
4481 : }
4482 59114 : table_destroy(store, c->t);
4483 59114 : return 1;
4484 : }
4485 :
4486 : static int
4487 2592 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4488 : {
4489 2592 : sqlstore *store = Store;
4490 2592 : sql_idx *i = (sql_idx*)change->obj;
4491 :
4492 2592 : if (!i) /* cleaned earlier */
4493 : return 1;
4494 :
4495 633 : if (change->handled || isDeleted(i->t)) {
4496 0 : idx_destroy(store, i);
4497 0 : return 1;
4498 : }
4499 :
4500 : /* savepoint commit (did it merge ?) */
4501 633 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4502 0 : idx_destroy(store, i);
4503 0 : return 1;
4504 : }
4505 633 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4506 : return 0;
4507 633 : sql_delta *d = (sql_delta*)change->data;
4508 633 : if (d->next) {
4509 :
4510 0 : if (d->cs.ts > oldest)
4511 : return LOG_OK; /* cannot cleanup yet */
4512 :
4513 : // d is oldest reachable delta
4514 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4515 0 : destroy_delta(d->next, true);
4516 0 : d->next = NULL;
4517 : }
4518 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4519 0 : merge_delta(d);
4520 0 : unlock_column(store, i->base.id);
4521 : }
4522 633 : idx_destroy(store, i);
4523 633 : return 1;
4524 : }
4525 :
4526 : static int
4527 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4528 : {
4529 26 : sqlstore *store = Store;
4530 26 : sql_idx *i = (sql_idx*)change->obj;
4531 :
4532 26 : if (!i) /* cleaned earlier */
4533 : return 1;
4534 :
4535 26 : if (change->handled || isDeleted(i->t)) {
4536 0 : table_destroy(store, i->t);
4537 0 : return 1;
4538 : }
4539 :
4540 : /* savepoint commit (did it merge ?) */
4541 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4542 0 : table_destroy(store, i->t);
4543 0 : return 1;
4544 : }
4545 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4546 : return 0;
4547 26 : sql_delta *d = (sql_delta*)change->data;
4548 26 : if (d->next) {
4549 :
4550 26 : if (d->cs.ts > oldest)
4551 : return LOG_OK; /* cannot cleanup yet */
4552 :
4553 : // d is oldest reachable delta
4554 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4555 26 : destroy_delta(d->next, true);
4556 26 : d->next = NULL;
4557 : }
4558 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4559 26 : merge_delta(d);
4560 26 : unlock_column(store, i->base.id);
4561 : }
4562 26 : table_destroy(store, i->t);
4563 26 : return 1;
4564 : }
4565 :
4566 : static int
4567 245110 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4568 : {
4569 245110 : sqlstore *store = Store;
4570 245110 : sql_table *t = (sql_table*)change->obj;
4571 :
4572 245110 : if (change->handled || isDeleted(t)) {
4573 3496 : table_destroy(store, t);
4574 3496 : return 1;
4575 : }
4576 : /* savepoint commit (did it merge ?) */
4577 241614 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4578 14670 : table_destroy(store, t);
4579 14670 : return 1;
4580 : }
4581 226944 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4582 : return 0;
4583 226916 : storage *d = (storage*)change->data;
4584 226916 : if (d->next) {
4585 154898 : if (d->cs.ts > oldest)
4586 : return LOG_OK; /* cannot cleanup yet */
4587 :
4588 10789 : destroy_storage(d->next);
4589 10789 : d->next = NULL;
4590 : }
4591 82807 : table_destroy(store, t);
4592 82807 : return 1;
4593 : }
4594 :
4595 : static int
4596 30574 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4597 : {
4598 30574 : if (nr == 0)
4599 : return LOG_OK;
4600 30574 : assert (nr > 0);
4601 30574 : if ((!offsets || !*offsets) && nr == total) {
4602 30545 : *offset = slot;
4603 30545 : return LOG_OK;
4604 : }
4605 29 : if (!*offsets) {
4606 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4607 7 : if (!*offsets)
4608 : return LOG_ERR;
4609 : }
4610 29 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4611 16168 : for(size_t i = 0; i < nr; i++)
4612 16139 : dst[i] = slot + i;
4613 29 : (*offsets)->batCount += nr;
4614 29 : (*offsets)->theap->dirty = true;
4615 29 : return LOG_OK;
4616 : }
4617 :
4618 : static int
4619 30552 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4620 : {
4621 30552 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4622 30552 : assert(s->segs);
4623 30552 : ulng oldest = store_oldest(tr->store, NULL);
4624 30552 : BUN slot = 0;
4625 30552 : size_t total = cnt;
4626 :
4627 30552 : if (!locked)
4628 30552 : lock_table(tr->store, t->base.id);
4629 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4630 61236 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4631 30686 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4632 35 : if ((seg->end - seg->start) >= cnt) {
4633 : /* if previous is claimed before we could simply adjust the end/start */
4634 13 : if (p && p->ts == tr->tid && !p->deleted) {
4635 2 : slot = p->end;
4636 2 : p->end += cnt;
4637 2 : seg->start += cnt;
4638 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4639 : ok = LOG_ERR;
4640 : break;
4641 : }
4642 2 : cnt = 0;
4643 2 : break;
4644 : }
4645 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4646 11 : size_t rcnt = seg->end - seg->start;
4647 11 : if (rcnt > cnt)
4648 : rcnt = cnt;
4649 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4650 : ok = LOG_ERR;
4651 : break;
4652 : }
4653 : }
4654 33 : seg->ts = tr->tid;
4655 33 : seg->deleted = false;
4656 33 : slot = seg->start;
4657 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4658 : ok = LOG_ERR;
4659 : break;
4660 : }
4661 33 : cnt -= (seg->end - seg->start);
4662 : }
4663 : }
4664 30552 : if (ok == LOG_OK && cnt) {
4665 30539 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4666 29405 : slot = s->segs->t->end;
4667 29405 : s->segs->t->end += cnt;
4668 : } else {
4669 1134 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4670 : ok = LOG_ERR;
4671 : } else {
4672 1134 : if (!s->segs->h)
4673 0 : s->segs->h = s->segs->t;
4674 1134 : slot = s->segs->t->start;
4675 : }
4676 : }
4677 30539 : if (ok == LOG_OK)
4678 30539 : ok = add_offsets(slot, cnt, total, offset, offsets);
4679 : }
4680 30552 : if (!locked)
4681 30552 : unlock_table(tr->store, t->base.id);
4682 :
4683 30552 : if (ok == LOG_OK) {
4684 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4685 30552 : if (!in_transaction) {
4686 1119 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4687 1119 : in_transaction = true;
4688 : }
4689 30552 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4690 30538 : tr->logchanges += (lng) total;
4691 30552 : if (*offsets) {
4692 7 : BAT *pos = *offsets;
4693 7 : assert(BATcount(pos) == total);
4694 7 : BATsetcount(pos, total); /* set other properties */
4695 7 : pos->tnil = false;
4696 7 : pos->tnonil = true;
4697 7 : pos->tkey = true;
4698 7 : pos->tsorted = true;
4699 7 : pos->trevsorted = false;
4700 : }
4701 : }
4702 30552 : return ok;
4703 : }
4704 :
4705 : static int
4706 2041648 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4707 : {
4708 2041648 : if (cnt > 1 && offsets)
4709 30552 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4710 2011096 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4711 2011096 : assert(s->segs);
4712 2011096 : ulng oldest = store_oldest(tr->store, NULL);
4713 2011096 : BUN slot = 0;
4714 2011096 : int reused = 0;
4715 :
4716 2011096 : if (!locked)
4717 1886083 : lock_table(tr->store, t->base.id);
4718 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4719 : * or create new segment at the end */
4720 7482660 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4721 5534944 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4722 :
4723 63380 : if ((seg->end - seg->start) >= cnt) {
4724 :
4725 : /* if previous is claimed before we could simply adjust the end/start */
4726 63380 : if (p && p->ts == tr->tid && !p->deleted) {
4727 48384 : slot = p->end;
4728 48384 : p->end += cnt;
4729 48384 : seg->start += cnt;
4730 48384 : reused = 1;
4731 48384 : break;
4732 : }
4733 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4734 14996 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4735 : ok = LOG_ERR;
4736 : break;
4737 : }
4738 : }
4739 14996 : seg->ts = tr->tid;
4740 14996 : seg->deleted = false;
4741 14996 : slot = seg->start;
4742 14996 : reused = 1;
4743 14996 : break;
4744 : }
4745 : }
4746 2011096 : if (ok == LOG_OK && !reused) {
4747 1947716 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4748 1912613 : slot = s->segs->t->end;
4749 1912613 : s->segs->t->end += cnt;
4750 : } else {
4751 35103 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4752 : ok = LOG_ERR;
4753 : } else {
4754 35103 : if (!s->segs->h)
4755 0 : s->segs->h = s->segs->t;
4756 35103 : slot = s->segs->t->start;
4757 : }
4758 : }
4759 : }
4760 2011096 : if (!locked)
4761 1886083 : unlock_table(tr->store, t->base.id);
4762 :
4763 2011096 : if (ok == LOG_OK) {
4764 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4765 2011096 : if (!in_transaction) {
4766 48974 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4767 48974 : in_transaction = true;
4768 : }
4769 2011096 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4770 2010665 : tr->logchanges += (lng) cnt;
4771 2011096 : *offset = slot;
4772 : }
4773 : return ok;
4774 : }
4775 :
4776 : /*
4777 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4778 : * of the global transaction and mark the newly added storage slots unused on the global
4779 : * level but used on the local transaction level. Besides this the local transaction needs
4780 : * to update (and mark unused) any slot in between the old end and new slots.
4781 : * */
4782 : static int
4783 1916635 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4784 : {
4785 1916635 : storage *s;
4786 :
4787 : /* we have a single segment structure for each persistent table
4788 : * for temporary tables each has its own */
4789 1916635 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4790 : return LOG_ERR;
4791 :
4792 1916635 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4793 : }
4794 :
4795 : /* some tables cannot be updated concurrently (user/roles etc) */
4796 : static int
4797 125017 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4798 : {
4799 125017 : storage *s;
4800 125017 : int res = 0;
4801 :
4802 : /* we have a single segment structure for each persistent table
4803 : * for temporary tables each has its own */
4804 125017 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4805 : /* TODO check for other inserts ! */
4806 : return LOG_ERR;
4807 :
4808 125017 : lock_table(tr->store, t->base.id);
4809 125017 : if ((res = segments_conflict(tr, s->segs, 1))) {
4810 4 : unlock_table(tr->store, t->base.id);
4811 4 : return LOG_CONFLICT;
4812 : }
4813 125013 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4814 125013 : unlock_table(tr->store, t->base.id);
4815 125013 : return res;
4816 : }
4817 :
4818 : static int
4819 20966 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4820 : {
4821 20966 : storage *s;
4822 20966 : int res = 0;
4823 :
4824 20966 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4825 : return LOG_ERR;
4826 :
4827 20966 : lock_table(tr->store, t->base.id);
4828 20966 : res = segments_conflict(tr, s->segs, uncommitted);
4829 20966 : unlock_table(tr->store, t->base.id);
4830 20966 : return res ? LOG_CONFLICT : LOG_OK;
4831 : }
4832 :
4833 : static size_t
4834 1356400 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4835 : {
4836 1356400 : size_t cnt = 0;
4837 :
4838 1469333 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4839 : ;
4840 :
4841 3426236 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4842 2069838 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4843 113889 : cnt += s->end - s->start;
4844 : }
4845 1356398 : return cnt;
4846 : }
4847 :
/*
 * Build the candidate list for the positions [start, end) of table t as
 * seen by transaction tr.
 *
 * When has_deletes_in_range() finds nothing deleted in the range, a dense
 * BAT covering the range is returned. Otherwise a TYPE_msk BAT with one bit
 * per position is filled from the segment list (bit set when
 * SEG_IS_VALID(s, tr) holds for the segment covering the position) and
 * converted with BATmaskedcands().
 *
 * The segment list is walked under the table lock, which is released before
 * the result BAT is created.
 *
 * Returns the candidate BAT, or NULL when the mask BAT cannot be allocated
 * (or, presumably, when BATmaskedcands() fails -- its result is returned
 * unchecked).
 */
static BAT *
segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
{
	lock_table(tr->store, t->base.id);
	segment *s = S->segs->h;
	/* step one no deletes -> dense range */
	/* cur: the 32-bit mask word under construction
	 * pos: number of positions (bits) produced so far */
	uint32_t cur = 0;
	size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
	if (!dnr) {
		unlock_table(tr->store, t->base.id);
		return BATdense(start, start, end-start);
	}

	BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
	if (!b) {
		unlock_table(tr->store, t->base.id);
		return NULL;
	}

	/* the mask is written one 32-bit word at a time */
	uint32_t *restrict dst = Tloc(b, 0);
	for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
		if (s->end < start)
			continue;
		if (s->start >= end)
			break;
		msk m = (SEG_IS_VALID(s, tr));
		/* lnr: number of positions of this segment that fall inside [start, end) */
		size_t lnr = s->end-s->start;
		if (s->start < start)
			lnr -= (start - s->start);
		if (s->end > end)
			lnr -= s->end - end;

		if (m) {
			/* valid segment: emit lnr one-bits.
			 * NOTE: this block-local 'end' (a bit index within the current
			 * word) shadows the function parameter 'end' */
			size_t used = pos&31, end = 32;
			if (used) {
				/* first fill up the partially used word */
				if (lnr < (32-used))
					end = used + lnr;
				assert(end > used);
				cur |= ((1U << (end - used)) - 1) << used;
				lnr -= end - used;
				pos += end - used;
				if (end == 32) {
					*dst++ = cur;
					cur = 0;
				}
			}
			size_t full = lnr/32;
			size_t rest = lnr%32;
			if (full > 0) {
				/* whole words of ones */
				memset(dst, ~0, full * sizeof(*dst));
				dst += full;
				lnr -= full * 32;
				pos += full * 32;
			}
			if (rest > 0) {
				/* leftover bits start a new partial word */
				cur |= (1U << rest) - 1;
				lnr -= rest;
				pos += rest;
			}
			assert(lnr==0);
		} else {
			/* invalid segment: emit lnr zero-bits, i.e. only advance pos
			 * and flush/clear words (same shadowing of 'end' as above) */
			size_t used = pos&31, end = 32;
			if (used) {
				if (lnr < (32-used))
					end = used + lnr;

				pos+= (end-used);
				lnr-= (end-used);
				if (end == 32) {
					*dst++ = cur;
					cur = 0;
				}
			}
			size_t full = lnr/32;
			size_t rest = lnr%32;
			memset(dst, 0, full * sizeof(*dst));
			dst += full;
			lnr -= full * 32;
			pos += full * 32;
			pos+= rest;
			lnr-= rest;
			assert(lnr==0);
		}
	}

	unlock_table(tr->store, t->base.id);
	/* flush the last, partially filled word */
	if (pos%32)
		*dst=cur;
	BATsetcount(b, nr);
	bn = BATmaskedcands(start, nr, b, true);
	BBPreclaim(b);
	(void)pos;
	/* every position of the range must have been produced exactly once */
	assert (pos == nr);
	return bn;
}
4943 :
4944 : static void * /* BAT * */
4945 1363213 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4946 : {
4947 : /* with nr_of_parts - part_nr we can adjust parts */
4948 1363213 : storage *s = tab_timestamp_storage(tr, t);
4949 :
4950 1363201 : if (!s)
4951 : return NULL;
4952 1363201 : size_t nr = segs_end(s->segs, tr, t);
4953 :
4954 1363197 : if (!nr)
4955 6757 : return BATdense(0, 0, 0);
4956 :
4957 : /* compute proper part */
4958 1356440 : size_t part_size = nr/nr_of_parts;
4959 1356440 : size_t start = part_size * part_nr;
4960 1356440 : size_t end = start + part_size;
4961 1356440 : if (part_nr == (nr_of_parts-1))
4962 1284997 : end = nr;
4963 1356440 : assert(end <= nr);
4964 1356440 : return segments2cands(s, tr, t, start, end);
4965 : }
4966 :
4967 : static int
4968 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
4969 : {
4970 5 : bool update_conflict = false;
4971 :
4972 5 : if (segments_in_transaction(tr, col->t))
4973 : return LOG_CONFLICT;
4974 :
4975 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
4976 :
4977 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
4978 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
4979 5 : assert(d && d->cs.ts == tr->tid);
4980 5 : if (odelta != d)
4981 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
4982 5 : if (d->cs.bid)
4983 5 : temp_destroy(d->cs.bid);
4984 5 : if (d->cs.uibid)
4985 5 : temp_destroy(d->cs.uibid);
4986 5 : if (d->cs.uvbid)
4987 5 : temp_destroy(d->cs.uvbid);
4988 5 : bat_set_access(bn, BAT_READ);
4989 5 : d->cs.bid = temp_create(bn);
4990 5 : d->cs.uibid = 0;
4991 5 : d->cs.uvbid = 0;
4992 5 : d->cs.ucnt = 0;
4993 5 : d->cs.cleared = true;
4994 5 : d->cs.ts = tr->tid;
4995 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
4996 5 : return LOG_OK;
4997 : }
4998 :
4999 : static int
5000 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5001 : {
5002 58 : bool update_conflict = false;
5003 :
5004 58 : if (segments_in_transaction(tr, col->t))
5005 : return LOG_CONFLICT;
5006 :
5007 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5008 :
5009 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5010 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5011 58 : assert(d && d->cs.ts == tr->tid);
5012 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5013 58 : if (odelta != d)
5014 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5015 :
5016 58 : d->cs.st = st;
5017 58 : d->cs.cleared = true;
5018 58 : if (d->cs.bid)
5019 58 : temp_destroy(d->cs.bid);
5020 58 : o = transfer_to_systrans(o);
5021 58 : if (o == NULL)
5022 : return LOG_ERR;
5023 58 : bat_set_access(o, BAT_READ);
5024 58 : d->cs.bid = temp_create(o);
5025 58 : if (u) {
5026 53 : if (d->cs.ebid)
5027 0 : temp_destroy(d->cs.ebid);
5028 53 : u = transfer_to_systrans(u);
5029 53 : if (u == NULL)
5030 : return LOG_ERR;
5031 53 : d->cs.ebid = temp_create(u);
5032 : }
5033 : return LOG_OK;
5034 : }
5035 :
5036 : void
5037 327 : bat_storage_init( store_functions *sf)
5038 : {
5039 327 : sf->bind_col = &bind_col;
5040 327 : sf->bind_updates = &bind_updates;
5041 327 : sf->bind_updates_idx = &bind_updates_idx;
5042 327 : sf->bind_idx = &bind_idx;
5043 327 : sf->bind_cands = &bind_cands;
5044 :
5045 327 : sf->claim_tab = &claim_tab;
5046 327 : sf->key_claim_tab = &key_claim_tab;
5047 327 : sf->tab_validate = &tab_validate;
5048 :
5049 327 : sf->append_col = &append_col;
5050 327 : sf->append_idx = &append_idx;
5051 :
5052 327 : sf->update_col = &update_col;
5053 327 : sf->update_idx = &update_idx;
5054 :
5055 327 : sf->delete_tab = &delete_tab;
5056 :
5057 327 : sf->count_del = &count_del;
5058 327 : sf->count_col = &count_col;
5059 327 : sf->count_idx = &count_idx;
5060 327 : sf->dcount_col = &dcount_col;
5061 327 : sf->min_max_col = &min_max_col;
5062 327 : sf->set_stats_col = &set_stats_col;
5063 327 : sf->sorted_col = &sorted_col;
5064 327 : sf->unique_col = &unique_col;
5065 327 : sf->double_elim_col = &double_elim_col;
5066 327 : sf->col_stats = &col_stats;
5067 327 : sf->col_set_range = &col_set_range;
5068 327 : sf->col_not_null = &col_not_null;
5069 :
5070 327 : sf->col_dup = &col_dup;
5071 327 : sf->idx_dup = &idx_dup;
5072 327 : sf->del_dup = &del_dup;
5073 :
5074 327 : sf->create_col = &create_col; /* create and add to change list */
5075 327 : sf->create_idx = &create_idx;
5076 327 : sf->create_del = &create_del;
5077 :
5078 327 : sf->destroy_col = &destroy_col; /* free resources */
5079 327 : sf->destroy_idx = &destroy_idx;
5080 327 : sf->destroy_del = &destroy_del;
5081 :
5082 327 : sf->drop_col = &drop_col; /* add drop to change list */
5083 327 : sf->drop_idx = &drop_idx;
5084 327 : sf->drop_del = &drop_del;
5085 :
5086 327 : sf->clear_table = &clear_table;
5087 :
5088 327 : sf->swap_bats = &swap_bats;
5089 327 : sf->col_compress = &col_compress;
5090 327 : }
5091 :
5092 : #if 0
5093 : static lng
5094 : log_get_nr_inserted(sql_column *fc, lng *offset)
5095 : {
5096 : lng cnt = 0;
5097 :
5098 : if (!fc || GDKinmemory(0))
5099 : return 0;
5100 :
5101 : if (fc->base.atime && fc->base.allocated) {
5102 : sql_delta *fb = fc->data;
5103 : BAT *ins = temp_descriptor(fb->cs.bid);
5104 :
5105 : if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
5106 : cnt = BATcount(ins) - ins->batInserted;
5107 : }
5108 : bat_destroy(ins);
5109 : }
5110 : return cnt;
5111 : }
5112 :
5113 : static lng
5114 : log_get_nr_deleted(sql_table *ft, lng *offset)
5115 : {
5116 : lng cnt = 0;
5117 :
5118 : if (!ft || GDKinmemory(0))
5119 : return 0;
5120 :
5121 : if (ft->base.atime && ft->base.allocated) {
5122 : storage *fdb = ft->data;
5123 : BAT *db = temp_descriptor(fdb->cs.bid);
5124 :
5125 : if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
5126 : cnt = BATcount(db) - db->batInserted;
5127 : *offset = db->batInserted;
5128 : }
5129 : bat_destroy(db);
5130 : }
5131 : return cnt;
5132 : }
5133 : #endif
|