Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "matomic.h"
18 :
/* printf-style message for an out-of-memory failure during a merge step;
 * the %s argument is supplied by the users of this macro (not visible here). */
#define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
/* true for tables whose changes are not to be logged: unlogged or temporary tables */
#define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))

/* forward declarations: log callbacks for column, index and deletion changes */
static int log_update_col( sql_trans *tr, sql_change *c);
static int log_update_idx( sql_trans *tr, sql_change *c);
static int log_update_del( sql_trans *tr, sql_change *c);
/* forward declarations: commit callbacks; commit_ts is the commit timestamp,
 * oldest presumably the oldest timestamp still in use (TODO confirm) */
static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
static int log_create_col(sql_trans *tr, sql_change *change);
static int log_create_idx(sql_trans *tr, sql_change *change);
static int log_create_del(sql_trans *tr, sql_change *change);
static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
/* forward declarations: garbage-collection (cleanup) callbacks */
static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);

static lng merge_delta( sql_delta *obat);
41 :
/* A segment is valid (visible) for transaction tr when
 *   !deleted && VALID_4_READ(TS, tr)            existing or newly created segment
 *   deleted && TS > tr->ts && OLDTS < tr->ts    deleted after the current transaction started
 */

/* Timestamp TS is readable by tr when it is tr's own transaction id, a version
 * belonging to tr's parent transaction, or older than tr's start timestamp. */
#define VALID_4_READ(TS,tr) \
	(TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)

/* When a segment was changed, check whether its old status is still the one
 * tr should see: the change (TS) is not tr's own and is newer than tr, while
 * the previous state (OLDTS, non-zero) is older than tr. */
#define OLD_VALID_4_READ(TS,OLDTS,tr) \
	(OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)

/* The rows of seg are candidates for deletion by tr: not deleted and readable by tr. */
#define SEG_VALID_4_DELETE(seg,tr) \
	(!seg->deleted && VALID_4_READ(seg->ts, tr))

/* Deleted as seen by tr: deleted in the current transaction or by some other
 * finished transaction, or a re-used segment which used to be deleted (not
 * deleted now, but its new state is not readable by tr). */
#define SEG_IS_DELETED(seg,tr) \
	((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
	(!seg->deleted && !VALID_4_READ(seg->ts, tr)))

/* A segment is part of the current transaction in some way, or is deleted by
 * some other transaction but used to be valid for tr. */
#define SEG_IS_VALID(seg, tr) \
	((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
	(seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
66 :
67 : static inline BAT *
68 5309 : transfer_to_systrans(BAT *b)
69 : {
70 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
71 5309 : MT_lock_set(&b->theaplock);
72 5309 : if (VIEWtparent(b) || VIEWvtparent(b)) {
73 29 : MT_lock_unset(&b->theaplock);
74 29 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
75 29 : BBPreclaim(b);
76 29 : return bn;
77 : }
78 5280 : if (b->theap->farmid == TRANSIENT ||
79 62 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
80 4895 : QryCtx *qc = MT_thread_get_qry_ctx();
81 4895 : if (qc) {
82 2523 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
83 2523 : ATOMIC_SUB(&qc->datasize, b->theap->size);
84 2523 : b->theap->farmid = SYSTRANS;
85 2523 : b->batRole = SYSTRANS;
86 : }
87 2523 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
88 1092 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
89 1092 : b->tvheap->farmid = SYSTRANS;
90 : }
91 : }
92 : }
93 5280 : MT_lock_unset(&b->theaplock);
94 5280 : return b;
95 : }
96 :
97 : static void
98 29508629 : lock_table(sqlstore *store, sqlid id)
99 : {
100 29508629 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
101 29607770 : }
102 :
103 : static void
104 29607720 : unlock_table(sqlstore *store, sqlid id)
105 : {
106 29607720 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
107 29605505 : }
108 :
109 : static void
110 23063184 : lock_column(sqlstore *store, sqlid id)
111 : {
112 23063184 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
113 23087258 : }
114 :
115 : static void
116 23083832 : unlock_column(sqlstore *store, sqlid id)
117 : {
118 23083832 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
119 23096634 : }
120 :
121 : static void
122 116206 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
123 : {
124 116206 : assert(cleanup);
125 116206 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
126 116205 : }
127 :
128 : static void
129 132963 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
130 : {
131 132963 : assert(cleanup);
132 132963 : dup_base(&t->base);
133 132960 : trans_add(tr, b, data, cleanup, commit, log);
134 132950 : }
135 :
136 : static int
137 76627 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
138 : {
139 76627 : segment *s = change->data;
140 :
141 76627 : if (s->ts <= oldest) {
142 35014 : while(s) {
143 21394 : segment *n = s->prev;
144 21394 : _DELETE(s);
145 21394 : s = n;
146 : }
147 13620 : sqlstore *store = Store;
148 13620 : table_destroy(store, (sql_table*)change->obj);
149 13620 : return 1;
150 : }
151 : return LOG_OK;
152 : }
153 :
154 : static void
155 21394 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
156 : {
157 : /* we can only be accessed by anything older then commit_ts */
158 21394 : if (c->cleanup == &tc_gc_seg)
159 7774 : s->prev = c->data;
160 : else
161 13620 : c->cleanup = &tc_gc_seg;
162 21394 : c->data = s;
163 21394 : s->ts = commit_ts;
164 17144 : }
165 :
166 : static segment *
167 89714 : new_segment(segment *o, sql_trans *tr, size_t cnt)
168 : {
169 89714 : segment *n = (segment*)GDKmalloc(sizeof(segment));
170 :
171 89722 : assert(tr);
172 89722 : if (n) {
173 89722 : *n = (segment) {
174 89722 : .ts = tr->tid,
175 : .oldts = 0,
176 : .deleted = false,
177 : .start = 0,
178 : .end = cnt,
179 : .next = ATOMIC_PTR_VAR_INIT(NULL),
180 : .prev = NULL,
181 : };
182 89722 : if (o) {
183 36794 : n->start += o->end;
184 36794 : n->end += o->end;
185 36794 : ATOMIC_PTR_SET(&o->next, n);
186 : }
187 : }
188 89722 : return n;
189 : }
190 :
/* Give the row range [start, start+cnt) inside segment o (whose predecessor
 * is p, or NULL) the state `deleted` on behalf of transaction tr.
 *
 * If the range covers all of o, o is updated in place: its old ts moves to
 * oldts and it is stamped with tr->tid.  Otherwise o is split into 2 or 3
 * pieces and the newly allocated piece covering the range is returned; segs->h
 * and segs->t are kept pointing at the list's first/last segment.  Returns NULL
 * on allocation failure. */
static segment *
split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
{
	assert(tr);
	if (o->start == start && o->end == start+cnt) {
		/* exact match: reuse o, remembering its previous timestamp */
		assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
		o->oldts = o->ts;
		o->ts = tr->tid;
		o->deleted = deleted;
		return o;
	}
	segment *n = (segment*)GDKmalloc(sizeof(segment));

	if (!n)
		return NULL;
	n->prev = NULL;

	if (o->ts == tr->tid) {
		/* o was created/changed by tr itself: the split-off piece gets
		 * ts 1, no old state, and is marked deleted regardless of the
		 * `deleted` argument (NOTE(review): presumably because rows
		 * inserted and removed by the same transaction never need to be
		 * visible to anyone — confirm) */
		n->oldts = 0;
		n->ts = 1;
		n->deleted = true;
	} else {
		n->oldts = o->ts;
		n->ts = tr->tid;
		n->deleted = deleted;
	}
	if (start == o->start) {
		/* 2-way split: o remains latter part of segment, new one is
		 * inserted before */
		n->start = o->start;
		n->end = n->start + cnt;
		ATOMIC_PTR_INIT(&n->next, o);
		if (segs->h == o)
			segs->h = n;
		if (p)
			ATOMIC_PTR_SET(&p->next, n);
		o->start = n->end;
	} else if (start+cnt == o->end) {
		/* 2-way split: o remains first part of segment, new one is
		 * added after */
		n->start = o->end - cnt;
		n->end = o->end;
		ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
		ATOMIC_PTR_SET(&o->next, n);
		if (segs->t == o)
			segs->t = n;
		o->end = n->start;
	} else {
		/* 3-way split: o remains first part of segment, two new ones
		 * are added after */
		segment *n2 = GDKmalloc(sizeof(segment));
		if (n2 == NULL) {
			GDKfree(n);
			return NULL;
		}
		ATOMIC_PTR_INIT(&n->next, n2);
		n->start = start;
		n->end = start + cnt;
		/* n2 is the tail remainder: it inherits o's ts/oldts/deleted state */
		*n2 = *o;
		ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
		n2->start = n->end;
		n2->prev = NULL;
		if (segs->t == o)
			segs->t = n2;
		/* link n (and thereby n2) in only after both are fully initialized */
		ATOMIC_PTR_SET(&o->next, n);
		o->end = start;
	}
	return n;
}
260 :
/* Undo tr's changes to the segment list and coalesce adjacent segments that
 * end up with the same deleted state and a ts <= oldest.  Merged-away segments
 * are not freed immediately but queued on change via mark4destroy, stamped
 * with the store's current timestamp. */
static void
rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
{
	segment *cur = segs->h, *seg = NULL;
	for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
		if (cur->ts == tr->tid) { /* revert */
			/* flip the deleted flag back; a segment whose old state
			 * also belonged to tr (ts == oldts) ends up deleted */
			cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
			/* restore the old timestamp, or 0 if that was tr's too */
			cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
			cur->oldts = 0;
		}
		if (cur->ts <= oldest) { /* possibly merge range */
			if (!seg) { /* skip first */
				seg = cur;
			} else if (seg->end == cur->start && seg->deleted == cur->deleted) {
				/* merge with previous; the end/start check guarantees
				 * no segment skipped in between (ts > oldest) is swallowed */
				seg->end = cur->end;
				ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
				if (cur == segs->t)
					segs->t = seg;
				mark4destroy(cur, change, store_get_timestamp(tr->store));
				/* continue iterating from the merged segment */
				cur = seg;
			} else {
				seg = cur; /* begin of new merge */
			}
		}
	}
}
288 :
289 : static size_t
290 105675 : segs_end_include_deleted( segments *segs, sql_trans *tr)
291 : {
292 105675 : size_t cnt = 0;
293 105675 : segment *s = segs->h, *l = NULL;
294 :
295 482556 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
296 376881 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
297 : l = s;
298 : }
299 105675 : if (l)
300 105668 : cnt = l->end;
301 105675 : return cnt;
302 : }
303 :
304 : static int
305 105675 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
306 : {
307 : /* set bits correctly */
308 105675 : BAT *b = temp_descriptor(cs->bid);
309 :
310 105675 : if (!b)
311 : return LOG_ERR;
312 105675 : segment *s = segs->h;
313 :
314 105675 : size_t nr = segs_end_include_deleted(segs, tr);
315 105675 : size_t rounded_nr = ((nr+31)&~31);
316 105675 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
317 0 : bat_destroy(b);
318 0 : return LOG_ERR;
319 : }
320 :
321 : /* disable all properties here */
322 105675 : MT_lock_set(&b->theaplock);
323 105675 : b->tsorted = false;
324 105675 : b->trevsorted = false;
325 105675 : b->tnosorted = 0;
326 105675 : b->tnorevsorted = 0;
327 105675 : b->tseqbase = oid_nil;
328 105675 : b->tkey = false;
329 105675 : b->tnokey[0] = 0;
330 105675 : b->tnokey[1] = 0;
331 105675 : BUN cnt = BATcount(b);
332 :
333 105675 : uint32_t *restrict dst;
334 424739 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
335 362982 : if (s->start >= nr)
336 : break;
337 319064 : if (s->ts == tr->tid && s->end != s->start) {
338 152972 : if (cnt < s->start) { /* first mark as deleted ! */
339 2720 : size_t lnr = s->start-cnt;
340 2720 : size_t pos = cnt;
341 2720 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
342 2720 : uint32_t cur = 0;
343 :
344 2720 : size_t used = pos&31, end = 32;
345 2720 : if (used) {
346 2639 : if (lnr < (32-used))
347 2499 : end = used + lnr;
348 2639 : assert(end > used);
349 2639 : cur |= ((1U << (end - used)) - 1) << used;
350 2639 : lnr -= end - used;
351 2639 : *dst++ |= cur;
352 2639 : cur = 0;
353 : }
354 2720 : size_t full = lnr/32;
355 2720 : size_t rest = lnr%32;
356 2720 : if (full > 0) {
357 9 : memset(dst, ~0, full * sizeof(*dst));
358 9 : dst += full;
359 9 : lnr -= full * 32;
360 : }
361 2720 : if (rest > 0) {
362 136 : cur |= (1U << rest) - 1;
363 136 : lnr -= rest;
364 136 : *dst |= cur;
365 : }
366 2720 : assert(lnr==0);
367 : }
368 152972 : size_t lnr = s->end-s->start;
369 152972 : size_t pos = s->start;
370 152972 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
371 152972 : uint32_t cur = 0;
372 152972 : size_t used = pos&31, end = 32;
373 152972 : if (used) {
374 114511 : if (lnr < (32-used))
375 109031 : end = used + lnr;
376 114511 : assert(end > used);
377 114511 : cur |= ((1U << (end - used)) - 1) << used;
378 114511 : lnr -= end - used;
379 114511 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
380 114511 : dst++;
381 114511 : cur = 0;
382 : }
383 152972 : size_t full = lnr/32;
384 152972 : size_t rest = lnr%32;
385 152972 : if (full > 0) {
386 3872 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
387 3872 : dst += full;
388 3872 : lnr -= full * 32;
389 : }
390 152972 : if (rest > 0) {
391 41009 : cur |= (1U << rest) - 1;
392 41009 : lnr -= rest;
393 41009 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
394 : }
395 152972 : assert(lnr==0);
396 152972 : if (cnt < s->end)
397 319064 : cnt = s->end;
398 : }
399 : }
400 105675 : if (nr > BATcount(b)) {
401 63921 : BATsetcount(b, nr);
402 : }
403 105675 : b->theap->dirty = true;
404 105675 : MT_lock_unset(&b->theaplock);
405 :
406 105675 : bat_destroy(b);
407 105675 : return LOG_OK;
408 : }
409 :
/* TODO return LOG_OK/ERR */
/* Commit tr's segment changes (stamp them with commit_ts) and coalesce
 * neighbouring committed segments with equal deleted flags, unless some other
 * active transaction has a start timestamp between the two segments' ts (it
 * would then need to tell them apart).  Merged-away segments are freed at once
 * when commit_ts == oldest, otherwise queued on change via mark4destroy. */
static void
merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
{
	sqlstore* store = tr->store;
	segment *cur = s->segs->h, *seg = NULL;
	for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
		if (cur->ts == tr->tid) {
			/* tr's own change becomes committed; live segments drop their old state */
			if (!cur->deleted)
				cur->oldts = 0;
			cur->ts = commit_ts;
		}
		if (!seg) {
			/* first segment */
			seg = cur;
		}
		else if (seg->ts < TRANSACTION_ID_BASE) {
			/* possible merge since both deleted flags are equal */
			if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
				int merge = 1;
				node *n = store->active->h;
				for (int i = 0; i < store->active->cnt; i++, n = n->next) {
					sql_trans* other = ((sql_trans*)n->data);
					ulng active = other->ts;
					if(other->active == 2)
						continue; /* pretend that another recently committed transaction is no longer active */
					if (active == tr->ts)
						continue; /* pretend that committing transaction has already committed and is no longer active */
					/* NOTE(review): this early break assumes the remaining
					 * entries of store->active have even later ts (list
					 * ordered by ts) — confirm */
					if (seg->ts < active && cur->ts < active)
						break;
					if (seg->ts > active && cur->ts > active)
						continue;

					assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
					/* cannot safely merge since there is an active transaction between the segments */
					merge = false;
					break;
				}
				/* merge segments */
				if (merge) {
					seg->end = cur->end;
					ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
					if (cur == s->segs->t)
						s->segs->t = seg;
					if (commit_ts == oldest) {
						_DELETE(cur);
					} else
						mark4destroy(cur, change, commit_ts);
					/* resume the walk from the grown segment */
					cur = seg;
					continue;
				}
			}
		}
		seg = cur;
	}
}
466 :
467 : static int
468 2228576 : segments_in_transaction(sql_trans *tr, sql_table *t)
469 : {
470 2228576 : storage *s = ATOMIC_PTR_GET(&t->data);
471 2228576 : segment *seg = s->segs->h;
472 :
473 2228576 : if (seg && s->segs->t->ts == tr->tid)
474 : return 1;
475 604221 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
476 497883 : if (seg->ts == tr->tid)
477 : return 1;
478 : }
479 : return 0;
480 : }
481 :
482 : static size_t
483 21391106 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
484 : {
485 21391106 : size_t cnt = 0;
486 :
487 : /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
488 : * keep all of the parts aligned */
489 21391106 : lock_table(tr->store, table->base.id);
490 21468370 : segment *s = segs->h, *l = NULL;
491 :
492 21468370 : if (segs->t && SEG_IS_VALID(segs->t, tr))
493 17729727 : l = s = segs->t;
494 :
495 220688777 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
496 199220456 : if (SEG_IS_VALID(s, tr))
497 : l = s;
498 : }
499 21468321 : if (l)
500 21449609 : cnt = l->end;
501 21468321 : unlock_table(tr->store, table->base.id);
502 21466889 : return cnt;
503 : }
504 :
505 : static segments *
506 52925 : new_segments(sql_trans *tr, size_t cnt)
507 : {
508 52925 : segments *n = (segments*)GDKmalloc(sizeof(segments));
509 :
510 52926 : if (n) {
511 52926 : n->nr_reused = 0;
512 52926 : n->h = n->t = new_segment(NULL, tr, cnt);
513 52928 : if (!n->h) {
514 0 : GDKfree(n);
515 0 : return NULL;
516 : }
517 52928 : sql_ref_init(&n->r);
518 : }
519 : return n;
520 : }
521 :
522 : static sql_delta *
523 34518081 : timestamp_delta( sql_trans *tr, sql_delta *d)
524 : {
525 34541549 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
526 23468 : d = d->next;
527 34525345 : return d;
528 : }
529 :
530 : static sql_delta *
531 34362332 : col_timestamp_delta( sql_trans *tr, sql_column *c)
532 : {
533 34362332 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
534 : }
535 :
536 : static sql_delta *
537 27347 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
538 : {
539 27347 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
540 : }
541 :
542 : static storage *
543 21887623 : timestamp_storage( sql_trans *tr, storage *d)
544 : {
545 21887623 : if (!d)
546 : return NULL;
547 21964561 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
548 76938 : d = d->next;
549 : return d;
550 : }
551 :
552 : static storage *
553 21870748 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
554 : {
555 21870748 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
556 : }
557 :
558 : static sql_delta*
559 21411 : delta_dup(sql_delta *d)
560 : {
561 21411 : ATOMIC_INC(&d->cs.refcnt);
562 21411 : return d;
563 : }
564 :
565 : static void *
566 19595 : col_dup(sql_column *c)
567 : {
568 19595 : if (!ATOMIC_PTR_GET(&c->data))
569 : return NULL;
570 19595 : return delta_dup(ATOMIC_PTR_GET(&c->data));
571 :
572 : }
573 :
574 : static void *
575 3103 : idx_dup(sql_idx *i)
576 : {
577 3103 : if (!ATOMIC_PTR_GET(&i->data))
578 : return NULL;
579 1816 : return delta_dup(ATOMIC_PTR_GET(&i->data));
580 : }
581 :
582 : static storage*
583 1608 : storage_dup(storage *d)
584 : {
585 1608 : ATOMIC_INC(&d->cs.refcnt);
586 1608 : return d;
587 : }
588 :
589 : static void *
590 1608 : del_dup(sql_table *t)
591 : {
592 1608 : return storage_dup(ATOMIC_PTR_GET(&t->data));
593 : }
594 :
595 : static size_t
596 17 : count_inserts( segment *s, sql_trans *tr)
597 : {
598 17 : size_t cnt = 0;
599 :
600 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
601 55 : if (!s->deleted && s->ts == tr->tid)
602 4 : cnt += s->end - s->start;
603 : }
604 17 : return cnt;
605 : }
606 :
607 : static size_t
608 882110 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
609 : {
610 882110 : size_t cnt = 0;
611 :
612 1033819 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
613 : ;
614 :
615 5626949 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
616 4744925 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
617 1634680 : cnt += s->end - s->start;
618 : }
619 882024 : return cnt;
620 : }
621 :
622 : static size_t
623 : count_deletes( segment *s, sql_trans *tr)
624 : {
625 : size_t cnt = 0;
626 :
627 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
628 : if (SEG_IS_DELETED(s, tr))
629 : cnt += s->end - s->start;
630 : }
631 : return cnt;
632 : }
633 :
634 : static size_t
635 18645077 : count_col(sql_trans *tr, sql_column *c, int access)
636 : {
637 18645077 : storage *d;
638 18645077 : sql_delta *ds;
639 :
640 18645077 : if (!isTable(c->t))
641 : return 0;
642 18645077 : d = tab_timestamp_storage(tr, c->t);
643 18658001 : ds = col_timestamp_delta(tr, c);
644 18662761 : if (!d ||!ds)
645 : return 0;
646 18662761 : if (access == 2)
647 493513 : return ds?ds->cs.ucnt:0;
648 18169248 : if (access == 1)
649 17 : return count_inserts(d->segs->h, tr);
650 18169231 : if (access == QUICK)
651 0 : return d->segs->t?d->segs->t->end:0;
652 18169231 : if (access == CNT_ACTIVE) {
653 42516 : size_t cnt = segs_end(d->segs, tr, c->t);
654 42517 : lock_table(tr->store, c->t->base.id);
655 42515 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
656 42513 : unlock_table(tr->store, c->t->base.id);
657 42513 : return cnt;
658 : }
659 18126715 : return segs_end(d->segs, tr, c->t);
660 : }
661 :
662 : static size_t
663 23332 : count_idx(sql_trans *tr, sql_idx *i, int access)
664 : {
665 23332 : storage *d;
666 23332 : sql_delta *ds;
667 :
668 23332 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
669 6719 : return 0;
670 16612 : d = tab_timestamp_storage(tr, i->t);
671 16652 : ds = idx_timestamp_delta(tr, i);
672 16652 : if (!d || !ds)
673 : return 0;
674 16652 : if (access == 2)
675 2871 : return ds?ds->cs.ucnt:0;
676 13781 : if (access == 1)
677 0 : return count_inserts(d->segs->h, tr);
678 13781 : if (access == QUICK)
679 3529 : return d->segs->t?d->segs->t->end:0;
680 10252 : return segs_end(d->segs, tr, i->t);
681 : }
682 :
/* true when b is a void (dense) bat with a non-nil sequence base */
#define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
/* Return the pending updates of column storage cs: the updated row ids
 * (access == RD_UPD_ID) or the new values (access == RD_UPD_VAL).  Without
 * updates, e_BAT of the matching type is returned (presumably an empty bat).
 * For RD_UPD_ID the ids are passed through BATselect with upper bound cnt
 * (exclusive) when they are unsorted or may reach cnt.  Returns NULL on
 * failure. */
static BAT *
cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
{
	BAT *b;

	assert(access == RD_UPD_ID || access == RD_UPD_VAL);
	/* returns the updates for cs */
	if (cs->uibid && cs->uvbid && cs->ucnt) {
		if (access == RD_UPD_ID) {
			if (!(b = temp_descriptor(cs->uibid)))
				return NULL;
			/* filter when unsorted or when the largest id may be >= cnt.
			 * NOTE(review): for a dense bat the largest id is
			 * tseqbase + count - 1, so `>= cnt` also filters when nothing
			 * exceeds the bound (harmless, only extra work). */
			if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
				(!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
				oid nil = oid_nil;
				/* less then cnt */
				BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
				if (!s) {
					bat_destroy(b);
					return NULL;
				}

				BAT *nb = BATproject(s, b);
				bat_destroy(s);
				bat_destroy(b);
				b = nb;
			}
		} else {
			/* NOTE(review): values are returned unfiltered while the ids
			 * above may be cut at cnt; confirm callers never see ids >= cnt,
			 * otherwise ids and values can get out of step. */
			b = temp_descriptor(cs->uvbid);
		}
	} else {
		b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
	}
	return b;
}
718 :
719 : static BAT *
720 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
721 : {
722 0 : int err = 0;
723 0 : BAT *uv = *UV;
724 0 : BUN cnt = BATcount(ui)+BATcount(oi);
725 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
726 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
727 :
728 0 : if (!ni || (uv && !nv)) {
729 0 : bat_destroy(ni);
730 0 : bat_destroy(nv);
731 0 : bat_destroy(ui);
732 0 : bat_destroy(uv);
733 0 : bat_destroy(oi);
734 0 : bat_destroy(ov);
735 0 : return NULL;
736 : }
737 0 : BATiter uvi;
738 0 : BATiter ovi;
739 :
740 0 : if (uv) {
741 0 : uvi = bat_iterator(uv);
742 0 : ovi = bat_iterator(ov);
743 : }
744 :
745 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
746 0 : BUN uip = 0, uie = BATcount(ui);
747 0 : BUN oip = 0, oie = BATcount(oi);
748 :
749 0 : oid uiseqb = ui->tseqbase;
750 0 : oid oiseqb = oi->tseqbase;
751 0 : oid *uipt = NULL, *oipt = NULL;
752 0 : BATiter uii = bat_iterator(ui);
753 0 : BATiter oii = bat_iterator(oi);
754 0 : if (!BATtdensebi(&uii))
755 0 : uipt = uii.base;
756 0 : if (!BATtdensebi(&oii))
757 0 : oipt = oii.base;
758 0 : while (uip < uie && oip < oie && !err) {
759 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
760 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
761 :
762 0 : if (uiid <= oiid) {
763 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
764 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
765 : err = 1;
766 0 : uip++;
767 0 : if (uiid == oiid)
768 0 : oip++;
769 : } else { /* uiid > oiid */
770 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
771 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
772 : err = 1;
773 0 : oip++;
774 : }
775 : }
776 0 : while (uip < uie && !err) {
777 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
778 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
779 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
780 : err = 1;
781 0 : uip++;
782 : }
783 0 : while (oip < oie && !err) {
784 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
785 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
786 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
787 : err = 1;
788 0 : oip++;
789 : }
790 0 : if (uv) {
791 0 : bat_iterator_end(&uvi);
792 0 : bat_iterator_end(&ovi);
793 : }
794 0 : bat_iterator_end(&uii);
795 0 : bat_iterator_end(&oii);
796 0 : bat_destroy(ui);
797 0 : bat_destroy(uv);
798 0 : bat_destroy(oi);
799 0 : bat_destroy(ov);
800 0 : if (!err) {
801 0 : if (nv)
802 0 : *UV = nv;
803 0 : return ni;
804 : }
805 0 : *UV = NULL;
806 0 : bat_destroy(ni);
807 0 : bat_destroy(nv);
808 0 : return NULL;
809 : }
810 :
811 : static sql_delta *
812 10478673 : older_delta( sql_delta *d, sql_trans *tr)
813 : {
814 10478673 : sql_delta *o = d->next;
815 :
816 10487937 : while (o && !o->cs.merged) {
817 9259 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
818 : break;
819 : else
820 9264 : o = o->next;
821 : }
822 10478678 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
823 0 : return o;
824 : return NULL;
825 : }
826 :
827 : static BAT *
828 10474635 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
829 : {
830 10474635 : assert(tr->active);
831 10474635 : sql_delta *o = NULL;
832 10474635 : BAT *ui = NULL, *uv = NULL;
833 :
834 10474635 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
835 : return NULL;
836 10478396 : if (access == RD_UPD_VAL) {
837 5240844 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
838 0 : bat_destroy(ui);
839 0 : return NULL;
840 : }
841 : }
842 10478630 : while ((o = older_delta(d, tr)) != NULL) {
843 0 : BAT *oui = NULL, *ouv = NULL;
844 0 : if (!oui)
845 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
846 0 : if (access == RD_UPD_VAL)
847 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
848 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
849 0 : bat_destroy(ui);
850 0 : bat_destroy(uv);
851 0 : bat_destroy(oui);
852 0 : bat_destroy(ouv);
853 0 : return NULL;
854 : }
855 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
856 : return NULL;
857 : d = o;
858 : }
859 10478708 : if (uv) {
860 5241068 : bat_destroy(ui);
861 5241068 : return uv;
862 : }
863 : return ui;
864 : }
865 :
866 : static BAT *
867 2851 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
868 : {
869 2851 : lock_column(tr->store, c->base.id);
870 2851 : sql_delta *d = col_timestamp_delta(tr, c);
871 2851 : int type = c->type.multiset?TYPE_int:c->type.type->localtype;
872 :
873 2851 : if (!d) {
874 0 : unlock_column(tr->store, c->base.id);
875 0 : return NULL;
876 : }
877 2851 : if (d->cs.st == ST_DICT) {
878 0 : BAT *b = quick_descriptor(d->cs.bid);
879 :
880 0 : type = b->ttype;
881 : }
882 2851 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
883 2851 : unlock_column(tr->store, c->base.id);
884 2851 : return bn;
885 : }
886 :
887 : static BAT *
888 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
889 : {
890 0 : lock_column(tr->store, i->base.id);
891 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
892 0 : sql_delta *d = idx_timestamp_delta(tr, i);
893 :
894 0 : if (!d) {
895 0 : unlock_column(tr->store, i->base.id);
896 0 : return NULL;
897 : }
898 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
899 0 : unlock_column(tr->store, i->base.id);
900 0 : return bn;
901 : }
902 :
903 : static BAT *
904 10503634 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
905 : {
906 10503634 : BAT *b;
907 :
908 10503634 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
909 10503634 : assert(cs != NULL);
910 10503634 : if (access == QUICK)
911 176710 : return quick_descriptor(cs->bid);
912 10326924 : if (access == RD_EXT)
913 858 : return temp_descriptor(cs->ebid);
914 10326066 : assert(cs->bid);
915 10326066 : b = temp_descriptor(cs->bid);
916 10326995 : if (b == NULL)
917 : return NULL;
918 10326995 : assert(b->batRestricted == BAT_READ);
919 : /* return slice */
920 10326995 : BAT *s = BATslice(b, 0, cnt);
921 10314956 : bat_destroy(b);
922 10314956 : return s;
923 : }
924 :
925 : static int
926 5236803 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
927 : {
928 5236803 : lock_column(tr->store, c->base.id);
929 5236690 : size_t cnt = count_col(tr, c, 0);
930 5238153 : sql_delta *d = col_timestamp_delta(tr, c);
931 5237688 : int type = c->type.multiset?TYPE_int:c->type.type->localtype;
932 :
933 5237688 : if (!d) {
934 0 : unlock_column(tr->store, c->base.id);
935 0 : return LOG_ERR;
936 : }
937 5237688 : if (d->cs.st == ST_DICT) {
938 2 : BAT *b = quick_descriptor(d->cs.bid);
939 :
940 2 : type = b->ttype;
941 : }
942 :
943 5237688 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
944 5238096 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
945 :
946 5238073 : unlock_column(tr->store, c->base.id);
947 :
948 5237728 : if (*ui == NULL || *uv == NULL) {
949 0 : bat_destroy(*ui);
950 0 : bat_destroy(*uv);
951 0 : return LOG_ERR;
952 : }
953 : return LOG_OK;
954 : }
955 :
956 : static int
957 23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
958 : {
959 23 : lock_column(tr->store, i->base.id);
960 23 : size_t cnt = count_idx(tr, i, 0);
961 23 : sql_delta *d = idx_timestamp_delta(tr, i);
962 23 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
963 :
964 23 : if (!d) {
965 0 : unlock_column(tr->store, i->base.id);
966 0 : return LOG_ERR;
967 : }
968 :
969 23 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
970 23 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
971 :
972 23 : unlock_column(tr->store, i->base.id);
973 :
974 23 : if (*ui == NULL || *uv == NULL) {
975 0 : bat_destroy(*ui);
976 0 : bat_destroy(*uv);
977 0 : return LOG_ERR;
978 : }
979 : return LOG_OK;
980 : }
981 :
982 : static void * /* BAT * */
983 10491118 : bind_col(sql_trans *tr, sql_column *c, int access)
984 : {
985 10491118 : assert(access == QUICK || tr->active);
986 10491118 : if (!isTable(c->t))
987 : return NULL;
988 10491118 : sql_delta *d = col_timestamp_delta(tr, c);
989 10493347 : if (!d)
990 : return NULL;
991 10493347 : size_t cnt = count_col(tr, c, 0);
992 10496767 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
993 2851 : return bind_ucol(tr, c, access, cnt);
994 10493916 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
995 10489559 : int type = c->type.multiset?TYPE_int:c->type.type->localtype;
996 10489559 : (void)type;
997 10489559 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == type) || (access == QUICK && b->ttype < 0));
998 : return b;
999 : }
1000 :
1001 : static void * /* BAT * */
1002 10690 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1003 : {
1004 10690 : assert(access == QUICK || tr->active);
1005 10690 : if (!isTable(i->t))
1006 : return NULL;
1007 10690 : sql_delta *d = idx_timestamp_delta(tr, i);
1008 10699 : if (!d)
1009 : return NULL;
1010 10699 : size_t cnt = count_idx(tr, i, 0);
1011 10707 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1012 0 : return bind_uidx(tr, i, access, cnt);
1013 10707 : return cs_bind_bat( &d->cs, access, cnt);
1014 : }
1015 :
1016 : static int
1017 4152 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1018 : {
1019 4152 : if (!cs->uibid) {
1020 0 : cs->uibid = e_bat(TYPE_oid);
1021 0 : if (cs->uibid == BID_NIL)
1022 : return LOG_ERR;
1023 : }
1024 4152 : if (!cs->uvbid) {
1025 0 : BAT *cur = quick_descriptor(cs->bid);
1026 0 : if (!cur)
1027 : return LOG_ERR;
1028 0 : int type = cur->ttype;
1029 0 : cs->uvbid = e_bat(type);
1030 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1031 : return LOG_ERR;
1032 : }
1033 4152 : BAT *ui = temp_descriptor(cs->uibid);
1034 4152 : BAT *uv = temp_descriptor(cs->uvbid);
1035 :
1036 4152 : if (ui == NULL || uv == NULL) {
1037 0 : bat_destroy(ui);
1038 0 : bat_destroy(uv);
1039 0 : return LOG_ERR;
1040 : }
1041 4152 : assert(ui && uv);
1042 4152 : if (isEbat(ui)){
1043 400 : temp_destroy(cs->uibid);
1044 400 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1045 400 : bat_destroy(ui);
1046 400 : if (cs->uibid == BID_NIL ||
1047 400 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1048 0 : bat_destroy(uv);
1049 0 : return LOG_ERR;
1050 : }
1051 : }
1052 4152 : if (isEbat(uv)){
1053 400 : temp_destroy(cs->uvbid);
1054 400 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1055 400 : bat_destroy(uv);
1056 400 : if (cs->uvbid == BID_NIL ||
1057 400 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1058 0 : bat_destroy(ui);
1059 0 : return LOG_ERR;
1060 : }
1061 : }
1062 4152 : *Ui = ui;
1063 4152 : *Uv = uv;
1064 4152 : return LOG_OK;
1065 : }
1066 :
1067 : static int
1068 6984 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1069 : {
1070 102673 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1071 102673 : if (s->start <= rid && s->end > rid) {
1072 6984 : if (s->ts == tr->tid && !s->deleted) {
1073 2934 : return 1;
1074 : }
1075 : break;
1076 : }
1077 : }
1078 : return 0;
1079 : }
1080 :
1081 : static int
1082 4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1083 : {
1084 96455 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1085 96455 : if (s->start <= rid && s->end > rid) {
1086 4050 : if (s->ts >= tr->ts && s->deleted) {
1087 0 : return 1;
1088 : }
1089 : break;
1090 : }
1091 : }
1092 : return 0;
1093 : }
1094 :
1095 : static sql_delta *
1096 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1097 : {
1098 0 : sql_delta *n = ZNEW(sql_delta);
1099 0 : if (!n)
1100 : return NULL;
1101 0 : *n = *bat;
1102 0 : n->next = NULL;
1103 0 : n->cs.ts = tr->tid;
1104 0 : return n;
1105 : }
1106 :
1107 : static BAT *
1108 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1109 : {
1110 17 : BAT *newoffsets = NULL;
1111 17 : sql_delta *bat = *batp;
1112 17 : column_storage *cs = &bat->cs;
1113 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1114 :
1115 17 : if (!u)
1116 : return NULL;
1117 17 : BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
1118 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1119 0 : bat_destroy(u);
1120 0 : return NULL;
1121 : } else {
1122 17 : int new = 0;
1123 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1124 17 : if (BATcount(u) >= max_cnt) {
1125 1 : if (max_cnt == INT_MAX) { /* decompress */
1126 0 : if (!(b = temp_descriptor(cs->bid))) {
1127 0 : bat_destroy(u);
1128 0 : return NULL;
1129 : }
1130 0 : if (cs->ucnt) {
1131 0 : BAT *ui = NULL, *uv = NULL;
1132 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1133 0 : bat_destroy(b);
1134 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1135 0 : bat_destroy(nb);
1136 0 : bat_destroy(u);
1137 0 : return NULL;
1138 : }
1139 0 : b = nb;
1140 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1141 0 : bat_destroy(ui);
1142 0 : bat_destroy(uv);
1143 0 : bat_destroy(b);
1144 0 : bat_destroy(u);
1145 : }
1146 0 : bat_destroy(ui);
1147 0 : bat_destroy(uv);
1148 : }
1149 0 : n = DICTdecompress_(b, u, PERSISTENT);
1150 0 : bat_destroy(b);
1151 0 : assert(newoffsets == NULL);
1152 0 : if (!n) {
1153 0 : bat_destroy(u);
1154 0 : return NULL;
1155 : }
1156 0 : if (cs->ts != tr->tid) {
1157 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1158 0 : bat_destroy(n);
1159 0 : return NULL;
1160 : }
1161 0 : cs = &(*batp)->cs;
1162 0 : new = 1;
1163 : }
1164 0 : if (cs->bid && !new)
1165 0 : temp_destroy(cs->bid);
1166 0 : n = transfer_to_systrans(n);
1167 0 : if (n == NULL)
1168 : return NULL;
1169 0 : bat_set_access(n, BAT_READ);
1170 0 : cs->bid = temp_create(n);
1171 0 : bat_destroy(n);
1172 0 : if (cs->ebid && !new)
1173 0 : temp_destroy(cs->ebid);
1174 0 : cs->ebid = 0;
1175 0 : cs->ucnt = 0;
1176 0 : if (cs->uibid && !new)
1177 0 : temp_destroy(cs->uibid);
1178 0 : if (cs->uvbid && !new)
1179 0 : temp_destroy(cs->uvbid);
1180 0 : cs->uibid = cs->uvbid = 0;
1181 0 : cs->st = ST_DEFAULT;
1182 0 : cs->cleared = true;
1183 : } else {
1184 1 : if (!(b = temp_descriptor(cs->bid))) {
1185 0 : bat_destroy(newoffsets);
1186 0 : bat_destroy(u);
1187 0 : return NULL;
1188 : }
1189 2 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1190 1 : bat_destroy(b);
1191 1 : if (!n) {
1192 0 : bat_destroy(newoffsets);
1193 0 : bat_destroy(u);
1194 0 : return NULL;
1195 : }
1196 1 : if (cs->ts != tr->tid) {
1197 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1198 0 : bat_destroy(n);
1199 0 : return NULL;
1200 : }
1201 0 : cs = &(*batp)->cs;
1202 0 : new = 1;
1203 0 : temp_dup(cs->ebid);
1204 0 : if (cs->uibid) {
1205 0 : temp_dup(cs->uibid);
1206 0 : temp_dup(cs->uvbid);
1207 : }
1208 : }
1209 1 : if (cs->bid && !new)
1210 1 : temp_destroy(cs->bid);
1211 1 : n = transfer_to_systrans(n);
1212 1 : if (n == NULL)
1213 : return NULL;
1214 1 : bat_set_access(n, BAT_READ);
1215 1 : cs->bid = temp_create(n);
1216 1 : bat_destroy(n);
1217 1 : cs->cleared = true;
1218 1 : i = newoffsets;
1219 : }
1220 : } else { /* append */
1221 16 : i = newoffsets;
1222 : }
1223 : }
1224 17 : bat_destroy(u);
1225 17 : return i;
1226 : }
1227 :
1228 : static BAT *
1229 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1230 : {
1231 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1232 0 : BAT *newoffsets = NULL;
1233 0 : BAT *b = NULL, *n = NULL;
1234 :
1235 0 : if (!(b = temp_descriptor(cs->bid)))
1236 : return NULL;
1237 :
1238 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1239 0 : bat_destroy(b);
1240 0 : return NULL;
1241 : } else {
1242 : /* returns new offset bat if values within min/max, else decompress */
1243 0 : if (!newoffsets) { /* decompress */
1244 0 : if (cs->ucnt) {
1245 0 : BAT *ui = NULL, *uv = NULL;
1246 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1247 0 : bat_destroy(b);
1248 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1249 0 : bat_destroy(nb);
1250 0 : return NULL;
1251 : }
1252 0 : b = nb;
1253 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1254 0 : bat_destroy(ui);
1255 0 : bat_destroy(uv);
1256 0 : bat_destroy(b);
1257 : }
1258 0 : bat_destroy(ui);
1259 0 : bat_destroy(uv);
1260 : }
1261 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1262 0 : bat_destroy(b);
1263 0 : if (!n)
1264 : return NULL;
1265 0 : if (cs->bid)
1266 0 : temp_destroy(cs->bid);
1267 0 : n = transfer_to_systrans(n);
1268 0 : if (n == NULL)
1269 : return NULL;
1270 0 : bat_set_access(n, BAT_READ);
1271 0 : cs->bid = temp_create(n);
1272 0 : cs->ucnt = 0;
1273 0 : if (cs->uibid)
1274 0 : temp_destroy(cs->uibid);
1275 0 : if (cs->uvbid)
1276 0 : temp_destroy(cs->uvbid);
1277 0 : cs->uibid = cs->uvbid = 0;
1278 0 : cs->st = ST_DEFAULT;
1279 0 : cs->cleared = true;
1280 0 : b = n;
1281 : } else { /* append */
1282 : i = newoffsets;
1283 : }
1284 : }
1285 0 : bat_destroy(b);
1286 0 : return i;
1287 : }
1288 :
1289 : /*
1290 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1291 : */
1292 : static int
1293 3163 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1294 : {
1295 3163 : int res = LOG_OK;
1296 3163 : sql_delta *bat = *batp;
1297 3163 : column_storage *cs = &bat->cs;
1298 3163 : BAT *otids = tids, *oupdates = updates;
1299 :
1300 3163 : if (!BATcount(tids))
1301 : return LOG_OK;
1302 :
1303 3163 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1304 6 : tids = BATunmask(tids);
1305 6 : if (!tids)
1306 : return LOG_ERR;
1307 : }
1308 3163 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1309 0 : updates = BATunmask(updates);
1310 0 : if (!updates) {
1311 0 : if (otids != tids)
1312 0 : bat_destroy(tids);
1313 0 : return LOG_ERR;
1314 : }
1315 3163 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1316 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1317 42 : if (!updates) {
1318 0 : if (otids != tids)
1319 0 : bat_destroy(tids);
1320 0 : return LOG_ERR;
1321 : }
1322 : }
1323 :
1324 3163 : if (cs->st == ST_DICT) {
1325 : /* possibly a new array is returned */
1326 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1327 4 : bat = *batp;
1328 4 : cs = &bat->cs;
1329 4 : if (oupdates != updates)
1330 0 : bat_destroy(updates);
1331 4 : updates = nupdates;
1332 4 : if (!updates) {
1333 0 : if (otids != tids)
1334 0 : bat_destroy(tids);
1335 0 : return LOG_ERR;
1336 : }
1337 : }
1338 :
1339 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1340 : /* currently only one update delta is possible */
1341 3163 : lock_table(tr->store, t->base.id);
1342 3163 : storage *s = ATOMIC_PTR_GET(&t->data);
1343 3163 : if (!is_new && !cs->cleared) {
1344 2840 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1345 6 : BAT *sorted, *order;
1346 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1347 0 : if (otids != tids)
1348 0 : bat_destroy(tids);
1349 0 : if (oupdates != updates)
1350 0 : bat_destroy(updates);
1351 0 : unlock_table(tr->store, t->base.id);
1352 0 : return LOG_ERR;
1353 : }
1354 6 : if (otids != tids)
1355 0 : bat_destroy(tids);
1356 6 : tids = sorted;
1357 6 : BAT *nupdates = BATproject(order, updates);
1358 6 : bat_destroy(order);
1359 6 : if (oupdates != updates)
1360 0 : bat_destroy(updates);
1361 6 : updates = nupdates;
1362 6 : if (!updates) {
1363 0 : bat_destroy(tids);
1364 0 : unlock_table(tr->store, t->base.id);
1365 0 : return LOG_ERR;
1366 : }
1367 : }
1368 2840 : assert(tids->tsorted);
1369 2840 : BAT *ui = NULL, *uv = NULL;
1370 :
1371 : /* handle updates on just inserted bits */
1372 : /* handle updates on updates (within one transaction) */
1373 2840 : BATiter upi = bat_iterator(updates);
1374 2840 : BUN cnt = 0, ucnt = BATcount(tids);
1375 2840 : BAT *b, *ins = NULL;
1376 2840 : int *msk = NULL;
1377 :
1378 2840 : if((b = temp_descriptor(cs->bid)) == NULL)
1379 : res = LOG_ERR;
1380 :
1381 2840 : if (res == LOG_OK && BATtdense(tids)) {
1382 2637 : oid start = tids->tseqbase, offset = start;
1383 2637 : oid end = start + ucnt;
1384 :
1385 13391 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1386 11438 : if (seg->start <= start && seg->end > start) {
1387 : /* check for delete conflicts */
1388 2637 : if (seg->ts >= tr->ts && seg->deleted) {
1389 0 : res = LOG_CONFLICT;
1390 0 : continue;
1391 : }
1392 :
1393 : /* check for inplace updates */
1394 2637 : BUN lend = end < seg->end?end:seg->end;
1395 2637 : if (seg->ts == tr->tid && !seg->deleted) {
1396 199 : if (!ins) {
1397 199 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1398 199 : if (!ins)
1399 : res = LOG_ERR;
1400 : else {
1401 199 : BATsetcount(ins, ucnt); /* all full updates */
1402 199 : msk = (int*)Tloc(ins, 0);
1403 199 : BUN end = (ucnt+31)/32;
1404 199 : memset(msk, 0, end * sizeof(int));
1405 : }
1406 : }
1407 684 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1408 485 : const void *upd = BUNtail(upi, rid-offset);
1409 485 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1410 0 : res = LOG_ERR;
1411 :
1412 485 : oid word = i/32;
1413 485 : int pos = i%32;
1414 485 : msk[word] |= 1U<<pos;
1415 485 : cnt++;
1416 : }
1417 : }
1418 : }
1419 11438 : if (end < seg->end)
1420 : break;
1421 : }
1422 206 : } else if (res == LOG_OK && complex_cand(tids)) {
1423 3 : struct canditer ci;
1424 3 : segment *seg = s->segs->h;
1425 3 : canditer_init(&ci, NULL, tids);
1426 3 : BUN i = 0;
1427 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1428 1033 : oid rid = canditer_next(&ci);
1429 1033 : if (seg->end <= rid)
1430 13 : seg = ATOMIC_PTR_GET(&seg->next);
1431 1020 : else if (seg->start <= rid && seg->end > rid) {
1432 : /* check for delete conflicts */
1433 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1434 0 : res = LOG_CONFLICT;
1435 0 : continue;
1436 : }
1437 :
1438 : /* check for inplace updates */
1439 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1440 0 : if (!ins) {
1441 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1442 0 : if (!ins) {
1443 : res = LOG_ERR;
1444 : break;
1445 : } else {
1446 0 : BATsetcount(ins, ucnt); /* all full updates */
1447 0 : msk = (int*)Tloc(ins, 0);
1448 0 : BUN end = (ucnt+31)/32;
1449 0 : memset(msk, 0, end * sizeof(int));
1450 : }
1451 : }
1452 0 : ptr upd = BUNtail(upi, i);
1453 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1454 0 : res = LOG_ERR;
1455 :
1456 0 : oid word = i/32;
1457 0 : int pos = i%32;
1458 0 : msk[word] |= 1U<<pos;
1459 0 : cnt++;
1460 : }
1461 1020 : i++;
1462 : }
1463 : }
1464 200 : } else if (res == LOG_OK) {
1465 200 : BUN i = 0;
1466 200 : oid *rid = Tloc(tids,0);
1467 200 : segment *seg = s->segs->h;
1468 21863 : while ( seg && res == LOG_OK && i < ucnt) {
1469 21663 : if (seg->end <= rid[i])
1470 8233 : seg = ATOMIC_PTR_GET(&seg->next);
1471 13430 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1472 : /* check for delete conflicts */
1473 13430 : if (seg->ts >= tr->ts && seg->deleted) {
1474 0 : res = LOG_CONFLICT;
1475 0 : continue;
1476 : }
1477 :
1478 : /* check for inplace updates */
1479 13430 : if (seg->ts == tr->tid && !seg->deleted) {
1480 993 : if (!ins) {
1481 79 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1482 79 : if (!ins) {
1483 : res = LOG_ERR;
1484 : break;
1485 : } else {
1486 79 : BATsetcount(ins, ucnt); /* all full updates */
1487 79 : msk = (int*)Tloc(ins, 0);
1488 79 : BUN end = (ucnt+31)/32;
1489 79 : memset(msk, 0, end * sizeof(int));
1490 : }
1491 : }
1492 993 : const void *upd = BUNtail(upi, i);
1493 993 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1494 0 : res = LOG_ERR;
1495 :
1496 993 : oid word = i/32;
1497 993 : int pos = i%32;
1498 993 : msk[word] |= 1U<<pos;
1499 993 : cnt++;
1500 : }
1501 13430 : i++;
1502 : }
1503 : }
1504 : }
1505 :
1506 2840 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1507 2598 : if (cs->ucnt == 0) {
1508 2496 : if (cnt) {
1509 12 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1510 12 : if (nins) {
1511 12 : ui = BATproject(nins, tids);
1512 12 : uv = BATproject(nins, updates);
1513 12 : bat_destroy(nins);
1514 : }
1515 : } else {
1516 2484 : ui = temp_descriptor(tids->batCacheid);
1517 2484 : uv = temp_descriptor(updates->batCacheid);
1518 : }
1519 2496 : if (!ui || !uv) {
1520 : res = LOG_ERR;
1521 : } else {
1522 2496 : temp_destroy(cs->uibid);
1523 2496 : temp_destroy(cs->uvbid);
1524 2496 : ui = transfer_to_systrans(ui);
1525 2496 : uv = transfer_to_systrans(uv);
1526 2496 : if (ui == NULL || uv == NULL) {
1527 0 : BBPreclaim(ui);
1528 0 : BBPreclaim(uv);
1529 : res = LOG_ERR;
1530 : } else {
1531 2496 : cs->uibid = temp_create(ui);
1532 2496 : cs->uvbid = temp_create(uv);
1533 2496 : cs->ucnt = BATcount(ui);
1534 : }
1535 : }
1536 : } else {
1537 102 : BAT *nui = NULL, *nuv = NULL;
1538 :
1539 : /* merge taking msk of inserted into account */
1540 102 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1541 : res = LOG_ERR;
1542 :
1543 102 : if (res == LOG_OK) {
1544 102 : const void *upd = NULL;
1545 102 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1546 102 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1547 :
1548 102 : if (!nui || !nuv) {
1549 : res = LOG_ERR;
1550 : } else {
1551 102 : BATiter ovi = bat_iterator(uv);
1552 :
1553 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1554 102 : BUN uip = 0, uie = BATcount(ui);
1555 102 : BUN nip = 0, nie = BATcount(tids);
1556 102 : oid uiseqb = ui->tseqbase;
1557 102 : oid niseqb = tids->tseqbase;
1558 102 : oid *uipt = NULL, *nipt = NULL;
1559 102 : BATiter uii = bat_iterator(ui);
1560 102 : BATiter tidsi = bat_iterator(tids);
1561 102 : if (!BATtdensebi(&uii))
1562 97 : uipt = uii.base;
1563 102 : if (!BATtdensebi(&tidsi))
1564 93 : nipt = tidsi.base;
1565 16091 : while (uip < uie && nip < nie && res == LOG_OK) {
1566 15989 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1567 15989 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1568 :
1569 15989 : if (uiv < niv) {
1570 8068 : upd = BUNtail(ovi, uip);
1571 16136 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1572 8068 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1573 : res = LOG_ERR;
1574 8068 : uip++;
1575 7921 : } else if (uiv == niv) {
1576 : /* handle == */
1577 1522 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1578 1522 : upd = BUNtail(upi, nip);
1579 3044 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1580 1522 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1581 : res = LOG_ERR;
1582 : } else {
1583 0 : upd = BUNtail(ovi, uip);
1584 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1585 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1586 : res = LOG_ERR;
1587 : }
1588 1522 : uip++;
1589 1522 : nip++;
1590 : } else { /* uiv > niv */
1591 6399 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1592 6255 : upd = BUNtail(upi, nip);
1593 12510 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1594 6255 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1595 : res = LOG_ERR;
1596 : }
1597 6399 : nip++;
1598 : }
1599 : }
1600 923 : while (uip < uie && res == LOG_OK) {
1601 821 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1602 821 : upd = BUNtail(ovi, uip);
1603 1642 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1604 821 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1605 : res = LOG_ERR;
1606 821 : uip++;
1607 : }
1608 703 : while (nip < nie && res == LOG_OK) {
1609 601 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1610 601 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1611 329 : upd = BUNtail(upi, nip);
1612 658 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1613 329 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1614 : res = LOG_ERR;
1615 : }
1616 601 : nip++;
1617 : }
1618 102 : bat_iterator_end(&uii);
1619 102 : bat_iterator_end(&tidsi);
1620 102 : bat_iterator_end(&ovi);
1621 102 : if (res == LOG_OK) {
1622 102 : temp_destroy(cs->uibid);
1623 102 : temp_destroy(cs->uvbid);
1624 102 : nui = transfer_to_systrans(nui);
1625 102 : nuv = transfer_to_systrans(nuv);
1626 102 : if (nui == NULL || nuv == NULL) {
1627 : res = LOG_ERR;
1628 : } else {
1629 102 : cs->uibid = temp_create(nui);
1630 102 : cs->uvbid = temp_create(nuv);
1631 102 : cs->ucnt = BATcount(nui);
1632 : }
1633 : }
1634 : }
1635 102 : bat_destroy(nui);
1636 102 : bat_destroy(nuv);
1637 : }
1638 : }
1639 : }
1640 2840 : bat_iterator_end(&upi);
1641 2840 : bat_destroy(b);
1642 2840 : unlock_table(tr->store, t->base.id);
1643 2840 : bat_destroy(ins);
1644 2840 : bat_destroy(ui);
1645 2840 : bat_destroy(uv);
1646 2840 : if (otids != tids)
1647 10 : bat_destroy(tids);
1648 2840 : if (oupdates != updates)
1649 11 : bat_destroy(updates);
1650 2840 : return res;
1651 : } else if (is_new || cs->cleared) {
1652 323 : BAT *b = temp_descriptor(cs->bid);
1653 :
1654 323 : if (b == NULL) {
1655 : res = LOG_ERR;
1656 : } else {
1657 323 : if (BATcount(b)==0) {
1658 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1659 0 : res = LOG_ERR;
1660 322 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1661 0 : res = LOG_ERR;
1662 323 : BBPcold(b->batCacheid);
1663 323 : bat_destroy(b);
1664 : }
1665 : }
1666 323 : unlock_table(tr->store, t->base.id);
1667 323 : if (otids != tids)
1668 2 : bat_destroy(tids);
1669 323 : if (oupdates != updates)
1670 41 : bat_destroy(updates);
1671 : return res;
1672 : }
1673 :
1674 : static int
1675 3163 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1676 : {
1677 3163 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1678 : }
1679 :
1680 : static void *
1681 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1682 : {
1683 4 : void *newoffsets = NULL;
1684 4 : sql_delta *bat = *batp;
1685 4 : column_storage *cs = &bat->cs;
1686 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1687 :
1688 4 : if (!u)
1689 : return NULL;
1690 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1691 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1692 0 : bat_destroy(u);
1693 0 : return NULL;
1694 : } else {
1695 4 : int new = 0;
1696 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1697 4 : if (BATcount(u) >= max_cnt) {
1698 0 : if (max_cnt == INT_MAX) { /* decompress */
1699 : if (!(b = temp_descriptor(cs->bid))) {
1700 : bat_destroy(u);
1701 : return NULL;
1702 : }
1703 : n = DICTdecompress_(b, u, PERSISTENT);
1704 : /* TODO decompress updates if any */
1705 : bat_destroy(b);
1706 : assert(newoffsets == NULL);
1707 : if (!n) {
1708 : bat_destroy(u);
1709 : return NULL;
1710 : }
1711 : if (cs->ts != tr->tid) {
1712 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1713 : bat_destroy(n);
1714 : bat_destroy(u);
1715 : return NULL;
1716 : }
1717 : cs = &(*batp)->cs;
1718 : new = 1;
1719 : cs->uibid = cs->uvbid = 0;
1720 : }
1721 : if (cs->bid && !new)
1722 : temp_destroy(cs->bid);
1723 : n = transfer_to_systrans(n);
1724 : if (n == NULL) {
1725 : bat_destroy(u);
1726 : return NULL;
1727 : }
1728 : bat_set_access(n, BAT_READ);
1729 : cs->bid = temp_create(n);
1730 : bat_destroy(n);
1731 : if (cs->ebid && !new)
1732 : temp_destroy(cs->ebid);
1733 : cs->ebid = 0;
1734 : cs->st = ST_DEFAULT;
1735 : /* at append_col the column's storage type is cleared */
1736 : cs->cleared = true;
1737 : } else {
1738 0 : if (!(b = temp_descriptor(cs->bid))) {
1739 0 : GDKfree(newoffsets);
1740 0 : bat_destroy(u);
1741 0 : return NULL;
1742 : }
1743 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
1744 0 : bat_destroy(b);
1745 0 : if (!n) {
1746 0 : GDKfree(newoffsets);
1747 0 : bat_destroy(u);
1748 0 : return NULL;
1749 : }
1750 0 : if (cs->ts != tr->tid) {
1751 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1752 0 : bat_destroy(u);
1753 0 : bat_destroy(n);
1754 0 : return NULL;
1755 : }
1756 0 : cs = &(*batp)->cs;
1757 0 : new = 1;
1758 0 : temp_dup(cs->ebid);
1759 0 : if (cs->uibid) {
1760 0 : temp_dup(cs->uibid);
1761 0 : temp_dup(cs->uvbid);
1762 : }
1763 : }
1764 0 : if (cs->bid)
1765 0 : temp_destroy(cs->bid);
1766 0 : n = transfer_to_systrans(n);
1767 0 : if (n == NULL) {
1768 0 : bat_destroy(u);
1769 0 : return NULL;
1770 : }
1771 0 : bat_set_access(n, BAT_READ);
1772 0 : cs->bid = temp_create(n);
1773 0 : bat_destroy(n);
1774 0 : cs->cleared = true;
1775 0 : i = newoffsets;
1776 : }
1777 : } else { /* append */
1778 4 : i = newoffsets;
1779 : }
1780 : }
1781 4 : bat_destroy(u);
1782 4 : return i;
1783 : }
1784 :
1785 : static void *
1786 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1787 : {
1788 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1789 1 : void *newoffsets = NULL;
1790 1 : BAT *b = NULL, *n = NULL;
1791 :
1792 1 : if (!(b = temp_descriptor(cs->bid)))
1793 : return NULL;
1794 :
1795 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1796 0 : bat_destroy(b);
1797 0 : return NULL;
1798 : } else {
1799 : /* returns new offset bat if values within min/max, else decompress */
1800 1 : if (!newoffsets) {
1801 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1802 1 : bat_destroy(b);
1803 1 : if (!n)
1804 : return NULL;
1805 : /* TODO decompress updates if any */
1806 1 : if (cs->bid)
1807 1 : temp_destroy(cs->bid);
1808 1 : n = transfer_to_systrans(n);
1809 1 : if (n == NULL)
1810 : return NULL;
1811 1 : bat_set_access(n, BAT_READ);
1812 1 : cs->bid = temp_create(n);
1813 1 : cs->st = ST_DEFAULT;
1814 : /* at append_col the column's storage type is cleared */
1815 1 : cs->cleared = true;
1816 1 : b = n;
1817 : } else { /* append */
1818 : i = newoffsets;
1819 : }
1820 : }
1821 1 : bat_destroy(b);
1822 1 : return i;
1823 : }
1824 :
1825 : static int
1826 6984 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1827 : {
1828 6984 : void *oupd = upd;
1829 6984 : sql_delta *bat = *batp;
1830 6984 : column_storage *cs = &bat->cs;
1831 6984 : storage *s = ATOMIC_PTR_GET(&t->data);
1832 6984 : assert(!is_oid_nil(rid));
1833 6984 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1834 :
1835 6984 : if (cs->st == ST_DICT) {
1836 : /* possibly a new array is returned */
1837 0 : upd = dict_append_val(tr, batp, upd, 1);
1838 0 : bat = *batp;
1839 0 : cs = &bat->cs;
1840 0 : if (!upd)
1841 : return LOG_ERR;
1842 : }
1843 :
1844 : /* check if rid is insert ? */
1845 6984 : if (!inplace) {
1846 : /* check conflict */
1847 4050 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1848 0 : if (oupd != upd)
1849 0 : GDKfree(upd);
1850 0 : return LOG_CONFLICT;
1851 : }
1852 4050 : BAT *ui, *uv;
1853 :
1854 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1855 : /* currently only one update delta is possible */
1856 4050 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1857 0 : if (oupd != upd)
1858 0 : GDKfree(upd);
1859 0 : return LOG_ERR;
1860 : }
1861 :
1862 4050 : assert(uv->ttype);
1863 4050 : assert(BATcount(ui) == BATcount(uv));
1864 8100 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1865 4050 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1866 0 : if (oupd != upd)
1867 0 : GDKfree(upd);
1868 0 : bat_destroy(ui);
1869 0 : bat_destroy(uv);
1870 0 : return LOG_ERR;
1871 : }
1872 4050 : assert(BATcount(ui) == BATcount(uv));
1873 4050 : bat_destroy(ui);
1874 4050 : bat_destroy(uv);
1875 4050 : cs->ucnt++;
1876 : } else {
1877 2934 : BAT *b = NULL;
1878 :
1879 2934 : if((b = temp_descriptor(cs->bid)) == NULL) {
1880 0 : if (oupd != upd)
1881 0 : GDKfree(upd);
1882 0 : return LOG_ERR;
1883 : }
1884 2934 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1885 0 : if (oupd != upd)
1886 0 : GDKfree(upd);
1887 0 : bat_destroy(b);
1888 0 : return LOG_ERR;
1889 : }
1890 2934 : bat_destroy(b);
1891 : }
1892 6984 : if (oupd != upd)
1893 0 : GDKfree(upd);
1894 : return LOG_OK;
1895 : }
1896 :
1897 : static int
1898 6984 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1899 : {
1900 6984 : int res = LOG_OK;
1901 6984 : lock_table(tr->store, t->base.id);
1902 6984 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1903 6984 : unlock_table(tr->store, t->base.id);
1904 6984 : return res;
1905 : }
1906 :
1907 : static int
1908 158989 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1909 : {
1910 158989 : (void)tr;
1911 158989 : if (!ocs)
1912 : return LOG_OK;
1913 158989 : cs->bid = ocs->bid;
1914 158989 : cs->ebid = ocs->ebid;
1915 158989 : cs->uibid = ocs->uibid;
1916 158989 : cs->uvbid = ocs->uvbid;
1917 158989 : cs->ucnt = ocs->ucnt;
1918 :
1919 158989 : if (temp) {
1920 25979 : cs->bid = temp_copy(cs->bid, true, false);
1921 25972 : if (cs->bid == BID_NIL)
1922 : return LOG_ERR;
1923 : } else {
1924 133010 : temp_dup(cs->bid);
1925 : }
1926 158976 : if (cs->ebid)
1927 6 : temp_dup(cs->ebid);
1928 158976 : cs->ucnt = 0;
1929 158976 : cs->uibid = e_bat(TYPE_oid);
1930 159011 : cs->uvbid = e_bat(type);
1931 159010 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1932 : return LOG_ERR;
1933 159010 : cs->st = ocs->st;
1934 159010 : return LOG_OK;
1935 : }
1936 :
1937 : static void
1938 326384 : destroy_delta(sql_delta *b, bool recursive)
1939 : {
1940 326384 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1941 : return;
1942 304973 : if (recursive && b->next)
1943 127501 : destroy_delta(b->next, true);
1944 304973 : if (b->cs.uibid)
1945 102021 : temp_destroy(b->cs.uibid);
1946 304973 : if (b->cs.uvbid)
1947 102021 : temp_destroy(b->cs.uvbid);
1948 304973 : if (b->cs.bid)
1949 304973 : temp_destroy(b->cs.bid);
1950 304973 : if (b->cs.ebid)
1951 61 : temp_destroy(b->cs.ebid);
1952 304973 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1953 304973 : _DELETE(b);
1954 : }
1955 :
/*
 * Return the delta of column c that transaction tr may modify.
 *
 * update_conflict == NULL means "append" mode; otherwise a conflict is
 * reported through *update_conflict.
 *  - The current head delta is returned as is when tr owns it (ts == tid).
 *    In append mode it is also returned when its ts is below
 *    TRANSACTION_ID_BASE or is a version of tr's parent.
 *  - When the head belongs to another transaction (ts >= TRANSACTION_ID_BASE,
 *    not a parent version): with update_conflict set, *update_conflict
 *    becomes true and NULL is returned.  In append mode, the delta visible
 *    to tr (timestamp_delta) is returned if the head is not cleared,
 *    else NULL.
 *  - Otherwise a new delta owned by tr is created on top of the visible
 *    one (dup_cs with temp == 0) and installed as head with a CAS.  A lost
 *    CAS is reported as a conflict.
 * Returns NULL on failure or conflict.
 */
static sql_delta *
bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
{
	sql_delta *obat = ATOMIC_PTR_GET(&c->data);

	if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
		return obat;
	/* head delta belongs to another (presumably uncommitted) transaction */
	if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
		/* abort */
		if (update_conflict)
			*update_conflict = true;
		else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
			return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
		return NULL;
	}
	/* build tr's own delta on top of the version tr can see */
	if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
		return NULL;
	sql_delta* bat = ZNEW(sql_delta);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	/* multiset columns store int values in the value bat */
	int type = c->type.multiset?TYPE_int:c->type.type->localtype;
	if (dup_cs(tr, &obat->cs, &bat->cs, type, 0) != LOG_OK) {
		destroy_delta(bat, false);
		return NULL;
	}
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	if (obat)
		bat->nr_updates = obat->nr_updates;
	/* Install bat as the new head only if c->data still equals obat
	 * (bat->next).  On failure bat->next is presumably overwritten with the
	 * current head, hence the reset before the non-recursive destroy. */
	if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
		bat->next = NULL;
		destroy_delta(bat, false);
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	return bat;
}
1996 :
1997 : static int
1998 10147 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
1999 : {
2000 10147 : int ok = LOG_OK;
2001 :
2002 10147 : if (is_bat) {
2003 3163 : BAT *tids = incoming_tids;
2004 3163 : BAT *values = incoming_values;
2005 3163 : if (BATcount(tids) == 0)
2006 : return LOG_OK;
2007 3163 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2008 : } else {
2009 6984 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2010 : }
2011 : return ok;
2012 : }
2013 :
2014 : static int
2015 10186 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2016 : {
2017 10186 : int res = LOG_OK;
2018 10186 : bool update_conflict = false;
2019 10186 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2020 :
2021 10186 : if (isbat) {
2022 3199 : BAT *t = tids;
2023 3199 : if (!BATcount(t))
2024 : return LOG_OK;
2025 : }
2026 :
2027 9872 : if (c == NULL)
2028 : return LOG_ERR;
2029 :
2030 9872 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2031 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2032 :
2033 9868 : assert(delta && delta->cs.ts == tr->tid);
2034 9868 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2035 9868 : if (odelta != delta)
2036 3489 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2037 :
2038 9868 : odelta = delta;
2039 9868 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2040 : return res;
2041 9868 : assert(delta == odelta);
2042 9868 : if (delta->cs.st == ST_DEFAULT && !c->type.multiset && c->storage_type)
2043 0 : res = sql_trans_alter_storage(tr, c, NULL);
2044 : return res;
2045 : }
2046 :
/*
 * Return the delta of index i that transaction tr may write to.
 *
 * update_conflict == NULL selects the append path.  If the current delta
 * already carries tr->tid, it is returned as-is.  On the append path, a
 * delta that is committed (ts < TRANSACTION_ID_BASE) or a version of tr's
 * parent is also returned as-is.
 *
 * A delta carrying a transaction id (ts >= TRANSACTION_ID_BASE) that is not
 * a parent version aborts the call.  Otherwise a transaction-local copy of
 * the timestamp-visible delta is made and installed on i->data with a CAS.
 *
 * Returns NULL on failure.  *update_conflict (when non-NULL) is set to true
 * on the abort path and when the CAS loses a race.
 */
static sql_delta *
bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
{
	sql_delta *obat = ATOMIC_PTR_GET(&i->data);

	if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
		return obat;
	if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
		/* abort */
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
		return NULL;
	sql_delta* bat = ZNEW(sql_delta);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	/* tail type of the copy: TYPE_oid for oid_index() index types, TYPE_lng otherwise */
	if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
		destroy_delta(bat, false);
		return NULL;
	}
	bat->cs.ts = tr->tid;
	/* NOTE(review): unlike the column variant (bind_col_data), nr_updates is
	 * not copied from obat here, so it keeps whatever ZNEW initialised it to
	 * (presumably 0) -- confirm this is intended. */
	/* only one writer else abort */
	bat->next = obat;
	if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
		bat->next = NULL;
		destroy_delta(bat, false);
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	return bat;
}
2082 :
2083 : static int
2084 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2085 : {
2086 782 : int res = LOG_OK;
2087 782 : bool update_conflict = false;
2088 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2089 :
2090 782 : if (isbat) {
2091 782 : BAT *t = tids;
2092 782 : if (!BATcount(t))
2093 : return LOG_OK;
2094 : }
2095 :
2096 279 : if (i == NULL)
2097 : return LOG_ERR;
2098 :
2099 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2100 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2101 :
2102 279 : assert(delta && delta->cs.ts == tr->tid);
2103 279 : if (odelta != delta)
2104 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2105 :
2106 279 : odelta = delta;
2107 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2108 279 : assert(delta == odelta);
2109 : return res;
2110 : }
2111 :
/*
 * Write the contents of BAT i into the main BAT of delta *batp.
 *
 * Without offsets: i is appended (BATappend) when offset equals the current
 * end of the target (hseqbase + count).  Otherwise it is written
 * position-wise starting at offset (BATupdatepos).
 *
 * With offsets (same count as i, asserted): i is appended when offsets is
 * dense and starts at the current end.  Otherwise it is written to the
 * listed positions (BATupdate).
 *
 * Mask / candidate inputs are materialized with BATunmask first.  For
 * ST_DICT and ST_FOR storage, the values are converted by dict_append_bat /
 * for_append_bat, and *batp is re-read afterwards.  All writes happen while
 * the column lock for id is held.
 *
 * Returns LOG_OK, or LOG_ERR on any failure.
 */
static int
delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
{
	BAT *b, *oi = i;
	int err = 0;
	sql_delta *bat = *batp;

	assert(!offsets || BATcount(offsets) == BATcount(i));
	if (!BATcount(i))
		return LOG_OK;
	/* oi becomes a materialized copy that this function owns (destroyed below) */
	if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
		return LOG_ERR;

	lock_column(tr->store, id);
	if (bat->cs.st == ST_DICT) {
		BAT *ni = dict_append_bat(tr, batp, oi);
		bat = *batp;
		if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
			bat_destroy(oi);
		oi = ni;
		if (!oi) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
		bat = *batp;
		if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
			bat_destroy(oi);
		oi = ni;
		if (!oi) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		unlock_column(tr->store, id);
		if (oi != i)
			bat_destroy(oi);
		return LOG_ERR;
	}
	if (!offsets && offset == b->hseqbase+BATcount(b)) {
		/* plain append at the current end */
		if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
			err = 1;
	} else if (!offsets) {
		/* write starting at offset (presumably overwriting/extending from there) */
		if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
			err = 1;
	} else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
		/* explicit positions that happen to be exactly the next rows */
		if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
			err = 1;
	} else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
		err = 1;
	}
	bat_destroy(b);
	unlock_column(tr->store, id);

	if (oi != i)
		bat_destroy(oi);
	return (err)?LOG_ERR:LOG_OK;
}
2175 :
2176 : // Look at the offsets and find where the replacements end and the appends begin.
2177 : static BUN
2178 0 : start_of_appends(BAT *offsets, BUN bcnt)
2179 : {
2180 0 : BUN ocnt = BATcount(offsets);
2181 0 : if (ocnt == 0)
2182 : return 0;
2183 :
2184 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2185 0 : if (highest < bcnt)
2186 : // all are replacements
2187 : return ocnt;
2188 :
2189 : // reason backward to find the first append.
2190 : // Suppose offsets has 15 entries, bcnt == 100
2191 : // and the highest offset in offsets is 109.
2192 0 : BUN new_bcnt = highest + 1; // 110
2193 0 : BUN nappends = new_bcnt - bcnt; // 10
2194 0 : BUN nreplacements = ocnt - nappends; // 5
2195 :
2196 : // The first append should be to position bcnt
2197 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2198 :
2199 : return nreplacements;
2200 : }
2201 :
2202 :
/*
 * Write cnt values from the array i into the main BAT of delta *batp.
 *
 * With offsets: the leading entries that address existing rows (see
 * start_of_appends) are replaced with BUNreplacemulti.  The remaining values
 * are appended at the current end; offset must be oid_nil on entry
 * (asserted).
 *
 * Otherwise: rows from offset onwards that already exist are overwritten
 * (BUNreplacemultiincr).  A gap between the BAT end and offset is padded by
 * BUNappendmulti with a NULL value array (presumably nil values).  The
 * remaining values are then appended.
 *
 * For ST_DICT / ST_FOR storage the values are first converted
 * (dict_append_val / for_append_val; tt is the incoming value type).  Either
 * call may return a new array, which is GDKfree'd before returning.  Runs
 * under the column lock for id.
 *
 * Returns LOG_OK or LOG_ERR.
 */
static int
delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
{
	void *oi = i;
	BAT *b;
	lock_column(tr->store, id);
	sql_delta *bat = *batp;

	if (bat->cs.st == ST_DICT) {
		/* possibly a new array is returned */
		i = dict_append_val(tr, batp, i, cnt);
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	if (bat->cs.st == ST_FOR) {
		/* possibly a new array is returned */
		i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
		bat = *batp;
		if (!i) {
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}

	b = temp_descriptor(bat->cs.bid);
	if (b == NULL) {
		if (i != oi)
			GDKfree(i);
		unlock_column(tr->store, id);
		return LOG_ERR;
	}
	BUN bcnt = BATcount(b);

	if (offsets) {
		// The first few might be replacements while later items might be appends.
		// Handle the replacements here while leaving the appends to the code below.
		BUN nreplacements = start_of_appends(offsets, bcnt);

		oid *start = Tloc(offsets, 0);
		if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}

		// Replacements have been handled. The rest are appends.
		// NOTE(review): i is not advanced past the nreplacements values just
		// consumed, so the append below starts again at i[0]. This looks wrong
		// whenever both replacements and appends occur -- confirm.
		assert(offset == oid_nil);
		offset = bcnt;
		cnt -= nreplacements;
	}

	if (bcnt > offset){
		/* overwrite the part of [offset, offset+cnt) that already exists */
		size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
		if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
		/* NOTE(review): as above, i is not advanced by ccnt values before the
		 * remaining cnt values are appended -- confirm this mixed case cannot occur. */
		cnt -= ccnt;
		offset += ccnt;
	}
	if (cnt) {
		if (BATcount(b) < offset) { /* add space */
			BUN d = offset - BATcount(b);
			if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
				bat_destroy(b);
				if (i != oi)
					GDKfree(i);
				unlock_column(tr->store, id);
				return LOG_ERR;
			}
		}
		if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
			bat_destroy(b);
			if (i != oi)
				GDKfree(i);
			unlock_column(tr->store, id);
			return LOG_ERR;
		}
	}
	bat_destroy(b);
	if (i != oi)
		GDKfree(i);
	unlock_column(tr->store, id);
	return LOG_OK;
}
2296 :
2297 : static int
2298 25987 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2299 : {
2300 25987 : if (!(bat->segs = new_segments(tr, 0)))
2301 : return LOG_ERR;
2302 25982 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2303 : }
2304 :
2305 : static int
2306 17703734 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
2307 : {
2308 17703734 : int ok = LOG_OK;
2309 :
2310 17703734 : if ((*delta)->cs.merged)
2311 38039 : (*delta)->cs.merged = false; /* TODO needs to move */
2312 17703734 : if (isbat) {
2313 149790 : BAT *bat = incoming_data;
2314 :
2315 149790 : if (BATcount(bat))
2316 149895 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
2317 : } else {
2318 17553944 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2319 : }
2320 17727183 : return ok;
2321 : }
2322 :
2323 : static int
2324 17710715 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2325 : {
2326 17710715 : int res = LOG_OK;
2327 17710715 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2328 :
2329 17710715 : if (isbat) {
2330 149875 : BAT *t = data;
2331 149875 : if (!BATcount(t))
2332 : return LOG_OK;
2333 : }
2334 :
2335 17709936 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2336 : return LOG_ERR;
2337 :
2338 17706221 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2339 :
2340 17706221 : odelta = delta;
2341 17706221 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
2342 : return res;
2343 17724805 : if (odelta != delta) {
2344 0 : delta->next = odelta;
2345 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2346 0 : delta->next = NULL;
2347 0 : destroy_delta(delta, false);
2348 0 : return LOG_CONFLICT;
2349 : }
2350 : }
2351 17724805 : if (delta->cs.st == ST_DEFAULT && !c->type.multiset && c->storage_type)
2352 1 : res = sql_trans_alter_storage(tr, c, NULL);
2353 : return res;
2354 : }
2355 :
2356 : static int
2357 2189 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2358 : {
2359 2189 : int res = LOG_OK;
2360 2189 : sql_delta *delta;
2361 :
2362 2189 : if (isbat) {
2363 1015 : BAT *t = data;
2364 1015 : if (!BATcount(t))
2365 : return LOG_OK;
2366 : }
2367 :
2368 2177 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2369 : return LOG_ERR;
2370 :
2371 2177 : assert(delta->cs.st == ST_DEFAULT);
2372 :
2373 2177 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
2374 2177 : return res;
2375 : }
2376 :
2377 : static int
2378 80034 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2379 : {
2380 80034 : int err = 0;
2381 :
2382 : /* TODO check for conflicting updates */
2383 80034 : (void)rid;
2384 80034 : (void)cnt;
2385 653688 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2386 573654 : sql_column *c = n->data;
2387 573654 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2388 :
2389 : /* check for active updates */
2390 573654 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2391 : return 1;
2392 : }
2393 : return 0;
2394 : }
2395 :
2396 : static int
2397 76126 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2398 : {
2399 76126 : lock_table(tr->store, t->base.id);
2400 :
2401 76126 : int in_transaction = segments_in_transaction(tr, t);
2402 :
2403 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2404 76126 : segment *seg = s->segs->h, *p = NULL;
2405 52573588 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2406 52573588 : if (seg->start <= rid && seg->end > rid) {
2407 76126 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2408 4 : unlock_table(tr->store, t->base.id);
2409 4 : return LOG_CONFLICT;
2410 : }
2411 76122 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2412 0 : unlock_table(tr->store, t->base.id);
2413 0 : return LOG_CONFLICT;
2414 : }
2415 76122 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2416 0 : unlock_table(tr->store, t->base.id);
2417 0 : return LOG_ERR;
2418 : }
2419 : break;
2420 : }
2421 : }
2422 76122 : unlock_table(tr->store, t->base.id);
2423 76122 : if (!in_transaction)
2424 13355 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2425 : return LOG_OK;
2426 : }
2427 :
2428 : static int
2429 3910 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2430 : {
2431 3910 : segment *seg = *Seg, *p = NULL;
2432 13341 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2433 13339 : if (seg->start <= start && seg->end > start) {
2434 3960 : size_t lcnt = cnt;
2435 3960 : if (start+lcnt > seg->end)
2436 54 : lcnt = seg->end-start;
2437 3960 : if (SEG_IS_DELETED(seg, tr)) {
2438 47 : start += lcnt;
2439 47 : cnt -= lcnt;
2440 47 : continue;
2441 3913 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2442 1 : return LOG_CONFLICT;
2443 3912 : if (deletes_conflict_updates( tr, t, start, lcnt))
2444 : return LOG_CONFLICT;
2445 3912 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2446 3912 : if (!seg)
2447 : return LOG_ERR;
2448 3912 : start += lcnt;
2449 3912 : cnt -= lcnt;
2450 : }
2451 13291 : if (start+cnt <= seg->end)
2452 : break;
2453 : }
2454 : return LOG_OK;
2455 : }
2456 :
2457 : static int
2458 735 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2459 : {
2460 735 : segment *seg = s->segs->h;
2461 735 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2462 : }
2463 :
2464 : static int
2465 319 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2466 : {
2467 319 : int in_transaction = segments_in_transaction(tr, t);
2468 319 : BAT *oi = i; /* update ids */
2469 319 : int ok = LOG_OK;
2470 :
2471 319 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2472 : return LOG_ERR;
2473 319 : if (BATcount(i)) {
2474 568 : if (BATtdense(i)) {
2475 249 : size_t start = i->tseqbase;
2476 249 : size_t cnt = BATcount(i);
2477 :
2478 249 : lock_table(tr->store, t->base.id);
2479 249 : ok = delete_range(tr, t, s, start, cnt);
2480 249 : unlock_table(tr->store, t->base.id);
2481 70 : } else if (complex_cand(i)) {
2482 0 : struct canditer ci;
2483 0 : oid f = 0, l = 0, cur = 0;
2484 :
2485 0 : canditer_init(&ci, NULL, i);
2486 0 : cur = f = canditer_next(&ci);
2487 :
2488 0 : lock_table(tr->store, t->base.id);
2489 0 : if (!is_oid_nil(f)) {
2490 0 : segment *seg = s->segs->h;
2491 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2492 0 : if (cur+1 == l) {
2493 0 : cur++;
2494 0 : continue;
2495 : }
2496 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2497 0 : f = cur = l;
2498 : }
2499 0 : if (ok == LOG_OK)
2500 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2501 : }
2502 0 : unlock_table(tr->store, t->base.id);
2503 : } else {
2504 70 : if (!i->tsorted) {
2505 0 : assert(oi == i);
2506 0 : BAT *ni = NULL;
2507 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2508 0 : ok = LOG_ERR;
2509 0 : if (ni)
2510 0 : i = ni;
2511 : }
2512 70 : assert(i->tsorted);
2513 70 : BUN icnt = BATcount(i);
2514 70 : BATiter ii = bat_iterator(i);
2515 70 : oid *o = ii.base, n = o[0]+1;
2516 70 : size_t lcnt = 1;
2517 :
2518 70 : lock_table(tr->store, t->base.id);
2519 70 : segment *seg = s->segs->h;
2520 23233 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2521 23163 : if (o[i] == n) {
2522 22803 : lcnt++;
2523 22803 : n++;
2524 : } else {
2525 360 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2526 360 : lcnt = 0;
2527 : }
2528 23163 : if (!lcnt) {
2529 360 : n = o[i]+1;
2530 360 : lcnt = 1;
2531 : }
2532 : }
2533 70 : bat_iterator_end(&ii);
2534 70 : if (lcnt && ok == LOG_OK)
2535 70 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2536 70 : unlock_table(tr->store, t->base.id);
2537 : }
2538 : }
2539 319 : if (i != oi)
2540 25 : bat_destroy(i);
2541 : // assert
2542 319 : if (!in_transaction)
2543 271 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2544 : return ok;
2545 : }
2546 :
2547 : static void
2548 52888 : destroy_segments(segments *s)
2549 : {
2550 52888 : if (!s || sql_ref_dec(&s->r) > 0)
2551 0 : return;
2552 52888 : segment *seg = s->h;
2553 116970 : while(seg) {
2554 64082 : segment *n = ATOMIC_PTR_GET(&seg->next);
2555 64082 : _DELETE(seg);
2556 64082 : seg = n;
2557 : }
2558 52888 : _DELETE(s);
2559 : }
2560 :
2561 : static void
2562 54349 : destroy_storage(storage *bat)
2563 : {
2564 54349 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2565 : return;
2566 52741 : if (bat->next)
2567 5946 : destroy_storage(bat->next);
2568 52741 : destroy_segments(bat->segs);
2569 52741 : if (bat->cs.uibid)
2570 31243 : temp_destroy(bat->cs.uibid);
2571 52741 : if (bat->cs.uvbid)
2572 31243 : temp_destroy(bat->cs.uvbid);
2573 52741 : if (bat->cs.bid)
2574 52741 : temp_destroy(bat->cs.bid);
2575 52741 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2576 52741 : _DELETE(bat);
2577 : }
2578 :
2579 : static int
2580 175551 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2581 : {
2582 175551 : if (uncommitted) {
2583 456827 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2584 302179 : if (!VALID_4_READ(s->ts,tr))
2585 : return 1;
2586 : } else {
2587 302920 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2588 283420 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2589 : return 1;
2590 : }
2591 :
2592 : return 0;
2593 : }
2594 :
/* Forward declaration: clear_storage is static, so its definition lives elsewhere in this file. */
static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2596 :
/*
 * Return the deletion storage of table t for use by transaction tr.
 *
 * If the current storage carries a transaction id (ts >= TRANSACTION_ID_BASE)
 * that is neither tr's own nor a version of tr's parent, NULL is returned and
 * *clear (if given) is set to true.
 *
 * With clear == NULL, the current storage is returned as-is.
 *
 * With a clear flag, the caller wants to clear the table:
 *  - Fails (*clear = true) when segments_conflict() reports segments not
 *    readable by tr.
 *  - Otherwise creates a new storage for tr: a fresh segment list, column
 *    storage copied by dup_storage(), and cs.cleared = true.
 *  - Chains it in front of the timestamp-visible storage and publishes it on
 *    t->data with a CAS.  Losing that race also sets *clear = true.
 *
 * NULL without *clear being set means an allocation or copy failure.
 */
storage *
bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
{
	storage *obat;

	obat = ATOMIC_PTR_GET(&t->data);

	if (obat->cs.ts != tr->tid)
		if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
			if (obat->cs.ts >= TRANSACTION_ID_BASE) {
				/* abort */
				if (clear)
					*clear = true;
				return NULL;
			}

	if (!clear)
		return obat;

	/* remainder is only to handle clear */
	if (segments_conflict(tr, obat->segs, 1)) {
		*clear = true;
		return NULL;
	}
	if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
		return NULL;
	storage *bat = ZNEW(storage);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	if (dup_storage(tr, obat, bat) != LOG_OK) {
		destroy_storage(bat);
		return NULL;
	}
	bat->cs.cleared = true;
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
		/* detach obat first so destroy_storage() does not release it */
		bat->next = NULL;
		destroy_storage(bat);
		if (clear)
			*clear = true;
		return NULL;
	}
	return bat;
}
2644 :
2645 : static int
2646 76493 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2647 : {
2648 76493 : int ok = LOG_OK;
2649 76493 : BAT *b = ib;
2650 76493 : storage *bat;
2651 :
2652 76493 : if (isbat && !BATcount(b))
2653 : return ok;
2654 :
2655 76445 : if (t == NULL)
2656 : return LOG_ERR;
2657 :
2658 76445 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2659 : return LOG_ERR;
2660 :
2661 76445 : if (isbat)
2662 319 : ok = storage_delete_bat(tr, t, bat, ib);
2663 : else
2664 76126 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2665 : return ok;
2666 : }
2667 :
2668 : static size_t
2669 0 : dcount_col(sql_trans *tr, sql_column *c)
2670 : {
2671 0 : sql_delta *b;
2672 :
2673 0 : if (!isTable(c->t))
2674 : return 0;
2675 0 : b = col_timestamp_delta(tr, c);
2676 0 : if (!b)
2677 : return 1;
2678 :
2679 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2680 0 : if (!s || !s->segs->t)
2681 : return 1;
2682 0 : size_t cnt = s->segs->t->end;
2683 0 : if (cnt) {
2684 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2685 0 : size_t dcnt = 0;
2686 :
2687 0 : if (v)
2688 0 : dcnt = BATguess_uniques(v, NULL);
2689 0 : return dcnt;
2690 : }
2691 : return cnt;
2692 : }
2693 :
2694 : static BAT *
2695 4034514 : bind_no_view(BAT *b, bool quick)
2696 : {
2697 4034514 : if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
2698 4030371 : BAT *nb = BBP_desc(VIEWtparent(b));
2699 4030371 : bat_destroy(b);
2700 4030350 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2701 : return NULL;
2702 : }
2703 : return b;
2704 : }
2705 :
/*
 * Store externally supplied statistics for column c.
 *
 * unique_est (if given): written into tunique_est of the column's storage
 * BAT, for ST_DEFAULT storage only.
 *
 * min / max (if given): replace the cached c->min / c->max with private
 * copies of ATOMlen() bytes.  The type used is the column's local type, or
 * TYPE_int for multiset columns.
 *
 * Everything runs under the column lock.
 *
 * Returns 1 when at least one of min/max was stored, 0 otherwise (setting
 * only unique_est returns 0).
 */
static int
set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
{
	int ok = 0;
	assert(tr->active);
	if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
		return 0;
	lock_column(tr->store, c->base.id);
	if (unique_est) {
		sql_delta *d;
		if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
			BAT *b;
			if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
				MT_lock_set(&b->theaplock);
				b->tunique_est = *unique_est;
				MT_lock_unset(&b->theaplock);
				bat_destroy(b);
			}
		}
	}
	int type = c->type.multiset?TYPE_int:c->type.type->localtype;
	if (min) {
		_DELETE(c->min);
		size_t minlen = ATOMlen(type, min);
		if ((c->min = GDKmalloc(minlen)) != NULL) {
			memcpy(c->min, min, minlen);
			ok = 1;
		}
	}
	if (max) {
		_DELETE(c->max);
		size_t maxlen = ATOMlen(type, max);
		if ((c->max = GDKmalloc(maxlen)) != NULL) {
			memcpy(c->max, max, maxlen);
			ok = 1;
		}
	}
	unlock_column(tr->store, c->base.id);
	return ok;
}
2746 :
/*
 * Make sure the cached c->min / c->max values of column c are filled in.
 *
 * Returns 1 immediately when both are already cached.  ST_FOR storage
 * returns 0.
 *
 * Otherwise the cached values are dropped and recomputed under the column
 * lock, from minpos / maxpos of the column's storage BAT (bound with RD_EXT
 * for ST_DICT, RDONLY otherwise).
 *
 * Returns 1 when both values were copied.  Returns 0 when a position is
 * unknown, the BAT cannot be bound or an allocation fails; in that case both
 * cached values end up NULL.
 */
static int
min_max_col(sql_trans *tr, sql_column *c)
{
	int ok = 0;
	BAT *b = NULL;
	sql_delta *d = NULL;

	assert(tr->active);
	if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
		return 0;
	if (c->min && c->max)
		return 1;
	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR)
			return 0;
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		lock_column(tr->store, c->base.id);
		/* re-check under the lock: presumably another thread may have filled them meanwhile */
		if (c->min && c->max) {
			unlock_column(tr->store, c->base.id);
			return 1;
		}
		_DELETE(c->min);
		_DELETE(c->max);
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false))) {
				unlock_column(tr->store, c->base.id);
				return 0;
			}
			BATiter bi = bat_iterator(b);
			if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
				const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
				size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);

				/* all-or-nothing: never leave only one of min/max cached */
				if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
					_DELETE(c->min);
					_DELETE(c->max);
				} else {
					memcpy(c->min, nmin, minlen);
					memcpy(c->max, nmax, maxlen);
					ok = 1;
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
		}
		unlock_column(tr->store, c->base.id);
	}
	return ok;
}
2796 :
2797 : static size_t
2798 17 : count_segs(segment *s)
2799 : {
2800 17 : size_t nr = 0;
2801 :
2802 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2803 55 : nr++;
2804 17 : return nr;
2805 : }
2806 :
/*
 * Row-count queries on the deletion storage of table t, as seen by tr.
 * access selects the result:
 *  - 2:          d->cs.ucnt
 *  - 1:          count_inserts() over the segment list
 *  - CNT_ACTIVE: segs_end() minus the rows deleted in [0, segs_end()),
 *                with the deletes counted under the table lock
 *  - CNT_SEGS:   number of segments
 *  - otherwise:  segs_end() when CNT_RDONLY is non-zero, else
 *                count_deletes() (see the NOTE below)
 * Returns 0 for non-tables and when tab_timestamp_storage() returns NULL.
 */
static size_t
count_del(sql_trans *tr, sql_table *t, int access)
{
	storage *d;

	if (!isTable(t))
		return 0;
	d = tab_timestamp_storage(tr, t);
	if (!d)
		return 0;
	if (access == 2)
		return d->cs.ucnt;
	if (access == 1)
		return count_inserts(d->segs->h, tr);
	if (access == CNT_ACTIVE) {
		size_t cnt = segs_end(d->segs, tr, t);
		lock_table(tr->store, t->base.id);
		cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
		unlock_table(tr->store, t->base.id);
		return cnt;
	}
	if (access == CNT_SEGS) /* special case for counting the number of segments */
		return count_segs(d->segs->h);
	/* NOTE(review): this tests the constant CNT_RDONLY, not access; it was
	 * presumably meant as `access == CNT_RDONLY`.  If CNT_RDONLY is non-zero,
	 * every remaining access value gets segs_end() and the count_deletes()
	 * fallback below is unreachable.  Confirm against the callers before
	 * changing it. */
	if (CNT_RDONLY)
		return segs_end(d->segs, tr, t);
	return count_deletes(d->segs->h, tr);
}
2834 :
2835 : static int
2836 18772 : sorted_col(sql_trans *tr, sql_column *col)
2837 : {
2838 18772 : int sorted = 0;
2839 :
2840 18772 : assert(tr->active);
2841 18772 : if (!isTable(col->t) || !col->t->s)
2842 : return 0;
2843 :
2844 18772 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2845 18754 : BAT *b = bind_col(tr, col, QUICK);
2846 :
2847 18754 : if (b)
2848 18754 : sorted = b->tsorted || b->trevsorted;
2849 : }
2850 : return sorted;
2851 : }
2852 :
2853 : static int
2854 6815 : unique_col(sql_trans *tr, sql_column *col)
2855 : {
2856 6815 : int distinct = 0;
2857 :
2858 6815 : assert(tr->active);
2859 6815 : if (!isTable(col->t) || !col->t->s)
2860 : return 0;
2861 :
2862 6815 : if (col && ATOMIC_PTR_GET(&col->data)) {
2863 6815 : BAT *b = bind_col(tr, col, QUICK);
2864 :
2865 6815 : if (b)
2866 6815 : distinct = b->tkey;
2867 : }
2868 : return distinct;
2869 : }
2870 :
2871 : static int
2872 1872 : double_elim_col(sql_trans *tr, sql_column *col)
2873 : {
2874 1872 : int de = 0;
2875 1872 : sql_delta *d;
2876 :
2877 1872 : assert(tr->active);
2878 1872 : if (!isTable(col->t) || !col->t->s)
2879 : return 0;
2880 :
2881 1872 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2882 6 : if (d->cs.st == ST_DICT) {
2883 6 : BAT *b = bind_col(tr, col, QUICK);
2884 6 : if (b && b->ttype == TYPE_bte)
2885 : de = 1;
2886 0 : else if (b && b->ttype == TYPE_sht)
2887 1872 : de = 2;
2888 : }
2889 1866 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2890 1866 : BAT *b = bind_col(tr, col, QUICK);
2891 :
2892 1866 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2893 1866 : de = GDK_ELIMDOUBLES(b->tvheap);
2894 1866 : if (de)
2895 1754 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2896 : }
2897 1754 : assert(de >= 0 && de <= 16);
2898 : }
2899 : return de;
2900 : }
2901 :
/*
 * Collect optimizer statistics for column c.
 *
 * *nonil, *unique and *unique_est are always reset first.  Composite types,
 * NULL / non-table columns and tables without t->s report nothing.  ST_FOR
 * storage only reports *nonil = true.
 *
 * Otherwise:
 *  - *nonil comes from the storage BAT; with pending updates (cs.ucnt > 0)
 *    it additionally requires the RD_UPD_VAL BAT to be nil-free.
 *  - min / max (EC_NUMBER, EC_VARCHAR, EC_TEMP_NOFRAC and EC_DATE classes,
 *    only without pending updates) prefer the cached c->min / c->max.  They
 *    set bit 1 (min) and/or bit 2 (max) of the return value.
 *  - uniqueness (only without pending updates) comes from the storage BAT
 *    for ST_DEFAULT, and from the QUICK-bound offsets BAT for ST_DICT.
 */
static int
col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
{
	int ok = 0;
	BAT *b = NULL, *off = NULL, *upv = NULL;
	sql_delta *d = NULL;

	/* NOTE(review): redundant, tr is used below */
	(void) tr;
	assert(tr->active);
	*nonil = false;
	*unique = false;
	*unique_est = 0.0;
	if (!c || !isTable(c->t) || !c->t->s || c->type.type->composite)
		return ok;

	if ((d = ATOMIC_PTR_GET(&c->data))) {
		if (d->cs.st == ST_FOR) {
			*nonil = true; /* TODO for min/max. I will do it later */
			return ok;
		}
		int eclass = c->type.type->eclass;
		int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
		if ((b = bind_col(tr, c, access))) {
			if (!(b = bind_no_view(b, false)))
				return ok;
			BATiter bi = bat_iterator(b);
			*nonil = bi.nonil && !bi.nil;

			if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
				d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
				/* prefer cached values, fall back to the BAT's min/max positions */
				if (c->min && VALinit(min, bi.type, c->min))
					ok |= 1;
				else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
					ok |= 1;
				if (c->max && VALinit(max, bi.type, c->max))
					ok |= 2;
				else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
					ok |= 2;
			}
			if (d->cs.ucnt == 0) {
				if (d->cs.st == ST_DEFAULT) {
					*unique = bi.key;
					*unique_est = bi.unique_est;
					if (*unique_est == 0)
						*unique_est = (double)BATguess_uniques(b,NULL);
				} else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
					/* for dict, check the offsets bat for uniqueness */
					MT_lock_set(&off->theaplock);
					*unique = off->tkey;
					*unique_est = off->tunique_est;
					MT_lock_unset(&off->theaplock);
				}
			}
			bat_iterator_end(&bi);
			bat_destroy(b);
			if (*nonil && d->cs.ucnt > 0) {
				/* This could use a quick descriptor */
				if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
					*nonil = false;
				} else {
					MT_lock_set(&upv->theaplock);
					*nonil &= upv->tnonil && !upv->tnil;
					MT_lock_unset(&upv->theaplock);
					bat_destroy(upv);
				}
			}
		}
	}
	return ok;
}
2972 :
2973 : static int
2974 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2975 : {
2976 257 : assert(tr->active);
2977 257 : if (!isTable(col->t) || !col->t->s)
2978 : return LOG_OK;
2979 :
2980 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2981 252 : BAT *b = bind_col(tr, col, QUICK);
2982 :
2983 252 : if (b) { /* add props for ranges [min, max> */
2984 252 : MT_lock_set(&b->theaplock);
2985 252 : if (add_range) {
2986 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
2987 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
2988 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
2989 : else
2990 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2991 179 : if (!pt->with_nills || !col->null)
2992 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
2993 : } else {
2994 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
2995 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
2996 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
2997 : }
2998 252 : MT_lock_unset(&b->theaplock);
2999 : }
3000 : }
3001 : return LOG_OK;
3002 : }
3003 :
3004 : static int
3005 4811 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
3006 : {
3007 4811 : assert(tr->active);
3008 4811 : if (!isTable(col->t) || !col->t->s)
3009 : return LOG_OK;
3010 :
3011 4781 : if (col && ATOMIC_PTR_GET(&col->data)) {
3012 4781 : BAT *b = bind_col(tr, col, QUICK);
3013 :
3014 4781 : if (b) { /* add props for ranges [min, max> */
3015 4781 : if (not_null) {
3016 4779 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3017 : } else {
3018 2 : BATrmprop(b, GDK_NOT_NULL);
3019 : }
3020 : }
3021 : }
3022 : return LOG_OK;
3023 : }
3024 :
3025 : static int
3026 33411 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3027 : {
3028 33411 : sqlstore *store = tr->store;
3029 33411 : int bid = log_find_bat(store->logger, id);
3030 33411 : if (bid <= 0)
3031 : return LOG_ERR;
3032 33411 : cs->bid = temp_dup(bid);
3033 33411 : cs->ucnt = 0;
3034 33411 : cs->uibid = e_bat(TYPE_oid);
3035 33411 : cs->uvbid = e_bat(type);
3036 33411 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3037 : return LOG_ERR;
3038 : return LOG_OK;
3039 : }
3040 :
3041 : static int
3042 70514 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3043 : {
3044 70514 : int res = LOG_OK;
3045 70514 : gdk_return ok;
3046 70514 : BAT *b = temp_descriptor(bat->cs.bid);
3047 :
3048 70514 : if (b == NULL)
3049 : return LOG_ERR;
3050 :
3051 70514 : if (!bat->cs.uibid)
3052 70506 : bat->cs.uibid = e_bat(TYPE_oid);
3053 70514 : if (!bat->cs.uvbid)
3054 70506 : bat->cs.uvbid = e_bat(b->ttype);
3055 70514 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3056 0 : res = LOG_ERR;
3057 70514 : if (GDKinmemory(0)) {
3058 181 : bat_destroy(b);
3059 181 : return res;
3060 : }
3061 :
3062 70333 : bat_set_access(b, BAT_READ);
3063 70333 : sqlstore *store = tr->store;
3064 70333 : ok = log_bat_persists(store->logger, b, id);
3065 70333 : bat_destroy(b);
3066 70333 : if(res != LOG_OK)
3067 : return res;
3068 70333 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3069 : }
3070 :
3071 : static int
3072 0 : new_persistent_delta( sql_delta *bat)
3073 : {
3074 0 : bat->cs.ucnt = 0;
3075 0 : return LOG_OK;
3076 : }
3077 :
3078 : static void
3079 136653 : create_delta( sql_delta *d, BAT *b)
3080 : {
3081 136653 : bat_set_access(b, BAT_READ);
3082 136653 : d->cs.bid = temp_create(b);
3083 136653 : d->cs.uibid = d->cs.uvbid = 0;
3084 136653 : d->cs.ucnt = 0;
3085 136653 : }
3086 :
3087 : static bat
3088 7399 : copyBat (bat i, int type, oid seq)
3089 : {
3090 7399 : BAT *b, *tb;
3091 7399 : bat res;
3092 :
3093 7399 : if (!i)
3094 : return i;
3095 7399 : tb = quick_descriptor(i);
3096 7399 : if (tb == NULL)
3097 : return 0;
3098 7399 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3099 7399 : if (b == NULL)
3100 : return 0;
3101 :
3102 7399 : bat_set_access(b, BAT_READ);
3103 :
3104 7399 : res = temp_create(b);
3105 7399 : bat_destroy(b);
3106 7399 : return res;
3107 : }
3108 :
/*
 * Create (or load) the storage (sql_delta) of column 'c'.
 *
 * - If c->data is not set yet, a new sql_delta is allocated, attached to c
 *   and stamped with the creating transaction id.
 * - For a column that is not new, of a non-temporary table, the bats are
 *   loaded from the logger (load_cs). With storage_type "DICT" the extra
 *   bat registered under -c->base.id is loaded too; with "FOR..." only
 *   the storage type is set.
 * - Otherwise new storage is created. If the table already has rows (see
 *   the "alter ?" test), the bats are created aligned with the first
 *   column (nil-filled, see copyBat). Otherwise an empty bat is created.
 *
 * Returns LOG_OK or LOG_ERR.
 */
static int
create_col(sql_trans *tr, sql_column *c)
{
	int ok = LOG_OK, new = 0;
	int type = c->type.multiset?TYPE_int:c->type.type->localtype;
	sql_delta *bat = ATOMIC_PTR_GET(&c->data);

	if (!bat) {
		new = 1;
		bat = ZNEW(sql_delta);
		if (!bat)
			return LOG_ERR;
		ATOMIC_PTR_SET(&c->data, bat);
		ATOMIC_INIT(&bat->cs.refcnt, 1);
	}

	if (new)
		bat->cs.ts = tr->tid;

	if (!isNew(c)&& !isTempTable(c->t)){
		/* existing column of a non-temporary table: load from the logger */
		bat->cs.ts = tr->ts;
		ok = load_cs(tr, &bat->cs, type, c->base.id);
		if (ok == LOG_OK && c->storage_type) {
			if (strcmp(c->storage_type, "DICT") == 0) {
				sqlstore *store = tr->store;
				/* the extra DICT bat is registered under the negated column id */
				int bid = log_find_bat(store->logger, -c->base.id);
				if (bid <= 0)
					return LOG_ERR;
				bat->cs.ebid = temp_dup(bid);
				bat->cs.st = ST_DICT;
			} else if (strncmp(c->storage_type, "FOR", 3) == 0) {
				bat->cs.st = ST_FOR;
			}
		}
		return ok;
	} else if (bat && bat->cs.bid) {
		/* storage already exists: only reset the update state */
		return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
	} else {
		sql_column *fc = NULL;
		size_t cnt = 0;

		/* alter ? */
		if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
			storage *s = tab_timestamp_storage(tr, fc->t);
			if (s == NULL)
				return LOG_ERR;
			cnt = segs_end(s->segs, tr, c->t);
		}
		if (cnt && fc != c) {
			/* rows exist: create bats aligned with those of the first column */
			sql_delta *d = ATOMIC_PTR_GET(&fc->data);

			if (d->cs.bid) {
				bat->cs.bid = copyBat(d->cs.bid, type, 0);
				if(bat->cs.bid == BID_NIL)
					ok = LOG_ERR;
			}
			if (d->cs.uibid) {
				bat->cs.uibid = e_bat(TYPE_oid);
				if (bat->cs.uibid == BID_NIL)
					ok = LOG_ERR;
			}
			if (d->cs.uvbid) {
				bat->cs.uvbid = e_bat(type);
				if(bat->cs.uvbid == BID_NIL)
					ok = LOG_ERR;
			}
		} else {
			/* no rows yet: start from an empty bat */
			BAT *b = bat_new(type, c->t->sz, PERSISTENT);
			if (!b) {
				ok = LOG_ERR;
			} else {
				create_delta(ATOMIC_PTR_GET(&c->data), b);
				bat_destroy(b);
			}

			if (!new) {
				bat->cs.uibid = e_bat(TYPE_oid);
				if (bat->cs.uibid == BID_NIL)
					ok = LOG_ERR;
				bat->cs.uvbid = e_bat(type);
				if(bat->cs.uvbid == BID_NIL)
					ok = LOG_ERR;
			}
		}
		bat->cs.ucnt = 0;

		/* a new column of an existing, non-temporary table is registered as its own change */
		if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
			trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
	}
	return ok;
}
3200 :
3201 : static int
3202 64040 : log_create_col_(sql_trans *tr, sql_column *c)
3203 : {
3204 64040 : assert(!isTempTable(c->t));
3205 64040 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3206 : }
3207 :
3208 : static int
3209 90 : log_create_col(sql_trans *tr, sql_change *change)
3210 : {
3211 90 : return log_create_col_(tr, (sql_column*)change->obj);
3212 : }
3213 :
3214 : static int
3215 124016 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3216 : {
3217 124016 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3218 124016 : (void)oldest;
3219 124016 : assert(delta->cs.ts == tr->tid);
3220 124016 : delta->cs.ts = commit_ts;
3221 :
3222 124016 : assert(delta->next == NULL);
3223 124016 : if (!delta->cs.merged)
3224 124015 : delta->nr_updates += merge_delta(delta);
3225 124016 : if (!tr->parent)
3226 124012 : base->new = 0;
3227 124016 : return LOG_OK;
3228 : }
3229 :
3230 : static int
3231 94 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3232 : {
3233 94 : sql_column *c = (sql_column*)change->obj;
3234 94 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3235 94 : if (!tr->parent)
3236 93 : c->base.new = 0;
3237 94 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3238 : }
3239 :
3240 : /* will be called for new idx's and when new index columns are created */
3241 : static int
3242 9819 : create_idx(sql_trans *tr, sql_idx *ni)
3243 : {
3244 9819 : int ok = LOG_OK, new = 0;
3245 9819 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3246 9819 : int type = TYPE_lng;
3247 :
3248 9819 : if (oid_index(ni->type))
3249 954 : type = TYPE_oid;
3250 :
3251 9819 : if (!bat) {
3252 9819 : new = 1;
3253 9819 : bat = ZNEW(sql_delta);
3254 9819 : if (!bat)
3255 : return LOG_ERR;
3256 9819 : ATOMIC_PTR_INIT(&ni->data, bat);
3257 9819 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3258 : }
3259 :
3260 9819 : if (new)
3261 9819 : bat->cs.ts = tr->tid;
3262 :
3263 9819 : if (!isNew(ni) && !isTempTable(ni->t)){
3264 2442 : bat->cs.ts = 1;
3265 2442 : return load_cs(tr, &bat->cs, type, ni->base.id);
3266 7377 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3267 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3268 : } else {
3269 7377 : sql_column *c = ol_first_node(ni->t->columns)->data;
3270 7377 : sql_delta *d = col_timestamp_delta(tr, c);
3271 :
3272 7377 : if (d) {
3273 : /* Here we also handle indices created through alter stmts */
3274 : /* These need to be created aligned to the existing data */
3275 7377 : if (d->cs.bid) {
3276 7377 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3277 7377 : if(bat->cs.bid == BID_NIL)
3278 7377 : ok = LOG_ERR;
3279 : }
3280 : } else {
3281 : return LOG_ERR;
3282 : }
3283 :
3284 7377 : bat->cs.ucnt = 0;
3285 :
3286 7377 : if (!new) {
3287 0 : bat->cs.uibid = e_bat(TYPE_oid);
3288 0 : if (bat->cs.uibid == BID_NIL)
3289 0 : ok = LOG_ERR;
3290 0 : bat->cs.uvbid = e_bat(type);
3291 0 : if(bat->cs.uvbid == BID_NIL)
3292 0 : ok = LOG_ERR;
3293 : }
3294 7377 : bat->cs.ucnt = 0;
3295 7377 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3296 632 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3297 : }
3298 : return ok;
3299 : }
3300 :
3301 : static int
3302 6474 : log_create_idx_(sql_trans *tr, sql_idx *i)
3303 : {
3304 6474 : assert(!isTempTable(i->t));
3305 6474 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3306 : }
3307 :
3308 : static int
3309 618 : log_create_idx(sql_trans *tr, sql_change *change)
3310 : {
3311 618 : return log_create_idx_(tr, (sql_idx*)change->obj);
3312 : }
3313 :
3314 : static int
3315 632 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3316 : {
3317 632 : sql_idx *i = (sql_idx*)change->obj;
3318 632 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3319 632 : if (!tr->parent)
3320 632 : i->base.new = 0;
3321 632 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3322 : return LOG_OK;
3323 : }
3324 :
3325 : static int
3326 5289 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3327 : {
3328 5289 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3329 5289 : BAT *b = NULL, *ib = NULL;
3330 :
3331 5289 : if (ok != LOG_OK)
3332 : return ok;
3333 5289 : if (!(b = temp_descriptor(s->cs.bid)))
3334 : return LOG_ERR;
3335 5289 : ib = b;
3336 :
3337 5289 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3338 0 : bat_destroy(ib);
3339 0 : return LOG_ERR;
3340 : }
3341 :
3342 5289 : if (BATcount(b)) {
3343 435 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3344 0 : bat_destroy(ib);
3345 0 : return LOG_ERR;
3346 : }
3347 673 : if (BATtdense(b)) {
3348 238 : size_t start = b->tseqbase;
3349 238 : size_t cnt = BATcount(b);
3350 238 : ok = delete_range(tr, t, s, start, cnt);
3351 : } else {
3352 197 : assert(b->tsorted);
3353 197 : BUN icnt = BATcount(b);
3354 197 : BATiter bi = bat_iterator(b);
3355 197 : size_t lcnt = 1;
3356 197 : oid n;
3357 197 : segment *seg = s->segs->h;
3358 197 : if (complex_cand(b)) {
3359 0 : oid o = * (oid *) Tpos(&bi, 0);
3360 0 : n = o + 1;
3361 0 : for (BUN i = 1; i < icnt; i++) {
3362 0 : o = * (oid *) Tpos(&bi, i);
3363 0 : if (o == n) {
3364 0 : lcnt++;
3365 0 : n++;
3366 : } else {
3367 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3368 : break;
3369 : lcnt = 0;
3370 : }
3371 0 : if (!lcnt) {
3372 0 : n = o + 1;
3373 0 : lcnt = 1;
3374 : }
3375 : }
3376 : } else {
3377 197 : oid *o = bi.base;
3378 197 : n = o[0]+1;
3379 294714 : for (size_t i=1; i<icnt; i++) {
3380 294517 : if (o[i] == n) {
3381 291772 : lcnt++;
3382 291772 : n++;
3383 : } else {
3384 2745 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3385 : break;
3386 : lcnt = 0;
3387 : }
3388 294517 : if (!lcnt) {
3389 2745 : n = o[i]+1;
3390 2745 : lcnt = 1;
3391 : }
3392 : }
3393 : }
3394 197 : if (lcnt && ok == LOG_OK)
3395 197 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3396 197 : bat_iterator_end(&bi);
3397 : }
3398 435 : if (ok == LOG_OK)
3399 7042 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3400 6607 : if (seg->ts == tr->tid)
3401 3436 : seg->ts = 1;
3402 : } else {
3403 4854 : if (ok == LOG_OK) {
3404 4854 : BAT *bb = quick_descriptor(s->cs.bid);
3405 :
3406 4854 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3407 : ok = LOG_ERR;
3408 : } else {
3409 4854 : segment *seg = s->segs->h;
3410 4854 : if (seg->ts == tr->tid)
3411 4854 : seg->ts = 1;
3412 : }
3413 : }
3414 : }
3415 5289 : if (b != ib)
3416 5289 : bat_destroy(b);
3417 5289 : bat_destroy(ib);
3418 :
3419 5289 : return ok;
3420 : }
3421 :
3422 : static int
3423 26793 : create_del(sql_trans *tr, sql_table *t)
3424 : {
3425 26793 : int ok = LOG_OK, new = 0;
3426 26793 : BAT *b;
3427 26793 : storage *bat = ATOMIC_PTR_GET(&t->data);
3428 :
3429 26793 : if (!bat) {
3430 26793 : new = 1;
3431 26793 : bat = ZNEW(storage);
3432 26793 : if(!bat)
3433 : return LOG_ERR;
3434 26793 : ATOMIC_PTR_INIT(&t->data, bat);
3435 26793 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3436 26793 : bat->cs.ts = tr->tid;
3437 : }
3438 :
3439 26793 : if (!isNew(t) && !isTempTable(t)) {
3440 5289 : bat->cs.ts = tr->ts;
3441 5289 : return load_storage(tr, t, bat, t->base.id);
3442 21504 : } else if (bat->cs.bid) {
3443 : return ok;
3444 : } else {
3445 21504 : assert(!bat->segs);
3446 21504 : if (!(bat->segs = new_segments(tr, 0)))
3447 : return LOG_ERR;
3448 :
3449 21504 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3450 21504 : if(b != NULL) {
3451 21504 : bat_set_access(b, BAT_READ);
3452 21504 : bat->cs.bid = temp_create(b);
3453 21504 : bat_destroy(b);
3454 : } else {
3455 : return LOG_ERR;
3456 : }
3457 21504 : if (new)
3458 28395 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3459 : }
3460 : return ok;
3461 : }
3462 :
3463 : static int
3464 200993 : log_segment(sql_trans *tr, segment *s, sqlid id)
3465 : {
3466 200993 : sqlstore *store = tr->store;
3467 200993 : msk m = s->deleted;
3468 200993 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3469 : }
3470 :
/*
 * Log every non-empty segment of 'segs' that belongs to this transaction
 * (seg->ts == tr->tid) using log_segment().
 * The table lock is held only while stepping to the next segment; it is
 * released while a segment is examined and logged.
 * Returns LOG_OK, or LOG_ERR on the first failing segment (the lock is
 * already released at that point).
 */
static int
log_segments(sql_trans *tr, segments *segs, sqlid id)
{
	/* log segments */
	lock_table(tr->store, id);
	for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
		/* released while this segment is examined/logged */
		unlock_table(tr->store, id);
		if (seg->ts == tr->tid && seg->end-seg->start) {
			if (log_segment(tr, seg, id) != LOG_OK) {
				return LOG_ERR;
			}
		}
		/* re-acquired before the loop reads seg->next */
		lock_table(tr->store, id);
	}
	unlock_table(tr->store, id);
	return LOG_OK;
}
3488 :
3489 : static int
3490 12849 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3491 : {
3492 12849 : BAT *b;
3493 12849 : int ok = LOG_OK;
3494 :
3495 12849 : if (GDKinmemory(0))
3496 : return LOG_OK;
3497 :
3498 12816 : b = temp_descriptor(bat->cs.bid);
3499 12816 : if (b == NULL)
3500 : return LOG_ERR;
3501 :
3502 12816 : sqlstore *store = tr->store;
3503 12816 : bat_set_access(b, BAT_READ);
3504 12816 : if (ok == LOG_OK)
3505 12816 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3506 12816 : if (ok == LOG_OK)
3507 12816 : ok = log_segments(tr, bat->segs, t->base.id);
3508 12816 : bat_destroy(b);
3509 12816 : return ok;
3510 : }
3511 :
3512 : static int
3513 12863 : log_create_del(sql_trans *tr, sql_change *change)
3514 : {
3515 12863 : int ok = LOG_OK;
3516 12863 : sql_table *t = (sql_table*)change->obj;
3517 :
3518 12863 : if (t->base.deleted)
3519 : return ok;
3520 12849 : assert(!isTempTable(t));
3521 12849 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3522 12849 : if (ok == LOG_OK) {
3523 76827 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3524 63978 : sql_column *c = n->data;
3525 :
3526 63978 : if (c->data)
3527 63950 : ok = log_create_col_(tr, c);
3528 : }
3529 12849 : if (t->idxs) {
3530 18717 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3531 5868 : sql_idx *i = n->data;
3532 :
3533 5868 : if (ATOMIC_PTR_GET(&i->data))
3534 5856 : ok = log_create_idx_(tr, i);
3535 : }
3536 : }
3537 : }
3538 : return ok;
3539 : }
3540 :
3541 : static int
3542 21506 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3543 : {
3544 21506 : int ok = LOG_OK;
3545 21506 : sql_table *t = (sql_table*)change->obj;
3546 21506 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3547 :
3548 21506 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3549 125 : assert(isTempTable(t));
3550 125 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3551 125 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3552 125 : return ok;
3553 : }
3554 :
3555 21381 : if (!commit_ts) /* rollback handled by ? */
3556 : return ok;
3557 19526 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3558 19526 : assert(ok == LOG_OK);
3559 19526 : if (ok != LOG_OK)
3560 : return ok;
3561 19526 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3562 19526 : assert(dbat->cs.ts == tr->tid);
3563 19526 : dbat->cs.ts = commit_ts;
3564 19526 : if (ok == LOG_OK) {
3565 136975 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3566 117449 : sql_column *c = n->data;
3567 117449 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3568 :
3569 117449 : if (delta)
3570 117421 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3571 : }
3572 19526 : if (t->idxs) {
3573 25407 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3574 5881 : sql_idx *i = n->data;
3575 5881 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3576 :
3577 5881 : if (delta)
3578 5869 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3579 : }
3580 : }
3581 19526 : if (!tr->parent)
3582 19524 : t->base.new = 0;
3583 : }
3584 19526 : if (!tr->parent)
3585 19524 : t->base.new = 0;
3586 : return ok;
3587 : }
3588 :
3589 : static int
3590 21510 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3591 : {
3592 21510 : gdk_return ok = GDK_SUCCEED;
3593 :
3594 21510 : sqlstore *store = tr->store;
3595 21510 : if (!GDKinmemory(0) && b && b->cs.bid)
3596 19127 : ok = log_bat_transient(store->logger, id);
3597 21510 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3598 25 : ok = log_bat_transient(store->logger, -id);
3599 21510 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3600 : }
3601 :
3602 : static int
3603 181740 : destroy_col(sqlstore *store, sql_column *c)
3604 : {
3605 181740 : (void)store;
3606 181740 : if (ATOMIC_PTR_GET(&c->data))
3607 181740 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3608 181740 : ATOMIC_PTR_SET(&c->data, NULL);
3609 181740 : return LOG_OK;
3610 : }
3611 :
3612 : static int
3613 19611 : log_destroy_col_(sql_trans *tr, sql_column *c)
3614 : {
3615 19611 : int ok = LOG_OK;
3616 19611 : assert(!isTempTable(c->t));
3617 19611 : if (!tr->parent) /* don't write save point commits */
3618 19611 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3619 19611 : return ok;
3620 : }
3621 :
3622 : static int
3623 19611 : log_destroy_col(sql_trans *tr, sql_change *change)
3624 : {
3625 19611 : sql_column *c = (sql_column*)change->obj;
3626 19611 : int res = log_destroy_col_(tr, c);
3627 19611 : change->obj = NULL;
3628 19611 : column_destroy(tr->store, c);
3629 19611 : return res;
3630 : }
3631 :
3632 : static int
3633 11618 : destroy_idx(sqlstore *store, sql_idx *i)
3634 : {
3635 11618 : (void)store;
3636 11618 : if (ATOMIC_PTR_GET(&i->data))
3637 11618 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3638 11618 : ATOMIC_PTR_SET(&i->data, NULL);
3639 11618 : return LOG_OK;
3640 : }
3641 :
3642 : static int
3643 1975 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3644 : {
3645 1975 : int ok = LOG_OK;
3646 1975 : assert(!isTempTable(i->t));
3647 1975 : if (ATOMIC_PTR_GET(&i->data)) {
3648 1899 : if (!tr->parent) /* don't write save point commits */
3649 1899 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3650 : }
3651 1975 : return ok;
3652 : }
3653 :
3654 : static int
3655 1975 : log_destroy_idx(sql_trans *tr, sql_change *change)
3656 : {
3657 1975 : sql_idx *i = (sql_idx*)change->obj;
3658 1975 : int res = log_destroy_idx_(tr, i);
3659 1975 : change->obj = NULL;
3660 1975 : idx_destroy(tr->store, i);
3661 1975 : return res;
3662 : }
3663 :
3664 : static int
3665 28365 : destroy_del(sqlstore *store, sql_table *t)
3666 : {
3667 28365 : (void)store;
3668 28365 : if (ATOMIC_PTR_GET(&t->data))
3669 28361 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3670 28365 : ATOMIC_PTR_SET(&t->data, NULL);
3671 28365 : return LOG_OK;
3672 : }
3673 :
3674 : static int
3675 3358 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3676 : {
3677 3358 : gdk_return ok = GDK_SUCCEED;
3678 :
3679 3358 : sqlstore *store = tr->store;
3680 3358 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3681 3358 : bat && bat->cs.bid)
3682 3358 : ok = log_bat_transient(store->logger, id);
3683 3358 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3684 : }
3685 :
3686 : static int
3687 3358 : log_destroy_del(sql_trans *tr, sql_change *change)
3688 : {
3689 3358 : int ok = LOG_OK;
3690 3358 : sql_table *t = (sql_table*)change->obj;
3691 :
3692 3358 : assert(!isTempTable(t));
3693 3358 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3694 3358 : return ok;
3695 : }
3696 :
3697 : static int
3698 25037 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3699 : {
3700 25037 : (void)tr;
3701 25037 : (void)change;
3702 25037 : (void)commit_ts;
3703 25037 : (void)oldest;
3704 25037 : if (commit_ts)
3705 25014 : change->handled = true;
3706 25037 : return 0;
3707 : }
3708 :
3709 : static int
3710 3434 : drop_del(sql_trans *tr, sql_table *t)
3711 : {
3712 3434 : int ok = LOG_OK;
3713 :
3714 3434 : if (!isNew(t)) {
3715 3434 : storage *bat = ATOMIC_PTR_GET(&t->data);
3716 3504 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
3717 : }
3718 3434 : return ok;
3719 : }
3720 :
3721 : static int
3722 19626 : drop_col(sql_trans *tr, sql_column *c)
3723 : {
3724 19626 : assert(!isNew(c));
3725 19626 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3726 19626 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
3727 19626 : return LOG_OK;
3728 : }
3729 :
3730 : static int
3731 1977 : drop_idx(sql_trans *tr, sql_idx *i)
3732 : {
3733 1977 : assert(!isNew(i));
3734 1977 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3735 1977 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
3736 1977 : return LOG_OK;
3737 : }
3738 :
3739 :
/*
 * Clear column storage 'cs'.
 *
 * When 'renew' is set and cs has a main bat, that bat is replaced by an
 * empty copy (temp_copy(..., true, ...)). For ST_DICT storage the extra bat
 * (ebid) is replaced by an empty copy, and the main bat by a new empty
 * TYPE_bte bat (presumably the dictionary offsets - confirm).
 * In all cases the update bats (uibid/uvbid) are destroyed, cs is marked
 * cleared and ucnt reset to 0.
 *
 * 'temp' is forwarded to temp_copy of ebid, and suppresses BAT_READ on the
 * new DICT bat.
 * Returns the row count of the replaced main bat (0 when nothing was
 * renewed), or BUN_NONE on failure.
 */
static BUN
clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
{
	BAT *b;
	BUN sz = 0;

	(void)tr;
	assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
	if (cs->bid && renew) {
		b = quick_descriptor(cs->bid);
		if (b) {
			sz += BATcount(b);
			if (cs->st == ST_DICT) {
				bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
				BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);

				if (nebid == BID_NIL || !n) {
					temp_destroy(nebid);
					bat_destroy(n);
					return BUN_NONE;
				}
				temp_destroy(cs->ebid);
				cs->ebid = nebid;
				if (!temp)
					bat_set_access(n, BAT_READ);
				temp_destroy(cs->bid);
				cs->bid = temp_create(n); /* create empty copy */
				bat_destroy(n);
			} else {
				/* NOTE(review): unlike the DICT branch, 'temp' is not forwarded here (false is passed) - confirm intended */
				bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */

				if (nbid == BID_NIL)
					return BUN_NONE;
				temp_destroy(cs->bid);
				cs->bid = nbid;
			}
		} else {
			return BUN_NONE;
		}
	}
	/* drop all pending updates */
	if (cs->uibid) {
		temp_destroy(cs->uibid);
		cs->uibid = 0;
	}
	if (cs->uvbid) {
		temp_destroy(cs->uvbid);
		cs->uvbid = 0;
	}
	cs->cleared = true;
	cs->ucnt = 0;
	return sz;
}
3792 :
3793 : static BUN
3794 129408 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3795 : {
3796 129408 : bool update_conflict = false;
3797 129408 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3798 :
3799 129408 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3800 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3801 129450 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3802 129450 : if (odelta != delta)
3803 129455 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3804 129437 : if (delta)
3805 129437 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3806 : return 0;
3807 : }
3808 :
3809 : static BUN
3810 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3811 : {
3812 21 : bool update_conflict = false;
3813 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3814 :
3815 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3816 15 : return 0;
3817 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3818 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3819 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3820 6 : if (odelta != delta)
3821 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3822 6 : if (delta)
3823 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3824 : return 0;
3825 : }
3826 :
3827 : static int
3828 147 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3829 : {
3830 147 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3831 : return LOG_ERR;
3832 147 : if (s->segs)
3833 147 : destroy_segments(s->segs);
3834 147 : if (!(s->segs = new_segments(tr, 0)))
3835 : return LOG_ERR;
3836 : return LOG_OK;
3837 : }
3838 :
3839 :
3840 : /*
3841 : * Clear the table, in general this means replacing the storage,
3842 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3843 : * all segments as deleted.
3844 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3845 : */
3846 : static BUN
3847 41832 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3848 : {
3849 41832 : int clear = !in_transaction, ok = LOG_OK;
3850 41832 : bool conflict = false;
3851 41832 : storage *bat;
3852 :
3853 41883 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3854 15804 : return conflict?BUN_NONE-1:BUN_NONE;
3855 :
3856 26029 : if (!clear) {
3857 51 : lock_table(tr->store, t->base.id);
3858 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3859 51 : unlock_table(tr->store, t->base.id);
3860 : }
3861 26029 : assert(t->persistence != SQL_DECLARED_TABLE);
3862 26029 : if (!in_transaction)
3863 25981 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3864 26028 : if (ok == LOG_ERR)
3865 : return BUN_NONE;
3866 26028 : if (ok == LOG_CONFLICT)
3867 0 : return BUN_NONE - 1;
3868 : return LOG_OK;
3869 : }
3870 :
3871 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3872 : static BUN
3873 41807 : clear_table(sql_trans *tr, sql_table *t)
3874 : {
3875 41807 : node *n = ol_first_node(t->columns);
3876 41807 : storage *d = tab_timestamp_storage(tr, t);
3877 41827 : int in_transaction, clear;
3878 41827 : BUN sz, clear_ok;
3879 :
3880 41827 : sql_column *c = n->data;
3881 41827 : while (c && c->type.type->composite && !c->type.multiset) {
3882 0 : n = n->next;
3883 0 : c = n?n->data:NULL;
3884 : }
3885 41827 : if (!d || !c)
3886 : return BUN_NONE;
3887 41830 : lock_table(tr->store, t->base.id);
3888 41832 : in_transaction = segments_in_transaction(tr, t);
3889 41833 : unlock_table(tr->store, t->base.id);
3890 41834 : clear = !in_transaction;
3891 41834 : sz = count_col(tr, c, CNT_ACTIVE);
3892 41833 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3893 : return clear_ok;
3894 :
3895 26028 : if (in_transaction)
3896 : return sz;
3897 :
3898 155387 : for (; n; n = n->next) {
3899 129409 : c = n->data;
3900 :
3901 129409 : if (c->type.type->composite && !c->type.multiset)
3902 0 : continue;
3903 :
3904 129409 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3905 0 : return clear_ok;
3906 : }
3907 25978 : if (t->idxs) {
3908 25999 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3909 21 : sql_idx *ci = n->data;
3910 :
3911 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3912 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3913 0 : return clear_ok;
3914 : }
3915 : }
3916 25978 : if (clear)
3917 25978 : d->segs->nr_reused = 0;
3918 25978 : return sz;
3919 : }
3920 :
3921 : static int
3922 158910 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3923 : {
3924 158910 : sqlstore *store = tr->store;
3925 158910 : gdk_return ok = GDK_SUCCEED;
3926 :
3927 158910 : (void) t;
3928 158910 : (void) segs;
3929 158910 : if (GDKinmemory(0))
3930 : return LOG_OK;
3931 :
3932 158903 : if (cs->cleared) {
3933 155486 : assert(cs->ucnt == 0);
3934 155486 : BAT *ins = temp_descriptor(cs->bid);
3935 155486 : if (!ins)
3936 : return LOG_ERR;
3937 155486 : assert(!isEbat(ins));
3938 155486 : bat_set_access(ins, BAT_READ);
3939 155486 : ok = log_bat_persists(store->logger, ins, id);
3940 155486 : bat_destroy(ins);
3941 155486 : if (ok == GDK_SUCCEED && cs->ebid) {
3942 56 : BAT *ins = temp_descriptor(cs->ebid);
3943 56 : if (!ins)
3944 : return LOG_ERR;
3945 56 : assert(!isEbat(ins));
3946 56 : bat_set_access(ins, BAT_READ);
3947 56 : ok = log_bat_persists(store->logger, ins, -id);
3948 56 : bat_destroy(ins);
3949 : }
3950 155486 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3951 : }
3952 :
3953 3417 : assert(!isTempTable(t));
3954 :
3955 3417 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3956 2824 : BAT *ui = temp_descriptor(cs->uibid);
3957 2824 : BAT *uv = temp_descriptor(cs->uvbid);
3958 : /* any updates */
3959 2824 : if (ui == NULL || uv == NULL) {
3960 : ok = GDK_FAIL;
3961 2824 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3962 2824 : ok = log_delta(store->logger, ui, uv, id);
3963 2824 : bat_destroy(ui);
3964 2824 : bat_destroy(uv);
3965 : }
3966 2824 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3967 : }
3968 :
3969 : static inline int
3970 59898 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3971 59898 : sqlstore *store = tr->store;
3972 59898 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3973 : }
3974 :
3975 : static inline int
3976 59898 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3977 59898 : sqlstore *store = tr->store;
3978 59898 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3979 : }
3980 :
3981 : static int
3982 59898 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3983 : {
3984 59898 : sqlstore *store = tr->store;
3985 59898 : gdk_return ok = GDK_SUCCEED;
3986 :
3987 59898 : size_t end = segs_end(segs, tr, t);
3988 :
3989 59898 : if (tr_log_table_start(tr, t) != LOG_OK)
3990 : return LOG_ERR;
3991 :
3992 59898 : size_t nr_appends = 0;
3993 :
3994 59898 : lock_table(tr->store, t->base.id);
3995 387560 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3996 327662 : unlock_table(tr->store, t->base.id);
3997 :
3998 327662 : if (seg->ts == tr->tid && seg->end-seg->start) {
3999 121679 : if (!seg->deleted) {
4000 48554 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
4001 : return LOG_ERR;
4002 :
4003 48554 : nr_appends += (seg->end - seg->start);
4004 : }
4005 : }
4006 327662 : lock_table(tr->store, t->base.id);
4007 : }
4008 59898 : unlock_table(tr->store, t->base.id);
4009 :
4010 433454 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
4011 373556 : sql_column *c = n->data;
4012 373556 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
4013 :
4014 373556 : if (c->type.type->composite && !c->type.multiset)
4015 47 : continue;
4016 373509 : if (cs->cleared) {
4017 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4018 3 : continue;
4019 : }
4020 :
4021 373506 : lock_table(tr->store, t->base.id);
4022 373506 : if (!cs->cleared) {
4023 2437491 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4024 2063985 : unlock_table(tr->store, t->base.id);
4025 2063985 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4026 : /* append col*/
4027 284869 : BAT *ins = temp_descriptor(cs->bid);
4028 284869 : if (ins == NULL)
4029 : return LOG_ERR;
4030 284869 : assert(BATcount(ins) >= cur->end);
4031 284869 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4032 284869 : bat_destroy(ins);
4033 : }
4034 2063985 : lock_table(tr->store, t->base.id);
4035 : }
4036 : }
4037 373506 : unlock_table(tr->store, t->base.id);
4038 :
4039 373506 : if (ok == GDK_SUCCEED && cs->ebid) {
4040 19 : BAT *ins = temp_descriptor(cs->ebid);
4041 19 : if (ins == NULL)
4042 : return LOG_ERR;
4043 19 : if (BATcount(ins) > ins->batInserted)
4044 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4045 19 : BATcommit(ins, BATcount(ins));
4046 19 : bat_destroy(ins);
4047 : }
4048 : }
4049 :
4050 59898 : if (t->idxs) {
4051 65086 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4052 5188 : sql_idx *i = n->data;
4053 :
4054 5188 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4055 4410 : continue;
4056 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4057 :
4058 778 : if (cs) {
4059 778 : if (cs->cleared) {
4060 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4061 0 : continue;
4062 : }
4063 :
4064 778 : lock_table(tr->store, t->base.id);
4065 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4066 1609 : unlock_table(tr->store, t->base.id);
4067 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4068 : /* append idx */
4069 730 : BAT *ins = temp_descriptor(cs->bid);
4070 730 : if (ins == NULL)
4071 : return LOG_ERR;
4072 730 : assert(BATcount(ins) >= cur->end);
4073 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4074 730 : bat_destroy(ins);
4075 : }
4076 1609 : lock_table(tr->store, t->base.id);
4077 : }
4078 778 : unlock_table(tr->store, t->base.id);
4079 : }
4080 : }
4081 : }
4082 :
4083 59898 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4084 0 : return LOG_ERR;
4085 :
4086 : return LOG_OK;
4087 : }
4088 :
4089 : static int
4090 85870 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4091 : {
4092 85870 : int ok = LOG_OK;
4093 85870 : bool cleared = s->cs.cleared;
4094 85870 : if (ok == LOG_OK && cleared)
4095 25972 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4096 25972 : if (ok == LOG_OK)
4097 85870 : ok = log_segments(tr, s->segs, t->base.id);
4098 85870 : if (ok == LOG_OK && !cleared)
4099 59898 : ok = log_table_append(tr, t, s->segs);
4100 85870 : return ok;
4101 : }
4102 :
4103 : static void
4104 427173 : merge_cs( column_storage *cs, const char* caller)
4105 : {
4106 427173 : if (cs->bid && cs->ucnt) {
4107 2833 : BAT *cur = temp_descriptor(cs->bid);
4108 2833 : BAT *ui = temp_descriptor(cs->uibid);
4109 2833 : BAT *uv = temp_descriptor(cs->uvbid);
4110 :
4111 2833 : if (!cur || !ui || !uv) {
4112 0 : bat_destroy(ui);
4113 0 : bat_destroy(uv);
4114 0 : bat_destroy(cur);
4115 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4116 : return;
4117 : }
4118 2833 : assert(BATcount(ui) == BATcount(uv));
4119 :
4120 : /* any updates */
4121 2833 : assert(!isEbat(cur));
4122 2833 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4123 0 : bat_destroy(ui);
4124 0 : bat_destroy(uv);
4125 0 : bat_destroy(cur);
4126 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4127 : return;
4128 : }
4129 : /* cleanup the old deltas */
4130 2833 : temp_destroy(cs->uibid);
4131 2833 : temp_destroy(cs->uvbid);
4132 2833 : cs->uibid = e_bat(TYPE_oid);
4133 2833 : cs->uvbid = e_bat(cur->ttype);
4134 2833 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4135 2833 : cs->ucnt = 0;
4136 2833 : bat_destroy(ui);
4137 2833 : bat_destroy(uv);
4138 2833 : bat_destroy(cur);
4139 : }
4140 427173 : cs->cleared = false;
4141 427173 : cs->merged = true;
4142 427173 : return;
4143 : }
4144 :
4145 : static lng
4146 383959 : merge_delta( sql_delta *obat)
4147 : {
4148 383959 : lng res = 0;
4149 383959 : if (obat && obat->next && !obat->cs.merged)
4150 132945 : res += merge_delta(obat->next);
4151 383959 : res += obat->cs.ucnt;
4152 383959 : merge_cs(&obat->cs, __func__);
4153 383959 : return res;
4154 : }
4155 :
4156 : static void
4157 43214 : merge_storage(storage *tdb)
4158 : {
4159 43214 : merge_cs(&tdb->cs, __func__);
4160 :
4161 43214 : if (tdb->next) {
4162 476 : destroy_storage(tdb->next);
4163 476 : tdb->next = NULL;
4164 : }
4165 43214 : }
4166 :
4167 : static sql_delta *
4168 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4169 : {
4170 : /* commit ie copy back to the parent transaction */
4171 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4172 1 : sql_delta *od = delta->next;
4173 1 : if (od->cs.ts == commit_ts) {
4174 0 : sql_delta t = *od, *n = od->next;
4175 0 : *od = *delta;
4176 0 : od->next = n;
4177 0 : *delta = t;
4178 0 : delta->next = NULL;
4179 0 : destroy_delta(delta, true);
4180 0 : return od;
4181 : }
4182 : }
4183 : return delta;
4184 : }
4185 :
4186 : static int
4187 132909 : log_update_col( sql_trans *tr, sql_change *change)
4188 : {
4189 132909 : sql_column *c = (sql_column*)change->obj;
4190 132909 : assert(!isTempTable(c->t));
4191 :
4192 132909 : if (isDeleted(c->t)) {
4193 0 : change->handled = true;
4194 0 : return LOG_OK;
4195 : }
4196 :
4197 132909 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4198 132909 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4199 132909 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4200 132909 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4201 : }
4202 : return LOG_OK;
4203 : }
4204 :
4205 : static int
4206 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4207 : {
4208 217 : sqlstore *store = Store;
4209 :
4210 217 : sql_delta *d = (sql_delta*)change->data;
4211 217 : if (d->cs.ts < oldest) {
4212 82 : destroy_delta(d, false);
4213 82 : if (change->commit == &commit_update_idx)
4214 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4215 : else
4216 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4217 82 : return 1;
4218 : }
4219 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4220 82 : d->cs.ts = store_get_timestamp(store) + 1;
4221 : return 0;
4222 : }
4223 :
4224 : static int
4225 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4226 : {
4227 9 : sqlstore *store = Store;
4228 :
4229 9 : storage *d = (storage*)change->data;
4230 9 : if (d->cs.ts < oldest) {
4231 3 : destroy_storage(d);
4232 3 : table_destroy(store, (sql_table*)change->obj);
4233 3 : return 1;
4234 : }
4235 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4236 3 : d->cs.ts = store_get_timestamp(store) + 1;
4237 : return 0;
4238 : }
4239 :
4240 : static int
4241 133027 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4242 : {
4243 133027 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4244 :
4245 133027 : sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
4246 :
4247 133027 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4248 3 : int ok = LOG_OK;
4249 3 : assert(isTempTable(t));
4250 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4251 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4252 3 : if (!tr->parent)
4253 0 : t->base.new = base->new = 0;
4254 3 : change->handled = true;
4255 3 : return ok;
4256 : }
4257 :
4258 133024 : if (commit_ts)
4259 132942 : delta->cs.ts = commit_ts;
4260 133024 : if (!commit_ts) { /* rollback */
4261 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4262 :
4263 82 : if (change->ts && t->base.new) /* handled by create col */
4264 : return LOG_OK;
4265 82 : if (o != d) {
4266 0 : while(o && o->next != d)
4267 : o = o->next;
4268 : }
4269 82 : if (o == ATOMIC_PTR_GET(data))
4270 82 : ATOMIC_PTR_SET(data, d->next);
4271 : else
4272 0 : o->next = d->next;
4273 82 : d->next = NULL;
4274 82 : change->cleanup = &tc_gc_rollbacked;
4275 132942 : } else if (!tr->parent) {
4276 : /* merge deltas */
4277 275020 : while (delta && delta->cs.ts > oldest)
4278 142079 : delta = delta->next;
4279 132941 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4280 23514 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4281 23514 : idelta->nr_updates += merge_delta(delta);
4282 23514 : unlock_column(tr->store, base->id);
4283 : }
4284 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4285 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4286 : return LOG_OK;
4287 : }
4288 :
4289 : static int
4290 132999 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4291 : {
4292 :
4293 132999 : sql_column *c = (sql_column*)change->obj;
4294 132999 : sql_base* base = &c->base;
4295 132999 : sql_table* t = c->t;
4296 132999 : ATOMIC_PTR_TYPE* data = &c->data;
4297 132999 : int type = c->type.multiset?TYPE_int:c->type.type->localtype;
4298 :
4299 132999 : if (change->handled || isDeleted(c->t))
4300 : return LOG_OK;
4301 :
4302 132999 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4303 : }
4304 :
4305 : static int
4306 26 : log_update_idx( sql_trans *tr, sql_change *change)
4307 : {
4308 26 : sql_idx *i = (sql_idx*)change->obj;
4309 26 : assert(!isTempTable(i->t));
4310 :
4311 26 : if (isDeleted(i->t)) {
4312 0 : change->handled = true;
4313 0 : return LOG_OK;
4314 : }
4315 :
4316 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4317 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4318 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4319 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4320 : }
4321 : return LOG_OK;
4322 : }
4323 :
4324 : static int
4325 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4326 : {
4327 28 : sql_idx *i = (sql_idx*)change->obj;
4328 28 : sql_base* base = &i->base;
4329 28 : sql_table* t = i->t;
4330 28 : ATOMIC_PTR_TYPE* data = &i->data;
4331 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4332 :
4333 28 : if (change->handled || isDeleted(i->t))
4334 : return LOG_OK;
4335 :
4336 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4337 : }
4338 :
4339 : static storage *
4340 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4341 : {
4342 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4343 0 : storage *od = dbat->next;
4344 0 : if (od->cs.ts == commit_ts) {
4345 0 : storage t = *od, *n = od->next;
4346 0 : *od = *dbat;
4347 0 : od->next = n;
4348 0 : *dbat = t;
4349 0 : dbat->next = NULL;
4350 0 : destroy_storage(dbat);
4351 0 : return od;
4352 : }
4353 : }
4354 : return dbat;
4355 : }
4356 :
4357 : static int
4358 85870 : log_update_del( sql_trans *tr, sql_change *change)
4359 : {
4360 85870 : sql_table *t = (sql_table*)change->obj;
4361 85870 : assert(!isTempTable(t));
4362 :
4363 85870 : if (isDeleted(t)) {
4364 0 : change->handled = true;
4365 0 : return LOG_OK;
4366 : }
4367 :
4368 85870 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4369 85870 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4370 : return LOG_OK;
4371 : }
4372 :
4373 : static int
4374 90505 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4375 : {
4376 90505 : int ok = LOG_OK;
4377 90505 : sql_table *t = (sql_table*)change->obj;
4378 90505 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4379 :
4380 90505 : if (change->handled || isDeleted(t))
4381 : return ok;
4382 :
4383 90505 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4384 22 : assert(isTempTable(t));
4385 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4386 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4387 22 : change->handled = true;
4388 22 : return ok;
4389 : }
4390 :
4391 90483 : lock_table(tr->store, t->base.id);
4392 90483 : if (!commit_ts) { /* rollback */
4393 4308 : if (dbat->cs.ts == tr->tid) {
4394 6 : if (change->ts && t->base.new) { /* handled by the create table */
4395 3 : unlock_table(tr->store, t->base.id);
4396 3 : return ok;
4397 : }
4398 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4399 :
4400 3 : if (o != d) {
4401 0 : while(o && o->next != d)
4402 : o = o->next;
4403 : }
4404 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4405 3 : assert(d->next);
4406 3 : ATOMIC_PTR_SET(&t->data, d->next);
4407 : } else
4408 0 : o->next = d->next;
4409 3 : d->next = NULL;
4410 3 : change->cleanup = &tc_gc_rollbacked_storage;
4411 : } else
4412 4302 : rollback_segments(dbat->segs, tr, change, oldest);
4413 86175 : } else if (ok == LOG_OK && !tr->parent) {
4414 86149 : if (dbat->cs.ts == tr->tid) /* cleared table */
4415 25974 : dbat->cs.ts = commit_ts;
4416 :
4417 86149 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4418 86149 : if (ok == LOG_OK) {
4419 86149 : merge_segments(dbat, tr, change, commit_ts, oldest);
4420 86149 : if (oldest == commit_ts)
4421 43214 : merge_storage(dbat);
4422 : }
4423 86149 : if (dbat)
4424 86149 : dbat->cs.cleared = false;
4425 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4426 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4427 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4428 26 : storage *s = change->data;
4429 26 : if (s->cs.ts == tr->tid)
4430 0 : s->cs.ts = commit_ts;
4431 : }
4432 90480 : unlock_table(tr->store, t->base.id);
4433 90480 : return ok;
4434 : }
4435 :
4436 : /* only rollback (content version) case for now */
4437 : static int
4438 19786 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4439 : {
4440 19786 : sqlstore *store = Store;
4441 19786 : sql_column *c = (sql_column*)change->obj;
4442 :
4443 19786 : if (!c) /* cleaned earlier */
4444 : return 1;
4445 :
4446 175 : if (change->handled || isDeleted(c->t)) {
4447 0 : column_destroy(store, c);
4448 0 : return 1;
4449 : }
4450 :
4451 : /* savepoint commit (did it merge ?) */
4452 175 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4453 0 : column_destroy(store, c);
4454 0 : return 1;
4455 : }
4456 175 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4457 : return 0;
4458 174 : sql_delta *d = (sql_delta*)change->data, *id = d;
4459 174 : if (d && d->next) {
4460 :
4461 66 : if (d->cs.ts > oldest)
4462 : return LOG_OK; /* cannot cleanup yet */
4463 :
4464 : // d is oldest reachable delta
4465 63 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4466 62 : destroy_delta(d->next, true);
4467 62 : d->next = NULL;
4468 : }
4469 63 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4470 63 : id->nr_updates += merge_delta(d);
4471 63 : unlock_column(store, c->base.id);
4472 : }
4473 171 : column_destroy(store, c);
4474 171 : return 1;
4475 : }
4476 :
4477 : static int
4478 732828 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4479 : {
4480 732828 : sqlstore *store = Store;
4481 732828 : sql_column *c = (sql_column*)change->obj;
4482 :
4483 732828 : if (!c) /* cleaned earlier */
4484 : return 1;
4485 :
4486 732828 : if (change->handled || isDeleted(c->t)) {
4487 3 : table_destroy(store, c->t);
4488 3 : return 1;
4489 : }
4490 :
4491 : /* savepoint commit (did it merge ?) */
4492 732825 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4493 29456 : table_destroy(store, c->t);
4494 29456 : return 1;
4495 : }
4496 703369 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4497 : return 0;
4498 703368 : sql_delta *d = (sql_delta*)change->data, *id = d;
4499 703368 : if (d && d->next) {
4500 :
4501 703368 : if (d->cs.ts > oldest)
4502 : return LOG_OK; /* cannot cleanup yet */
4503 :
4504 : // d is oldest reachable delta
4505 103396 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4506 5355 : destroy_delta(d->next, true);
4507 5355 : d->next = NULL;
4508 : }
4509 103396 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4510 103396 : id->nr_updates += merge_delta(d);
4511 103396 : unlock_column(store, c->base.id);
4512 : }
4513 103396 : table_destroy(store, c->t);
4514 103396 : return 1;
4515 : }
4516 :
4517 : static int
4518 2609 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4519 : {
4520 2609 : sqlstore *store = Store;
4521 2609 : sql_idx *i = (sql_idx*)change->obj;
4522 :
4523 2609 : if (!i) /* cleaned earlier */
4524 : return 1;
4525 :
4526 634 : if (change->handled || isDeleted(i->t)) {
4527 0 : idx_destroy(store, i);
4528 0 : return 1;
4529 : }
4530 :
4531 : /* savepoint commit (did it merge ?) */
4532 634 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4533 0 : idx_destroy(store, i);
4534 0 : return 1;
4535 : }
4536 634 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4537 : return 0;
4538 634 : sql_delta *d = (sql_delta*)change->data, *id = d;
4539 634 : if (d && d->next) {
4540 0 : if (d->cs.ts > oldest)
4541 : return LOG_OK; /* cannot cleanup yet */
4542 :
4543 : // d is oldest reachable delta
4544 0 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4545 0 : destroy_delta(d->next, true);
4546 0 : d->next = NULL;
4547 : }
4548 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4549 0 : id->nr_updates += merge_delta(d);
4550 0 : unlock_column(store, i->base.id);
4551 : }
4552 634 : idx_destroy(store, i);
4553 634 : return 1;
4554 : }
4555 :
4556 : static int
4557 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4558 : {
4559 26 : sqlstore *store = Store;
4560 26 : sql_idx *i = (sql_idx*)change->obj;
4561 :
4562 26 : if (!i) /* cleaned earlier */
4563 : return 1;
4564 :
4565 26 : if (change->handled || isDeleted(i->t)) {
4566 0 : table_destroy(store, i->t);
4567 0 : return 1;
4568 : }
4569 :
4570 : /* savepoint commit (did it merge ?) */
4571 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4572 0 : table_destroy(store, i->t);
4573 0 : return 1;
4574 : }
4575 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4576 : return 0;
4577 26 : sql_delta *d = (sql_delta*)change->data, *id = d;
4578 26 : if (d && d->next) {
4579 26 : if (d->cs.ts > oldest)
4580 : return LOG_OK; /* cannot cleanup yet */
4581 :
4582 : // d is oldest reachable delta
4583 26 : if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
4584 26 : destroy_delta(d->next, true);
4585 26 : d->next = NULL;
4586 : }
4587 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4588 26 : id->nr_updates += merge_delta(d);
4589 26 : unlock_column(store, i->base.id);
4590 : }
4591 26 : table_destroy(store, i->t);
4592 26 : return 1;
4593 : }
4594 :
4595 : static int
4596 221176 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4597 : {
4598 221176 : sqlstore *store = Store;
4599 221176 : sql_table *t = (sql_table*)change->obj;
4600 :
4601 221176 : if (change->handled || isDeleted(t)) {
4602 3608 : table_destroy(store, t);
4603 3608 : return 1;
4604 : }
4605 : /* savepoint commit (did it merge ?) */
4606 217568 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4607 5946 : table_destroy(store, t);
4608 5946 : return 1;
4609 : }
4610 211622 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4611 : return 0;
4612 211594 : storage *d = (storage*)change->data;
4613 211594 : if (d->next) {
4614 138907 : if (d->cs.ts > oldest)
4615 : return LOG_OK; /* cannot cleanup yet */
4616 :
4617 19553 : destroy_storage(d->next);
4618 19553 : d->next = NULL;
4619 : }
4620 92240 : table_destroy(store, t);
4621 92240 : return 1;
4622 : }
4623 :
4624 : static int
4625 30622 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4626 : {
4627 30622 : if (nr == 0)
4628 : return LOG_OK;
4629 30622 : assert (nr > 0);
4630 30622 : if ((!offsets || !*offsets) && nr == total) {
4631 30626 : *offset = slot;
4632 30626 : return LOG_OK;
4633 : }
4634 0 : if (!*offsets) {
4635 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4636 7 : if (!*offsets)
4637 : return LOG_ERR;
4638 : }
4639 0 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4640 16135 : for(size_t i = 0; i < nr; i++)
4641 16139 : dst[i] = slot + i;
4642 0 : (*offsets)->batCount += nr;
4643 0 : (*offsets)->theap->dirty = true;
4644 0 : return LOG_OK;
4645 : }
4646 :
4647 : static int
4648 30432 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4649 : {
4650 30432 : assert(s->segs);
4651 30432 : ulng oldest = store_oldest(tr->store, NULL);
4652 30456 : BUN slot = 0;
4653 30456 : size_t total = cnt;
4654 :
4655 30456 : if (!locked)
4656 30563 : lock_table(tr->store, t->base.id);
4657 30595 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4658 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4659 61598 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4660 30874 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
4661 35 : if ((seg->end - seg->start) >= cnt) {
4662 : /* if previous is claimed before we could simply adjust the end/start */
4663 13 : if (p && p->ts == tr->tid && !p->deleted) {
4664 2 : slot = p->end;
4665 2 : p->end += cnt;
4666 2 : seg->start += cnt;
4667 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4668 : ok = LOG_ERR;
4669 : break;
4670 : }
4671 2 : s->segs->nr_reused += cnt;
4672 2 : cnt = 0;
4673 2 : break;
4674 : }
4675 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4676 11 : size_t rcnt = seg->end - seg->start;
4677 11 : if (rcnt > cnt)
4678 : rcnt = cnt;
4679 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4680 : ok = LOG_ERR;
4681 : break;
4682 : }
4683 : }
4684 33 : seg->ts = tr->tid;
4685 33 : seg->deleted = false;
4686 33 : slot = seg->start;
4687 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4688 : ok = LOG_ERR;
4689 : break;
4690 : }
4691 0 : s->segs->nr_reused += (seg->end - seg->start);
4692 0 : cnt -= (seg->end - seg->start);
4693 : }
4694 : }
4695 30726 : if (ok == LOG_OK && cnt) {
4696 30713 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4697 29497 : slot = s->segs->t->end;
4698 29497 : s->segs->t->end += cnt;
4699 : } else {
4700 1216 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4701 : ok = LOG_ERR;
4702 : } else {
4703 1236 : if (!s->segs->h)
4704 0 : s->segs->h = s->segs->t;
4705 1236 : slot = s->segs->t->start;
4706 : }
4707 : }
4708 30733 : if (ok == LOG_OK)
4709 30733 : ok = add_offsets(slot, cnt, total, offset, offsets);
4710 : }
4711 30643 : if (!locked)
4712 30562 : unlock_table(tr->store, t->base.id);
4713 :
4714 30846 : if (ok == LOG_OK) {
4715 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4716 30770 : if (!in_transaction) {
4717 1221 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4718 1221 : in_transaction = true;
4719 : }
4720 30770 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4721 30744 : tr->logchanges += (lng) total;
4722 30764 : if (*offsets) {
4723 19 : BAT *pos = *offsets;
4724 19 : assert(BATcount(pos) == total);
4725 19 : BATsetcount(pos, total); /* set other properties */
4726 7 : pos->tnil = false;
4727 7 : pos->tnonil = true;
4728 7 : pos->tkey = true;
4729 7 : pos->tsorted = true;
4730 7 : pos->trevsorted = false;
4731 : }
4732 : }
4733 30828 : return ok;
4734 : }
4735 :
4736 : static int
4737 2109870 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4738 : {
4739 2109870 : if (cnt > 1 && offsets)
4740 30433 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4741 2079437 : assert(s->segs);
4742 2079437 : ulng oldest = store_oldest(tr->store, NULL);
4743 2079437 : BUN slot = 0;
4744 2079437 : int reused = 0;
4745 :
4746 2079437 : if (!locked)
4747 1950769 : lock_table(tr->store, t->base.id);
4748 2079488 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4749 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4750 : * or create new segment at the end */
4751 9999547 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4752 7993520 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
4753 :
4754 73461 : if ((seg->end - seg->start) >= cnt) {
4755 :
4756 : /* if previous is claimed before we could simply adjust the end/start */
4757 73461 : if (p && p->ts == tr->tid && !p->deleted) {
4758 56831 : slot = p->end;
4759 56831 : p->end += cnt;
4760 56831 : seg->start += cnt;
4761 56831 : s->segs->nr_reused += cnt;
4762 56831 : reused = 1;
4763 56831 : break;
4764 : }
4765 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4766 16630 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4767 : ok = LOG_ERR;
4768 : break;
4769 : }
4770 : }
4771 16630 : seg->ts = tr->tid;
4772 16630 : seg->deleted = false;
4773 16630 : slot = seg->start;
4774 16630 : s->segs->nr_reused += cnt;
4775 16630 : reused = 1;
4776 16630 : break;
4777 : }
4778 : }
4779 2079488 : if (ok == LOG_OK && !reused) {
4780 2006027 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4781 1970469 : slot = s->segs->t->end;
4782 1970469 : s->segs->t->end += cnt;
4783 : } else {
4784 35558 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4785 : ok = LOG_ERR;
4786 : } else {
4787 35558 : if (!s->segs->h)
4788 0 : s->segs->h = s->segs->t;
4789 35558 : slot = s->segs->t->start;
4790 : }
4791 : }
4792 : }
4793 2079488 : if (!locked)
4794 1950820 : unlock_table(tr->store, t->base.id);
4795 :
4796 2079488 : if (ok == LOG_OK) {
4797 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4798 2079488 : if (!in_transaction) {
4799 49801 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4800 49801 : in_transaction = true;
4801 : }
4802 2079488 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4803 2079035 : tr->logchanges += (lng) cnt;
4804 2079488 : *offset = slot;
4805 : }
4806 : return ok;
4807 : }
4808 :
4809 : /*
4810 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4811 : * of the global transaction and mark the newly added storage slots unused on the global
4812 : * level but used on the local transaction level. Besides this the local transaction needs
4813 : * to update (and mark unused) any slot in between the old end and new slots.
4814 : * */
4815 : static int
4816 1981241 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4817 : {
4818 1981241 : storage *s;
4819 :
4820 : /* we have a single segment structure for each persistent table
4821 : * for temporary tables each has its own */
4822 1981241 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4823 : return LOG_ERR;
4824 :
4825 1981374 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4826 : }
4827 :
4828 : /* some tables cannot be updated concurrently (user/roles etc) */
4829 : static int
4830 128672 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4831 : {
4832 128672 : storage *s;
4833 128672 : int res = 0;
4834 :
4835 : /* we have a single segment structure for each persistent table
4836 : * for temporary tables each has its own */
4837 128672 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4838 : /* TODO check for other inserts ! */
4839 : return LOG_ERR;
4840 :
4841 128672 : lock_table(tr->store, t->base.id);
4842 128672 : if ((res = segments_conflict(tr, s->segs, 1))) {
4843 4 : unlock_table(tr->store, t->base.id);
4844 4 : return LOG_CONFLICT;
4845 : }
4846 128668 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4847 128668 : unlock_table(tr->store, t->base.id);
4848 128668 : return res;
4849 : }
4850 :
4851 : static int
4852 20504 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4853 : {
4854 20504 : storage *s;
4855 20504 : int res = 0;
4856 :
4857 20504 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4858 : return LOG_ERR;
4859 :
4860 20504 : lock_table(tr->store, t->base.id);
4861 20504 : res = segments_conflict(tr, s->segs, uncommitted);
4862 20504 : unlock_table(tr->store, t->base.id);
4863 20504 : return res ? LOG_CONFLICT : LOG_OK;
4864 : }
4865 :
4866 : static size_t
4867 1617362 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4868 : {
4869 1617362 : size_t cnt = 0;
4870 :
4871 1970125 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4872 : ;
4873 :
4874 3999828 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4875 2382458 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4876 157054 : cnt += s->end - s->start;
4877 : }
4878 1617370 : return cnt;
4879 : }
4880 :
4881 : static BAT *
4882 1617334 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4883 : {
4884 1617334 : lock_table(tr->store, t->base.id);
4885 1617381 : segment *s = S->segs->h;
4886 : /* step one no deletes -> dense range */
4887 1617381 : uint32_t cur = 0;
4888 1617381 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4889 1617374 : if (!dnr) {
4890 1487248 : unlock_table(tr->store, t->base.id);
4891 1487263 : return BATdense(start, start, end-start);
4892 : }
4893 :
4894 130126 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4895 130126 : if (!b) {
4896 0 : unlock_table(tr->store, t->base.id);
4897 0 : return NULL;
4898 : }
4899 :
4900 130126 : uint32_t *restrict dst = Tloc(b, 0);
4901 62294795 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4902 62230449 : if (s->end < start)
4903 262672 : continue;
4904 61967777 : if (s->start >= end)
4905 : break;
4906 61901997 : msk m = (SEG_IS_VALID(s, tr));
4907 61901997 : size_t lnr = s->end-s->start;
4908 61901997 : if (s->start < start)
4909 19901 : lnr -= (start - s->start);
4910 61901997 : if (s->end > end)
4911 15073 : lnr -= s->end - end;
4912 :
4913 61901997 : if (m) {
4914 1390057 : size_t used = pos&31, end = 32;
4915 1390057 : if (used) {
4916 1227907 : if (lnr < (32-used))
4917 750357 : end = used + lnr;
4918 1227907 : assert(end > used);
4919 1227907 : cur |= ((1U << (end - used)) - 1) << used;
4920 1227907 : lnr -= end - used;
4921 1227907 : pos += end - used;
4922 1227907 : if (end == 32) {
4923 477577 : *dst++ = cur;
4924 477577 : cur = 0;
4925 : }
4926 : }
4927 1390057 : size_t full = lnr/32;
4928 1390057 : size_t rest = lnr%32;
4929 1390057 : if (full > 0) {
4930 354552 : memset(dst, ~0, full * sizeof(*dst));
4931 354552 : dst += full;
4932 354552 : lnr -= full * 32;
4933 354552 : pos += full * 32;
4934 : }
4935 1390057 : if (rest > 0) {
4936 608183 : cur |= (1U << rest) - 1;
4937 608183 : lnr -= rest;
4938 608183 : pos += rest;
4939 : }
4940 1390057 : assert(lnr==0);
4941 : } else {
4942 60511940 : size_t used = pos&31, end = 32;
4943 60511940 : if (used) {
4944 58630182 : if (lnr < (32-used))
4945 56639551 : end = used + lnr;
4946 :
4947 58630182 : pos+= (end-used);
4948 58630182 : lnr-= (end-used);
4949 58630182 : if (end == 32) {
4950 1990658 : *dst++ = cur;
4951 1990658 : cur = 0;
4952 : }
4953 : }
4954 60511940 : size_t full = lnr/32;
4955 60511940 : size_t rest = lnr%32;
4956 60511940 : memset(dst, 0, full * sizeof(*dst));
4957 60511940 : dst += full;
4958 60511940 : lnr -= full * 32;
4959 60511940 : pos += full * 32;
4960 60511940 : pos+= rest;
4961 60511940 : lnr-= rest;
4962 60511940 : assert(lnr==0);
4963 : }
4964 : }
4965 :
4966 130126 : unlock_table(tr->store, t->base.id);
4967 130126 : if (pos%32)
4968 127014 : *dst=cur;
4969 130126 : BATsetcount(b, nr);
4970 130126 : bn = BATmaskedcands(start, nr, b, true);
4971 130120 : BBPreclaim(b);
4972 130120 : (void)pos;
4973 130120 : assert (pos == nr);
4974 : return bn;
4975 : }
4976 :
4977 : static void * /* BAT * */
4978 1623294 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4979 : {
4980 : /* with nr_of_parts - part_nr we can adjust parts */
4981 1623294 : storage *s = tab_timestamp_storage(tr, t);
4982 :
4983 1623534 : if (!s)
4984 : return NULL;
4985 1623534 : size_t nr = segs_end(s->segs, tr, t);
4986 :
4987 1624366 : if (!nr)
4988 7023 : return BATdense(0, 0, 0);
4989 :
4990 : /* compute proper part */
4991 1617343 : size_t part_size = nr/nr_of_parts;
4992 1617343 : size_t start = part_size * part_nr;
4993 1617343 : size_t end = start + part_size;
4994 1617343 : if (part_nr == (nr_of_parts-1))
4995 1327966 : end = nr;
4996 1617343 : assert(end <= nr);
4997 1617343 : return segments2cands(s, tr, t, start, end);
4998 : }
4999 :
5000 : static int
5001 5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
5002 : {
5003 5 : bool update_conflict = false;
5004 :
5005 5 : if (segments_in_transaction(tr, col->t))
5006 : return LOG_CONFLICT;
5007 :
5008 5 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5009 :
5010 5 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5011 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5012 5 : assert(d && d->cs.ts == tr->tid);
5013 5 : if (odelta != d)
5014 5 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
5015 5 : if (d->cs.bid)
5016 5 : temp_destroy(d->cs.bid);
5017 5 : if (d->cs.uibid)
5018 5 : temp_destroy(d->cs.uibid);
5019 5 : if (d->cs.uvbid)
5020 5 : temp_destroy(d->cs.uvbid);
5021 5 : bat_set_access(bn, BAT_READ);
5022 5 : d->cs.bid = temp_create(bn);
5023 5 : d->cs.uibid = 0;
5024 5 : d->cs.uvbid = 0;
5025 5 : d->cs.ucnt = 0;
5026 5 : d->cs.cleared = true;
5027 5 : d->cs.ts = tr->tid;
5028 5 : ATOMIC_INIT(&d->cs.refcnt, 1);
5029 5 : return LOG_OK;
5030 : }
5031 :
5032 : static int
5033 5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
5034 : {
5035 5 : if (segments_in_transaction(tr, c->t))
5036 : return LOG_CONFLICT;
5037 :
5038 5 : sql_delta *d = NULL;
5039 :
5040 : /* do we have enough to clean */
5041 5 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5042 : return LOG_CONFLICT;
5043 :
5044 : /* do we have enough to clean */
5045 5 : if (!force && (d->nr_updates) < 1024)
5046 : return LOG_OK;
5047 :
5048 5 : BAT *b = NULL, *bn = NULL;;
5049 5 : if ((b = bind_col(tr, c, 0)) == NULL)
5050 : return LOG_ERR;
5051 5 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5052 0 : BBPreclaim(b);
5053 0 : return LOG_ERR;
5054 : }
5055 5 : int res = swap_bats(tr, c, bn);
5056 5 : d->nr_updates = 0;
5057 5 : BBPreclaim(b);
5058 5 : BBPreclaim(bn);
5059 5 : return res;
5060 : }
5061 :
5062 : static int
5063 0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
5064 : {
5065 0 : if (segments_in_transaction(tr, t))
5066 : return LOG_CONFLICT;
5067 :
5068 0 : storage *s;
5069 0 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
5070 : return LOG_ERR;
5071 :
5072 0 : for( node *n = ol_first_node(t->columns); n; n = n->next) {
5073 0 : sql_column *c = n->data;
5074 0 : int type = c->type.multiset?TYPE_int:c->type.type->localtype;
5075 :
5076 0 : if (!ATOMvarsized(type))
5077 0 : continue;
5078 0 : sql_delta *d = NULL;
5079 :
5080 : /* do we have enough to clean */
5081 0 : if ((d = bind_col_data(tr, c, NULL)) == NULL)
5082 : return LOG_CONFLICT;
5083 :
5084 : /* do we have enough to clean */
5085 0 : if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
5086 0 : continue;
5087 :
5088 0 : BAT *b = NULL, *bn = NULL;;
5089 0 : if ((b = bind_col(tr, c, 0)) == NULL)
5090 : return LOG_ERR;
5091 0 : if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
5092 0 : BBPreclaim(b);
5093 0 : return LOG_ERR;
5094 : }
5095 0 : int res = swap_bats(tr, c, bn);
5096 0 : d->nr_updates = 0;
5097 0 : BBPreclaim(b);
5098 0 : BBPreclaim(bn);
5099 0 : if (res != LOG_OK)
5100 0 : return res;
5101 : }
5102 0 : s->segs->nr_reused = 0;
5103 0 : return LOG_OK;
5104 : }
5105 :
5106 :
5107 : static int
5108 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5109 : {
5110 58 : bool update_conflict = false;
5111 :
5112 58 : if (segments_in_transaction(tr, col->t))
5113 : return LOG_CONFLICT;
5114 :
5115 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5116 :
5117 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5118 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5119 58 : assert(d && d->cs.ts == tr->tid);
5120 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5121 58 : if (odelta != d)
5122 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5123 :
5124 58 : d->cs.st = st;
5125 58 : d->cs.cleared = true;
5126 58 : if (d->cs.bid)
5127 58 : temp_destroy(d->cs.bid);
5128 58 : o = transfer_to_systrans(o);
5129 58 : if (o == NULL)
5130 : return LOG_ERR;
5131 58 : bat_set_access(o, BAT_READ);
5132 58 : d->cs.bid = temp_create(o);
5133 58 : if (u) {
5134 53 : if (d->cs.ebid)
5135 0 : temp_destroy(d->cs.ebid);
5136 53 : u = transfer_to_systrans(u);
5137 53 : if (u == NULL)
5138 : return LOG_ERR;
5139 53 : d->cs.ebid = temp_create(u);
5140 : }
5141 : return LOG_OK;
5142 : }
5143 :
5144 : void
5145 358 : bat_storage_init( store_functions *sf)
5146 : {
5147 358 : sf->bind_col = &bind_col;
5148 358 : sf->bind_updates = &bind_updates;
5149 358 : sf->bind_updates_idx = &bind_updates_idx;
5150 358 : sf->bind_idx = &bind_idx;
5151 358 : sf->bind_cands = &bind_cands;
5152 :
5153 358 : sf->claim_tab = &claim_tab;
5154 358 : sf->key_claim_tab = &key_claim_tab;
5155 358 : sf->tab_validate = &tab_validate;
5156 :
5157 358 : sf->append_col = &append_col;
5158 358 : sf->append_idx = &append_idx;
5159 :
5160 358 : sf->update_col = &update_col;
5161 358 : sf->update_idx = &update_idx;
5162 :
5163 358 : sf->delete_tab = &delete_tab;
5164 :
5165 358 : sf->count_del = &count_del;
5166 358 : sf->count_col = &count_col;
5167 358 : sf->count_idx = &count_idx;
5168 358 : sf->dcount_col = &dcount_col;
5169 358 : sf->min_max_col = &min_max_col;
5170 358 : sf->set_stats_col = &set_stats_col;
5171 358 : sf->sorted_col = &sorted_col;
5172 358 : sf->unique_col = &unique_col;
5173 358 : sf->double_elim_col = &double_elim_col;
5174 358 : sf->col_stats = &col_stats;
5175 358 : sf->col_set_range = &col_set_range;
5176 358 : sf->col_not_null = &col_not_null;
5177 :
5178 358 : sf->col_dup = &col_dup;
5179 358 : sf->idx_dup = &idx_dup;
5180 358 : sf->del_dup = &del_dup;
5181 :
5182 358 : sf->create_col = &create_col; /* create and add to change list */
5183 358 : sf->create_idx = &create_idx;
5184 358 : sf->create_del = &create_del;
5185 :
5186 358 : sf->destroy_col = &destroy_col; /* free resources */
5187 358 : sf->destroy_idx = &destroy_idx;
5188 358 : sf->destroy_del = &destroy_del;
5189 :
5190 358 : sf->drop_col = &drop_col; /* add drop to change list */
5191 358 : sf->drop_idx = &drop_idx;
5192 358 : sf->drop_del = &drop_del;
5193 :
5194 358 : sf->clear_table = &clear_table;
5195 :
5196 358 : sf->vacuum_col = &vacuum_col;
5197 358 : sf->vacuum_tab = &vacuum_tab;
5198 358 : sf->col_compress = &col_compress;
5199 358 : }
|