LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2480 3150 78.7 %
Date: 2024-12-20 21:24:02 Functions: 157 165 95.2 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "matomic.h"
      18             : 
      19             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      20             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      21             : 
      22             : static int log_update_col( sql_trans *tr, sql_change *c);
      23             : static int log_update_idx( sql_trans *tr, sql_change *c);
      24             : static int log_update_del( sql_trans *tr, sql_change *c);
      25             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      26             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      27             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int log_create_col(sql_trans *tr, sql_change *change);
      29             : static int log_create_idx(sql_trans *tr, sql_change *change);
      30             : static int log_create_del(sql_trans *tr, sql_change *change);
      31             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      32             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      33             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      35             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      36             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      39             : 
      40             : static lng merge_delta( sql_delta *obat);
      41             : 
      42             : /* valid
      43             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      44             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      45             :  */
      46             : 
      47             : #define VALID_4_READ(TS,tr) \
      48             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      49             : 
      50             : /* when changed, check if the old status is still valid */
      51             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      52             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      53             : 
      54             : #define SEG_VALID_4_DELETE(seg,tr) \
      55             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      56             : 
      57             : /* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
      58             : #define SEG_IS_DELETED(seg,tr) \
      59             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      60             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      61             : 
      62             : /* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
      63             : #define SEG_IS_VALID(seg, tr) \
      64             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      65             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      66             : 
      67             : static inline BAT *
      68        5261 : transfer_to_systrans(BAT *b)
      69             : {
      70             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      71        5261 :         MT_lock_set(&b->theaplock);
      72        5261 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      73          29 :                 MT_lock_unset(&b->theaplock);
      74          29 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      75          29 :                 BBPreclaim(b);
      76          29 :                 return bn;
      77             :         }
      78        5232 :         if (b->theap->farmid == TRANSIENT ||
      79          62 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      80        4847 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      81        4847 :                 if (qc) {
      82        2523 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      83        2523 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      84        2523 :                                 b->theap->farmid = SYSTRANS;
      85        2523 :                                 b->batRole = SYSTRANS;
      86             :                         }
      87        2523 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      88        1092 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      89        1092 :                                 b->tvheap->farmid = SYSTRANS;
      90             :                         }
      91             :                 }
      92             :         }
      93        5232 :         MT_lock_unset(&b->theaplock);
      94        5232 :         return b;
      95             : }
      96             : 
      97             : static void
      98    28750964 : lock_table(sqlstore *store, sqlid id)
      99             : {
     100    28750964 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     101    28841076 : }
     102             : 
     103             : static void
     104    28841588 : unlock_table(sqlstore *store, sqlid id)
     105             : {
     106    28841588 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     107    28839166 : }
     108             : 
     109             : static void
     110    21020256 : lock_column(sqlstore *store, sqlid id)
     111             : {
     112    21020256 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     113    21043163 : }
     114             : 
     115             : static void
     116    21047231 : unlock_column(sqlstore *store, sqlid id)
     117             : {
     118    21047231 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     119    21056612 : }
     120             : 
     121             : static void
     122      114655 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     123             : {
     124      114655 :         assert(cleanup);
     125      114655 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     126      114654 : }
     127             : 
     128             : static void
     129      132850 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     130             : {
     131      132850 :         assert(cleanup);
     132      132850 :         dup_base(&t->base);
     133      132850 :         trans_add(tr, b, data, cleanup, commit, log);
     134      132839 : }
     135             : 
     136             : static int
     137       88755 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     138             : {
     139       88755 :         segment *s = change->data;
     140             : 
     141       88755 :         if (s->ts <= oldest) {
     142       33719 :                 while(s) {
     143       21073 :                         segment *n = s->prev;
     144       21073 :                         ATOMIC_PTR_DESTROY(&s->next);
     145       21073 :                         _DELETE(s);
     146       21073 :                         s = n;
     147             :                 }
     148       12646 :                 sqlstore *store = Store;
     149       12646 :                 table_destroy(store, (sql_table*)change->obj);
     150       12646 :                 return 1;
     151             :         }
     152             :         return LOG_OK;
     153             : }
     154             : 
     155             : static void
     156       21073 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     157             : {
     158             :         /* we can only be accessed by anything older than commit_ts */
     159       21073 :         if (c->cleanup == &tc_gc_seg)
     160        8427 :                 s->prev = c->data;
     161             :         else
     162       12646 :                 c->cleanup = &tc_gc_seg;
     163       21073 :         c->data = s;
     164       21073 :         s->ts = commit_ts;
     165       16725 : }
     166             : 
     167             : static segment *
     168       88647 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     169             : {
     170       88647 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     171             : 
     172       88653 :         assert(tr);
     173       88653 :         if (n) {
     174       88653 :                 *n = (segment) {
     175       88653 :                         .ts = tr->tid,
     176             :                         .oldts = 0,
     177             :                         .deleted = false,
     178             :                         .start = 0,
     179             :                         .end = cnt,
     180             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     181             :                         .prev = NULL,
     182             :                 };
     183       88653 :                 if (o) {
     184       36148 :                         n->start += o->end;
     185       36148 :                         n->end += o->end;
     186       36148 :                         ATOMIC_PTR_SET(&o->next, n);
     187             :                 }
     188             :         }
     189       88653 :         return n;
     190             : }
     191             : 
     192             : static segment *
     193       95657 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     194             : {
     195       95657 :         assert(tr);
     196       95657 :         if (o->start == start && o->end == start+cnt) {
     197       11345 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     198       11345 :                 o->oldts = o->ts;
     199       11345 :                 o->ts = tr->tid;
     200       11345 :                 o->deleted = deleted;
     201       11345 :                 return o;
     202             :         }
     203       84312 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     204             : 
     205       84312 :         if (!n)
     206             :                 return NULL;
     207       84312 :         n->prev = NULL;
     208             : 
     209       84312 :         if (o->ts == tr->tid) {
     210        5913 :                 n->oldts = 0;
     211        5913 :                 n->ts = 1;
     212        5913 :                 n->deleted = true;
     213             :         } else {
     214       78399 :                 n->oldts = o->ts;
     215       78399 :                 n->ts = tr->tid;
     216       78399 :                 n->deleted = deleted;
     217             :         }
     218       84312 :         if (start == o->start) {
     219             :                 /* 2-way split: o remains latter part of segment, new one is
     220             :                  * inserted before */
     221       67967 :                 n->start = o->start;
     222       67967 :                 n->end = n->start + cnt;
     223       67967 :                 ATOMIC_PTR_INIT(&n->next, o);
     224       67967 :                 if (segs->h == o)
     225         463 :                         segs->h = n;
     226       67967 :                 if (p)
     227       67504 :                         ATOMIC_PTR_SET(&p->next, n);
     228       67967 :                 o->start = n->end;
     229       16345 :         } else if (start+cnt == o->end) {
     230             :                 /* 2-way split: o remains first part of segment, new one is
     231             :                  * added after */
     232        5719 :                 n->start = o->end - cnt;
     233        5719 :                 n->end = o->end;
     234        5719 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     235        5719 :                 ATOMIC_PTR_SET(&o->next, n);
     236        5719 :                 if (segs->t == o)
     237         882 :                         segs->t = n;
     238        5719 :                 o->end = n->start;
     239             :         } else {
     240             :                 /* 3-way split: o remains first part of segment, two new ones
     241             :                  * are added after */
     242       10626 :                 segment *n2 = GDKmalloc(sizeof(segment));
     243       10626 :                 if (n2 == NULL) {
     244           0 :                         GDKfree(n);
     245           0 :                         return NULL;
     246             :                 }
     247       10626 :                 ATOMIC_PTR_INIT(&n->next, n2);
     248       10626 :                 n->start = start;
     249       10626 :                 n->end = start + cnt;
     250       10626 :                 *n2 = *o;
     251       10626 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     252       10626 :                 n2->start = n->end;
     253       10626 :                 n2->prev = NULL;
     254       10626 :                 if (segs->t == o)
     255        4456 :                         segs->t = n2;
     256       10626 :                 ATOMIC_PTR_SET(&o->next, n);
     257       10626 :                 o->end = start;
     258             :         }
     259             :         return n;
     260             : }
     261             : 
     262             : static void
     263        4302 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     264             : {
     265        4302 :         segment *cur = segs->h, *seg = NULL;
     266       18367 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     267       14065 :                 if (cur->ts == tr->tid) { /* revert */
     268        4799 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     269        4799 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     270        4799 :                         cur->oldts = 0;
     271             :                 }
     272       14065 :                 if (cur->ts <= oldest) { /* possibly merge range */
     273       13107 :                         if (!seg) { /* skip first */
     274             :                                 seg = cur;
     275        8805 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     276             :                                 /* merge with previous */
     277        4348 :                                 seg->end = cur->end;
     278        4348 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     279        4348 :                                 if (cur == segs->t)
     280        2857 :                                         segs->t = seg;
     281        4348 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     282        4348 :                                 cur = seg;
     283             :                         } else {
     284             :                                 seg = cur; /* begin of new merge */
     285             :                         }
     286             :                 }
     287             :         }
     288        4302 : }
     289             : 
     290             : static size_t
     291      104212 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     292             : {
     293      104212 :         size_t cnt = 0;
     294      104212 :         segment *s = segs->h, *l = NULL;
     295             : 
     296      509188 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     297      404976 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     298             :                                 l = s;
     299             :         }
     300      104212 :         if (l)
     301      104205 :                 cnt = l->end;
     302      104212 :         return cnt;
     303             : }
     304             : 
     305             : static int
     306      104212 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     307             : {
     308             :         /* set bits correctly */
     309      104212 :         BAT *b = temp_descriptor(cs->bid);
     310             : 
     311      104212 :         if (!b)
     312             :                 return LOG_ERR;
     313      104212 :         segment *s = segs->h;
     314             : 
     315      104212 :         size_t nr = segs_end_include_deleted(segs, tr);
     316      104212 :         size_t rounded_nr = ((nr+31)&~31);
     317      104212 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     318           0 :                 bat_destroy(b);
     319           0 :                 return LOG_ERR;
     320             :         }
     321             : 
     322             :         /* disable all properties here */
     323      104212 :         MT_lock_set(&b->theaplock);
     324      104212 :         b->tsorted = false;
     325      104212 :         b->trevsorted = false;
     326      104212 :         b->tnosorted = 0;
     327      104212 :         b->tnorevsorted = 0;
     328      104212 :         b->tseqbase = oid_nil;
     329      104212 :         b->tkey = false;
     330      104212 :         b->tnokey[0] = 0;
     331      104212 :         b->tnokey[1] = 0;
     332      104212 :         b->theap->dirty = true;
     333      104212 :         BUN cnt = BATcount(b);
     334             : 
     335      104212 :         uint32_t *restrict dst;
     336      445354 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     337      385948 :                 if (s->start >= nr)
     338             :                         break;
     339      341142 :                 if (s->ts == tr->tid && s->end != s->start) {
     340      151234 :                         if (cnt < s->start) { /* first mark as deleted ! */
     341        3318 :                                 size_t lnr = s->start-cnt;
     342        3318 :                                 size_t pos = cnt;
     343        3318 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     344        3318 :                                 uint32_t cur = 0;
     345             : 
     346        3318 :                                 size_t used = pos&31, end = 32;
     347        3318 :                                 if (used) {
     348        3194 :                                         if (lnr < (32-used))
     349        3065 :                                                 end = used + lnr;
     350        3194 :                                         assert(end > used);
     351        3194 :                                         cur |= ((1U << (end - used)) - 1) << used;
     352        3194 :                                         lnr -= end - used;
     353        3194 :                                         *dst++ |= cur;
     354        3194 :                                         cur = 0;
     355             :                                 }
     356        3318 :                                 size_t full = lnr/32;
     357        3318 :                                 size_t rest = lnr%32;
     358        3318 :                                 if (full > 0) {
     359           7 :                                         memset(dst, ~0, full * sizeof(*dst));
     360           7 :                                         dst += full;
     361           7 :                                         lnr -= full * 32;
     362             :                                 }
     363        3318 :                                 if (rest > 0) {
     364         167 :                                         cur |= (1U << rest) - 1;
     365         167 :                                         lnr -= rest;
     366         167 :                                         *dst |= cur;
     367             :                                 }
     368        3318 :                                 assert(lnr==0);
     369             :                         }
     370      151234 :                         size_t lnr = s->end-s->start;
     371      151234 :                         size_t pos = s->start;
     372      151234 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     373      151234 :                         uint32_t cur = 0;
     374      151234 :                         size_t used = pos&31, end = 32;
     375      151234 :                         if (used) {
     376      112708 :                                 if (lnr < (32-used))
     377      105983 :                                         end = used + lnr;
     378      112708 :                                 assert(end > used);
     379      112708 :                                 cur |= ((1U << (end - used)) - 1) << used;
     380      112708 :                                 lnr -= end - used;
     381      112708 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     382      112708 :                                 dst++;
     383      112708 :                                 cur = 0;
     384             :                         }
     385      151234 :                         size_t full = lnr/32;
     386      151234 :                         size_t rest = lnr%32;
     387      151234 :                         if (full > 0) {
     388        3585 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     389        3585 :                                 dst += full;
     390        3585 :                                 lnr -= full * 32;
     391             :                         }
     392      151234 :                         if (rest > 0) {
     393       41716 :                                 cur |= (1U << rest) - 1;
     394       41716 :                                 lnr -= rest;
     395       41716 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     396             :                         }
     397      151234 :                         assert(lnr==0);
     398      151234 :                         if (cnt < s->end)
     399      341142 :                                 cnt = s->end;
     400             :                 }
     401             :         }
     402      104212 :         if (nr > BATcount(b)) {
     403       62353 :                 BATsetcount(b, nr);
     404             :         }
     405      104212 :         MT_lock_unset(&b->theaplock);
     406             : 
     407      104212 :         bat_destroy(b);
     408      104212 :         return LOG_OK;
     409             : }
     410             : 
     411             : /* TODO return LOG_OK/ERR */
     412             : static void
     413      104238 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     414             : {
     415      104238 :         sqlstore* store = tr->store;
     416      104238 :         segment *cur = s->segs->h, *seg = NULL;
     417      509286 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     418      405048 :                 if (cur->ts == tr->tid) {
     419      165732 :                         if (!cur->deleted)
     420       93089 :                                 cur->oldts = 0;
     421      165732 :                         cur->ts = commit_ts;
     422             :                 }
     423      405048 :                 if (!seg) {
     424             :                         /* first segment */
     425             :                         seg = cur;
     426             :                 }
     427      300810 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     428             :                         /* possible merge since both deleted flags are equal */
     429      279381 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     430      208666 :                                 int merge = 1;
     431      208666 :                                 node *n = store->active->h;
     432      709417 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     433      606368 :                                         sql_trans* other = ((sql_trans*)n->data);
     434      606368 :                                         ulng active = other->ts;
     435      606368 :                                         if(other->active == 2)
     436       30356 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     437      576012 :                                         if (active == tr->ts)
     438      136444 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     439      439568 :                                         if (seg->ts < active && cur->ts < active)
     440             :                                                 break;
     441      427040 :                                         if (seg->ts > active && cur->ts > active)
     442      333951 :                                                 continue;
     443             : 
     444       93089 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     445             :                                         /* cannot safely merge since there is an active transaction between the segments */
     446             :                                         merge = false;
     447             :                                         break;
     448             :                                 }
     449             :                                 /* merge segments */
     450      231154 :                                 if (merge) {
     451      115577 :                                         seg->end = cur->end;
     452      115577 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     453      115577 :                                         if (cur == s->segs->t)
     454       27297 :                                                 s->segs->t = seg;
     455      115577 :                                         if (commit_ts == oldest) {
     456       98852 :                                                 ATOMIC_PTR_DESTROY(&cur->next);
     457       98852 :                                                 _DELETE(cur);
     458             :                                         } else
     459       33450 :                                                 mark4destroy(cur, change, commit_ts);
     460      115577 :                                         cur = seg;
     461      115577 :                                         continue;
     462             :                                 }
     463             :                         }
     464             :                 }
     465             :                 seg = cur;
     466             :         }
     467      104238 : }
     468             : 
     469             : static int
     470     2186689 : segments_in_transaction(sql_trans *tr, sql_table *t)
     471             : {
     472     2186689 :         storage *s = ATOMIC_PTR_GET(&t->data);
     473     2186689 :         segment *seg = s->segs->h;
     474             : 
     475     2186689 :         if (seg && s->segs->t->ts == tr->tid)
     476             :                 return 1;
     477      632512 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     478      527429 :                 if (seg->ts == tr->tid)
     479             :                         return 1;
     480             :         }
     481             :         return 0;
     482             : }
     483             : 
     484             : static size_t
     485    20691431 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     486             : {
     487    20691431 :         size_t cnt = 0;
     488             : 
     489             :         /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
     490             :          * keep all of the parts aligned */
     491    20691431 :         lock_table(tr->store, table->base.id);
     492    20759406 :         segment *s = segs->h, *l = NULL;
     493             : 
     494    20759406 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     495    17008229 :                 l = s = segs->t;
     496             : 
     497   223672292 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     498   202912862 :                 if (SEG_IS_VALID(s, tr))
     499             :                                 l = s;
     500             :         }
     501    20759430 :         if (l)
     502    20741015 :                 cnt = l->end;
     503    20759430 :         unlock_table(tr->store, table->base.id);
     504    20757825 :         return cnt;
     505             : }
     506             : 
     507             : static segments *
     508       52507 : new_segments(sql_trans *tr, size_t cnt)
     509             : {
     510       52507 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     511             : 
     512       52507 :         if (n) {
     513       52507 :                 n->nr_reused = 0;
     514       52507 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     515       52507 :                 if (!n->h) {
     516           0 :                         GDKfree(n);
     517           0 :                         return NULL;
     518             :                 }
     519       52507 :                 sql_ref_init(&n->r);
     520             :         }
     521             :         return n;
     522             : }
     523             : 
     524             : static sql_delta *
     525    34625865 : timestamp_delta( sql_trans *tr, sql_delta *d)
     526             : {
     527    34680022 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     528       54157 :                 d = d->next;
     529    34633071 :         return d;
     530             : }
     531             : 
     532             : static sql_delta *
     533    34471436 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     534             : {
     535    34471436 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     536             : }
     537             : 
     538             : static sql_delta *
     539       27358 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     540             : {
     541       27358 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     542             : }
     543             : 
     544             : static storage *
     545    21188615 : timestamp_storage( sql_trans *tr, storage *d)
     546             : {
     547    21188615 :         if (!d)
     548             :                 return NULL;
     549    21261160 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     550       72545 :                 d = d->next;
     551             :         return d;
     552             : }
     553             : 
     554             : static storage *
     555    21169850 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     556             : {
     557    21169850 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     558             : }
     559             : 
     560             : static sql_delta*
     561       21385 : delta_dup(sql_delta *d)
     562             : {
     563       21385 :         ATOMIC_INC(&d->cs.refcnt);
     564       21385 :         return d;
     565             : }
     566             : 
     567             : static void *
     568       19569 : col_dup(sql_column *c)
     569             : {
     570       19569 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     571             : }
     572             : 
     573             : static void *
     574        3103 : idx_dup(sql_idx *i)
     575             : {
     576        3103 :         if (!ATOMIC_PTR_GET(&i->data))
     577             :                 return NULL;
     578        1816 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     579             : }
     580             : 
     581             : static storage*
     582        1601 : storage_dup(storage *d)
     583             : {
     584        1601 :         ATOMIC_INC(&d->cs.refcnt);
     585        1601 :         return d;
     586             : }
     587             : 
     588             : static void *
     589        1601 : del_dup(sql_table *t)
     590             : {
     591        1601 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     592             : }
     593             : 
     594             : static size_t
     595          17 : count_inserts( segment *s, sql_trans *tr)
     596             : {
     597          17 :         size_t cnt = 0;
     598             : 
     599          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     600          55 :                 if (!s->deleted && s->ts == tr->tid)
     601           4 :                         cnt += s->end - s->start;
     602             :         }
     603          17 :         return cnt;
     604             : }
     605             : 
     606             : static size_t
     607      873189 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     608             : {
     609      873189 :         size_t cnt = 0;
     610             : 
     611     1023196 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     612             :                 ;
     613             : 
     614     5681434 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     615     4808275 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     616     1634050 :                         cnt += s->end - s->start;
     617             :         }
     618      873159 :         return cnt;
     619             : }
     620             : 
     621             : static size_t
     622          17 : count_deletes( segment *s, sql_trans *tr)
     623             : {
     624          17 :         size_t cnt = 0;
     625             : 
     626          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     627          55 :                 if (SEG_IS_DELETED(s, tr))
     628          17 :                         cnt += s->end - s->start;
     629             :         }
     630          17 :         return cnt;
     631             : }
     632             : 
     633             : #define CNT_ACTIVE 10
     634             : 
     635             : static size_t
     636    19430341 : count_col(sql_trans *tr, sql_column *c, int access)
     637             : {
     638    19430341 :         storage *d;
     639    19430341 :         sql_delta *ds;
     640             : 
     641    19430341 :         if (!isTable(c->t))
     642             :                 return 0;
     643    19430341 :         d = tab_timestamp_storage(tr, c->t);
     644    19442068 :         ds = col_timestamp_delta(tr, c);
     645    19447868 :         if (!d ||!ds)
     646             :                 return 0;
     647    19447868 :         if (access == 2)
     648      490764 :                 return ds?ds->cs.ucnt:0;
     649    18957104 :         if (access == 1)
     650          17 :                 return count_inserts(d->segs->h, tr);
     651    18957087 :         if (access == QUICK)
     652           0 :                 return d->segs->t?d->segs->t->end:0;
     653    18957087 :         if (access == CNT_ACTIVE) {
     654      872691 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     655      873316 :                 lock_table(tr->store, c->t->base.id);
     656      873240 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     657      873185 :                 unlock_table(tr->store, c->t->base.id);
     658      873185 :                 return cnt;
     659             :         }
     660    18084396 :         return segs_end(d->segs, tr, c->t);
     661             : }
     662             : 
     663             : static size_t
     664       23356 : count_idx(sql_trans *tr, sql_idx *i, int access)
     665             : {
     666       23356 :         storage *d;
     667       23356 :         sql_delta *ds;
     668             : 
     669       23356 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     670        6719 :                 return 0;
     671       16637 :         d = tab_timestamp_storage(tr, i->t);
     672       16652 :         ds = idx_timestamp_delta(tr, i);
     673       16656 :         if (!d || !ds)
     674             :                 return 0;
     675       16656 :         if (access == 2)
     676        2871 :                 return ds?ds->cs.ucnt:0;
     677       13785 :         if (access == 1)
     678           0 :                 return count_inserts(d->segs->h, tr);
     679       13785 :         if (access == QUICK)
     680        3550 :                 return d->segs->t?d->segs->t->end:0;
     681       10235 :         return segs_end(d->segs, tr, i->t);
     682             : }
     683             : 
     684             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     685             : static BAT *
     686    14879592 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     687             : {
     688    14879592 :         BAT *b;
     689             : 
     690    14879592 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     691             :         /* returns the updates for cs */
     692    14879592 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     693       14257 :                 if (access == RD_UPD_ID) {
     694        8574 :                         if (!(b = temp_descriptor(cs->uibid)))
     695             :                                 return NULL;
     696        8574 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     697        2274 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     698        6300 :                                         oid nil = oid_nil;
     699             :                                         /* less then cnt */
     700        6300 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
     701        6300 :                                         if (!s) {
     702           0 :                                                 bat_destroy(b);
     703           0 :                                                 return NULL;
     704             :                                         }
     705             : 
     706        6300 :                                         BAT *nb = BATproject(s, b);
     707        6300 :                                         bat_destroy(s);
     708        6300 :                                         bat_destroy(b);
     709        6300 :                                         b = nb;
     710             :                         }
     711             :                 } else {
     712        5683 :                         b = temp_descriptor(cs->uvbid);
     713             :                 }
     714             :         } else {
     715    24776465 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     716             :         }
     717             :         return b;
     718             : }
     719             : 
     720             : static BAT *
     721           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     722             : {
     723           0 :         int err = 0;
     724           0 :         BAT *uv = *UV;
     725           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     726           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     727           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     728             : 
     729           0 :         if (!ni || (uv && !nv)) {
     730           0 :                 bat_destroy(ni);
     731           0 :                 bat_destroy(nv);
     732           0 :                 bat_destroy(ui);
     733           0 :                 bat_destroy(uv);
     734           0 :                 bat_destroy(oi);
     735           0 :                 bat_destroy(ov);
     736           0 :                 return NULL;
     737             :         }
     738           0 :         BATiter uvi;
     739           0 :         BATiter ovi;
     740             : 
     741           0 :         if (uv) {
     742           0 :                 uvi = bat_iterator(uv);
     743           0 :                 ovi = bat_iterator(ov);
     744             :         }
     745             : 
     746             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     747           0 :         BUN uip = 0, uie = BATcount(ui);
     748           0 :         BUN oip = 0, oie = BATcount(oi);
     749             : 
     750           0 :         oid uiseqb = ui->tseqbase;
     751           0 :         oid oiseqb = oi->tseqbase;
     752           0 :         oid *uipt = NULL, *oipt = NULL;
     753           0 :         BATiter uii = bat_iterator(ui);
     754           0 :         BATiter oii = bat_iterator(oi);
     755           0 :         if (!BATtdensebi(&uii))
     756           0 :                 uipt = uii.base;
     757           0 :         if (!BATtdensebi(&oii))
     758           0 :                 oipt = oii.base;
     759           0 :         while (uip < uie && oip < oie && !err) {
     760           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     761           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     762             : 
     763           0 :                 if (uiid <= oiid) {
     764           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     765           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     766             :                                 err = 1;
     767           0 :                         uip++;
     768           0 :                         if (uiid == oiid)
     769           0 :                                 oip++;
     770             :                 } else { /* uiid > oiid */
     771           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     772           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     773             :                                 err = 1;
     774           0 :                         oip++;
     775             :                 }
     776             :         }
     777           0 :         while (uip < uie && !err) {
     778           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     779           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     780           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     781             :                         err = 1;
     782           0 :                 uip++;
     783             :         }
     784           0 :         while (oip < oie && !err) {
     785           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     786           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     787           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     788             :                         err = 1;
     789           0 :                 oip++;
     790             :         }
     791           0 :         if (uv) {
     792           0 :                 bat_iterator_end(&uvi);
     793           0 :                 bat_iterator_end(&ovi);
     794             :         }
     795           0 :         bat_iterator_end(&uii);
     796           0 :         bat_iterator_end(&oii);
     797           0 :         bat_destroy(ui);
     798           0 :         bat_destroy(uv);
     799           0 :         bat_destroy(oi);
     800           0 :         bat_destroy(ov);
     801           0 :         if (!err) {
     802           0 :                 if (nv)
     803           0 :                         *UV = nv;
     804           0 :                 return ni;
     805             :         }
     806           0 :         *UV = NULL;
     807           0 :         bat_destroy(ni);
     808           0 :         bat_destroy(nv);
     809           0 :         return NULL;
     810             : }
     811             : 
     812             : static sql_delta *
     813     9923237 : older_delta( sql_delta *d, sql_trans *tr)
     814             : {
     815     9923237 :         sql_delta *o = d->next;
     816             : 
     817     9932205 :         while (o && !o->cs.merged) {
     818        8944 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     819             :                         break;
     820             :                 else
     821        8968 :                         o = o->next;
     822             :         }
     823     9923261 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     824           0 :                 return o;
     825             :         return NULL;
     826             : }
     827             : 
     828             : static BAT *
     829     9919541 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     830             : {
     831     9919541 :         assert(tr->active);
     832     9919541 :         sql_delta *o = NULL;
     833     9919541 :         BAT *ui = NULL, *uv = NULL;
     834             : 
     835     9919541 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     836             :                 return NULL;
     837     9922934 :         if (access == RD_UPD_VAL) {
     838     4963165 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     839           0 :                         bat_destroy(ui);
     840           0 :                         return NULL;
     841             :                 }
     842             :         }
     843     9923183 :         while ((o = older_delta(d, tr)) != NULL) {
     844           0 :                 BAT *oui = NULL, *ouv = NULL;
     845           0 :                 if (!oui)
     846           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     847           0 :                 if (access == RD_UPD_VAL)
     848           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     849           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     850           0 :                         bat_destroy(ui);
     851           0 :                         bat_destroy(uv);
     852           0 :                         bat_destroy(oui);
     853           0 :                         bat_destroy(ouv);
     854           0 :                         return NULL;
     855             :                 }
     856           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     857             :                         return NULL;
     858             :                 d = o;
     859             :         }
     860     9923272 :         if (uv) {
     861     4963405 :                 bat_destroy(ui);
     862     4963405 :                 return uv;
     863             :         }
     864             :         return ui;
     865             : }
     866             : 
     867             : static BAT *
     868        2795 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     869             : {
     870        2795 :         lock_column(tr->store, c->base.id);
     871        2795 :         sql_delta *d = col_timestamp_delta(tr, c);
     872        2795 :         int type = c->type.type->localtype;
     873             : 
     874        2795 :         if (!d) {
     875           0 :                 unlock_column(tr->store, c->base.id);
     876           0 :                 return NULL;
     877             :         }
     878        2795 :         if (d->cs.st == ST_DICT) {
     879           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     880             : 
     881           0 :                 type = b->ttype;
     882             :         }
     883        2795 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     884        2795 :         unlock_column(tr->store, c->base.id);
     885        2795 :         return bn;
     886             : }
     887             : 
     888             : static BAT *
     889           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     890             : {
     891           0 :         lock_column(tr->store, i->base.id);
     892           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     893           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     894             : 
     895           0 :         if (!d) {
     896           0 :                 unlock_column(tr->store, i->base.id);
     897           0 :                 return NULL;
     898             :         }
     899           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     900           0 :         unlock_column(tr->store, i->base.id);
     901           0 :         return bn;
     902             : }
     903             : 
     904             : static BAT *
     905    10107805 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     906             : {
     907    10107805 :         BAT *b;
     908             : 
     909    10107805 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     910    10107805 :         assert(cs != NULL);
     911    10107805 :         if (access == QUICK)
     912      179058 :                 return quick_descriptor(cs->bid);
     913     9928747 :         if (access == RD_EXT)
     914         857 :                 return temp_descriptor(cs->ebid);
     915     9927890 :         assert(cs->bid);
     916     9927890 :         b = temp_descriptor(cs->bid);
     917     9928820 :         if (b == NULL)
     918             :                 return NULL;
     919     9928820 :         assert(b->batRestricted == BAT_READ);
     920             :         /* return slice */
     921     9928820 :         BAT *s = BATslice(b, 0, cnt);
     922     9917106 :         bat_destroy(b);
     923     9917106 :         return s;
     924             : }
     925             : 
     926             : static int
     927     4959186 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     928             : {
     929     4959186 :         lock_column(tr->store, c->base.id);
     930     4958283 :         size_t cnt = count_col(tr, c, 0);
     931     4960535 :         sql_delta *d = col_timestamp_delta(tr, c);
     932     4960572 :         int type = c->type.type->localtype;
     933             : 
     934     4960572 :         if (!d) {
     935           0 :                 unlock_column(tr->store, c->base.id);
     936           0 :                 return LOG_ERR;
     937             :         }
     938     4960572 :         if (d->cs.st == ST_DICT) {
     939           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     940             : 
     941           2 :                 type = b->ttype;
     942             :         }
     943             : 
     944     4960572 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     945     4960503 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     946             : 
     947     4960345 :         unlock_column(tr->store, c->base.id);
     948             : 
     949     4960105 :         if (*ui == NULL || *uv == NULL) {
     950           0 :                 bat_destroy(*ui);
     951           0 :                 bat_destroy(*uv);
     952           0 :                 return LOG_ERR;
     953             :         }
     954             :         return LOG_OK;
     955             : }
     956             : 
     957             : static int
     958          23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     959             : {
     960          23 :         lock_column(tr->store, i->base.id);
     961          23 :         size_t cnt = count_idx(tr, i, 0);
     962          23 :         sql_delta *d = idx_timestamp_delta(tr, i);
     963          23 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     964             : 
     965          23 :         if (!d) {
     966           0 :                 unlock_column(tr->store, i->base.id);
     967           0 :                 return LOG_ERR;
     968             :         }
     969             : 
     970          23 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     971          23 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     972             : 
     973          23 :         unlock_column(tr->store, i->base.id);
     974             : 
     975          23 :         if (*ui == NULL || *uv == NULL) {
     976           0 :                 bat_destroy(*ui);
     977           0 :                 bat_destroy(*uv);
     978           0 :                 return LOG_ERR;
     979             :         }
     980             :         return LOG_OK;
     981             : }
     982             : 
     983             : static void *                                   /* BAT * */
     984    10096072 : bind_col(sql_trans *tr, sql_column *c, int access)
     985             : {
     986    10096072 :         assert(access == QUICK || tr->active);
     987    10096072 :         if (!isTable(c->t))
     988             :                 return NULL;
     989    10096072 :         sql_delta *d = col_timestamp_delta(tr, c);
     990    10098002 :         if (!d)
     991             :                 return NULL;
     992    10098002 :         size_t cnt = count_col(tr, c, 0);
     993    10101537 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     994        2795 :                 return bind_ucol(tr, c, access, cnt);
     995    10098742 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
     996    10094689 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
     997             :         return b;
     998             : }
     999             : 
    1000             : static void *                                   /* BAT * */
    1001       10699 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1002             : {
    1003       10699 :         assert(access == QUICK || tr->active);
    1004       10699 :         if (!isTable(i->t))
    1005             :                 return NULL;
    1006       10699 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1007       10708 :         if (!d)
    1008             :                 return NULL;
    1009       10708 :         size_t cnt = count_idx(tr, i, 0);
    1010       10712 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1011           0 :                 return bind_uidx(tr, i, access, cnt);
    1012       10712 :         return cs_bind_bat( &d->cs, access, cnt);
    1013             : }
    1014             : 
    1015             : static int
    1016        4152 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1017             : {
    1018        4152 :         if (!cs->uibid) {
    1019           0 :                 cs->uibid = e_bat(TYPE_oid);
    1020           0 :                 if (cs->uibid == BID_NIL)
    1021             :                         return LOG_ERR;
    1022             :         }
    1023        4152 :         if (!cs->uvbid) {
    1024           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1025           0 :                 if (!cur)
    1026             :                         return LOG_ERR;
    1027           0 :                 int type = cur->ttype;
    1028           0 :                 cs->uvbid = e_bat(type);
    1029           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1030             :                         return LOG_ERR;
    1031             :         }
    1032        4152 :         BAT *ui = temp_descriptor(cs->uibid);
    1033        4152 :         BAT *uv = temp_descriptor(cs->uvbid);
    1034             : 
    1035        4152 :         if (ui == NULL || uv == NULL) {
    1036           0 :                 bat_destroy(ui);
    1037           0 :                 bat_destroy(uv);
    1038           0 :                 return LOG_ERR;
    1039             :         }
    1040        4152 :         assert(ui && uv);
    1041        4152 :         if (isEbat(ui)){
    1042         400 :                 temp_destroy(cs->uibid);
    1043         400 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1044         400 :                 bat_destroy(ui);
    1045         400 :                 if (cs->uibid == BID_NIL ||
    1046         400 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1047           0 :                         bat_destroy(uv);
    1048           0 :                         return LOG_ERR;
    1049             :                 }
    1050             :         }
    1051        4152 :         if (isEbat(uv)){
    1052         400 :                 temp_destroy(cs->uvbid);
    1053         400 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1054         400 :                 bat_destroy(uv);
    1055         400 :                 if (cs->uvbid == BID_NIL ||
    1056         400 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1057           0 :                         bat_destroy(ui);
    1058           0 :                         return LOG_ERR;
    1059             :                 }
    1060             :         }
    1061        4152 :         *Ui = ui;
    1062        4152 :         *Uv = uv;
    1063        4152 :         return LOG_OK;
    1064             : }
    1065             : 
    1066             : static int
    1067        6936 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1068             : {
    1069      102577 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1070      102577 :                 if (s->start <= rid && s->end > rid) {
    1071        6936 :                         if (s->ts == tr->tid && !s->deleted) {
    1072        2886 :                                 return 1;
    1073             :                         }
    1074             :                         break;
    1075             :                 }
    1076             :         }
    1077             :         return 0;
    1078             : }
    1079             : 
    1080             : static int
    1081        4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1082             : {
    1083       96455 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1084       96455 :                 if (s->start <= rid && s->end > rid) {
    1085        4050 :                         if (s->ts >= tr->ts && s->deleted) {
    1086           0 :                                 return 1;
    1087             :                         }
    1088             :                         break;
    1089             :                 }
    1090             :         }
    1091             :         return 0;
    1092             : }
    1093             : 
    1094             : static sql_delta *
    1095           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1096             : {
    1097           0 :         sql_delta *n = ZNEW(sql_delta);
    1098           0 :         if (!n)
    1099             :                 return NULL;
    1100           0 :         *n = *bat;
    1101           0 :         n->next = NULL;
    1102           0 :         n->cs.ts = tr->tid;
    1103           0 :         return n;
    1104             : }
    1105             : 
    1106             : static BAT * /* Prepares BAT i for appending to the ST_DICT column *batp; returns the BAT the caller should append (i itself after decompression, otherwise a new offsets BAT) or NULL on error. */
    1107          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1108             : { /* may replace *batp by a tr_dup_delta() copy when cs->ts != tr->tid and the stored BAT has to be rewritten */
    1109          17 :         BAT *newoffsets = NULL;
    1110          17 :         sql_delta *bat = *batp;
    1111          17 :         column_storage *cs = &bat->cs;
    1112          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL; /* u: BAT behind cs->ebid, handed to DICTprepare4append / DICTdecompress_ below */
    1113             : 
    1114          17 :         if (!u)
    1115             :                 return NULL;
    1116          17 :         BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX; /* threshold picked from the size of u before the prepare call: 256, 65536 or INT_MAX */
    1117          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) { /* presumably may also extend u with new values — confirm in the DICT code */
    1118           0 :                 bat_destroy(u);
    1119           0 :                 return NULL;
    1120             :         } else {
    1121          17 :                 int new = 0; /* set to 1 once *batp has been replaced by a private copy */
    1122             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1123          17 :                 if (BATcount(u) >= max_cnt) { /* u reached the threshold computed above: the stored BAT in cs->bid has to be rewritten */
    1124           1 :                         if (max_cnt == INT_MAX) { /* decompress */
    1125           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1126           0 :                                         bat_destroy(u);
    1127           0 :                                         return NULL;
    1128             :                                 }
    1129           0 :                                 if (cs->ucnt) { /* pending updates exist: apply them to a private copy of b before decompressing */
    1130           0 :                                         BAT *ui = NULL, *uv = NULL;
    1131           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1132           0 :                                         bat_destroy(b);
    1133           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1134           0 :                                                 bat_destroy(nb);
    1135           0 :                                                 bat_destroy(u);
    1136           0 :                                                 return NULL;
    1137             :                                         }
    1138           0 :                                         b = nb;
    1139           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) { /* NOTE(review): this failure branch releases ui, uv, b and u but does not return; control continues to the bat_destroy calls below and to DICTdecompress_(b, u, ...) — looks like a missing `return NULL;` */
    1140           0 :                                                 bat_destroy(ui);
    1141           0 :                                                 bat_destroy(uv);
    1142           0 :                                                 bat_destroy(b);
    1143           0 :                                                 bat_destroy(u);
    1144             :                                         }
    1145           0 :                                         bat_destroy(ui);
    1146           0 :                                         bat_destroy(uv);
    1147             :                                 }
    1148           0 :                                 n = DICTdecompress_(b, u, PERSISTENT); /* n: result of decompressing b against u */
    1149           0 :                                 bat_destroy(b);
    1150           0 :                                 assert(newoffsets == NULL);
    1151           0 :                                 if (!n) {
    1152           0 :                                         bat_destroy(u);
    1153           0 :                                         return NULL;
    1154             :                                 }
    1155           0 :                                 if (cs->ts != tr->tid) { /* storage is not stamped with this transaction's tid: continue on a private copy */
    1156           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) { /* NOTE(review): u is not released on this early return, unlike the error paths above */
    1157           0 :                                                 bat_destroy(n);
    1158           0 :                                                 return NULL;
    1159             :                                         }
    1160           0 :                                         cs = &(*batp)->cs;
    1161           0 :                                         new = 1; /* the copy still carries the original's BAT ids; the !new guards below skip temp_destroy on them */
    1162             :                                 }
    1163           0 :                                 if (cs->bid && !new)
    1164           0 :                                         temp_destroy(cs->bid);
    1165           0 :                                 n = transfer_to_systrans(n);
    1166           0 :                                 if (n == NULL) /* NOTE(review): u is not released on this return, and when !new cs->bid was already temp_destroy'ed above without being reset */
    1167             :                                         return NULL;
    1168           0 :                                 bat_set_access(n, BAT_READ);
    1169           0 :                                 cs->bid = temp_create(n); /* the decompressed BAT becomes the column data */
    1170           0 :                                 bat_destroy(n);
    1171           0 :                                 if (cs->ebid && !new)
    1172           0 :                                         temp_destroy(cs->ebid);
    1173           0 :                                 cs->ebid = 0; /* no ebid BAT any more */
    1174           0 :                                 cs->ucnt = 0; /* pending updates are dropped from this delta (they were applied to b above when cs->ucnt was set) */
    1175           0 :                                 if (cs->uibid && !new)
    1176           0 :                                         temp_destroy(cs->uibid);
    1177           0 :                                 if (cs->uvbid && !new)
    1178           0 :                                         temp_destroy(cs->uvbid);
    1179           0 :                                 cs->uibid = cs->uvbid = 0;
    1180           0 :                                 cs->st = ST_DEFAULT; /* column is no longer marked ST_DICT */
    1181           0 :                                 cs->cleared = true;
    1182             :                         } else { /* threshold was 256 or 65536: rebuild cs->bid via DICTenlarge (presumably with a wider offset type — confirm) */
    1183           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1184           0 :                                         bat_destroy(newoffsets);
    1185           0 :                                         bat_destroy(u);
    1186           0 :                                         return NULL;
    1187             :                                 }
    1188           2 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT); /* type argument: TYPE_int when u holds more than 65536 entries, else TYPE_sht */
    1189           1 :                                 bat_destroy(b);
    1190           1 :                                 if (!n) {
    1191           0 :                                         bat_destroy(newoffsets);
    1192           0 :                                         bat_destroy(u);
    1193           0 :                                         return NULL;
    1194             :                                 }
    1195           1 :                                 if (cs->ts != tr->tid) { /* storage is not stamped with this transaction's tid: continue on a private copy */
    1196           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) { /* NOTE(review): newoffsets and u are not released on this early return */
    1197           0 :                                                 bat_destroy(n);
    1198           0 :                                                 return NULL;
    1199             :                                         }
    1200           0 :                                         cs = &(*batp)->cs;
    1201           0 :                                         new = 1;
    1202           0 :                                         temp_dup(cs->ebid); /* the copy keeps using ebid (and uibid/uvbid below) alongside the original delta, so take extra references */
    1203           0 :                                         if (cs->uibid) {
    1204           0 :                                                 temp_dup(cs->uibid);
    1205           0 :                                                 temp_dup(cs->uvbid);
    1206             :                                         }
    1207             :                                 }
    1208           1 :                                 if (cs->bid && !new)
    1209           1 :                                         temp_destroy(cs->bid);
    1210           1 :                                 n = transfer_to_systrans(n);
    1211           1 :                                 if (n == NULL) /* NOTE(review): newoffsets and u are not released on this return */
    1212             :                                         return NULL;
    1213           1 :                                 bat_set_access(n, BAT_READ);
    1214           1 :                                 cs->bid = temp_create(n); /* the BAT produced by DICTenlarge replaces the previous cs->bid */
    1215           1 :                                 bat_destroy(n);
    1216           1 :                                 cs->cleared = true;
    1217           1 :                                 i = newoffsets; /* hand the BAT produced by DICTprepare4append back to the caller */
    1218             :                         }
    1219             :                 } else { /* append */
    1220          16 :                         i = newoffsets;
    1221             :                 }
    1222             :         }
    1223          17 :         bat_destroy(u);
    1224          17 :         return i; /* the unchanged input i after decompression, otherwise newoffsets */
    1225             : }
    1226             : 
    1227             : static BAT * /* Prepares BAT i for appending to the column in cs: returns the BAT from FORprepare4append when it produced one, otherwise decompresses cs->bid and returns i; NULL on error. */
    1228           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1229             : {
    1230           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10); /* number parsed from storage_type after its first 4 characters (presumably a "FOR-"-style prefix — confirm against callers) */
    1231           0 :         BAT *newoffsets = NULL;
    1232           0 :         BAT *b = NULL, *n = NULL;
    1233             : 
    1234           0 :         if (!(b = temp_descriptor(cs->bid)))
    1235             :                 return NULL;
    1236             : 
    1237           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1238           0 :                 bat_destroy(b);
    1239           0 :                 return NULL;
    1240             :         } else {
    1241             :                 /* returns new offset bat if values within min/max, else decompress */
    1242           0 :                 if (!newoffsets) { /* decompress */
    1243           0 :                         if (cs->ucnt) { /* pending updates exist: apply them to a private copy of b before decompressing */
    1244           0 :                                 BAT *ui = NULL, *uv = NULL;
    1245           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1246           0 :                                 bat_destroy(b);
    1247           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1248           0 :                                         bat_destroy(nb);
    1249           0 :                                         return NULL;
    1250             :                                 }
    1251           0 :                                 b = nb;
    1252           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) { /* NOTE(review): this failure branch releases ui, uv and b but does not return; control continues to the bat_destroy calls below and to FORdecompress_(b, ...) — looks like a missing `return NULL;` */
    1253           0 :                                         bat_destroy(ui);
    1254           0 :                                         bat_destroy(uv);
    1255           0 :                                         bat_destroy(b);
    1256             :                                 }
    1257           0 :                                 bat_destroy(ui);
    1258           0 :                                 bat_destroy(uv);
    1259             :                         }
    1260           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT); /* n: result of decompressing b with offsetval, requested in the type of i */
    1261           0 :                         bat_destroy(b);
    1262           0 :                         if (!n)
    1263             :                                 return NULL;
    1264           0 :                         if (cs->bid)
    1265           0 :                                 temp_destroy(cs->bid);
    1266           0 :                         n = transfer_to_systrans(n);
    1267           0 :                         if (n == NULL) /* NOTE(review): cs->bid was temp_destroy'ed just above and still holds the old id on this return */
    1268             :                                 return NULL;
    1269           0 :                         bat_set_access(n, BAT_READ);
    1270           0 :                         cs->bid = temp_create(n); /* the decompressed BAT becomes the column data */
    1271           0 :                         cs->ucnt = 0; /* pending updates are dropped from cs (they were applied to b above when cs->ucnt was set) */
    1272           0 :                         if (cs->uibid)
    1273           0 :                                 temp_destroy(cs->uibid);
    1274           0 :                         if (cs->uvbid)
    1275           0 :                                 temp_destroy(cs->uvbid);
    1276           0 :                         cs->uibid = cs->uvbid = 0;
    1277           0 :                         cs->st = ST_DEFAULT; /* storage type reset to the default */
    1278           0 :                         cs->cleared = true;
    1279           0 :                         b = n; /* so the shared bat_destroy(b) below releases the descriptor of n */
    1280             :                 } else { /* append */
    1281             :                         i = newoffsets;
    1282             :                 }
    1283             :         }
    1284           0 :         bat_destroy(b);
    1285           0 :         return i; /* the unchanged input i after decompression, otherwise newoffsets */
    1286             : }
    1287             : 
    1288             : /*
    1289             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1290             :  */
    1291             : static int
    1292        3123 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1293             : {
    1294        3123 :         int res = LOG_OK;
    1295        3123 :         sql_delta *bat = *batp;
    1296        3123 :         column_storage *cs = &bat->cs;
    1297        3123 :         BAT *otids = tids, *oupdates = updates;
    1298             : 
    1299        3123 :         if (!BATcount(tids))
    1300             :                 return LOG_OK;
    1301             : 
    1302        3123 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1303           6 :                 tids = BATunmask(tids);
    1304           6 :                 if (!tids)
    1305             :                         return LOG_ERR;
    1306             :         }
    1307        3123 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1308           0 :                 updates = BATunmask(updates);
    1309           0 :                 if (!updates) {
    1310           0 :                         if (otids != tids)
    1311           0 :                                 bat_destroy(tids);
    1312           0 :                         return LOG_ERR;
    1313             :                 }
    1314        3123 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1315          42 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1316          42 :                 if (!updates) {
    1317           0 :                         if (otids != tids)
    1318           0 :                                 bat_destroy(tids);
    1319           0 :                         return LOG_ERR;
    1320             :                 }
    1321             :         }
    1322             : 
    1323        3123 :         if (cs->st == ST_DICT) {
    1324             :                 /* possibly a new array is returned */
    1325           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1326           4 :                 bat = *batp;
    1327           4 :                 cs = &bat->cs;
    1328           4 :                 if (oupdates != updates)
    1329           0 :                         bat_destroy(updates);
    1330           4 :                 updates = nupdates;
    1331           4 :                 if (!updates) {
    1332           0 :                         if (otids != tids)
    1333           0 :                                 bat_destroy(tids);
    1334           0 :                         return LOG_ERR;
    1335             :                 }
    1336             :         }
    1337             : 
    1338             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1339             :         /* currently only one update delta is possible */
    1340        3123 :         lock_table(tr->store, t->base.id);
    1341        3123 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1342        3123 :         if (!is_new && !cs->cleared) {
    1343        2800 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1344           6 :                         BAT *sorted, *order;
    1345           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1346           0 :                                 if (otids != tids)
    1347           0 :                                         bat_destroy(tids);
    1348           0 :                                 if (oupdates != updates)
    1349           0 :                                         bat_destroy(updates);
    1350           0 :                                 unlock_table(tr->store, t->base.id);
    1351           0 :                                 return LOG_ERR;
    1352             :                         }
    1353           6 :                         if (otids != tids)
    1354           0 :                                 bat_destroy(tids);
    1355           6 :                         tids = sorted;
    1356           6 :                         BAT *nupdates = BATproject(order, updates);
    1357           6 :                         bat_destroy(order);
    1358           6 :                         if (oupdates != updates)
    1359           0 :                                 bat_destroy(updates);
    1360           6 :                         updates = nupdates;
    1361           6 :                         if (!updates) {
    1362           0 :                                 bat_destroy(tids);
    1363           0 :                                 unlock_table(tr->store, t->base.id);
    1364           0 :                                 return LOG_ERR;
    1365             :                         }
    1366             :                 }
    1367        2800 :                 assert(tids->tsorted);
    1368        2800 :                 BAT *ui = NULL, *uv = NULL;
    1369             : 
    1370             :                 /* handle updates on just inserted bits */
    1371             :                 /* handle updates on updates (within one transaction) */
    1372        2800 :                 BATiter upi = bat_iterator(updates);
    1373        2800 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1374        2800 :                 BAT *b, *ins = NULL;
    1375        2800 :                 int *msk = NULL;
    1376             : 
    1377        2800 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1378             :                         res = LOG_ERR;
    1379             : 
    1380        2800 :                 if (res == LOG_OK && BATtdense(tids)) {
    1381        2596 :                         oid start = tids->tseqbase, offset = start;
    1382        2596 :                         oid end = start + ucnt;
    1383             : 
    1384       12271 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1385       10338 :                                 if (seg->start <= start && seg->end > start) {
    1386             :                                         /* check for delete conflicts */
    1387        2596 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1388           0 :                                                 res = LOG_CONFLICT;
    1389           0 :                                                 continue;
    1390             :                                         }
    1391             : 
    1392             :                                         /* check for inplace updates */
    1393        2596 :                                         BUN lend = end < seg->end?end:seg->end;
    1394        2596 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1395         182 :                                                 if (!ins) {
    1396         182 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1397         182 :                                                         if (!ins)
    1398             :                                                                 res = LOG_ERR;
    1399             :                                                         else {
    1400         182 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1401         182 :                                                                 msk = (int*)Tloc(ins, 0);
    1402         182 :                                                                 BUN end = (ucnt+31)/32;
    1403         182 :                                                                 memset(msk, 0, end * sizeof(int));
    1404             :                                                         }
    1405             :                                                 }
    1406         649 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1407         467 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1408         467 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1409           0 :                                                                 res = LOG_ERR;
    1410             : 
    1411         467 :                                                         oid word = i/32;
    1412         467 :                                                         int pos = i%32;
    1413         467 :                                                         msk[word] |= 1U<<pos;
    1414         467 :                                                         cnt++;
    1415             :                                                 }
    1416             :                                         }
    1417             :                                 }
    1418       10338 :                                 if (end < seg->end)
    1419             :                                         break;
    1420             :                         }
    1421         207 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1422           3 :                         struct canditer ci;
    1423           3 :                         segment *seg = s->segs->h;
    1424           3 :                         canditer_init(&ci, NULL, tids);
    1425           3 :                         BUN i = 0;
    1426        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1427        1033 :                                 oid rid = canditer_next(&ci);
    1428        1033 :                                 if (seg->end <= rid)
    1429          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1430        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1431             :                                         /* check for delete conflicts */
    1432        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1433           0 :                                                 res = LOG_CONFLICT;
    1434           0 :                                                 continue;
    1435             :                                         }
    1436             : 
    1437             :                                         /* check for inplace updates */
    1438        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1439           0 :                                                 if (!ins) {
    1440           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1441           0 :                                                         if (!ins) {
    1442             :                                                                 res = LOG_ERR;
    1443             :                                                                 break;
    1444             :                                                         } else {
    1445           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1446           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1447           0 :                                                                 BUN end = (ucnt+31)/32;
    1448           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1449             :                                                         }
    1450             :                                                 }
    1451           0 :                                                 ptr upd = BUNtail(upi, i);
    1452           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1453           0 :                                                         res = LOG_ERR;
    1454             : 
    1455           0 :                                                 oid word = i/32;
    1456           0 :                                                 int pos = i%32;
    1457           0 :                                                 msk[word] |= 1U<<pos;
    1458           0 :                                                 cnt++;
    1459             :                                         }
    1460        1020 :                                         i++;
    1461             :                                 }
    1462             :                         }
    1463         201 :                 } else if (res == LOG_OK) {
    1464         201 :                         BUN i = 0;
    1465         201 :                         oid *rid = Tloc(tids,0);
    1466         201 :                         segment *seg = s->segs->h;
    1467       21958 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1468       21757 :                                 if (seg->end <= rid[i])
    1469        8325 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1470       13432 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1471             :                                         /* check for delete conflicts */
    1472       13432 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1473           0 :                                                 res = LOG_CONFLICT;
    1474           0 :                                                 continue;
    1475             :                                         }
    1476             : 
    1477             :                                         /* check for inplace updates */
    1478       13432 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1479         995 :                                                 if (!ins) {
    1480          80 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1481          80 :                                                         if (!ins) {
    1482             :                                                                 res = LOG_ERR;
    1483             :                                                                 break;
    1484             :                                                         } else {
    1485          80 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1486          80 :                                                                 msk = (int*)Tloc(ins, 0);
    1487          80 :                                                                 BUN end = (ucnt+31)/32;
    1488          80 :                                                                 memset(msk, 0, end * sizeof(int));
    1489             :                                                         }
    1490             :                                                 }
    1491         995 :                                                 const void *upd = BUNtail(upi, i);
    1492         995 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1493           0 :                                                         res = LOG_ERR;
    1494             : 
    1495         995 :                                                 oid word = i/32;
    1496         995 :                                                 int pos = i%32;
    1497         995 :                                                 msk[word] |= 1U<<pos;
    1498         995 :                                                 cnt++;
    1499             :                                         }
    1500       13432 :                                         i++;
    1501             :                                 }
    1502             :                         }
    1503             :                 }
    1504             : 
    1505        2800 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1506        2574 :                         if (cs->ucnt == 0) {
    1507        2472 :                                 if (cnt) {
    1508          12 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1509          12 :                                         if (nins) {
    1510          12 :                                                 ui = BATproject(nins, tids);
    1511          12 :                                                 uv = BATproject(nins, updates);
    1512          12 :                                                 bat_destroy(nins);
    1513             :                                         }
    1514             :                                 } else {
    1515        2460 :                                         ui = temp_descriptor(tids->batCacheid);
    1516        2460 :                                         uv = temp_descriptor(updates->batCacheid);
    1517             :                                 }
    1518        2472 :                                 if (!ui || !uv) {
    1519             :                                         res = LOG_ERR;
    1520             :                                 } else {
    1521        2472 :                                         temp_destroy(cs->uibid);
    1522        2472 :                                         temp_destroy(cs->uvbid);
    1523        2472 :                                         ui = transfer_to_systrans(ui);
    1524        2472 :                                         uv = transfer_to_systrans(uv);
    1525        2472 :                                         if (ui == NULL || uv == NULL) {
    1526           0 :                                                 BBPreclaim(ui);
    1527           0 :                                                 BBPreclaim(uv);
    1528             :                                                 res = LOG_ERR;
    1529             :                                         } else {
    1530        2472 :                                                 cs->uibid = temp_create(ui);
    1531        2472 :                                                 cs->uvbid = temp_create(uv);
    1532        2472 :                                                 cs->ucnt = BATcount(ui);
    1533             :                                         }
    1534             :                                 }
    1535             :                         } else {
    1536         102 :                                 BAT *nui = NULL, *nuv = NULL;
    1537             : 
    1538             :                                 /* merge taking msk of inserted into account */
    1539         102 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1540             :                                         res = LOG_ERR;
    1541             : 
    1542         102 :                                 if (res == LOG_OK) {
    1543         102 :                                         const void *upd = NULL;
    1544         102 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1545         102 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1546             : 
    1547         102 :                                         if (!nui || !nuv) {
    1548             :                                                 res = LOG_ERR;
    1549             :                                         } else {
    1550         102 :                                                 BATiter ovi = bat_iterator(uv);
    1551             : 
    1552             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1553         102 :                                                 BUN uip = 0, uie = BATcount(ui);
    1554         102 :                                                 BUN nip = 0, nie = BATcount(tids);
    1555         102 :                                                 oid uiseqb = ui->tseqbase;
    1556         102 :                                                 oid niseqb = tids->tseqbase;
    1557         102 :                                                 oid *uipt = NULL, *nipt = NULL;
    1558         102 :                                                 BATiter uii = bat_iterator(ui);
    1559         102 :                                                 BATiter tidsi = bat_iterator(tids);
    1560         102 :                                                 if (!BATtdensebi(&uii))
    1561          97 :                                                         uipt = uii.base;
    1562         102 :                                                 if (!BATtdensebi(&tidsi))
    1563          93 :                                                         nipt = tidsi.base;
    1564       16091 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1565       15989 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1566       15989 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1567             : 
    1568       15989 :                                                         if (uiv < niv) {
    1569        8068 :                                                                 upd = BUNtail(ovi, uip);
    1570       16136 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1571        8068 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1572             :                                                                         res = LOG_ERR;
    1573        8068 :                                                                 uip++;
    1574        7921 :                                                         } else if (uiv == niv) {
    1575             :                                                                 /* handle == */
    1576        1522 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1577        1522 :                                                                         upd = BUNtail(upi, nip);
    1578        3044 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1579        1522 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1580             :                                                                                 res = LOG_ERR;
    1581             :                                                                 } else {
    1582           0 :                                                                         upd = BUNtail(ovi, uip);
    1583           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1584           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1585             :                                                                                 res = LOG_ERR;
    1586             :                                                                 }
    1587        1522 :                                                                 uip++;
    1588        1522 :                                                                 nip++;
    1589             :                                                         } else { /* uiv > niv */
    1590        6399 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1591        6255 :                                                                         upd = BUNtail(upi, nip);
    1592       12510 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1593        6255 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1594             :                                                                                 res = LOG_ERR;
    1595             :                                                                 }
    1596        6399 :                                                                 nip++;
    1597             :                                                         }
    1598             :                                                 }
    1599         923 :                                                 while (uip < uie && res == LOG_OK) {
    1600         821 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1601         821 :                                                         upd = BUNtail(ovi, uip);
    1602        1642 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1603         821 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1604             :                                                                 res = LOG_ERR;
    1605         821 :                                                         uip++;
    1606             :                                                 }
    1607         703 :                                                 while (nip < nie && res == LOG_OK) {
    1608         601 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1609         601 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1610         329 :                                                                 upd = BUNtail(upi, nip);
    1611         658 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1612         329 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1613             :                                                                         res = LOG_ERR;
    1614             :                                                         }
    1615         601 :                                                         nip++;
    1616             :                                                 }
    1617         102 :                                                 bat_iterator_end(&uii);
    1618         102 :                                                 bat_iterator_end(&tidsi);
    1619         102 :                                                 bat_iterator_end(&ovi);
    1620         102 :                                                 if (res == LOG_OK) {
    1621         102 :                                                         temp_destroy(cs->uibid);
    1622         102 :                                                         temp_destroy(cs->uvbid);
    1623         102 :                                                         nui = transfer_to_systrans(nui);
    1624         102 :                                                         nuv = transfer_to_systrans(nuv);
    1625         102 :                                                         if (nui == NULL || nuv == NULL) {
    1626             :                                                                 res = LOG_ERR;
    1627             :                                                         } else {
    1628         102 :                                                                 cs->uibid = temp_create(nui);
    1629         102 :                                                                 cs->uvbid = temp_create(nuv);
    1630         102 :                                                                 cs->ucnt = BATcount(nui);
    1631             :                                                         }
    1632             :                                                 }
    1633             :                                         }
    1634         102 :                                         bat_destroy(nui);
    1635         102 :                                         bat_destroy(nuv);
    1636             :                                 }
    1637             :                         }
    1638             :                 }
    1639        2800 :                 bat_iterator_end(&upi);
    1640        2800 :                 bat_destroy(b);
    1641        2800 :                 unlock_table(tr->store, t->base.id);
    1642        2800 :                 bat_destroy(ins);
    1643        2800 :                 bat_destroy(ui);
    1644        2800 :                 bat_destroy(uv);
    1645        2800 :                 if (otids != tids)
    1646          10 :                         bat_destroy(tids);
    1647        2800 :                 if (oupdates != updates)
    1648          11 :                         bat_destroy(updates);
    1649        2800 :                 return res;
    1650             :         } else if (is_new || cs->cleared) {
    1651         323 :                 BAT *b = temp_descriptor(cs->bid);
    1652             : 
    1653         323 :                 if (b == NULL) {
    1654             :                         res = LOG_ERR;
    1655             :                 } else {
    1656         323 :                         if (BATcount(b)==0) {
    1657           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1658           0 :                                         res = LOG_ERR;
    1659         322 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1660           0 :                                 res = LOG_ERR;
    1661         323 :                         BBPcold(b->batCacheid);
    1662         323 :                         bat_destroy(b);
    1663             :                 }
    1664             :         }
    1665         323 :         unlock_table(tr->store, t->base.id);
    1666         323 :         if (otids != tids)
    1667           2 :                 bat_destroy(tids);
    1668         323 :         if (oupdates != updates)
    1669          41 :                 bat_destroy(updates);
    1670             :         return res;
    1671             : }
    1672             : /*
                     :  * Batch (BAT-based) column update on a delta.
                     :  *
                     :  * Pure forwarding wrapper: every argument is passed unchanged to
                     :  * cs_update_bat() and its result code is returned as-is.  No table
                     :  * lock is taken here; cs_update_bat() calls unlock_table() on its
                     :  * exit paths, so it presumably acquires the lock itself.
                     :  */
    1673             : static int
    1674        3123 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1675             : {
    1676        3123 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1677             : }
    1678             : /*
                     :  * Prepare `cnt` value(s) at `i` for storage in a column whose storage
                     :  * type is ST_DICT.
                     :  *
                     :  * cs->ebid is opened as `u` (presumably the dictionary BAT) and
                     :  * cs->bid as `b` (presumably the offsets BAT).
                     :  * DICTprepare4append_vals() fills `newoffsets` for the given values.
                     :  * If afterwards BATcount(u) has reached max_cnt (256, or 64*1024 when
                     :  * u already held at least 256 entries before the call), the offsets
                     :  * BAT is rebuilt with DICTenlarge() using TYPE_sht or TYPE_int and
                     :  * installed as the new cs->bid.  When the column storage does not
                     :  * belong to this transaction (cs->ts != tr->tid) the delta is first
                     :  * duplicated with tr_dup_delta() and *batp is updated to the copy.
                     :  *
                     :  * Returns `newoffsets` on the append and enlarge paths, or NULL on
                     :  * failure.  cs_update_val() GDKfree()s the returned pointer when it
                     :  * differs from the pointer it passed in.
                     :  */
    1679             : static void *
    1680           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1681             : {
    1682           4 :         void *newoffsets = NULL;
    1683           4 :         sql_delta *bat = *batp;
    1684           4 :         column_storage *cs = &bat->cs;
    1685           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1686             : 
    1687           4 :         if (!u)
    1688             :                 return NULL;
    1689           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024; /* threshold is fixed before DICTprepare4append_vals() runs */
    1690           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1691           0 :                 bat_destroy(u);
    1692           0 :                 return NULL;
    1693             :         } else {
    1694           4 :                 int new = 0;
    1695             :                 /* returns new offset bat (i.e. to be appended), possibly with larger type ! */
    1696           4 :                 if (BATcount(u) >= max_cnt) {
    1697           0 :                         if (max_cnt == INT_MAX) { /* decompress; NOTE(review): max_cnt is only ever 256 or 64*1024, so this test is never true and the branch is dead */
    1698             :                                 if (!(b = temp_descriptor(cs->bid))) {
    1699             :                                         bat_destroy(u);
    1700             :                                         return NULL;
    1701             :                                 }
    1702             :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1703             :                                 /* TODO decompress updates if any */
    1704             :                                 bat_destroy(b);
    1705             :                                 assert(newoffsets == NULL);
    1706             :                                 if (!n) {
    1707             :                                         bat_destroy(u);
    1708             :                                         return NULL;
    1709             :                                 }
    1710             :                                 if (cs->ts != tr->tid) {
    1711             :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1712             :                                                 bat_destroy(n);
    1713             :                                                 bat_destroy(u);
    1714             :                                                 return NULL;
    1715             :                                         }
    1716             :                                         cs = &(*batp)->cs;
    1717             :                                         new = 1;
    1718             :                                         cs->uibid = cs->uvbid = 0;
    1719             :                                 }
    1720             :                                 if (cs->bid && !new)
    1721             :                                         temp_destroy(cs->bid);
    1722             :                                 n = transfer_to_systrans(n);
    1723             :                                 if (n == NULL) {
    1724             :                                         bat_destroy(u);
    1725             :                                         return NULL;
    1726             :                                 }
    1727             :                                 bat_set_access(n, BAT_READ);
    1728             :                                 cs->bid = temp_create(n);
    1729             :                                 bat_destroy(n);
    1730             :                                 if (cs->ebid && !new)
    1731             :                                         temp_destroy(cs->ebid);
    1732             :                                 cs->ebid = 0;
    1733             :                                 cs->st = ST_DEFAULT;
    1734             :                                 /* at append_col the column's storage type is cleared */
    1735             :                                 cs->cleared = true;
    1736             :                         } else { /* enlarge: rebuild the offsets BAT with a wider tail type */
    1737           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1738           0 :                                         GDKfree(newoffsets);
    1739           0 :                                         bat_destroy(u);
    1740           0 :                                         return NULL;
    1741             :                                 }
    1742           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1743           0 :                                 bat_destroy(b);
    1744           0 :                                 if (!n) {
    1745           0 :                                         GDKfree(newoffsets);
    1746           0 :                                         bat_destroy(u);
    1747           0 :                                         return NULL;
    1748             :                                 }
    1749           0 :                                 if (cs->ts != tr->tid) { /* storage not stamped with this transaction: work on a duplicate */
    1750           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1751           0 :                                                 bat_destroy(u);
    1752           0 :                                                 bat_destroy(n);
    1753           0 :                                                 return NULL; /* NOTE(review): newoffsets is not GDKfree()d here, unlike the two error returns above; looks like a leak, confirm */
    1754             :                                         }
    1755           0 :                                         cs = &(*batp)->cs;
    1756           0 :                                         new = 1;
    1757           0 :                                         temp_dup(cs->ebid); /* the duplicate takes its own references on the BATs it keeps */
    1758           0 :                                         if (cs->uibid) {
    1759           0 :                                                 temp_dup(cs->uibid);
    1760           0 :                                                 temp_dup(cs->uvbid);
    1761             :                                         }
    1762             :                                 }
    1763           0 :                                 if (cs->bid)
    1764           0 :                                         temp_destroy(cs->bid); /* old offsets BAT is released before the replacement is installed */
    1765           0 :                                 n = transfer_to_systrans(n);
    1766           0 :                                 if (n == NULL) {
    1767           0 :                                         bat_destroy(u);
    1768           0 :                                         return NULL; /* NOTE(review): newoffsets is not freed here either, and cs->bid was already released above; confirm */
    1769             :                                 }
    1770           0 :                                 bat_set_access(n, BAT_READ);
    1771           0 :                                 cs->bid = temp_create(n);
    1772           0 :                                 bat_destroy(n);
    1773           0 :                                 cs->cleared = true;
    1774           0 :                                 i = newoffsets;
    1775             :                         }
    1776             :                 } else { /* append */
    1777           4 :                         i = newoffsets;
    1778             :                 }
    1779             :         }
    1780           4 :         bat_destroy(u);
    1781           4 :         return i;
    1782             : }
    1783             : /*
                     :  * Prepare `cnt` value(s) of type `tt` at `i` for storage in a column
                     :  * that uses an offset-based storage type.
                     :  *
                     :  * The offset is parsed as a base-10 integer starting 4 characters
                     :  * into `storage_type` (presumably a string such as "FOR-<offset>";
                     :  * TODO confirm the exact format against the caller).
                     :  * FORprepare4append_vals() either fills `newoffsets`, which is then
                     :  * returned, or leaves it NULL.  In the NULL case the column is
                     :  * decompressed with FORdecompress_(), the result replaces cs->bid,
                     :  * cs->st becomes ST_DEFAULT, cs->cleared is set, and the original
                     :  * `i` is returned unchanged.
                     :  *
                     :  * Returns NULL on any failure.
                     :  */
    1784             : static void *
    1785           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1786             : {
    1787           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10); /* no end-pointer or errno check: malformed input is not detected here */
    1788           1 :         void *newoffsets = NULL;
    1789           1 :         BAT *b = NULL, *n = NULL;
    1790             : 
    1791           1 :         if (!(b = temp_descriptor(cs->bid)))
    1792             :                 return NULL;
    1793             : 
    1794           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1795           0 :                 bat_destroy(b);
    1796           0 :                 return NULL;
    1797             :         } else {
    1798             :                 /* returns new offset bat if values within min/max, else decompress */
    1799           1 :                 if (!newoffsets) {
    1800           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1801           1 :                         bat_destroy(b);
    1802           1 :                         if (!n)
    1803             :                                 return NULL;
    1804             :                         /* TODO decompress updates if any */
    1805           1 :                         if (cs->bid)
    1806           1 :                                 temp_destroy(cs->bid); /* NOTE(review): released before transfer_to_systrans() can fail, leaving cs->bid stale on that error path; confirm */
    1807           1 :                         n = transfer_to_systrans(n);
    1808           1 :                         if (n == NULL)
    1809             :                                 return NULL;
    1810           1 :                         bat_set_access(n, BAT_READ);
    1811           1 :                         cs->bid = temp_create(n);
    1812           1 :                         cs->st = ST_DEFAULT;
    1813             :                         /* at append_col the column's storage type is cleared */
    1814           1 :                         cs->cleared = true;
    1815           1 :                         b = n; /* so the common bat_destroy(b) below releases the decompressed BAT */
    1816             :                 } else { /* append */
    1817             :                         i = newoffsets;
    1818             :                 }
    1819             :         }
    1820           1 :         bat_destroy(b);
    1821           1 :         return i;
    1822             : }
    1823             : /*
                     :  * Update the single row `rid` of a column with the value at `upd`.
                     :  *
                     :  * The value is written directly into the base BAT (cs->bid) with
                     :  * void_inplace() when `is_new` is set, cs->cleared is set, or
                     :  * segments_is_append() reports true for `rid`.  Otherwise the pair
                     :  * (rid, value) is appended to the update BATs obtained from
                     :  * cs_real_update_bats() and cs->ucnt is incremented.
                     :  *
                     :  * For ST_DICT columns the value is first passed through
                     :  * dict_append_val(), which may replace *batp and may return a newly
                     :  * allocated buffer; that buffer is GDKfree()d on every exit path.
                     :  *
                     :  * Returns LOG_OK, LOG_CONFLICT when segments_is_deleted() reports
                     :  * the row as deleted, or LOG_ERR on any other failure.
                     :  * No table lock is taken here; delta_update_val() calls this
                     :  * function between lock_table() and unlock_table().
                     :  */
    1824             : static int
    1825        6936 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1826             : {
    1827        6936 :         void *oupd = upd; /* caller's pointer; upd != oupd later means upd must be freed here */
    1828        6936 :         sql_delta *bat = *batp;
    1829        6936 :         column_storage *cs = &bat->cs;
    1830        6936 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1831        6936 :         assert(!is_oid_nil(rid));
    1832        6936 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid); /* decided before the ST_DICT step below, so a cs->cleared set there does not change it */
    1833             : 
    1834        6936 :         if (cs->st == ST_DICT) {
    1835             :                 /* possibly a new array is returned */
    1836           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1837           0 :                 bat = *batp; /* dict_append_val() may have replaced the delta: refresh both pointers */
    1838           0 :                 cs = &bat->cs;
    1839           0 :                 if (!upd)
    1840             :                         return LOG_ERR;
    1841             :         }
    1842             : 
    1843             :         /* check if rid is insert ? */
    1844        6936 :         if (!inplace) {
    1845             :                 /* check conflict */
    1846        4050 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1847           0 :                         if (oupd != upd)
    1848           0 :                                 GDKfree(upd);
    1849           0 :                         return LOG_CONFLICT;
    1850             :                 }
    1851        4050 :                 BAT *ui, *uv;
    1852             : 
    1853             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1854             :                 /* currently only one update delta is possible */
    1855        4050 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1856           0 :                         if (oupd != upd)
    1857           0 :                                 GDKfree(upd);
    1858           0 :                         return LOG_ERR;
    1859             :                 }
    1860             : 
    1861        4050 :                 assert(uv->ttype);
    1862        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1863        8100 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1864        4050 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) { /* NOTE(review): if only the second append fails, ui is left one entry longer than uv; confirm callers discard the delta */
    1865           0 :                         if (oupd != upd)
    1866           0 :                                 GDKfree(upd);
    1867           0 :                         bat_destroy(ui);
    1868           0 :                         bat_destroy(uv);
    1869           0 :                         return LOG_ERR;
    1870             :                 }
    1871        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1872        4050 :                 bat_destroy(ui);
    1873        4050 :                 bat_destroy(uv);
    1874        4050 :                 cs->ucnt++;
    1875             :         } else { /* in-place: overwrite the value in the base BAT */
    1876        2886 :                 BAT *b = NULL;
    1877             : 
    1878        2886 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1879           0 :                         if (oupd != upd)
    1880           0 :                                 GDKfree(upd);
    1881           0 :                         return LOG_ERR;
    1882             :                 }
    1883        2886 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1884           0 :                         if (oupd != upd)
    1885           0 :                                 GDKfree(upd);
    1886           0 :                         bat_destroy(b);
    1887           0 :                         return LOG_ERR;
    1888             :                 }
    1889        2886 :                 bat_destroy(b);
    1890             :         }
    1891        6936 :         if (oupd != upd)
    1892           0 :                 GDKfree(upd);
    1893             :         return LOG_OK;
    1894             : }
    1895             : /*
                     :  * Single-value column update on a delta.
                     :  *
                     :  * Calls cs_update_val() with the same arguments while holding the
                     :  * table lock identified by (tr->store, t->base.id), and returns the
                     :  * result code of cs_update_val() unchanged.
                     :  */
    1896             : static int
    1897        6936 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1898             : {
    1899        6936 :         int res = LOG_OK;
    1900        6936 :         lock_table(tr->store, t->base.id);
    1901        6936 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1902        6936 :         unlock_table(tr->store, t->base.id); /* released on every path: cs_update_val() has returned by now */
    1903        6936 :         return res;
    1904             : }
    1905             : /*
                     :  * Initialise column storage `cs` from an existing storage `ocs`.
                     :  *
                     :  * tr    unused.
                     :  * ocs   source storage; when NULL nothing is done and LOG_OK is
                     :  *       returned.
                     :  * cs    destination storage.
                     :  * type  tail type for the fresh update-values BAT (passed to e_bat()).
                     :  * temp  non-zero: cs->bid becomes the result of
                     :  *       temp_copy(bid, true, false); zero: the bid is shared with ocs
                     :  *       and temp_dup() is called on it.
                     :  *
                     :  * cs->ebid is shared with ocs (temp_dup() when non-zero).  The
                     :  * update BATs are not inherited: cs->uibid and cs->uvbid are set to
                     :  * new BATs from e_bat() and cs->ucnt is reset to 0.  cs->st is copied
                     :  * from ocs.
                     :  *
                     :  * Returns LOG_OK, or LOG_ERR when temp_copy() or e_bat() yields
                     :  * BID_NIL.
                     :  */
    1906             : static int
    1907      158836 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1908             : {
    1909      158836 :         (void)tr;
    1910      158836 :         if (!ocs)
    1911             :                 return LOG_OK;
    1912      158836 :         cs->bid = ocs->bid;
    1913      158836 :         cs->ebid = ocs->ebid;
    1914      158836 :         cs->uibid = ocs->uibid; /* uibid, uvbid and ucnt are overwritten further down unless an error return happens first */
    1915      158836 :         cs->uvbid = ocs->uvbid;
    1916      158836 :         cs->ucnt = ocs->ucnt;
    1917             : 
    1918      158836 :         if (temp) {
    1919       25966 :                 cs->bid = temp_copy(cs->bid, true, false);
    1920       25944 :                 if (cs->bid == BID_NIL)
    1921             :                         return LOG_ERR; /* NOTE(review): cs->ebid/uibid/uvbid still alias ocs's ids here without a temp_dup(); a later destroy_delta() on the owner would temp_destroy() them, confirm this cannot unbalance references */
    1922             :         } else {
    1923      132870 :                 temp_dup(cs->bid);
    1924             :         }
    1925      158844 :         if (cs->ebid)
    1926           6 :                 temp_dup(cs->ebid);
    1927      158844 :         cs->ucnt = 0;
    1928      158844 :         cs->uibid = e_bat(TYPE_oid);
    1929      158880 :         cs->uvbid = e_bat(type);
    1930      158883 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1931             :                 return LOG_ERR;
    1932      158883 :         cs->st = ocs->st;
    1933      158883 :         return LOG_OK;
    1934             : }
    1935             : /*
                     :  * Drop one reference to delta `b` and free it when no references
                     :  * remain.
                     :  *
                     :  * If the counter is still above zero after ATOMIC_DEC() the function
                     :  * returns without touching anything else.  Otherwise temp_destroy()
                     :  * is called on each non-zero BAT id held by b->cs (uibid, uvbid, bid,
                     :  * ebid), the ids are zeroed and `b` itself is released with
                     :  * _DELETE().
                     :  *
                     :  * recursive  when true and b->next is set, b->next is destroyed
                     :  *            first through a recursive call (also with
                     :  *            recursive == true), so the recursion depth follows the
                     :  *            length of the ->next chain.
                     :  */
    1936             : static void
    1937      320987 : destroy_delta(sql_delta *b, bool recursive)
    1938             : {
    1939      320987 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1940             :                 return;
    1941      299602 :         if (recursive && b->next)
    1942      128246 :                 destroy_delta(b->next, true);
    1943      299602 :         if (b->cs.uibid)
    1944       99446 :                 temp_destroy(b->cs.uibid);
    1945      299602 :         if (b->cs.uvbid)
    1946       99446 :                 temp_destroy(b->cs.uvbid);
    1947      299602 :         if (b->cs.bid)
    1948      299602 :                 temp_destroy(b->cs.bid);
    1949      299602 :         if (b->cs.ebid)
    1950          61 :                 temp_destroy(b->cs.ebid);
    1951      299602 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0; /* clear the ids before the struct is released */
    1952      299602 :         _DELETE(b);
    1953             : }
    1954             : /*
                     :  * Return the delta of column `c` that transaction `tr` should work
                     :  * on, creating a new head delta for `tr` when needed.
                     :  *
                     :  * update_conflict  NULL when the caller appends (see the "on append
                     :  *                  there are no conflicts" case below); otherwise an
                     :  *                  out-flag that is set to true when the caller must
                     :  *                  abort because of a conflict.
                     :  *
                     :  * Outcomes, in the order they are tested:
                     :  *  1. The current head delta is returned as-is when it is already
                     :  *     stamped with tr->tid, or when the caller appends and the delta's
                     :  *     timestamp is below TRANSACTION_ID_BASE or
                     :  *     tr_version_of_parent() accepts it.
                     :  *  2. The head delta has a timestamp >= TRANSACTION_ID_BASE that is
                     :  *     not accepted as a parent version: with a non-NULL flag the flag
                     :  *     is set and NULL is returned; with a NULL flag the result of
                     :  *     timestamp_delta() is returned unless the delta is `cleared`,
                     :  *     in which case NULL is returned.
                     :  *  3. Otherwise a new sql_delta is allocated, filled by dup_cs()
                     :  *     (temp == 0) from the delta chosen by timestamp_delta(), stamped
                     :  *     with tr->tid, linked in front of that delta and installed in
                     :  *     c->data with a compare-and-swap.  If the swap fails the new
                     :  *     delta is destroyed, the flag (if any) is set and NULL is
                     :  *     returned.
                     :  *
                     :  * NULL is also returned when timestamp_delta(), ZNEW() or dup_cs()
                     :  * fails; the flag is not set in those cases.
                     :  */
    1955             : static sql_delta *
    1956    16084162 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1957             : {
    1958    16084162 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1959             : 
    1960    16084162 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1961    15951341 :                 return obat;
    1962      132821 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1963             :                 /* abort */
    1964          12 :                 if (update_conflict)
    1965           4 :                         *update_conflict = true;
    1966           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1967           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1968           4 :                 return NULL;
    1969             :         }
    1970      132809 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1971             :                 return NULL;
    1972      132820 :         sql_delta* bat = ZNEW(sql_delta);
    1973      132865 :         if (!bat)
    1974             :                 return NULL;
    1975      132865 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1976      132865 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1977           0 :                 destroy_delta(bat, false); /* non-recursive: bat->next has not been linked yet */
    1978           0 :                 return NULL;
    1979             :         }
    1980      132885 :         bat->cs.ts = tr->tid;
    1981             :         /* only one writer else abort */
    1982      132885 :         bat->next = obat;
    1983      132885 :         if (obat) /* always true here: a NULL obat already returned above */
    1984      132885 :                 bat->nr_updates = obat->nr_updates;
    1985      132885 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) { /* install only if c->data still equals the delta we copied from */
    1986           0 :                 bat->next = NULL; /* unlink so the chain is not touched while discarding our copy */
    1987           0 :                 destroy_delta(bat, false);
    1988           0 :                 if (update_conflict)
    1989           0 :                         *update_conflict = true;
    1990           0 :                 return NULL;
    1991             :         }
    1992             :         return bat;
    1993             : }
    1994             : 
    1995             : static int
    1996       10059 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    1997             : {
    1998       10059 :         int ok = LOG_OK;
    1999             : 
    2000       10059 :         if (is_bat) {
    2001        3123 :                 BAT *tids = incoming_tids;
    2002        3123 :                 BAT *values = incoming_values;
    2003        3123 :                 if (BATcount(tids) == 0)
    2004             :                         return LOG_OK;
    2005        3123 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2006             :         } else {
    2007        6936 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2008             :         }
    2009             :         return ok;
    2010             : }
    2011             : 
    2012             : static int
    2013       10082 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2014             : {
    2015       10082 :         int res = LOG_OK;
    2016       10082 :         bool update_conflict = false;
    2017       10082 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2018             : 
    2019       10082 :         if (isbat) {
    2020        3143 :                 BAT *t = tids;
    2021        3143 :                 if (!BATcount(t))
    2022             :                         return LOG_OK;
    2023             :         }
    2024             : 
    2025        9784 :         if (c == NULL)
    2026             :                 return LOG_ERR;
    2027             : 
    2028        9784 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2029           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2030             : 
    2031        9780 :         assert(delta && delta->cs.ts == tr->tid);
    2032        9780 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2033        9780 :         if (odelta != delta)
    2034        3457 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2035             : 
    2036        9780 :         odelta = delta;
    2037        9780 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2038             :                 return res;
    2039        9780 :         assert(delta == odelta);
    2040        9780 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2041           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2042             :         return res;
    2043             : }
    2044             : 
    2045             : static sql_delta *
    2046        2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2047             : {
    2048        2457 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2049             : 
    2050        2457 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2051        2429 :                 return obat;
    2052          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2053             :                 /* abort */
    2054           0 :                 if (update_conflict)
    2055           0 :                         *update_conflict = true;
    2056           0 :                 return NULL;
    2057             :         }
    2058          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2059             :                 return NULL;
    2060          28 :         sql_delta* bat = ZNEW(sql_delta);
    2061          28 :         if (!bat)
    2062             :                 return NULL;
    2063          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2064          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2065           0 :                 destroy_delta(bat, false);
    2066           0 :                 return NULL;
    2067             :         }
    2068          28 :         bat->cs.ts = tr->tid;
    2069             :         /* only one writer else abort */
    2070          28 :         bat->next = obat;
    2071          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2072           0 :                 bat->next = NULL;
    2073           0 :                 destroy_delta(bat, false);
    2074           0 :                 if (update_conflict)
    2075           0 :                         *update_conflict = true;
    2076           0 :                 return NULL;
    2077             :         }
    2078             :         return bat;
    2079             : }
    2080             : 
    2081             : static int
    2082         782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2083             : {
    2084         782 :         int res = LOG_OK;
    2085         782 :         bool update_conflict = false;
    2086         782 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2087             : 
    2088         782 :         if (isbat) {
    2089         782 :                 BAT *t = tids;
    2090         782 :                 if (!BATcount(t))
    2091             :                         return LOG_OK;
    2092             :         }
    2093             : 
    2094         279 :         if (i == NULL)
    2095             :                 return LOG_ERR;
    2096             : 
    2097         279 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2098           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2099             : 
    2100         279 :         assert(delta && delta->cs.ts == tr->tid);
    2101         279 :         if (odelta != delta)
    2102          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2103             : 
    2104         279 :         odelta = delta;
    2105         279 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2106         279 :         assert(delta == odelta);
    2107             :         return res;
    2108             : }
    2109             : 
/*
 * Write the values of BAT i into the BAT behind delta *batp, while holding
 * the column lock for `id`.
 *
 * Without `offsets` the values go to position `offset`: a plain append when
 * that is exactly the end of the target, otherwise BATupdatepos().  With
 * `offsets` the values are appended when offsets is dense and starts at the
 * end of the target, otherwise placed through BATupdate().
 *
 * An empty input is a no-op.  Returns LOG_OK or LOG_ERR.
 */
static int
delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
{
        /* oi is the BAT actually written; it differs from i once it has been
         * unmasked or replaced below, and is then released by this function */
        BAT *b, *oi = i;
        int err = 0;
        sql_delta *bat = *batp;

        assert(!offsets || BATcount(offsets) == BATcount(i));
        if (!BATcount(i))
                return LOG_OK;
        /* msk-typed or mask-candidate input is first converted with BATunmask() */
        if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
                return LOG_ERR;

        lock_column(tr->store, id);
        if (bat->cs.st == ST_DICT) {
                /* dict_append_bat() may replace *batp, hence the reload of bat;
                 * presumably ni is the input in dictionary-encoded form - TODO confirm */
                BAT *ni = dict_append_bat(tr, batp, oi);
                bat = *batp;
                if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
                        bat_destroy(oi);
                oi = ni;
                if (!oi) {
                        unlock_column(tr->store, id);
                        return LOG_ERR;
                }
        }
        if (bat->cs.st == ST_FOR) {
                /* presumably ni is the input in frame-of-reference form - TODO confirm */
                BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
                bat = *batp;
                if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
                        bat_destroy(oi);
                oi = ni;
                if (!oi) {
                        unlock_column(tr->store, id);
                        return LOG_ERR;
                }
        }

        b = temp_descriptor(bat->cs.bid);
        if (b == NULL) {
                unlock_column(tr->store, id);
                if (oi != i)
                        bat_destroy(oi);
                return LOG_ERR;
        }
        if (!offsets && offset == b->hseqbase+BATcount(b)) {
                /* target position is exactly the end: plain append */
                if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
                        err = 1;
        } else if (!offsets) {
                /* target position is elsewhere */
                if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
                        err = 1;
        } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
                /* dense offsets starting at the end: again a plain append */
                if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
                        err = 1;
        } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
                        err = 1;
        }
        bat_destroy(b);
        unlock_column(tr->store, id);

        /* release the unmasked/encoded copy, never the caller's BAT */
        if (oi != i)
                bat_destroy(oi);
        return (err)?LOG_ERR:LOG_OK;
}
    2173             : 
    2174             : // Look at the offsets and find where the replacements end and the appends begin.
    2175             : static BUN
    2176           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2177             : {
    2178           0 :         BUN ocnt = BATcount(offsets);
    2179           0 :         if (ocnt == 0)
    2180             :                 return 0;
    2181             : 
    2182           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2183           0 :         if (highest < bcnt)
    2184             :                 // all are replacements
    2185             :                 return ocnt;
    2186             : 
    2187             :         // reason backward to find the first append.
    2188             :         // Suppose offsets has 15 entries, bcnt == 100
    2189             :         // and the highest offset in offsets is 109.
    2190           0 :         BUN new_bcnt = highest + 1; // 110
    2191           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2192           0 :         BUN nreplacements = ocnt - nappends; // 5
    2193             : 
    2194             :         // The first append should be to position bcnt
    2195           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2196             : 
    2197             :         return nreplacements;
    2198             : }
    2199             : 
    2200             : 
    2201             : static int
    2202    15787672 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2203             : {
    2204    15787672 :         void *oi = i;
    2205    15787672 :         BAT *b;
    2206    15787672 :         lock_column(tr->store, id);
    2207    15806971 :         sql_delta *bat = *batp;
    2208             : 
    2209    15806971 :         if (bat->cs.st == ST_DICT) {
    2210             :                 /* possibly a new array is returned */
    2211           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2212           4 :                 bat = *batp;
    2213           4 :                 if (!i) {
    2214           0 :                         unlock_column(tr->store, id);
    2215           0 :                         return LOG_ERR;
    2216             :                 }
    2217             :         }
    2218    15806971 :         if (bat->cs.st == ST_FOR) {
    2219             :                 /* possibly a new array is returned */
    2220           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2221           1 :                 bat = *batp;
    2222           1 :                 if (!i) {
    2223           0 :                         unlock_column(tr->store, id);
    2224           0 :                         return LOG_ERR;
    2225             :                 }
    2226             :         }
    2227             : 
    2228    15806971 :         b = temp_descriptor(bat->cs.bid);
    2229    15812276 :         if (b == NULL) {
    2230           0 :                 if (i != oi)
    2231           0 :                         GDKfree(i);
    2232           0 :                 unlock_column(tr->store, id);
    2233           0 :                 return LOG_ERR;
    2234             :         }
    2235    15812276 :         BUN bcnt = BATcount(b);
    2236             : 
    2237    15812276 :         if (offsets) {
    2238             :                 // The first few might be replacements while later items might be appends.
    2239             :                 // Handle the replacements here while leaving the appends to the code below.
    2240           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2241             : 
    2242           0 :                 oid *start = Tloc(offsets, 0);
    2243           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2244           0 :                         bat_destroy(b);
    2245           0 :                         if (i != oi)
    2246           0 :                                 GDKfree(i);
    2247           0 :                         unlock_column(tr->store, id);
    2248           0 :                         return LOG_ERR;
    2249             :                 }
    2250             : 
    2251             :                 // Replacements have been handled. The rest are appends.
    2252           0 :                 assert(offset == oid_nil);
    2253           0 :                 offset = bcnt;
    2254           0 :                 cnt -= nreplacements;
    2255             :         }
    2256             : 
    2257    15812276 :         if (bcnt > offset){
    2258      553914 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2259      553914 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2260           0 :                         bat_destroy(b);
    2261           0 :                         if (i != oi)
    2262           0 :                                 GDKfree(i);
    2263           0 :                         unlock_column(tr->store, id);
    2264           0 :                         return LOG_ERR;
    2265             :                 }
    2266      553676 :                 cnt -= ccnt;
    2267      553676 :                 offset += ccnt;
    2268             :         }
    2269    15812038 :         if (cnt) {
    2270    15258004 :                 if (BATcount(b) < offset) { /* add space */
    2271        8267 :                         BUN d = offset - BATcount(b);
    2272        8267 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2273           0 :                                 bat_destroy(b);
    2274           0 :                                 if (i != oi)
    2275           0 :                                         GDKfree(i);
    2276           0 :                                 unlock_column(tr->store, id);
    2277           0 :                                 return LOG_ERR;
    2278             :                         }
    2279             :                 }
    2280    15258005 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2281           0 :                         bat_destroy(b);
    2282           0 :                         if (i != oi)
    2283           0 :                                 GDKfree(i);
    2284           0 :                         unlock_column(tr->store, id);
    2285           0 :                         return LOG_ERR;
    2286             :                 }
    2287             :         }
    2288    15812593 :         bat_destroy(b);
    2289    15808551 :         if (i != oi)
    2290           4 :                 GDKfree(i);
    2291    15808551 :         unlock_column(tr->store, id);
    2292    15808551 :         return LOG_OK;
    2293             : }
    2294             : 
    2295             : static int
    2296       25971 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2297             : {
    2298       25971 :         if (!(bat->segs = new_segments(tr, 0)))
    2299             :                 return LOG_ERR;
    2300       25969 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2301             : }
    2302             : 
    2303             : static int
    2304    15936603 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
    2305             : {
    2306    15936603 :         int ok = LOG_OK;
    2307             : 
    2308    15936603 :         if ((*delta)->cs.merged)
    2309       36630 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2310    15936603 :         if (isbat) {
    2311      148979 :                 BAT *bat = incoming_data;
    2312             : 
    2313      148979 :                 if (BATcount(bat))
    2314      149133 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
    2315             :         } else {
    2316    15787624 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2317             :         }
    2318    15966571 :         return ok;
    2319             : }
    2320             : 
    2321             : static int
    2322    15947327 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2323             : {
    2324    15947327 :         int res = LOG_OK;
    2325    15947327 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2326             : 
    2327    15947327 :         if (isbat) {
    2328      149302 :                 BAT *t = data;
    2329      149302 :                 if (!BATcount(t))
    2330             :                         return LOG_OK;
    2331             :         }
    2332             : 
    2333    15946550 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2334             :                 return LOG_ERR;
    2335             : 
    2336    15944944 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2337             : 
    2338    15944944 :         odelta = delta;
    2339    15944944 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
    2340             :                 return res;
    2341    15961190 :         if (odelta != delta) {
    2342           0 :                 delta->next = odelta;
    2343           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2344           0 :                         delta->next = NULL;
    2345           0 :                         destroy_delta(delta, false);
    2346           0 :                         return LOG_CONFLICT;
    2347             :                 }
    2348             :         }
    2349    15961190 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2350           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2351             :         return res;
    2352             : }
    2353             : 
    2354             : static int
    2355        2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2356             : {
    2357        2184 :         int res = LOG_OK;
    2358        2184 :         sql_delta *delta;
    2359             : 
    2360        2184 :         if (isbat) {
    2361        1010 :                 BAT *t = data;
    2362        1010 :                 if (!BATcount(t))
    2363             :                         return LOG_OK;
    2364             :         }
    2365             : 
    2366        2172 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2367             :                 return LOG_ERR;
    2368             : 
    2369        2172 :         assert(delta->cs.st == ST_DEFAULT);
    2370             : 
    2371        2172 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
    2372        2172 :         return res;
    2373             : }
    2374             : 
    2375             : static int
    2376       79235 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2377             : {
    2378       79235 :         int err = 0;
    2379             : 
    2380             :         /* TODO check for conflicting updates */
    2381       79235 :         (void)rid;
    2382       79235 :         (void)cnt;
    2383      582689 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2384      503454 :                 sql_column *c = n->data;
    2385      503454 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2386             : 
    2387             :                 /* check for active updates */
    2388      503454 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2389             :                         return 1;
    2390             :         }
    2391             :         return 0;
    2392             : }
    2393             : 
    2394             : static int
    2395       75337 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2396             : {
    2397       75337 :         int in_transaction = segments_in_transaction(tr, t);
    2398             : 
    2399       75337 :         lock_table(tr->store, t->base.id);
    2400             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2401       75337 :         segment *seg = s->segs->h, *p = NULL;
    2402    52566086 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2403    52566086 :                 if (seg->start <= rid && seg->end > rid) {
    2404       75337 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2405           4 :                                 unlock_table(tr->store, t->base.id);
    2406           4 :                                 return LOG_CONFLICT;
    2407             :                         }
    2408       75333 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2409           0 :                                 unlock_table(tr->store, t->base.id);
    2410           0 :                                 return LOG_CONFLICT;
    2411             :                         }
    2412       75333 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2413           0 :                                 unlock_table(tr->store, t->base.id);
    2414           0 :                                 return LOG_ERR;
    2415             :                         }
    2416             :                         break;
    2417             :                 }
    2418             :         }
    2419       75333 :         unlock_table(tr->store, t->base.id);
    2420       75333 :         if (!in_transaction)
    2421       13065 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2422             :         return LOG_OK;
    2423             : }
    2424             : 
/*
 * Delete the cnt rows starting at `start` from storage s, walking the
 * segment chain from *Seg.
 *
 * The range may span several segments; each iteration handles the part that
 * lies in the current segment.  Parts already deleted (SEG_IS_DELETED for
 * tr) are skipped.  On each split, *Seg is set to the segment returned by
 * split_segment(); the callers in this file pass the same pointer to
 * successive calls, presumably so the next range resumes from there.
 * No locking is done here.
 *
 * Returns LOG_OK, LOG_CONFLICT when a segment is not SEG_VALID_4_DELETE or
 * deletes_conflict_updates() reports a conflict, or LOG_ERR when
 * split_segment() fails.
 */
static int
seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
{
        /* p trails one segment behind seg */
        segment *seg = *Seg, *p = NULL;
        for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
                if (seg->start <= start && seg->end > start) {
                        /* lcnt: the part of the range that lies in this segment */
                        size_t lcnt = cnt;
                        if (start+lcnt > seg->end)
                                lcnt = seg->end-start;
                        if (SEG_IS_DELETED(seg, tr)) {
                                /* skip this part and move on to the next segment */
                                start += lcnt;
                                cnt -= lcnt;
                                continue;
                        } else if (!SEG_VALID_4_DELETE(seg, tr))
                                return LOG_CONFLICT;
                        if (deletes_conflict_updates( tr, t, start, lcnt))
                                return LOG_CONFLICT;
                        *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
                        if (!seg)
                                return LOG_ERR;
                        start += lcnt;
                        cnt -= lcnt;
                }
                /* stop once the remaining range ends within this segment */
                if (start+cnt <= seg->end)
                        break;
        }
        return LOG_OK;
}
    2453             : 
    2454             : static int
    2455         721 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2456             : {
    2457         721 :         segment *seg = s->segs->h;
    2458         721 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2459             : }
    2460             : 
    2461             : static int
    2462         319 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2463             : {
    2464         319 :         int in_transaction = segments_in_transaction(tr, t);
    2465         319 :         BAT *oi = i;    /* update ids */
    2466         319 :         int ok = LOG_OK;
    2467             : 
    2468         319 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2469             :                 return LOG_ERR;
    2470         319 :         if (BATcount(i)) {
    2471         568 :                 if (BATtdense(i)) {
    2472         249 :                         size_t start = i->tseqbase;
    2473         249 :                         size_t cnt = BATcount(i);
    2474             : 
    2475         249 :                         lock_table(tr->store, t->base.id);
    2476         249 :                         ok = delete_range(tr, t, s, start, cnt);
    2477         249 :                         unlock_table(tr->store, t->base.id);
    2478          70 :                 } else if (complex_cand(i)) {
    2479           0 :                         struct canditer ci;
    2480           0 :                         oid f = 0, l = 0, cur = 0;
    2481             : 
    2482           0 :                         canditer_init(&ci, NULL, i);
    2483           0 :                         cur = f = canditer_next(&ci);
    2484             : 
    2485           0 :                         lock_table(tr->store, t->base.id);
    2486           0 :                         if (!is_oid_nil(f)) {
    2487           0 :                                 segment *seg = s->segs->h;
    2488           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2489           0 :                                         if (cur+1 == l) {
    2490           0 :                                                 cur++;
    2491           0 :                                                 continue;
    2492             :                                         }
    2493           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2494           0 :                                         f = cur = l;
    2495             :                                 }
    2496           0 :                                 if (ok == LOG_OK)
    2497           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2498             :                         }
    2499           0 :                         unlock_table(tr->store, t->base.id);
    2500             :                 } else {
    2501          70 :                         if (!i->tsorted) {
    2502           0 :                                 assert(oi == i);
    2503           0 :                                 BAT *ni = NULL;
    2504           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2505           0 :                                         ok = LOG_ERR;
    2506           0 :                                 if (ni)
    2507           0 :                                         i = ni;
    2508             :                         }
    2509          70 :                         assert(i->tsorted);
    2510          70 :                         BUN icnt = BATcount(i);
    2511          70 :                         BATiter ii = bat_iterator(i);
    2512          70 :                         oid *o = ii.base, n = o[0]+1;
    2513          70 :                         size_t lcnt = 1;
    2514             : 
    2515          70 :                         lock_table(tr->store, t->base.id);
    2516          70 :                         segment *seg = s->segs->h;
    2517       23233 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2518       23163 :                                 if (o[i] == n) {
    2519       22803 :                                         lcnt++;
    2520       22803 :                                         n++;
    2521             :                                 } else {
    2522         360 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2523         360 :                                         lcnt = 0;
    2524             :                                 }
    2525       23163 :                                 if (!lcnt) {
    2526         360 :                                         n = o[i]+1;
    2527         360 :                                         lcnt = 1;
    2528             :                                 }
    2529             :                         }
    2530          70 :                         bat_iterator_end(&ii);
    2531          70 :                         if (lcnt && ok == LOG_OK)
    2532          70 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2533          70 :                         unlock_table(tr->store, t->base.id);
    2534             :                 }
    2535             :         }
    2536         319 :         if (i != oi)
    2537          25 :                 bat_destroy(i);
    2538             :         // assert
    2539         319 :         if (!in_transaction)
    2540         271 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2541             :         return ok;
    2542             : }
    2543             : 
    2544             : static void
    2545       52468 : destroy_segments(segments *s)
    2546             : {
    2547       52468 :         if (!s || sql_ref_dec(&s->r) > 0)
    2548           0 :                 return;
    2549       52468 :         segment *seg = s->h;
    2550      116095 :         while(seg) {
    2551       63627 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2552       63627 :                 ATOMIC_PTR_DESTROY(&seg->next);
    2553       63627 :                 _DELETE(seg);
    2554       63627 :                 seg = n;
    2555             :         }
    2556       52468 :         _DELETE(s);
    2557             : }
    2558             : 
    2559             : static void
    2560       53927 : destroy_storage(storage *bat)
    2561             : {
    2562       53927 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2563             :                 return;
    2564       52326 :         if (bat->next)
    2565        6981 :                 destroy_storage(bat->next);
    2566       52326 :         destroy_segments(bat->segs);
    2567       52326 :         if (bat->cs.uibid)
    2568       31145 :                 temp_destroy(bat->cs.uibid);
    2569       52326 :         if (bat->cs.uvbid)
    2570       31145 :                 temp_destroy(bat->cs.uvbid);
    2571       52326 :         if (bat->cs.bid)
    2572       52326 :                 temp_destroy(bat->cs.bid);
    2573       52326 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2574       52326 :         _DELETE(bat);
    2575             : }
    2576             : 
    2577             : static int
    2578      175684 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2579             : {
    2580      175684 :         if (uncommitted) {
    2581      448881 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2582      296701 :                         if (!VALID_4_READ(s->ts,tr))
    2583             :                                 return 1;
    2584             :         } else {
    2585      294713 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2586      272618 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2587             :                                 return 1;
    2588             :         }
    2589             : 
    2590             :         return 0;
    2591             : }
    2592             : 
    2593             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2594             : 
    2595             : storage *
    2596     2209587 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2597             : {
    2598     2209587 :         storage *obat;
    2599             : 
    2600     2209587 :         obat = ATOMIC_PTR_GET(&t->data);
    2601             : 
    2602     2209587 :         if (obat->cs.ts != tr->tid)
    2603     1531196 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2604     1531141 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2605             :                                 /* abort */
    2606       15400 :                                 if (clear)
    2607       15400 :                                         *clear = true;
    2608       15400 :                                 return NULL;
    2609             :                         }
    2610             : 
    2611     2194187 :         if (!clear)
    2612             :                 return obat;
    2613             : 
    2614             :         /* remainder is only to handle clear */
    2615       26375 :         if (segments_conflict(tr, obat->segs, 1)) {
    2616         410 :                 *clear = true;
    2617         410 :                 return NULL;
    2618             :         }
    2619       25967 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2620             :                 return NULL;
    2621       25967 :         storage *bat = ZNEW(storage);
    2622       25971 :         if (!bat)
    2623             :                 return NULL;
    2624       25971 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2625       25971 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2626           0 :                 destroy_storage(bat);
    2627           0 :                 return NULL;
    2628             :         }
    2629       25971 :         bat->cs.cleared = true;
    2630       25971 :         bat->cs.ts = tr->tid;
    2631             :         /* only one writer else abort */
    2632       25971 :         bat->next = obat;
    2633       25971 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2634          10 :                 bat->next = NULL;
    2635          10 :                 destroy_storage(bat);
    2636          10 :                 if (clear)
    2637          10 :                         *clear = true;
    2638          10 :                 return NULL;
    2639             :         }
    2640             :         return bat;
    2641             : }
    2642             : 
    2643             : static int
    2644       75704 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
    2645             : {
    2646       75704 :         int ok = LOG_OK;
    2647       75704 :         BAT *b = ib;
    2648       75704 :         storage *bat;
    2649             : 
    2650       75704 :         if (isbat && !BATcount(b))
    2651             :                 return ok;
    2652             : 
    2653       75656 :         if (t == NULL)
    2654             :                 return LOG_ERR;
    2655             : 
    2656       75656 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2657             :                 return LOG_ERR;
    2658             : 
    2659       75656 :         if (isbat)
    2660         319 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2661             :         else
    2662       75337 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2663             :         return ok;
    2664             : }
    2665             : 
    2666             : static size_t
    2667           0 : dcount_col(sql_trans *tr, sql_column *c)
    2668             : {
    2669           0 :         sql_delta *b;
    2670             : 
    2671           0 :         if (!isTable(c->t))
    2672             :                 return 0;
    2673           0 :         b = col_timestamp_delta(tr, c);
    2674           0 :         if (!b)
    2675             :                 return 1;
    2676             : 
    2677           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2678           0 :         if (!s || !s->segs->t)
    2679             :                 return 1;
    2680           0 :         size_t cnt = s->segs->t->end;
    2681           0 :         if (cnt) {
    2682           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2683           0 :                 size_t dcnt = 0;
    2684             : 
    2685           0 :                 if (v)
    2686           0 :                         dcnt = BATguess_uniques(v, NULL);
    2687           0 :                 return dcnt;
    2688             :         }
    2689             :         return cnt;
    2690             : }
    2691             : 
    2692             : static BAT *
    2693     3900208 : bind_no_view(BAT *b, bool quick)
    2694             : {
    2695     3900208 :         if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
    2696     3896179 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2697     3896179 :                 bat_destroy(b);
    2698     3896207 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2699             :                         return NULL;
    2700             :         }
    2701             :         return b;
    2702             : }
    2703             : 
    2704             : static int
    2705           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2706             : {
    2707           0 :         int ok = 0;
    2708           0 :         assert(tr->active);
    2709           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2710           0 :                 return 0;
    2711           0 :         lock_column(tr->store, c->base.id);
    2712           0 :         if (unique_est) {
    2713           0 :                 sql_delta *d;
    2714           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2715           0 :                         BAT *b;
    2716           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2717           0 :                                 MT_lock_set(&b->theaplock);
    2718           0 :                                 b->tunique_est = *unique_est;
    2719           0 :                                 MT_lock_unset(&b->theaplock);
    2720           0 :                                 bat_destroy(b);
    2721             :                         }
    2722             :                 }
    2723             :         }
    2724           0 :         if (min) {
    2725           0 :                 _DELETE(c->min);
    2726           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2727           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2728           0 :                         memcpy(c->min, min, minlen);
    2729           0 :                         ok = 1;
    2730             :                 }
    2731             :         }
    2732           0 :         if (max) {
    2733           0 :                 _DELETE(c->max);
    2734           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2735           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2736           0 :                         memcpy(c->max, max, maxlen);
    2737           0 :                         ok = 1;
    2738             :                 }
    2739             :         }
    2740           0 :         unlock_column(tr->store, c->base.id);
    2741           0 :         return ok;
    2742             : }
    2743             : 
    2744             : static int
    2745          19 : min_max_col(sql_trans *tr, sql_column *c)
    2746             : {
    2747          19 :         int ok = 0;
    2748          19 :         BAT *b = NULL;
    2749          19 :         sql_delta *d = NULL;
    2750             : 
    2751          19 :         assert(tr->active);
    2752          19 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2753           0 :                 return 0;
    2754          19 :         if (c->min && c->max)
    2755             :                 return 1;
    2756          19 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2757          19 :                 if (d->cs.st == ST_FOR)
    2758             :                         return 0;
    2759          19 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2760          19 :                 lock_column(tr->store, c->base.id);
    2761          19 :                 if (c->min && c->max) {
    2762           0 :                         unlock_column(tr->store, c->base.id);
    2763           0 :                         return 1;
    2764             :                 }
    2765          19 :                 _DELETE(c->min);
    2766          19 :                 _DELETE(c->max);
    2767          19 :                 if ((b = bind_col(tr, c, access))) {
    2768          19 :                         if (!(b = bind_no_view(b, false))) {
    2769           0 :                                 unlock_column(tr->store, c->base.id);
    2770           0 :                                 return 0;
    2771             :                         }
    2772          19 :                         BATiter bi = bat_iterator(b);
    2773          19 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2774          16 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2775          16 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2776             : 
    2777          16 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2778           0 :                                         _DELETE(c->min);
    2779           0 :                                         _DELETE(c->max);
    2780             :                                 } else {
    2781          16 :                                         memcpy(c->min, nmin, minlen);
    2782          16 :                                         memcpy(c->max, nmax, maxlen);
    2783          16 :                                         ok = 1;
    2784             :                                 }
    2785             :                         }
    2786          19 :                         bat_iterator_end(&bi);
    2787          19 :                         bat_destroy(b);
    2788             :                 }
    2789          19 :                 unlock_column(tr->store, c->base.id);
    2790             :         }
    2791             :         return ok;
    2792             : }
    2793             : 
    2794             : static size_t
    2795          17 : count_segs(segment *s)
    2796             : {
    2797          17 :         size_t nr = 0;
    2798             : 
    2799          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2800          55 :                 nr++;
    2801          17 :         return nr;
    2802             : }
    2803             : 
    2804             : static size_t
    2805          34 : count_del(sql_trans *tr, sql_table *t, int access)
    2806             : {
    2807          34 :         storage *d;
    2808             : 
    2809          34 :         if (!isTable(t))
    2810             :                 return 0;
    2811          34 :         d = tab_timestamp_storage(tr, t);
    2812          34 :         if (!d)
    2813             :                 return 0;
    2814          34 :         if (access == 2)
    2815           0 :                 return d->cs.ucnt;
    2816          34 :         if (access == 1)
    2817           0 :                 return count_inserts(d->segs->h, tr);
    2818          34 :         if (access == 10) /* special case for counting the number of segments */
    2819          17 :                 return count_segs(d->segs->h);
    2820          17 :         return count_deletes(d->segs->h, tr);
    2821             : }
    2822             : 
    2823             : static int
    2824       23184 : sorted_col(sql_trans *tr, sql_column *col)
    2825             : {
    2826       23184 :         int sorted = 0;
    2827             : 
    2828       23184 :         assert(tr->active);
    2829       23184 :         if (!isTable(col->t) || !col->t->s)
    2830             :                 return 0;
    2831             : 
    2832       23184 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2833       23165 :                 BAT *b = bind_col(tr, col, QUICK);
    2834             : 
    2835       23165 :                 if (b)
    2836       23165 :                         sorted = b->tsorted || b->trevsorted;
    2837             :         }
    2838             :         return sorted;
    2839             : }
    2840             : 
    2841             : static int
    2842        7986 : unique_col(sql_trans *tr, sql_column *col)
    2843             : {
    2844        7986 :         int distinct = 0;
    2845             : 
    2846        7986 :         assert(tr->active);
    2847        7986 :         if (!isTable(col->t) || !col->t->s)
    2848             :                 return 0;
    2849             : 
    2850        7986 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2851        7986 :                 BAT *b = bind_col(tr, col, QUICK);
    2852             : 
    2853        7986 :                 if (b)
    2854        7986 :                         distinct = b->tkey;
    2855             :         }
    2856             :         return distinct;
    2857             : }
    2858             : 
    2859             : static int
    2860        2161 : double_elim_col(sql_trans *tr, sql_column *col)
    2861             : {
    2862        2161 :         int de = 0;
    2863        2161 :         sql_delta *d;
    2864             : 
    2865        2161 :         assert(tr->active);
    2866        2161 :         if (!isTable(col->t) || !col->t->s)
    2867             :                 return 0;
    2868             : 
    2869        2161 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2870           6 :                 if (d->cs.st == ST_DICT) {
    2871           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2872           6 :                         if (b && b->ttype == TYPE_bte)
    2873             :                                 de = 1;
    2874           0 :                         else if (b && b->ttype == TYPE_sht)
    2875        2161 :                                 de = 2;
    2876             :                 }
    2877        2155 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2878        2155 :                 BAT *b = bind_col(tr, col, QUICK);
    2879             : 
    2880        2155 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2881        2155 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2882        2155 :                         if (de)
    2883        1925 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2884             :                 }
    2885        1925 :                 assert(de >= 0 && de <= 16);
    2886             :         }
    2887             :         return de;
    2888             : }
    2889             : 
    2890             : static int
    2891     3935409 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2892             : {
    2893     3935409 :         int ok = 0;
    2894     3935409 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2895     3935409 :         sql_delta *d = NULL;
    2896             : 
    2897     3935409 :         (void) tr;
    2898     3935409 :         assert(tr->active);
    2899     3935409 :         *nonil = false;
    2900     3935409 :         *unique = false;
    2901     3935409 :         *unique_est = 0.0;
    2902     3935409 :         if (!c || !isTable(c->t) || !c->t->s)
    2903             :                 return ok;
    2904             : 
    2905     3935171 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2906     3896899 :                 if (d->cs.st == ST_FOR) {
    2907          27 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2908          27 :                         return ok;
    2909             :                 }
    2910     3896872 :                 int eclass = c->type.type->eclass;
    2911     3896872 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2912     3896872 :                 if ((b = bind_col(tr, c, access))) {
    2913     3897003 :                         if (!(b = bind_no_view(b, false)))
    2914           0 :                                 return ok;
    2915     3897360 :                         BATiter bi = bat_iterator(b);
    2916     3897367 :                         *nonil = bi.nonil && !bi.nil;
    2917             : 
    2918     3897367 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2919     3590000 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
    2920     2309676 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2921             :                                         ok |= 1;
    2922     2309622 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2923     2299795 :                                         ok |= 1;
    2924     2309609 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2925          54 :                                         ok |= 2;
    2926     2309543 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2927     2293532 :                                         ok |= 2;
    2928             :                         }
    2929     3897250 :                         if (d->cs.ucnt == 0) {
    2930     3892214 :                                 if (d->cs.st == ST_DEFAULT) {
    2931     3891514 :                                         *unique = bi.key;
    2932     3891514 :                                         *unique_est = bi.unique_est;
    2933     3891514 :                                         if (*unique_est == 0)
    2934     1252870 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2935         700 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2936             :                                         /* for dict, check the offsets bat for uniqueness */
    2937         700 :                                         MT_lock_set(&off->theaplock);
    2938         700 :                                         *unique = off->tkey;
    2939         700 :                                         *unique_est = off->tunique_est;
    2940         700 :                                         MT_lock_unset(&off->theaplock);
    2941             :                                 }
    2942             :                         }
    2943     3898047 :                         bat_iterator_end(&bi);
    2944     3897067 :                         bat_destroy(b);
    2945     3897208 :                         if (*nonil && d->cs.ucnt > 0) {
    2946             :                                 /* This could use a quick descriptor */
    2947        2795 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2948           0 :                                         *nonil = false;
    2949             :                                 } else {
    2950        2795 :                                         MT_lock_set(&upv->theaplock);
    2951        2795 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2952        2795 :                                         MT_lock_unset(&upv->theaplock);
    2953        2795 :                                         bat_destroy(upv);
    2954             :                                 }
    2955             :                         }
    2956             :                 }
    2957             :         }
    2958             :         return ok;
    2959             : }
    2960             : 
    2961             : static int
    2962         257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2963             : {
    2964         257 :         assert(tr->active);
    2965         257 :         if (!isTable(col->t) || !col->t->s)
    2966             :                 return LOG_OK;
    2967             : 
    2968         252 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2969         252 :                 BAT *b = bind_col(tr, col, QUICK);
    2970             : 
    2971         252 :                 if (b) { /* add props for ranges [min, max> */
    2972         252 :                         MT_lock_set(&b->theaplock);
    2973         252 :                         if (add_range) {
    2974         179 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    2975         179 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    2976         103 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    2977             :                                 else
    2978          76 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    2979         179 :                                 if (!pt->with_nills || !col->null)
    2980         117 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    2981             :                         } else {
    2982          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    2983          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    2984          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    2985             :                         }
    2986         252 :                         MT_lock_unset(&b->theaplock);
    2987             :                 }
    2988             :         }
    2989             :         return LOG_OK;
    2990             : }
    2991             : 
    2992             : static int
    2993        4737 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    2994             : {
    2995        4737 :         assert(tr->active);
    2996        4737 :         if (!isTable(col->t) || !col->t->s)
    2997             :                 return LOG_OK;
    2998             : 
    2999        4707 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    3000        4707 :                 BAT *b = bind_col(tr, col, QUICK);
    3001             : 
    3002        4707 :                 if (b) { /* add props for ranges [min, max> */
    3003        4707 :                         if (not_null) {
    3004        4705 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3005             :                         } else {
    3006           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3007             :                         }
    3008             :                 }
    3009             :         }
    3010             :         return LOG_OK;
    3011             : }
    3012             : 
    3013             : static int
    3014       32528 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3015             : {
    3016       32528 :         sqlstore *store = tr->store;
    3017       32528 :         int bid = log_find_bat(store->logger, id);
    3018       32528 :         if (bid <= 0)
    3019             :                 return LOG_ERR;
    3020       32528 :         cs->bid = temp_dup(bid);
    3021       32528 :         cs->ucnt = 0;
    3022       32528 :         cs->uibid = e_bat(TYPE_oid);
    3023       32528 :         cs->uvbid = e_bat(type);
    3024       32528 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3025             :                 return LOG_ERR;
    3026             :         return LOG_OK;
    3027             : }
    3028             : 
    3029             : static int
    3030       68770 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3031             : {
    3032       68770 :         int res = LOG_OK;
    3033       68770 :         gdk_return ok;
    3034       68770 :         BAT *b = temp_descriptor(bat->cs.bid);
    3035             : 
    3036       68770 :         if (b == NULL)
    3037             :                 return LOG_ERR;
    3038             : 
    3039       68770 :         if (!bat->cs.uibid)
    3040       68762 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3041       68770 :         if (!bat->cs.uvbid)
    3042       68762 :                 bat->cs.uvbid = e_bat(b->ttype);
    3043       68770 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3044           0 :                 res = LOG_ERR;
    3045       68770 :         if (GDKinmemory(0)) {
    3046         178 :                 bat_destroy(b);
    3047         178 :                 return res;
    3048             :         }
    3049             : 
    3050       68592 :         bat_set_access(b, BAT_READ);
    3051       68592 :         sqlstore *store = tr->store;
    3052       68592 :         ok = log_bat_persists(store->logger, b, id);
    3053       68592 :         bat_destroy(b);
    3054       68592 :         if(res != LOG_OK)
    3055             :                 return res;
    3056       68592 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3057             : }
    3058             : 
    3059             : static int
    3060           0 : new_persistent_delta( sql_delta *bat)
    3061             : {
    3062           0 :         bat->cs.ucnt = 0;
    3063           0 :         return LOG_OK;
    3064             : }
    3065             : 
    3066             : static void
    3067      132279 : create_delta( sql_delta *d, BAT *b)
    3068             : {
    3069      132279 :         bat_set_access(b, BAT_READ);
    3070      132279 :         d->cs.bid = temp_create(b);
    3071      132279 :         d->cs.uibid = d->cs.uvbid = 0;
    3072      132279 :         d->cs.ucnt = 0;
    3073      132279 : }
    3074             : 
    3075             : static bat
    3076        7312 : copyBat (bat i, int type, oid seq)
    3077             : {
    3078        7312 :         BAT *b, *tb;
    3079        7312 :         bat res;
    3080             : 
    3081        7312 :         if (!i)
    3082             :                 return i;
    3083        7312 :         tb = quick_descriptor(i);
    3084        7312 :         if (tb == NULL)
    3085             :                 return 0;
    3086        7312 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3087        7312 :         if (b == NULL)
    3088             :                 return 0;
    3089             : 
    3090        7312 :         bat_set_access(b, BAT_READ);
    3091             : 
    3092        7312 :         res = temp_create(b);
    3093        7312 :         bat_destroy(b);
    3094        7312 :         return res;
    3095             : }
    3096             : 
    3097             : static int
    3098      157217 : create_col(sql_trans *tr, sql_column *c)
    3099             : {
    3100      157217 :         int ok = LOG_OK, new = 0;
    3101      157217 :         int type = c->type.type->localtype;
    3102      157217 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3103             : 
    3104      157217 :         if (!bat) {
    3105      157217 :                 new = 1;
    3106      157217 :                 bat = ZNEW(sql_delta);
    3107      157217 :                 if (!bat)
    3108             :                         return LOG_ERR;
    3109      157217 :                 ATOMIC_PTR_SET(&c->data, bat);
    3110      157217 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3111             :         }
    3112             : 
    3113      157217 :         if (new)
    3114      157217 :                 bat->cs.ts = tr->tid;
    3115             : 
    3116      157217 :         if (!isNew(c)&& !isTempTable(c->t)){
    3117       24916 :                 bat->cs.ts = tr->ts;
    3118       24916 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3119       24916 :                 if (ok == LOG_OK && c->storage_type) {
    3120           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3121           2 :                                 sqlstore *store = tr->store;
    3122           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3123           2 :                                 if (bid <= 0)
    3124             :                                         return LOG_ERR;
    3125           2 :                                 bat->cs.ebid = temp_dup(bid);
    3126           2 :                                 bat->cs.st = ST_DICT;
    3127           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3128           2 :                                 bat->cs.st = ST_FOR;
    3129             :                         }
    3130             :                 }
    3131       24916 :                 return ok;
    3132      132301 :         } else if (bat && bat->cs.bid) {
    3133           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3134             :         } else {
    3135      132301 :                 sql_column *fc = NULL;
    3136      132301 :                 size_t cnt = 0;
    3137             : 
    3138             :                 /* alter ? */
    3139      132301 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3140       80317 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3141       80317 :                         if (s == NULL)
    3142             :                                 return LOG_ERR;
    3143       80317 :                         cnt = segs_end(s->segs, tr, c->t);
    3144             :                 }
    3145      132301 :                 if (cnt && fc != c) {
    3146          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3147             : 
    3148          22 :                         if (d->cs.bid) {
    3149          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3150          22 :                                 if(bat->cs.bid == BID_NIL)
    3151          22 :                                         ok = LOG_ERR;
    3152             :                         }
    3153          22 :                         if (d->cs.uibid) {
    3154          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3155          10 :                                 if (bat->cs.uibid == BID_NIL)
    3156          22 :                                         ok = LOG_ERR;
    3157             :                         }
    3158          22 :                         if (d->cs.uvbid) {
    3159          10 :                                 bat->cs.uvbid = e_bat(type);
    3160          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3161           0 :                                         ok = LOG_ERR;
    3162             :                         }
    3163             :                 } else {
    3164      132279 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3165      132279 :                         if (!b) {
    3166             :                                 ok = LOG_ERR;
    3167             :                         } else {
    3168      132279 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3169      132279 :                                 bat_destroy(b);
    3170             :                         }
    3171             : 
    3172      132279 :                         if (!new) {
    3173           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3174           0 :                                 if (bat->cs.uibid == BID_NIL)
    3175           0 :                                         ok = LOG_ERR;
    3176           0 :                                 bat->cs.uvbid = e_bat(type);
    3177           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3178           0 :                                         ok = LOG_ERR;
    3179             :                         }
    3180             :                 }
    3181      132301 :                 bat->cs.ucnt = 0;
    3182             : 
    3183      132301 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3184          91 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3185             :         }
    3186             :         return ok;
    3187             : }
    3188             : 
    3189             : static int
    3190       62383 : log_create_col_(sql_trans *tr, sql_column *c)
    3191             : {
    3192       62383 :         assert(!isTempTable(c->t));
    3193       62383 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3194             : }
    3195             : 
    3196             : static int
    3197          87 : log_create_col(sql_trans *tr, sql_change *change)
    3198             : {
    3199          87 :         return log_create_col_(tr, (sql_column*)change->obj);
    3200             : }
    3201             : 
    3202             : static int
    3203      119688 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3204             : {
    3205      119688 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3206      119688 :         (void)oldest;
    3207      119688 :         assert(delta->cs.ts == tr->tid);
    3208      119688 :         delta->cs.ts = commit_ts;
    3209             : 
    3210      119688 :         assert(delta->next == NULL);
    3211      119688 :         if (!delta->cs.merged)
    3212      119687 :                 delta->nr_updates += merge_delta(delta);
    3213      119688 :         if (!tr->parent)
    3214      119684 :                 base->new = 0;
    3215      119688 :         return LOG_OK;
    3216             : }
    3217             : 
    3218             : static int
    3219          91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3220             : {
    3221          91 :         sql_column *c = (sql_column*)change->obj;
    3222          91 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3223          91 :         if (!tr->parent)
    3224          90 :                 c->base.new = 0;
    3225          91 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3226             : }
    3227             : 
    3228             : /* will be called for new idx's and when new index columns are created */
    3229             : static int
    3230        9694 : create_idx(sql_trans *tr, sql_idx *ni)
    3231             : {
    3232        9694 :         int ok = LOG_OK, new = 0;
    3233        9694 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3234        9694 :         int type = TYPE_lng;
    3235             : 
    3236        9694 :         if (oid_index(ni->type))
    3237         954 :                 type = TYPE_oid;
    3238             : 
    3239        9694 :         if (!bat) {
    3240        9694 :                 new = 1;
    3241        9694 :                 bat = ZNEW(sql_delta);
    3242        9694 :                 if (!bat)
    3243             :                         return LOG_ERR;
    3244        9694 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3245        9694 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3246             :         }
    3247             : 
    3248        9694 :         if (new)
    3249        9694 :                 bat->cs.ts = tr->tid;
    3250             : 
    3251        9694 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3252        2404 :                 bat->cs.ts = 1;
    3253        2404 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3254        7290 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3255           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3256             :         } else {
    3257        7290 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3258        7290 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3259             : 
    3260        7290 :                 if (d) {
    3261             :                         /* Here we also handle indices created through alter stmts */
    3262             :                         /* These need to be created aligned to the existing data */
    3263        7290 :                         if (d->cs.bid) {
    3264        7290 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3265        7290 :                                 if(bat->cs.bid == BID_NIL)
    3266        7290 :                                         ok = LOG_ERR;
    3267             :                         }
    3268             :                 } else {
    3269             :                         return LOG_ERR;
    3270             :                 }
    3271             : 
    3272        7290 :                 bat->cs.ucnt = 0;
    3273             : 
    3274        7290 :                 if (!new) {
    3275           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3276           0 :                         if (bat->cs.uibid == BID_NIL)
    3277           0 :                                 ok = LOG_ERR;
    3278           0 :                         bat->cs.uvbid = e_bat(type);
    3279           0 :                         if(bat->cs.uvbid == BID_NIL)
    3280           0 :                                 ok = LOG_ERR;
    3281             :                 }
    3282        7290 :                 bat->cs.ucnt = 0;
    3283        7290 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3284         631 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3285             :         }
    3286             :         return ok;
    3287             : }
    3288             : 
    3289             : static int
    3290        6387 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3291             : {
    3292        6387 :         assert(!isTempTable(i->t));
    3293        6387 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3294             : }
    3295             : 
    3296             : static int
    3297         617 : log_create_idx(sql_trans *tr, sql_change *change)
    3298             : {
    3299         617 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3300             : }
    3301             : 
    3302             : static int
    3303         631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3304             : {
    3305         631 :         sql_idx *i = (sql_idx*)change->obj;
    3306         631 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3307         631 :         if (!tr->parent)
    3308         631 :                 i->base.new = 0;
    3309         631 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3310             :         return LOG_OK;
    3311             : }
    3312             : 
    3313             : static int
    3314        5208 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3315             : {
    3316        5208 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3317        5208 :         BAT *b = NULL, *ib = NULL;
    3318             : 
    3319        5208 :         if (ok != LOG_OK)
    3320             :                 return ok;
    3321        5208 :         if (!(b = temp_descriptor(s->cs.bid)))
    3322             :                 return LOG_ERR;
    3323        5208 :         ib = b;
    3324             : 
    3325        5208 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3326           0 :                 bat_destroy(ib);
    3327           0 :                 return LOG_ERR;
    3328             :         }
    3329             : 
    3330        5208 :         if (BATcount(b)) {
    3331         421 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3332           0 :                         bat_destroy(ib);
    3333           0 :                         return LOG_ERR;
    3334             :                 }
    3335         645 :                 if (BATtdense(b)) {
    3336         224 :                         size_t start = b->tseqbase;
    3337         224 :                         size_t cnt = BATcount(b);
    3338         224 :                         ok = delete_range(tr, t, s, start, cnt);
    3339             :                 } else {
    3340         197 :                         assert(b->tsorted);
    3341         197 :                         BUN icnt = BATcount(b);
    3342         197 :                         BATiter bi = bat_iterator(b);
    3343         197 :                         size_t lcnt = 1;
    3344         197 :                         oid n;
    3345         197 :                         segment *seg = s->segs->h;
    3346         197 :                         if (complex_cand(b)) {
    3347           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3348           0 :                                 n = o + 1;
    3349           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3350           0 :                                         o = * (oid *) Tpos(&bi, i);
    3351           0 :                                         if (o == n) {
    3352           0 :                                                 lcnt++;
    3353           0 :                                                 n++;
    3354             :                                         } else {
    3355           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3356             :                                                         break;
    3357             :                                                 lcnt = 0;
    3358             :                                         }
    3359           0 :                                         if (!lcnt) {
    3360           0 :                                                 n = o + 1;
    3361           0 :                                                 lcnt = 1;
    3362             :                                         }
    3363             :                                 }
    3364             :                         } else {
    3365         197 :                                 oid *o = bi.base;
    3366         197 :                                 n = o[0]+1;
    3367      294828 :                                 for (size_t i=1; i<icnt; i++) {
    3368      294631 :                                         if (o[i] == n) {
    3369      291882 :                                                 lcnt++;
    3370      291882 :                                                 n++;
    3371             :                                         } else {
    3372        2749 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3373             :                                                         break;
    3374             :                                                 lcnt = 0;
    3375             :                                         }
    3376      294631 :                                         if (!lcnt) {
    3377        2749 :                                                 n = o[i]+1;
    3378        2749 :                                                 lcnt = 1;
    3379             :                                         }
    3380             :                                 }
    3381             :                         }
    3382         197 :                         if (lcnt && ok == LOG_OK)
    3383         197 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3384         197 :                         bat_iterator_end(&bi);
    3385             :                 }
    3386         421 :                 if (ok == LOG_OK)
    3387        7010 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3388        6589 :                                 if (seg->ts == tr->tid)
    3389        3426 :                                         seg->ts = 1;
    3390             :         } else {
    3391        4787 :                 if (ok == LOG_OK) {
    3392        4787 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3393             : 
    3394        4787 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3395             :                                 ok = LOG_ERR;
    3396             :                         } else {
    3397        4787 :                                 segment *seg = s->segs->h;
    3398        4787 :                                 if (seg->ts == tr->tid)
    3399        4787 :                                         seg->ts = 1;
    3400             :                         }
    3401             :                 }
    3402             :         }
    3403        5208 :         if (b != ib)
    3404        5208 :                 bat_destroy(b);
    3405        5208 :         bat_destroy(ib);
    3406             : 
    3407        5208 :         return ok;
    3408             : }
    3409             : 
    3410             : static int
    3411       26395 : create_del(sql_trans *tr, sql_table *t)
    3412             : {
    3413       26395 :         int ok = LOG_OK, new = 0;
    3414       26395 :         BAT *b;
    3415       26395 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3416             : 
    3417       26395 :         if (!bat) {
    3418       26395 :                 new = 1;
    3419       26395 :                 bat = ZNEW(storage);
    3420       26395 :                 if(!bat)
    3421             :                         return LOG_ERR;
    3422       26395 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3423       26395 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3424       26395 :                 bat->cs.ts = tr->tid;
    3425             :         }
    3426             : 
    3427       26395 :         if (!isNew(t) && !isTempTable(t)) {
    3428        5208 :                 bat->cs.ts = tr->ts;
    3429        5208 :                 return load_storage(tr, t, bat, t->base.id);
    3430       21187 :         } else if (bat->cs.bid) {
    3431             :                 return ok;
    3432             :         } else {
    3433       21187 :                 assert(!bat->segs);
    3434       21187 :                 if (!(bat->segs = new_segments(tr, 0)))
    3435             :                         return LOG_ERR;
    3436             : 
    3437       21187 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3438       21187 :                 if(b != NULL) {
    3439       21187 :                         bat_set_access(b, BAT_READ);
    3440       21187 :                         bat->cs.bid = temp_create(b);
    3441       21187 :                         bat_destroy(b);
    3442             :                 } else {
    3443             :                         return LOG_ERR;
    3444             :                 }
    3445       21187 :                 if (new)
    3446       27983 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3447             :         }
    3448             :         return ok;
    3449             : }
    3450             : 
    3451             : static int
    3452      198417 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3453             : {
    3454      198417 :         sqlstore *store = tr->store;
    3455      198417 :         msk m = s->deleted;
    3456      198417 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3457             : }
    3458             : 
    3459             : static int
    3460       97318 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3461             : {
    3462             :         /* log segments */
    3463       97318 :         lock_table(tr->store, id);
    3464      490866 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3465      393548 :                 unlock_table(tr->store, id);
    3466      393548 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3467      150725 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3468             :                                 return LOG_ERR;
    3469             :                         }
    3470             :                 }
    3471      393548 :                 lock_table(tr->store, id);
    3472             :         }
    3473       97318 :         unlock_table(tr->store, id);
    3474       97318 :         return LOG_OK;
    3475             : }
    3476             : 
    3477             : static int
    3478       12627 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3479             : {
    3480       12627 :         BAT *b;
    3481       12627 :         int ok = LOG_OK;
    3482             : 
    3483       12627 :         if (GDKinmemory(0))
    3484             :                 return LOG_OK;
    3485             : 
    3486       12594 :         b = temp_descriptor(bat->cs.bid);
    3487       12594 :         if (b == NULL)
    3488             :                 return LOG_ERR;
    3489             : 
    3490       12594 :         sqlstore *store = tr->store;
    3491       12594 :         bat_set_access(b, BAT_READ);
    3492       12594 :         if (ok == LOG_OK)
    3493       12594 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3494       12594 :         if (ok == LOG_OK)
    3495       12594 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3496       12594 :         bat_destroy(b);
    3497       12594 :         return ok;
    3498             : }
    3499             : 
    3500             : static int
    3501       12641 : log_create_del(sql_trans *tr, sql_change *change)
    3502             : {
    3503       12641 :         int ok = LOG_OK;
    3504       12641 :         sql_table *t = (sql_table*)change->obj;
    3505             : 
    3506       12641 :         if (t->base.deleted)
    3507             :                 return ok;
    3508       12627 :         assert(!isTempTable(t));
    3509       12627 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3510       12627 :         if (ok == LOG_OK) {
    3511       74923 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3512       62296 :                         sql_column *c = n->data;
    3513             : 
    3514       62296 :                         ok = log_create_col_(tr, c);
    3515             :                 }
    3516       12627 :                 if (t->idxs) {
    3517       18409 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3518        5782 :                                 sql_idx *i = n->data;
    3519             : 
    3520        5782 :                                 if (ATOMIC_PTR_GET(&i->data))
    3521        5770 :                                         ok = log_create_idx_(tr, i);
    3522             :                         }
    3523             :                 }
    3524             :         }
    3525             :         return ok;
    3526             : }
    3527             : 
    3528             : static int
    3529       21189 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3530             : {
    3531       21189 :         int ok = LOG_OK;
    3532       21189 :         sql_table *t = (sql_table*)change->obj;
    3533       21189 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3534             : 
    3535       21189 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3536         120 :                 assert(isTempTable(t));
    3537         120 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3538         120 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3539         120 :                 return ok;
    3540             :         }
    3541             : 
    3542       21069 :         if (!commit_ts) /* rollback handled by ? */
    3543             :                 return ok;
    3544       19214 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3545       19214 :         assert(ok == LOG_OK);
    3546       19214 :         if (ok != LOG_OK)
    3547             :                 return ok;
    3548       19214 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3549       19214 :         assert(dbat->cs.ts == tr->tid);
    3550       19214 :         dbat->cs.ts = commit_ts;
    3551       19214 :         if (ok == LOG_OK) {
    3552      132397 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3553      113183 :                         sql_column *c = n->data;
    3554      113183 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3555             : 
    3556      113183 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3557             :                 }
    3558       19214 :                 if (t->idxs) {
    3559       25009 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3560        5795 :                                 sql_idx *i = n->data;
    3561        5795 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3562             : 
    3563        5795 :                                 if (delta)
    3564        5783 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3565             :                         }
    3566             :                 }
    3567       19214 :                 if (!tr->parent)
    3568       19212 :                         t->base.new = 0;
    3569             :         }
    3570       19214 :         if (!tr->parent)
    3571       19212 :                 t->base.new = 0;
    3572             :         return ok;
    3573             : }
    3574             : 
    3575             : static int
    3576       21268 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3577             : {
    3578       21268 :         gdk_return ok = GDK_SUCCEED;
    3579             : 
    3580       21268 :         sqlstore *store = tr->store;
    3581       21268 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3582       18884 :                 ok = log_bat_transient(store->logger, id);
    3583       21268 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3584          25 :                 ok = log_bat_transient(store->logger, -id);
    3585       21268 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3586             : }
    3587             : 
    3588             : static int
    3589      176581 : destroy_col(sqlstore *store, sql_column *c)
    3590             : {
    3591      176581 :         (void)store;
    3592      176581 :         if (ATOMIC_PTR_GET(&c->data))
    3593      176581 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3594      176581 :         ATOMIC_PTR_SET(&c->data, NULL);
    3595      176581 :         return LOG_OK;
    3596             : }
    3597             : 
    3598             : static int
    3599       19384 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3600             : {
    3601       19384 :         int ok = LOG_OK;
    3602       19384 :         assert(!isTempTable(c->t));
    3603       19384 :         if (!tr->parent) /* don't write save point commits */
    3604       19384 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3605       19384 :         return ok;
    3606             : }
    3607             : 
    3608             : static int
    3609       19384 : log_destroy_col(sql_trans *tr, sql_change *change)
    3610             : {
    3611       19384 :         sql_column *c = (sql_column*)change->obj;
    3612       19384 :         int res = log_destroy_col_(tr, c);
    3613       19384 :         change->obj = NULL;
    3614       19384 :         column_destroy(tr->store, c);
    3615       19384 :         return res;
    3616             : }
    3617             : 
    3618             : static int
    3619       11493 : destroy_idx(sqlstore *store, sql_idx *i)
    3620             : {
    3621       11493 :         (void)store;
    3622       11493 :         if (ATOMIC_PTR_GET(&i->data))
    3623       11493 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3624       11493 :         ATOMIC_PTR_SET(&i->data, NULL);
    3625       11493 :         return LOG_OK;
    3626             : }
    3627             : 
    3628             : static int
    3629        1960 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3630             : {
    3631        1960 :         int ok = LOG_OK;
    3632        1960 :         assert(!isTempTable(i->t));
    3633        1960 :         if (ATOMIC_PTR_GET(&i->data)) {
    3634        1884 :                 if (!tr->parent) /* don't write save point commits */
    3635        1884 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3636             :         }
    3637        1960 :         return ok;
    3638             : }
    3639             : 
    3640             : static int
    3641        1960 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3642             : {
    3643        1960 :         sql_idx *i = (sql_idx*)change->obj;
    3644        1960 :         int res = log_destroy_idx_(tr, i);
    3645        1960 :         change->obj = NULL;
    3646        1960 :         idx_destroy(tr->store, i);
    3647        1960 :         return res;
    3648             : }
    3649             : 
    3650             : static int
    3651       27960 : destroy_del(sqlstore *store, sql_table *t)
    3652             : {
    3653       27960 :         (void)store;
    3654       27960 :         if (ATOMIC_PTR_GET(&t->data))
    3655       27956 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3656       27960 :         ATOMIC_PTR_SET(&t->data, NULL);
    3657       27960 :         return LOG_OK;
    3658             : }
    3659             : 
    3660             : static int
    3661        3285 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3662             : {
    3663        3285 :         gdk_return ok = GDK_SUCCEED;
    3664             : 
    3665        3285 :         sqlstore *store = tr->store;
    3666        3285 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3667        3285 :             bat && bat->cs.bid)
    3668        3285 :                 ok = log_bat_transient(store->logger, id);
    3669        3285 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3670             : }
    3671             : 
    3672             : static int
    3673        3285 : log_destroy_del(sql_trans *tr, sql_change *change)
    3674             : {
    3675        3285 :         int ok = LOG_OK;
    3676        3285 :         sql_table *t = (sql_table*)change->obj;
    3677             : 
    3678        3285 :         assert(!isTempTable(t));
    3679        3285 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3680        3285 :         return ok;
    3681             : }
    3682             : 
    3683             : static int
    3684       24717 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3685             : {
    3686       24717 :         (void)tr;
    3687       24717 :         (void)change;
    3688       24717 :         (void)commit_ts;
    3689       24717 :         (void)oldest;
    3690       24717 :         if (commit_ts)
    3691       24694 :                 change->handled = true;
    3692       24717 :         return 0;
    3693             : }
    3694             : 
    3695             : static int
    3696        3356 : drop_del(sql_trans *tr, sql_table *t)
    3697             : {
    3698        3356 :         int ok = LOG_OK;
    3699             : 
    3700        3356 :         if (!isNew(t)) {
    3701        3356 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3702        3421 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
    3703             :         }
    3704        3356 :         return ok;
    3705             : }
    3706             : 
    3707             : static int
    3708       19399 : drop_col(sql_trans *tr, sql_column *c)
    3709             : {
    3710       19399 :         assert(!isNew(c));
    3711       19399 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3712       19399 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
    3713       19399 :         return LOG_OK;
    3714             : }
    3715             : 
    3716             : static int
    3717        1962 : drop_idx(sql_trans *tr, sql_idx *i)
    3718             : {
    3719        1962 :         assert(!isNew(i));
    3720        1962 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3721        1962 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
    3722        1962 :         return LOG_OK;
    3723             : }
    3724             : 
    3725             : 
    3726             : static BUN
    3727      129478 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3728             : {
    3729      129478 :         BAT *b;
    3730      129478 :         BUN sz = 0;
    3731             : 
    3732      129478 :         (void)tr;
    3733      129478 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3734      129478 :         if (cs->bid && renew) {
    3735      129496 :                 b = quick_descriptor(cs->bid);
    3736      129452 :                 if (b) {
    3737      129452 :                         sz += BATcount(b);
    3738      129452 :                         if (cs->st == ST_DICT) {
    3739           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3740           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3741             : 
    3742           2 :                                 if (nebid == BID_NIL || !n) {
    3743           0 :                                         temp_destroy(nebid);
    3744           0 :                                         bat_destroy(n);
    3745           0 :                                         return BUN_NONE;
    3746             :                                 }
    3747           2 :                                 temp_destroy(cs->ebid);
    3748           2 :                                 cs->ebid = nebid;
    3749           2 :                                 if (!temp)
    3750           2 :                                         bat_set_access(n, BAT_READ);
    3751           2 :                                 temp_destroy(cs->bid);
    3752           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3753           2 :                                 bat_destroy(n);
    3754             :                         } else {
    3755      129450 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
    3756             : 
    3757      129366 :                                 if (nbid == BID_NIL)
    3758             :                                         return BUN_NONE;
    3759      129366 :                                 temp_destroy(cs->bid);
    3760      129451 :                                 cs->bid = nbid;
    3761             :                         }
    3762             :                 } else {
    3763             :                         return BUN_NONE;
    3764             :                 }
    3765             :         }
    3766      129435 :         if (cs->uibid) {
    3767      129290 :                 temp_destroy(cs->uibid);
    3768      129371 :                 cs->uibid = 0;
    3769             :         }
    3770      129516 :         if (cs->uvbid) {
    3771      129372 :                 temp_destroy(cs->uvbid);
    3772      129375 :                 cs->uvbid = 0;
    3773             :         }
    3774      129519 :         cs->cleared = true;
    3775      129519 :         cs->ucnt = 0;
    3776      129519 :         return sz;
    3777             : }
    3778             : 
    3779             : static BUN
    3780      129328 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3781             : {
    3782      129328 :         bool update_conflict = false;
    3783      129328 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3784             : 
    3785      129328 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    3786           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3787      129369 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3788      129369 :         if (odelta != delta)
    3789      129374 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3790      129358 :         if (delta)
    3791      129358 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3792             :         return 0;
    3793             : }
    3794             : 
    3795             : static BUN
    3796          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3797             : {
    3798          21 :         bool update_conflict = false;
    3799          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3800             : 
    3801          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3802          15 :                 return 0;
    3803           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    3804           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3805           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3806           6 :         if (odelta != delta)
    3807           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3808           6 :         if (delta)
    3809           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3810             :         return 0;
    3811             : }
    3812             : 
    3813             : static int
    3814         142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3815             : {
    3816         142 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
    3817             :                 return LOG_ERR;
    3818         142 :         if (s->segs)
    3819         142 :                 destroy_segments(s->segs);
    3820         142 :         if (!(s->segs = new_segments(tr, 0)))
    3821             :                 return LOG_ERR;
    3822             :         return LOG_OK;
    3823             : }
    3824             : 
    3825             : 
    3826             : /*
    3827             :  * Clear the table, in general this means replacing the storage,
    3828             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    3829             :  * all segments as deleted.
    3830             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    3831             :  */
    3832             : static BUN
    3833       41827 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3834             : {
    3835       41827 :         int clear = !in_transaction, ok = LOG_OK;
    3836       41827 :         bool conflict = false;
    3837       41827 :         storage *bat;
    3838             : 
    3839       41878 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    3840       15820 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3841             : 
    3842       26010 :         if (!clear) {
    3843          51 :                 lock_table(tr->store, t->base.id);
    3844          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3845          51 :                 unlock_table(tr->store, t->base.id);
    3846             :         }
    3847       26010 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3848       26010 :         if (!in_transaction)
    3849       25963 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3850       26010 :         if (ok == LOG_ERR)
    3851             :                 return BUN_NONE;
    3852       26010 :         if (ok == LOG_CONFLICT)
    3853           0 :                 return BUN_NONE - 1;
    3854             :         return LOG_OK;
    3855             : }
    3856             : 
    3857             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3858             : static BUN
    3859       41800 : clear_table(sql_trans *tr, sql_table *t)
    3860             : {
    3861       41800 :         node *n = ol_first_node(t->columns);
    3862       41800 :         sql_column *c = n->data;
    3863       41800 :         storage *d = tab_timestamp_storage(tr, t);
    3864       41826 :         int in_transaction, clear;
    3865       41826 :         BUN sz, clear_ok;
    3866             : 
    3867       41826 :         if (!d)
    3868             :                 return BUN_NONE;
    3869       41826 :         in_transaction = segments_in_transaction(tr, t);
    3870       41822 :         clear = !in_transaction;
    3871       41822 :         sz = count_col(tr, c, CNT_ACTIVE);
    3872       41831 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    3873             :                 return clear_ok;
    3874             : 
    3875       26010 :         if (in_transaction)
    3876             :                 return sz;
    3877             : 
    3878      155325 :         for (; n; n = n->next) {
    3879      129364 :                 c = n->data;
    3880             : 
    3881      129364 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3882           0 :                         return clear_ok;
    3883             :         }
    3884       25961 :         if (t->idxs) {
    3885       25982 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3886          21 :                         sql_idx *ci = n->data;
    3887             : 
    3888          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    3889          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3890           0 :                                 return clear_ok;
    3891             :                 }
    3892             :         }
    3893       25961 :         if (clear)
    3894       25961 :                 d->segs->nr_reused = 0;
    3895       25961 :         return sz;
    3896             : }
    3897             : 
    3898             : static int
    3899      158780 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3900             : {
    3901      158780 :         sqlstore *store = tr->store;
    3902      158780 :         gdk_return ok = GDK_SUCCEED;
    3903             : 
    3904      158780 :         (void) t;
    3905      158780 :         (void) segs;
    3906      158780 :         if (GDKinmemory(0))
    3907             :                 return LOG_OK;
    3908             : 
    3909      158773 :         if (cs->cleared) {
    3910      155388 :                 assert(cs->ucnt == 0);
    3911      155388 :                 BAT *ins = temp_descriptor(cs->bid);
    3912      155388 :                 if (!ins)
    3913             :                         return LOG_ERR;
    3914      155388 :                 assert(!isEbat(ins));
    3915      155388 :                 bat_set_access(ins, BAT_READ);
    3916      155388 :                 ok = log_bat_persists(store->logger, ins, id);
    3917      155388 :                 bat_destroy(ins);
    3918      155388 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3919          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3920          56 :                         if (!ins)
    3921             :                                 return LOG_ERR;
    3922          56 :                         assert(!isEbat(ins));
    3923          56 :                         bat_set_access(ins, BAT_READ);
    3924          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3925          56 :                         bat_destroy(ins);
    3926             :                 }
    3927      155388 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3928             :         }
    3929             : 
    3930        3385 :         assert(!isTempTable(t));
    3931             : 
    3932        3385 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3933        2800 :                 BAT *ui = temp_descriptor(cs->uibid);
    3934        2800 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3935             :                 /* any updates */
    3936        2800 :                 if (ui == NULL || uv == NULL) {
    3937             :                         ok = GDK_FAIL;
    3938        2800 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3939        2800 :                         ok = log_delta(store->logger, ui, uv, id);
    3940        2800 :                 bat_destroy(ui);
    3941        2800 :                 bat_destroy(uv);
    3942             :         }
    3943        2800 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3944             : }
    3945             : 
    3946             : static inline int
    3947       58769 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3948       58769 :         sqlstore *store = tr->store;
    3949       58769 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3950             : }
    3951             : 
    3952             : static inline int
    3953       58769 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3954       58769 :         sqlstore *store = tr->store;
    3955       58769 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3956             : }
    3957             : 
    3958             : static int
    3959       58769 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3960             : {
    3961       58769 :         sqlstore *store = tr->store;
    3962       58769 :         gdk_return ok = GDK_SUCCEED;
    3963             : 
    3964       58769 :         size_t end = segs_end(segs, tr, t);
    3965             : 
    3966       58769 :         if (tr_log_table_start(tr, t) != LOG_OK)
    3967             :                 return LOG_ERR;
    3968             : 
    3969       58769 :         size_t nr_appends = 0;
    3970             : 
    3971       58769 :         lock_table(tr->store, t->base.id);
    3972      413962 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3973      355193 :                 unlock_table(tr->store, t->base.id);
    3974             : 
    3975      355193 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3976      120058 :                         if (!seg->deleted) {
    3977       47692 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    3978             :                                         return LOG_ERR;
    3979             : 
    3980       47692 :                                 nr_appends += (seg->end - seg->start);
    3981             :                         }
    3982             :                 }
    3983      355193 :                 lock_table(tr->store, t->base.id);
    3984             :         }
    3985       58769 :         unlock_table(tr->store, t->base.id);
    3986             : 
    3987      408860 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    3988      350091 :                 sql_column *c = n->data;
    3989      350091 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    3990             : 
    3991      350091 :                 if (cs->cleared) {
    3992           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    3993           3 :                         continue;
    3994             :                 }
    3995             : 
    3996      350088 :                 lock_table(tr->store, t->base.id);
    3997      350088 :                 if (!cs->cleared) {
    3998     2434761 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    3999     2084673 :                                 unlock_table(tr->store, t->base.id);
    4000     2084673 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4001             :                                         /* append col*/
    4002      269772 :                                         BAT *ins = temp_descriptor(cs->bid);
    4003      269772 :                                         if (ins == NULL)
    4004             :                                                 return LOG_ERR;
    4005      269772 :                                         assert(BATcount(ins) >= cur->end);
    4006      269772 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
    4007      269772 :                                         bat_destroy(ins);
    4008             :                                 }
    4009     2084673 :                                 lock_table(tr->store, t->base.id);
    4010             :                         }
    4011             :                 }
    4012      350088 :                 unlock_table(tr->store, t->base.id);
    4013             : 
    4014      350088 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    4015          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4016          19 :                         if (ins == NULL)
    4017             :                                 return LOG_ERR;
    4018          19 :                         if (BATcount(ins) > ins->batInserted)
    4019          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4020          19 :                         BATcommit(ins, BATcount(ins));
    4021          19 :                         bat_destroy(ins);
    4022             :                 }
    4023             :         }
    4024             : 
    4025       58769 :         if (t->idxs) {
    4026       63953 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4027        5184 :                         sql_idx *i = n->data;
    4028             : 
    4029        5184 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    4030        4406 :                                 continue;
    4031         778 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4032             : 
    4033         778 :                         if (cs) {
    4034         778 :                                 if (cs->cleared) {
    4035           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4036           0 :                                         continue;
    4037             :                                 }
    4038             : 
    4039         778 :                                 lock_table(tr->store, t->base.id);
    4040        2387 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4041        1609 :                                         unlock_table(tr->store, t->base.id);
    4042        1609 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4043             :                                                 /* append idx */
    4044         730 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4045         730 :                                                 if (ins == NULL)
    4046             :                                                         return LOG_ERR;
    4047         730 :                                                 assert(BATcount(ins) >= cur->end);
    4048         730 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4049         730 :                                                 bat_destroy(ins);
    4050             :                                         }
    4051        1609 :                                         lock_table(tr->store, t->base.id);
    4052             :                                 }
    4053         778 :                                 unlock_table(tr->store, t->base.id);
    4054             :                         }
    4055             :                 }
    4056             :         }
    4057             : 
    4058       58769 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
    4059           0 :                 return LOG_ERR;
    4060             : 
    4061             :         return LOG_OK;
    4062             : }
    4063             : 
    4064             : static int
    4065       84724 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4066             : {
    4067       84724 :         int ok = LOG_OK;
    4068       84724 :         bool cleared = s->cs.cleared;
    4069       84724 :         if (ok == LOG_OK && cleared)
    4070       25955 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4071       25955 :         if (ok == LOG_OK)
    4072       84724 :                 ok = log_segments(tr, s->segs, t->base.id);
    4073       84724 :         if (ok == LOG_OK && !cleared)
    4074       58769 :                 ok = log_table_append(tr, t, s->segs);
    4075       84724 :         return ok;
    4076             : }
    4077             : 
    4078             : static void
    4079      420688 : merge_cs( column_storage *cs, const char* caller)
    4080             : {
    4081      420688 :         if (cs->bid && cs->ucnt) {
    4082        2809 :                 BAT *cur = temp_descriptor(cs->bid);
    4083        2809 :                 BAT *ui = temp_descriptor(cs->uibid);
    4084        2809 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4085             : 
    4086        2809 :                 if (!cur || !ui || !uv) {
    4087           0 :                         bat_destroy(ui);
    4088           0 :                         bat_destroy(uv);
    4089           0 :                         bat_destroy(cur);
    4090           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4091             :                         return;
    4092             :                 }
    4093        2809 :                 assert(BATcount(ui) == BATcount(uv));
    4094             : 
    4095             :                 /* any updates */
    4096        2809 :                 assert(!isEbat(cur));
    4097        2809 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4098           0 :                         bat_destroy(ui);
    4099           0 :                         bat_destroy(uv);
    4100           0 :                         bat_destroy(cur);
    4101           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4102             :                         return;
    4103             :                 }
    4104             :                 /* cleanup the old deltas */
    4105        2809 :                 temp_destroy(cs->uibid);
    4106        2809 :                 temp_destroy(cs->uvbid);
    4107        2809 :                 cs->uibid = e_bat(TYPE_oid);
    4108        2809 :                 cs->uvbid = e_bat(cur->ttype);
    4109        2809 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4110        2809 :                 cs->ucnt = 0;
    4111        2809 :                 bat_destroy(ui);
    4112        2809 :                 bat_destroy(uv);
    4113        2809 :                 bat_destroy(cur);
    4114             :         }
    4115      420688 :         cs->cleared = false;
    4116      420688 :         cs->merged = true;
    4117      420688 :         return;
    4118             : }
    4119             : 
    4120             : static lng
    4121      378432 : merge_delta( sql_delta *obat)
    4122             : {
    4123      378432 :         lng res = 0;
    4124      378432 :         if (obat && obat->next && !obat->cs.merged)
    4125      132832 :                 res += merge_delta(obat->next);
    4126      378432 :         res += obat->cs.ucnt;
    4127      378432 :         merge_cs(&obat->cs, __func__);
    4128      378432 :         return res;
    4129             : }
    4130             : 
    4131             : static void
    4132       42256 : merge_storage(storage *tdb)
    4133             : {
    4134       42256 :         merge_cs(&tdb->cs, __func__);
    4135             : 
    4136       42256 :         if (tdb->next) {
    4137         310 :                 destroy_storage(tdb->next);
    4138         310 :                 tdb->next = NULL;
    4139             :         }
    4140       42256 : }
    4141             : 
    4142             : static sql_delta *
    4143           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4144             : {
    4145             :         /* commit ie copy back to the parent transaction */
    4146           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4147           1 :                 sql_delta *od = delta->next;
    4148           1 :                 if (od->cs.ts == commit_ts) {
    4149           0 :                         sql_delta t = *od, *n = od->next;
    4150           0 :                         *od = *delta;
    4151           0 :                         od->next = n;
    4152           0 :                         *delta = t;
    4153           0 :                         delta->next = NULL;
    4154           0 :                         destroy_delta(delta, true);
    4155           0 :                         return od;
    4156             :                 }
    4157             :         }
    4158             :         return delta;
    4159             : }
    4160             : 
    /*
     * Log callback for a column update (change->obj is the sql_column).
     *
     * - Asserts the column's table is not a temporary table.
     * - If the table is deleted, marks the change as handled and returns LOG_OK.
     * - For a transaction without a parent, logs the column's head delta via
     *   tr_log_cs(), passing the first segment of the table storage and the
     *   column id, and returns its result.
     * - With a parent transaction (savepoint commit) nothing is written and
     *   LOG_OK is returned.
     */
    4161             : static int
    4162      132796 : log_update_col( sql_trans *tr, sql_change *change)
    4163             : {
    4164      132796 :         sql_column *c = (sql_column*)change->obj;
    4165      132796 :         assert(!isTempTable(c->t));
    4166             : 
    4167      132796 :         if (isDeleted(c->t)) {
    4168           0 :                 change->handled = true;
    4169           0 :                 return LOG_OK;
    4170             :         }
    4171             : 
                               /* the !isDeleted() test is always true here because of the early return above */
    4172      132796 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4173      132796 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4174      132796 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4175      132796 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4176             :         }
    4177             :         return LOG_OK;
    4178             : }
    4179             : 
    /*
     * Cleanup callback for a rolled back delta (change->data is the sql_delta
     * that commit_update_delta unlinked from its chain).
     *
     * If the delta's timestamp is older than 'oldest', the delta is destroyed
     * and table_destroy() is called on the owning table; the owner is read as
     * a sql_idx when change->commit is commit_update_idx, otherwise as a
     * sql_column. Returns 1 in that case.
     *
     * Otherwise returns 0. Before doing so, a timestamp above
     * TRANSACTION_ID_BASE is replaced by store_get_timestamp(store) + 1.
     * NOTE(review): 1 presumably means "cleaned up" and 0 "try again later";
     * confirm against the caller of the cleanup hooks.
     */
    4180             : static int
    4181         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4182             : {
    4183         217 :         sqlstore *store = Store;
    4184             : 
    4185         217 :         sql_delta *d = (sql_delta*)change->data;
    4186         217 :         if (d->cs.ts < oldest) {
    4187          82 :                 destroy_delta(d, false);
    4188          82 :                 if (change->commit == &commit_update_idx)
    4189           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4190             :                 else
    4191          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4192          82 :                 return 1;
    4193             :         }
    4194         135 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4195          82 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4196             :         return 0;
    4197             : }
    4198             : 
    /*
     * Cleanup callback for a rolled back table storage (change->data is the
     * storage that commit_update_del unlinked, change->obj the sql_table).
     *
     * If the storage timestamp is older than 'oldest', the storage is
     * destroyed, table_destroy() is called on the table and 1 is returned.
     *
     * Otherwise returns 0. Before doing so, a timestamp above
     * TRANSACTION_ID_BASE is replaced by store_get_timestamp(store) + 1.
     */
    4199             : static int
    4200           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4201             : {
    4202           9 :         sqlstore *store = Store;
    4203             : 
    4204           9 :         storage *d = (storage*)change->data;
    4205           9 :         if (d->cs.ts < oldest) {
    4206           3 :                 destroy_storage(d);
    4207           3 :                 table_destroy(store, (sql_table*)change->obj);
    4208           3 :                 return 1;
    4209             :         }
    4210           6 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4211           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4212             :         return 0;
    4213             : }
    4214             : 
    /*
     * Shared commit/rollback handler for the delta chain of a column or index.
     *
     *   tr        transaction being committed or rolled back
     *   change    change record; change->data is the delta this change added
     *   t         table owning the column/index
     *   base      base object of the column/index (its id is used for locking)
     *   data      atomic pointer to the head of the delta chain
     *   type      currently unused
     *   commit_ts commit timestamp; 0 means rollback
     *   oldest    timestamp bound used to decide which deltas may be merged
     *
     * Paths:
     * - Table with commit action CA_DELETE/CA_DROP (asserted to be a temp
     *   table): the head delta is cleared with clear_cs(), the change is
     *   marked handled, and LOG_ERR is returned if clear_cs() gave BUN_NONE.
     * - Rollback: change->data is unlinked from the chain and
     *   tc_gc_rollbacked is installed as cleanup hook.
     * - Commit without parent transaction: the first delta with
     *   ts <= oldest is merged (under the column lock) unless already merged.
     * - Commit with parent transaction (savepoint): the head is replaced by
     *   the result of savepoint_commit_delta().
     *
     * Returns LOG_OK unless noted otherwise.
     */
    4215             : static int
    4216      132914 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4217             : {
    4218      132914 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4219             : 
    4220      132914 :         sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
    4221             : 
    4222      132914 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4223           3 :                 int ok = LOG_OK;
    4224           3 :                 assert(isTempTable(t));
    4225           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4226           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4227           3 :                 if (!tr->parent)
    4228           0 :                         t->base.new = base->new = 0;
    4229           3 :                 change->handled = true;
    4230           3 :                 return ok;
    4231             :         }
    4232             : 
    4233      132911 :         if (commit_ts)
    4234      132829 :                 delta->cs.ts = commit_ts;
    4235      132911 :         if (!commit_ts) { /* rollback */
    4236          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4237             : 
    4238          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4239             :                         return LOG_OK;
                                       /* when d is not the head, find its predecessor o (o->next == d) */
    4240          82 :                 if (o != d) {
    4241           0 :                         while(o && o->next != d)
    4242             :                                 o = o->next;
    4243             :                 }
                                       /*
                                        * Unlink d. Only when d itself is the head may the head pointer
                                        * move; testing "o is the head" instead would also match the case
                                        * where the head is d's predecessor and would then drop the head
                                        * from the chain. A NULL o means d was not found: nothing to unlink.
                                        */
    4244          82 :                 if (o == d)
    4245          82 :                         ATOMIC_PTR_SET(data, d->next);
    4246             :                 else if (o)
    4247           0 :                         o->next = d->next;
    4248          82 :                 d->next = NULL;
    4249          82 :                 change->cleanup = &tc_gc_rollbacked;
    4250      132829 :         } else if (!tr->parent) {
    4251             :                 /* merge deltas */
    4252      277977 :                 while (delta && delta->cs.ts > oldest)
    4253      145149 :                         delta = delta->next;
    4254      132828 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4255       27589 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
                                               /* the update count is accumulated on the head delta (idelta) */
    4256       27589 :                         idelta->nr_updates += merge_delta(delta);
    4257       27589 :                         unlock_column(tr->store, base->id);
    4258             :                 }
    4259           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4260           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4261             :         return LOG_OK;
    4262             : }
    4263             : 
    /*
     * Commit/rollback callback for a column update (change->obj is the
     * sql_column). Returns LOG_OK without doing anything when the change is
     * already handled or the column's table is deleted; otherwise forwards
     * the column's base, table, delta chain and local type to
     * commit_update_delta() and returns its result.
     */
    4264             : static int
    4265      132886 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4266             : {
    4267             : 
    4268      132886 :         sql_column *c = (sql_column*)change->obj;
    4269      132886 :         sql_base* base = &c->base;
    4270      132886 :         sql_table* t = c->t;
    4271      132886 :         ATOMIC_PTR_TYPE* data = &c->data;
    4272      132886 :         int type = c->type.type->localtype;
    4273             : 
    4274      132886 :         if (change->handled || isDeleted(c->t))
    4275             :                 return LOG_OK;
    4276             : 
    4277      132886 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4278             : }
    4279             : 
    /*
     * Log callback for an index update (change->obj is the sql_idx).
     *
     * - Asserts the index's table is not a temporary table.
     * - If the table is deleted, marks the change as handled and returns LOG_OK.
     * - For a transaction without a parent, logs the index's head delta via
     *   tr_log_cs(), passing the first segment of the table storage and the
     *   index id, and returns its result.
     * - With a parent transaction (savepoint commit) nothing is written and
     *   LOG_OK is returned.
     */
    4280             : static int
    4281          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4282             : {
    4283          26 :         sql_idx *i = (sql_idx*)change->obj;
    4284          26 :         assert(!isTempTable(i->t));
    4285             : 
    4286          26 :         if (isDeleted(i->t)) {
    4287           0 :                 change->handled = true;
    4288           0 :                 return LOG_OK;
    4289             :         }
    4290             : 
                               /* the !isDeleted() test is always true here because of the early return above */
    4291          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4292          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4293          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4294          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4295             :         }
    4296             :         return LOG_OK;
    4297             : }
    4298             : 
    /*
     * Commit/rollback callback for an index update (change->obj is the
     * sql_idx). Returns LOG_OK without doing anything when the change is
     * already handled or the index's table is deleted; otherwise forwards
     * the index's base, table and delta chain to commit_update_delta() and
     * returns its result. The type passed along is TYPE_oid when
     * oid_index(i->type) holds and TYPE_lng otherwise.
     */
    4299             : static int
    4300          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4301             : {
    4302          28 :         sql_idx *i = (sql_idx*)change->obj;
    4303          28 :         sql_base* base = &i->base;
    4304          28 :         sql_table* t = i->t;
    4305          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4306          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    4307             : 
    4308          28 :         if (change->handled || isDeleted(i->t))
    4309             :                 return LOG_OK;
    4310             : 
    4311          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4312             : }
    4313             : 
    /*
     * Savepoint commit for a table storage chain: fold the newest storage
     * into the next (older) one when both carry the timestamp commit_ts.
     *
     * When dbat, dbat->next (od) and the timestamps match, the two structs
     * swap contents: od receives the newer contents but keeps its own next
     * link, while the dbat struct receives od's old contents, is detached
     * (next = NULL) and handed to destroy_storage().
     *
     * Returns od when such a fold happened, otherwise the unchanged dbat
     * (which may be NULL).
     */
    4314             : static storage *
    4315          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4316             : {
    4317          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4318           0 :                 storage *od = dbat->next;
    4319           0 :                 if (od->cs.ts == commit_ts) {
                                               /* t keeps od's old contents, n keeps od's link to the rest of the chain */
    4320           0 :                         storage t = *od, *n = od->next;
    4321           0 :                         *od = *dbat;
    4322           0 :                         od->next = n;
    4323           0 :                         *dbat = t;
    4324           0 :                         dbat->next = NULL;
    4325           0 :                         destroy_storage(dbat);
    4326           0 :                         return od;
    4327             :                 }
    4328             :         }
    4329             :         return dbat;
    4330             : }
    4331             : 
    /*
     * Log callback for table storage changes (change->obj is the sql_table).
     *
     * - Asserts the table is not a temporary table.
     * - If the table is deleted, marks the change as handled and returns LOG_OK.
     * - For a transaction without a parent, logs the table's current storage
     *   via log_storage() and returns its result.
     * - With a parent transaction (savepoint commit) nothing is written and
     *   LOG_OK is returned.
     */
    4332             : static int
    4333       84724 : log_update_del( sql_trans *tr, sql_change *change)
    4334             : {
    4335       84724 :         sql_table *t = (sql_table*)change->obj;
    4336       84724 :         assert(!isTempTable(t));
    4337             : 
    4338       84724 :         if (isDeleted(t)) {
    4339           0 :                 change->handled = true;
    4340           0 :                 return LOG_OK;
    4341             :         }
    4342             : 
                               /* the !isDeleted() test is always true here because of the early return above */
    4343       84724 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4344       84724 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4345             :         return LOG_OK;
    4346             : }
    4347             : 
    /*
     * Commit/rollback callback for table storage (change->obj is the
     * sql_table, change->data the storage this change refers to).
     * commit_ts == 0 means rollback.
     *
     * Paths:
     * - Change already handled or table deleted: returns LOG_OK.
     * - Table with commit action CA_DELETE/CA_DROP (asserted to be a temp
     *   table): the storage is cleared with clear_storage(); on success and a
     *   non-zero commit_ts the first segment is stamped with commit_ts. The
     *   change is marked handled and the clear_storage() result returned.
     * - Everything below runs under the table lock.
     * - Rollback of a storage created by this transaction
     *   (dbat->cs.ts == tr->tid): change->data is unlinked from the storage
     *   chain and tc_gc_rollbacked_storage is installed as cleanup hook,
     *   unless the table is new (then the create-table change handles it).
     *   Any other rollback goes through rollback_segments().
     * - Commit without parent: a storage still stamped with tr->tid gets
     *   commit_ts, segments are converted with segments2cs() and, on success,
     *   merged; merge_storage() runs only when oldest == commit_ts. The
     *   'cleared' flag is reset.
     * - Commit with parent (savepoint): segments are merged and the head is
     *   replaced by the result of savepoint_commit_storage().
     *
     * Returns LOG_OK, or the error code of clear_storage()/segments2cs().
     */
    4348             : static int
    4349       89354 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4350             : {
    4351       89354 :         int ok = LOG_OK;
    4352       89354 :         sql_table *t = (sql_table*)change->obj;
    4353       89354 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4354             : 
    4355       89354 :         if (change->handled || isDeleted(t))
    4356             :                 return ok;
    4357             : 
    4358       89354 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4359          22 :                 assert(isTempTable(t));
    4360          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4361          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4362          22 :                 change->handled = true;
    4363          22 :                 return ok;
    4364             :         }
    4365             : 
    4366       89332 :         lock_table(tr->store, t->base.id);
    4367       89332 :         if (!commit_ts) { /* rollback */
    4368        4308 :                 if (dbat->cs.ts == tr->tid) {
    4369           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4370           3 :                                 unlock_table(tr->store, t->base.id);
    4371           3 :                                 return ok;
    4372             :                         }
    4373           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4374             : 
                                               /* when d is not the head, find its predecessor o (o->next == d) */
    4375           3 :                         if (o != d) {
    4376           0 :                                 while(o && o->next != d)
    4377             :                                         o = o->next;
    4378             :                         }
                                               /*
                                                * Unlink d. Only when d itself is the head may the head pointer
                                                * move; testing "o is the head" instead would also match the case
                                                * where the head is d's predecessor and would then drop the head
                                                * from the chain. A NULL o means d was not found: nothing to unlink.
                                                */
    4379           3 :                         if (o == d) {
    4380           3 :                                 assert(d->next);
    4381           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4382             :                         } else if (o)
    4383           0 :                                 o->next = d->next;
    4384           3 :                         d->next = NULL;
    4385           3 :                         change->cleanup = &tc_gc_rollbacked_storage;
    4386             :                 } else
    4387        4302 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4388       85024 :         } else if (ok == LOG_OK && !tr->parent) {
    4389       84998 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4390       25957 :                         dbat->cs.ts = commit_ts;
    4391             : 
    4392       84998 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4393       84998 :                 if (ok == LOG_OK) {
    4394       84998 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4395       84998 :                         if (oldest == commit_ts)
    4396       42256 :                                 merge_storage(dbat);
    4397             :                 }
                                       /* dbat was already dereferenced above, so this test cannot fail here */
    4398       84998 :                 if (dbat)
    4399       84998 :                         dbat->cs.cleared = false;
    4400          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4401          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4402          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4403          26 :                 storage *s = change->data;
    4404          26 :                 if (s->cs.ts == tr->tid)
    4405           0 :                         s->cs.ts = commit_ts;
    4406             :         }
    4407       89329 :         unlock_table(tr->store, t->base.id);
    4408       89329 :         return ok;
    4409             : }
    4410             : 
    4411             : /* only rollback (content version) case for now */
    /*
     * Cleanup callback for a column change (change->obj is the sql_column,
     * change->data the delta the change refers to).
     *
     * Returns 1 when the change is finished:
     * - the column pointer is NULL (cleaned earlier);
     * - the change is handled or the table deleted (column_destroy() called);
     * - the column's current head delta differs from change->data
     *   (column_destroy() called);
     * - after the optional merge below (column_destroy() called).
     * Returns 0 when 'oldest' is at or above TRANSACTION_ID_BASE, and LOG_OK
     * when the delta is still newer than 'oldest'.
     * NOTE(review): LOG_OK is presumably 0 like the other "not yet" return;
     * confirm.
     *
     * Merge step: when the delta has a successor and ts <= oldest, the
     * successors are destroyed if the delta is already merged, and
     * merge_delta() runs under the column lock.
     */
    4412             : static int
    4413       19556 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4414             : {
    4415       19556 :         sqlstore *store = Store;
    4416       19556 :         sql_column *c = (sql_column*)change->obj;
    4417             : 
    4418       19556 :         if (!c) /* cleaned earlier */
    4419             :                 return 1;
    4420             : 
    4421         172 :         if (change->handled || isDeleted(c->t)) {
    4422           0 :                 column_destroy(store, c);
    4423           0 :                 return 1;
    4424             :         }
    4425             : 
    4426             :         /* savepoint commit (did it merge ?) */
    4427         172 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4428           0 :                 column_destroy(store, c);
    4429           0 :                 return 1;
    4430             :         }
    4431         172 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4432             :                 return 0;
                               /* id is an alias of d; the update count is accumulated on the same delta */
    4433         171 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4434         171 :         if (d && d->next) {
    4435             : 
    4436          66 :                 if (d->cs.ts > oldest)
    4437             :                         return LOG_OK; /* cannot cleanup yet */
    4438             : 
    4439             :                 // d is oldest reachable delta
    4440          63 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4441          62 :                         destroy_delta(d->next, true);
    4442          62 :                         d->next = NULL;
    4443             :                 }
    4444          63 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4445          63 :                 id->nr_updates += merge_delta(d);
    4446          63 :                 unlock_column(store, c->base.id);
    4447             :         }
    4448         168 :         column_destroy(store, c);
    4449         168 :         return 1;
    4450             : }
    4451             : 
    /*
     * Cleanup callback for a column update (change->obj is the sql_column,
     * change->data the delta the change refers to). Same decision logic as
     * tc_gc_col, but every finishing path calls table_destroy() on the
     * column's table instead of column_destroy() on the column.
     *
     * Returns 1 when the change is finished (column NULL, change handled,
     * table deleted, head delta differs from change->data, or after the
     * optional merge). Returns 0 when 'oldest' is at or above
     * TRANSACTION_ID_BASE, and LOG_OK when the delta is still newer than
     * 'oldest'.
     * NOTE(review): LOG_OK is presumably 0 like the other "not yet" return;
     * confirm.
     */
    4452             : static int
    4453      787107 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4454             : {
    4455      787107 :         sqlstore *store = Store;
    4456      787107 :         sql_column *c = (sql_column*)change->obj;
    4457             : 
    4458      787107 :         if (!c) /* cleaned earlier */
    4459             :                 return 1;
    4460             : 
    4461      787107 :         if (change->handled || isDeleted(c->t)) {
    4462           3 :                 table_destroy(store, c->t);
    4463           3 :                 return 1;
    4464             :         }
    4465             : 
    4466             :         /* savepoint commit (did it merge ?) */
    4467      787104 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4468       34504 :                 table_destroy(store, c->t);
    4469       34504 :                 return 1;
    4470             :         }
    4471      752600 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4472             :                 return 0;
                               /* id is an alias of d; the update count is accumulated on the same delta */
    4473      752599 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4474      752599 :         if (d && d->next) {
    4475             : 
    4476      752599 :                 if (d->cs.ts > oldest)
    4477             :                         return LOG_OK; /* cannot cleanup yet */
    4478             : 
    4479             :                 // d is oldest reachable delta
    4480       98235 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4481        4497 :                         destroy_delta(d->next, true);
    4482        4497 :                         d->next = NULL;
    4483             :                 }
    4484       98235 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4485       98235 :                 id->nr_updates += merge_delta(d);
    4486       98235 :                 unlock_column(store, c->base.id);
    4487             :         }
    4488       98235 :         table_destroy(store, c->t);
    4489       98235 :         return 1;
    4490             : }
    4491             : 
    /*
     * Cleanup callback for an index change (change->obj is the sql_idx,
     * change->data the delta the change refers to). Index counterpart of
     * tc_gc_col: every finishing path calls idx_destroy() on the index.
     *
     * Returns 1 when the change is finished (index NULL, change handled,
     * table deleted, head delta differs from change->data, or after the
     * optional merge). Returns 0 when 'oldest' is at or above
     * TRANSACTION_ID_BASE, and LOG_OK when the delta is still newer than
     * 'oldest'.
     * NOTE(review): LOG_OK is presumably 0 like the other "not yet" return;
     * confirm.
     */
    4492             : static int
    4493        2593 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4494             : {
    4495        2593 :         sqlstore *store = Store;
    4496        2593 :         sql_idx *i = (sql_idx*)change->obj;
    4497             : 
    4498        2593 :         if (!i) /* cleaned earlier */
    4499             :                 return 1;
    4500             : 
    4501         633 :         if (change->handled || isDeleted(i->t)) {
    4502           0 :                 idx_destroy(store, i);
    4503           0 :                 return 1;
    4504             :         }
    4505             : 
    4506             :         /* savepoint commit (did it merge ?) */
    4507         633 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4508           0 :                 idx_destroy(store, i);
    4509           0 :                 return 1;
    4510             :         }
    4511         633 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4512             :                 return 0;
                               /* id is an alias of d; the update count is accumulated on the same delta */
    4513         633 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4514         633 :         if (d && d->next) {
    4515           0 :                 if (d->cs.ts > oldest)
    4516             :                         return LOG_OK; /* cannot cleanup yet */
    4517             : 
    4518             :                 // d is oldest reachable delta
    4519           0 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4520           0 :                         destroy_delta(d->next, true);
    4521           0 :                         d->next = NULL;
    4522             :                 }
    4523           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4524           0 :                 id->nr_updates += merge_delta(d);
    4525           0 :                 unlock_column(store, i->base.id);
    4526             :         }
    4527         633 :         idx_destroy(store, i);
    4528         633 :         return 1;
    4529             : }
    4530             : 
    /*
     * Cleanup callback for an index update (change->obj is the sql_idx,
     * change->data the delta the change refers to). Same decision logic as
     * tc_gc_idx, but every finishing path calls table_destroy() on the
     * index's table instead of idx_destroy() on the index.
     *
     * Returns 1 when the change is finished (index NULL, change handled,
     * table deleted, head delta differs from change->data, or after the
     * optional merge). Returns 0 when 'oldest' is at or above
     * TRANSACTION_ID_BASE, and LOG_OK when the delta is still newer than
     * 'oldest'.
     * NOTE(review): LOG_OK is presumably 0 like the other "not yet" return;
     * confirm.
     */
    4531             : static int
    4532          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4533             : {
    4534          26 :         sqlstore *store = Store;
    4535          26 :         sql_idx *i = (sql_idx*)change->obj;
    4536             : 
    4537          26 :         if (!i) /* cleaned earlier */
    4538             :                 return 1;
    4539             : 
    4540          26 :         if (change->handled || isDeleted(i->t)) {
    4541           0 :                 table_destroy(store, i->t);
    4542           0 :                 return 1;
    4543             :         }
    4544             : 
    4545             :         /* savepoint commit (did it merge ?) */
    4546          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4547           0 :                 table_destroy(store, i->t);
    4548           0 :                 return 1;
    4549             :         }
    4550          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4551             :                 return 0;
                               /* id is an alias of d; the update count is accumulated on the same delta */
    4552          26 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4553          26 :         if (d && d->next) {
    4554          26 :                 if (d->cs.ts > oldest)
    4555             :                         return LOG_OK; /* cannot cleanup yet */
    4556             : 
    4557             :                 // d is oldest reachable delta
    4558          26 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4559          26 :                         destroy_delta(d->next, true);
    4560          26 :                         d->next = NULL;
    4561             :                 }
    4562          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4563          26 :                 id->nr_updates += merge_delta(d);
    4564          26 :                 unlock_column(store, i->base.id);
    4565             :         }
    4566          26 :         table_destroy(store, i->t);
    4567          26 :         return 1;
    4568             : }
    4569             : 
    /*
     * Cleanup callback for table storage changes (change->obj is the
     * sql_table, change->data the storage the change refers to).
     *
     * Returns 1 after calling table_destroy() when the change is handled,
     * the table is deleted, the table's current storage differs from
     * change->data, or the older storage versions could be released
     * (destroy_storage(d->next)) or there were none.
     * Returns 0 when 'oldest' is at or above TRANSACTION_ID_BASE, and LOG_OK
     * when the storage has a successor but is still newer than 'oldest'.
     * NOTE(review): LOG_OK is presumably 0 like the other "not yet" return;
     * confirm.
     */
    4570             : static int
    4571      231434 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4572             : {
    4573      231434 :         sqlstore *store = Store;
    4574      231434 :         sql_table *t = (sql_table*)change->obj;
    4575             : 
    4576      231434 :         if (change->handled || isDeleted(t)) {
    4577        3530 :                 table_destroy(store, t);
    4578        3530 :                 return 1;
    4579             :         }
    4580             :         /* savepoint commit (did it merge ?) */
    4581      227904 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4582        6981 :                 table_destroy(store, t);
    4583        6981 :                 return 1;
    4584             :         }
    4585      220923 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4586             :                 return 0;
    4587      220895 :         storage *d = (storage*)change->data;
    4588      220895 :         if (d->next) {
    4589      148851 :                 if (d->cs.ts > oldest)
    4590             :                         return LOG_OK; /* cannot cleanup yet */
    4591             : 
    4592       18667 :                 destroy_storage(d->next);
    4593       18667 :                 d->next = NULL;
    4594             :         }
    4595       90711 :         table_destroy(store, t);
    4596       90711 :         return 1;
    4597             : }
    4598             : 
    /*
     * Record that the 'nr' consecutive slots starting at 'slot' were claimed,
     * as part of a request for 'total' slots.
     *
     * - nr == 0: nothing to record, returns LOG_OK.
     * - No offsets BAT yet (offsets or *offsets is NULL) and nr == total: the
     *   whole request is one contiguous range, so only *offset = slot is set.
     * - Otherwise the positions slot .. slot+nr-1 are appended to *offsets,
     *   which is created with COLnew(0, TYPE_oid, total, SYSTRANS) when it
     *   does not exist yet. The values are written directly behind the
     *   current BATcount, after which batCount is increased and the heap is
     *   marked dirty.
     *
     * Returns LOG_OK, or LOG_ERR when the BAT could not be created.
     *
     * NOTE(review): when offsets itself is NULL and nr != total, the
     * "!*offsets" test below dereferences NULL; callers apparently always
     * pass a non-NULL offsets in that case - confirm.
     * NOTE(review): no capacity check is visible for a pre-existing *offsets;
     * presumably it was created here with room for 'total' entries - confirm.
     */
    4599             : static int
    4600       30465 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4601             : {
    4602       30465 :         if (nr == 0)
    4603             :                 return LOG_OK;
    4604       30465 :         assert (nr > 0);
    4605       30465 :         if ((!offsets || !*offsets) && nr == total) {
    4606       30460 :                 *offset = slot;
    4607       30460 :                 return LOG_OK;
    4608             :         }
    4609           5 :         if (!*offsets) {
    4610           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4611           7 :                 if (!*offsets)
    4612             :                         return LOG_ERR;
    4613             :         }
                               /* dst points at the first free position behind the current contents */
    4614           5 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    4615       16144 :         for(size_t i = 0; i < nr; i++)
    4616       16139 :                 dst[i] = slot + i;
    4617           5 :         (*offsets)->batCount += nr;
    4618           5 :         (*offsets)->theap->dirty = true;
    4619           5 :         return LOG_OK;
    4620             : }
    4621             : 
    /*
     * Claim 'cnt' slots in table storage 's' for transaction 'tr', allowing
     * the claimed slots to be spread over several segments.
     *
     * The segment list is scanned from the head. A segment that is deleted,
     * older than the oldest timestamp from store_oldest() and non-empty is
     * reused:
     *   - if it is at least 'cnt' large and the previous segment is a live
     *     segment of this transaction, the previous segment is simply grown
     *     into it;
     *   - if it is at least 'cnt' large otherwise, it is split with
     *     split_segment() so that only the needed part is claimed;
     *   - if it is smaller, it is claimed completely and the scan continues
     *     for the remainder.
     * Whatever is still missing after the scan is claimed at the tail, by
     * growing the tail segment when it is a live segment of this transaction
     * or by adding a new segment.
     *
     * Claimed positions are reported through add_offsets(): a single
     * contiguous range via *offset, otherwise as a list in *offsets.
     *
     * The table lock is taken here unless 'locked' is true. On success the
     * table is registered with the transaction (trans_add_obj) when it was
     * not yet part of it, tr->logchanges grows by the number of claimed
     * slots for tables that are logged, and the properties of a produced
     * *offsets BAT are set.
     *
     * Returns LOG_OK, or LOG_ERR when splitting, allocating a segment or
     * recording offsets fails.
     *
     * NOTE(review): *offsets is dereferenced without a NULL check on offsets;
     * the visible caller only gets here with a non-NULL offsets - confirm for
     * other callers.
     */
    4622             : static int
    4623       30362 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4624             : {
    4625       30362 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4626       30627 :         assert(s->segs);
    4627       30627 :         ulng oldest = store_oldest(tr->store, NULL);
    4628       30420 :         BUN slot = 0;
                               /* cnt is consumed while claiming; total remembers the full request */
    4629       30420 :         size_t total = cnt;
    4630             : 
    4631       30420 :         if (!locked)
    4632       30410 :                 lock_table(tr->store, t->base.id);
    4633             :         /* naive vacuum approach, iterate through segments, use deleted segments or create new segment at the end */
    4634       61276 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4635       30715 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
    4636          35 :                         if ((seg->end - seg->start) >= cnt) {
    4637             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4638          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4639           2 :                                         slot = p->end;
    4640           2 :                                         p->end += cnt;
    4641           2 :                                         seg->start += cnt;
    4642           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4643             :                                                 ok = LOG_ERR;
    4644             :                                                 break;
    4645             :                                         }
    4646           2 :                                         s->segs->nr_reused += cnt;
    4647           2 :                                         cnt = 0;
    4648           2 :                                         break;
    4649             :                                 }
    4650             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4651          11 :                                 size_t rcnt = seg->end - seg->start;
    4652          11 :                                 if (rcnt > cnt)
    4653             :                                         rcnt = cnt;
    4654          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4655             :                                         ok = LOG_ERR;
    4656             :                                         break;
    4657             :                                 }
    4658             :                         }
                                               /* claim the (possibly split off) segment for this transaction */
    4659          33 :                         seg->ts = tr->tid;
    4660          33 :                         seg->deleted = false;
    4661          33 :                         slot = seg->start;
    4662          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4663             :                                 ok = LOG_ERR;
    4664             :                                 break;
    4665             :                         }
    4666           0 :                         s->segs->nr_reused += (seg->end - seg->start);
    4667           0 :                         cnt -= (seg->end - seg->start);
    4668             :                 }
    4669             :         }
                               /* claim whatever is still missing at the end of the segment list */
    4670       30563 :         if (ok == LOG_OK && cnt) {
    4671       30550 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4672       29444 :                         slot = s->segs->t->end;
    4673       29444 :                         s->segs->t->end += cnt;
    4674             :                 } else {
    4675        1106 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4676             :                                 ok = LOG_ERR;
    4677             :                         } else {
    4678        1157 :                                 if (!s->segs->h)
    4679           0 :                                         s->segs->h = s->segs->t;
    4680        1157 :                                 slot = s->segs->t->start;
    4681             :                         }
    4682             :                 }
    4683       30601 :                 if (ok == LOG_OK)
    4684       30601 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4685             :         }
    4686       30458 :         if (!locked)
    4687       30375 :                 unlock_table(tr->store, t->base.id);
    4688             : 
    4689       30712 :         if (ok == LOG_OK) {
    4690             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4691       30633 :                 if (!in_transaction) {
    4692        1142 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4693        1142 :                         in_transaction = true;
    4694             :                 }
    4695       30633 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4696       30618 :                         tr->logchanges += (lng) total;
    4697       30632 :                 if (*offsets) {
    4698           6 :                         BAT *pos = *offsets;
    4699           6 :                         assert(BATcount(pos) == total);
    4700           6 :                         BATsetcount(pos, total); /* set other properties */
    4701           7 :                         pos->tnil = false;
    4702           7 :                         pos->tnonil = true;
    4703           7 :                         pos->tkey = true;
    4704           7 :                         pos->tsorted = true;
    4705           7 :                         pos->trevsorted = false;
    4706             :                 }
    4707             :         }
    4708       30712 :         return ok;
    4709             : }
    4710             : /* Claim cnt slots in storage s for transaction tr. On LOG_OK *offset holds the first claimed slot; LOG_ERR is returned when split_segment or new_segment fails. Calls with cnt > 1 and a non-NULL offsets are handed to claim_segmentsV2. */
    4711             : static int
    4712     2068846 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4713             : {
    4714     2068846 :         if (cnt > 1 && offsets)
    4715       30322 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4716     2038524 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4717     2038621 :         assert(s->segs);
    4718     2038621 :         ulng oldest = store_oldest(tr->store, NULL);
    4719     2038518 :         BUN slot = 0;
    4720     2038518 :         int reused = 0; /* set once a deleted segment has been recycled */
    4721             : 
    4722     2038518 :         if (!locked) /* the caller does not hold the table lock, so take it here */
    4723     1912307 :                 lock_table(tr->store, t->base.id);
    4724             :         /* naive vacuum approach, iterate through segments, check for large enough deleted segments
    4725             :          * or create new segment at the end */
    4726     9896361 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4727     7930550 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
    4728             : 
    4729       72838 :                         if ((seg->end - seg->start) >= cnt) { /* always true here: the enclosing condition already tested it */
    4730             : 
    4731             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4732       72838 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4733       56427 :                                         slot = p->end;
    4734       56427 :                                         p->end += cnt;
    4735       56427 :                                         seg->start += cnt; /* the deleted segment shrinks by the slots moved into p */
    4736       56427 :                                         s->segs->nr_reused += cnt;
    4737       56427 :                                         reused = 1;
    4738       56427 :                                         break;
    4739             :                                 }
    4740             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4741       16411 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4742             :                                         ok = LOG_ERR;
    4743             :                                         break;
    4744             :                                 }
    4745             :                         }
    4746       16411 :                         seg->ts = tr->tid; /* the segment returned by split_segment now belongs to this transaction */
    4747       16411 :                         seg->deleted = false;
    4748       16411 :                         slot = seg->start;
    4749       16411 :                         s->segs->nr_reused += cnt;
    4750       16411 :                         reused = 1;
    4751       16411 :                         break;
    4752             :                 }
    4753             :         }
    4754     2038649 :         if (ok == LOG_OK && !reused) { /* nothing recycled: claim at the tail instead */
    4755     1965810 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) { /* tail segment is ours: just extend it */
    4756     1930819 :                         slot = s->segs->t->end;
    4757     1930819 :                         s->segs->t->end += cnt;
    4758             :                 } else {
    4759       34991 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4760             :                                 ok = LOG_ERR;
    4761             :                         } else {
    4762       34991 :                                 if (!s->segs->h) /* first segment ever: it is also the head */
    4763           0 :                                         s->segs->h = s->segs->t;
    4764       34991 :                                 slot = s->segs->t->start;
    4765             :                         }
    4766             :                 }
    4767             :         }
    4768     2038649 :         if (!locked) /* release only the lock taken above */
    4769     1912438 :                 unlock_table(tr->store, t->base.id);
    4770             : 
    4771     2038649 :         if (ok == LOG_OK) {
    4772             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4773     2038648 :                 if (!in_transaction) {
    4774       49034 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4775       49034 :                         in_transaction = true;
    4776             :                 }
    4777     2038648 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t)) /* in_transaction is always set by now, so only the logging test matters */
    4778     2038216 :                         tr->logchanges += (lng) cnt;
    4779     2038647 :                 *offset = slot; /* *offset is written only on success */
    4780             :         }
    4781             :         return ok;
    4782             : }
    4783             : 
    4784             : /*
    4785             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4786             :  * of the global transaction and mark the newly added storage slots unused on the global
    4787             :  * level but used on the local transaction level. Besides this the local transaction needs
    4788             :  * to update (and mark unused) any slot in between the old end and new slots.
    4789             :  * Returns LOG_ERR when bind_del_data yields no storage, otherwise the result of claim_segments. */
    4790             : static int
    4791     1942685 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4792             : {
    4793     1942685 :         storage *s;
    4794             : 
    4795             :         /* we have a single segment structure for each persistent table
    4796             :          * for temporary tables each has its own */
    4797     1942685 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4798             :                 return LOG_ERR;
    4799             : 
    4800     1942799 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s); locked == false, so claim_segments takes the table lock itself */
    4801             : }
    4802             : 
    4803             : /* some tables cannot be updated concurrently (user/roles etc): claim slots while holding the table lock and return LOG_CONFLICT when segments_conflict reports a conflict */
    4804             : static int
    4805      126214 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4806             : {
    4807      126214 :         storage *s;
    4808      126214 :         int res = 0;
    4809             : 
    4810             :         /* we have a single segment structure for each persistent table
    4811             :          * for temporary tables each has its own */
    4812      126214 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4813             :                 /* TODO check for other inserts ! */
    4814             :                 return LOG_ERR;
    4815             : 
    4816      126214 :         lock_table(tr->store, t->base.id); /* held across both the conflict check and the claim */
    4817      126214 :         if ((res = segments_conflict(tr, s->segs, 1))) { /* third argument is the value tab_validate passes as "uncommitted" */
    4818           4 :                 unlock_table(tr->store, t->base.id);
    4819           4 :                 return LOG_CONFLICT;
    4820             :         }
    4821      126210 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s); locked == true because the lock is already held */
    4822      126210 :         unlock_table(tr->store, t->base.id);
    4823      126210 :         return res;
    4824             : }
    4825             : /* Run segments_conflict on table t's segments under the table lock. Returns LOG_CONFLICT when it reports a conflict, LOG_OK when it does not, and LOG_ERR when bind_del_data yields no storage. */
    4826             : static int
    4827       23101 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4828             : {
    4829       23101 :         storage *s;
    4830       23101 :         int res = 0;
    4831             : 
    4832       23101 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4833             :                 return LOG_ERR;
    4834             : 
    4835       23101 :         lock_table(tr->store, t->base.id);
    4836       23101 :         res = segments_conflict(tr, s->segs, uncommitted);
    4837       23101 :         unlock_table(tr->store, t->base.id);
    4838       23101 :         return res ? LOG_CONFLICT : LOG_OK; /* any non-zero result is reported as a conflict */
    4839             : }
    4840             : /* Scan the segment list from s for a segment overlapping [start,end) that SEG_IS_DELETED for tr. Returns 0 when none is found; otherwise the size of the first non-empty one (the scan stops there, so this is not a total). */
    4841             : static size_t
    4842     1601906 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4843             : {
    4844     1601906 :         size_t cnt = 0;
    4845             : 
    4846     1952342 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next)) /* skip segments that end at or before start */
    4847             :                 ;
    4848             : 
    4849     3976967 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) { /* !cnt ends the loop after the first hit */
    4850     2375055 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4851      157273 :                         cnt += s->end - s->start;
    4852             :         }
    4853     1601912 :         return cnt;
    4854             : }
    4855             : /* Build the candidate list for rows [start,end) of table t as seen by tr: a dense BAT when has_deletes_in_range finds nothing, otherwise a masked candidate list with one bit per row. Returns NULL when COLnew fails; the result of BATmaskedcands is returned unchecked. */
    4856             : static BAT *
    4857     1601850 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4858             : {
    4859     1601850 :         lock_table(tr->store, t->base.id);
    4860     1601918 :         segment *s = S->segs->h;
    4861             :         /* step one no deletes -> dense range */
    4862     1601918 :         uint32_t cur = 0; /* 32-bit mask word currently being assembled */
    4863     1601918 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0; /* pos counts the mask bits produced so far */
    4864     1601915 :         if (!dnr) {
    4865     1471356 :                 unlock_table(tr->store, t->base.id);
    4866     1471356 :                 return BATdense(start, start, end-start);
    4867             :         }
    4868             : 
    4869      130559 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL; /* mask BAT sized for the nr rows of the range */
    4870      130559 :         if (!b) {
    4871           0 :                 unlock_table(tr->store, t->base.id);
    4872           0 :                 return NULL;
    4873             :         }
    4874             : 
    4875      130559 :         uint32_t *restrict dst = Tloc(b, 0); /* next mask word to write */
    4876    62272427 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4877    62208631 :                 if (s->end < start) /* segment ends before the requested range */
    4878      263659 :                         continue;
    4879    61944972 :                 if (s->start >= end) /* segment starts at or past the end of the range: done */
    4880             :                         break;
    4881    61878209 :                 msk m = (SEG_IS_VALID(s, tr)); /* bit value emitted for every row of this segment */
    4882    61878209 :                 size_t lnr = s->end-s->start; /* rows of this segment inside the range, after the two clips below */
    4883    61878209 :                 if (s->start < start)
    4884       21469 :                         lnr -= (start - s->start);
    4885    61878209 :                 if (s->end > end)
    4886       15305 :                         lnr -= s->end - end;
    4887             : 
    4888    61878209 :                 if (m) {
    4889     1390682 :                         size_t used = pos&31, end = 32; /* note: this end shadows the parameter; here it is a bit index within cur */
    4890     1390682 :                         if (used) { /* finish the partially filled word first */
    4891     1228522 :                                 if (lnr < (32-used))
    4892      745012 :                                         end = used + lnr;
    4893     1228522 :                                 assert(end > used);
    4894     1228522 :                                 cur |= ((1U << (end - used)) - 1) << used; /* set bits used..end-1 of cur */
    4895     1228522 :                                 lnr -= end - used;
    4896     1228522 :                                 pos += end - used;
    4897     1228522 :                                 if (end == 32) { /* word complete: store it and start a new one */
    4898      483544 :                                         *dst++ = cur;
    4899      483544 :                                         cur = 0;
    4900             :                                 }
    4901             :                         }
    4902     1390682 :                         size_t full = lnr/32;
    4903     1390682 :                         size_t rest = lnr%32;
    4904     1390682 :                         if (full > 0) {
    4905      350904 :                                 memset(dst, ~0, full * sizeof(*dst)); /* whole words with every bit set */
    4906      350904 :                                 dst += full;
    4907      350904 :                                 lnr -= full * 32;
    4908      350904 :                                 pos += full * 32;
    4909             :                         }
    4910     1390682 :                         if (rest > 0) {
    4911      610665 :                                 cur |= (1U << rest) - 1; /* the low rest bits begin the next word */
    4912      610665 :                                 lnr -= rest;
    4913      610665 :                                 pos += rest;
    4914             :                         }
    4915     1390682 :                         assert(lnr==0);
    4916             :                 } else {
    4917    60487527 :                         size_t used = pos&31, end = 32; /* same shadowing of the end parameter as in the branch above */
    4918    60487527 :                         if (used) {
    4919    58610196 :                                 if (lnr < (32-used))
    4920    56628570 :                                         end = used + lnr;
    4921             : 
    4922    58610196 :                                 pos+= (end-used); /* the bits stay zero in cur, only the position advances */
    4923    58610196 :                                 lnr-= (end-used);
    4924    58610196 :                                 if (end == 32) {
    4925     1981653 :                                         *dst++ = cur;
    4926     1981653 :                                         cur = 0;
    4927             :                                 }
    4928             :                         }
    4929    60487527 :                         size_t full = lnr/32;
    4930    60487527 :                         size_t rest = lnr%32;
    4931    60487527 :                         memset(dst, 0, full * sizeof(*dst)); /* whole words of cleared bits */
    4932    60487527 :                         dst += full;
    4933    60487527 :                         lnr -= full * 32;
    4934    60487527 :                         pos += full * 32;
    4935    60487527 :                         pos+= rest;
    4936    60487527 :                         lnr-= rest;
    4937    60487527 :                         assert(lnr==0);
    4938             :                 }
    4939             :         }
    4940             : 
    4941      130559 :         unlock_table(tr->store, t->base.id);
    4942      130558 :         if (pos%32) /* flush the last, partially filled word */
    4943      128253 :                 *dst=cur;
    4944      130558 :         BATsetcount(b, nr);
    4945      130558 :         bn = BATmaskedcands(start, nr, b, true);
    4946      130554 :         BBPreclaim(b);
    4947      130553 :         (void)pos; /* presumably keeps pos referenced when assert is compiled out */
    4948      130553 :         assert (pos == nr);
    4949             :         return bn;
    4950             : }
    4951             : /* Return the candidate list for slice part_nr (0-based) of nr_of_parts equal slices of table t, as built by segments2cands. Returns NULL when tab_timestamp_storage yields no storage and an empty dense BAT when segs_end reports 0 rows. */
    4952             : static void *                                   /* BAT * */
    4953     1607932 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4954             : {
    4955             :         /* with nr_of_parts - part_nr we can adjust parts */
    4956     1607932 :         storage *s = tab_timestamp_storage(tr, t);
    4957             : 
    4958     1608158 :         if (!s)
    4959             :                 return NULL;
    4960     1608158 :         size_t nr = segs_end(s->segs, tr, t);
    4961             : 
    4962     1608829 :         if (!nr)
    4963        6963 :                 return BATdense(0, 0, 0);
    4964             : 
    4965             :         /* compute proper part */
    4966     1601866 :         size_t part_size = nr/nr_of_parts; /* NOTE(review): divides by nr_of_parts unchecked - presumably callers always pass >= 1, confirm */
    4967     1601866 :         size_t start = part_size * part_nr;
    4968     1601866 :         size_t end = start + part_size;
    4969     1601866 :         if (part_nr == (nr_of_parts-1)) /* the last part also takes the remainder of the division */
    4970     1309864 :                 end = nr;
    4971     1601866 :         assert(end <= nr);
    4972     1601866 :         return segments2cands(s, tr, t, start, end);
    4973             : }
    4974             : /* Make bn the base BAT (cs.bid) of col's delta for this transaction, destroying the previous bid/uibid/uvbid and zeroing the update count. Returns LOG_CONFLICT when segments_in_transaction is true for the table, and LOG_CONFLICT or LOG_ERR when bind_col_data fails. */
    4975             : static int
    4976           5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    4977             : {
    4978           5 :         bool update_conflict = false;
    4979             : 
    4980           5 :         if (segments_in_transaction(tr, col->t))
    4981             :                 return LOG_CONFLICT;
    4982             : 
    4983           5 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data); /* odelta: the delta in place before bind_col_data runs */
    4984             : 
    4985           5 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    4986           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    4987           5 :         assert(d && d->cs.ts == tr->tid);
    4988           5 :         if (odelta != d) /* a different delta came back: register it with the transaction */
    4989           5 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    4990           5 :         if (d->cs.bid)
    4991           5 :                 temp_destroy(d->cs.bid);
    4992           5 :         if (d->cs.uibid)
    4993           5 :                 temp_destroy(d->cs.uibid);
    4994           5 :         if (d->cs.uvbid)
    4995           5 :                 temp_destroy(d->cs.uvbid);
    4996           5 :         bat_set_access(bn, BAT_READ);
    4997           5 :         d->cs.bid = temp_create(bn);
    4998           5 :         d->cs.uibid = 0; /* no update BATs remain after the swap */
    4999           5 :         d->cs.uvbid = 0;
    5000           5 :         d->cs.ucnt = 0;
    5001           5 :         d->cs.cleared = true;
    5002           5 :         d->cs.ts = tr->tid;
    5003           5 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5004           5 :         return LOG_OK;
    5005             : }
    5006             : /* Vacuum column c by installing a fresh COLcopy of its bound BAT through swap_bats. Without force the column is left alone (LOG_OK) while fewer than 1024 updates are recorded in its delta. */
    5007             : static int
    5008           5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
    5009             : {
    5010           5 :         if (segments_in_transaction(tr, c->t))
    5011             :                 return LOG_CONFLICT;
    5012             : 
    5013           5 :         sql_delta *d = NULL;
    5014             : 
    5015             :         /* fetch the delta whose nr_updates counter is checked below */
    5016           5 :         if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5017             :                 return LOG_CONFLICT;
    5018             : 
    5019             :         /* do we have enough to clean */
    5020           5 :         if (!force && (d->nr_updates) < 1024)
    5021             :                 return LOG_OK;
    5022             : 
    5023           5 :         BAT *b = NULL, *bn = NULL;;
    5024           5 :         if ((b = bind_col(tr, c, 0)) == NULL)
    5025             :                 return LOG_ERR;
    5026           5 :         if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5027           0 :                 BBPreclaim(b);
    5028           0 :                 return LOG_ERR;
    5029             :         }
    5030           5 :         int res = swap_bats(tr, c, bn);
    5031           5 :         d->nr_updates = 0; /* NOTE(review): reset even when swap_bats did not return LOG_OK - confirm this is intended */
    5032           5 :         BBPreclaim(b);
    5033           5 :         BBPreclaim(bn);
    5034           5 :         return res;
    5035             : }
    5036             : /* Vacuum every variable-sized column of table t with the same copy-and-swap steps as vacuum_col, then reset the segments' nr_reused counter. Stops at the first column that fails and returns its error code. */
    5037             : static int
    5038           0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
    5039             : {
    5040           0 :         if (segments_in_transaction(tr, t))
    5041             :                 return LOG_CONFLICT;
    5042             : 
    5043           0 :         storage *s;
    5044           0 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    5045             :                 return LOG_ERR;
    5046             : 
    5047           0 :         for( node *n = ol_first_node(t->columns); n; n = n->next) {
    5048           0 :                 sql_column *c = n->data;
    5049             : 
    5050           0 :                 if (!ATOMvarsized(c->type.type->localtype)) /* columns that are not variable-sized are skipped */
    5051           0 :                         continue;
    5052           0 :                 sql_delta *d = NULL;
    5053             : 
    5054             :                 /* fetch the delta whose nr_updates counter is checked below */
    5055           0 :                 if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5056             :                         return LOG_CONFLICT;
    5057             : 
    5058             :                 /* do we have enough to clean */
    5059           0 :                 if (!force && (d->nr_updates + s->segs->nr_reused) < 1024) /* reused slots count towards the threshold too */
    5060           0 :                         continue;
    5061             : 
    5062           0 :                 BAT *b = NULL, *bn = NULL;;
    5063           0 :                 if ((b = bind_col(tr, c, 0)) == NULL)
    5064             :                         return LOG_ERR;
    5065           0 :                 if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5066           0 :                         BBPreclaim(b);
    5067           0 :                         return LOG_ERR;
    5068             :                 }
    5069           0 :                 int res = swap_bats(tr, c, bn);
    5070           0 :                 d->nr_updates = 0;
    5071           0 :                 BBPreclaim(b);
    5072           0 :                 BBPreclaim(bn);
    5073           0 :                 if (res != LOG_OK)
    5074           0 :                         return res;
    5075             :         }
    5076           0 :         s->segs->nr_reused = 0; /* reached only when no column returned early */
    5077           0 :         return LOG_OK;
    5078             : }
    5079             : 
    5080             : /* Record storage type st on col's delta and install o as its base BAT (cs.bid) and, when u is non-NULL, u as cs.ebid; both BATs go through transfer_to_systrans first. Returns LOG_CONFLICT or LOG_ERR on failure, LOG_OK otherwise. */
    5081             : static int
    5082          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5083             : {
    5084          58 :         bool update_conflict = false;
    5085             : 
    5086          58 :         if (segments_in_transaction(tr, col->t))
    5087             :                 return LOG_CONFLICT;
    5088             : 
    5089          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5090             : 
    5091          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5092           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5093          58 :         assert(d && d->cs.ts == tr->tid);
    5094          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5095          58 :         if (odelta != d) /* a different delta came back: register it with the transaction */
    5096          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5097             : 
    5098          58 :         d->cs.st = st;
    5099          58 :         d->cs.cleared = true;
    5100          58 :         if (d->cs.bid)
    5101          58 :                 temp_destroy(d->cs.bid);
    5102          58 :         o = transfer_to_systrans(o);
    5103          58 :         if (o == NULL) /* NOTE(review): d->cs.bid was passed to temp_destroy above but is left set on this return - confirm callers cope */
    5104             :                 return LOG_ERR;
    5105          58 :         bat_set_access(o, BAT_READ);
    5106          58 :         d->cs.bid = temp_create(o);
    5107          58 :         if (u) { /* u is optional */
    5108          53 :                 if (d->cs.ebid)
    5109           0 :                         temp_destroy(d->cs.ebid);
    5110          53 :                 u = transfer_to_systrans(u);
    5111          53 :                 if (u == NULL)
    5112             :                         return LOG_ERR;
    5113          53 :                 d->cs.ebid = temp_create(u);
    5114             :         }
    5115             :         return LOG_OK;
    5116             : }
    5117             : /* Fill every slot of the store_functions table sf assigned below with the address of this file's function of the same name. */
    5118             : void
    5119         352 : bat_storage_init( store_functions *sf)
    5120             : {
    5121         352 :         sf->bind_col = &bind_col;
    5122         352 :         sf->bind_updates = &bind_updates;
    5123         352 :         sf->bind_updates_idx = &bind_updates_idx;
    5124         352 :         sf->bind_idx = &bind_idx;
    5125         352 :         sf->bind_cands = &bind_cands;
    5126             : 
    5127         352 :         sf->claim_tab = &claim_tab;
    5128         352 :         sf->key_claim_tab = &key_claim_tab;
    5129         352 :         sf->tab_validate = &tab_validate;
    5130             : 
    5131         352 :         sf->append_col = &append_col;
    5132         352 :         sf->append_idx = &append_idx;
    5133             : 
    5134         352 :         sf->update_col = &update_col;
    5135         352 :         sf->update_idx = &update_idx;
    5136             : 
    5137         352 :         sf->delete_tab = &delete_tab;
    5138             : 
    5139         352 :         sf->count_del = &count_del;
    5140         352 :         sf->count_col = &count_col;
    5141         352 :         sf->count_idx = &count_idx;
    5142         352 :         sf->dcount_col = &dcount_col;
    5143         352 :         sf->min_max_col = &min_max_col;
    5144         352 :         sf->set_stats_col = &set_stats_col;
    5145         352 :         sf->sorted_col = &sorted_col;
    5146         352 :         sf->unique_col = &unique_col;
    5147         352 :         sf->double_elim_col = &double_elim_col;
    5148         352 :         sf->col_stats = &col_stats;
    5149         352 :         sf->col_set_range = &col_set_range;
    5150         352 :         sf->col_not_null = &col_not_null;
    5151             : 
    5152         352 :         sf->col_dup = &col_dup;
    5153         352 :         sf->idx_dup = &idx_dup;
    5154         352 :         sf->del_dup = &del_dup;
    5155             : 
    5156         352 :         sf->create_col = &create_col;    /* create and add to change list */
    5157         352 :         sf->create_idx = &create_idx;
    5158         352 :         sf->create_del = &create_del;
    5159             : 
    5160         352 :         sf->destroy_col = &destroy_col;  /* free resources */
    5161         352 :         sf->destroy_idx = &destroy_idx;
    5162         352 :         sf->destroy_del = &destroy_del;
    5163             : 
    5164         352 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5165         352 :         sf->drop_idx = &drop_idx;
    5166         352 :         sf->drop_del = &drop_del;
    5167             : 
    5168         352 :         sf->clear_table = &clear_table;
    5169             : 
    5170         352 :         sf->vacuum_col = &vacuum_col;
    5171         352 :         sf->vacuum_tab = &vacuum_tab;
    5172         352 :         sf->col_compress = &col_compress;
    5173         352 : }

Generated by: LCOV version 1.14