Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
42 : static void merge_delta( sql_delta *obat);
43 :
/* valid
 * !deleted && VALID_4_READ(TS, tr)		existing or newly created segment
 *  deleted && TS > tr->ts && OLDTS < tr->ts	deleted after current transaction
 *
 * All macros below parenthesize their parameters so that arbitrary
 * expressions can be passed; note that arguments may still be evaluated
 * more than once, so they must be free of side effects.
 */

/* TS is readable by tr when it is tr's own transaction id, when tr has a
 * parent and tr_version_of_parent() accepts TS, or when TS is smaller than
 * tr's start timestamp */
#define VALID_4_READ(TS,tr) \
	((TS) == (tr)->tid || ((tr)->parent && tr_version_of_parent((tr), (TS))) || (TS) < (tr)->ts)

/* when changed, check if the old status is still valid */
#define OLD_VALID_4_READ(TS,OLDTS,tr) \
	((OLDTS) && (TS) != (tr)->tid && (TS) > (tr)->ts && (OLDTS) < (tr)->ts)

/* a segment can be deleted by tr when it is not deleted and readable by tr */
#define SEG_VALID_4_DELETE(seg,tr) \
	(!(seg)->deleted && VALID_4_READ((seg)->ts, (tr)))

/* Deleted (in the current transaction or by some other finished transaction),
 * or a re-used segment which used to be deleted */
#define SEG_IS_DELETED(seg,tr) \
	(((seg)->deleted && (VALID_4_READ((seg)->ts, (tr)) || !OLD_VALID_4_READ((seg)->ts, (seg)->oldts, (tr)))) || \
	 (!(seg)->deleted && !VALID_4_READ((seg)->ts, (tr))))

/* A segment is part of the current transaction in some way, or is deleted by
 * some other transaction but used to be valid */
#define SEG_IS_VALID(seg, tr) \
	((!(seg)->deleted && VALID_4_READ((seg)->ts, (tr))) || \
	 ((seg)->deleted && OLD_VALID_4_READ((seg)->ts, (seg)->oldts, (tr))))
68 :
69 : static inline BAT *
70 5152 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 5152 : MT_lock_set(&b->theaplock);
74 5152 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 18 : MT_lock_unset(&b->theaplock);
76 18 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
77 18 : BBPreclaim(b);
78 18 : return bn;
79 : }
80 5134 : if (b->theap->farmid == TRANSIENT ||
81 14 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
82 4749 : QryCtx *qc = MT_thread_get_qry_ctx();
83 4750 : if (qc) {
84 2424 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
85 2423 : ATOMIC_SUB(&qc->datasize, b->theap->size);
86 2423 : b->theap->farmid = SYSTRANS;
87 2423 : b->batRole = SYSTRANS;
88 : }
89 2424 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
90 1061 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
91 1061 : b->tvheap->farmid = SYSTRANS;
92 : }
93 : }
94 : }
95 5135 : MT_lock_unset(&b->theaplock);
96 5134 : return b;
97 : }
98 :
99 : static void
100 26220459 : lock_table(sqlstore *store, sqlid id)
101 : {
102 26220459 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
103 26309700 : }
104 :
105 : static void
106 26309216 : unlock_table(sqlstore *store, sqlid id)
107 : {
108 26309216 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
109 26307788 : }
110 :
111 : static void
112 19917110 : lock_column(sqlstore *store, sqlid id)
113 : {
114 19917110 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
115 19950155 : }
116 :
117 : static void
118 19938144 : unlock_column(sqlstore *store, sqlid id)
119 : {
120 19938144 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
121 19957956 : }
122 :
123 : static void
124 109635 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
125 : {
126 109635 : assert(cleanup);
127 109635 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
128 109633 : }
129 :
130 : static void
131 133019 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
132 : {
133 133019 : assert(cleanup);
134 133019 : dup_base(&t->base);
135 133018 : trans_add(tr, b, data, cleanup, commit, log);
136 133004 : }
137 :
138 : static int
139 76103 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
140 : {
141 76103 : segment *s = change->data;
142 :
143 76103 : if (s->ts <= oldest) {
144 32884 : while(s) {
145 20435 : segment *n = s->prev;
146 20435 : ATOMIC_PTR_DESTROY(&s->next);
147 20435 : _DELETE(s);
148 20435 : s = n;
149 : }
150 12449 : sqlstore *store = Store;
151 12449 : table_destroy(store, (sql_table*)change->obj);
152 12449 : return 1;
153 : }
154 : return LOG_OK;
155 : }
156 :
157 : static void
158 20435 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
159 : {
160 : /* we can only be accessed by anything older then commit_ts */
161 20435 : if (c->cleanup == &tc_gc_seg)
162 7986 : s->prev = c->data;
163 : else
164 12449 : c->cleanup = &tc_gc_seg;
165 20435 : c->data = s;
166 20435 : s->ts = commit_ts;
167 16063 : }
168 :
169 : static segment *
170 86502 : new_segment(segment *o, sql_trans *tr, size_t cnt)
171 : {
172 86502 : segment *n = (segment*)GDKmalloc(sizeof(segment));
173 :
174 86504 : assert(tr);
175 86504 : if (n) {
176 86504 : *n = (segment) {
177 86504 : .ts = tr->tid,
178 : .oldts = 0,
179 : .deleted = false,
180 : .start = 0,
181 : .end = cnt,
182 : .next = ATOMIC_PTR_VAR_INIT(NULL),
183 : .prev = NULL,
184 : };
185 86504 : if (o) {
186 35690 : n->start += o->end;
187 35690 : n->end += o->end;
188 35690 : ATOMIC_PTR_SET(&o->next, n);
189 : }
190 : }
191 86504 : return n;
192 : }
193 :
194 : static segment *
195 87579 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
196 : {
197 87579 : assert(tr);
198 87579 : if (o->start == start && o->end == start+cnt) {
199 9621 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
200 9621 : o->oldts = o->ts;
201 9621 : o->ts = tr->tid;
202 9621 : o->deleted = deleted;
203 9621 : return o;
204 : }
205 77958 : segment *n = (segment*)GDKmalloc(sizeof(segment));
206 :
207 77958 : if (!n)
208 : return NULL;
209 77958 : n->prev = NULL;
210 :
211 77958 : if (o->ts == tr->tid) {
212 4052 : n->oldts = 0;
213 4052 : n->ts = 1;
214 4052 : n->deleted = true;
215 : } else {
216 73906 : n->oldts = o->ts;
217 73906 : n->ts = tr->tid;
218 73906 : n->deleted = deleted;
219 : }
220 77958 : if (start == o->start) {
221 : /* 2-way split: o remains latter part of segment, new one is
222 : * inserted before */
223 63482 : n->start = o->start;
224 63482 : n->end = n->start + cnt;
225 63482 : ATOMIC_PTR_INIT(&n->next, o);
226 63482 : if (segs->h == o)
227 454 : segs->h = n;
228 63482 : if (p)
229 63028 : ATOMIC_PTR_SET(&p->next, n);
230 63482 : o->start = n->end;
231 14476 : } else if (start+cnt == o->end) {
232 : /* 2-way split: o remains first part of segment, new one is
233 : * added after */
234 5271 : n->start = o->end - cnt;
235 5271 : n->end = o->end;
236 5271 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
237 5271 : ATOMIC_PTR_SET(&o->next, n);
238 5271 : if (segs->t == o)
239 805 : segs->t = n;
240 5271 : o->end = n->start;
241 : } else {
242 : /* 3-way split: o remains first part of segment, two new ones
243 : * are added after */
244 9205 : segment *n2 = GDKmalloc(sizeof(segment));
245 9205 : if (n2 == NULL) {
246 0 : GDKfree(n);
247 0 : return NULL;
248 : }
249 9205 : ATOMIC_PTR_INIT(&n->next, n2);
250 9205 : n->start = start;
251 9205 : n->end = start + cnt;
252 9205 : *n2 = *o;
253 9205 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
254 9205 : n2->start = n->end;
255 9205 : n2->prev = NULL;
256 9205 : if (segs->t == o)
257 3455 : segs->t = n2;
258 9205 : ATOMIC_PTR_SET(&o->next, n);
259 9205 : o->end = start;
260 : }
261 : return n;
262 : }
263 :
264 : static void
265 3485 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
266 : {
267 3485 : segment *cur = segs->h, *seg = NULL;
268 15356 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
269 11871 : if (cur->ts == tr->tid) { /* revert */
270 3952 : cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
271 3952 : cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
272 3952 : cur->oldts = 0;
273 : }
274 11871 : if (cur->ts <= oldest) { /* possibly merge range */
275 11468 : if (!seg) { /* skip first */
276 : seg = cur;
277 7983 : } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
278 : /* merge with previous */
279 4372 : seg->end = cur->end;
280 4372 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
281 4372 : if (cur == segs->t)
282 2677 : segs->t = seg;
283 4372 : mark4destroy(cur, change, store_get_timestamp(tr->store));
284 4372 : cur = seg;
285 : } else {
286 : seg = cur; /* begin of new merge */
287 : }
288 : }
289 : }
290 3485 : }
291 :
292 : static size_t
293 100506 : segs_end_include_deleted( segments *segs, sql_trans *tr)
294 : {
295 100506 : size_t cnt = 0;
296 100506 : segment *s = segs->h, *l = NULL;
297 :
298 466626 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
299 366120 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
300 : l = s;
301 : }
302 100506 : if (l)
303 100499 : cnt = l->end;
304 100506 : return cnt;
305 : }
306 :
307 : static int
308 100506 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
309 : {
310 : /* set bits correctly */
311 100506 : BAT *b = temp_descriptor(cs->bid);
312 :
313 100506 : if (!b)
314 : return LOG_ERR;
315 100506 : segment *s = segs->h;
316 :
317 100506 : size_t nr = segs_end_include_deleted(segs, tr);
318 100506 : size_t rounded_nr = ((nr+31)&~31);
319 100506 : if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
320 0 : bat_destroy(b);
321 0 : return LOG_ERR;
322 : }
323 :
324 : /* disable all properties here */
325 100506 : MT_lock_set(&b->theaplock);
326 100506 : b->tsorted = false;
327 100506 : b->trevsorted = false;
328 100506 : b->tnosorted = 0;
329 100506 : b->tnorevsorted = 0;
330 100506 : b->tseqbase = oid_nil;
331 100506 : b->tkey = false;
332 100506 : b->tnokey[0] = 0;
333 100506 : b->tnokey[1] = 0;
334 100506 : b->theap->dirty = true;
335 100506 : BUN cnt = BATcount(b);
336 100506 : MT_lock_unset(&b->theaplock);
337 :
338 100506 : uint32_t *restrict dst;
339 : /* why hashlock ?? */
340 100506 : MT_rwlock_wrlock(&b->thashlock);
341 509601 : for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
342 349541 : if (s->start >= nr)
343 : break;
344 308589 : if (s->ts == tr->tid && s->end != s->start) {
345 145056 : if (cnt < s->start) { /* first mark as deleted ! */
346 3173 : size_t lnr = s->start-cnt;
347 3173 : size_t pos = cnt;
348 3173 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
349 3173 : uint32_t cur = 0;
350 :
351 3173 : size_t used = pos&31, end = 32;
352 3173 : if (used) {
353 3072 : if (lnr < (32-used))
354 2924 : end = used + lnr;
355 3072 : assert(end > used);
356 3072 : cur |= ((1U << (end - used)) - 1) << used;
357 3072 : lnr -= end - used;
358 3072 : *dst++ |= cur;
359 3072 : cur = 0;
360 : }
361 3173 : size_t full = lnr/32;
362 3173 : size_t rest = lnr%32;
363 3173 : if (full > 0) {
364 6 : memset(dst, ~0, full * sizeof(*dst));
365 6 : dst += full;
366 6 : lnr -= full * 32;
367 : }
368 3173 : if (rest > 0) {
369 142 : cur |= (1U << rest) - 1;
370 142 : lnr -= rest;
371 142 : *dst |= cur;
372 : }
373 3173 : assert(lnr==0);
374 : }
375 145056 : size_t lnr = s->end-s->start;
376 145056 : size_t pos = s->start;
377 145056 : dst = (uint32_t *) Tloc(b, 0) + (pos/32);
378 145056 : uint32_t cur = 0;
379 145056 : size_t used = pos&31, end = 32;
380 145056 : if (used) {
381 107437 : if (lnr < (32-used))
382 101527 : end = used + lnr;
383 107437 : assert(end > used);
384 107437 : cur |= ((1U << (end - used)) - 1) << used;
385 107437 : lnr -= end - used;
386 107437 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
387 107437 : dst++;
388 107437 : cur = 0;
389 : }
390 145056 : size_t full = lnr/32;
391 145056 : size_t rest = lnr%32;
392 145056 : if (full > 0) {
393 3513 : memset(dst, s->deleted?~0:0, full * sizeof(*dst));
394 3513 : dst += full;
395 3513 : lnr -= full * 32;
396 : }
397 145056 : if (rest > 0) {
398 40540 : cur |= (1U << rest) - 1;
399 40540 : lnr -= rest;
400 40540 : *dst = s->deleted ? *dst | cur : *dst & ~cur;
401 : }
402 145056 : assert(lnr==0);
403 145056 : if (cnt < s->end)
404 308589 : cnt = s->end;
405 : }
406 : }
407 100506 : MT_rwlock_wrunlock(&b->thashlock);
408 100506 : if (nr > BATcount(b)) {
409 61946 : MT_lock_set(&b->theaplock);
410 61946 : BATsetcount(b, nr);
411 61946 : MT_lock_unset(&b->theaplock);
412 : }
413 :
414 100506 : bat_destroy(b);
415 100506 : return LOG_OK;
416 : }
417 :
418 : /* TODO return LOG_OK/ERR */
419 : static void
420 100532 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
421 : {
422 100532 : sqlstore* store = tr->store;
423 100532 : segment *cur = s->segs->h, *seg = NULL;
424 466724 : for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
425 366192 : if (cur->ts == tr->tid) {
426 158705 : if (!cur->deleted)
427 89990 : cur->oldts = 0;
428 158705 : cur->ts = commit_ts;
429 : }
430 366192 : if (!seg) {
431 : /* first segment */
432 : seg = cur;
433 : }
434 265660 : else if (seg->ts < TRANSACTION_ID_BASE) {
435 : /* possible merge since both deleted flags are equal */
436 246891 : if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
437 184859 : int merge = 1;
438 184859 : node *n = store->active->h;
439 581425 : for (int i = 0; i < store->active->cnt; i++, n = n->next) {
440 482948 : sql_trans* other = ((sql_trans*)n->data);
441 482948 : ulng active = other->ts;
442 482948 : if(other->active == 2)
443 26914 : continue; /* pretend that another recently committed transaction is no longer active */
444 456034 : if (active == tr->ts)
445 129362 : continue; /* pretend that committing transaction has already committed and is no longer active */
446 326672 : if (seg->ts < active && cur->ts < active)
447 : break;
448 315051 : if (seg->ts > active && cur->ts > active)
449 240290 : continue;
450 :
451 74761 : assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
452 : /* cannot safely merge since there is an active transaction between the segments */
453 : merge = false;
454 : break;
455 : }
456 : /* merge segments */
457 220196 : if (merge) {
458 110098 : seg->end = cur->end;
459 110098 : ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
460 110098 : if (cur == s->segs->t)
461 26569 : s->segs->t = seg;
462 110098 : if (commit_ts == oldest) {
463 94035 : ATOMIC_PTR_DESTROY(&cur->next);
464 94035 : _DELETE(cur);
465 : } else
466 32126 : mark4destroy(cur, change, commit_ts);
467 110098 : cur = seg;
468 110098 : continue;
469 : }
470 : }
471 : }
472 : seg = cur;
473 : }
474 100532 : }
475 :
476 : static int
477 2119110 : segments_in_transaction(sql_trans *tr, sql_table *t)
478 : {
479 2119110 : storage *s = ATOMIC_PTR_GET(&t->data);
480 2119110 : segment *seg = s->segs->h;
481 :
482 2119110 : if (seg && s->segs->t->ts == tr->tid)
483 : return 1;
484 636614 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
485 534990 : if (seg->ts == tr->tid)
486 : return 1;
487 : }
488 : return 0;
489 : }
490 :
491 : static size_t
492 18795497 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
493 : {
494 18795497 : size_t cnt = 0;
495 :
496 18795497 : lock_table(tr->store, table->base.id);
497 18862295 : segment *s = segs->h, *l = NULL;
498 :
499 18862295 : if (segs->t && SEG_IS_VALID(segs->t, tr))
500 15723325 : l = s = segs->t;
501 :
502 216332575 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
503 197470630 : if (SEG_IS_VALID(s, tr))
504 : l = s;
505 : }
506 18861945 : if (l)
507 18844715 : cnt = l->end;
508 18861945 : unlock_table(tr->store, table->base.id);
509 18861038 : return cnt;
510 : }
511 :
512 : static segments *
513 50812 : new_segments(sql_trans *tr, size_t cnt)
514 : {
515 50812 : segments *n = (segments*)GDKmalloc(sizeof(segments));
516 :
517 50814 : if (n) {
518 50814 : n->h = n->t = new_segment(NULL, tr, cnt);
519 50811 : if (!n->h) {
520 0 : GDKfree(n);
521 0 : return NULL;
522 : }
523 50811 : sql_ref_init(&n->r);
524 : }
525 : return n;
526 : }
527 :
528 : static sql_delta *
529 32481078 : timestamp_delta( sql_trans *tr, sql_delta *d)
530 : {
531 32534702 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
532 53624 : d = d->next;
533 32487987 : return d;
534 : }
535 :
536 : static sql_delta *
537 32319647 : col_timestamp_delta( sql_trans *tr, sql_column *c)
538 : {
539 32319647 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
540 : }
541 :
542 : static sql_delta *
543 25236 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
544 : {
545 25236 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
546 : }
547 :
548 : static storage *
549 19755407 : timestamp_storage( sql_trans *tr, storage *d)
550 : {
551 19755407 : if (!d)
552 : return NULL;
553 19828003 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
554 72596 : d = d->next;
555 : return d;
556 : }
557 :
558 : static storage *
559 19738078 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
560 : {
561 19738078 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
562 : }
563 :
564 : static sql_delta*
565 18536 : delta_dup(sql_delta *d)
566 : {
567 18536 : ATOMIC_INC(&d->cs.refcnt);
568 18536 : return d;
569 : }
570 :
571 : static void *
572 17167 : col_dup(sql_column *c)
573 : {
574 17167 : return delta_dup(ATOMIC_PTR_GET(&c->data));
575 : }
576 :
577 : static void *
578 2656 : idx_dup(sql_idx *i)
579 : {
580 2656 : if (!ATOMIC_PTR_GET(&i->data))
581 : return NULL;
582 1369 : return delta_dup(ATOMIC_PTR_GET(&i->data));
583 : }
584 :
585 : static storage*
586 1480 : storage_dup(storage *d)
587 : {
588 1480 : ATOMIC_INC(&d->cs.refcnt);
589 1480 : return d;
590 : }
591 :
592 : static void *
593 1480 : del_dup(sql_table *t)
594 : {
595 1480 : return storage_dup(ATOMIC_PTR_GET(&t->data));
596 : }
597 :
598 : static size_t
599 17 : count_inserts( segment *s, sql_trans *tr)
600 : {
601 17 : size_t cnt = 0;
602 :
603 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
604 55 : if (!s->deleted && s->ts == tr->tid)
605 4 : cnt += s->end - s->start;
606 : }
607 17 : return cnt;
608 : }
609 :
610 : static size_t
611 836462 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
612 : {
613 836462 : size_t cnt = 0;
614 :
615 978181 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
616 : ;
617 :
618 4833080 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
619 3996615 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
620 1310719 : cnt += s->end - s->start;
621 : }
622 836465 : return cnt;
623 : }
624 :
625 : static size_t
626 17 : count_deletes( segment *s, sql_trans *tr)
627 : {
628 17 : size_t cnt = 0;
629 :
630 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
631 55 : if (SEG_IS_DELETED(s, tr))
632 17 : cnt += s->end - s->start;
633 : }
634 17 : return cnt;
635 : }
636 :
637 : #define CNT_ACTIVE 10
638 :
639 : static size_t
640 18194771 : count_col(sql_trans *tr, sql_column *c, int access)
641 : {
642 18194771 : storage *d;
643 18194771 : sql_delta *ds;
644 :
645 18194771 : if (!isTable(c->t))
646 : return 0;
647 18194771 : d = tab_timestamp_storage(tr, c->t);
648 18206820 : ds = col_timestamp_delta(tr, c);
649 18230690 : if (!d ||!ds)
650 : return 0;
651 18230690 : if (access == 2)
652 445120 : return ds?ds->cs.ucnt:0;
653 17785570 : if (access == 1)
654 17 : return count_inserts(d->segs->h, tr);
655 17785553 : if (access == QUICK)
656 521081 : return d->segs->t?d->segs->t->end:0;
657 17264472 : if (access == CNT_ACTIVE) {
658 836276 : size_t cnt = segs_end(d->segs, tr, c->t);
659 836650 : lock_table(tr->store, c->t->base.id);
660 836476 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
661 836489 : unlock_table(tr->store, c->t->base.id);
662 836489 : return cnt;
663 : }
664 16428196 : return segs_end(d->segs, tr, c->t);
665 : }
666 :
667 : static size_t
668 21104 : count_idx(sql_trans *tr, sql_idx *i, int access)
669 : {
670 21104 : storage *d;
671 21104 : sql_delta *ds;
672 :
673 21104 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
674 4595 : return 0;
675 16510 : d = tab_timestamp_storage(tr, i->t);
676 16541 : ds = idx_timestamp_delta(tr, i);
677 16556 : if (!d || !ds)
678 : return 0;
679 16556 : if (access == 2)
680 2848 : return ds?ds->cs.ucnt:0;
681 13708 : if (access == 1)
682 0 : return count_inserts(d->segs->h, tr);
683 13708 : if (access == QUICK)
684 3301 : return d->segs->t?d->segs->t->end:0;
685 10407 : return segs_end(d->segs, tr, i->t);
686 : }
687 :
688 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
689 : static BAT *
690 13933328 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
691 : {
692 13933328 : BAT *b;
693 :
694 13933328 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
695 : /* returns the updates for cs */
696 13933328 : if (cs->uibid && cs->uvbid && cs->ucnt) {
697 7616 : if (access == RD_UPD_ID) {
698 4906 : if (!(b = temp_descriptor(cs->uibid)))
699 : return NULL;
700 4906 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
701 936 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
702 3970 : oid nil = oid_nil;
703 : /* less then cnt */
704 3970 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
705 3970 : if (!s) {
706 0 : bat_destroy(b);
707 0 : return NULL;
708 : }
709 :
710 3970 : BAT *nb = BATproject(s, b);
711 3970 : bat_destroy(s);
712 3970 : bat_destroy(b);
713 3970 : b = nb;
714 : }
715 : } else {
716 2710 : b = temp_descriptor(cs->uvbid);
717 : }
718 : } else {
719 23210494 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
720 : }
721 : return b;
722 : }
723 :
724 : static BAT *
725 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
726 : {
727 0 : int err = 0;
728 0 : BAT *uv = *UV;
729 0 : BUN cnt = BATcount(ui)+BATcount(oi);
730 0 : BATiter uvi;
731 0 : BATiter ovi;
732 :
733 0 : if (uv) {
734 0 : uvi = bat_iterator(uv);
735 0 : ovi = bat_iterator(ov);
736 : }
737 :
738 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
739 0 : BUN uip = 0, uie = BATcount(ui);
740 0 : BUN oip = 0, oie = BATcount(oi);
741 :
742 0 : oid uiseqb = ui->tseqbase;
743 0 : oid oiseqb = oi->tseqbase;
744 0 : oid *uipt = NULL, *oipt = NULL;
745 0 : BATiter uii = bat_iterator(ui);
746 0 : BATiter oii = bat_iterator(oi);
747 0 : if (!BATtdensebi(&uii))
748 0 : uipt = uii.base;
749 0 : if (!BATtdensebi(&oii))
750 0 : oipt = oii.base;
751 :
752 0 : if (uiseqb == oiseqb && uie == oie) { /* full overlap, no values */
753 0 : if (uv) {
754 0 : bat_iterator_end(&uvi);
755 0 : bat_iterator_end(&ovi);
756 : }
757 0 : bat_iterator_end(&uii);
758 0 : bat_iterator_end(&oii);
759 0 : if (uv) {
760 0 : *UV = uv;
761 : } else {
762 0 : bat_destroy(uv);
763 : }
764 0 : bat_destroy(oi);
765 0 : bat_destroy(ov);
766 0 : return ui;
767 : }
768 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
769 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
770 :
771 0 : if (!ni || (uv && !nv)) {
772 0 : bat_destroy(ni);
773 0 : bat_destroy(nv);
774 0 : bat_destroy(ui);
775 0 : bat_destroy(uv);
776 0 : bat_destroy(oi);
777 0 : bat_destroy(ov);
778 0 : return NULL;
779 : }
780 0 : while (uip < uie && oip < oie && !err) {
781 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
782 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
783 :
784 0 : if (uiid <= oiid) {
785 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
786 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
787 : err = 1;
788 0 : uip++;
789 0 : if (uiid == oiid)
790 0 : oip++;
791 : } else { /* uiid > oiid */
792 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
793 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
794 : err = 1;
795 0 : oip++;
796 : }
797 : }
798 0 : while (uip < uie && !err) {
799 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
800 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
801 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
802 : err = 1;
803 0 : uip++;
804 : }
805 0 : while (oip < oie && !err) {
806 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
807 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
808 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
809 : err = 1;
810 0 : oip++;
811 : }
812 0 : if (uv) {
813 0 : bat_iterator_end(&uvi);
814 0 : bat_iterator_end(&ovi);
815 : }
816 0 : bat_iterator_end(&uii);
817 0 : bat_iterator_end(&oii);
818 0 : bat_destroy(ui);
819 0 : bat_destroy(uv);
820 0 : bat_destroy(oi);
821 0 : bat_destroy(ov);
822 0 : if (!err) {
823 0 : if (nv)
824 0 : *UV = nv;
825 0 : return ni;
826 : }
827 0 : *UV = NULL;
828 0 : bat_destroy(ni);
829 0 : bat_destroy(nv);
830 0 : return NULL;
831 : }
832 :
833 : static sql_delta *
834 9294339 : older_delta( sql_delta *d, sql_trans *tr)
835 : {
836 9294339 : sql_delta *o = d->next;
837 :
838 9299485 : while (o && !o->cs.merged) {
839 5133 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
840 : break;
841 : else
842 5146 : o = o->next;
843 : }
844 9294352 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
845 0 : return o;
846 : return NULL;
847 : }
848 :
849 : static BAT *
850 9289407 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
851 : {
852 9289407 : assert(tr->active);
853 9289407 : sql_delta *o = NULL;
854 9289407 : BAT *ui = NULL, *uv = NULL;
855 :
856 9289407 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
857 : return NULL;
858 9293947 : if (access == RD_UPD_VAL) {
859 4647278 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
860 0 : bat_destroy(ui);
861 0 : return NULL;
862 : }
863 : }
864 9294275 : while ((o = older_delta(d, tr)) != NULL) {
865 0 : BAT *oui = NULL, *ouv = NULL;
866 0 : if (!oui)
867 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
868 0 : if (access == RD_UPD_VAL)
869 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
870 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
871 0 : bat_destroy(ui);
872 0 : bat_destroy(uv);
873 0 : bat_destroy(oui);
874 0 : bat_destroy(ouv);
875 0 : return NULL;
876 : }
877 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
878 : return NULL;
879 : d = o;
880 : }
881 9294382 : if (uv) {
882 4647607 : bat_destroy(ui);
883 4647607 : return uv;
884 : }
885 : return ui;
886 : }
887 :
888 : static BAT *
889 519 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
890 : {
891 519 : lock_column(tr->store, c->base.id);
892 519 : sql_delta *d = col_timestamp_delta(tr, c);
893 519 : int type = c->type.type->localtype;
894 :
895 519 : if (!d) {
896 0 : unlock_column(tr->store, c->base.id);
897 0 : return NULL;
898 : }
899 519 : if (d->cs.st == ST_DICT) {
900 0 : BAT *b = quick_descriptor(d->cs.bid);
901 :
902 0 : type = b->ttype;
903 : }
904 519 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
905 519 : unlock_column(tr->store, c->base.id);
906 519 : return bn;
907 : }
908 :
909 : static BAT *
910 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
911 : {
912 0 : lock_column(tr->store, i->base.id);
913 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
914 0 : sql_delta *d = idx_timestamp_delta(tr, i);
915 :
916 0 : if (!d) {
917 0 : unlock_column(tr->store, i->base.id);
918 0 : return NULL;
919 : }
920 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
921 0 : unlock_column(tr->store, i->base.id);
922 0 : return bn;
923 : }
924 :
925 : static BAT *
926 9498280 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
927 : {
928 9498280 : BAT *b;
929 :
930 9498280 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
931 9498280 : assert(cs != NULL);
932 9498280 : if (access == QUICK)
933 134150 : return quick_descriptor(cs->bid);
934 9364130 : if (access == RD_EXT)
935 860 : return temp_descriptor(cs->ebid);
936 9363270 : assert(cs->bid);
937 9363270 : b = temp_descriptor(cs->bid);
938 9364304 : if (b == NULL)
939 : return NULL;
940 9364304 : assert(b->batRestricted == BAT_READ);
941 : /* return slice */
942 9364304 : BAT *s = BATslice(b, 0, cnt);
943 9348393 : bat_destroy(b);
944 9348393 : return s;
945 : }
946 :
947 : static int
948 4645063 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
949 : {
950 4645063 : lock_column(tr->store, c->base.id);
951 4645550 : size_t cnt = count_col(tr, c, 0);
952 4646855 : sql_delta *d = col_timestamp_delta(tr, c);
953 4646081 : int type = c->type.type->localtype;
954 :
955 4646081 : if (!d) {
956 0 : unlock_column(tr->store, c->base.id);
957 0 : return LOG_ERR;
958 : }
959 4646081 : if (d->cs.st == ST_DICT) {
960 2 : BAT *b = quick_descriptor(d->cs.bid);
961 :
962 2 : type = b->ttype;
963 : }
964 :
965 4646081 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
966 4646908 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
967 :
968 4646881 : unlock_column(tr->store, c->base.id);
969 :
970 4646647 : if (*ui == NULL || *uv == NULL) {
971 0 : bat_destroy(*ui);
972 0 : bat_destroy(*uv);
973 0 : return LOG_ERR;
974 : }
975 : return LOG_OK;
976 : }
977 :
978 : static int
979 57 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
980 : {
981 57 : lock_column(tr->store, i->base.id);
982 57 : size_t cnt = count_idx(tr, i, 0);
983 57 : sql_delta *d = idx_timestamp_delta(tr, i);
984 57 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
985 :
986 57 : if (!d) {
987 0 : unlock_column(tr->store, i->base.id);
988 0 : return LOG_ERR;
989 : }
990 :
991 57 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
992 57 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
993 :
994 57 : unlock_column(tr->store, i->base.id);
995 :
996 57 : if (*ui == NULL || *uv == NULL) {
997 0 : bat_destroy(*ui);
998 0 : bat_destroy(*uv);
999 0 : return LOG_ERR;
1000 : }
1001 : return LOG_OK;
1002 : }
1003 :
1004 : static void * /* BAT * */
1005 9484353 : bind_col(sql_trans *tr, sql_column *c, int access)
1006 : {
1007 9484353 : assert(access == QUICK || tr->active);
1008 9484353 : if (!isTable(c->t))
1009 : return NULL;
1010 9484353 : sql_delta *d = col_timestamp_delta(tr, c);
1011 9486778 : if (!d)
1012 : return NULL;
1013 9486778 : size_t cnt = count_col(tr, c, 0);
1014 9490911 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1015 519 : return bind_ucol(tr, c, access, cnt);
1016 9490392 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
1017 9480612 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
1018 : return b;
1019 : }
1020 :
1021 : static void * /* BAT * */
1022 8681 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1023 : {
1024 8681 : assert(access == QUICK || tr->active);
1025 8681 : if (!isTable(i->t))
1026 : return NULL;
1027 8681 : sql_delta *d = idx_timestamp_delta(tr, i);
1028 8685 : if (!d)
1029 : return NULL;
1030 8685 : size_t cnt = count_idx(tr, i, 0);
1031 8685 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1032 0 : return bind_uidx(tr, i, access, cnt);
1033 8685 : return cs_bind_bat( &d->cs, access, cnt);
1034 : }
1035 :
1036 : static int
1037 3888 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1038 : {
1039 3888 : if (!cs->uibid) {
1040 0 : cs->uibid = e_bat(TYPE_oid);
1041 0 : if (cs->uibid == BID_NIL)
1042 : return LOG_ERR;
1043 : }
1044 3888 : if (!cs->uvbid) {
1045 0 : BAT *cur = quick_descriptor(cs->bid);
1046 0 : if (!cur)
1047 : return LOG_ERR;
1048 0 : int type = cur->ttype;
1049 0 : cs->uvbid = e_bat(type);
1050 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1051 : return LOG_ERR;
1052 : }
1053 3888 : BAT *ui = temp_descriptor(cs->uibid);
1054 3888 : BAT *uv = temp_descriptor(cs->uvbid);
1055 :
1056 3888 : if (ui == NULL || uv == NULL) {
1057 0 : bat_destroy(ui);
1058 0 : bat_destroy(uv);
1059 0 : return LOG_ERR;
1060 : }
1061 3888 : assert(ui && uv);
1062 3888 : if (isEbat(ui)){
1063 380 : temp_destroy(cs->uibid);
1064 380 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1065 380 : bat_destroy(ui);
1066 380 : if (cs->uibid == BID_NIL ||
1067 380 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1068 0 : bat_destroy(uv);
1069 0 : return LOG_ERR;
1070 : }
1071 : }
1072 3888 : if (isEbat(uv)){
1073 380 : temp_destroy(cs->uvbid);
1074 380 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1075 380 : bat_destroy(uv);
1076 380 : if (cs->uvbid == BID_NIL ||
1077 380 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1078 0 : bat_destroy(ui);
1079 0 : return LOG_ERR;
1080 : }
1081 : }
1082 3888 : *Ui = ui;
1083 3888 : *Uv = uv;
1084 3888 : return LOG_OK;
1085 : }
1086 :
1087 : static int
1088 6346 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1089 : {
1090 93115 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1091 93115 : if (s->start <= rid && s->end > rid) {
1092 6346 : if (s->ts == tr->tid && !s->deleted) {
1093 2564 : return 1;
1094 : }
1095 : break;
1096 : }
1097 : }
1098 : return 0;
1099 : }
1100 :
1101 : static int
1102 3782 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1103 : {
1104 87937 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1105 87937 : if (s->start <= rid && s->end > rid) {
1106 3782 : if (s->ts >= tr->ts && s->deleted) {
1107 0 : return 1;
1108 : }
1109 : break;
1110 : }
1111 : }
1112 : return 0;
1113 : }
1114 :
1115 : static sql_delta *
1116 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1117 : {
1118 0 : sql_delta *n = ZNEW(sql_delta);
1119 0 : if (!n)
1120 : return NULL;
1121 0 : *n = *bat;
1122 0 : n->next = NULL;
1123 0 : n->cs.ts = tr->tid;
1124 0 : return n;
1125 : }
1126 :
1127 : static BAT *
1128 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1129 : {
1130 17 : BAT *newoffsets = NULL;
1131 17 : sql_delta *bat = *batp;
1132 17 : column_storage *cs = &bat->cs;
1133 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1134 :
1135 17 : if (!u)
1136 : return NULL;
1137 17 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1138 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1139 0 : bat_destroy(u);
1140 0 : return NULL;
1141 : } else {
1142 17 : int new = 0;
1143 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1144 17 : if (BATcount(u) >= max_cnt) {
1145 1 : if (max_cnt == 64*1024) { /* decompress */
1146 0 : if (!(b = temp_descriptor(cs->bid))) {
1147 0 : bat_destroy(u);
1148 0 : return NULL;
1149 : }
1150 0 : if (cs->ucnt) {
1151 0 : BAT *ui = NULL, *uv = NULL;
1152 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1153 0 : bat_destroy(b);
1154 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1155 0 : bat_destroy(nb);
1156 0 : bat_destroy(u);
1157 0 : return NULL;
1158 : }
1159 0 : b = nb;
1160 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1161 0 : bat_destroy(ui);
1162 0 : bat_destroy(uv);
1163 0 : bat_destroy(b);
1164 0 : bat_destroy(u);
1165 : }
1166 0 : bat_destroy(ui);
1167 0 : bat_destroy(uv);
1168 : }
1169 0 : n = DICTdecompress_(b, u, PERSISTENT);
1170 0 : bat_destroy(b);
1171 0 : assert(newoffsets == NULL);
1172 0 : if (!n) {
1173 0 : bat_destroy(u);
1174 0 : return NULL;
1175 : }
1176 0 : if (cs->ts != tr->tid) {
1177 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1178 0 : bat_destroy(n);
1179 0 : return NULL;
1180 : }
1181 0 : cs = &(*batp)->cs;
1182 0 : new = 1;
1183 : }
1184 0 : if (cs->bid && !new)
1185 0 : temp_destroy(cs->bid);
1186 0 : n = transfer_to_systrans(n);
1187 0 : if (n == NULL)
1188 : return NULL;
1189 0 : bat_set_access(n, BAT_READ);
1190 0 : cs->bid = temp_create(n);
1191 0 : bat_destroy(n);
1192 0 : if (cs->ebid && !new)
1193 0 : temp_destroy(cs->ebid);
1194 0 : cs->ebid = 0;
1195 0 : cs->ucnt = 0;
1196 0 : if (cs->uibid && !new)
1197 0 : temp_destroy(cs->uibid);
1198 0 : if (cs->uvbid && !new)
1199 0 : temp_destroy(cs->uvbid);
1200 0 : cs->uibid = cs->uvbid = 0;
1201 0 : cs->st = ST_DEFAULT;
1202 0 : cs->cleared = true;
1203 : } else {
1204 1 : if (!(b = temp_descriptor(cs->bid))) {
1205 0 : bat_destroy(newoffsets);
1206 0 : bat_destroy(u);
1207 0 : return NULL;
1208 : }
1209 1 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
1210 1 : bat_destroy(b);
1211 1 : if (!n) {
1212 0 : bat_destroy(newoffsets);
1213 0 : bat_destroy(u);
1214 0 : return NULL;
1215 : }
1216 1 : if (cs->ts != tr->tid) {
1217 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1218 0 : bat_destroy(n);
1219 0 : return NULL;
1220 : }
1221 0 : cs = &(*batp)->cs;
1222 0 : new = 1;
1223 0 : temp_dup(cs->ebid);
1224 0 : if (cs->uibid) {
1225 0 : temp_dup(cs->uibid);
1226 0 : temp_dup(cs->uvbid);
1227 : }
1228 : }
1229 1 : if (cs->bid && !new)
1230 1 : temp_destroy(cs->bid);
1231 1 : n = transfer_to_systrans(n);
1232 1 : if (n == NULL)
1233 : return NULL;
1234 1 : bat_set_access(n, BAT_READ);
1235 1 : cs->bid = temp_create(n);
1236 1 : bat_destroy(n);
1237 1 : cs->cleared = true;
1238 1 : i = newoffsets;
1239 : }
1240 : } else { /* append */
1241 16 : i = newoffsets;
1242 : }
1243 : }
1244 17 : bat_destroy(u);
1245 17 : return i;
1246 : }
1247 :
1248 : static BAT *
1249 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1250 : {
1251 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1252 0 : BAT *newoffsets = NULL;
1253 0 : BAT *b = NULL, *n = NULL;
1254 :
1255 0 : if (!(b = temp_descriptor(cs->bid)))
1256 : return NULL;
1257 :
1258 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1259 0 : bat_destroy(b);
1260 0 : return NULL;
1261 : } else {
1262 : /* returns new offset bat if values within min/max, else decompress */
1263 0 : if (!newoffsets) { /* decompress */
1264 0 : if (cs->ucnt) {
1265 0 : BAT *ui = NULL, *uv = NULL;
1266 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1267 0 : bat_destroy(b);
1268 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1269 0 : bat_destroy(nb);
1270 0 : return NULL;
1271 : }
1272 0 : b = nb;
1273 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1274 0 : bat_destroy(ui);
1275 0 : bat_destroy(uv);
1276 0 : bat_destroy(b);
1277 : }
1278 0 : bat_destroy(ui);
1279 0 : bat_destroy(uv);
1280 : }
1281 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1282 0 : bat_destroy(b);
1283 0 : if (!n)
1284 : return NULL;
1285 0 : if (cs->bid)
1286 0 : temp_destroy(cs->bid);
1287 0 : n = transfer_to_systrans(n);
1288 0 : if (n == NULL)
1289 : return NULL;
1290 0 : bat_set_access(n, BAT_READ);
1291 0 : cs->bid = temp_create(n);
1292 0 : cs->ucnt = 0;
1293 0 : if (cs->uibid)
1294 0 : temp_destroy(cs->uibid);
1295 0 : if (cs->uvbid)
1296 0 : temp_destroy(cs->uvbid);
1297 0 : cs->uibid = cs->uvbid = 0;
1298 0 : cs->st = ST_DEFAULT;
1299 0 : cs->cleared = true;
1300 0 : b = n;
1301 : } else { /* append */
1302 : i = newoffsets;
1303 : }
1304 : }
1305 0 : bat_destroy(b);
1306 0 : return i;
1307 : }
1308 :
1309 : /*
1310 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1311 : */
1312 : static int
1313 3009 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1314 : {
1315 3009 : int res = LOG_OK;
1316 3009 : sql_delta *bat = *batp;
1317 3009 : column_storage *cs = &bat->cs;
1318 3009 : BAT *otids = tids, *oupdates = updates;
1319 :
1320 3009 : if (!BATcount(tids))
1321 : return LOG_OK;
1322 :
1323 3009 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1324 6 : tids = BATunmask(tids);
1325 6 : if (!tids)
1326 : return LOG_ERR;
1327 : }
1328 3009 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1329 0 : updates = BATunmask(updates);
1330 0 : if (!updates) {
1331 0 : if (otids != tids)
1332 0 : bat_destroy(tids);
1333 0 : return LOG_ERR;
1334 : }
1335 3009 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1336 42 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1337 42 : if (!updates) {
1338 0 : if (otids != tids)
1339 0 : bat_destroy(tids);
1340 0 : return LOG_ERR;
1341 : }
1342 : }
1343 :
1344 3009 : if (cs->st == ST_DICT) {
1345 : /* possibly a new array is returned */
1346 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1347 4 : bat = *batp;
1348 4 : cs = &bat->cs;
1349 4 : if (oupdates != updates)
1350 0 : bat_destroy(updates);
1351 4 : updates = nupdates;
1352 4 : if (!updates) {
1353 0 : if (otids != tids)
1354 0 : bat_destroy(tids);
1355 0 : return LOG_ERR;
1356 : }
1357 : }
1358 :
1359 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1360 : /* currently only one update delta is possible */
1361 3009 : lock_table(tr->store, t->base.id);
1362 3009 : storage *s = ATOMIC_PTR_GET(&t->data);
1363 3009 : if (!is_new && !cs->cleared) {
1364 2693 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1365 0 : BAT *sorted, *order;
1366 0 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1367 0 : if (otids != tids)
1368 0 : bat_destroy(tids);
1369 0 : if (oupdates != updates)
1370 0 : bat_destroy(updates);
1371 0 : unlock_table(tr->store, t->base.id);
1372 0 : return LOG_ERR;
1373 : }
1374 0 : if (otids != tids)
1375 0 : bat_destroy(tids);
1376 0 : tids = sorted;
1377 0 : BAT *nupdates = BATproject(order, updates);
1378 0 : bat_destroy(order);
1379 0 : if (oupdates != updates)
1380 0 : bat_destroy(updates);
1381 0 : updates = nupdates;
1382 0 : if (!updates) {
1383 0 : bat_destroy(tids);
1384 0 : unlock_table(tr->store, t->base.id);
1385 0 : return LOG_ERR;
1386 : }
1387 : }
1388 2693 : assert(tids->tsorted);
1389 2693 : BAT *ui = NULL, *uv = NULL;
1390 :
1391 : /* handle updates on just inserted bits */
1392 : /* handle updates on updates (within one transaction) */
1393 2693 : BATiter upi = bat_iterator(updates);
1394 2693 : BUN cnt = 0, ucnt = BATcount(tids);
1395 2693 : BAT *b, *ins = NULL;
1396 2693 : int *msk = NULL;
1397 :
1398 2693 : if((b = temp_descriptor(cs->bid)) == NULL)
1399 : res = LOG_ERR;
1400 :
1401 2693 : if (res == LOG_OK && BATtdense(tids)) {
1402 2464 : oid start = tids->tseqbase, offset = start;
1403 2464 : oid end = start + ucnt;
1404 :
1405 8302 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1406 6460 : if (seg->start <= start && seg->end > start) {
1407 : /* check for delete conflicts */
1408 2464 : if (seg->ts >= tr->ts && seg->deleted) {
1409 0 : res = LOG_CONFLICT;
1410 0 : continue;
1411 : }
1412 :
1413 : /* check for inplace updates */
1414 2464 : BUN lend = end < seg->end?end:seg->end;
1415 2464 : if (seg->ts == tr->tid && !seg->deleted) {
1416 133 : if (!ins) {
1417 133 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1418 133 : if (!ins)
1419 : res = LOG_ERR;
1420 : else {
1421 133 : BATsetcount(ins, ucnt); /* all full updates */
1422 133 : msk = (int*)Tloc(ins, 0);
1423 133 : BUN end = (ucnt+31)/32;
1424 133 : memset(msk, 0, end * sizeof(int));
1425 : }
1426 : }
1427 726 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1428 593 : const void *upd = BUNtail(upi, rid-offset);
1429 593 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1430 0 : res = LOG_ERR;
1431 :
1432 593 : oid word = i/32;
1433 593 : int pos = i%32;
1434 593 : msk[word] |= 1U<<pos;
1435 593 : cnt++;
1436 : }
1437 : }
1438 : }
1439 6460 : if (end < seg->end)
1440 : break;
1441 : }
1442 232 : } else if (res == LOG_OK && complex_cand(tids)) {
1443 3 : struct canditer ci;
1444 3 : segment *seg = s->segs->h;
1445 3 : canditer_init(&ci, NULL, tids);
1446 3 : BUN i = 0;
1447 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1448 1033 : oid rid = canditer_next(&ci);
1449 1033 : if (seg->end <= rid)
1450 13 : seg = ATOMIC_PTR_GET(&seg->next);
1451 1020 : else if (seg->start <= rid && seg->end > rid) {
1452 : /* check for delete conflicts */
1453 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1454 0 : res = LOG_CONFLICT;
1455 0 : continue;
1456 : }
1457 :
1458 : /* check for inplace updates */
1459 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1460 0 : if (!ins) {
1461 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1462 0 : if (!ins) {
1463 : res = LOG_ERR;
1464 : break;
1465 : } else {
1466 0 : BATsetcount(ins, ucnt); /* all full updates */
1467 0 : msk = (int*)Tloc(ins, 0);
1468 0 : BUN end = (ucnt+31)/32;
1469 0 : memset(msk, 0, end * sizeof(int));
1470 : }
1471 : }
1472 0 : ptr upd = BUNtail(upi, i);
1473 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1474 0 : res = LOG_ERR;
1475 :
1476 0 : oid word = i/32;
1477 0 : int pos = i%32;
1478 0 : msk[word] |= 1U<<pos;
1479 0 : cnt++;
1480 : }
1481 1020 : i++;
1482 : }
1483 : }
1484 226 : } else if (res == LOG_OK) {
1485 226 : BUN i = 0;
1486 226 : oid *rid = Tloc(tids,0);
1487 226 : segment *seg = s->segs->h;
1488 28077 : while ( seg && res == LOG_OK && i < ucnt) {
1489 27851 : if (seg->end <= rid[i])
1490 8113 : seg = ATOMIC_PTR_GET(&seg->next);
1491 19738 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1492 : /* check for delete conflicts */
1493 19738 : if (seg->ts >= tr->ts && seg->deleted) {
1494 0 : res = LOG_CONFLICT;
1495 0 : continue;
1496 : }
1497 :
1498 : /* check for inplace updates */
1499 19738 : if (seg->ts == tr->tid && !seg->deleted) {
1500 417 : if (!ins) {
1501 47 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1502 47 : if (!ins) {
1503 : res = LOG_ERR;
1504 : break;
1505 : } else {
1506 47 : BATsetcount(ins, ucnt); /* all full updates */
1507 47 : msk = (int*)Tloc(ins, 0);
1508 47 : BUN end = (ucnt+31)/32;
1509 47 : memset(msk, 0, end * sizeof(int));
1510 : }
1511 : }
1512 417 : const void *upd = BUNtail(upi, i);
1513 417 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1514 0 : res = LOG_ERR;
1515 :
1516 417 : oid word = i/32;
1517 417 : int pos = i%32;
1518 417 : msk[word] |= 1U<<pos;
1519 417 : cnt++;
1520 : }
1521 19738 : i++;
1522 : }
1523 : }
1524 : }
1525 :
1526 2693 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1527 2513 : if (cs->ucnt == 0) {
1528 2407 : if (cnt) {
1529 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1530 0 : if (nins) {
1531 0 : ui = BATproject(nins, tids);
1532 0 : uv = BATproject(nins, updates);
1533 0 : bat_destroy(nins);
1534 : }
1535 : } else {
1536 2407 : ui = temp_descriptor(tids->batCacheid);
1537 2407 : uv = temp_descriptor(updates->batCacheid);
1538 : }
1539 2407 : if (!ui || !uv) {
1540 : res = LOG_ERR;
1541 : } else {
1542 2407 : temp_destroy(cs->uibid);
1543 2407 : temp_destroy(cs->uvbid);
1544 2407 : ui = transfer_to_systrans(ui);
1545 2407 : uv = transfer_to_systrans(uv);
1546 2407 : if (ui == NULL || uv == NULL) {
1547 0 : BBPreclaim(ui);
1548 0 : BBPreclaim(uv);
1549 : res = LOG_ERR;
1550 : } else {
1551 2407 : cs->uibid = temp_create(ui);
1552 2407 : cs->uvbid = temp_create(uv);
1553 2407 : cs->ucnt = BATcount(ui);
1554 : }
1555 : }
1556 : } else {
1557 106 : BAT *nui = NULL, *nuv = NULL;
1558 :
1559 : /* merge taking msk of inserted into account */
1560 106 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1561 : res = LOG_ERR;
1562 :
1563 106 : if (res == LOG_OK) {
1564 106 : const void *upd = NULL;
1565 106 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1566 106 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1567 :
1568 106 : if (!nui || !nuv) {
1569 : res = LOG_ERR;
1570 : } else {
1571 106 : BATiter ovi = bat_iterator(uv);
1572 :
1573 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1574 106 : BUN uip = 0, uie = BATcount(ui);
1575 106 : BUN nip = 0, nie = BATcount(tids);
1576 106 : oid uiseqb = ui->tseqbase;
1577 106 : oid niseqb = tids->tseqbase;
1578 106 : oid *uipt = NULL, *nipt = NULL;
1579 106 : BATiter uii = bat_iterator(ui);
1580 106 : BATiter tidsi = bat_iterator(tids);
1581 106 : if (!BATtdensebi(&uii))
1582 105 : uipt = uii.base;
1583 106 : if (!BATtdensebi(&tidsi))
1584 105 : nipt = tidsi.base;
1585 26789 : while (uip < uie && nip < nie && res == LOG_OK) {
1586 26683 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1587 26683 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1588 :
1589 26683 : if (uiv < niv) {
1590 15068 : upd = BUNtail(ovi, uip);
1591 30136 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1592 15068 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1593 : res = LOG_ERR;
1594 15068 : uip++;
1595 11615 : } else if (uiv == niv) {
1596 : /* handle == */
1597 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1598 18 : upd = BUNtail(upi, nip);
1599 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1600 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1601 : res = LOG_ERR;
1602 : } else {
1603 0 : upd = BUNtail(ovi, uip);
1604 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1605 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1606 : res = LOG_ERR;
1607 : }
1608 18 : uip++;
1609 18 : nip++;
1610 : } else { /* uiv > niv */
1611 11597 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1612 11597 : upd = BUNtail(upi, nip);
1613 23194 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1614 11597 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1615 : res = LOG_ERR;
1616 : }
1617 11597 : nip++;
1618 : }
1619 : }
1620 542 : while (uip < uie && res == LOG_OK) {
1621 436 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1622 436 : upd = BUNtail(ovi, uip);
1623 872 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1624 436 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1625 : res = LOG_ERR;
1626 436 : uip++;
1627 : }
1628 799 : while (nip < nie && res == LOG_OK) {
1629 693 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1630 693 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1631 693 : upd = BUNtail(upi, nip);
1632 1386 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1633 693 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1634 : res = LOG_ERR;
1635 : }
1636 693 : nip++;
1637 : }
1638 106 : bat_iterator_end(&uii);
1639 106 : bat_iterator_end(&tidsi);
1640 106 : bat_iterator_end(&ovi);
1641 106 : if (res == LOG_OK) {
1642 106 : temp_destroy(cs->uibid);
1643 106 : temp_destroy(cs->uvbid);
1644 106 : nui = transfer_to_systrans(nui);
1645 106 : nuv = transfer_to_systrans(nuv);
1646 106 : if (nui == NULL || nuv == NULL) {
1647 : res = LOG_ERR;
1648 : } else {
1649 106 : cs->uibid = temp_create(nui);
1650 106 : cs->uvbid = temp_create(nuv);
1651 106 : cs->ucnt = BATcount(nui);
1652 : }
1653 : }
1654 : }
1655 106 : bat_destroy(nui);
1656 106 : bat_destroy(nuv);
1657 : }
1658 : }
1659 : }
1660 2693 : bat_iterator_end(&upi);
1661 2693 : bat_destroy(b);
1662 2693 : unlock_table(tr->store, t->base.id);
1663 2693 : bat_destroy(ins);
1664 2693 : bat_destroy(ui);
1665 2693 : bat_destroy(uv);
1666 2693 : if (otids != tids)
1667 4 : bat_destroy(tids);
1668 2693 : if (oupdates != updates)
1669 5 : bat_destroy(updates);
1670 2693 : return res;
1671 : } else if (is_new || cs->cleared) {
1672 316 : BAT *b = temp_descriptor(cs->bid);
1673 :
1674 316 : if (b == NULL) {
1675 : res = LOG_ERR;
1676 : } else {
1677 316 : if (BATcount(b)==0) {
1678 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1679 0 : res = LOG_ERR;
1680 315 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1681 0 : res = LOG_ERR;
1682 316 : BBPcold(b->batCacheid);
1683 316 : bat_destroy(b);
1684 : }
1685 : }
1686 316 : unlock_table(tr->store, t->base.id);
1687 316 : if (otids != tids)
1688 2 : bat_destroy(tids);
1689 316 : if (oupdates != updates)
1690 41 : bat_destroy(updates);
1691 : return res;
1692 : }
1693 :
1694 : static int
1695 3009 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1696 : {
1697 3009 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1698 : }
1699 :
1700 : static void *
1701 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1702 : {
1703 4 : void *newoffsets = NULL;
1704 4 : sql_delta *bat = *batp;
1705 4 : column_storage *cs = &bat->cs;
1706 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1707 :
1708 4 : if (!u)
1709 : return NULL;
1710 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1711 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1712 0 : bat_destroy(u);
1713 0 : return NULL;
1714 : } else {
1715 4 : int new = 0;
1716 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1717 4 : if (BATcount(u) >= max_cnt) {
1718 0 : if (max_cnt == 64*1024) { /* decompress */
1719 0 : if (!(b = temp_descriptor(cs->bid))) {
1720 0 : bat_destroy(u);
1721 0 : return NULL;
1722 : }
1723 0 : n = DICTdecompress_(b, u, PERSISTENT);
1724 : /* TODO decompress updates if any */
1725 0 : bat_destroy(b);
1726 0 : assert(newoffsets == NULL);
1727 0 : if (!n) {
1728 0 : bat_destroy(u);
1729 0 : return NULL;
1730 : }
1731 0 : if (cs->ts != tr->tid) {
1732 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1733 0 : bat_destroy(n);
1734 0 : bat_destroy(u);
1735 0 : return NULL;
1736 : }
1737 0 : cs = &(*batp)->cs;
1738 0 : new = 1;
1739 0 : cs->uibid = cs->uvbid = 0;
1740 : }
1741 0 : if (cs->bid && !new)
1742 0 : temp_destroy(cs->bid);
1743 0 : n = transfer_to_systrans(n);
1744 0 : if (n == NULL) {
1745 0 : bat_destroy(u);
1746 0 : return NULL;
1747 : }
1748 0 : bat_set_access(n, BAT_READ);
1749 0 : cs->bid = temp_create(n);
1750 0 : bat_destroy(n);
1751 0 : if (cs->ebid && !new)
1752 0 : temp_destroy(cs->ebid);
1753 0 : cs->ebid = 0;
1754 0 : cs->st = ST_DEFAULT;
1755 : /* at append_col the column's storage type is cleared */
1756 0 : cs->cleared = true;
1757 : } else {
1758 0 : if (!(b = temp_descriptor(cs->bid))) {
1759 0 : GDKfree(newoffsets);
1760 0 : bat_destroy(u);
1761 0 : return NULL;
1762 : }
1763 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
1764 0 : bat_destroy(b);
1765 0 : if (!n) {
1766 0 : GDKfree(newoffsets);
1767 0 : bat_destroy(u);
1768 0 : return NULL;
1769 : }
1770 0 : if (cs->ts != tr->tid) {
1771 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1772 0 : bat_destroy(u);
1773 0 : bat_destroy(n);
1774 0 : return NULL;
1775 : }
1776 0 : cs = &(*batp)->cs;
1777 0 : new = 1;
1778 0 : temp_dup(cs->ebid);
1779 0 : if (cs->uibid) {
1780 0 : temp_dup(cs->uibid);
1781 0 : temp_dup(cs->uvbid);
1782 : }
1783 : }
1784 0 : if (cs->bid)
1785 0 : temp_destroy(cs->bid);
1786 0 : n = transfer_to_systrans(n);
1787 0 : if (n == NULL) {
1788 0 : bat_destroy(u);
1789 0 : return NULL;
1790 : }
1791 0 : bat_set_access(n, BAT_READ);
1792 0 : cs->bid = temp_create(n);
1793 0 : bat_destroy(n);
1794 0 : cs->cleared = true;
1795 0 : i = newoffsets;
1796 : }
1797 : } else { /* append */
1798 4 : i = newoffsets;
1799 : }
1800 : }
1801 4 : bat_destroy(u);
1802 4 : return i;
1803 : }
1804 :
1805 : static void *
1806 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1807 : {
1808 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1809 1 : void *newoffsets = NULL;
1810 1 : BAT *b = NULL, *n = NULL;
1811 :
1812 1 : if (!(b = temp_descriptor(cs->bid)))
1813 : return NULL;
1814 :
1815 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1816 0 : bat_destroy(b);
1817 0 : return NULL;
1818 : } else {
1819 : /* returns new offset bat if values within min/max, else decompress */
1820 1 : if (!newoffsets) {
1821 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1822 1 : bat_destroy(b);
1823 1 : if (!n)
1824 : return NULL;
1825 : /* TODO decompress updates if any */
1826 1 : if (cs->bid)
1827 1 : temp_destroy(cs->bid);
1828 1 : n = transfer_to_systrans(n);
1829 1 : if (n == NULL)
1830 : return NULL;
1831 1 : bat_set_access(n, BAT_READ);
1832 1 : cs->bid = temp_create(n);
1833 1 : cs->st = ST_DEFAULT;
1834 : /* at append_col the column's storage type is cleared */
1835 1 : cs->cleared = true;
1836 1 : b = n;
1837 : } else { /* append */
1838 : i = newoffsets;
1839 : }
1840 : }
1841 1 : bat_destroy(b);
1842 1 : return i;
1843 : }
1844 :
1845 : static int
1846 6346 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1847 : {
1848 6346 : void *oupd = upd;
1849 6346 : sql_delta *bat = *batp;
1850 6346 : column_storage *cs = &bat->cs;
1851 6346 : storage *s = ATOMIC_PTR_GET(&t->data);
1852 6346 : assert(!is_oid_nil(rid));
1853 6346 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1854 :
1855 6346 : if (cs->st == ST_DICT) {
1856 : /* possibly a new array is returned */
1857 0 : upd = dict_append_val(tr, batp, upd, 1);
1858 0 : bat = *batp;
1859 0 : cs = &bat->cs;
1860 0 : if (!upd)
1861 : return LOG_ERR;
1862 : }
1863 :
1864 : /* check if rid is insert ? */
1865 6346 : if (!inplace) {
1866 : /* check conflict */
1867 3782 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1868 0 : if (oupd != upd)
1869 0 : GDKfree(upd);
1870 0 : return LOG_CONFLICT;
1871 : }
1872 3782 : BAT *ui, *uv;
1873 :
1874 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1875 : /* currently only one update delta is possible */
1876 3782 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1877 0 : if (oupd != upd)
1878 0 : GDKfree(upd);
1879 0 : return LOG_ERR;
1880 : }
1881 :
1882 3782 : assert(uv->ttype);
1883 3782 : assert(BATcount(ui) == BATcount(uv));
1884 7564 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1885 3782 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1886 0 : if (oupd != upd)
1887 0 : GDKfree(upd);
1888 0 : bat_destroy(ui);
1889 0 : bat_destroy(uv);
1890 0 : return LOG_ERR;
1891 : }
1892 3782 : assert(BATcount(ui) == BATcount(uv));
1893 3782 : bat_destroy(ui);
1894 3782 : bat_destroy(uv);
1895 3782 : cs->ucnt++;
1896 : } else {
1897 2564 : BAT *b = NULL;
1898 :
1899 2564 : if((b = temp_descriptor(cs->bid)) == NULL) {
1900 0 : if (oupd != upd)
1901 0 : GDKfree(upd);
1902 0 : return LOG_ERR;
1903 : }
1904 2564 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1905 0 : if (oupd != upd)
1906 0 : GDKfree(upd);
1907 0 : bat_destroy(b);
1908 0 : return LOG_ERR;
1909 : }
1910 2564 : bat_destroy(b);
1911 : }
1912 6346 : if (oupd != upd)
1913 0 : GDKfree(upd);
1914 : return LOG_OK;
1915 : }
1916 :
1917 : static int
1918 6346 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1919 : {
1920 6346 : int res = LOG_OK;
1921 6346 : lock_table(tr->store, t->base.id);
1922 6346 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1923 6346 : unlock_table(tr->store, t->base.id);
1924 6346 : return res;
1925 : }
1926 :
1927 : static int
1928 159070 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1929 : {
1930 159070 : (void)tr;
1931 159070 : if (!ocs)
1932 : return LOG_OK;
1933 159070 : cs->bid = ocs->bid;
1934 159070 : cs->ebid = ocs->ebid;
1935 159070 : cs->uibid = ocs->uibid;
1936 159070 : cs->uvbid = ocs->uvbid;
1937 159070 : cs->ucnt = ocs->ucnt;
1938 :
1939 159070 : if (temp) {
1940 26017 : cs->bid = temp_copy(cs->bid, true, false);
1941 25991 : if (cs->bid == BID_NIL)
1942 : return LOG_ERR;
1943 : } else {
1944 133053 : temp_dup(cs->bid);
1945 : }
1946 159051 : if (cs->ebid)
1947 6 : temp_dup(cs->ebid);
1948 159051 : cs->ucnt = 0;
1949 159051 : cs->uibid = e_bat(TYPE_oid);
1950 159101 : cs->uvbid = e_bat(type);
1951 159103 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1952 : return LOG_ERR;
1953 159103 : cs->st = ocs->st;
1954 159103 : return LOG_OK;
1955 : }
1956 :
1957 : static void
1958 308588 : destroy_delta(sql_delta *b, bool recursive)
1959 : {
1960 308588 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1961 : return;
1962 290052 : if (recursive && b->next)
1963 29848 : destroy_delta(b->next, true);
1964 290052 : if (b->cs.uibid)
1965 94695 : temp_destroy(b->cs.uibid);
1966 290052 : if (b->cs.uvbid)
1967 94695 : temp_destroy(b->cs.uvbid);
1968 290052 : if (b->cs.bid)
1969 290052 : temp_destroy(b->cs.bid);
1970 290052 : if (b->cs.ebid)
1971 61 : temp_destroy(b->cs.ebid);
1972 290052 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1973 290052 : _DELETE(b);
1974 : }
1975 :
1976 : static sql_delta *
1977 15297642 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
1978 : {
1979 15297642 : sql_delta *obat = ATOMIC_PTR_GET(&c->data);
1980 :
1981 15297642 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
1982 15164642 : return obat;
1983 133000 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
1984 : /* abort */
1985 12 : if (update_conflict)
1986 4 : *update_conflict = true;
1987 8 : else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
1988 8 : return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
1989 4 : return NULL;
1990 : }
1991 132988 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
1992 : return NULL;
1993 132991 : sql_delta* bat = ZNEW(sql_delta);
1994 133036 : if (!bat)
1995 : return NULL;
1996 133036 : ATOMIC_INIT(&bat->cs.refcnt, 1);
1997 133036 : if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
1998 0 : destroy_delta(bat, false);
1999 0 : return NULL;
2000 : }
2001 133049 : bat->cs.ts = tr->tid;
2002 : /* only one writer else abort */
2003 133049 : bat->next = obat;
2004 133049 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
2005 0 : bat->next = NULL;
2006 0 : destroy_delta(bat, false);
2007 0 : if (update_conflict)
2008 0 : *update_conflict = true;
2009 0 : return NULL;
2010 : }
2011 : return bat;
2012 : }
2013 :
2014 : static int
2015 9355 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
2016 : {
2017 9355 : int ok = LOG_OK;
2018 :
2019 9355 : if (is_bat) {
2020 3009 : BAT *tids = incoming_tids;
2021 3009 : BAT *values = incoming_values;
2022 3009 : if (BATcount(tids) == 0)
2023 : return LOG_OK;
2024 3009 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2025 : } else {
2026 6346 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2027 : }
2028 : return ok;
2029 : }
2030 :
2031 : static int
2032 9597 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2033 : {
2034 9597 : int res = LOG_OK;
2035 9597 : bool update_conflict = false;
2036 9597 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2037 :
2038 9597 : if (isbat) {
2039 3248 : BAT *t = tids;
2040 3248 : if (!BATcount(t))
2041 : return LOG_OK;
2042 : }
2043 :
2044 9082 : if (c == NULL)
2045 : return LOG_ERR;
2046 :
2047 9082 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2048 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2049 :
2050 9078 : assert(delta && delta->cs.ts == tr->tid);
2051 9078 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2052 9078 : if (odelta != delta)
2053 3356 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2054 :
2055 9078 : odelta = delta;
2056 9078 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2057 : return res;
2058 9078 : assert(delta == odelta);
2059 9078 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2060 0 : res = sql_trans_alter_storage(tr, c, NULL);
2061 : return res;
2062 : }
2063 :
2064 : static sql_delta *
2065 2446 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2066 : {
2067 2446 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2068 :
2069 2446 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2070 2418 : return obat;
2071 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2072 : /* abort */
2073 0 : if (update_conflict)
2074 0 : *update_conflict = true;
2075 0 : return NULL;
2076 : }
2077 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2078 : return NULL;
2079 28 : sql_delta* bat = ZNEW(sql_delta);
2080 28 : if (!bat)
2081 : return NULL;
2082 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2083 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2084 0 : destroy_delta(bat, false);
2085 0 : return NULL;
2086 : }
2087 28 : bat->cs.ts = tr->tid;
2088 : /* only one writer else abort */
2089 28 : bat->next = obat;
2090 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2091 0 : bat->next = NULL;
2092 0 : destroy_delta(bat, false);
2093 0 : if (update_conflict)
2094 0 : *update_conflict = true;
2095 0 : return NULL;
2096 : }
2097 : return bat;
2098 : }
2099 :
2100 : static int
2101 776 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2102 : {
2103 776 : int res = LOG_OK;
2104 776 : bool update_conflict = false;
2105 776 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2106 :
2107 776 : if (isbat) {
2108 776 : BAT *t = tids;
2109 776 : if (!BATcount(t))
2110 : return LOG_OK;
2111 : }
2112 :
2113 277 : if (i == NULL)
2114 : return LOG_ERR;
2115 :
2116 277 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2117 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2118 :
2119 277 : assert(delta && delta->cs.ts == tr->tid);
2120 277 : if (odelta != delta)
2121 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2122 :
2123 277 : odelta = delta;
2124 277 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2125 277 : assert(delta == odelta);
2126 : return res;
2127 : }
2128 :
2129 : static int
2130 148081 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type, bool istemp)
2131 : {
2132 148081 : BAT *b, *oi = i;
2133 148081 : int err = 0;
2134 148081 : sql_delta *bat = *batp;
2135 :
2136 148081 : assert(!offsets || BATcount(offsets) == BATcount(i));
2137 148081 : if (!BATcount(i))
2138 : return LOG_OK;
2139 148081 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2140 : return LOG_ERR;
2141 :
2142 148081 : lock_column(tr->store, id);
2143 149107 : if (bat->cs.st == ST_DICT) {
2144 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2145 13 : bat = *batp;
2146 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2147 0 : bat_destroy(oi);
2148 13 : oi = ni;
2149 13 : if (!oi) {
2150 0 : unlock_column(tr->store, id);
2151 0 : return LOG_ERR;
2152 : }
2153 : }
2154 149107 : if (bat->cs.st == ST_FOR) {
2155 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2156 0 : bat = *batp;
2157 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2158 0 : bat_destroy(oi);
2159 0 : oi = ni;
2160 0 : if (!oi) {
2161 0 : unlock_column(tr->store, id);
2162 0 : return LOG_ERR;
2163 : }
2164 : }
2165 :
2166 149107 : b = temp_descriptor(bat->cs.bid);
2167 149069 : if (b == NULL) {
2168 0 : unlock_column(tr->store, id);
2169 0 : if (oi != i)
2170 0 : bat_destroy(oi);
2171 0 : return LOG_ERR;
2172 : }
2173 149069 : if (istemp && !offsets && offset == 0 && BATcount(b) == 0 && bat->cs.ucnt == 0) {
2174 14 : bat_set_access(i, BAT_READ);
2175 14 : if (bat->cs.bid)
2176 14 : temp_destroy(bat->cs.bid);
2177 14 : i = transfer_to_systrans(i);
2178 14 : bat->cs.bid = temp_create(i);
2179 149055 : } else if (!offsets && offset == b->hseqbase+BATcount(b)) {
2180 148867 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2181 263 : err = 1;
2182 176 : } else if (!offsets) {
2183 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2184 263 : err = 1;
2185 12 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2186 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2187 263 : err = 1;
2188 12 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2189 0 : err = 1;
2190 : }
2191 148810 : bat_destroy(b);
2192 148529 : unlock_column(tr->store, id);
2193 :
2194 149659 : if (oi != i)
2195 13 : bat_destroy(oi);
2196 149659 : return (err)?LOG_ERR:LOG_OK;
2197 : }
2198 :
2199 : // Look at the offsets and find where the replacements end and the appends begin.
2200 : static BUN
2201 0 : start_of_appends(BAT *offsets, BUN bcnt)
2202 : {
2203 0 : BUN ocnt = BATcount(offsets);
2204 0 : if (ocnt == 0)
2205 : return 0;
2206 :
2207 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2208 0 : if (highest < bcnt)
2209 : // all are replacements
2210 : return ocnt;
2211 :
2212 : // reason backward to find the first append.
2213 : // Suppose offsets has 15 entries, bcnt == 100
2214 : // and the highest offset in offsets is 109.
2215 0 : BUN new_bcnt = highest + 1; // 110
2216 0 : BUN nappends = new_bcnt - bcnt; // 10
2217 0 : BUN nreplacements = ocnt - nappends; // 5
2218 :
2219 : // The first append should be to position bcnt
2220 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2221 :
2222 : return nreplacements;
2223 : }
2224 :
2225 :
2226 : static int
2227 14999524 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
2228 : {
2229 14999524 : void *oi = i;
2230 14999524 : BAT *b;
2231 14999524 : lock_column(tr->store, id);
2232 15027331 : sql_delta *bat = *batp;
2233 :
2234 15027331 : if (bat->cs.st == ST_DICT) {
2235 : /* possibly a new array is returned */
2236 4 : i = dict_append_val(tr, batp, i, cnt);
2237 4 : bat = *batp;
2238 4 : if (!i) {
2239 0 : unlock_column(tr->store, id);
2240 0 : return LOG_ERR;
2241 : }
2242 : }
2243 15027331 : if (bat->cs.st == ST_FOR) {
2244 : /* possibly a new array is returned */
2245 1 : i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
2246 1 : bat = *batp;
2247 1 : if (!i) {
2248 0 : unlock_column(tr->store, id);
2249 0 : return LOG_ERR;
2250 : }
2251 : }
2252 :
2253 15027331 : b = temp_descriptor(bat->cs.bid);
2254 15026375 : if (b == NULL) {
2255 0 : if (i != oi)
2256 0 : GDKfree(i);
2257 0 : unlock_column(tr->store, id);
2258 0 : return LOG_ERR;
2259 : }
2260 15026375 : BUN bcnt = BATcount(b);
2261 :
2262 15026375 : if (offsets) {
2263 : // The first few might be replacements while later items might be appends.
2264 : // Handle the replacements here while leaving the appends to the code below.
2265 0 : BUN nreplacements = start_of_appends(offsets, bcnt);
2266 :
2267 0 : oid *start = Tloc(offsets, 0);
2268 0 : if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
2269 0 : bat_destroy(b);
2270 0 : if (i != oi)
2271 0 : GDKfree(i);
2272 0 : unlock_column(tr->store, id);
2273 0 : return LOG_ERR;
2274 : }
2275 :
2276 : // Replacements have been handled. The rest are appends.
2277 0 : assert(offset == oid_nil);
2278 0 : offset = bcnt;
2279 0 : cnt -= nreplacements;
2280 : }
2281 :
2282 15026375 : if (bcnt > offset){
2283 530672 : size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
2284 530672 : if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
2285 0 : bat_destroy(b);
2286 0 : if (i != oi)
2287 0 : GDKfree(i);
2288 0 : unlock_column(tr->store, id);
2289 0 : return LOG_ERR;
2290 : }
2291 530810 : cnt -= ccnt;
2292 530810 : offset += ccnt;
2293 : }
2294 15026513 : if (cnt) {
2295 14495337 : if (BATcount(b) < offset) { /* add space */
2296 6949 : BUN d = offset - BATcount(b);
2297 6949 : if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
2298 0 : bat_destroy(b);
2299 0 : if (i != oi)
2300 0 : GDKfree(i);
2301 0 : unlock_column(tr->store, id);
2302 0 : return LOG_ERR;
2303 : }
2304 : }
2305 14495334 : if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
2306 0 : bat_destroy(b);
2307 0 : if (i != oi)
2308 0 : GDKfree(i);
2309 0 : unlock_column(tr->store, id);
2310 0 : return LOG_ERR;
2311 : }
2312 : }
2313 15020177 : bat_destroy(b);
2314 15013307 : if (i != oi)
2315 4 : GDKfree(i);
2316 15013307 : unlock_column(tr->store, id);
2317 15013307 : return LOG_OK;
2318 : }
2319 :
2320 : static int
2321 26024 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2322 : {
2323 26024 : if (!(bat->segs = new_segments(tr, 0)))
2324 : return LOG_ERR;
2325 26019 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2326 : }
2327 :
2328 : static int
2329 15146948 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type, bool isnew)
2330 : {
2331 15146948 : int ok = LOG_OK;
2332 :
2333 15146948 : if ((*delta)->cs.merged)
2334 34984 : (*delta)->cs.merged = false; /* TODO needs to move */
2335 15146948 : if (isbat) {
2336 147831 : BAT *bat = incoming_data;
2337 :
2338 147831 : if (BATcount(bat))
2339 148023 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type, isnew);
2340 : } else {
2341 14999117 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2342 : }
2343 15178790 : return ok;
2344 : }
2345 :
2346 : static int
2347 15159551 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2348 : {
2349 15159551 : int res = LOG_OK;
2350 15159551 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2351 :
2352 15159551 : if (isbat) {
2353 148247 : BAT *t = data;
2354 148247 : if (!BATcount(t))
2355 : return LOG_OK;
2356 : }
2357 :
2358 15158835 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2359 : return LOG_ERR;
2360 :
2361 15158986 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2362 :
2363 15158986 : odelta = delta;
2364 15158986 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type, isTempTable(c->t))) != LOG_OK)
2365 : return res;
2366 15175812 : if (odelta != delta) {
2367 0 : delta->next = odelta;
2368 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2369 0 : delta->next = NULL;
2370 0 : destroy_delta(delta, false);
2371 0 : return LOG_CONFLICT;
2372 : }
2373 : }
2374 15175812 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2375 1 : res = sql_trans_alter_storage(tr, c, NULL);
2376 : return res;
2377 : }
2378 :
2379 : static int
2380 2175 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2381 : {
2382 2175 : int res = LOG_OK;
2383 2175 : sql_delta *delta;
2384 :
2385 2175 : if (isbat) {
2386 1003 : BAT *t = data;
2387 1003 : if (!BATcount(t))
2388 : return LOG_OK;
2389 : }
2390 :
2391 2163 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2392 : return LOG_ERR;
2393 :
2394 2163 : assert(delta->cs.st == ST_DEFAULT);
2395 :
2396 2163 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL, isTempTable(i->t));
2397 2163 : return res;
2398 : }
2399 :
2400 : static int
2401 73428 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2402 : {
2403 73428 : int err = 0;
2404 :
2405 : /* TODO check for conflicting updates */
2406 73428 : (void)rid;
2407 73428 : (void)cnt;
2408 542838 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2409 469410 : sql_column *c = n->data;
2410 469410 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2411 :
2412 : /* check for active updates */
2413 469410 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2414 : return 1;
2415 : }
2416 : return 0;
2417 : }
2418 :
2419 : static int
2420 70460 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2421 : {
2422 70460 : int in_transaction = segments_in_transaction(tr, t);
2423 :
2424 70460 : lock_table(tr->store, t->base.id);
2425 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2426 70460 : segment *seg = s->segs->h, *p = NULL;
2427 51769916 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2428 51769916 : if (seg->start <= rid && seg->end > rid) {
2429 70460 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2430 4 : unlock_table(tr->store, t->base.id);
2431 4 : return LOG_CONFLICT;
2432 : }
2433 70456 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2434 0 : unlock_table(tr->store, t->base.id);
2435 0 : return LOG_CONFLICT;
2436 : }
2437 70456 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2438 0 : unlock_table(tr->store, t->base.id);
2439 0 : return LOG_ERR;
2440 : }
2441 : break;
2442 : }
2443 : }
2444 70456 : unlock_table(tr->store, t->base.id);
2445 70456 : if (!in_transaction)
2446 11921 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2447 : return LOG_OK;
2448 : }
2449 :
2450 : static int
2451 2970 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2452 : {
2453 2970 : segment *seg = *Seg, *p = NULL;
2454 9713 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2455 9711 : if (seg->start <= start && seg->end > start) {
2456 3020 : size_t lcnt = cnt;
2457 3020 : if (start+lcnt > seg->end)
2458 54 : lcnt = seg->end-start;
2459 3020 : if (SEG_IS_DELETED(seg, tr)) {
2460 47 : start += lcnt;
2461 47 : cnt -= lcnt;
2462 47 : continue;
2463 2973 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2464 1 : return LOG_CONFLICT;
2465 2972 : if (deletes_conflict_updates( tr, t, start, lcnt))
2466 : return LOG_CONFLICT;
2467 2972 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2468 2972 : if (!seg)
2469 : return LOG_ERR;
2470 2972 : start += lcnt;
2471 2972 : cnt -= lcnt;
2472 : }
2473 9663 : if (start+cnt <= seg->end)
2474 : break;
2475 : }
2476 : return LOG_OK;
2477 : }
2478 :
2479 : static int
2480 604 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2481 : {
2482 604 : segment *seg = s->segs->h;
2483 604 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2484 : }
2485 :
2486 : static int
2487 297 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2488 : {
2489 297 : int in_transaction = segments_in_transaction(tr, t);
2490 297 : BAT *oi = i; /* update ids */
2491 297 : int ok = LOG_OK;
2492 :
2493 297 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2494 : return LOG_ERR;
2495 297 : if (BATcount(i)) {
2496 522 : if (BATtdense(i)) {
2497 225 : size_t start = i->tseqbase;
2498 225 : size_t cnt = BATcount(i);
2499 :
2500 225 : lock_table(tr->store, t->base.id);
2501 225 : ok = delete_range(tr, t, s, start, cnt);
2502 225 : unlock_table(tr->store, t->base.id);
2503 72 : } else if (complex_cand(i)) {
2504 0 : struct canditer ci;
2505 0 : oid f = 0, l = 0, cur = 0;
2506 :
2507 0 : canditer_init(&ci, NULL, i);
2508 0 : cur = f = canditer_next(&ci);
2509 :
2510 0 : lock_table(tr->store, t->base.id);
2511 0 : if (!is_oid_nil(f)) {
2512 0 : segment *seg = s->segs->h;
2513 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2514 0 : if (cur+1 == l) {
2515 0 : cur++;
2516 0 : continue;
2517 : }
2518 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2519 0 : f = cur = l;
2520 : }
2521 0 : if (ok == LOG_OK)
2522 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2523 : }
2524 0 : unlock_table(tr->store, t->base.id);
2525 : } else {
2526 72 : if (!i->tsorted) {
2527 0 : assert(oi == i);
2528 0 : BAT *ni = NULL;
2529 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2530 0 : ok = LOG_ERR;
2531 0 : if (ni)
2532 0 : i = ni;
2533 : }
2534 72 : assert(i->tsorted);
2535 72 : BUN icnt = BATcount(i);
2536 72 : BATiter ii = bat_iterator(i);
2537 72 : oid *o = ii.base, n = o[0]+1;
2538 72 : size_t lcnt = 1;
2539 :
2540 72 : lock_table(tr->store, t->base.id);
2541 72 : segment *seg = s->segs->h;
2542 24025 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2543 23953 : if (o[i] == n) {
2544 23610 : lcnt++;
2545 23610 : n++;
2546 : } else {
2547 343 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2548 343 : lcnt = 0;
2549 : }
2550 23953 : if (!lcnt) {
2551 343 : n = o[i]+1;
2552 343 : lcnt = 1;
2553 : }
2554 : }
2555 72 : bat_iterator_end(&ii);
2556 72 : if (lcnt && ok == LOG_OK)
2557 72 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2558 72 : unlock_table(tr->store, t->base.id);
2559 : }
2560 : }
2561 297 : if (i != oi)
2562 25 : bat_destroy(i);
2563 : // assert
2564 297 : if (!in_transaction)
2565 267 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2566 : return ok;
2567 : }
2568 :
2569 : static void
2570 50775 : destroy_segments(segments *s)
2571 : {
2572 50775 : if (!s || sql_ref_dec(&s->r) > 0)
2573 0 : return;
2574 50775 : segment *seg = s->h;
2575 109931 : while(seg) {
2576 59156 : segment *n = ATOMIC_PTR_GET(&seg->next);
2577 59156 : ATOMIC_PTR_DESTROY(&seg->next);
2578 59156 : _DELETE(seg);
2579 59156 : seg = n;
2580 : }
2581 50775 : _DELETE(s);
2582 : }
2583 :
2584 : static void
2585 52115 : destroy_storage(storage *bat)
2586 : {
2587 52115 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2588 : return;
2589 50635 : if (bat->next)
2590 6061 : destroy_storage(bat->next);
2591 50635 : destroy_segments(bat->segs);
2592 50635 : if (bat->cs.uibid)
2593 30809 : temp_destroy(bat->cs.uibid);
2594 50635 : if (bat->cs.uvbid)
2595 30809 : temp_destroy(bat->cs.uvbid);
2596 50635 : if (bat->cs.bid)
2597 50635 : temp_destroy(bat->cs.bid);
2598 50635 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2599 50635 : _DELETE(bat);
2600 : }
2601 :
2602 : static int
2603 158913 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2604 : {
2605 158913 : if (uncommitted) {
2606 426114 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2607 281747 : if (!VALID_4_READ(s->ts,tr))
2608 : return 1;
2609 : } else {
2610 119803 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2611 106224 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2612 : return 1;
2613 : }
2614 :
2615 : return 0;
2616 : }
2617 :
2618 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2619 :
2620 : storage *
2621 2133016 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2622 : {
2623 2133016 : storage *obat;
2624 :
2625 2133016 : obat = ATOMIC_PTR_GET(&t->data);
2626 :
2627 2133016 : if (obat->cs.ts != tr->tid)
2628 1472121 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2629 1472066 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2630 : /* abort */
2631 15168 : if (clear)
2632 15168 : *clear = true;
2633 15168 : return NULL;
2634 : }
2635 :
2636 2117848 : if (!clear)
2637 : return obat;
2638 :
2639 : /* remainder is only to handle clear */
2640 26598 : if (segments_conflict(tr, obat->segs, 1)) {
2641 577 : *clear = true;
2642 577 : return NULL;
2643 : }
2644 26023 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2645 : return NULL;
2646 26025 : storage *bat = ZNEW(storage);
2647 26025 : if (!bat)
2648 : return NULL;
2649 26025 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2650 26025 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2651 0 : destroy_storage(bat);
2652 0 : return NULL;
2653 : }
2654 26026 : bat->cs.cleared = true;
2655 26026 : bat->cs.ts = tr->tid;
2656 : /* only one writer else abort */
2657 26026 : bat->next = obat;
2658 26026 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2659 17 : bat->next = NULL;
2660 17 : destroy_storage(bat);
2661 17 : if (clear)
2662 17 : *clear = true;
2663 17 : return NULL;
2664 : }
2665 : return bat;
2666 : }
2667 :
2668 : static int
2669 70796 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2670 : {
2671 70796 : int ok = LOG_OK;
2672 70796 : BAT *b = ib;
2673 70796 : storage *bat;
2674 :
2675 70796 : if (isbat && !BATcount(b))
2676 : return ok;
2677 :
2678 70757 : if (t == NULL)
2679 : return LOG_ERR;
2680 :
2681 70757 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2682 : return LOG_ERR;
2683 :
2684 70757 : if (isbat)
2685 297 : ok = storage_delete_bat(tr, t, bat, ib);
2686 : else
2687 70460 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2688 : return ok;
2689 : }
2690 :
2691 : static size_t
2692 0 : dcount_col(sql_trans *tr, sql_column *c)
2693 : {
2694 0 : sql_delta *b;
2695 :
2696 0 : if (!isTable(c->t))
2697 : return 0;
2698 0 : b = col_timestamp_delta(tr, c);
2699 0 : if (!b)
2700 : return 1;
2701 :
2702 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2703 0 : if (!s || !s->segs->t)
2704 : return 1;
2705 0 : size_t cnt = s->segs->t->end;
2706 0 : if (cnt) {
2707 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2708 0 : size_t dcnt = 0;
2709 :
2710 0 : if (v)
2711 0 : dcnt = BATguess_uniques(v, NULL);
2712 0 : return dcnt;
2713 : }
2714 : return cnt;
2715 : }
2716 :
2717 : static BAT *
2718 3674216 : bind_no_view(BAT *b, bool quick)
2719 : {
2720 3674216 : if (isVIEW(b)) { /* If it is a view get the parent BAT */
2721 3672287 : BAT *nb = BBP_desc(VIEWtparent(b));
2722 3672287 : bat_destroy(b);
2723 3672916 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2724 : return NULL;
2725 : }
2726 : return b;
2727 : }
2728 :
2729 : static int
2730 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2731 : {
2732 0 : int ok = 0;
2733 0 : assert(tr->active);
2734 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2735 0 : return 0;
2736 0 : lock_column(tr->store, c->base.id);
2737 0 : if (unique_est) {
2738 0 : sql_delta *d;
2739 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2740 0 : BAT *b;
2741 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2742 0 : MT_lock_set(&b->theaplock);
2743 0 : b->tunique_est = *unique_est;
2744 0 : MT_lock_unset(&b->theaplock);
2745 0 : bat_destroy(b);
2746 : }
2747 : }
2748 : }
2749 0 : if (min) {
2750 0 : _DELETE(c->min);
2751 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2752 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2753 0 : memcpy(c->min, min, minlen);
2754 0 : ok = 1;
2755 : }
2756 : }
2757 0 : if (max) {
2758 0 : _DELETE(c->max);
2759 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2760 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2761 0 : memcpy(c->max, max, maxlen);
2762 0 : ok = 1;
2763 : }
2764 : }
2765 0 : unlock_column(tr->store, c->base.id);
2766 0 : return ok;
2767 : }
2768 :
2769 : static int
2770 9 : min_max_col(sql_trans *tr, sql_column *c)
2771 : {
2772 9 : int ok = 0;
2773 9 : BAT *b = NULL;
2774 9 : sql_delta *d = NULL;
2775 :
2776 9 : assert(tr->active);
2777 9 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2778 0 : return 0;
2779 9 : if (c->min && c->max)
2780 : return 1;
2781 9 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2782 9 : if (d->cs.st == ST_FOR)
2783 : return 0;
2784 9 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2785 9 : lock_column(tr->store, c->base.id);
2786 9 : if (c->min && c->max) {
2787 0 : unlock_column(tr->store, c->base.id);
2788 0 : return 1;
2789 : }
2790 9 : _DELETE(c->min);
2791 9 : _DELETE(c->max);
2792 9 : if ((b = bind_col(tr, c, access))) {
2793 9 : if (!(b = bind_no_view(b, false))) {
2794 0 : unlock_column(tr->store, c->base.id);
2795 0 : return 0;
2796 : }
2797 9 : BATiter bi = bat_iterator(b);
2798 9 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2799 9 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2800 9 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2801 :
2802 9 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2803 0 : _DELETE(c->min);
2804 0 : _DELETE(c->max);
2805 : } else {
2806 9 : memcpy(c->min, nmin, minlen);
2807 9 : memcpy(c->max, nmax, maxlen);
2808 9 : ok = 1;
2809 : }
2810 : }
2811 9 : bat_iterator_end(&bi);
2812 9 : bat_destroy(b);
2813 : }
2814 9 : unlock_column(tr->store, c->base.id);
2815 : }
2816 : return ok;
2817 : }
2818 :
2819 : static size_t
2820 17 : count_segs(segment *s)
2821 : {
2822 17 : size_t nr = 0;
2823 :
2824 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2825 55 : nr++;
2826 17 : return nr;
2827 : }
2828 :
2829 : static size_t
2830 34 : count_del(sql_trans *tr, sql_table *t, int access)
2831 : {
2832 34 : storage *d;
2833 :
2834 34 : if (!isTable(t))
2835 : return 0;
2836 34 : d = tab_timestamp_storage(tr, t);
2837 34 : if (!d)
2838 : return 0;
2839 34 : if (access == 2)
2840 0 : return d->cs.ucnt;
2841 34 : if (access == 1)
2842 0 : return count_inserts(d->segs->h, tr);
2843 34 : if (access == 10) /* special case for counting the number of segments */
2844 17 : return count_segs(d->segs->h);
2845 17 : return count_deletes(d->segs->h, tr);
2846 : }
2847 :
2848 : static int
2849 26283 : sorted_col(sql_trans *tr, sql_column *col)
2850 : {
2851 26283 : int sorted = 0;
2852 :
2853 26283 : assert(tr->active);
2854 26283 : if (!isTable(col->t) || !col->t->s)
2855 : return 0;
2856 :
2857 26283 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2858 26264 : BAT *b = bind_col(tr, col, QUICK);
2859 :
2860 26264 : if (b)
2861 26264 : sorted = b->tsorted || b->trevsorted;
2862 : }
2863 : return sorted;
2864 : }
2865 :
2866 : static int
2867 7230 : unique_col(sql_trans *tr, sql_column *col)
2868 : {
2869 7230 : int distinct = 0;
2870 :
2871 7230 : assert(tr->active);
2872 7230 : if (!isTable(col->t) || !col->t->s)
2873 : return 0;
2874 :
2875 7230 : if (col && ATOMIC_PTR_GET(&col->data)) {
2876 7230 : BAT *b = bind_col(tr, col, QUICK);
2877 :
2878 7230 : if (b)
2879 7230 : distinct = b->tkey;
2880 : }
2881 : return distinct;
2882 : }
2883 :
2884 : static int
2885 1716 : double_elim_col(sql_trans *tr, sql_column *col)
2886 : {
2887 1716 : int de = 0;
2888 1716 : sql_delta *d;
2889 :
2890 1716 : assert(tr->active);
2891 1716 : if (!isTable(col->t) || !col->t->s)
2892 : return 0;
2893 :
2894 1716 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2895 6 : if (d->cs.st == ST_DICT) {
2896 6 : BAT *b = bind_col(tr, col, QUICK);
2897 6 : if (b && b->ttype == TYPE_bte)
2898 : de = 1;
2899 0 : else if (b && b->ttype == TYPE_sht)
2900 1716 : de = 2;
2901 : }
2902 1710 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2903 1710 : BAT *b = bind_col(tr, col, QUICK);
2904 :
2905 1710 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2906 1710 : de = GDK_ELIMDOUBLES(b->tvheap);
2907 1710 : if (de)
2908 1484 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2909 : }
2910 1484 : assert(de >= 0 && de <= 16);
2911 : }
2912 : return de;
2913 : }
2914 :
2915 : static int
2916 3710629 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
2917 : {
2918 3710629 : int ok = 0;
2919 3710629 : BAT *b = NULL, *off = NULL, *upv = NULL;
2920 3710629 : sql_delta *d = NULL;
2921 :
2922 3710629 : (void) tr;
2923 3710629 : assert(tr->active);
2924 3710629 : *nonil = false;
2925 3710629 : *unique = false;
2926 3710629 : *unique_est = 0.0;
2927 3710629 : if (!c || !isTable(c->t) || !c->t->s)
2928 : return ok;
2929 :
2930 3710422 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2931 3673554 : if (d->cs.st == ST_FOR) {
2932 30 : *nonil = true; /* TODO for min/max. I will do it later */
2933 30 : return ok;
2934 : }
2935 3673524 : int eclass = c->type.type->eclass;
2936 3673524 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2937 3673524 : if ((b = bind_col(tr, c, access))) {
2938 3673313 : if (!(b = bind_no_view(b, false)))
2939 0 : return ok;
2940 3674177 : BATiter bi = bat_iterator(b);
2941 3673368 : *nonil = bi.nonil && !bi.nil;
2942 :
2943 3673368 : if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
2944 3382204 : d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
2945 2226512 : if (c->min && VALinit(min, bi.type, c->min))
2946 : ok |= 1;
2947 2226496 : else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
2948 2218033 : ok |= 1;
2949 2226402 : if (c->max && VALinit(max, bi.type, c->max))
2950 16 : ok |= 2;
2951 2226388 : else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
2952 2212532 : ok |= 2;
2953 : }
2954 3673227 : if (d->cs.ucnt == 0) {
2955 3670620 : if (d->cs.st == ST_DEFAULT) {
2956 3669915 : *unique = bi.key;
2957 3669915 : *unique_est = bi.unique_est;
2958 3669915 : if (*unique_est == 0)
2959 1167295 : *unique_est = (double)BATguess_uniques(b,NULL);
2960 705 : } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
2961 : /* for dict, check the offsets bat for uniqueness */
2962 705 : MT_lock_set(&off->theaplock);
2963 705 : *unique = off->tkey;
2964 705 : *unique_est = off->tunique_est;
2965 705 : MT_lock_unset(&off->theaplock);
2966 : }
2967 : }
2968 3673860 : bat_iterator_end(&bi);
2969 3673483 : bat_destroy(b);
2970 3673230 : if (*nonil && d->cs.ucnt > 0) {
2971 : /* This could use a quick descriptor */
2972 519 : if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
2973 0 : *nonil = false;
2974 : } else {
2975 519 : MT_lock_set(&upv->theaplock);
2976 519 : *nonil &= upv->tnonil && !upv->tnil;
2977 519 : MT_lock_unset(&upv->theaplock);
2978 519 : bat_destroy(upv);
2979 : }
2980 : }
2981 : }
2982 : }
2983 : return ok;
2984 : }
2985 :
2986 : static int
2987 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2988 : {
2989 257 : assert(tr->active);
2990 257 : if (!isTable(col->t) || !col->t->s)
2991 : return LOG_OK;
2992 :
2993 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2994 252 : BAT *b = bind_col(tr, col, QUICK);
2995 :
2996 252 : if (b) { /* add props for ranges [min, max> */
2997 252 : MT_lock_set(&b->theaplock);
2998 252 : if (add_range) {
2999 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
3000 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
3001 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
3002 : else
3003 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
3004 179 : if (!pt->with_nills || !col->null)
3005 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3006 : } else {
3007 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
3008 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
3009 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
3010 : }
3011 252 : MT_lock_unset(&b->theaplock);
3012 : }
3013 : }
3014 : return LOG_OK;
3015 : }
3016 :
3017 : static int
3018 4313 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
3019 : {
3020 4313 : assert(tr->active);
3021 4313 : if (!isTable(col->t) || !col->t->s)
3022 : return LOG_OK;
3023 :
3024 4283 : if (col && ATOMIC_PTR_GET(&col->data)) {
3025 4283 : BAT *b = bind_col(tr, col, QUICK);
3026 :
3027 4283 : if (b) { /* add props for ranges [min, max> */
3028 4283 : if (not_null) {
3029 4281 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3030 : } else {
3031 2 : BATrmprop(b, GDK_NOT_NULL);
3032 : }
3033 : }
3034 : }
3035 : return LOG_OK;
3036 : }
3037 :
3038 : static int
3039 30322 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3040 : {
3041 30322 : sqlstore *store = tr->store;
3042 30322 : int bid = log_find_bat(store->logger, id);
3043 30322 : if (bid <= 0)
3044 : return LOG_ERR;
3045 30322 : cs->bid = temp_dup(bid);
3046 30322 : cs->ucnt = 0;
3047 30322 : cs->uibid = e_bat(TYPE_oid);
3048 30322 : cs->uvbid = e_bat(type);
3049 30322 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3050 : return LOG_ERR;
3051 : return LOG_OK;
3052 : }
3053 :
3054 : static int
3055 65930 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3056 : {
3057 65930 : int res = LOG_OK;
3058 65930 : gdk_return ok;
3059 65930 : BAT *b = temp_descriptor(bat->cs.bid);
3060 :
3061 65930 : if (b == NULL)
3062 : return LOG_ERR;
3063 :
3064 65930 : if (!bat->cs.uibid)
3065 65924 : bat->cs.uibid = e_bat(TYPE_oid);
3066 65930 : if (!bat->cs.uvbid)
3067 65924 : bat->cs.uvbid = e_bat(b->ttype);
3068 65930 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3069 0 : res = LOG_ERR;
3070 65930 : if (GDKinmemory(0)) {
3071 174 : bat_destroy(b);
3072 174 : return res;
3073 : }
3074 :
3075 65756 : bat_set_access(b, BAT_READ);
3076 65756 : sqlstore *store = tr->store;
3077 65756 : ok = log_bat_persists(store->logger, b, id);
3078 65756 : bat_destroy(b);
3079 65756 : if(res != LOG_OK)
3080 : return res;
3081 65756 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3082 : }
3083 :
3084 : static int
3085 0 : new_persistent_delta( sql_delta *bat)
3086 : {
3087 0 : bat->cs.ucnt = 0;
3088 0 : return LOG_OK;
3089 : }
3090 :
3091 : static void
3092 124665 : create_delta( sql_delta *d, BAT *b)
3093 : {
3094 124665 : bat_set_access(b, BAT_READ);
3095 124665 : d->cs.bid = temp_create(b);
3096 124665 : d->cs.uibid = d->cs.uvbid = 0;
3097 124665 : d->cs.ucnt = 0;
3098 124665 : }
3099 :
3100 : static bat
3101 7020 : copyBat (bat i, int type, oid seq)
3102 : {
3103 7020 : BAT *b, *tb;
3104 7020 : bat res;
3105 :
3106 7020 : if (!i)
3107 : return i;
3108 7020 : tb = quick_descriptor(i);
3109 7020 : if (tb == NULL)
3110 : return 0;
3111 7020 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3112 7020 : if (b == NULL)
3113 : return 0;
3114 :
3115 7020 : bat_set_access(b, BAT_READ);
3116 :
3117 7020 : res = temp_create(b);
3118 7020 : bat_destroy(b);
3119 7020 : return res;
3120 : }
3121 :
/*
 * Create or load the storage (sql_delta) behind column c.
 *
 * A missing delta is allocated zeroed, published in c->data with reference
 * count 1 and stamped with the transaction id.  Then one of three paths runs:
 *  - existing, non-temporary column: the base BAT is loaded from the logger
 *    (load_cs); for storage type "DICT" the dictionary BAT registered under
 *    the negated column id is attached as well, a type starting with "FOR"
 *    only switches the storage kind;
 *  - the delta already has a base BAT: only the update counter is reset;
 *  - otherwise a new BAT is created: nil-filled and as long as the first
 *    column's BAT when the table already has rows and c is not that first
 *    column, empty otherwise.
 *
 * Returns LOG_OK on success and LOG_ERR on allocation or lookup failure.
 */
3122 : static int
3123 147911 : create_col(sql_trans *tr, sql_column *c)
3124 : {
3125 147911 : int ok = LOG_OK, new = 0;
3126 147911 : int type = c->type.type->localtype;
3127 147911 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3128 :
3129 147911 : if (!bat) {
3130 147911 : new = 1;
3131 147911 : bat = ZNEW(sql_delta);
3132 147911 : if (!bat)
3133 : return LOG_ERR;
3134 147911 : ATOMIC_PTR_SET(&c->data, bat);
3135 147911 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3136 : }
3137 :
/* a freshly allocated delta belongs to this (uncommitted) transaction */
3138 147911 : if (new)
3139 147911 : bat->cs.ts = tr->tid;
3140 :
3141 147911 : if (!isNew(c)&& !isTempTable(c->t)){
/* existing column: take the BATs from the logger */
3142 23226 : bat->cs.ts = tr->ts;
3143 23226 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3144 23226 : if (ok == LOG_OK && c->storage_type) {
3145 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3146 2 : sqlstore *store = tr->store;
/* the dictionary BAT is looked up under the negated column id */
3147 2 : int bid = log_find_bat(store->logger, -c->base.id);
3148 2 : if (bid <= 0)
3149 : return LOG_ERR;
3150 2 : bat->cs.ebid = temp_dup(bid);
3151 2 : bat->cs.st = ST_DICT;
3152 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3153 2 : bat->cs.st = ST_FOR;
3154 : }
3155 : }
3156 23226 : return ok;
3157 124685 : } else if (bat && bat->cs.bid) {
3158 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3159 : } else {
3160 124685 : sql_column *fc = NULL;
3161 124685 : size_t cnt = 0;
3162 :
3163 : /* alter ? */
/* cnt becomes the end of the table's segments as seen by this
 * transaction; it stays 0 for temporary tables and tables without columns */
3164 124685 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3165 77066 : storage *s = tab_timestamp_storage(tr, fc->t);
3166 77066 : if (s == NULL)
3167 : return LOG_ERR;
3168 77066 : cnt = segs_end(s->segs, tr, c->t);
3169 : }
3170 124685 : if (cnt && fc != c) {
/* the table already has rows: mirror the first column's BATs */
3171 20 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3172 :
3173 20 : if (d->cs.bid) {
/* NOTE(review): copyBat returns 0 on failure while this test compares
 * against BID_NIL - correct only if BID_NIL is 0; confirm */
3174 20 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3175 20 : if(bat->cs.bid == BID_NIL)
3176 20 : ok = LOG_ERR;
3177 : }
3178 20 : if (d->cs.uibid) {
3179 8 : bat->cs.uibid = e_bat(TYPE_oid);
3180 8 : if (bat->cs.uibid == BID_NIL)
3181 20 : ok = LOG_ERR;
3182 : }
3183 20 : if (d->cs.uvbid) {
3184 8 : bat->cs.uvbid = e_bat(type);
3185 8 : if(bat->cs.uvbid == BID_NIL)
3186 0 : ok = LOG_ERR;
3187 : }
3188 : } else {
/* empty table (or c is the first column): start with a new empty BAT */
3189 124665 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3190 124665 : if (!b) {
3191 : ok = LOG_ERR;
3192 : } else {
3193 124665 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3194 124665 : bat_destroy(b);
3195 : }
3196 :
3197 124664 : if (!new) {
3198 0 : bat->cs.uibid = e_bat(TYPE_oid);
3199 0 : if (bat->cs.uibid == BID_NIL)
3200 0 : ok = LOG_ERR;
3201 0 : bat->cs.uvbid = e_bat(type);
3202 0 : if(bat->cs.uvbid == BID_NIL)
3203 0 : ok = LOG_ERR;
3204 : }
3205 : }
3206 124684 : bat->cs.ucnt = 0;
3207 :
/* a column added to an existing, non-temporary table is registered with
 * the transaction so it is logged, committed and garbage collected */
3208 124684 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3209 76 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3210 : }
3211 : return ok;
3212 : }
3213 :
3214 : static int
3215 59690 : log_create_col_(sql_trans *tr, sql_column *c)
3216 : {
3217 59690 : assert(!isTempTable(c->t));
3218 59690 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3219 : }
3220 :
3221 : static int
3222 72 : log_create_col(sql_trans *tr, sql_change *change)
3223 : {
3224 72 : return log_create_col_(tr, (sql_column*)change->obj);
3225 : }
3226 :
3227 : static int
3228 112490 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3229 : {
3230 112490 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3231 112490 : (void)oldest;
3232 112490 : assert(delta->cs.ts == tr->tid);
3233 112490 : delta->cs.ts = commit_ts;
3234 :
3235 112490 : assert(delta->next == NULL);
3236 112490 : if (!delta->cs.merged)
3237 112489 : merge_delta(delta);
3238 112490 : if (!tr->parent)
3239 112486 : base->new = 0;
3240 112490 : return LOG_OK;
3241 : }
3242 :
3243 : static int
3244 76 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3245 : {
3246 76 : sql_column *c = (sql_column*)change->obj;
3247 76 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3248 76 : if (!tr->parent)
3249 75 : c->base.new = 0;
3250 76 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3251 : }
3252 :
3253 : /* will be called for new idx's and when new index columns are created */
/*
 * Create or load the storage (sql_delta) behind index ni.  Values are stored
 * as TYPE_lng, or as TYPE_oid when oid_index() holds for the index type.
 *
 * A missing delta is allocated zeroed, published in ni->data with reference
 * count 1 and stamped with the transaction id.  Existing non-temporary
 * indices are loaded from the logger; otherwise a nil-filled BAT aligned
 * with the table's first column is created.
 *
 * Returns LOG_OK on success, LOG_ERR on failure.
 */
3254 : static int
3255 9280 : create_idx(sql_trans *tr, sql_idx *ni)
3256 : {
3257 9280 : int ok = LOG_OK, new = 0;
3258 9280 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3259 9280 : int type = TYPE_lng;
3260 :
3261 9280 : if (oid_index(ni->type))
3262 935 : type = TYPE_oid;
3263 :
3264 9280 : if (!bat) {
3265 9280 : new = 1;
3266 9280 : bat = ZNEW(sql_delta);
3267 9280 : if (!bat)
3268 : return LOG_ERR;
3269 9280 : ATOMIC_PTR_INIT(&ni->data, bat);
3270 9280 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3271 : }
3272 :
3273 9280 : if (new)
3274 9280 : bat->cs.ts = tr->tid;
3275 :
3276 9280 : if (!isNew(ni) && !isTempTable(ni->t)){
/* existing index: take the BATs from the logger */
3277 2280 : bat->cs.ts = 1;
3278 2280 : return load_cs(tr, &bat->cs, type, ni->base.id);
3279 7000 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3280 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3281 : } else {
/* NOTE(review): the first column node is dereferenced without a NULL
 * check - presumably a table always has at least one column here; confirm */
3282 7000 : sql_column *c = ol_first_node(ni->t->columns)->data;
3283 7000 : sql_delta *d = col_timestamp_delta(tr, c);
3284 :
3285 7000 : if (d) {
3286 : /* Here we also handle indices created through alter stmts */
3287 : /* These need to be created aligned to the existing data */
3288 7000 : if (d->cs.bid) {
3289 7000 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3290 7000 : if(bat->cs.bid == BID_NIL)
3291 7000 : ok = LOG_ERR;
3292 : }
3293 : } else {
3294 : return LOG_ERR;
3295 : }
3296 :
3297 7000 : bat->cs.ucnt = 0;
3298 :
3299 7000 : if (!new) {
3300 0 : bat->cs.uibid = e_bat(TYPE_oid);
3301 0 : if (bat->cs.uibid == BID_NIL)
3302 0 : ok = LOG_ERR;
3303 0 : bat->cs.uvbid = e_bat(type);
3304 0 : if(bat->cs.uvbid == BID_NIL)
3305 0 : ok = LOG_ERR;
3306 : }
/* NOTE(review): ucnt was already reset a few lines up; this repeat is redundant */
3307 7000 : bat->cs.ucnt = 0;
/* an index added to an existing, non-temporary table is registered with
 * the transaction so it is logged, committed and garbage collected */
3308 7000 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3309 619 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3310 : }
3311 : return ok;
3312 : }
3313 :
3314 : static int
3315 6240 : log_create_idx_(sql_trans *tr, sql_idx *i)
3316 : {
3317 6240 : assert(!isTempTable(i->t));
3318 6240 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3319 : }
3320 :
3321 : static int
3322 606 : log_create_idx(sql_trans *tr, sql_change *change)
3323 : {
3324 606 : return log_create_idx_(tr, (sql_idx*)change->obj);
3325 : }
3326 :
3327 : static int
3328 619 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3329 : {
3330 619 : sql_idx *i = (sql_idx*)change->obj;
3331 619 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3332 619 : if (!tr->parent)
3333 619 : i->base.new = 0;
3334 619 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3335 : return LOG_OK;
3336 : }
3337 :
3338 : static int
3339 4816 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3340 : {
3341 4816 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3342 4816 : BAT *b = NULL, *ib = NULL;
3343 :
3344 4816 : if (ok != LOG_OK)
3345 : return ok;
3346 4816 : if (!(b = temp_descriptor(s->cs.bid)))
3347 : return LOG_ERR;
3348 4816 : ib = b;
3349 :
3350 4816 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3351 0 : bat_destroy(ib);
3352 0 : return LOG_ERR;
3353 : }
3354 :
3355 4816 : if (BATcount(b)) {
3356 328 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3357 0 : bat_destroy(ib);
3358 0 : return LOG_ERR;
3359 : }
3360 505 : if (BATtdense(b)) {
3361 177 : size_t start = b->tseqbase;
3362 177 : size_t cnt = BATcount(b);
3363 177 : ok = delete_range(tr, t, s, start, cnt);
3364 : } else {
3365 151 : assert(b->tsorted);
3366 151 : BUN icnt = BATcount(b);
3367 151 : BATiter bi = bat_iterator(b);
3368 151 : size_t lcnt = 1;
3369 151 : oid n;
3370 151 : segment *seg = s->segs->h;
3371 151 : if (complex_cand(b)) {
3372 0 : oid o = * (oid *) Tpos(&bi, 0);
3373 0 : n = o + 1;
3374 0 : for (BUN i = 1; i < icnt; i++) {
3375 0 : o = * (oid *) Tpos(&bi, i);
3376 0 : if (o == n) {
3377 0 : lcnt++;
3378 0 : n++;
3379 : } else {
3380 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3381 : break;
3382 : lcnt = 0;
3383 : }
3384 0 : if (!lcnt) {
3385 0 : n = o + 1;
3386 0 : lcnt = 1;
3387 : }
3388 : }
3389 : } else {
3390 151 : oid *o = bi.base;
3391 151 : n = o[0]+1;
3392 207956 : for (size_t i=1; i<icnt; i++) {
3393 207805 : if (o[i] == n) {
3394 205854 : lcnt++;
3395 205854 : n++;
3396 : } else {
3397 1951 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3398 : break;
3399 : lcnt = 0;
3400 : }
3401 207805 : if (!lcnt) {
3402 1951 : n = o[i]+1;
3403 1951 : lcnt = 1;
3404 : }
3405 : }
3406 : }
3407 151 : if (lcnt && ok == LOG_OK)
3408 151 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3409 151 : bat_iterator_end(&bi);
3410 : }
3411 328 : if (ok == LOG_OK)
3412 5047 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3413 4719 : if (seg->ts == tr->tid)
3414 2450 : seg->ts = 1;
3415 : } else {
3416 4488 : if (ok == LOG_OK) {
3417 4488 : BAT *bb = quick_descriptor(s->cs.bid);
3418 :
3419 4488 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3420 : ok = LOG_ERR;
3421 : } else {
3422 4488 : segment *seg = s->segs->h;
3423 4488 : if (seg->ts == tr->tid)
3424 4488 : seg->ts = 1;
3425 : }
3426 : }
3427 : }
3428 4816 : if (b != ib)
3429 4816 : bat_destroy(b);
3430 4816 : bat_destroy(ib);
3431 :
3432 4816 : return ok;
3433 : }
3434 :
3435 : static int
3436 24648 : create_del(sql_trans *tr, sql_table *t)
3437 : {
3438 24648 : int ok = LOG_OK, new = 0;
3439 24648 : BAT *b;
3440 24648 : storage *bat = ATOMIC_PTR_GET(&t->data);
3441 :
3442 24648 : if (!bat) {
3443 24648 : new = 1;
3444 24648 : bat = ZNEW(storage);
3445 24648 : if(!bat)
3446 : return LOG_ERR;
3447 24648 : ATOMIC_PTR_INIT(&t->data, bat);
3448 24648 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3449 24648 : bat->cs.ts = tr->tid;
3450 : }
3451 :
3452 24648 : if (!isNew(t) && !isTempTable(t)) {
3453 4816 : bat->cs.ts = tr->ts;
3454 4816 : return load_storage(tr, t, bat, t->base.id);
3455 19832 : } else if (bat->cs.bid) {
3456 : return ok;
3457 : } else {
3458 19832 : assert(!bat->segs);
3459 19832 : if (!(bat->segs = new_segments(tr, 0)))
3460 : return LOG_ERR;
3461 :
3462 19832 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3463 19832 : if(b != NULL) {
3464 19832 : bat_set_access(b, BAT_READ);
3465 19832 : bat->cs.bid = temp_create(b);
3466 19832 : bat_destroy(b);
3467 : } else {
3468 : return LOG_ERR;
3469 : }
3470 19832 : if (new)
3471 26146 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3472 : }
3473 : return ok;
3474 : }
3475 :
3476 : static int
3477 190295 : log_segment(sql_trans *tr, segment *s, sqlid id)
3478 : {
3479 190295 : sqlstore *store = tr->store;
3480 190295 : msk m = s->deleted;
3481 190295 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3482 : }
3483 :
3484 : static int
3485 94100 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3486 : {
3487 : /* log segments */
3488 94100 : lock_table(tr->store, id);
3489 449747 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3490 355647 : unlock_table(tr->store, id);
3491 355647 : if (seg->ts == tr->tid && seg->end-seg->start) {
3492 144552 : if (log_segment(tr, seg, id) != LOG_OK) {
3493 0 : unlock_table(tr->store, id);
3494 0 : return LOG_ERR;
3495 : }
3496 : }
3497 355647 : lock_table(tr->store, id);
3498 : }
3499 94100 : unlock_table(tr->store, id);
3500 94100 : return LOG_OK;
3501 : }
3502 :
3503 : static int
3504 11914 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3505 : {
3506 11914 : BAT *b;
3507 11914 : int ok = LOG_OK;
3508 :
3509 11914 : if (GDKinmemory(0))
3510 : return LOG_OK;
3511 :
3512 11882 : b = temp_descriptor(bat->cs.bid);
3513 11882 : if (b == NULL)
3514 : return LOG_ERR;
3515 :
3516 11882 : sqlstore *store = tr->store;
3517 11882 : bat_set_access(b, BAT_READ);
3518 11882 : if (ok == LOG_OK)
3519 11882 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3520 11882 : if (ok == LOG_OK)
3521 11882 : ok = log_segments(tr, bat->segs, t->base.id);
3522 11882 : bat_destroy(b);
3523 11882 : return ok;
3524 : }
3525 :
3526 : static int
3527 11924 : log_create_del(sql_trans *tr, sql_change *change)
3528 : {
3529 11924 : int ok = LOG_OK;
3530 11924 : sql_table *t = (sql_table*)change->obj;
3531 :
3532 11924 : if (t->base.deleted)
3533 : return ok;
3534 11914 : assert(!isTempTable(t));
3535 11914 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3536 11914 : if (ok == LOG_OK) {
3537 71532 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3538 59618 : sql_column *c = n->data;
3539 :
3540 59618 : ok = log_create_col_(tr, c);
3541 : }
3542 11914 : if (t->idxs) {
3543 17560 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3544 5646 : sql_idx *i = n->data;
3545 :
3546 5646 : if (ATOMIC_PTR_GET(&i->data))
3547 5634 : ok = log_create_idx_(tr, i);
3548 : }
3549 : }
3550 : }
3551 : return ok;
3552 : }
3553 :
/*
 * Change-list callback: commit (or roll back, when commit_ts is 0) the
 * creation of a table's storage.
 *
 * Tables with commit action CA_DELETE or CA_DROP (asserted to be temporary)
 * only get their storage cleared.  On rollback of any other table nothing is
 * done here.  Otherwise the segments are folded into the delete BAT, merged,
 * the storage is stamped with commit_ts and every column delta and every
 * index delta that exists is committed.
 *
 * Returns LOG_OK on success or the first error encountered.
 */
3554 : static int
3555 19834 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3556 : {
3557 19834 : int ok = LOG_OK;
3558 19834 : sql_table *t = (sql_table*)change->obj;
3559 19834 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3560 :
3561 19834 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3562 118 : assert(isTempTable(t));
/* on a real commit the head of the new, empty segment list is stamped */
3563 118 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3564 118 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3565 118 : return ok;
3566 : }
3567 :
3568 19716 : if (!commit_ts) /* rollback handled by ? */
3569 : return ok;
3570 18017 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3571 18017 : assert(ok == LOG_OK);
3572 18017 : if (ok != LOG_OK)
3573 : return ok;
/* commit_ts is also passed as "oldest": see the original remark below */
3574 18017 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3575 18017 : assert(dbat->cs.ts == tr->tid);
3576 18017 : dbat->cs.ts = commit_ts;
3577 18017 : if (ok == LOG_OK) {
3578 124165 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3579 106148 : sql_column *c = n->data;
3580 106148 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3581 :
3582 106148 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3583 : }
3584 18017 : if (t->idxs) {
3585 23676 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3586 5659 : sql_idx *i = n->data;
3587 5659 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3588 :
/* indices without storage are skipped */
3589 5659 : if (delta)
3590 5647 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3591 : }
3592 : }
3593 18017 : if (!tr->parent)
3594 18015 : t->base.new = 0;
3595 : }
/* NOTE(review): repeats the reset done just above; one of the two is redundant */
3596 18017 : if (!tr->parent)
3597 18015 : t->base.new = 0;
3598 : return ok;
3599 : }
3600 :
3601 : static int
3602 17876 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3603 : {
3604 17876 : gdk_return ok = GDK_SUCCEED;
3605 :
3606 17876 : sqlstore *store = tr->store;
3607 17876 : if (!GDKinmemory(0) && b && b->cs.bid)
3608 17873 : ok = log_bat_transient(store->logger, id);
3609 17876 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3610 25 : ok = log_bat_transient(store->logger, -id);
3611 17876 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3612 : }
3613 :
3614 : static int
3615 164878 : destroy_col(sqlstore *store, sql_column *c)
3616 : {
3617 164878 : (void)store;
3618 164878 : if (ATOMIC_PTR_GET(&c->data))
3619 164878 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3620 164878 : ATOMIC_PTR_SET(&c->data, NULL);
3621 164878 : return LOG_OK;
3622 : }
3623 :
3624 : static int
3625 16042 : log_destroy_col_(sql_trans *tr, sql_column *c)
3626 : {
3627 16042 : int ok = LOG_OK;
3628 16042 : assert(!isTempTable(c->t));
3629 16042 : if (!tr->parent) /* don't write save point commits */
3630 16042 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3631 16042 : return ok;
3632 : }
3633 :
3634 : static int
3635 49 : log_destroy_col(sql_trans *tr, sql_change *change)
3636 : {
3637 49 : sql_column *c = (sql_column*)change->obj;
3638 49 : int res = log_destroy_col_(tr, c);
3639 49 : change->obj = NULL;
3640 49 : column_destroy(tr->store, c);
3641 49 : return res;
3642 : }
3643 :
3644 : static int
3645 10632 : destroy_idx(sqlstore *store, sql_idx *i)
3646 : {
3647 10632 : (void)store;
3648 10632 : if (ATOMIC_PTR_GET(&i->data))
3649 10632 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3650 10632 : ATOMIC_PTR_SET(&i->data, NULL);
3651 10632 : return LOG_OK;
3652 : }
3653 :
3654 : static int
3655 1896 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3656 : {
3657 1896 : int ok = LOG_OK;
3658 1896 : assert(!isTempTable(i->t));
3659 1896 : if (ATOMIC_PTR_GET(&i->data)) {
3660 1834 : if (!tr->parent) /* don't write save point commits */
3661 1834 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3662 : }
3663 1896 : return ok;
3664 : }
3665 :
3666 : static int
3667 496 : log_destroy_idx(sql_trans *tr, sql_change *change)
3668 : {
3669 496 : sql_idx *i = (sql_idx*)change->obj;
3670 496 : int res = log_destroy_idx_(tr, i);
3671 496 : change->obj = NULL;
3672 496 : idx_destroy(tr->store, i);
3673 496 : return res;
3674 : }
3675 :
3676 : static int
3677 26099 : destroy_del(sqlstore *store, sql_table *t)
3678 : {
3679 26099 : (void)store;
3680 26099 : if (ATOMIC_PTR_GET(&t->data))
3681 26089 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3682 26099 : ATOMIC_PTR_SET(&t->data, NULL);
3683 26099 : return LOG_OK;
3684 : }
3685 :
3686 : static int
3687 2975 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3688 : {
3689 2975 : gdk_return ok = GDK_SUCCEED;
3690 :
3691 2975 : sqlstore *store = tr->store;
3692 2975 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3693 2975 : bat && bat->cs.bid)
3694 2975 : ok = log_bat_transient(store->logger, id);
3695 2975 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3696 : }
3697 :
3698 : static int
3699 2975 : log_destroy_del(sql_trans *tr, sql_change *change)
3700 : {
3701 2975 : int ok = LOG_OK;
3702 2975 : sql_table *t = (sql_table*)change->obj;
3703 :
3704 2975 : assert(!isTempTable(t));
3705 2975 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3706 2975 : if (ok == LOG_OK) {
3707 18968 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3708 15993 : sql_column *c = n->data;
3709 :
3710 15993 : ok = log_destroy_col_(tr, c);
3711 : }
3712 2975 : if (t->idxs) {
3713 4375 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3714 1400 : sql_idx *i = n->data;
3715 :
3716 1400 : ok = log_destroy_idx_(tr, i);
3717 : }
3718 : }
3719 : }
3720 2975 : return ok;
3721 : }
3722 :
3723 : static int
3724 3600 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3725 : {
3726 3600 : (void)tr;
3727 3600 : (void)change;
3728 3600 : (void)commit_ts;
3729 3600 : (void)oldest;
3730 3600 : if (commit_ts)
3731 3586 : change->handled = true;
3732 3600 : return 0;
3733 : }
3734 :
3735 : static int
3736 3047 : drop_del(sql_trans *tr, sql_table *t)
3737 : {
3738 3047 : int ok = LOG_OK;
3739 :
3740 3047 : if (!isNew(t)) {
3741 3047 : storage *bat = ATOMIC_PTR_GET(&t->data);
3742 3113 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_destroy_del);
3743 : }
3744 3047 : return ok;
3745 : }
3746 :
3747 : static int
3748 56 : drop_col(sql_trans *tr, sql_column *c)
3749 : {
3750 56 : assert(!isNew(c));
3751 56 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3752 56 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_destroy_col);
3753 56 : return LOG_OK;
3754 : }
3755 :
3756 : static int
3757 497 : drop_idx(sql_trans *tr, sql_idx *i)
3758 : {
3759 497 : assert(!isNew(i));
3760 497 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3761 497 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_destroy_idx);
3762 497 : return LOG_OK;
3763 : }
3764 :
3765 :
/*
 * Empty a column_storage.
 *
 * With `renew` set and a base BAT present, the base BAT is replaced by an
 * empty one and its former row count becomes the return value.  For ST_DICT
 * storage the extra (ebid) BAT is replaced by an empty copy and the base BAT
 * by a new empty TYPE_bte BAT; for all other kinds the base BAT is replaced
 * by an empty copy of itself.  The update BATs are released in every case,
 * the storage is flagged as cleared and the update count reset.
 *
 * Returns the number of rows the old base BAT held (0 when nothing was
 * renewed) or BUN_NONE on failure.
 */
3766 : static BUN
3767 129718 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3768 : {
3769 129718 : BAT *b;
3770 129718 : BUN sz = 0;
3771 :
3772 129718 : (void)tr;
3773 129718 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3774 129718 : if (cs->bid && renew) {
3775 129757 : b = quick_descriptor(cs->bid);
3776 129719 : if (b) {
3777 129719 : sz += BATcount(b);
3778 129719 : if (cs->st == ST_DICT) {
3779 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3780 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3781 :
/* both replacements must exist before the old BATs are released */
3782 2 : if (nebid == BID_NIL || !n) {
3783 0 : temp_destroy(nebid);
3784 0 : bat_destroy(n);
3785 0 : return BUN_NONE;
3786 : }
3787 2 : temp_destroy(cs->ebid);
3788 2 : cs->ebid = nebid;
3789 2 : if (!temp)
3790 2 : bat_set_access(n, BAT_READ);
3791 2 : temp_destroy(cs->bid);
3792 2 : cs->bid = temp_create(n); /* create empty copy */
3793 2 : bat_destroy(n);
3794 : } else {
/* NOTE(review): `temp` is not forwarded here, unlike in the dictionary
 * branch above - confirm this is intentional */
3795 129717 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3796 :
3797 129453 : if (nbid == BID_NIL)
3798 : return BUN_NONE;
3799 129453 : temp_destroy(cs->bid);
3800 129603 : cs->bid = nbid;
3801 : }
3802 : } else {
3803 : return BUN_NONE;
3804 : }
3805 : }
/* drop any pending updates */
3806 129566 : if (cs->uibid) {
3807 129423 : temp_destroy(cs->uibid);
3808 129642 : cs->uibid = 0;
3809 : }
3810 129785 : if (cs->uvbid) {
3811 129643 : temp_destroy(cs->uvbid);
3812 129646 : cs->uvbid = 0;
3813 : }
3814 129788 : cs->cleared = true;
3815 129788 : cs->ucnt = 0;
3816 129788 : return sz;
3817 : }
3818 :
3819 : static BUN
3820 129599 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3821 : {
3822 129599 : bool update_conflict = false;
3823 129599 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3824 :
3825 129599 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3826 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3827 129639 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3828 129639 : if (odelta != delta)
3829 129644 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3830 129624 : if (delta)
3831 129624 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3832 : return 0;
3833 : }
3834 :
3835 : static BUN
3836 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3837 : {
3838 21 : bool update_conflict = false;
3839 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3840 :
3841 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3842 15 : return 0;
3843 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3844 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3845 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3846 6 : if (odelta != delta)
3847 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3848 6 : if (delta)
3849 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3850 : return 0;
3851 : }
3852 :
3853 : static int
3854 140 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3855 : {
3856 140 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3857 : return LOG_ERR;
3858 140 : if (s->segs)
3859 140 : destroy_segments(s->segs);
3860 140 : if (!(s->segs = new_segments(tr, 0)))
3861 : return LOG_ERR;
3862 : return LOG_OK;
3863 : }
3864 :
3865 :
3866 : /*
3867 : * Clear the table, in general this means replacing the storage,
3868 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3869 : * all segments as deleted.
3870 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3871 : */
3872 : static BUN
3873 41823 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3874 : {
3875 41823 : int clear = !in_transaction, ok = LOG_OK;
3876 41823 : bool conflict = false;
3877 41823 : storage *bat;
3878 :
3879 41874 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3880 15762 : return conflict?BUN_NONE-1:BUN_NONE;
3881 :
3882 26060 : if (!clear) {
3883 51 : lock_table(tr->store, t->base.id);
3884 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3885 51 : unlock_table(tr->store, t->base.id);
3886 : }
3887 26060 : assert(t->persistence != SQL_DECLARED_TABLE);
3888 26060 : if (!in_transaction)
3889 26012 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3890 26058 : if (ok == LOG_ERR)
3891 : return BUN_NONE;
3892 26058 : if (ok == LOG_CONFLICT)
3893 0 : return BUN_NONE - 1;
3894 : return LOG_OK;
3895 : }
3896 :
3897 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3898 : static BUN
3899 41802 : clear_table(sql_trans *tr, sql_table *t)
3900 : {
3901 41802 : node *n = ol_first_node(t->columns);
3902 41802 : sql_column *c = n->data;
3903 41802 : storage *d = tab_timestamp_storage(tr, t);
3904 41820 : int in_transaction, clear;
3905 41820 : BUN sz, clear_ok;
3906 :
3907 41820 : if (!d)
3908 : return BUN_NONE;
3909 41820 : in_transaction = segments_in_transaction(tr, t);
3910 41820 : clear = !in_transaction;
3911 41820 : sz = count_col(tr, c, CNT_ACTIVE);
3912 41823 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3913 : return clear_ok;
3914 :
3915 26057 : if (in_transaction)
3916 : return sz;
3917 :
3918 155645 : for (; n; n = n->next) {
3919 129636 : c = n->data;
3920 :
3921 129636 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3922 0 : return clear_ok;
3923 : }
3924 26009 : if (t->idxs) {
3925 26030 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3926 21 : sql_idx *ci = n->data;
3927 :
3928 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3929 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3930 0 : return clear_ok;
3931 : }
3932 : }
3933 : return sz;
3934 : }
3935 :
/*
 * Write the changes held in a column_storage to the log under `id`.
 *
 * Nothing is logged when running in-memory.  A cleared storage is logged as
 * a whole: the base BAT is announced under `id` and, when present, the extra
 * (ebid) BAT under `-id`.  Otherwise only pending updates are logged: when
 * the storage has updates and an update-id BAT, the id/value pair is written
 * with log_delta() if the value BAT has rows beyond batInserted or is dirty.
 *
 * Returns LOG_OK on success, LOG_ERR when a BAT cannot be fetched or a
 * logger call fails.
 */
3936 : static int
3937 158997 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3938 : {
3939 158997 : sqlstore *store = tr->store;
3940 158997 : gdk_return ok = GDK_SUCCEED;
3941 :
/* t is only read by the assert further down; segs is unused */
3942 158997 : (void) t;
3943 158997 : (void) segs;
3944 158997 : if (GDKinmemory(0))
3945 : return LOG_OK;
3946 :
3947 158990 : if (cs->cleared) {
3948 155702 : assert(cs->ucnt == 0);
3949 155702 : BAT *ins = temp_descriptor(cs->bid);
3950 155702 : if (!ins)
3951 : return LOG_ERR;
3952 155702 : assert(!isEbat(ins));
3953 155702 : bat_set_access(ins, BAT_READ);
3954 155702 : ok = log_bat_persists(store->logger, ins, id);
3955 155702 : bat_destroy(ins);
3956 155702 : if (ok == GDK_SUCCEED && cs->ebid) {
/* this inner `ins` shadows the outer one, which was already released */
3957 56 : BAT *ins = temp_descriptor(cs->ebid);
3958 56 : if (!ins)
3959 : return LOG_ERR;
3960 56 : assert(!isEbat(ins));
3961 56 : bat_set_access(ins, BAT_READ);
3962 56 : ok = log_bat_persists(store->logger, ins, -id);
3963 56 : bat_destroy(ins);
3964 : }
3965 155702 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3966 : }
3967 :
3968 3288 : assert(!isTempTable(t));
3969 :
3970 3288 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3971 2717 : BAT *ui = temp_descriptor(cs->uibid);
3972 2717 : BAT *uv = temp_descriptor(cs->uvbid);
3973 : /* any updates */
3974 2717 : if (ui == NULL || uv == NULL) {
3975 : ok = GDK_FAIL;
3976 2717 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3977 2717 : ok = log_delta(store->logger, ui, uv, id);
/* NOTE(review): either pointer may be NULL here - presumably bat_destroy
 * accepts NULL; confirm */
3978 2717 : bat_destroy(ui);
3979 2717 : bat_destroy(uv);
3980 : }
3981 2717 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3982 : }
3983 :
3984 : static inline int
3985 56215 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3986 56215 : sqlstore *store = tr->store;
3987 56215 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3988 : }
3989 :
3990 : static inline int
3991 56215 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3992 56215 : sqlstore *store = tr->store;
3993 56215 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3994 : }
3995 :
3996 : static int
3997 56215 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3998 : {
3999 56215 : sqlstore *store = tr->store;
4000 56215 : gdk_return ok = GDK_SUCCEED;
4001 :
4002 56215 : size_t end = segs_end(segs, tr, t);
4003 :
4004 56215 : if (tr_log_table_start(tr, t) != LOG_OK)
4005 : return LOG_ERR;
4006 :
4007 56215 : size_t nr_appends = 0;
4008 :
4009 56215 : lock_table(tr->store, t->base.id);
4010 374106 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
4011 317891 : unlock_table(tr->store, t->base.id);
4012 :
4013 317891 : if (seg->ts == tr->tid && seg->end-seg->start) {
4014 114183 : if (!seg->deleted) {
4015 45743 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
4016 : return LOG_ERR;
4017 :
4018 45743 : nr_appends += (seg->end - seg->start);
4019 : }
4020 : }
4021 317891 : lock_table(tr->store, t->base.id);
4022 : }
4023 56215 : unlock_table(tr->store, t->base.id);
4024 :
4025 385212 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
4026 328997 : sql_column *c = n->data;
4027 328997 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
4028 :
4029 328997 : if (cs->cleared) {
4030 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4031 3 : continue;
4032 : }
4033 :
4034 328994 : lock_table(tr->store, t->base.id);
4035 328994 : if (!cs->cleared) {
4036 2192236 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4037 1863242 : unlock_table(tr->store, t->base.id);
4038 1863242 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4039 : /* append col*/
4040 255359 : BAT *ins = temp_descriptor(cs->bid);
4041 255359 : if (ins == NULL)
4042 : return LOG_ERR;
4043 255359 : assert(BATcount(ins) >= cur->end);
4044 255359 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4045 255359 : bat_destroy(ins);
4046 : }
4047 1863242 : lock_table(tr->store, t->base.id);
4048 : }
4049 : }
4050 328994 : unlock_table(tr->store, t->base.id);
4051 :
4052 328994 : if (ok == GDK_SUCCEED && cs->ebid) {
4053 19 : BAT *ins = temp_descriptor(cs->ebid);
4054 19 : if (ins == NULL)
4055 : return LOG_ERR;
4056 19 : if (BATcount(ins) > ins->batInserted)
4057 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4058 19 : BATcommit(ins, BATcount(ins));
4059 19 : bat_destroy(ins);
4060 : }
4061 : }
4062 :
4063 56215 : if (t->idxs) {
4064 61387 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4065 5172 : sql_idx *i = n->data;
4066 :
4067 5172 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4068 4399 : continue;
4069 773 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4070 :
4071 773 : if (cs) {
4072 773 : if (cs->cleared) {
4073 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4074 0 : continue;
4075 : }
4076 :
4077 773 : lock_table(tr->store, t->base.id);
4078 2372 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4079 1599 : unlock_table(tr->store, t->base.id);
4080 1599 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4081 : /* append idx */
4082 725 : BAT *ins = temp_descriptor(cs->bid);
4083 725 : if (ins == NULL)
4084 : return LOG_ERR;
4085 725 : assert(BATcount(ins) >= cur->end);
4086 725 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4087 725 : bat_destroy(ins);
4088 : }
4089 1599 : lock_table(tr->store, t->base.id);
4090 : }
4091 773 : unlock_table(tr->store, t->base.id);
4092 : }
4093 : }
4094 : }
4095 :
4096 56215 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4097 0 : return LOG_ERR;
4098 :
4099 : return LOG_OK;
4100 : }
4101 :
4102 : static int
4103 82218 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4104 : {
4105 82218 : int ok = LOG_OK;
4106 82218 : bool cleared = s->cs.cleared;
4107 82218 : if (ok == LOG_OK && cleared)
4108 26003 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4109 26003 : if (ok == LOG_OK)
4110 82218 : ok = log_segments(tr, s->segs, t->base.id);
4111 82218 : if (ok == LOG_OK && !cleared)
4112 56215 : ok = log_table_append(tr, t, s->segs);
4113 82218 : return ok;
4114 : }
4115 :
4116 : static void
4117 309982 : merge_cs( column_storage *cs, const char* caller)
4118 : {
4119 309982 : if (cs->bid && cs->ucnt) {
4120 2725 : BAT *cur = temp_descriptor(cs->bid);
4121 2725 : BAT *ui = temp_descriptor(cs->uibid);
4122 2725 : BAT *uv = temp_descriptor(cs->uvbid);
4123 :
4124 2725 : if (!cur || !ui || !uv) {
4125 0 : bat_destroy(ui);
4126 0 : bat_destroy(uv);
4127 0 : bat_destroy(cur);
4128 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4129 : return;
4130 : }
4131 2725 : assert(BATcount(ui) == BATcount(uv));
4132 :
4133 : /* any updates */
4134 2725 : assert(!isEbat(cur));
4135 2725 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4136 0 : bat_destroy(ui);
4137 0 : bat_destroy(uv);
4138 0 : bat_destroy(cur);
4139 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4140 : return;
4141 : }
4142 : /* cleanup the old deltas */
4143 2725 : temp_destroy(cs->uibid);
4144 2725 : temp_destroy(cs->uvbid);
4145 2725 : cs->uibid = e_bat(TYPE_oid);
4146 2725 : cs->uvbid = e_bat(cur->ttype);
4147 2725 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4148 2725 : cs->ucnt = 0;
4149 2725 : bat_destroy(ui);
4150 2725 : bat_destroy(uv);
4151 2725 : bat_destroy(cur);
4152 : }
4153 309982 : cs->cleared = false;
4154 309982 : cs->merged = true;
4155 309982 : return;
4156 : }
4157 :
4158 : static void
4159 269928 : merge_delta( sql_delta *obat)
4160 : {
4161 269928 : if (obat && obat->next && !obat->cs.merged)
4162 28958 : merge_delta(obat->next);
4163 269928 : merge_cs(&obat->cs, __func__);
4164 269928 : }
4165 :
4166 : static void
4167 40054 : merge_storage(storage *tdb)
4168 : {
4169 40054 : merge_cs(&tdb->cs, __func__);
4170 :
4171 40054 : if (tdb->next) {
4172 275 : destroy_storage(tdb->next);
4173 275 : tdb->next = NULL;
4174 : }
4175 40054 : }
4176 :
4177 : static sql_delta *
4178 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4179 : {
4180 : /* commit ie copy back to the parent transaction */
4181 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4182 1 : sql_delta *od = delta->next;
4183 1 : if (od->cs.ts == commit_ts) {
4184 0 : sql_delta t = *od, *n = od->next;
4185 0 : *od = *delta;
4186 0 : od->next = n;
4187 0 : *delta = t;
4188 0 : delta->next = NULL;
4189 0 : destroy_delta(delta, true);
4190 0 : return od;
4191 : }
4192 : }
4193 : return delta;
4194 : }
4195 :
4196 : static int
4197 132965 : log_update_col( sql_trans *tr, sql_change *change)
4198 : {
4199 132965 : sql_column *c = (sql_column*)change->obj;
4200 132965 : assert(!isTempTable(c->t));
4201 :
4202 132965 : if (isDeleted(c->t)) {
4203 0 : change->handled = true;
4204 0 : return LOG_OK;
4205 : }
4206 :
4207 132965 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4208 132965 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4209 132965 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4210 132965 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4211 : }
4212 : return LOG_OK;
4213 : }
4214 :
4215 : static int
4216 207 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4217 : {
4218 207 : sqlstore *store = Store;
4219 :
4220 207 : sql_delta *d = (sql_delta*)change->data;
4221 207 : if (d->cs.ts < oldest) {
4222 78 : destroy_delta(d, false);
4223 78 : if (change->commit == &commit_update_idx)
4224 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4225 : else
4226 76 : table_destroy(store, ((sql_column*)change->obj)->t);
4227 78 : return 1;
4228 : }
4229 129 : if (d->cs.ts > TRANSACTION_ID_BASE)
4230 78 : d->cs.ts = store_get_timestamp(store) + 1;
4231 : return 0;
4232 : }
4233 :
4234 : static int
4235 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4236 : {
4237 9 : sqlstore *store = Store;
4238 :
4239 9 : storage *d = (storage*)change->data;
4240 9 : if (d->cs.ts < oldest) {
4241 3 : destroy_storage(d);
4242 3 : table_destroy(store, (sql_table*)change->obj);
4243 3 : return 1;
4244 : }
4245 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4246 3 : d->cs.ts = store_get_timestamp(store) + 1;
4247 : return 0;
4248 : }
4249 :
4250 : static int
4251 133079 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4252 : {
4253 133079 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4254 :
4255 133079 : sql_delta *delta = ATOMIC_PTR_GET(data);
4256 :
4257 133079 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4258 3 : int ok = LOG_OK;
4259 3 : assert(isTempTable(t));
4260 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4261 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4262 3 : if (!tr->parent)
4263 0 : t->base.new = base->new = 0;
4264 3 : change->handled = true;
4265 3 : return ok;
4266 : }
4267 :
4268 133076 : if (commit_ts)
4269 132998 : delta->cs.ts = commit_ts;
4270 133076 : if (!commit_ts) { /* rollback */
4271 78 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4272 :
4273 78 : if (change->ts && t->base.new) /* handled by create col */
4274 : return LOG_OK;
4275 78 : if (o != d) {
4276 0 : while(o && o->next != d)
4277 : o = o->next;
4278 : }
4279 78 : if (o == ATOMIC_PTR_GET(data))
4280 78 : ATOMIC_PTR_SET(data, d->next);
4281 : else
4282 0 : o->next = d->next;
4283 78 : d->next = NULL;
4284 78 : change->cleanup = &tc_gc_rollbacked;
4285 132998 : } else if (!tr->parent) {
4286 : /* merge deltas */
4287 274539 : while (delta && delta->cs.ts > oldest)
4288 141542 : delta = delta->next;
4289 132997 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4290 25329 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4291 25329 : merge_delta(delta);
4292 25329 : unlock_column(tr->store, base->id);
4293 : }
4294 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4295 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4296 : return LOG_OK;
4297 : }
4298 :
4299 : static int
4300 133051 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4301 : {
4302 :
4303 133051 : sql_column *c = (sql_column*)change->obj;
4304 133051 : sql_base* base = &c->base;
4305 133051 : sql_table* t = c->t;
4306 133051 : ATOMIC_PTR_TYPE* data = &c->data;
4307 133051 : int type = c->type.type->localtype;
4308 :
4309 133051 : if (change->handled || isDeleted(c->t))
4310 : return LOG_OK;
4311 :
4312 133051 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4313 : }
4314 :
4315 : static int
4316 26 : log_update_idx( sql_trans *tr, sql_change *change)
4317 : {
4318 26 : sql_idx *i = (sql_idx*)change->obj;
4319 26 : assert(!isTempTable(i->t));
4320 :
4321 26 : if (isDeleted(i->t)) {
4322 0 : change->handled = true;
4323 0 : return LOG_OK;
4324 : }
4325 :
4326 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4327 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4328 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4329 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4330 : }
4331 : return LOG_OK;
4332 : }
4333 :
4334 : static int
4335 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4336 : {
4337 28 : sql_idx *i = (sql_idx*)change->obj;
4338 28 : sql_base* base = &i->base;
4339 28 : sql_table* t = i->t;
4340 28 : ATOMIC_PTR_TYPE* data = &i->data;
4341 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4342 :
4343 28 : if (change->handled || isDeleted(i->t))
4344 : return LOG_OK;
4345 :
4346 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4347 : }
4348 :
4349 : static storage *
4350 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4351 : {
4352 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4353 0 : storage *od = dbat->next;
4354 0 : if (od->cs.ts == commit_ts) {
4355 0 : storage t = *od, *n = od->next;
4356 0 : *od = *dbat;
4357 0 : od->next = n;
4358 0 : *dbat = t;
4359 0 : dbat->next = NULL;
4360 0 : destroy_storage(dbat);
4361 0 : return od;
4362 : }
4363 : }
4364 : return dbat;
4365 : }
4366 :
4367 : static int
4368 82218 : log_update_del( sql_trans *tr, sql_change *change)
4369 : {
4370 82218 : sql_table *t = (sql_table*)change->obj;
4371 82218 : assert(!isTempTable(t));
4372 :
4373 82218 : if (isDeleted(t)) {
4374 0 : change->handled = true;
4375 0 : return LOG_OK;
4376 : }
4377 :
4378 82218 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4379 82218 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4380 : return LOG_OK;
4381 : }
4382 :
4383 : static int
4384 86028 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4385 : {
4386 86028 : int ok = LOG_OK;
4387 86028 : sql_table *t = (sql_table*)change->obj;
4388 86028 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4389 :
4390 86028 : if (change->handled || isDeleted(t))
4391 : return ok;
4392 :
4393 86028 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4394 22 : assert(isTempTable(t));
4395 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4396 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4397 22 : change->handled = true;
4398 22 : return ok;
4399 : }
4400 :
4401 86006 : lock_table(tr->store, t->base.id);
4402 86006 : if (!commit_ts) { /* rollback */
4403 3491 : if (dbat->cs.ts == tr->tid) {
4404 6 : if (change->ts && t->base.new) { /* handled by the create table */
4405 3 : unlock_table(tr->store, t->base.id);
4406 3 : return ok;
4407 : }
4408 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4409 :
4410 3 : if (o != d) {
4411 0 : while(o && o->next != d)
4412 : o = o->next;
4413 : }
4414 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4415 3 : assert(d->next);
4416 3 : ATOMIC_PTR_SET(&t->data, d->next);
4417 : } else
4418 0 : o->next = d->next;
4419 3 : d->next = NULL;
4420 3 : change->cleanup = &tc_gc_rollbacked_storage;
4421 : } else
4422 3485 : rollback_segments(dbat->segs, tr, change, oldest);
4423 82515 : } else if (ok == LOG_OK && !tr->parent) {
4424 82489 : if (dbat->cs.ts == tr->tid) /* cleared table */
4425 26005 : dbat->cs.ts = commit_ts;
4426 :
4427 82489 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4428 82489 : if (ok == LOG_OK) {
4429 82489 : merge_segments(dbat, tr, change, commit_ts, oldest);
4430 82489 : if (oldest == commit_ts)
4431 40054 : merge_storage(dbat);
4432 : }
4433 82489 : if (dbat)
4434 82489 : dbat->cs.cleared = false;
4435 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4436 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4437 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4438 26 : storage *s = change->data;
4439 26 : if (s->cs.ts == tr->tid)
4440 0 : s->cs.ts = commit_ts;
4441 : }
4442 86003 : unlock_table(tr->store, t->base.id);
4443 86003 : return ok;
4444 : }
4445 :
4446 : /* only rollback (content version) case for now */
4447 : static int
4448 194 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4449 : {
4450 194 : sqlstore *store = Store;
4451 194 : sql_column *c = (sql_column*)change->obj;
4452 :
4453 194 : if (!c) /* cleaned earlier */
4454 : return 1;
4455 :
4456 145 : if (change->handled || isDeleted(c->t)) {
4457 0 : column_destroy(store, c);
4458 0 : return 1;
4459 : }
4460 :
4461 : /* savepoint commit (did it merge ?) */
4462 145 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4463 0 : column_destroy(store, c);
4464 0 : return 1;
4465 : }
4466 145 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4467 : return 0;
4468 144 : sql_delta *d = (sql_delta*)change->data;
4469 144 : if (d && d->next) {
4470 :
4471 62 : if (d->cs.ts > oldest)
4472 : return LOG_OK; /* cannot cleanup yet */
4473 :
4474 : // d is oldest reachable delta
4475 59 : if (d->next) // Unreachable can immediately be destroyed.
4476 59 : destroy_delta(d->next, true);
4477 :
4478 59 : d->next = NULL;
4479 59 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4480 59 : merge_delta(d);
4481 59 : unlock_column(store, c->base.id);
4482 : }
4483 141 : column_destroy(store, c);
4484 141 : return 1;
4485 : }
4486 :
4487 : static int
4488 788563 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4489 : {
4490 788563 : sqlstore *store = Store;
4491 788563 : sql_column *c = (sql_column*)change->obj;
4492 :
4493 788563 : if (!c) /* cleaned earlier */
4494 : return 1;
4495 :
4496 788563 : if (change->handled || isDeleted(c->t)) {
4497 3 : table_destroy(store, c->t);
4498 3 : return 1;
4499 : }
4500 :
4501 : /* savepoint commit (did it merge ?) */
4502 788560 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4503 29845 : table_destroy(store, c->t);
4504 29845 : return 1;
4505 : }
4506 758715 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4507 : return 0;
4508 758714 : sql_delta *d = (sql_delta*)change->data;
4509 758714 : if (d && d->next) {
4510 :
4511 758714 : if (d->cs.ts > oldest)
4512 : return LOG_OK; /* cannot cleanup yet */
4513 :
4514 : // d is oldest reachable delta
4515 103067 : if (d->next) // Unreachable can immediately be destroyed.
4516 103067 : destroy_delta(d->next, true);
4517 :
4518 103067 : d->next = NULL;
4519 103067 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4520 103067 : merge_delta(d);
4521 103067 : unlock_column(store, c->base.id);
4522 : }
4523 103067 : table_destroy(store, c->t);
4524 103067 : return 1;
4525 : }
4526 :
4527 : static int
4528 1116 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4529 : {
4530 1116 : sqlstore *store = Store;
4531 1116 : sql_idx *i = (sql_idx*)change->obj;
4532 :
4533 1116 : if (!i) /* cleaned earlier */
4534 : return 1;
4535 :
4536 620 : if (change->handled || isDeleted(i->t)) {
4537 0 : idx_destroy(store, i);
4538 0 : return 1;
4539 : }
4540 :
4541 : /* savepoint commit (did it merge ?) */
4542 620 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4543 0 : idx_destroy(store, i);
4544 0 : return 1;
4545 : }
4546 620 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4547 : return 0;
4548 620 : sql_delta *d = (sql_delta*)change->data;
4549 620 : if (d->next) {
4550 :
4551 0 : if (d->cs.ts > oldest)
4552 : return LOG_OK; /* cannot cleanup yet */
4553 :
4554 : // d is oldest reachable delta
4555 0 : if (d->next) // Unreachable can immediately be destroyed.
4556 0 : destroy_delta(d->next, true);
4557 :
4558 0 : d->next = NULL;
4559 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4560 0 : merge_delta(d);
4561 0 : unlock_column(store, i->base.id);
4562 : }
4563 620 : idx_destroy(store, i);
4564 620 : return 1;
4565 : }
4566 :
4567 : static int
4568 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4569 : {
4570 26 : sqlstore *store = Store;
4571 26 : sql_idx *i = (sql_idx*)change->obj;
4572 :
4573 26 : if (!i) /* cleaned earlier */
4574 : return 1;
4575 :
4576 26 : if (change->handled || isDeleted(i->t)) {
4577 0 : table_destroy(store, i->t);
4578 0 : return 1;
4579 : }
4580 :
4581 : /* savepoint commit (did it merge ?) */
4582 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4583 0 : table_destroy(store, i->t);
4584 0 : return 1;
4585 : }
4586 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4587 : return 0;
4588 26 : sql_delta *d = (sql_delta*)change->data;
4589 26 : if (d->next) {
4590 :
4591 26 : if (d->cs.ts > oldest)
4592 : return LOG_OK; /* cannot cleanup yet */
4593 :
4594 : // d is oldest reachable delta
4595 26 : if (d->next) // Unreachable can immediately be destroyed.
4596 26 : destroy_delta(d->next, true);
4597 :
4598 26 : d->next = NULL;
4599 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4600 26 : merge_delta(d);
4601 26 : unlock_column(store, i->base.id);
4602 : }
4603 26 : table_destroy(store, i->t);
4604 26 : return 1;
4605 : }
4606 :
4607 : static int
4608 226993 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4609 : {
4610 226993 : sqlstore *store = Store;
4611 226993 : sql_table *t = (sql_table*)change->obj;
4612 :
4613 226993 : if (change->handled || isDeleted(t)) {
4614 3213 : table_destroy(store, t);
4615 3213 : return 1;
4616 : }
4617 : /* savepoint commit (did it merge ?) */
4618 223780 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4619 6061 : table_destroy(store, t);
4620 6061 : return 1;
4621 : }
4622 217719 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4623 : return 0;
4624 217691 : storage *d = (storage*)change->data;
4625 217691 : if (d->next) {
4626 150206 : if (d->cs.ts > oldest)
4627 : return LOG_OK; /* cannot cleanup yet */
4628 :
4629 19670 : destroy_storage(d->next);
4630 19670 : d->next = NULL;
4631 : }
4632 87155 : table_destroy(store, t);
4633 87155 : return 1;
4634 : }
4635 :
4636 : static int
4637 30098 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4638 : {
4639 30098 : if (nr == 0)
4640 : return LOG_OK;
4641 30098 : assert (nr > 0);
4642 30098 : if ((!offsets || !*offsets) && nr == total) {
4643 30079 : *offset = slot;
4644 30079 : return LOG_OK;
4645 : }
4646 19 : if (!*offsets) {
4647 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4648 7 : if (!*offsets)
4649 : return LOG_ERR;
4650 : }
4651 19 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4652 16158 : for(size_t i = 0; i < nr; i++)
4653 16139 : dst[i] = slot + i;
4654 19 : (*offsets)->batCount += nr;
4655 19 : (*offsets)->theap->dirty = true;
4656 19 : return LOG_OK;
4657 : }
4658 :
4659 : static int
4660 29918 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4661 : {
4662 29918 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4663 30235 : assert(s->segs);
4664 30235 : ulng oldest = store_oldest(tr->store, NULL);
4665 29965 : BUN slot = 0;
4666 29965 : size_t total = cnt;
4667 :
4668 29965 : if (!locked)
4669 29967 : lock_table(tr->store, t->base.id);
4670 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4671 60421 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4672 30308 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
4673 35 : if ((seg->end - seg->start) >= cnt) {
4674 : /* if previous is claimed before we could simply adjust the end/start */
4675 13 : if (p && p->ts == tr->tid && !p->deleted) {
4676 2 : slot = p->end;
4677 2 : p->end += cnt;
4678 2 : seg->start += cnt;
4679 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4680 : ok = LOG_ERR;
4681 : break;
4682 : }
4683 2 : cnt = 0;
4684 2 : break;
4685 : }
4686 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4687 11 : size_t rcnt = seg->end - seg->start;
4688 11 : if (rcnt > cnt)
4689 : rcnt = cnt;
4690 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4691 : ok = LOG_ERR;
4692 : break;
4693 : }
4694 : }
4695 33 : seg->ts = tr->tid;
4696 33 : seg->deleted = false;
4697 33 : slot = seg->start;
4698 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4699 : ok = LOG_ERR;
4700 : break;
4701 : }
4702 0 : cnt -= (seg->end - seg->start);
4703 : }
4704 : }
4705 30115 : if (ok == LOG_OK && cnt) {
4706 30102 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4707 29158 : slot = s->segs->t->end;
4708 29158 : s->segs->t->end += cnt;
4709 : } else {
4710 944 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4711 : ok = LOG_ERR;
4712 : } else {
4713 1059 : if (!s->segs->h)
4714 0 : s->segs->h = s->segs->t;
4715 1059 : slot = s->segs->t->start;
4716 : }
4717 : }
4718 30217 : if (ok == LOG_OK)
4719 30217 : ok = add_offsets(slot, cnt, total, offset, offsets);
4720 : }
4721 30107 : if (!locked)
4722 30033 : unlock_table(tr->store, t->base.id);
4723 :
4724 30327 : if (ok == LOG_OK) {
4725 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4726 30256 : if (!in_transaction) {
4727 1044 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4728 1044 : in_transaction = true;
4729 : }
4730 30256 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4731 30241 : tr->logchanges += (int) total;
4732 30252 : if (*offsets) {
4733 6 : BAT *pos = *offsets;
4734 6 : assert(BATcount(pos) == total);
4735 6 : BATsetcount(pos, total); /* set other properties */
4736 7 : pos->tnil = false;
4737 7 : pos->tnonil = true;
4738 7 : pos->tkey = true;
4739 7 : pos->tsorted = true;
4740 7 : pos->trevsorted = false;
4741 : }
4742 : }
4743 30324 : return ok;
4744 : }
4745 :
4746 : static int
4747 2006351 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4748 : {
4749 2006351 : if (cnt > 1 && offsets)
4750 29917 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4751 1976434 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4752 1976443 : assert(s->segs);
4753 1976443 : ulng oldest = store_oldest(tr->store, NULL);
4754 1976436 : BUN slot = 0;
4755 1976436 : int reused = 0;
4756 :
4757 1976436 : if (!locked)
4758 1858093 : lock_table(tr->store, t->base.id);
4759 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4760 : * or create new segment at the end */
4761 8358146 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4762 6452538 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
4763 :
4764 70837 : if ((seg->end - seg->start) >= cnt) {
4765 :
4766 : /* if previous is claimed before we could simply adjust the end/start */
4767 70837 : if (p && p->ts == tr->tid && !p->deleted) {
4768 56697 : slot = p->end;
4769 56697 : p->end += cnt;
4770 56697 : seg->start += cnt;
4771 56697 : reused = 1;
4772 56697 : break;
4773 : }
4774 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4775 14140 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4776 : ok = LOG_ERR;
4777 : break;
4778 : }
4779 : }
4780 14140 : seg->ts = tr->tid;
4781 14140 : seg->deleted = false;
4782 14140 : slot = seg->start;
4783 14140 : reused = 1;
4784 14140 : break;
4785 : }
4786 : }
4787 1976445 : if (ok == LOG_OK && !reused) {
4788 1905609 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4789 1870978 : slot = s->segs->t->end;
4790 1870978 : s->segs->t->end += cnt;
4791 : } else {
4792 34631 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4793 : ok = LOG_ERR;
4794 : } else {
4795 34631 : if (!s->segs->h)
4796 0 : s->segs->h = s->segs->t;
4797 34631 : slot = s->segs->t->start;
4798 : }
4799 : }
4800 : }
4801 1976445 : if (!locked)
4802 1858102 : unlock_table(tr->store, t->base.id);
4803 :
4804 1976445 : if (ok == LOG_OK) {
4805 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4806 1976446 : if (!in_transaction) {
4807 46904 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4808 46904 : in_transaction = true;
4809 : }
4810 1976446 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4811 1976021 : tr->logchanges += (int) cnt;
4812 1976446 : *offset = slot;
4813 : }
4814 : return ok;
4815 : }
4816 :
4817 : /*
4818 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4819 : * of the global transaction and mark the newly added storage slots unused on the global
4820 : * level but used on the local transaction level. Besides this the local transaction needs
4821 : * to update (and mark unused) any slot inbetween the old end and new slots.
4822 : * */
4823 : static int
4824 1888042 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4825 : {
4826 1888042 : storage *s;
4827 :
4828 : /* we have a single segment structure for each persistent table
4829 : * for temporary tables each has its own */
4830 1888042 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4831 : return LOG_ERR;
4832 :
4833 1888170 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4834 : }
4835 :
4836 : /* some tables cannot be updated concurrently (user/roles etc) */
4837 : static int
4838 118348 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4839 : {
4840 118348 : storage *s;
4841 118348 : int res = 0;
4842 :
4843 : /* we have a single segment structure for each persistent table
4844 : * for temporary tables each has its own */
4845 118348 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4846 : /* TODO check for other inserts ! */
4847 : return LOG_ERR;
4848 :
4849 118348 : lock_table(tr->store, t->base.id);
4850 118348 : if ((res = segments_conflict(tr, s->segs, 1))) {
4851 4 : unlock_table(tr->store, t->base.id);
4852 4 : return LOG_CONFLICT;
4853 : }
4854 118344 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4855 118344 : unlock_table(tr->store, t->base.id);
4856 118344 : return res;
4857 : }
4858 :
4859 : static int
4860 13968 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4861 : {
4862 13968 : storage *s;
4863 13968 : int res = 0;
4864 :
4865 13968 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4866 : return LOG_ERR;
4867 :
4868 13968 : lock_table(tr->store, t->base.id);
4869 13968 : res = segments_conflict(tr, s->segs, uncommitted);
4870 13968 : unlock_table(tr->store, t->base.id);
4871 13968 : return res ? LOG_CONFLICT : LOG_OK;
4872 : }
4873 :
4874 : static size_t
4875 1408044 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4876 : {
4877 1408044 : size_t cnt = 0;
4878 :
4879 1580955 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4880 : ;
4881 :
4882 3498816 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4883 2090754 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4884 120807 : cnt += s->end - s->start;
4885 : }
4886 1408062 : return cnt;
4887 : }
4888 :
4889 : static BAT *
4890 1408017 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4891 : {
4892 1408017 : lock_table(tr->store, t->base.id);
4893 1408063 : segment *s = S->segs->h;
4894 : /* step one no deletes -> dense range */
4895 1408063 : uint32_t cur = 0;
4896 1408063 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4897 1408069 : if (!dnr) {
4898 1301929 : unlock_table(tr->store, t->base.id);
4899 1301924 : return BATdense(start, start, end-start);
4900 : }
4901 :
4902 106140 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4903 106140 : if (!b) {
4904 0 : unlock_table(tr->store, t->base.id);
4905 0 : return NULL;
4906 : }
4907 :
4908 106140 : uint32_t *restrict dst = Tloc(b, 0);
4909 59490587 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4910 59442140 : if (s->end < start)
4911 83316 : continue;
4912 59358824 : if (s->start >= end)
4913 : break;
4914 59301131 : msk m = (SEG_IS_VALID(s, tr));
4915 59301131 : size_t lnr = s->end-s->start;
4916 59301131 : if (s->start < start)
4917 10750 : lnr -= (start - s->start);
4918 59301131 : if (s->end > end)
4919 7816 : lnr -= s->end - end;
4920 :
4921 59301131 : if (m) {
4922 972854 : size_t used = pos&31, end = 32;
4923 972854 : if (used) {
4924 843770 : if (lnr < (32-used))
4925 489833 : end = used + lnr;
4926 843770 : assert(end > used);
4927 843770 : cur |= ((1U << (end - used)) - 1) << used;
4928 843770 : lnr -= end - used;
4929 843770 : pos += end - used;
4930 843770 : if (end == 32) {
4931 353937 : *dst++ = cur;
4932 353937 : cur = 0;
4933 : }
4934 : }
4935 972854 : size_t full = lnr/32;
4936 972854 : size_t rest = lnr%32;
4937 972854 : if (full > 0) {
4938 273427 : memset(dst, ~0, full * sizeof(*dst));
4939 273427 : dst += full;
4940 273427 : lnr -= full * 32;
4941 273427 : pos += full * 32;
4942 : }
4943 972854 : if (rest > 0) {
4944 455704 : cur |= (1U << rest) - 1;
4945 455704 : lnr -= rest;
4946 455704 : pos += rest;
4947 : }
4948 972854 : assert(lnr==0);
4949 : } else {
4950 58328277 : size_t used = pos&31, end = 32;
4951 58328277 : if (used) {
4952 56496093 : if (lnr < (32-used))
4953 54578819 : end = used + lnr;
4954 :
4955 56496093 : pos+= (end-used);
4956 56496093 : lnr-= (end-used);
4957 56496093 : if (end == 32) {
4958 1917274 : *dst++ = cur;
4959 1917274 : cur = 0;
4960 : }
4961 : }
4962 58328277 : size_t full = lnr/32;
4963 58328277 : size_t rest = lnr%32;
4964 58328277 : memset(dst, 0, full * sizeof(*dst));
4965 58328277 : dst += full;
4966 58328277 : lnr -= full * 32;
4967 58328277 : pos += full * 32;
4968 58328277 : pos+= rest;
4969 58328277 : lnr-= rest;
4970 58328277 : assert(lnr==0);
4971 : }
4972 : }
4973 :
4974 106140 : unlock_table(tr->store, t->base.id);
4975 106140 : if (pos%32)
4976 101405 : *dst=cur;
4977 106140 : BATsetcount(b, nr);
4978 106140 : bn = BATmaskedcands(start, nr, b, true);
4979 106139 : BBPreclaim(b);
4980 106138 : (void)pos;
4981 106138 : assert (pos == nr);
4982 : return bn;
4983 : }
4984 :
4985 : static void * /* BAT * */
4986 1413543 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4987 : {
4988 : /* with nr_of_parts - part_nr we can adjust parts */
4989 1413543 : storage *s = tab_timestamp_storage(tr, t);
4990 :
4991 1413717 : if (!s)
4992 : return NULL;
4993 1413717 : size_t nr = segs_end(s->segs, tr, t);
4994 :
4995 1414134 : if (!nr)
4996 6103 : return BATdense(0, 0, 0);
4997 :
4998 : /* compute proper part */
4999 1408031 : size_t part_size = nr/nr_of_parts;
5000 1408031 : size_t start = part_size * part_nr;
5001 1408031 : size_t end = start + part_size;
5002 1408031 : if (part_nr == (nr_of_parts-1))
5003 1260751 : end = nr;
5004 1408031 : assert(end <= nr);
5005 1408031 : return segments2cands(s, tr, t, start, end);
5006 : }
5007 :
5008 : static int
5009 1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
5010 : {
5011 1 : bool update_conflict = false;
5012 :
5013 1 : if (segments_in_transaction(tr, col->t))
5014 : return LOG_CONFLICT;
5015 :
5016 1 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5017 :
5018 1 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5019 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5020 1 : assert(d && d->cs.ts == tr->tid);
5021 1 : if (odelta != d)
5022 1 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
5023 1 : if (d->cs.bid)
5024 1 : temp_destroy(d->cs.bid);
5025 1 : if (d->cs.uibid)
5026 1 : temp_destroy(d->cs.uibid);
5027 1 : if (d->cs.uvbid)
5028 1 : temp_destroy(d->cs.uvbid);
5029 1 : bat_set_access(bn, BAT_READ);
5030 1 : d->cs.bid = temp_create(bn);
5031 1 : d->cs.uibid = 0;
5032 1 : d->cs.uvbid = 0;
5033 1 : d->cs.ucnt = 0;
5034 1 : d->cs.cleared = true;
5035 1 : d->cs.ts = tr->tid;
5036 1 : ATOMIC_INIT(&d->cs.refcnt, 1);
5037 1 : return LOG_OK;
5038 : }
5039 :
5040 : static int
5041 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5042 : {
5043 58 : bool update_conflict = false;
5044 :
5045 58 : if (segments_in_transaction(tr, col->t))
5046 : return LOG_CONFLICT;
5047 :
5048 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5049 :
5050 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5051 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5052 58 : assert(d && d->cs.ts == tr->tid);
5053 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5054 58 : if (odelta != d)
5055 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5056 :
5057 58 : d->cs.st = st;
5058 58 : d->cs.cleared = true;
5059 58 : if (d->cs.bid)
5060 58 : temp_destroy(d->cs.bid);
5061 58 : o = transfer_to_systrans(o);
5062 58 : if (o == NULL)
5063 : return LOG_ERR;
5064 58 : bat_set_access(o, BAT_READ);
5065 58 : d->cs.bid = temp_create(o);
5066 58 : if (u) {
5067 53 : if (d->cs.ebid)
5068 0 : temp_destroy(d->cs.ebid);
5069 53 : u = transfer_to_systrans(u);
5070 53 : if (u == NULL)
5071 : return LOG_ERR;
5072 53 : d->cs.ebid = temp_create(u);
5073 : }
5074 : return LOG_OK;
5075 : }
5076 :
5077 : void
5078 341 : bat_storage_init( store_functions *sf)
5079 : {
5080 341 : sf->bind_col = &bind_col;
5081 341 : sf->bind_updates = &bind_updates;
5082 341 : sf->bind_updates_idx = &bind_updates_idx;
5083 341 : sf->bind_idx = &bind_idx;
5084 341 : sf->bind_cands = &bind_cands;
5085 :
5086 341 : sf->claim_tab = &claim_tab;
5087 341 : sf->key_claim_tab = &key_claim_tab;
5088 341 : sf->tab_validate = &tab_validate;
5089 :
5090 341 : sf->append_col = &append_col;
5091 341 : sf->append_idx = &append_idx;
5092 :
5093 341 : sf->update_col = &update_col;
5094 341 : sf->update_idx = &update_idx;
5095 :
5096 341 : sf->delete_tab = &delete_tab;
5097 :
5098 341 : sf->count_del = &count_del;
5099 341 : sf->count_col = &count_col;
5100 341 : sf->count_idx = &count_idx;
5101 341 : sf->dcount_col = &dcount_col;
5102 341 : sf->min_max_col = &min_max_col;
5103 341 : sf->set_stats_col = &set_stats_col;
5104 341 : sf->sorted_col = &sorted_col;
5105 341 : sf->unique_col = &unique_col;
5106 341 : sf->double_elim_col = &double_elim_col;
5107 341 : sf->col_stats = &col_stats;
5108 341 : sf->col_set_range = &col_set_range;
5109 341 : sf->col_not_null = &col_not_null;
5110 :
5111 341 : sf->col_dup = &col_dup;
5112 341 : sf->idx_dup = &idx_dup;
5113 341 : sf->del_dup = &del_dup;
5114 :
5115 341 : sf->create_col = &create_col; /* create and add to change list */
5116 341 : sf->create_idx = &create_idx;
5117 341 : sf->create_del = &create_del;
5118 :
5119 341 : sf->destroy_col = &destroy_col; /* free resources */
5120 341 : sf->destroy_idx = &destroy_idx;
5121 341 : sf->destroy_del = &destroy_del;
5122 :
5123 341 : sf->drop_col = &drop_col; /* add drop to change list */
5124 341 : sf->drop_idx = &drop_idx;
5125 341 : sf->drop_del = &drop_del;
5126 :
5127 341 : sf->clear_table = &clear_table;
5128 :
5129 341 : sf->swap_bats = &swap_bats;
5130 341 : sf->col_compress = &col_compress;
5131 341 : }
5132 :
#if 0
/*
 * Disabled code: excluded from compilation by the surrounding #if 0.
 *
 * Number of entries in the column's BAT beyond its batInserted mark.
 * Returns 0 for a NULL column, when GDKinmemory(0) holds, or when the
 * column's atime/allocated fields are not both set.
 *
 * NOTE(review): 'offset' is never written here, unlike in
 * log_get_nr_deleted() -- confirm before re-enabling.
 */
static lng
log_get_nr_inserted(sql_column *fc, lng *offset)
{
	if (!fc || GDKinmemory(0))
		return 0;
	if (!fc->base.atime || !fc->base.allocated)
		return 0;

	sql_delta *delta = fc->data;
	BAT *ins = temp_descriptor(delta->cs.bid);
	lng pending = 0;

	if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted)
		pending = BATcount(ins) - ins->batInserted;
	bat_destroy(ins);
	return pending;
}

/*
 * Disabled code: excluded from compilation by the surrounding #if 0.
 *
 * Number of entries in the table's BAT beyond its batInserted mark.
 * When that number is non-zero, *offset receives the batInserted mark;
 * otherwise *offset is left untouched.  Returns 0 for a NULL table, when
 * GDKinmemory(0) holds, or when the table's atime/allocated fields are
 * not both set.
 */
static lng
log_get_nr_deleted(sql_table *ft, lng *offset)
{
	if (!ft || GDKinmemory(0))
		return 0;
	if (!ft->base.atime || !ft->base.allocated)
		return 0;

	storage *del = ft->data;
	BAT *db = temp_descriptor(del->cs.bid);
	lng pending = 0;

	if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
		pending = BATcount(db) - db->batInserted;
		*offset = db->batInserted;
	}
	bat_destroy(db);
	return pending;
}
#endif
|