LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2455 3153 77.9 %
Date: 2024-04-25 20:03:45 Functions: 156 163 95.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "gdk_atoms.h"
      18             : #include "gdk_atoms.h"
      19             : #include "matomic.h"
      20             : 
      21             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      22             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      23             : 
      24             : static int log_update_col( sql_trans *tr, sql_change *c);
      25             : static int log_update_idx( sql_trans *tr, sql_change *c);
      26             : static int log_update_del( sql_trans *tr, sql_change *c);
      27             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      29             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      30             : static int log_create_col(sql_trans *tr, sql_change *change);
      31             : static int log_create_idx(sql_trans *tr, sql_change *change);
      32             : static int log_create_del(sql_trans *tr, sql_change *change);
      33             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      35             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      36             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      39             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      40             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      41             : 
      42             : static void merge_delta( sql_delta *obat);
      43             : 
      44             : /* valid
      45             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      46             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      47             :  */
      48             : 
      49             : #define VALID_4_READ(TS,tr) \
      50             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      51             : 
      52             : /* when changed, check if the old status is still valid */
      53             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      54             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      55             : 
      56             : #define SEG_VALID_4_DELETE(seg,tr) \
      57             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      58             : 
      59             : /* Delete (in current trans or by some other finised transaction, or re-used segment which used to be deleted */
      60             : #define SEG_IS_DELETED(seg,tr) \
      61             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      62             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      63             : 
      64             : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
      65             : #define SEG_IS_VALID(seg, tr) \
      66             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      67             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      68             : 
      69             : static inline BAT *
      70        4871 : transfer_to_systrans(BAT *b)
      71             : {
      72             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      73        4871 :         MT_lock_set(&b->theaplock);
      74        4871 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      75          19 :                 MT_lock_unset(&b->theaplock);
      76          19 :                 BAT *bn = COLcopy(b, b->ttype, true, TRANSIENT);
      77          19 :                 BBPreclaim(b);
      78          19 :                 b = bn;
      79          19 :                 if (b == NULL)
      80             :                         return NULL;
      81          19 :                 MT_lock_set(&b->theaplock);
      82             :         }
      83        4871 :         if (b->theap->farmid == TRANSIENT ||
      84         185 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      85        4686 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      86        4686 :                 if (qc) {
      87        2468 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      88        2468 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      89        2468 :                                 b->theap->farmid = SYSTRANS;
      90        2468 :                                 b->batRole = SYSTRANS;
      91             :                         }
      92        2468 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      93        1069 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      94        1069 :                                 b->tvheap->farmid = SYSTRANS;
      95             :                         }
      96             :                 }
      97             :         }
      98        4871 :         MT_lock_unset(&b->theaplock);
      99        4871 :         return b;
     100             : }
     101             : 
     102             : static void
     103    20328425 : lock_table(sqlstore *store, sqlid id)
     104             : {
     105    20328425 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     106    20330313 : }
     107             : 
     108             : static void
     109    20330234 : unlock_table(sqlstore *store, sqlid id)
     110             : {
     111    20330234 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     112    20330547 : }
     113             : 
     114             : static void
     115    20402663 : lock_column(sqlstore *store, sqlid id)
     116             : {
     117    20402663 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     118    20402208 : }
     119             : 
     120             : static void
     121    20402351 : unlock_column(sqlstore *store, sqlid id)
     122             : {
     123    20402351 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     124    20402140 : }
     125             : 
     126             : static void
     127      110848 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     128             : {
     129      110848 :         assert(cleanup);
     130      110848 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     131      110848 : }
     132             : 
     133             : static void
     134      132793 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     135             : {
     136      132793 :         assert(cleanup);
     137      132793 :         dup_base(&t->base);
     138      132792 :         trans_add(tr, b, data, cleanup, commit, log);
     139      132782 : }
     140             : 
     141             : static int
     142       78270 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     143             : {
     144       78270 :         segment *s = change->data;
     145             : 
     146       78270 :         if (s->ts <= oldest) {
     147       33218 :                 while(s) {
     148       21064 :                         segment *n = s->prev;
     149       21064 :                         ATOMIC_PTR_DESTROY(&s->next);
     150       21064 :                         _DELETE(s);
     151       21064 :                         s = n;
     152             :                 }
     153       12154 :                 sqlstore *store = Store;
     154       12154 :                 table_destroy(store, (sql_table*)change->obj);
     155       12154 :                 return 1;
     156             :         }
     157             :         return LOG_OK;
     158             : }
     159             : 
     160             : static void
     161       21064 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     162             : {
     163             :         /* we can only be accessed by anything older then commit_ts */
     164       21064 :         if (c->cleanup == &tc_gc_seg)
     165        8910 :                 s->prev = c->data;
     166             :         else
     167       12154 :                 c->cleanup = &tc_gc_seg;
     168       21064 :         c->data = s;
     169       21064 :         s->ts = commit_ts;
     170       16254 : }
     171             : 
     172             : static segment *
     173       86166 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     174             : {
     175       86166 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     176             : 
     177       86166 :         assert(tr);
     178       86166 :         if (n) {
     179       86166 :                 n->ts = tr->tid;
     180       86166 :                 n->oldts = 0;
     181       86166 :                 n->deleted = false;
     182       86166 :                 if (o) {
     183       35248 :                         n->start = o->end;
     184       35248 :                         n->end = o->end + cnt;
     185             :                 } else {
     186       50918 :                         n->start = 0;
     187       50918 :                         n->end = cnt;
     188             :                 }
     189       86166 :                 ATOMIC_PTR_INIT(&n->next, NULL);
     190       86166 :                 n->prev = NULL;
     191       86166 :                 if (o)
     192       35248 :                         ATOMIC_PTR_SET(&o->next, n);
     193             :         }
     194       86166 :         return n;
     195             : }
     196             : 
     197             : static segment *
     198       80753 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     199             : {
     200       80753 :         assert(tr);
     201       80753 :         if (o->start == start && o->end == start+cnt) {
     202        9682 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     203        9682 :                 o->oldts = o->ts;
     204        9682 :                 o->ts = tr->tid;
     205        9682 :                 o->deleted = deleted;
     206        9682 :                 return o;
     207             :         }
     208       71071 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     209             : 
     210       71071 :         if (!n)
     211             :                 return NULL;
     212       71071 :         n->prev = NULL;
     213             : 
     214       71071 :         if (o->ts == tr->tid) {
     215        2737 :                 n->oldts = 0;
     216        2737 :                 n->ts = 1;
     217        2737 :                 n->deleted = true;
     218             :         } else {
     219       68334 :                 n->oldts = o->ts;
     220       68334 :                 n->ts = tr->tid;
     221       68334 :                 n->deleted = deleted;
     222             :         }
     223       71071 :         if (start == o->start) {
     224             :                 /* 2-way split: o remains latter part of segment, new one is
     225             :                  * inserted before */
     226       57740 :                 n->start = o->start;
     227       57740 :                 n->end = n->start + cnt;
     228       57740 :                 ATOMIC_PTR_INIT(&n->next, o);
     229       57740 :                 if (segs->h == o)
     230         430 :                         segs->h = n;
     231       57740 :                 if (p)
     232       57310 :                         ATOMIC_PTR_SET(&p->next, n);
     233       57740 :                 o->start = n->end;
     234       13331 :         } else if (start+cnt == o->end) {
     235             :                 /* 2-way split: o remains first part of segment, new one is
     236             :                  * added after */
     237        5384 :                 n->start = o->end - cnt;
     238        5384 :                 n->end = o->end;
     239        5384 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     240        5384 :                 ATOMIC_PTR_SET(&o->next, n);
     241        5384 :                 if (segs->t == o)
     242         825 :                         segs->t = n;
     243        5384 :                 o->end = n->start;
     244             :         } else {
     245             :                 /* 3-way split: o remains first part of segment, two new ones
     246             :                  * are added after */
     247        7947 :                 segment *n2 = GDKmalloc(sizeof(segment));
     248        7947 :                 if (n2 == NULL) {
     249           0 :                         GDKfree(n);
     250           0 :                         return NULL;
     251             :                 }
     252        7947 :                 ATOMIC_PTR_INIT(&n->next, n2);
     253        7947 :                 n->start = start;
     254        7947 :                 n->end = start + cnt;
     255        7947 :                 *n2 = *o;
     256        7947 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     257        7947 :                 n2->start = n->end;
     258        7947 :                 n2->prev = NULL;
     259        7947 :                 if (segs->t == o)
     260        2810 :                         segs->t = n2;
     261        7947 :                 ATOMIC_PTR_SET(&o->next, n);
     262        7947 :                 o->end = start;
     263             :         }
     264             :         return n;
     265             : }
     266             : 
     267             : static void
     268        4142 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     269             : {
     270        4142 :         segment *cur = segs->h, *seg = NULL;
     271       17755 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     272       13613 :                 if (cur->ts == tr->tid) { /* revert */
     273        4633 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     274        4633 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     275        4633 :                         cur->oldts = 0;
     276             :                 }
     277       13613 :                 if (cur->ts <= oldest) { /* possibly merge range */
     278       13177 :                         if (!seg) { /* skip first */
     279             :                                 seg = cur;
     280        9035 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     281             :                                 /* merge with previous */
     282        4810 :                                 seg->end = cur->end;
     283        4810 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     284        4810 :                                 if (cur == segs->t)
     285        2839 :                                         segs->t = seg;
     286        4810 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     287        4810 :                                 cur = seg;
     288             :                         } else {
     289             :                                 seg = cur; /* begin of new merge */
     290             :                         }
     291             :                 }
     292             :         }
     293        4142 : }
     294             : 
     295             : static size_t
     296      100777 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     297             : {
     298      100777 :         size_t cnt = 0;
     299      100777 :         segment *s = segs->h, *l = NULL;
     300             : 
     301      485958 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     302      385181 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     303             :                                 l = s;
     304             :         }
     305      100777 :         if (l)
     306      100770 :                 cnt = l->end;
     307      100777 :         return cnt;
     308             : }
     309             : 
     310             : static int
     311      100777 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     312             : {
     313             :         /* set bits correctly */
     314      100777 :         BAT *b = temp_descriptor(cs->bid);
     315             : 
     316      100777 :         if (!b)
     317             :                 return LOG_ERR;
     318      100777 :         segment *s = segs->h;
     319             : 
     320      100777 :         size_t nr = segs_end_include_deleted(segs, tr);
     321      100777 :         size_t rounded_nr = ((nr+31)&~31);
     322      100777 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     323           0 :                 bat_destroy(b);
     324           0 :                 return LOG_ERR;
     325             :         }
     326             : 
     327             :         /* disable all properties here */
     328      100777 :         MT_lock_set(&b->theaplock);
     329      100777 :         b->tsorted = false;
     330      100777 :         b->trevsorted = false;
     331      100777 :         b->tnosorted = 0;
     332      100777 :         b->tnorevsorted = 0;
     333      100777 :         b->tseqbase = oid_nil;
     334      100777 :         b->tkey = false;
     335      100777 :         b->tnokey[0] = 0;
     336      100777 :         b->tnokey[1] = 0;
     337      100777 :         b->theap->dirty = true;
     338      100777 :         BUN cnt = BATcount(b);
     339      100777 :         MT_lock_unset(&b->theaplock);
     340             : 
     341      100777 :         uint32_t *restrict dst;
     342             :         /* why hashlock ?? */
     343      100777 :         MT_rwlock_wrlock(&b->thashlock);
     344      519837 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     345      361660 :                 if (s->start >= nr)
     346             :                         break;
     347      318283 :                 if (s->ts == tr->tid && s->end != s->start) {
     348      138404 :                         if (cnt < s->start) { /* first mark as deleted ! */
     349        3716 :                                 size_t lnr = s->start-cnt;
     350        3716 :                                 size_t pos = cnt;
     351        3716 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     352        3716 :                                 uint32_t cur = 0;
     353             : 
     354        3716 :                                 size_t used = pos&31, end = 32;
     355        3716 :                                 if (used) {
     356        3594 :                                         if (lnr < (32-used))
     357        3389 :                                                 end = used + lnr;
     358        3594 :                                         assert(end > used);
     359        3594 :                                         cur |= ((1U << (end - used)) - 1) << used;
     360        3594 :                                         lnr -= end - used;
     361        3594 :                                         *dst++ |= cur;
     362        3594 :                                         cur = 0;
     363             :                                 }
     364        3716 :                                 size_t full = lnr/32;
     365        3716 :                                 size_t rest = lnr%32;
     366        3716 :                                 if (full > 0) {
     367           8 :                                         memset(dst, ~0, full * sizeof(*dst));
     368           8 :                                         dst += full;
     369           8 :                                         lnr -= full * 32;
     370             :                                 }
     371        3716 :                                 if (rest > 0) {
     372         200 :                                         cur |= (1U << rest) - 1;
     373         200 :                                         lnr -= rest;
     374         200 :                                         *dst |= cur;
     375             :                                 }
     376        3716 :                                 assert(lnr==0);
     377             :                         }
     378      138404 :                         size_t lnr = s->end-s->start;
     379      138404 :                         size_t pos = s->start;
     380      138404 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     381      138404 :                         uint32_t cur = 0;
     382      138404 :                         size_t used = pos&31, end = 32;
     383      138404 :                         if (used) {
     384      100444 :                                 if (lnr < (32-used))
     385       94985 :                                         end = used + lnr;
     386      100444 :                                 assert(end > used);
     387      100444 :                                 cur |= ((1U << (end - used)) - 1) << used;
     388      100444 :                                 lnr -= end - used;
     389      100444 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     390      100444 :                                 dst++;
     391      100444 :                                 cur = 0;
     392             :                         }
     393      138404 :                         size_t full = lnr/32;
     394      138404 :                         size_t rest = lnr%32;
     395      138404 :                         if (full > 0) {
     396        3710 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     397        3710 :                                 dst += full;
     398        3710 :                                 lnr -= full * 32;
     399             :                         }
     400      138404 :                         if (rest > 0) {
     401       40152 :                                 cur |= (1U << rest) - 1;
     402       40152 :                                 lnr -= rest;
     403       40152 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     404             :                         }
     405      138404 :                         assert(lnr==0);
     406      138404 :                         if (cnt < s->end)
     407      318283 :                                 cnt = s->end;
     408             :                 }
     409             :         }
     410      100777 :         MT_rwlock_wrunlock(&b->thashlock);
     411      100777 :         if (nr > BATcount(b)) {
     412       60055 :                 MT_lock_set(&b->theaplock);
     413       60055 :                 BATsetcount(b, nr);
     414       60055 :                 MT_lock_unset(&b->theaplock);
     415             :         }
     416             : 
     417      100777 :         bat_destroy(b);
     418      100777 :         return LOG_OK;
     419             : }
     420             : 
     421             : /* TODO return LOG_OK/ERR */
     422             : static void
     423      100803 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     424             : {
     425      100803 :         sqlstore* store = tr->store;
     426      100803 :         segment *cur = s->segs->h, *seg = NULL;
     427      486056 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     428      385253 :                 if (cur->ts == tr->tid) {
     429      152327 :                         if (!cur->deleted)
     430       89366 :                                 cur->oldts = 0;
     431      152327 :                         cur->ts = commit_ts;
     432             :                 }
     433      385253 :                 if (!seg) {
     434             :                         /* first segment */
     435             :                         seg = cur;
     436             :                 }
     437      284450 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     438             :                         /* possible merge since both deleted flags are equal */
     439      257964 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     440      194715 :                                 int merge = 1;
     441      194715 :                                 node *n = store->active->h;
     442      628735 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     443      536753 :                                         sql_trans* other = ((sql_trans*)n->data);
     444      536753 :                                         ulng active = other->ts;
     445      536753 :                                         if(other->active == 2)
     446       19129 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     447      517624 :                                         if (active == tr->ts)
     448      124622 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     449      393002 :                                         if (seg->ts < active && cur->ts < active)
     450             :                                                 break;
     451      380645 :                                         if (seg->ts > active && cur->ts > active)
     452      290269 :                                                 continue;
     453             : 
     454       90376 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     455             :                                         /* cannot safely merge since there is an active transaction between the segments */
     456             :                                         merge = false;
     457             :                                         break;
     458             :                                 }
     459             :                                 /* merge segments */
     460      208678 :                                 if (merge) {
     461      104339 :                                         seg->end = cur->end;
     462      104339 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     463      104339 :                                         if (cur == s->segs->t)
     464       25933 :                                                 s->segs->t = seg;
     465      104339 :                                         if (commit_ts == oldest) {
     466       88085 :                                                 ATOMIC_PTR_DESTROY(&cur->next);
     467       88085 :                                                 _DELETE(cur);
     468             :                                         } else
     469       32508 :                                                 mark4destroy(cur, change, commit_ts);
     470      104339 :                                         cur = seg;
     471      104339 :                                         continue;
     472             :                                 }
     473             :                         }
     474             :                 }
     475             :                 seg = cur;
     476             :         }
     477      100803 : }
     478             : 
     479             : static int
     480     2203384 : segments_in_transaction(sql_trans *tr, sql_table *t)
     481             : {
     482     2203384 :         storage *s = ATOMIC_PTR_GET(&t->data);
     483     2203384 :         segment *seg = s->segs->h;
     484             : 
     485     2203384 :         if (seg && s->segs->t->ts == tr->tid)
     486             :                 return 1;
     487      616316 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     488      513801 :                 if (seg->ts == tr->tid)
     489             :                         return 1;
     490             :         }
     491             :         return 0;
     492             : }
     493             : 
     494             : static size_t
     495    13284766 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     496             : {
     497    13284766 :         size_t cnt = 0;
     498             : 
     499    13284766 :         lock_table(tr->store, table->base.id);
     500    13286725 :         segment *s = segs->h, *l = NULL;
     501             : 
     502    13286725 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     503    11374285 :                 l = s = segs->t;
     504             : 
     505   201051840 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     506   187765128 :                 if (SEG_IS_VALID(s, tr))
     507             :                                 l = s;
     508             :         }
     509    13286712 :         if (l)
     510    13273387 :                 cnt = l->end;
     511    13286712 :         unlock_table(tr->store, table->base.id);
     512    13286682 :         return cnt;
     513             : }
     514             : 
     515             : static segments *
     516       50918 : new_segments(sql_trans *tr, size_t cnt)
     517             : {
     518       50918 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     519             : 
     520       50918 :         if (n) {
     521       50918 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     522       50918 :                 if (!n->h) {
     523           0 :                         GDKfree(n);
     524           0 :                         return NULL;
     525             :                 }
     526       50918 :                 sql_ref_init(&n->r);
     527             :         }
     528             :         return n;
     529             : }
     530             : 
     531             : static sql_delta *
     532    22971827 : timestamp_delta( sql_trans *tr, sql_delta *d)
     533             : {
     534    23003683 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     535       31856 :                 d = d->next;
     536    22971747 :         return d;
     537             : }
     538             : 
     539             : static sql_delta *
     540    22817531 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     541             : {
     542    22817531 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     543             : }
     544             : 
     545             : static sql_delta *
     546       21767 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     547             : {
     548       21767 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     549             : }
     550             : 
     551             : static storage *
     552    14276265 : timestamp_storage( sql_trans *tr, storage *d)
     553             : {
     554    14276265 :         if (!d)
     555             :                 return NULL;
     556    14324392 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     557       48127 :                 d = d->next;
     558             :         return d;
     559             : }
     560             : 
     561             : static storage *
     562    14251023 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     563             : {
     564    14251023 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     565             : }
     566             : 
     567             : static sql_delta*
     568       19331 : delta_dup(sql_delta *d)
     569             : {
     570       19331 :         ATOMIC_INC(&d->cs.refcnt);
     571       19331 :         return d;
     572             : }
     573             : 
     574             : static void *
     575       17937 : col_dup(sql_column *c)
     576             : {
     577       17937 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     578             : }
     579             : 
     580             : static void *
     581        2681 : idx_dup(sql_idx *i)
     582             : {
     583        2681 :         if (!ATOMIC_PTR_GET(&i->data))
     584             :                 return NULL;
     585        1394 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     586             : }
     587             : 
     588             : static storage*
     589        1527 : storage_dup(storage *d)
     590             : {
     591        1527 :         ATOMIC_INC(&d->cs.refcnt);
     592        1527 :         return d;
     593             : }
     594             : 
     595             : static void *
     596        1527 : del_dup(sql_table *t)
     597             : {
     598        1527 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     599             : }
     600             : 
     601             : static size_t
     602          17 : count_inserts( segment *s, sql_trans *tr)
     603             : {
     604          17 :         size_t cnt = 0;
     605             : 
     606          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     607          55 :                 if (!s->deleted && s->ts == tr->tid)
     608           4 :                         cnt += s->end - s->start;
     609             :         }
     610          17 :         return cnt;
     611             : }
     612             : 
     613             : static size_t
     614      293457 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     615             : {
     616      293457 :         size_t cnt = 0;
     617             : 
     618      315953 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     619             :                 ;
     620             : 
     621      773622 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     622      480165 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     623       40813 :                         cnt += s->end - s->start;
     624             :         }
     625      293457 :         return cnt;
     626             : }
     627             : 
     628             : static size_t
     629          17 : count_deletes( segment *s, sql_trans *tr)
     630             : {
     631          17 :         size_t cnt = 0;
     632             : 
     633          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     634          55 :                 if (SEG_IS_DELETED(s, tr))
     635          17 :                         cnt += s->end - s->start;
     636             :         }
     637          17 :         return cnt;
     638             : }
     639             : 
     640             : #define CNT_ACTIVE 10
     641             : 
     642             : static size_t
     643    12794200 : count_col(sql_trans *tr, sql_column *c, int access)
     644             : {
     645    12794200 :         storage *d;
     646    12794200 :         sql_delta *ds;
     647             : 
     648    12794200 :         if (!isTable(c->t))
     649             :                 return 0;
     650    12794200 :         d = tab_timestamp_storage(tr, c->t);
     651    12792869 :         ds = col_timestamp_delta(tr, c);
     652    12792798 :         if (!d ||!ds)
     653             :                 return 0;
     654    12792798 :         if (access == 2)
     655      448308 :                 return ds?ds->cs.ucnt:0;
     656    12344490 :         if (access == 1)
     657          17 :                 return count_inserts(d->segs->h, tr);
     658    12344473 :         if (access == QUICK)
     659      524474 :                 return d->segs->t?d->segs->t->end:0;
     660    11819999 :         if (access == CNT_ACTIVE) {
     661      293451 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     662      293454 :                 lock_table(tr->store, c->t->base.id);
     663      293457 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     664      293456 :                 unlock_table(tr->store, c->t->base.id);
     665      293456 :                 return cnt;
     666             :         }
     667    11526548 :         return segs_end(d->segs, tr, c->t);
     668             : }
     669             : 
     670             : static size_t
     671       18958 : count_idx(sql_trans *tr, sql_idx *i, int access)
     672             : {
     673       18958 :         storage *d;
     674       18958 :         sql_delta *ds;
     675             : 
     676       18958 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     677        4751 :                 return 0;
     678       14207 :         d = tab_timestamp_storage(tr, i->t);
     679       14203 :         ds = idx_timestamp_delta(tr, i);
     680       14203 :         if (!d || !ds)
     681             :                 return 0;
     682       14203 :         if (access == 2)
     683        2838 :                 return ds?ds->cs.ucnt:0;
     684       11365 :         if (access == 1)
     685           0 :                 return count_inserts(d->segs->h, tr);
     686       11365 :         if (access == QUICK)
     687        3528 :                 return d->segs->t?d->segs->t->end:0;
     688        7837 :         return segs_end(d->segs, tr, i->t);
     689             : }
     690             : 
     691             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     692             : static BAT *
     693    12891274 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     694             : {
     695    12891274 :         BAT *b;
     696             : 
     697    12891274 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     698             :         /* returns the updates for cs */
     699    12891274 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     700        4190 :                 if (access == RD_UPD_ID) {
     701        2793 :                         if (!(b = temp_descriptor(cs->uibid)))
     702             :                                 return NULL;
     703        2793 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     704         111 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     705        2682 :                                         oid nil = oid_nil;
     706             :                                         /* less then cnt */
     707        2682 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
     708        2682 :                                         if (!s) {
     709           0 :                                                 bat_destroy(b);
     710           0 :                                                 return NULL;
     711             :                                         }
     712             : 
     713        2682 :                                         BAT *nb = BATproject(s, b);
     714        2682 :                                         bat_destroy(s);
     715        2682 :                                         bat_destroy(b);
     716        2682 :                                         b = nb;
     717             :                         }
     718             :                 } else {
     719        1397 :                         b = temp_descriptor(cs->uvbid);
     720             :                 }
     721             :         } else {
     722    21478529 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     723             :         }
     724             :         return b;
     725             : }
     726             : 
     727             : static BAT *
     728           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     729             : {
     730           0 :         int err = 0;
     731           0 :         BAT *uv = *UV;
     732           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     733           0 :         BATiter uvi;
     734           0 :         BATiter ovi;
     735             : 
     736           0 :         if (uv) {
     737           0 :                 uvi = bat_iterator(uv);
     738           0 :                 ovi = bat_iterator(ov);
     739             :         }
     740             : 
     741             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     742           0 :         BUN uip = 0, uie = BATcount(ui);
     743           0 :         BUN oip = 0, oie = BATcount(oi);
     744             : 
     745           0 :         oid uiseqb = ui->tseqbase;
     746           0 :         oid oiseqb = oi->tseqbase;
     747           0 :         oid *uipt = NULL, *oipt = NULL;
     748           0 :         BATiter uii = bat_iterator(ui);
     749           0 :         BATiter oii = bat_iterator(oi);
     750           0 :         if (!BATtdensebi(&uii))
     751           0 :                 uipt = uii.base;
     752           0 :         if (!BATtdensebi(&oii))
     753           0 :                 oipt = oii.base;
     754             : 
     755           0 :         if (uiseqb == oiseqb && uie == oie) { /* full overlap, no values */
     756           0 :                 if (uv) {
     757           0 :                         bat_iterator_end(&uvi);
     758           0 :                         bat_iterator_end(&ovi);
     759             :                 }
     760           0 :                 bat_iterator_end(&uii);
     761           0 :                 bat_iterator_end(&oii);
     762           0 :                 if (uv) {
     763           0 :                         *UV = uv;
     764             :                 } else {
     765           0 :                         bat_destroy(uv);
     766             :                 }
     767           0 :                 bat_destroy(oi);
     768           0 :                 bat_destroy(ov);
     769           0 :                 return ui;
     770             :         }
     771           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     772           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     773             : 
     774           0 :         if (!ni || (uv && !nv)) {
     775           0 :                 bat_destroy(ni);
     776           0 :                 bat_destroy(nv);
     777           0 :                 bat_destroy(ui);
     778           0 :                 bat_destroy(uv);
     779           0 :                 bat_destroy(oi);
     780           0 :                 bat_destroy(ov);
     781           0 :                 return NULL;
     782             :         }
     783           0 :         while (uip < uie && oip < oie && !err) {
     784           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     785           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     786             : 
     787           0 :                 if (uiid <= oiid) {
     788           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     789           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     790             :                                 err = 1;
     791           0 :                         uip++;
     792           0 :                         if (uiid == oiid)
     793           0 :                                 oip++;
     794             :                 } else { /* uiid > oiid */
     795           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     796           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     797             :                                 err = 1;
     798           0 :                         oip++;
     799             :                 }
     800             :         }
     801           0 :         while (uip < uie && !err) {
     802           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     803           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     804           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     805             :                         err = 1;
     806           0 :                 uip++;
     807             :         }
     808           0 :         while (oip < oie && !err) {
     809           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     810           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     811           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     812             :                         err = 1;
     813           0 :                 oip++;
     814             :         }
     815           0 :         if (uv) {
     816           0 :                 bat_iterator_end(&uvi);
     817           0 :                 bat_iterator_end(&ovi);
     818             :         }
     819           0 :         bat_iterator_end(&uii);
     820           0 :         bat_iterator_end(&oii);
     821           0 :         bat_destroy(ui);
     822           0 :         bat_destroy(uv);
     823           0 :         bat_destroy(oi);
     824           0 :         bat_destroy(ov);
     825           0 :         if (!err) {
     826           0 :                 if (nv)
     827           0 :                         *UV = nv;
     828           0 :                 return ni;
     829             :         }
     830           0 :         *UV = NULL;
     831           0 :         bat_destroy(ni);
     832           0 :         bat_destroy(nv);
     833           0 :         return NULL;
     834             : }
     835             : 
     836             : static sql_delta *
     837     8594055 : older_delta( sql_delta *d, sql_trans *tr)
     838             : {
     839     8594055 :         sql_delta *o = d->next;
     840             : 
     841     8596976 :         while (o && !o->cs.merged) {
     842        2917 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     843             :                         break;
     844             :                 else
     845        2921 :                         o = o->next;
     846             :         }
     847     8594059 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     848           0 :                 return o;
     849             :         return NULL;
     850             : }
     851             : 
     852             : static BAT *
     853     8594254 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     854             : {
     855     8594254 :         assert(tr->active);
     856     8594254 :         sql_delta *o = NULL;
     857     8594254 :         BAT *ui = NULL, *uv = NULL;
     858             : 
     859     8594254 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     860             :                 return NULL;
     861     8593991 :         if (access == RD_UPD_VAL) {
     862     4297246 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     863           0 :                         bat_destroy(ui);
     864           0 :                         return NULL;
     865             :                 }
     866             :         }
     867     8594063 :         while ((o = older_delta(d, tr)) != NULL) {
     868           0 :                 BAT *oui = NULL, *ouv = NULL;
     869           0 :                 if (!oui)
     870           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     871           0 :                 if (access == RD_UPD_VAL)
     872           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     873           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     874           0 :                         bat_destroy(ui);
     875           0 :                         bat_destroy(uv);
     876           0 :                         bat_destroy(oui);
     877           0 :                         bat_destroy(ouv);
     878           0 :                         return NULL;
     879             :                 }
     880           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     881             :                         return NULL;
     882             :                 d = o;
     883             :         }
     884     8594062 :         if (uv) {
     885     4297318 :                 bat_destroy(ui);
     886     4297318 :                 return uv;
     887             :         }
     888             :         return ui;
     889             : }
     890             : 
     891             : static BAT *
     892           3 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     893             : {
     894           3 :         lock_column(tr->store, c->base.id);
     895           3 :         sql_delta *d = col_timestamp_delta(tr, c);
     896           3 :         int type = c->type.type->localtype;
     897             : 
     898           3 :         if (!d) {
     899           0 :                 unlock_column(tr->store, c->base.id);
     900           0 :                 return NULL;
     901             :         }
     902           3 :         if (d->cs.st == ST_DICT) {
     903           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     904             : 
     905           0 :                 type = b->ttype;
     906             :         }
     907           3 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     908           3 :         unlock_column(tr->store, c->base.id);
     909           3 :         return bn;
     910             : }
     911             : 
     912             : static BAT *
     913           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     914             : {
     915           0 :         lock_column(tr->store, i->base.id);
     916           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     917           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     918             : 
     919           0 :         if (!d) {
     920           0 :                 unlock_column(tr->store, i->base.id);
     921           0 :                 return NULL;
     922             :         }
     923           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     924           0 :         unlock_column(tr->store, i->base.id);
     925           0 :         return bn;
     926             : }
     927             : 
     928             : static BAT *
     929     5729888 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     930             : {
     931     5729888 :         BAT *b;
     932             : 
     933     5729888 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     934     5729888 :         assert(cs != NULL);
     935     5729888 :         if (access == QUICK)
     936      134176 :                 return quick_descriptor(cs->bid);
     937     5595712 :         if (access == RD_EXT)
     938         341 :                 return temp_descriptor(cs->ebid);
     939     5595371 :         assert(cs->bid);
     940     5595371 :         b = temp_descriptor(cs->bid);
     941     5595162 :         if (b == NULL)
     942             :                 return NULL;
     943     5595162 :         assert(b->batRestricted == BAT_READ);
     944             :         /* return slice */
     945     5595162 :         BAT *s = BATslice(b, 0, cnt);
     946     5595341 :         bat_destroy(b);
     947     5595341 :         return s;
     948             : }
     949             : 
     950             : static int
     951     4297116 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     952             : {
     953     4297116 :         lock_column(tr->store, c->base.id);
     954     4297006 :         size_t cnt = count_col(tr, c, 0);
     955     4297134 :         sql_delta *d = col_timestamp_delta(tr, c);
     956     4297094 :         int type = c->type.type->localtype;
     957             : 
     958     4297094 :         if (!d) {
     959           0 :                 unlock_column(tr->store, c->base.id);
     960           0 :                 return LOG_ERR;
     961             :         }
     962     4297094 :         if (d->cs.st == ST_DICT) {
     963           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     964             : 
     965           2 :                 type = b->ttype;
     966             :         }
     967             : 
     968     4297094 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     969     4297273 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     970             : 
     971     4297262 :         unlock_column(tr->store, c->base.id);
     972             : 
     973     4297241 :         if (*ui == NULL || *uv == NULL) {
     974           0 :                 bat_destroy(*ui);
     975           0 :                 bat_destroy(*uv);
     976           0 :                 return LOG_ERR;
     977             :         }
     978             :         return LOG_OK;
     979             : }
     980             : 
     981             : static int
     982          57 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     983             : {
     984          57 :         lock_column(tr->store, i->base.id);
     985          57 :         size_t cnt = count_idx(tr, i, 0);
     986          57 :         sql_delta *d = idx_timestamp_delta(tr, i);
     987          57 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     988             : 
     989          57 :         if (!d) {
     990           0 :                 unlock_column(tr->store, i->base.id);
     991           0 :                 return LOG_ERR;
     992             :         }
     993             : 
     994          57 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     995          57 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     996             : 
     997          57 :         unlock_column(tr->store, i->base.id);
     998             : 
     999          57 :         if (*ui == NULL || *uv == NULL) {
    1000           0 :                 bat_destroy(*ui);
    1001           0 :                 bat_destroy(*uv);
    1002           0 :                 return LOG_ERR;
    1003             :         }
    1004             :         return LOG_OK;
    1005             : }
    1006             : 
    1007             : static void *                                   /* BAT * */
    1008     5722550 : bind_col(sql_trans *tr, sql_column *c, int access)
    1009             : {
    1010     5722550 :         assert(access == QUICK || tr->active);
    1011     5722550 :         if (!isTable(c->t))
    1012             :                 return NULL;
    1013     5722550 :         sql_delta *d = col_timestamp_delta(tr, c);
    1014     5722422 :         if (!d)
    1015             :                 return NULL;
    1016     5722422 :         size_t cnt = count_col(tr, c, 0);
    1017     5722226 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1018           3 :                 return bind_ucol(tr, c, access, cnt);
    1019     5722223 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
    1020     5722100 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
    1021             :         return b;
    1022             : }
    1023             : 
    1024             : static void *                                   /* BAT * */
    1025        7508 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1026             : {
    1027        7508 :         assert(access == QUICK || tr->active);
    1028        7508 :         if (!isTable(i->t))
    1029             :                 return NULL;
    1030        7508 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1031        7510 :         if (!d)
    1032             :                 return NULL;
    1033        7510 :         size_t cnt = count_idx(tr, i, 0);
    1034        7508 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1035           0 :                 return bind_uidx(tr, i, access, cnt);
    1036        7508 :         return cs_bind_bat( &d->cs, access, cnt);
    1037             : }
    1038             : 
    1039             : static int
    1040         446 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1041             : {
    1042         446 :         if (!cs->uibid) {
    1043           0 :                 cs->uibid = e_bat(TYPE_oid);
    1044           0 :                 if (cs->uibid == BID_NIL)
    1045             :                         return LOG_ERR;
    1046             :         }
    1047         446 :         if (!cs->uvbid) {
    1048           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1049           0 :                 if (!cur)
    1050             :                         return LOG_ERR;
    1051           0 :                 int type = cur->ttype;
    1052           0 :                 cs->uvbid = e_bat(type);
    1053           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1054             :                         return LOG_ERR;
    1055             :         }
    1056         446 :         BAT *ui = temp_descriptor(cs->uibid);
    1057         446 :         BAT *uv = temp_descriptor(cs->uvbid);
    1058             : 
    1059         446 :         if (ui == NULL || uv == NULL) {
    1060           0 :                 bat_destroy(ui);
    1061           0 :                 bat_destroy(uv);
    1062           0 :                 return LOG_ERR;
    1063             :         }
    1064         446 :         assert(ui && uv);
    1065         446 :         if (isEbat(ui)){
    1066         394 :                 temp_destroy(cs->uibid);
    1067         394 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1068         394 :                 bat_destroy(ui);
    1069         394 :                 if (cs->uibid == BID_NIL ||
    1070         394 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1071           0 :                         bat_destroy(uv);
    1072           0 :                         return LOG_ERR;
    1073             :                 }
    1074             :         }
    1075         446 :         if (isEbat(uv)){
    1076         394 :                 temp_destroy(cs->uvbid);
    1077         394 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1078         394 :                 bat_destroy(uv);
    1079         394 :                 if (cs->uvbid == BID_NIL ||
    1080         394 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1081           0 :                         bat_destroy(ui);
    1082           0 :                         return LOG_ERR;
    1083             :                 }
    1084             :         }
    1085         446 :         *Ui = ui;
    1086         446 :         *Uv = uv;
    1087         446 :         return LOG_OK;
    1088             : }
    1089             : 
    1090             : static int
    1091        3018 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1092             : {
    1093        5845 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1094        5845 :                 if (s->start <= rid && s->end > rid) {
    1095        3018 :                         if (s->ts == tr->tid && !s->deleted) {
    1096        2574 :                                 return 1;
    1097             :                         }
    1098             :                         break;
    1099             :                 }
    1100             :         }
    1101             :         return 0;
    1102             : }
    1103             : 
    1104             : static int
    1105         444 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1106             : {
    1107         647 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1108         647 :                 if (s->start <= rid && s->end > rid) {
    1109         444 :                         if (s->ts >= tr->ts && s->deleted) {
    1110           0 :                                 return 1;
    1111             :                         }
    1112             :                         break;
    1113             :                 }
    1114             :         }
    1115             :         return 0;
    1116             : }
    1117             : 
    1118             : static sql_delta *
    1119           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1120             : {
    1121           0 :         sql_delta *n = ZNEW(sql_delta);
    1122           0 :         if (!n)
    1123             :                 return NULL;
    1124           0 :         *n = *bat;
    1125           0 :         n->next = NULL;
    1126           0 :         n->cs.ts = tr->tid;
    1127           0 :         return n;
    1128             : }
    1129             : 
    1130             : static BAT *
    1131          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1132             : {
    1133          17 :         BAT *newoffsets = NULL;
    1134          17 :         sql_delta *bat = *batp;
    1135          17 :         column_storage *cs = &bat->cs;
    1136          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1137             : 
    1138          17 :         if (!u)
    1139             :                 return NULL;
    1140          17 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1141          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1142           0 :                 assert(0);
    1143             :         } else {
    1144          17 :                 int new = 0;
    1145             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1146          17 :                 if (BATcount(u) >= max_cnt) {
    1147           1 :                         if (max_cnt == 64*1024) { /* decompress */
    1148           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1149           0 :                                         bat_destroy(u);
    1150           0 :                                         return NULL;
    1151             :                                 }
    1152           0 :                                 if (cs->ucnt) {
    1153           0 :                                         BAT *ui = NULL, *uv = NULL;
    1154           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1155           0 :                                         bat_destroy(b);
    1156           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1157           0 :                                                 bat_destroy(nb);
    1158           0 :                                                 bat_destroy(u);
    1159           0 :                                                 return NULL;
    1160             :                                         }
    1161           0 :                                         b = nb;
    1162           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1163           0 :                                                 bat_destroy(ui);
    1164           0 :                                                 bat_destroy(uv);
    1165           0 :                                                 bat_destroy(b);
    1166           0 :                                                 bat_destroy(u);
    1167             :                                         }
    1168           0 :                                         bat_destroy(ui);
    1169           0 :                                         bat_destroy(uv);
    1170             :                                 }
    1171           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1172           0 :                                 bat_destroy(b);
    1173           0 :                                 assert(newoffsets == NULL);
    1174           0 :                                 if (!n) {
    1175           0 :                                         bat_destroy(u);
    1176           0 :                                         return NULL;
    1177             :                                 }
    1178           0 :                                 if (cs->ts != tr->tid) {
    1179           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1180           0 :                                                 bat_destroy(n);
    1181           0 :                                                 return NULL;
    1182             :                                         }
    1183           0 :                                         cs = &(*batp)->cs;
    1184           0 :                                         new = 1;
    1185             :                                 }
    1186           0 :                                 if (cs->bid && !new)
    1187           0 :                                         temp_destroy(cs->bid);
    1188           0 :                                 n = transfer_to_systrans(n);
    1189           0 :                                 if (n == NULL)
    1190             :                                         return NULL;
    1191           0 :                                 bat_set_access(n, BAT_READ);
    1192           0 :                                 cs->bid = temp_create(n);
    1193           0 :                                 bat_destroy(n);
    1194           0 :                                 if (cs->ebid && !new)
    1195           0 :                                         temp_destroy(cs->ebid);
    1196           0 :                                 cs->ebid = 0;
    1197           0 :                                 cs->ucnt = 0;
    1198           0 :                                 if (cs->uibid && !new)
    1199           0 :                                         temp_destroy(cs->uibid);
    1200           0 :                                 if (cs->uvbid && !new)
    1201           0 :                                         temp_destroy(cs->uvbid);
    1202           0 :                                 cs->uibid = cs->uvbid = 0;
    1203           0 :                                 cs->st = ST_DEFAULT;
    1204           0 :                                 cs->cleared = true;
    1205             :                         } else {
    1206           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1207           0 :                                         bat_destroy(newoffsets);
    1208           0 :                                         bat_destroy(u);
    1209           0 :                                         return NULL;
    1210             :                                 }
    1211           1 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
    1212           1 :                                 bat_destroy(b);
    1213           1 :                                 if (!n) {
    1214           0 :                                         bat_destroy(newoffsets);
    1215           0 :                                         bat_destroy(u);
    1216           0 :                                         return NULL;
    1217             :                                 }
    1218           1 :                                 if (cs->ts != tr->tid) {
    1219           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1220           0 :                                                 bat_destroy(n);
    1221           0 :                                                 return NULL;
    1222             :                                         }
    1223           0 :                                         cs = &(*batp)->cs;
    1224           0 :                                         new = 1;
    1225           0 :                                         temp_dup(cs->ebid);
    1226           0 :                                         if (cs->uibid) {
    1227           0 :                                                 temp_dup(cs->uibid);
    1228           0 :                                                 temp_dup(cs->uvbid);
    1229             :                                         }
    1230             :                                 }
    1231           1 :                                 if (cs->bid && !new)
    1232           1 :                                         temp_destroy(cs->bid);
    1233           1 :                                 n = transfer_to_systrans(n);
    1234           1 :                                 if (n == NULL)
    1235             :                                         return NULL;
    1236           1 :                                 bat_set_access(n, BAT_READ);
    1237           1 :                                 cs->bid = temp_create(n);
    1238           1 :                                 bat_destroy(n);
    1239           1 :                                 cs->cleared = true;
    1240           1 :                                 i = newoffsets;
    1241             :                         }
    1242             :                 } else { /* append */
    1243          16 :                         i = newoffsets;
    1244             :                 }
    1245             :         }
    1246          17 :         bat_destroy(u);
    1247          17 :         return i;
    1248             : }
    1249             : 
    1250             : static BAT *
    1251           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1252             : {
    1253           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1254           0 :         BAT *newoffsets = NULL;
    1255           0 :         BAT *b = NULL, *n = NULL;
    1256             : 
    1257           0 :         if (!(b = temp_descriptor(cs->bid)))
    1258             :                 return NULL;
    1259             : 
    1260           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1261           0 :                 assert(0);
    1262             :         } else {
    1263             :                 /* returns new offset bat if values within min/max, else decompress */
    1264           0 :                 if (!newoffsets) { /* decompress */
    1265           0 :                         if (cs->ucnt) {
    1266           0 :                                 BAT *ui = NULL, *uv = NULL;
    1267           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1268           0 :                                 bat_destroy(b);
    1269           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1270           0 :                                         bat_destroy(nb);
    1271           0 :                                         return NULL;
    1272             :                                 }
    1273           0 :                                 b = nb;
    1274           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1275           0 :                                         bat_destroy(ui);
    1276           0 :                                         bat_destroy(uv);
    1277           0 :                                         bat_destroy(b);
    1278             :                                 }
    1279           0 :                                 bat_destroy(ui);
    1280           0 :                                 bat_destroy(uv);
    1281             :                         }
    1282           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1283           0 :                         bat_destroy(b);
    1284           0 :                         if (!n)
    1285             :                                 return NULL;
    1286           0 :                         if (cs->bid)
    1287           0 :                                 temp_destroy(cs->bid);
    1288           0 :                         n = transfer_to_systrans(n);
    1289           0 :                         if (n == NULL)
    1290             :                                 return NULL;
    1291           0 :                         bat_set_access(n, BAT_READ);
    1292           0 :                         cs->bid = temp_create(n);
    1293           0 :                         cs->ucnt = 0;
    1294           0 :                         if (cs->uibid)
    1295           0 :                                 temp_destroy(cs->uibid);
    1296           0 :                         if (cs->uvbid)
    1297           0 :                                 temp_destroy(cs->uvbid);
    1298           0 :                         cs->uibid = cs->uvbid = 0;
    1299           0 :                         cs->st = ST_DEFAULT;
    1300           0 :                         cs->cleared = true;
    1301           0 :                         b = n;
    1302             :                 } else { /* append */
    1303             :                         i = newoffsets;
    1304             :                 }
    1305             :         }
    1306           0 :         bat_destroy(b);
    1307           0 :         return i;
    1308             : }
    1309             : 
    1310             : /*
    1311             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1312             :  */
    1313             : static int
    1314        2829 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1315             : {
    1316        2829 :         int res = LOG_OK;
    1317        2829 :         sql_delta *bat = *batp;
    1318        2829 :         column_storage *cs = &bat->cs;
    1319        2829 :         BAT *otids = tids, *oupdates = updates;
    1320             : 
    1321        2829 :         if (!BATcount(tids))
    1322             :                 return LOG_OK;
    1323             : 
    1324        2829 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1325           6 :                 tids = BATunmask(tids);
    1326           6 :                 if (!tids)
    1327             :                         return LOG_ERR;
    1328             :         }
    1329        2829 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1330           0 :                 updates = BATunmask(updates);
    1331           0 :                 if (!updates) {
    1332           0 :                         if (otids != tids)
    1333           0 :                                 bat_destroy(tids);
    1334           0 :                         return LOG_ERR;
    1335             :                 }
    1336        2829 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1337          43 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1338          43 :                 if (!updates) {
    1339           0 :                         if (otids != tids)
    1340           0 :                                 bat_destroy(tids);
    1341           0 :                         return LOG_ERR;
    1342             :                 }
    1343             :         }
    1344             : 
    1345        2829 :         if (cs->st == ST_DICT) {
    1346             :                 /* possibly a new array is returned */
    1347           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1348           4 :                 bat = *batp;
    1349           4 :                 cs = &bat->cs;
    1350           4 :                 if (oupdates != updates)
    1351           0 :                         bat_destroy(updates);
    1352           4 :                 updates = nupdates;
    1353           4 :                 if (!updates) {
    1354           0 :                         if (otids != tids)
    1355           0 :                                 bat_destroy(tids);
    1356           0 :                         return LOG_ERR;
    1357             :                 }
    1358             :         }
    1359             : 
    1360             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1361             :         /* currently only one update delta is possible */
    1362        2829 :         lock_table(tr->store, t->base.id);
    1363        2829 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1364        2829 :         if (!is_new && !cs->cleared) {
    1365        2511 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1366           6 :                         BAT *sorted, *order;
    1367           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1368           0 :                                 if (otids != tids)
    1369           0 :                                         bat_destroy(tids);
    1370           0 :                                 if (oupdates != updates)
    1371           0 :                                         bat_destroy(updates);
    1372           0 :                                 unlock_table(tr->store, t->base.id);
    1373           0 :                                 return LOG_ERR;
    1374             :                         }
    1375           6 :                         if (otids != tids)
    1376           0 :                                 bat_destroy(tids);
    1377           6 :                         tids = sorted;
    1378           6 :                         BAT *nupdates = BATproject(order, updates);
    1379           6 :                         bat_destroy(order);
    1380           6 :                         if (oupdates != updates)
    1381           0 :                                 bat_destroy(updates);
    1382           6 :                         updates = nupdates;
    1383           6 :                         if (!updates) {
    1384           0 :                                 bat_destroy(tids);
    1385           0 :                                 unlock_table(tr->store, t->base.id);
    1386           0 :                                 return LOG_ERR;
    1387             :                         }
    1388             :                 }
    1389        2511 :                 assert(tids->tsorted);
    1390        2511 :                 BAT *ui = NULL, *uv = NULL;
    1391             : 
    1392             :                 /* handle updates on just inserted bits */
    1393             :                 /* handle updates on updates (within one transaction) */
    1394        2511 :                 BATiter upi = bat_iterator(updates);
    1395        2511 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1396        2511 :                 BAT *b, *ins = NULL;
    1397        2511 :                 int *msk = NULL;
    1398             : 
    1399        2511 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1400             :                         res = LOG_ERR;
    1401             : 
    1402        2511 :                 if (res == LOG_OK && BATtdense(tids)) {
    1403        2468 :                         oid start = tids->tseqbase, offset = start;
    1404        2468 :                         oid end = start + ucnt;
    1405             : 
    1406        7477 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1407        5617 :                                 if (seg->start <= start && seg->end > start) {
    1408             :                                         /* check for delete conflicts */
    1409        2468 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1410           0 :                                                 res = LOG_CONFLICT;
    1411           0 :                                                 continue;
    1412             :                                         }
    1413             : 
    1414             :                                         /* check for inplace updates */
    1415        2468 :                                         BUN lend = end < seg->end?end:seg->end;
    1416        2468 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1417         109 :                                                 if (!ins) {
    1418         109 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1419         109 :                                                         if (!ins)
    1420             :                                                                 res = LOG_ERR;
    1421             :                                                         else {
    1422         109 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1423         109 :                                                                 msk = (int*)Tloc(ins, 0);
    1424         109 :                                                                 BUN end = (ucnt+31)/32;
    1425         109 :                                                                 memset(msk, 0, end * sizeof(int));
    1426             :                                                         }
    1427             :                                                 }
    1428         364 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1429         255 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1430         255 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1431           0 :                                                                 res = LOG_ERR;
    1432             : 
    1433         255 :                                                         oid word = i/32;
    1434         255 :                                                         int pos = i%32;
    1435         255 :                                                         msk[word] |= 1U<<pos;
    1436         255 :                                                         cnt++;
    1437             :                                                 }
    1438             :                                         }
    1439             :                                 }
    1440        5617 :                                 if (end < seg->end)
    1441             :                                         break;
    1442             :                         }
    1443          46 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1444           3 :                         struct canditer ci;
    1445           3 :                         segment *seg = s->segs->h;
    1446           3 :                         canditer_init(&ci, NULL, tids);
    1447           3 :                         BUN i = 0;
    1448        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1449        1033 :                                 oid rid = canditer_next(&ci);
    1450        1033 :                                 if (seg->end <= rid)
    1451          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1452        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1453             :                                         /* check for delete conflicts */
    1454        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1455           0 :                                                 res = LOG_CONFLICT;
    1456           0 :                                                 continue;
    1457             :                                         }
    1458             : 
    1459             :                                         /* check for inplace updates */
    1460        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1461           0 :                                                 if (!ins) {
    1462           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1463           0 :                                                         if (!ins) {
    1464             :                                                                 res = LOG_ERR;
    1465             :                                                                 break;
    1466             :                                                         } else {
    1467           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1468           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1469           0 :                                                                 BUN end = (ucnt+31)/32;
    1470           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1471             :                                                         }
    1472             :                                                 }
    1473           0 :                                                 ptr upd = BUNtail(upi, i);
    1474           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1475           0 :                                                         res = LOG_ERR;
    1476             : 
    1477           0 :                                                 oid word = i/32;
    1478           0 :                                                 int pos = i%32;
    1479           0 :                                                 msk[word] |= 1U<<pos;
    1480           0 :                                                 cnt++;
    1481             :                                         }
    1482        1020 :                                         i++;
    1483             :                                 }
    1484             :                         }
    1485          40 :                 } else if (res == LOG_OK) {
    1486          40 :                         BUN i = 0;
    1487          40 :                         oid *rid = Tloc(tids,0);
    1488          40 :                         segment *seg = s->segs->h;
    1489         974 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1490         934 :                                 if (seg->end <= rid[i])
    1491         686 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1492         248 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1493             :                                         /* check for delete conflicts */
    1494         248 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1495           0 :                                                 res = LOG_CONFLICT;
    1496           0 :                                                 continue;
    1497             :                                         }
    1498             : 
    1499             :                                         /* check for inplace updates */
    1500         248 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1501         195 :                                                 if (!ins) {
    1502          23 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1503          23 :                                                         if (!ins) {
    1504             :                                                                 res = LOG_ERR;
    1505             :                                                                 break;
    1506             :                                                         } else {
    1507          23 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1508          23 :                                                                 msk = (int*)Tloc(ins, 0);
    1509          23 :                                                                 BUN end = (ucnt+31)/32;
    1510          23 :                                                                 memset(msk, 0, end * sizeof(int));
    1511             :                                                         }
    1512             :                                                 }
    1513         195 :                                                 const void *upd = BUNtail(upi, i);
    1514         195 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1515           0 :                                                         res = LOG_ERR;
    1516             : 
    1517         195 :                                                 oid word = i/32;
    1518         195 :                                                 int pos = i%32;
    1519         195 :                                                 msk[word] |= 1U<<pos;
    1520         195 :                                                 cnt++;
    1521             :                                         }
    1522         248 :                                         i++;
    1523             :                                 }
    1524             :                         }
    1525             :                 }
    1526             : 
    1527        2511 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1528        2379 :                         if (cs->ucnt == 0) {
    1529        2377 :                                 if (cnt) {
    1530           0 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1531           0 :                                         if (nins) {
    1532           0 :                                                 ui = BATproject(nins, tids);
    1533           0 :                                                 uv = BATproject(nins, updates);
    1534           0 :                                                 bat_destroy(nins);
    1535             :                                         }
    1536             :                                 } else {
    1537        2377 :                                         ui = temp_descriptor(tids->batCacheid);
    1538        2377 :                                         uv = temp_descriptor(updates->batCacheid);
    1539             :                                 }
    1540        2377 :                                 if (!ui || !uv) {
    1541             :                                         res = LOG_ERR;
    1542             :                                 } else {
    1543        2377 :                                         temp_destroy(cs->uibid);
    1544        2377 :                                         temp_destroy(cs->uvbid);
    1545        2377 :                                         ui = transfer_to_systrans(ui);
    1546        2377 :                                         uv = transfer_to_systrans(uv);
    1547        2377 :                                         if (ui == NULL || uv == NULL) {
    1548           0 :                                                 BBPreclaim(ui);
    1549           0 :                                                 BBPreclaim(uv);
    1550             :                                                 res = LOG_ERR;
    1551             :                                         } else {
    1552        2377 :                                                 cs->uibid = temp_create(ui);
    1553        2377 :                                                 cs->uvbid = temp_create(uv);
    1554        2377 :                                                 cs->ucnt = BATcount(ui);
    1555             :                                         }
    1556             :                                 }
    1557             :                         } else {
    1558           2 :                                 BAT *nui = NULL, *nuv = NULL;
    1559             : 
    1560             :                                 /* merge taking msk of inserted into account */
    1561           2 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1562             :                                         res = LOG_ERR;
    1563             : 
    1564           2 :                                 if (res == LOG_OK) {
    1565           2 :                                         const void *upd = NULL;
    1566           2 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1567           2 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1568             : 
    1569           2 :                                         if (!nui || !nuv) {
    1570             :                                                 res = LOG_ERR;
    1571             :                                         } else {
    1572           2 :                                                 BATiter ovi = bat_iterator(uv);
    1573             : 
    1574             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1575           2 :                                                 BUN uip = 0, uie = BATcount(ui);
    1576           2 :                                                 BUN nip = 0, nie = BATcount(tids);
    1577           2 :                                                 oid uiseqb = ui->tseqbase;
    1578           2 :                                                 oid niseqb = tids->tseqbase;
    1579           2 :                                                 oid *uipt = NULL, *nipt = NULL;
    1580           2 :                                                 BATiter uii = bat_iterator(ui);
    1581           2 :                                                 BATiter tidsi = bat_iterator(tids);
    1582           2 :                                                 if (!BATtdensebi(&uii))
    1583           1 :                                                         uipt = uii.base;
    1584           2 :                                                 if (!BATtdensebi(&tidsi))
    1585           1 :                                                         nipt = tidsi.base;
    1586          20 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1587          18 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1588          18 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1589             : 
    1590          18 :                                                         if (uiv < niv) {
    1591           0 :                                                                 upd = BUNtail(ovi, uip);
    1592           0 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1593           0 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1594             :                                                                         res = LOG_ERR;
    1595           0 :                                                                 uip++;
    1596          18 :                                                         } else if (uiv == niv) {
    1597             :                                                                 /* handle == */
    1598          18 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1599          18 :                                                                         upd = BUNtail(upi, nip);
    1600          36 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1601          18 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1602             :                                                                                 res = LOG_ERR;
    1603             :                                                                 } else {
    1604           0 :                                                                         upd = BUNtail(ovi, uip);
    1605           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1606           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1607             :                                                                                 res = LOG_ERR;
    1608             :                                                                 }
    1609          18 :                                                                 uip++;
    1610          18 :                                                                 nip++;
    1611             :                                                         } else { /* uiv > niv */
    1612           0 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1613           0 :                                                                         upd = BUNtail(upi, nip);
    1614           0 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1615           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1616             :                                                                                 res = LOG_ERR;
    1617             :                                                                 }
    1618           0 :                                                                 nip++;
    1619             :                                                         }
    1620             :                                                 }
    1621           2 :                                                 while (uip < uie && res == LOG_OK) {
    1622           0 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1623           0 :                                                         upd = BUNtail(ovi, uip);
    1624           0 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1625           0 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1626             :                                                                 res = LOG_ERR;
    1627           0 :                                                         uip++;
    1628             :                                                 }
    1629           2 :                                                 while (nip < nie && res == LOG_OK) {
    1630           0 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1631           0 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1632           0 :                                                                 upd = BUNtail(upi, nip);
    1633           0 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1634           0 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1635             :                                                                         res = LOG_ERR;
    1636             :                                                         }
    1637           0 :                                                         nip++;
    1638             :                                                 }
    1639           2 :                                                 bat_iterator_end(&uii);
    1640           2 :                                                 bat_iterator_end(&tidsi);
    1641           2 :                                                 bat_iterator_end(&ovi);
    1642           2 :                                                 if (res == LOG_OK) {
    1643           2 :                                                         temp_destroy(cs->uibid);
    1644           2 :                                                         temp_destroy(cs->uvbid);
    1645           2 :                                                         nui = transfer_to_systrans(nui);
    1646           2 :                                                         nuv = transfer_to_systrans(nuv);
    1647           2 :                                                         if (nui == NULL || nuv == NULL) {
    1648             :                                                                 res = LOG_ERR;
    1649             :                                                         } else {
    1650           2 :                                                                 cs->uibid = temp_create(nui);
    1651           2 :                                                                 cs->uvbid = temp_create(nuv);
    1652           2 :                                                                 cs->ucnt = BATcount(nui);
    1653             :                                                         }
    1654             :                                                 }
    1655             :                                         }
    1656           2 :                                         bat_destroy(nui);
    1657           2 :                                         bat_destroy(nuv);
    1658             :                                 }
    1659             :                         }
    1660             :                 }
    1661        2511 :                 bat_iterator_end(&upi);
    1662        2511 :                 bat_destroy(b);
    1663        2511 :                 unlock_table(tr->store, t->base.id);
    1664        2511 :                 bat_destroy(ins);
    1665        2511 :                 bat_destroy(ui);
    1666        2511 :                 bat_destroy(uv);
    1667        2511 :                 if (otids != tids)
    1668          10 :                         bat_destroy(tids);
    1669        2511 :                 if (oupdates != updates)
    1670          12 :                         bat_destroy(updates);
    1671        2511 :                 return res;
    1672             :         } else if (is_new || cs->cleared) {
    1673         318 :                 BAT *b = temp_descriptor(cs->bid);
    1674             : 
    1675         318 :                 if (b == NULL) {
    1676             :                         res = LOG_ERR;
    1677             :                 } else {
    1678         318 :                         if (BATcount(b)==0) {
    1679           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1680           0 :                                         res = LOG_ERR;
    1681         317 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1682           0 :                                 res = LOG_ERR;
    1683         318 :                         BBPcold(b->batCacheid);
    1684         318 :                         bat_destroy(b);
    1685             :                 }
    1686             :         }
    1687         318 :         unlock_table(tr->store, t->base.id);
    1688         318 :         if (otids != tids)
    1689           2 :                 bat_destroy(tids);
    1690         318 :         if (oupdates != updates)
    1691          41 :                 bat_destroy(updates);
    1692             :         return res;
    1693             : }
    1694             : 
    1695             : static int
    1696        2829 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1697             : {
    1698        2829 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1699             : }
    1700             : 
    1701             : static void *
    1702           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1703             : {
    1704           4 :         void *newoffsets = NULL;
    1705           4 :         sql_delta *bat = *batp;
    1706           4 :         column_storage *cs = &bat->cs;
    1707           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1708             : 
    1709           4 :         if (!u)
    1710             :                 return NULL;
    1711           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1712           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1713           0 :                 assert(0);
    1714             :         } else {
    1715           4 :                 int new = 0;
    1716             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1717           4 :                 if (BATcount(u) >= max_cnt) {
    1718           0 :                         if (max_cnt == 64*1024) { /* decompress */
    1719           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1720           0 :                                         bat_destroy(u);
    1721           0 :                                         return NULL;
    1722             :                                 }
    1723           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1724             :                                 /* TODO decompress updates if any */
    1725           0 :                                 bat_destroy(b);
    1726           0 :                                 assert(newoffsets == NULL);
    1727           0 :                                 if (!n) {
    1728           0 :                                         bat_destroy(u);
    1729           0 :                                         return NULL;
    1730             :                                 }
    1731           0 :                                 if (cs->ts != tr->tid) {
    1732           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1733           0 :                                                 bat_destroy(n);
    1734           0 :                                                 bat_destroy(u);
    1735           0 :                                                 return NULL;
    1736             :                                         }
    1737           0 :                                         cs = &(*batp)->cs;
    1738           0 :                                         new = 1;
    1739           0 :                                         cs->uibid = cs->uvbid = 0;
    1740             :                                 }
    1741           0 :                                 if (cs->bid && !new)
    1742           0 :                                         temp_destroy(cs->bid);
    1743           0 :                                 n = transfer_to_systrans(n);
    1744           0 :                                 if (n == NULL) {
    1745           0 :                                         bat_destroy(u);
    1746           0 :                                         return NULL;
    1747             :                                 }
    1748           0 :                                 bat_set_access(n, BAT_READ);
    1749           0 :                                 cs->bid = temp_create(n);
    1750           0 :                                 bat_destroy(n);
    1751           0 :                                 if (cs->ebid && !new)
    1752           0 :                                         temp_destroy(cs->ebid);
    1753           0 :                                 cs->ebid = 0;
    1754           0 :                                 cs->st = ST_DEFAULT;
    1755             :                                 /* at append_col the column's storage type is cleared */
    1756           0 :                                 cs->cleared = true;
    1757             :                         } else {
    1758           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1759           0 :                                         GDKfree(newoffsets);
    1760           0 :                                         bat_destroy(u);
    1761           0 :                                         return NULL;
    1762             :                                 }
    1763           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
    1764           0 :                                 bat_destroy(b);
    1765           0 :                                 if (!n) {
    1766           0 :                                         GDKfree(newoffsets);
    1767           0 :                                         bat_destroy(u);
    1768           0 :                                         return NULL;
    1769             :                                 }
    1770           0 :                                 if (cs->ts != tr->tid) {
    1771           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1772           0 :                                                 bat_destroy(u);
    1773           0 :                                                 bat_destroy(n);
    1774           0 :                                                 return NULL;
    1775             :                                         }
    1776           0 :                                         cs = &(*batp)->cs;
    1777           0 :                                         new = 1;
    1778           0 :                                         temp_dup(cs->ebid);
    1779           0 :                                         if (cs->uibid) {
    1780           0 :                                                 temp_dup(cs->uibid);
    1781           0 :                                                 temp_dup(cs->uvbid);
    1782             :                                         }
    1783             :                                 }
    1784           0 :                                 if (cs->bid)
    1785           0 :                                         temp_destroy(cs->bid);
    1786           0 :                                 n = transfer_to_systrans(n);
    1787           0 :                                 if (n == NULL) {
    1788           0 :                                         bat_destroy(u);
    1789           0 :                                         return NULL;
    1790             :                                 }
    1791           0 :                                 bat_set_access(n, BAT_READ);
    1792           0 :                                 cs->bid = temp_create(n);
    1793           0 :                                 bat_destroy(n);
    1794           0 :                                 cs->cleared = true;
    1795           0 :                                 i = newoffsets;
    1796             :                         }
    1797             :                 } else { /* append */
    1798           4 :                         i = newoffsets;
    1799             :                 }
    1800             :         }
    1801           4 :         bat_destroy(u);
    1802           4 :         return i;
    1803             : }
    1804             : 
    1805             : static void *
    1806           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1807             : {
    1808           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1809           1 :         void *newoffsets = NULL;
    1810           1 :         BAT *b = NULL, *n = NULL;
    1811             : 
    1812           1 :         if (!(b = temp_descriptor(cs->bid)))
    1813             :                 return NULL;
    1814             : 
    1815           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1816           0 :                 assert(0);
    1817             :         } else {
    1818             :                 /* returns new offset bat if values within min/max, else decompress */
    1819           1 :                 if (!newoffsets) {
    1820           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1821           1 :                         bat_destroy(b);
    1822           1 :                         if (!n)
    1823             :                                 return NULL;
    1824             :                         /* TODO decompress updates if any */
    1825           1 :                         if (cs->bid)
    1826           1 :                                 temp_destroy(cs->bid);
    1827           1 :                         n = transfer_to_systrans(n);
    1828           1 :                         if (n == NULL)
    1829             :                                 return NULL;
    1830           1 :                         bat_set_access(n, BAT_READ);
    1831           1 :                         cs->bid = temp_create(n);
    1832           1 :                         cs->st = ST_DEFAULT;
    1833             :                         /* at append_col the column's storage type is cleared */
    1834           1 :                         cs->cleared = true;
    1835           1 :                         b = n;
    1836             :                 } else { /* append */
    1837             :                         i = newoffsets;
    1838             :                 }
    1839             :         }
    1840           1 :         bat_destroy(b);
    1841           1 :         return i;
    1842             : }
    1843             : 
    1844             : static int
    1845        3018 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1846             : {
    1847        3018 :         void *oupd = upd;
    1848        3018 :         sql_delta *bat = *batp;
    1849        3018 :         column_storage *cs = &bat->cs;
    1850        3018 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1851        3018 :         assert(!is_oid_nil(rid));
    1852        3018 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1853             : 
    1854        3018 :         if (cs->st == ST_DICT) {
    1855             :                 /* possibly a new array is returned */
    1856           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1857           0 :                 bat = *batp;
    1858           0 :                 cs = &bat->cs;
    1859           0 :                 if (!upd)
    1860             :                         return LOG_ERR;
    1861             :         }
    1862             : 
    1863             :         /* check if rid is insert ? */
    1864        3018 :         if (!inplace) {
    1865             :                 /* check conflict */
    1866         444 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1867           0 :                         if (oupd != upd)
    1868           0 :                                 GDKfree(upd);
    1869           0 :                         return LOG_CONFLICT;
    1870             :                 }
    1871         444 :                 BAT *ui, *uv;
    1872             : 
    1873             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1874             :                 /* currently only one update delta is possible */
    1875         444 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1876           0 :                         if (oupd != upd)
    1877           0 :                                 GDKfree(upd);
    1878           0 :                         return LOG_ERR;
    1879             :                 }
    1880             : 
    1881         444 :                 assert(uv->ttype);
    1882         444 :                 assert(BATcount(ui) == BATcount(uv));
    1883         888 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1884         444 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1885           0 :                         if (oupd != upd)
    1886           0 :                                 GDKfree(upd);
    1887           0 :                         bat_destroy(ui);
    1888           0 :                         bat_destroy(uv);
    1889           0 :                         return LOG_ERR;
    1890             :                 }
    1891         444 :                 assert(BATcount(ui) == BATcount(uv));
    1892         444 :                 bat_destroy(ui);
    1893         444 :                 bat_destroy(uv);
    1894         444 :                 cs->ucnt++;
    1895             :         } else {
    1896        2574 :                 BAT *b = NULL;
    1897             : 
    1898        2574 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1899           0 :                         if (oupd != upd)
    1900           0 :                                 GDKfree(upd);
    1901           0 :                         return LOG_ERR;
    1902             :                 }
    1903        2574 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1904           0 :                         if (oupd != upd)
    1905           0 :                                 GDKfree(upd);
    1906           0 :                         bat_destroy(b);
    1907           0 :                         return LOG_ERR;
    1908             :                 }
    1909        2574 :                 bat_destroy(b);
    1910             :         }
    1911        3018 :         if (oupd != upd)
    1912           0 :                 GDKfree(upd);
    1913             :         return LOG_OK;
    1914             : }
    1915             : 
    1916             : static int
    1917        3018 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1918             : {
    1919        3018 :         int res = LOG_OK;
    1920        3018 :         lock_table(tr->store, t->base.id);
    1921        3018 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1922        3018 :         unlock_table(tr->store, t->base.id);
    1923        3018 :         return res;
    1924             : }
    1925             : 
    1926             : static int
    1927      158811 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1928             : {
    1929      158811 :         (void)tr;
    1930      158811 :         if (!ocs)
    1931             :                 return LOG_OK;
    1932      158811 :         cs->bid = ocs->bid;
    1933      158811 :         cs->ebid = ocs->ebid;
    1934      158811 :         cs->uibid = ocs->uibid;
    1935      158811 :         cs->uvbid = ocs->uvbid;
    1936      158811 :         cs->ucnt = ocs->ucnt;
    1937             : 
    1938      158811 :         if (temp) {
    1939       25973 :                 cs->bid = temp_copy(cs->bid, true, false);
    1940       25973 :                 if (cs->bid == BID_NIL)
    1941             :                         return LOG_ERR;
    1942             :         } else {
    1943      132838 :                 temp_dup(cs->bid);
    1944             :         }
    1945      158817 :         if (cs->ebid)
    1946           6 :                 temp_dup(cs->ebid);
    1947      158817 :         cs->ucnt = 0;
    1948      158817 :         cs->uibid = e_bat(TYPE_oid);
    1949      158823 :         cs->uvbid = e_bat(type);
    1950      158825 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1951             :                 return LOG_ERR;
    1952      158825 :         cs->st = ocs->st;
    1953      158825 :         return LOG_OK;
    1954             : }
    1955             : 
    1956             : static void
    1957      184026 : destroy_delta(sql_delta *b, bool recursive)
    1958             : {
    1959      184026 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1960             :                 return;
    1961      164695 :         if (recursive && b->next)
    1962        1424 :                 destroy_delta(b->next, true);
    1963      164695 :         if (b->cs.uibid)
    1964       93695 :                 temp_destroy(b->cs.uibid);
    1965      164695 :         if (b->cs.uvbid)
    1966       93695 :                 temp_destroy(b->cs.uvbid);
    1967      164695 :         if (b->cs.bid)
    1968      164695 :                 temp_destroy(b->cs.bid);
    1969      164695 :         if (b->cs.ebid)
    1970          61 :                 temp_destroy(b->cs.ebid);
    1971      164695 :         ATOMIC_DESTROY(&b->cs.refcnt);
    1972      164695 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1973      164695 :         _DELETE(b);
    1974             : }
    1975             : 
    1976             : static sql_delta *
    1977    16144303 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1978             : {
    1979    16144303 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1980             : 
    1981    16144303 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1982    16011470 :                 return obat;
    1983      132833 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1984             :                 /* abort */
    1985          12 :                 if (update_conflict)
    1986           4 :                         *update_conflict = true;
    1987           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1988           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1989           4 :                 return NULL;
    1990             :         }
    1991      132821 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1992             :                 return NULL;
    1993      132823 :         sql_delta* bat = ZNEW(sql_delta);
    1994      132814 :         if (!bat)
    1995             :                 return NULL;
    1996      132814 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1997      132814 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1998           0 :                 destroy_delta(bat, false);
    1999           0 :                 return NULL;
    2000             :         }
    2001      132824 :         bat->cs.ts = tr->tid;
    2002             :         /* only one writer else abort */
    2003      132824 :         bat->next = obat;
    2004      132824 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    2005           0 :                 bat->next = NULL;
    2006           0 :                 destroy_delta(bat, false);
    2007           0 :                 if (update_conflict)
    2008           0 :                         *update_conflict = true;
    2009           0 :                 return NULL;
    2010             :         }
    2011             :         return bat;
    2012             : }
    2013             : 
    2014             : static int
    2015        5847 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    2016             : {
    2017        5847 :         int ok = LOG_OK;
    2018             : 
    2019        5847 :         if (is_bat) {
    2020        2829 :                 BAT *tids = incoming_tids;
    2021        2829 :                 BAT *values = incoming_values;
    2022        2829 :                 if (BATcount(tids) == 0)
    2023             :                         return LOG_OK;
    2024        2829 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2025             :         } else {
    2026        3018 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2027             :         }
    2028             :         return ok;
    2029             : }
    2030             : 
    2031             : static int
    2032        6071 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, int tpe)
    2033             : {
    2034        6071 :         int res = LOG_OK;
    2035        6071 :         bool update_conflict = false;
    2036        6071 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2037             : 
    2038        6071 :         if (tpe == TYPE_bat) {
    2039        3050 :                 BAT *t = tids;
    2040        3050 :                 if (!BATcount(t))
    2041             :                         return LOG_OK;
    2042             :         }
    2043             : 
    2044        5576 :         if (c == NULL)
    2045             :                 return LOG_ERR;
    2046             : 
    2047        5576 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2048           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2049             : 
    2050        5572 :         assert(delta && delta->cs.ts == tr->tid);
    2051        5572 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2052        5572 :         if (odelta != delta)
    2053        3321 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2054             : 
    2055        5572 :         odelta = delta;
    2056        5572 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, tpe == TYPE_bat)) != LOG_OK)
    2057             :                 return res;
    2058        5572 :         assert(delta == odelta);
    2059        5572 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2060           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2061             :         return res;
    2062             : }
    2063             : 
    2064             : static sql_delta *
    2065        2435 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2066             : {
    2067        2435 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2068             : 
    2069        2435 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2070        2407 :                 return obat;
    2071          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2072             :                 /* abort */
    2073           0 :                 if (update_conflict)
    2074           0 :                         *update_conflict = true;
    2075           0 :                 return NULL;
    2076             :         }
    2077          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2078             :                 return NULL;
    2079          28 :         sql_delta* bat = ZNEW(sql_delta);
    2080          28 :         if (!bat)
    2081             :                 return NULL;
    2082          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2083          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2084           0 :                 destroy_delta(bat, false);
    2085           0 :                 return NULL;
    2086             :         }
    2087          28 :         bat->cs.ts = tr->tid;
    2088             :         /* only one writer else abort */
    2089          28 :         bat->next = obat;
    2090          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2091           0 :                 bat->next = NULL;
    2092           0 :                 destroy_delta(bat, false);
    2093           0 :                 if (update_conflict)
    2094           0 :                         *update_conflict = true;
    2095           0 :                 return NULL;
    2096             :         }
    2097             :         return bat;
    2098             : }
    2099             : 
    2100             : static int
    2101         777 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, int tpe)
    2102             : {
    2103         777 :         int res = LOG_OK;
    2104         777 :         bool update_conflict = false;
    2105         777 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2106             : 
    2107         777 :         if (tpe == TYPE_bat) {
    2108         777 :                 BAT *t = tids;
    2109         777 :                 if (!BATcount(t))
    2110             :                         return LOG_OK;
    2111             :         }
    2112             : 
    2113         275 :         if (i == NULL)
    2114             :                 return LOG_ERR;
    2115             : 
    2116         275 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2117           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2118             : 
    2119         275 :         assert(delta && delta->cs.ts == tr->tid);
    2120         275 :         if (odelta != delta)
    2121          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2122             : 
    2123         275 :         odelta = delta;
    2124         275 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, tpe == TYPE_bat);
    2125         275 :         assert(delta == odelta);
    2126             :         return res;
    2127             : }
    2128             : 
    2129             : static int
    2130      149817 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
    2131             : {
    2132      149817 :         BAT *b, *oi = i;
    2133      149817 :         int err = 0;
    2134      149817 :         sql_delta *bat = *batp;
    2135             : 
    2136      149817 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2137      149817 :         if (!BATcount(i))
    2138             :                 return LOG_OK;
    2139      149817 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2140             :                 return LOG_ERR;
    2141             : 
    2142      149817 :         lock_column(tr->store, id);
    2143      149808 :         if (bat->cs.st == ST_DICT) {
    2144          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2145          13 :                 bat = *batp;
    2146          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2147           0 :                         bat_destroy(oi);
    2148          13 :                 oi = ni;
    2149          13 :                 if (!oi) {
    2150           0 :                         unlock_column(tr->store, id);
    2151           0 :                         return LOG_ERR;
    2152             :                 }
    2153             :         }
    2154      149808 :         if (bat->cs.st == ST_FOR) {
    2155           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2156           0 :                 bat = *batp;
    2157           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2158           0 :                         bat_destroy(oi);
    2159           0 :                 oi = ni;
    2160           0 :                 if (!oi) {
    2161           0 :                         unlock_column(tr->store, id);
    2162           0 :                         return LOG_ERR;
    2163             :                 }
    2164             :         }
    2165             : 
    2166      149808 :         b = temp_descriptor(bat->cs.bid);
    2167      149812 :         if (b == NULL) {
    2168           0 :                 unlock_column(tr->store, id);
    2169           0 :                 if (oi != i)
    2170           0 :                         bat_destroy(oi);
    2171           0 :                 return LOG_ERR;
    2172             :         }
    2173      149812 :         if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2174      149624 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2175           0 :                         err = 1;
    2176         176 :         } else if (!offsets) {
    2177         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2178           0 :                         err = 1;
    2179          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2180           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2181           0 :                         err = 1;
    2182          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2183           0 :                         err = 1;
    2184             :         }
    2185      149811 :         bat_destroy(b);
    2186      149815 :         unlock_column(tr->store, id);
    2187             : 
    2188      149797 :         if (oi != i)
    2189          13 :                 bat_destroy(oi);
    2190      149797 :         return (err)?LOG_ERR:LOG_OK;
    2191             : }
    2192             : 
    2193             : // Look at the offsets and find where the replacements end and the appends begin.
    2194             : static BUN
    2195           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2196             : {
    2197           0 :         BUN ocnt = BATcount(offsets);
    2198           0 :         if (ocnt == 0)
    2199             :                 return 0;
    2200             : 
    2201           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2202           0 :         if (highest < bcnt)
    2203             :                 // all are replacements
    2204             :                 return ocnt;
    2205             : 
    2206             :         // reason backward to find the first append.
    2207             :         // Suppose offsets has 15 entries, bcnt == 100
    2208             :         // and the highest offset in offsets is 109.
    2209           0 :         BUN new_bcnt = highest + 1; // 110
    2210           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2211           0 :         BUN nreplacements = ocnt - nappends; // 5
    2212             : 
    2213             :         // The first append should be to position bcnt
    2214           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2215             : 
    2216             :         return nreplacements;
    2217             : }
    2218             : 
    2219             : 
    2220             : static int
    2221    15861578 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2222             : {
    2223    15861578 :         void *oi = i;
    2224    15861578 :         BAT *b;
    2225    15861578 :         lock_column(tr->store, id);
    2226    15861305 :         sql_delta *bat = *batp;
    2227             : 
    2228    15861305 :         if (bat->cs.st == ST_DICT) {
    2229             :                 /* possibly a new array is returned */
    2230           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2231           4 :                 bat = *batp;
    2232           4 :                 if (!i) {
    2233           0 :                         unlock_column(tr->store, id);
    2234           0 :                         return LOG_ERR;
    2235             :                 }
    2236             :         }
    2237    15861305 :         if (bat->cs.st == ST_FOR) {
    2238             :                 /* possibly a new array is returned */
    2239           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2240           1 :                 bat = *batp;
    2241           1 :                 if (!i) {
    2242           0 :                         unlock_column(tr->store, id);
    2243           0 :                         return LOG_ERR;
    2244             :                 }
    2245             :         }
    2246             : 
    2247    15861305 :         b = temp_descriptor(bat->cs.bid);
    2248    15861141 :         if (b == NULL) {
    2249           0 :                 if (i != oi)
    2250           0 :                         GDKfree(i);
    2251           0 :                 unlock_column(tr->store, id);
    2252           0 :                 return LOG_ERR;
    2253             :         }
    2254    15861141 :         BUN bcnt = BATcount(b);
    2255             : 
    2256    15861141 :         if (offsets) {
    2257             :                 // The first few might be replacements while later items might be appends.
    2258             :                 // Handle the replacements here while leaving the appends to the code below.
    2259           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2260             : 
    2261           0 :                 oid *start = Tloc(offsets, 0);
    2262           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2263           0 :                         bat_destroy(b);
    2264           0 :                         if (i != oi)
    2265           0 :                                 GDKfree(i);
    2266           0 :                         unlock_column(tr->store, id);
    2267           0 :                         return LOG_ERR;
    2268             :                 }
    2269             : 
    2270             :                 // Replacements have been handled. The rest are appends.
    2271           0 :                 assert(offset == oid_nil);
    2272           0 :                 offset = bcnt;
    2273           0 :                 cnt -= nreplacements;
    2274             :         }
    2275             : 
    2276    15861141 :         if (bcnt > offset){
    2277      509555 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2278      509555 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2279           0 :                         bat_destroy(b);
    2280           0 :                         if (i != oi)
    2281           0 :                                 GDKfree(i);
    2282           0 :                         unlock_column(tr->store, id);
    2283           0 :                         return LOG_ERR;
    2284             :                 }
    2285      509673 :                 cnt -= ccnt;
    2286      509673 :                 offset += ccnt;
    2287             :         }
    2288    15861259 :         if (cnt) {
    2289    15351584 :                 if (BATcount(b) < offset) { /* add space */
    2290       10798 :                         BUN d = offset - BATcount(b);
    2291       10798 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2292           0 :                                 bat_destroy(b);
    2293           0 :                                 if (i != oi)
    2294           0 :                                         GDKfree(i);
    2295           0 :                                 unlock_column(tr->store, id);
    2296           0 :                                 return LOG_ERR;
    2297             :                         }
    2298             :                 }
    2299    15351584 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2300           0 :                         bat_destroy(b);
    2301           0 :                         if (i != oi)
    2302           0 :                                 GDKfree(i);
    2303           0 :                         unlock_column(tr->store, id);
    2304           0 :                         return LOG_ERR;
    2305             :                 }
    2306             :         }
    2307    15861366 :         bat_destroy(b);
    2308    15861218 :         if (i != oi)
    2309           4 :                 GDKfree(i);
    2310    15861218 :         unlock_column(tr->store, id);
    2311    15861218 :         return LOG_OK;
    2312             : }
    2313             : 
    2314             : static int
    2315       25973 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2316             : {
    2317       25973 :         if (!(bat->segs = new_segments(tr, 0)))
    2318             :                 return LOG_ERR;
    2319       25973 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2320             : }
    2321             : 
    2322             : static int
    2323    16011424 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, int tt, char *storage_type)
    2324             : {
    2325    16011424 :         int ok = LOG_OK;
    2326             : 
    2327    16011424 :         if ((*delta)->cs.merged)
    2328       35924 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2329    16011424 :         if (tt == TYPE_bat) {
    2330      149832 :                 BAT *bat = incoming_data;
    2331             : 
    2332      149832 :                 if (BATcount(bat))
    2333      149817 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
    2334             :         } else {
    2335    15861592 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2336             :         }
    2337    16011219 :         return ok;
    2338             : }
    2339             : 
    2340             : static int
    2341    16009966 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, int tpe)
    2342             : {
    2343    16009966 :         int res = LOG_OK;
    2344    16009966 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2345             : 
    2346    16009966 :         if (tpe == TYPE_bat) {
    2347      149617 :                 BAT *t = data;
    2348      149617 :                 if (!BATcount(t))
    2349             :                         return LOG_OK;
    2350             :         }
    2351             : 
    2352    16009184 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2353             :                 return LOG_ERR;
    2354             : 
    2355    16009237 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2356             : 
    2357    16009237 :         odelta = delta;
    2358    16009237 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, tpe, c->storage_type)) != LOG_OK)
    2359             :                 return res;
    2360    16009035 :         if (odelta != delta) {
    2361           0 :                 delta->next = odelta;
    2362           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2363           0 :                         delta->next = NULL;
    2364           0 :                         destroy_delta(delta, false);
    2365           0 :                         return LOG_CONFLICT;
    2366             :                 }
    2367             :         }
    2368    16009035 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2369           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2370             :         return res;
    2371             : }
    2372             : 
    2373             : static int
    2374        2166 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, int tpe)
    2375             : {
    2376        2166 :         int res = LOG_OK;
    2377        2166 :         sql_delta *delta;
    2378             : 
    2379        2166 :         if (tpe == TYPE_bat) {
    2380         998 :                 BAT *t = data;
    2381         998 :                 if (!BATcount(t))
    2382             :                         return LOG_OK;
    2383             :         }
    2384             : 
    2385        2154 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2386             :                 return LOG_ERR;
    2387             : 
    2388        2154 :         assert(delta->cs.st == ST_DEFAULT);
    2389             : 
    2390        2154 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, tpe, NULL);
    2391        2154 :         return res;
    2392             : }
    2393             : 
    2394             : static int
    2395       66363 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2396             : {
    2397       66363 :         int err = 0;
    2398             : 
    2399             :         /* TODO check for conflicting updates */
    2400       66363 :         (void)rid;
    2401       66363 :         (void)cnt;
    2402      508592 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2403      442229 :                 sql_column *c = n->data;
    2404      442229 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2405             : 
    2406             :                 /* check for active updates */
    2407      442229 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2408             :                         return 1;
    2409             :         }
    2410             :         return 0;
    2411             : }
    2412             : 
    2413             : static int
    2414       64042 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2415             : {
    2416       64042 :         int in_transaction = segments_in_transaction(tr, t);
    2417             : 
    2418       64042 :         lock_table(tr->store, t->base.id);
    2419             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2420       64042 :         segment *seg = s->segs->h, *p = NULL;
    2421    51336090 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2422    51336090 :                 if (seg->start <= rid && seg->end > rid) {
    2423       64042 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2424           4 :                                 unlock_table(tr->store, t->base.id);
    2425           4 :                                 return LOG_CONFLICT;
    2426             :                         }
    2427       64038 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2428           0 :                                 unlock_table(tr->store, t->base.id);
    2429           0 :                                 return LOG_CONFLICT;
    2430             :                         }
    2431       64038 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2432           0 :                                 unlock_table(tr->store, t->base.id);
    2433           0 :                                 return LOG_ERR;
    2434             :                         }
    2435             :                         break;
    2436             :                 }
    2437             :         }
    2438       64038 :         unlock_table(tr->store, t->base.id);
    2439       64038 :         if (!in_transaction)
    2440       12098 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2441             :         return LOG_OK;
    2442             : }
    2443             : 
    2444             : static int
    2445        2323 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2446             : {
    2447        2323 :         segment *seg = *Seg, *p = NULL;
    2448        7445 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2449        7443 :                 if (seg->start <= start && seg->end > start) {
    2450        2363 :                         size_t lcnt = cnt;
    2451        2363 :                         if (start+lcnt > seg->end)
    2452          44 :                                 lcnt = seg->end-start;
    2453        2363 :                         if (SEG_IS_DELETED(seg, tr)) {
    2454          37 :                                 start += lcnt;
    2455          37 :                                 cnt -= lcnt;
    2456          37 :                                 continue;
    2457        2326 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2458           1 :                                 return LOG_CONFLICT;
    2459        2325 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2460             :                                 return LOG_CONFLICT;
    2461        2325 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2462        2325 :                         if (!seg)
    2463             :                                 return LOG_ERR;
    2464        2325 :                         start += lcnt;
    2465        2325 :                         cnt -= lcnt;
    2466             :                 }
    2467        7405 :                 if (start+cnt <= seg->end)
    2468             :                         break;
    2469             :         }
    2470             :         return LOG_OK;
    2471             : }
    2472             : 
    2473             : static int
    2474         523 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2475             : {
    2476         523 :         segment *seg = s->segs->h;
    2477         523 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2478             : }
    2479             : 
    2480             : static int
    2481         291 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2482             : {
    2483         291 :         int in_transaction = segments_in_transaction(tr, t);
    2484         291 :         BAT *oi = i;    /* update ids */
    2485         291 :         int ok = LOG_OK;
    2486             : 
    2487         291 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2488             :                 return LOG_ERR;
    2489         291 :         if (BATcount(i)) {
    2490         514 :                 if (BATtdense(i)) {
    2491         223 :                         size_t start = i->tseqbase;
    2492         223 :                         size_t cnt = BATcount(i);
    2493             : 
    2494         223 :                         lock_table(tr->store, t->base.id);
    2495         223 :                         ok = delete_range(tr, t, s, start, cnt);
    2496         223 :                         unlock_table(tr->store, t->base.id);
    2497          68 :                 } else if (complex_cand(i)) {
    2498           0 :                         struct canditer ci;
    2499           0 :                         oid f = 0, l = 0, cur = 0;
    2500             : 
    2501           0 :                         canditer_init(&ci, NULL, i);
    2502           0 :                         cur = f = canditer_next(&ci);
    2503             : 
    2504           0 :                         lock_table(tr->store, t->base.id);
    2505           0 :                         if (!is_oid_nil(f)) {
    2506           0 :                                 segment *seg = s->segs->h;
    2507           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2508           0 :                                         if (cur+1 == l) {
    2509           0 :                                                 cur++;
    2510           0 :                                                 continue;
    2511             :                                         }
    2512           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2513           0 :                                         f = cur = l;
    2514             :                                 }
    2515           0 :                                 if (ok == LOG_OK)
    2516           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2517             :                         }
    2518           0 :                         unlock_table(tr->store, t->base.id);
    2519             :                 } else {
    2520          68 :                         if (!i->tsorted) {
    2521           0 :                                 assert(oi == i);
    2522           0 :                                 BAT *ni = NULL;
    2523           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2524           0 :                                         ok = LOG_ERR;
    2525           0 :                                 if (ni)
    2526           0 :                                         i = ni;
    2527             :                         }
    2528          68 :                         assert(i->tsorted);
    2529          68 :                         BUN icnt = BATcount(i);
    2530          68 :                         BATiter ii = bat_iterator(i);
    2531          68 :                         oid *o = ii.base, n = o[0]+1;
    2532          68 :                         size_t lcnt = 1;
    2533             : 
    2534          68 :                         lock_table(tr->store, t->base.id);
    2535          68 :                         segment *seg = s->segs->h;
    2536       17899 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2537       17831 :                                 if (o[i] == n) {
    2538       17572 :                                         lcnt++;
    2539       17572 :                                         n++;
    2540             :                                 } else {
    2541         259 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2542         259 :                                         lcnt = 0;
    2543             :                                 }
    2544       17831 :                                 if (!lcnt) {
    2545         259 :                                         n = o[i]+1;
    2546         259 :                                         lcnt = 1;
    2547             :                                 }
    2548             :                         }
    2549          68 :                         bat_iterator_end(&ii);
    2550          68 :                         if (lcnt && ok == LOG_OK)
    2551          68 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2552          68 :                         unlock_table(tr->store, t->base.id);
    2553             :                 }
    2554             :         }
    2555         291 :         if (i != oi)
    2556          25 :                 bat_destroy(i);
    2557             :         // assert
    2558         291 :         if (!in_transaction)
    2559         259 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2560             :         return ok;
    2561             : }
    2562             : 
    2563             : static void
    2564       50879 : destroy_segments(segments *s)
    2565             : {
    2566       50879 :         if (!s || sql_ref_dec(&s->r) > 0)
    2567           0 :                 return;
    2568       50879 :         segment *seg = s->h;
    2569      106873 :         while(seg) {
    2570       55994 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2571       55994 :                 ATOMIC_PTR_DESTROY(&seg->next);
    2572       55994 :                 _DELETE(seg);
    2573       55994 :                 seg = n;
    2574             :         }
    2575       50879 :         _DELETE(s);
    2576             : }
    2577             : 
    2578             : static void
    2579       52265 : destroy_storage(storage *bat)
    2580             : {
    2581       52265 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2582             :                 return;
    2583       50738 :         if (bat->next)
    2584       13957 :                 destroy_storage(bat->next);
    2585       50738 :         destroy_segments(bat->segs);
    2586       50738 :         if (bat->cs.uibid)
    2587       30486 :                 temp_destroy(bat->cs.uibid);
    2588       50738 :         if (bat->cs.uvbid)
    2589       30486 :                 temp_destroy(bat->cs.uvbid);
    2590       50738 :         if (bat->cs.bid)
    2591       50738 :                 temp_destroy(bat->cs.bid);
    2592       50738 :         ATOMIC_DESTROY(&bat->cs.refcnt);
    2593       50738 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2594       50738 :         _DELETE(bat);
    2595             : }
    2596             : 
    2597             : static int
    2598      154982 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2599             : {
    2600      154982 :         if (uncommitted) {
    2601      397982 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2602      256856 :                         if (!VALID_4_READ(s->ts,tr))
    2603             :                                 return 1;
    2604             :         } else {
    2605       68731 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2606       56027 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2607             :                                 return 1;
    2608             :         }
    2609             : 
    2610             :         return 0;
    2611             : }
    2612             : 
    2613             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2614             : 
    2615             : storage *
    2616     2216895 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2617             : {
    2618     2216895 :         storage *obat;
    2619             : 
    2620     2216895 :         obat = ATOMIC_PTR_GET(&t->data);
    2621             : 
    2622     2216895 :         if (obat->cs.ts != tr->tid)
    2623     1448401 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2624     1448370 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2625             :                                 /* abort */
    2626       15510 :                                 if (clear)
    2627       15510 :                                         *clear = true;
    2628       15510 :                                 return NULL;
    2629             :                         }
    2630             : 
    2631     2201385 :         if (!clear)
    2632             :                 return obat;
    2633             : 
    2634             :         /* remainder is only to handle clear */
    2635       26262 :         if (segments_conflict(tr, obat->segs, 1)) {
    2636         289 :                 *clear = true;
    2637         289 :                 return NULL;
    2638             :         }
    2639       25973 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2640             :                 return NULL;
    2641       25973 :         storage *bat = ZNEW(storage);
    2642       25973 :         if (!bat)
    2643             :                 return NULL;
    2644       25973 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2645       25973 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2646           0 :                 destroy_storage(bat);
    2647           0 :                 return NULL;
    2648             :         }
    2649       25973 :         bat->cs.cleared = true;
    2650       25973 :         bat->cs.ts = tr->tid;
    2651             :         /* only one writer else abort */
    2652       25973 :         bat->next = obat;
    2653       25973 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2654           2 :                 bat->next = NULL;
    2655           2 :                 destroy_storage(bat);
    2656           2 :                 if (clear)
    2657           2 :                         *clear = true;
    2658           2 :                 return NULL;
    2659             :         }
    2660             :         return bat;
    2661             : }
    2662             : 
    2663             : static int
    2664       64378 : delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
    2665             : {
    2666       64378 :         int ok = LOG_OK;
    2667       64378 :         BAT *b = ib;
    2668       64378 :         storage *bat;
    2669             : 
    2670       64378 :         if (tpe == TYPE_bat && !BATcount(b))
    2671             :                 return ok;
    2672             : 
    2673       64333 :         if (t == NULL)
    2674             :                 return LOG_ERR;
    2675             : 
    2676       64333 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2677             :                 return LOG_ERR;
    2678             : 
    2679       64333 :         if (tpe == TYPE_bat)
    2680         291 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2681             :         else
    2682       64042 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2683             :         return ok;
    2684             : }
    2685             : 
    2686             : static size_t
    2687           0 : dcount_col(sql_trans *tr, sql_column *c)
    2688             : {
    2689           0 :         sql_delta *b;
    2690             : 
    2691           0 :         if (!isTable(c->t))
    2692             :                 return 0;
    2693           0 :         b = col_timestamp_delta(tr, c);
    2694           0 :         if (!b)
    2695             :                 return 1;
    2696             : 
    2697           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2698           0 :         if (!s || !s->segs->t)
    2699             :                 return 1;
    2700           0 :         size_t cnt = s->segs->t->end;
    2701           0 :         if (cnt) {
    2702           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2703           0 :                 size_t dcnt = 0;
    2704             : 
    2705           0 :                 if (v)
    2706           0 :                         dcnt = BATguess_uniques(v, NULL);
    2707           0 :                 return dcnt;
    2708             :         }
    2709             :         return cnt;
    2710             : }
    2711             : 
    2712             : static BAT *
    2713      685445 : bind_no_view(BAT *b, bool quick)
    2714             : {
    2715      685445 :         if (isVIEW(b)) { /* If it is a view get the parent BAT */
    2716      685070 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2717      685070 :                 bat_destroy(b);
    2718      685069 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2719             :                         return NULL;
    2720             :         }
    2721             :         return b;
    2722             : }
    2723             : 
    2724             : static int
    2725           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2726             : {
    2727           0 :         int ok = 0;
    2728           0 :         assert(tr->active);
    2729           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2730           0 :                 return 0;
    2731           0 :         lock_column(tr->store, c->base.id);
    2732           0 :         if (unique_est) {
    2733           0 :                 sql_delta *d;
    2734           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2735           0 :                         BAT *b;
    2736           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2737           0 :                                 MT_lock_set(&b->theaplock);
    2738           0 :                                 b->tunique_est = *unique_est;
    2739           0 :                                 MT_lock_unset(&b->theaplock);
    2740           0 :                                 bat_destroy(b);
    2741             :                         }
    2742             :                 }
    2743             :         }
    2744           0 :         if (min) {
    2745           0 :                 _DELETE(c->min);
    2746           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2747           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2748           0 :                         memcpy(c->min, min, minlen);
    2749           0 :                         ok = 1;
    2750             :                 }
    2751             :         }
    2752           0 :         if (max) {
    2753           0 :                 _DELETE(c->max);
    2754           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2755           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2756           0 :                         memcpy(c->max, max, maxlen);
    2757           0 :                         ok = 1;
    2758             :                 }
    2759             :         }
    2760           0 :         unlock_column(tr->store, c->base.id);
    2761           0 :         return ok;
    2762             : }
    2763             : 
    2764             : static int
    2765          26 : min_max_col(sql_trans *tr, sql_column *c)
    2766             : {
    2767          26 :         int ok = 0;
    2768          26 :         BAT *b = NULL;
    2769          26 :         sql_delta *d = NULL;
    2770             : 
    2771          26 :         assert(tr->active);
    2772          26 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2773           0 :                 return 0;
    2774          26 :         if (c->min && c->max)
    2775             :                 return 1;
    2776          26 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2777          26 :                 if (d->cs.st == ST_FOR)
    2778             :                         return 0;
    2779          26 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2780          26 :                 lock_column(tr->store, c->base.id);
    2781          26 :                 if (c->min && c->max) {
    2782           0 :                         unlock_column(tr->store, c->base.id);
    2783           0 :                         return 1;
    2784             :                 }
    2785          26 :                 _DELETE(c->min);
    2786          26 :                 _DELETE(c->max);
    2787          26 :                 if ((b = bind_col(tr, c, access))) {
    2788          26 :                         if (!(b = bind_no_view(b, false))) {
    2789           0 :                                 unlock_column(tr->store, c->base.id);
    2790           0 :                                 return 0;
    2791             :                         }
    2792          26 :                         BATiter bi = bat_iterator(b);
    2793          26 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2794          17 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2795          17 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2796             : 
    2797          17 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2798           0 :                                         _DELETE(c->min);
    2799           0 :                                         _DELETE(c->max);
    2800             :                                 } else {
    2801          17 :                                         memcpy(c->min, nmin, minlen);
    2802          17 :                                         memcpy(c->max, nmax, maxlen);
    2803          17 :                                         ok = 1;
    2804             :                                 }
    2805             :                         }
    2806          26 :                         bat_iterator_end(&bi);
    2807          26 :                         bat_destroy(b);
    2808             :                 }
    2809          26 :                 unlock_column(tr->store, c->base.id);
    2810             :         }
    2811             :         return ok;
    2812             : }
    2813             : 
    2814             : static size_t
    2815          17 : count_segs(segment *s)
    2816             : {
    2817          17 :         size_t nr = 0;
    2818             : 
    2819          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2820          55 :                 nr++;
    2821          17 :         return nr;
    2822             : }
    2823             : 
    2824             : static size_t
    2825          34 : count_del(sql_trans *tr, sql_table *t, int access)
    2826             : {
    2827          34 :         storage *d;
    2828             : 
    2829          34 :         if (!isTable(t))
    2830             :                 return 0;
    2831          34 :         d = tab_timestamp_storage(tr, t);
    2832          34 :         if (!d)
    2833             :                 return 0;
    2834          34 :         if (access == 2)
    2835           0 :                 return d->cs.ucnt;
    2836          34 :         if (access == 1)
    2837           0 :                 return count_inserts(d->segs->h, tr);
    2838          34 :         if (access == 10) /* special case for counting the number of segments */
    2839          17 :                 return count_segs(d->segs->h);
    2840          17 :         return count_deletes(d->segs->h, tr);
    2841             : }
    2842             : 
    2843             : static int
    2844       24175 : sorted_col(sql_trans *tr, sql_column *col)
    2845             : {
    2846       24175 :         int sorted = 0;
    2847             : 
    2848       24175 :         assert(tr->active);
    2849       24175 :         if (!isTable(col->t) || !col->t->s)
    2850             :                 return 0;
    2851             : 
    2852       24175 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2853       24150 :                 BAT *b = bind_col(tr, col, QUICK);
    2854             : 
    2855       24150 :                 if (b)
    2856       24150 :                         sorted = b->tsorted || b->trevsorted;
    2857             :         }
    2858             :         return sorted;
    2859             : }
    2860             : 
    2861             : static int
    2862        7901 : unique_col(sql_trans *tr, sql_column *col)
    2863             : {
    2864        7901 :         int distinct = 0;
    2865             : 
    2866        7901 :         assert(tr->active);
    2867        7901 :         if (!isTable(col->t) || !col->t->s)
    2868             :                 return 0;
    2869             : 
    2870        7901 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2871        7901 :                 BAT *b = bind_col(tr, col, QUICK);
    2872             : 
    2873        7901 :                 if (b)
    2874        7901 :                         distinct = b->tkey;
    2875             :         }
    2876             :         return distinct;
    2877             : }
    2878             : 
    2879             : static int
    2880        1892 : double_elim_col(sql_trans *tr, sql_column *col)
    2881             : {
    2882        1892 :         int de = 0;
    2883        1892 :         sql_delta *d;
    2884             : 
    2885        1892 :         assert(tr->active);
    2886        1892 :         if (!isTable(col->t) || !col->t->s)
    2887             :                 return 0;
    2888             : 
    2889        1892 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2890           6 :                 if (d->cs.st == ST_DICT) {
    2891           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2892           6 :                         if (b && b->ttype == TYPE_bte)
    2893             :                                 de = 1;
    2894           0 :                         else if (b && b->ttype == TYPE_sht)
    2895        1892 :                                 de = 2;
    2896             :                 }
    2897        1886 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2898        1886 :                 BAT *b = bind_col(tr, col, QUICK);
    2899             : 
    2900        1886 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2901        1886 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2902        1886 :                         if (de)
    2903        1660 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2904             :                 }
    2905        1660 :                 assert(de >= 0 && de <= 16);
    2906             :         }
    2907             :         return de;
    2908             : }
    2909             : 
    2910             : static int
    2911      706345 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2912             : {
    2913      706345 :         int ok = 0;
    2914      706345 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2915      706345 :         sql_delta *d = NULL;
    2916             : 
    2917      706345 :         (void) tr;
    2918      706345 :         assert(tr->active);
    2919      706345 :         *nonil = false;
    2920      706345 :         *unique = false;
    2921      706345 :         *unique_est = 0.0;
    2922      706345 :         if (!c || !isTable(c->t) || !c->t->s)
    2923             :                 return ok;
    2924             : 
    2925      706341 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2926      685232 :                 if (d->cs.st == ST_FOR) {
    2927           9 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2928           9 :                         return ok;
    2929             :                 }
    2930      685223 :                 int eclass = c->type.type->eclass;
    2931      685223 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2932      685223 :                 if ((b = bind_col(tr, c, access))) {
    2933      685228 :                         if (!(b = bind_no_view(b, false)))
    2934           0 :                                 return ok;
    2935      685222 :                         BATiter bi = bat_iterator(b);
    2936      685229 :                         *nonil = bi.nonil && !bi.nil;
    2937             : 
    2938      685229 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2939      651312 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
    2940      516971 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2941             :                                         ok |= 1;
    2942      516917 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2943      513151 :                                         ok |= 1;
    2944      516972 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2945          54 :                                         ok |= 2;
    2946      516918 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2947      511396 :                                         ok |= 2;
    2948             :                         }
    2949      685229 :                         if (d->cs.ucnt == 0) {
    2950      685206 :                                 if (d->cs.st == ST_DEFAULT) {
    2951      685020 :                                         *unique = bi.key;
    2952      685020 :                                         *unique_est = bi.unique_est;
    2953      685020 :                                         if (*unique_est == 0)
    2954      201890 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2955         186 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2956             :                                         /* for dict, check the offsets bat for uniqueness */
    2957         186 :                                         MT_lock_set(&off->theaplock);
    2958         186 :                                         *unique = off->tkey;
    2959         186 :                                         *unique_est = off->tunique_est;
    2960         186 :                                         MT_lock_unset(&off->theaplock);
    2961             :                                 }
    2962             :                         }
    2963      685229 :                         bat_iterator_end(&bi);
    2964      685229 :                         bat_destroy(b);
    2965      685223 :                         if (*nonil && d->cs.ucnt > 0) {
    2966             :                                 /* This could use a quick descriptor */
    2967           3 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2968           0 :                                         *nonil = false;
    2969             :                                 } else {
    2970           3 :                                         MT_lock_set(&upv->theaplock);
    2971           3 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2972           3 :                                         MT_lock_unset(&upv->theaplock);
    2973           3 :                                         bat_destroy(upv);
    2974             :                                 }
    2975             :                         }
    2976             :                 }
    2977             :         }
    2978             :         return ok;
    2979             : }
    2980             : 
    2981             : static int
    2982         245 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2983             : {
    2984         245 :         assert(tr->active);
    2985         245 :         if (!isTable(col->t) || !col->t->s)
    2986             :                 return LOG_OK;
    2987             : 
    2988         240 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2989         240 :                 BAT *b = bind_col(tr, col, QUICK);
    2990             : 
    2991         240 :                 if (b) { /* add props for ranges [min, max> */
    2992         240 :                         MT_lock_set(&b->theaplock);
    2993         240 :                         if (add_range) {
    2994         167 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    2995         167 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    2996          99 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    2997             :                                 else
    2998          68 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    2999         167 :                                 if (!pt->with_nills || !col->null)
    3000         111 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3001             :                         } else {
    3002          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    3003          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    3004          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    3005             :                         }
    3006         240 :                         MT_lock_unset(&b->theaplock);
    3007             :                 }
    3008             :         }
    3009             :         return LOG_OK;
    3010             : }
    3011             : 
    3012             : static int
    3013        4087 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    3014             : {
    3015        4087 :         assert(tr->active);
    3016        4087 :         if (!isTable(col->t) || !col->t->s)
    3017             :                 return LOG_OK;
    3018             : 
    3019        4057 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    3020        4057 :                 BAT *b = bind_col(tr, col, QUICK);
    3021             : 
    3022        4057 :                 if (b) { /* add props for ranges [min, max> */
    3023        4057 :                         if (not_null) {
    3024        4055 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3025             :                         } else {
    3026           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3027             :                         }
    3028             :                 }
    3029             :         }
    3030             :         return LOG_OK;
    3031             : }
    3032             : 
    3033             : static int
    3034       28614 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3035             : {
    3036       28614 :         sqlstore *store = tr->store;
    3037       28614 :         int bid = log_find_bat(store->logger, id);
    3038       28614 :         if (bid <= 0)
    3039             :                 return LOG_ERR;
    3040       28614 :         cs->bid = temp_dup(bid);
    3041       28614 :         cs->ucnt = 0;
    3042       28614 :         cs->uibid = e_bat(TYPE_oid);
    3043       28614 :         cs->uvbid = e_bat(type);
    3044       28614 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3045             :                 return LOG_ERR;
    3046             :         return LOG_OK;
    3047             : }
    3048             : 
    3049             : static int
    3050       66500 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3051             : {
    3052       66500 :         int res = LOG_OK;
    3053       66500 :         gdk_return ok;
    3054       66500 :         BAT *b = temp_descriptor(bat->cs.bid);
    3055             : 
    3056       66500 :         if (b == NULL)
    3057             :                 return LOG_ERR;
    3058             : 
    3059       66500 :         if (!bat->cs.uibid)
    3060       66492 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3061       66500 :         if (!bat->cs.uvbid)
    3062       66492 :                 bat->cs.uvbid = e_bat(b->ttype);
    3063       66500 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3064           0 :                 res = LOG_ERR;
    3065       66500 :         if (GDKinmemory(0)) {
    3066         174 :                 bat_destroy(b);
    3067         174 :                 return res;
    3068             :         }
    3069             : 
    3070       66326 :         bat_set_access(b, BAT_READ);
    3071       66326 :         sqlstore *store = tr->store;
    3072       66326 :         ok = log_bat_persists(store->logger, b, id);
    3073       66326 :         bat_destroy(b);
    3074       66326 :         if(res != LOG_OK)
    3075             :                 return res;
    3076       66326 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3077             : }
    3078             : 
    3079             : static int
    3080           0 : new_persistent_delta( sql_delta *bat)
    3081             : {
    3082           0 :         bat->cs.ucnt = 0;
    3083           0 :         return LOG_OK;
    3084             : }
    3085             : 
    3086             : static void
    3087      126876 : create_delta( sql_delta *d, BAT *b)
    3088             : {
    3089      126876 :         bat_set_access(b, BAT_READ);
    3090      126876 :         d->cs.bid = temp_create(b);
    3091      126876 :         d->cs.uibid = d->cs.uvbid = 0;
    3092      126876 :         d->cs.ucnt = 0;
    3093      126876 : }
    3094             : 
    3095             : static bat
    3096        7133 : copyBat (bat i, int type, oid seq)
    3097             : {
    3098        7133 :         BAT *b, *tb;
    3099        7133 :         bat res;
    3100             : 
    3101        7133 :         if (!i)
    3102             :                 return i;
    3103        7133 :         tb = quick_descriptor(i);
    3104        7133 :         if (tb == NULL)
    3105             :                 return 0;
    3106        7133 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3107        7133 :         if (b == NULL)
    3108             :                 return 0;
    3109             : 
    3110        7133 :         bat_set_access(b, BAT_READ);
    3111             : 
    3112        7133 :         res = temp_create(b);
    3113        7133 :         bat_destroy(b);
    3114        7133 :         return res;
    3115             : }
    3116             : 
    3117             : static int
    3118      148815 : create_col(sql_trans *tr, sql_column *c)
    3119             : {
    3120      148815 :         int ok = LOG_OK, new = 0;
    3121      148815 :         int type = c->type.type->localtype;
    3122      148815 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3123             : 
    3124      148815 :         if (!bat) {
    3125      148815 :                 new = 1;
    3126      148815 :                 bat = ZNEW(sql_delta);
    3127      148815 :                 if (!bat)
    3128             :                         return LOG_ERR;
    3129      148815 :                 ATOMIC_PTR_SET(&c->data, bat);
    3130      148815 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3131             :         }
    3132             : 
    3133      148815 :         if (new)
    3134      148815 :                 bat->cs.ts = tr->tid;
    3135             : 
    3136      148815 :         if (!isNew(c)&& !isTempTable(c->t)){
    3137       21917 :                 bat->cs.ts = tr->ts;
    3138       21917 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3139       21917 :                 if (ok == LOG_OK && c->storage_type) {
    3140           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3141           2 :                                 sqlstore *store = tr->store;
    3142           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3143           2 :                                 if (bid <= 0)
    3144             :                                         return LOG_ERR;
    3145           2 :                                 bat->cs.ebid = temp_dup(bid);
    3146           2 :                                 bat->cs.st = ST_DICT;
    3147           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3148           2 :                                 bat->cs.st = ST_FOR;
    3149             :                         }
    3150             :                 }
    3151       21917 :                 return ok;
    3152      126898 :         } else if (bat && bat->cs.bid) {
    3153           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3154             :         } else {
    3155      126898 :                 sql_column *fc = NULL;
    3156      126898 :                 size_t cnt = 0;
    3157             : 
    3158             :                 /* alter ? */
    3159      126898 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3160       78214 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3161       78214 :                         if (s == NULL)
    3162             :                                 return LOG_ERR;
    3163       78214 :                         cnt = segs_end(s->segs, tr, c->t);
    3164             :                 }
    3165      126898 :                 if (cnt && fc != c) {
    3166          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3167             : 
    3168          22 :                         if (d->cs.bid) {
    3169          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3170          22 :                                 if(bat->cs.bid == BID_NIL)
    3171          22 :                                         ok = LOG_ERR;
    3172             :                         }
    3173          22 :                         if (d->cs.uibid) {
    3174          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3175          10 :                                 if (bat->cs.uibid == BID_NIL)
    3176          22 :                                         ok = LOG_ERR;
    3177             :                         }
    3178          22 :                         if (d->cs.uvbid) {
    3179          10 :                                 bat->cs.uvbid = e_bat(type);
    3180          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3181           0 :                                         ok = LOG_ERR;
    3182             :                         }
    3183             :                 } else {
    3184      126876 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3185      126876 :                         if (!b) {
    3186             :                                 ok = LOG_ERR;
    3187             :                         } else {
    3188      126876 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3189      126876 :                                 bat_destroy(b);
    3190             :                         }
    3191             : 
    3192      126876 :                         if (!new) {
    3193           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3194           0 :                                 if (bat->cs.uibid == BID_NIL)
    3195           0 :                                         ok = LOG_ERR;
    3196           0 :                                 bat->cs.uvbid = e_bat(type);
    3197           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3198           0 :                                         ok = LOG_ERR;
    3199             :                         }
    3200             :                 }
    3201      126898 :                 bat->cs.ucnt = 0;
    3202             : 
    3203      126898 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3204          90 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3205             :         }
    3206             :         return ok;
    3207             : }
    3208             : 
    3209             : static int
    3210       60292 : log_create_col_(sql_trans *tr, sql_column *c)
    3211             : {
    3212       60292 :         assert(!isTempTable(c->t));
    3213       60292 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3214             : }
    3215             : 
    3216             : static int
    3217          86 : log_create_col(sql_trans *tr, sql_change *change)
    3218             : {
    3219          86 :         return log_create_col_(tr, (sql_column*)change->obj);
    3220             : }
    3221             : 
    3222             : static int
    3223      114165 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3224             : {
    3225      114165 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3226      114165 :         (void)oldest;
    3227      114165 :         assert(delta->cs.ts == tr->tid);
    3228      114165 :         delta->cs.ts = commit_ts;
    3229             : 
    3230      114165 :         assert(delta->next == NULL);
    3231      114165 :         if (!delta->cs.merged)
    3232      114164 :                 merge_delta(delta);
    3233      114165 :         if (!tr->parent)
    3234      114161 :                 base->new = 0;
    3235      114165 :         return LOG_OK;
    3236             : }
    3237             : 
    3238             : static int
    3239          90 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3240             : {
    3241          90 :         sql_column *c = (sql_column*)change->obj;
    3242          90 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3243          90 :         if (!tr->parent)
    3244          89 :                 c->base.new = 0;
    3245          90 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3246             : }
    3247             : 
    3248             : /* will be called for new idx's and when new index columns are created */
    3249             : static int
    3250        9262 : create_idx(sql_trans *tr, sql_idx *ni)
    3251             : {
    3252        9262 :         int ok = LOG_OK, new = 0;
    3253        9262 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3254        9262 :         int type = TYPE_lng;
    3255             : 
    3256        9262 :         if (oid_index(ni->type))
    3257         951 :                 type = TYPE_oid;
    3258             : 
    3259        9262 :         if (!bat) {
    3260        9262 :                 new = 1;
    3261        9262 :                 bat = ZNEW(sql_delta);
    3262        9262 :                 if (!bat)
    3263             :                         return LOG_ERR;
    3264        9262 :                 ATOMIC_PTR_SET(&ni->data, bat);
    3265        9262 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3266             :         }
    3267             : 
    3268        9262 :         if (new)
    3269        9262 :                 bat->cs.ts = tr->tid;
    3270             : 
    3271        9262 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3272        2151 :                 bat->cs.ts = 1;
    3273        2151 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3274        7111 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3275           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3276             :         } else {
    3277        7111 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3278        7111 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3279             : 
    3280        7111 :                 if (d) {
    3281             :                         /* Here we also handle indices created through alter stmts */
    3282             :                         /* These need to be created aligned to the existing data */
    3283        7111 :                         if (d->cs.bid) {
    3284        7111 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3285        7111 :                                 if(bat->cs.bid == BID_NIL)
    3286        7111 :                                         ok = LOG_ERR;
    3287             :                         }
    3288             :                 } else {
    3289             :                         return LOG_ERR;
    3290             :                 }
    3291             : 
    3292        7111 :                 bat->cs.ucnt = 0;
    3293             : 
    3294        7111 :                 if (!new) {
    3295           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3296           0 :                         if (bat->cs.uibid == BID_NIL)
    3297           0 :                                 ok = LOG_ERR;
    3298           0 :                         bat->cs.uvbid = e_bat(type);
    3299           0 :                         if(bat->cs.uvbid == BID_NIL)
    3300           0 :                                 ok = LOG_ERR;
    3301             :                 }
    3302        7111 :                 bat->cs.ucnt = 0;
    3303        7111 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3304         631 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3305             :         }
    3306             :         return ok;
    3307             : }
    3308             : 
    3309             : static int
    3310        6208 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3311             : {
    3312        6208 :         assert(!isTempTable(i->t));
    3313        6208 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3314             : }
    3315             : 
    3316             : static int
    3317         617 : log_create_idx(sql_trans *tr, sql_change *change)
    3318             : {
    3319         617 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3320             : }
    3321             : 
    3322             : static int
    3323         631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3324             : {
    3325         631 :         sql_idx *i = (sql_idx*)change->obj;
    3326         631 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3327         631 :         if (!tr->parent)
    3328         631 :                 i->base.new = 0;
    3329         631 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3330             :         return LOG_OK;
    3331             : }
    3332             : 
    3333             : static int
    3334        4546 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3335             : {
    3336        4546 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3337        4546 :         BAT *b = NULL, *ib = NULL;
    3338             : 
    3339        4546 :         if (ok != LOG_OK)
    3340             :                 return ok;
    3341        4546 :         if (!(b = temp_descriptor(s->cs.bid)))
    3342             :                 return LOG_ERR;
    3343        4546 :         ib = b;
    3344             : 
    3345        4546 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3346           0 :                 bat_destroy(ib);
    3347           0 :                 return LOG_ERR;
    3348             :         }
    3349             : 
    3350        4546 :         if (BATcount(b)) {
    3351         261 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3352           0 :                         bat_destroy(ib);
    3353           0 :                         return LOG_ERR;
    3354             :                 }
    3355         398 :                 if (BATtdense(b)) {
    3356         137 :                         size_t start = b->tseqbase;
    3357         137 :                         size_t cnt = BATcount(b);
    3358         137 :                         ok = delete_range(tr, t, s, start, cnt);
    3359             :                 } else {
    3360         124 :                         assert(b->tsorted);
    3361         124 :                         BUN icnt = BATcount(b);
    3362         124 :                         BATiter bi = bat_iterator(b);
    3363         124 :                         size_t lcnt = 1;
    3364         124 :                         oid n;
    3365         124 :                         segment *seg = s->segs->h;
    3366         124 :                         if (complex_cand(b)) {
    3367           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3368           0 :                                 n = o + 1;
    3369           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3370           0 :                                         o = * (oid *) Tpos(&bi, i);
    3371           0 :                                         if (o == n) {
    3372           0 :                                                 lcnt++;
    3373           0 :                                                 n++;
    3374             :                                         } else {
    3375           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3376             :                                                         break;
    3377             :                                                 lcnt = 0;
    3378             :                                         }
    3379           0 :                                         if (!lcnt) {
    3380           0 :                                                 n = o + 1;
    3381           0 :                                                 lcnt = 1;
    3382             :                                         }
    3383             :                                 }
    3384             :                         } else {
    3385         124 :                                 oid *o = bi.base;
    3386         124 :                                 n = o[0]+1;
    3387      191799 :                                 for (size_t i=1; i<icnt; i++) {
    3388      191675 :                                         if (o[i] == n) {
    3389      190202 :                                                 lcnt++;
    3390      190202 :                                                 n++;
    3391             :                                         } else {
    3392        1473 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3393             :                                                         break;
    3394             :                                                 lcnt = 0;
    3395             :                                         }
    3396      191675 :                                         if (!lcnt) {
    3397        1473 :                                                 n = o[i]+1;
    3398        1473 :                                                 lcnt = 1;
    3399             :                                         }
    3400             :                                 }
    3401             :                         }
    3402         124 :                         if (lcnt && ok == LOG_OK)
    3403         124 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3404         124 :                         bat_iterator_end(&bi);
    3405             :                 }
    3406         261 :                 if (ok == LOG_OK)
    3407        3880 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3408        3619 :                                 if (seg->ts == tr->tid)
    3409        1888 :                                         seg->ts = 1;
    3410             :         } else {
    3411        4285 :                 if (ok == LOG_OK) {
    3412        4285 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3413             : 
    3414        4285 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3415             :                                 ok = LOG_ERR;
    3416             :                         } else {
    3417        4285 :                                 segment *seg = s->segs->h;
    3418        4285 :                                 if (seg->ts == tr->tid)
    3419        4285 :                                         seg->ts = 1;
    3420             :                         }
    3421             :                 }
    3422             :         }
    3423        4546 :         if (b != ib)
    3424        4546 :                 bat_destroy(b);
    3425        4546 :         bat_destroy(ib);
    3426             : 
    3427        4546 :         return ok;
    3428             : }
    3429             : 
    3430             : static int
    3431       24804 : create_del(sql_trans *tr, sql_table *t)
    3432             : {
    3433       24804 :         int ok = LOG_OK, new = 0;
    3434       24804 :         BAT *b;
    3435       24804 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3436             : 
    3437       24804 :         if (!bat) {
    3438       24804 :                 new = 1;
    3439       24804 :                 bat = ZNEW(storage);
    3440       24804 :                 if(!bat)
    3441             :                         return LOG_ERR;
    3442       24804 :                 ATOMIC_PTR_SET(&t->data, bat);
    3443       24804 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3444       24804 :                 bat->cs.ts = tr->tid;
    3445             :         }
    3446             : 
    3447       24804 :         if (!isNew(t) && !isTempTable(t)) {
    3448        4546 :                 bat->cs.ts = tr->ts;
    3449        4546 :                 return load_storage(tr, t, bat, t->base.id);
    3450       20258 :         } else if (bat->cs.bid) {
    3451             :                 return ok;
    3452             :         } else {
    3453       20258 :                 assert(!bat->segs);
    3454       20258 :                 if (!(bat->segs = new_segments(tr, 0)))
    3455             :                         return LOG_ERR;
    3456             : 
    3457       20258 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3458       20258 :                 if(b != NULL) {
    3459       20258 :                         bat_set_access(b, BAT_READ);
    3460       20258 :                         bat->cs.bid = temp_create(b);
    3461       20258 :                         bat_destroy(b);
    3462             :                 } else {
    3463             :                         return LOG_ERR;
    3464             :                 }
    3465       20258 :                 if (new)
    3466       26708 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3467             :         }
    3468             :         return ok;
    3469             : }
    3470             : 
    3471             : static int
    3472      182774 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3473             : {
    3474      182774 :         sqlstore *store = tr->store;
    3475      182774 :         msk m = s->deleted;
    3476      182774 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3477             : }
    3478             : 
    3479             : static int
    3480       94225 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3481             : {
    3482             :         /* log segments */
    3483       94225 :         lock_table(tr->store, id);
    3484      470295 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3485      376070 :                 unlock_table(tr->store, id);
    3486      376070 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3487      137896 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3488           0 :                                 unlock_table(tr->store, id);
    3489           0 :                                 return LOG_ERR;
    3490             :                         }
    3491             :                 }
    3492      376070 :                 lock_table(tr->store, id);
    3493             :         }
    3494       94225 :         unlock_table(tr->store, id);
    3495       94225 :         return LOG_OK;
    3496             : }
    3497             : 
    3498             : static int
    3499       12049 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3500             : {
    3501       12049 :         BAT *b;
    3502       12049 :         int ok = LOG_OK;
    3503             : 
    3504       12049 :         if (GDKinmemory(0))
    3505             :                 return LOG_OK;
    3506             : 
    3507       12017 :         b = temp_descriptor(bat->cs.bid);
    3508       12017 :         if (b == NULL)
    3509             :                 return LOG_ERR;
    3510             : 
    3511       12017 :         sqlstore *store = tr->store;
    3512       12017 :         bat_set_access(b, BAT_READ);
    3513       12017 :         if (ok == LOG_OK)
    3514       12017 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3515       12017 :         if (ok == LOG_OK)
    3516       12017 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3517       12017 :         bat_destroy(b);
    3518       12017 :         return ok;
    3519             : }
    3520             : 
    3521             : static int
    3522       12063 : log_create_del(sql_trans *tr, sql_change *change)
    3523             : {
    3524       12063 :         int ok = LOG_OK;
    3525       12063 :         sql_table *t = (sql_table*)change->obj;
    3526             : 
    3527       12063 :         if (t->base.deleted)
    3528             :                 return ok;
    3529       12049 :         assert(!isTempTable(t));
    3530       12049 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3531       12049 :         if (ok == LOG_OK) {
    3532       72255 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3533       60206 :                         sql_column *c = n->data;
    3534             : 
    3535       60206 :                         ok = log_create_col_(tr, c);
    3536             :                 }
    3537       12049 :                 if (t->idxs) {
    3538       17648 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3539        5599 :                                 sql_idx *i = n->data;
    3540             : 
    3541        5599 :                                 if (ATOMIC_PTR_GET(&i->data))
    3542        5591 :                                         ok = log_create_idx_(tr, i);
    3543             :                         }
    3544             :                 }
    3545             :         }
    3546             :         return ok;
    3547             : }
    3548             : 
    3549             : static int
    3550       20260 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3551             : {
    3552       20260 :         int ok = LOG_OK;
    3553       20260 :         sql_table *t = (sql_table*)change->obj;
    3554       20260 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3555             : 
    3556       20260 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3557         119 :                 assert(isTempTable(t));
    3558         119 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3559         119 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3560         119 :                 return ok;
    3561             :         }
    3562             : 
    3563       20141 :         if (!commit_ts) /* rollback handled by ? */
    3564             :                 return ok;
    3565       18295 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3566       18295 :         assert(ok == LOG_OK);
    3567       18295 :         if (ok != LOG_OK)
    3568             :                 return ok;
    3569       18295 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3570       18295 :         assert(dbat->cs.ts == tr->tid);
    3571       18295 :         dbat->cs.ts = commit_ts;
    3572       18295 :         if (ok == LOG_OK) {
    3573      126135 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3574      107840 :                         sql_column *c = n->data;
    3575      107840 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3576             : 
    3577      107840 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3578             :                 }
    3579       18295 :                 if (t->idxs) {
    3580       23907 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3581        5612 :                                 sql_idx *i = n->data;
    3582        5612 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3583             : 
    3584        5612 :                                 if (delta)
    3585        5604 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3586             :                         }
    3587             :                 }
    3588       18295 :                 if (!tr->parent)
    3589       18293 :                         t->base.new = 0;
    3590             :         }
    3591       18295 :         if (!tr->parent)
    3592       18293 :                 t->base.new = 0;
    3593             :         return ok;
    3594             : }
    3595             : 
    3596             : static int
    3597       18245 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3598             : {
    3599       18245 :         gdk_return ok = GDK_SUCCEED;
    3600             : 
    3601       18245 :         sqlstore *store = tr->store;
    3602       18245 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3603       18242 :                 ok = log_bat_transient(store->logger, id);
    3604       18245 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3605          25 :                 ok = log_bat_transient(store->logger, -id);
    3606       18245 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3607             : }
    3608             : 
    3609             : static int
    3610      166552 : destroy_col(sqlstore *store, sql_column *c)
    3611             : {
    3612      166552 :         (void)store;
    3613      166552 :         if (ATOMIC_PTR_GET(&c->data))
    3614      166552 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3615      166552 :         ATOMIC_PTR_SET(&c->data, NULL);
    3616      166552 :         return LOG_OK;
    3617             : }
    3618             : 
    3619             : static int
    3620       16453 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3621             : {
    3622       16453 :         int ok = LOG_OK;
    3623       16453 :         assert(!isTempTable(c->t));
    3624       16453 :         if (!tr->parent) /* don't write save point commits */
    3625       16453 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3626       16453 :         return ok;
    3627             : }
    3628             : 
    3629             : static int
    3630          55 : log_destroy_col(sql_trans *tr, sql_change *change)
    3631             : {
    3632          55 :         sql_column *c = (sql_column*)change->obj;
    3633          55 :         int res = log_destroy_col_(tr, c);
    3634          55 :         change->obj = NULL;
    3635          55 :         column_destroy(tr->store, c);
    3636          55 :         return res;
    3637             : }
    3638             : 
    3639             : static int
    3640       10639 : destroy_idx(sqlstore *store, sql_idx *i)
    3641             : {
    3642       10639 :         (void)store;
    3643       10639 :         if (ATOMIC_PTR_GET(&i->data))
    3644       10639 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3645       10639 :         ATOMIC_PTR_SET(&i->data, NULL);
    3646       10639 :         return LOG_OK;
    3647             : }
    3648             : 
    3649             : static int
    3650        1854 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3651             : {
    3652        1854 :         int ok = LOG_OK;
    3653        1854 :         assert(!isTempTable(i->t));
    3654        1854 :         if (ATOMIC_PTR_GET(&i->data)) {
    3655        1792 :                 if (!tr->parent) /* don't write save point commits */
    3656        1792 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3657             :         }
    3658        1854 :         return ok;
    3659             : }
    3660             : 
    3661             : static int
    3662         501 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3663             : {
    3664         501 :         sql_idx *i = (sql_idx*)change->obj;
    3665         501 :         int res = log_destroy_idx_(tr, i);
    3666         501 :         change->obj = NULL;
    3667         501 :         idx_destroy(tr->store, i);
    3668         501 :         return res;
    3669             : }
    3670             : 
    3671             : static int
    3672       26302 : destroy_del(sqlstore *store, sql_table *t)
    3673             : {
    3674       26302 :         (void)store;
    3675       26302 :         if (ATOMIC_PTR_GET(&t->data))
    3676       26292 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3677       26302 :         ATOMIC_PTR_SET(&t->data, NULL);
    3678       26302 :         return LOG_OK;
    3679             : }
    3680             : 
    3681             : static int
    3682        3085 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3683             : {
    3684        3085 :         gdk_return ok = GDK_SUCCEED;
    3685             : 
    3686        3085 :         sqlstore *store = tr->store;
    3687        3085 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3688        3085 :             bat && bat->cs.bid)
    3689        3085 :                 ok = log_bat_transient(store->logger, id);
    3690        3085 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3691             : }
    3692             : 
    3693             : static int
    3694        3085 : log_destroy_del(sql_trans *tr, sql_change *change)
    3695             : {
    3696        3085 :         int ok = LOG_OK;
    3697        3085 :         sql_table *t = (sql_table*)change->obj;
    3698             : 
    3699        3085 :         assert(!isTempTable(t));
    3700        3085 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3701        3085 :         if (ok == LOG_OK) {
    3702       19483 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3703       16398 :                         sql_column *c = n->data;
    3704             : 
    3705       16398 :                         ok = log_destroy_col_(tr, c);
    3706             :                 }
    3707        3085 :                 if (t->idxs) {
    3708        4438 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3709        1353 :                                 sql_idx *i = n->data;
    3710             : 
    3711        1353 :                                 ok = log_destroy_idx_(tr, i);
    3712             :                         }
    3713             :                 }
    3714             :         }
    3715        3085 :         return ok;
    3716             : }
    3717             : 
    3718             : static int
    3719        3723 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3720             : {
    3721        3723 :         (void)tr;
    3722        3723 :         (void)change;
    3723        3723 :         (void)commit_ts;
    3724        3723 :         (void)oldest;
    3725        3723 :         if (commit_ts)
    3726        3708 :                 change->handled = true;
    3727        3723 :         return 0;
    3728             : }
    3729             : 
    3730             : static int
    3731        3158 : drop_del(sql_trans *tr, sql_table *t)
    3732             : {
    3733        3158 :         int ok = LOG_OK;
    3734             : 
    3735        3158 :         if (!isNew(t)) {
    3736        3158 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3737        3225 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_destroy_del);
    3738             :         }
    3739        3158 :         return ok;
    3740             : }
    3741             : 
    3742             : static int
    3743          63 : drop_col(sql_trans *tr, sql_column *c)
    3744             : {
    3745          63 :         assert(!isNew(c));
    3746          63 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3747          63 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_destroy_col);
    3748          63 :         return LOG_OK;
    3749             : }
    3750             : 
    3751             : static int
    3752         502 : drop_idx(sql_trans *tr, sql_idx *i)
    3753             : {
    3754         502 :         assert(!isNew(i));
    3755         502 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3756         502 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_destroy_idx);
    3757         502 :         return LOG_OK;
    3758             : }
    3759             : 
    3760             : 
    3761             : static BUN
    3762      129591 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3763             : {
    3764      129591 :         BAT *b;
    3765      129591 :         BUN sz = 0;
    3766             : 
    3767      129591 :         (void)tr;
    3768      129591 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3769      129591 :         if (cs->bid && renew) {
    3770      129591 :                 b = quick_descriptor(cs->bid);
    3771      129573 :                 if (b) {
    3772      129573 :                         sz += BATcount(b);
    3773      129573 :                         if (cs->st == ST_DICT) {
    3774           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3775           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3776             : 
    3777           2 :                                 if (nebid == BID_NIL || !n) {
    3778           0 :                                         temp_destroy(nebid);
    3779           0 :                                         bat_destroy(n);
    3780           0 :                                         return BUN_NONE;
    3781             :                                 }
    3782           2 :                                 temp_destroy(cs->ebid);
    3783           2 :                                 cs->ebid = nebid;
    3784           2 :                                 if (!temp)
    3785           2 :                                         bat_set_access(n, BAT_READ);
    3786           2 :                                 temp_destroy(cs->bid);
    3787           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3788           2 :                                 bat_destroy(n);
    3789             :                         } else {
    3790      129571 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
    3791             : 
    3792      129569 :                                 if (nbid == BID_NIL)
    3793             :                                         return BUN_NONE;
    3794      129569 :                                 temp_destroy(cs->bid);
    3795      129569 :                                 cs->bid = nbid;
    3796             :                         }
    3797             :                 } else {
    3798             :                         return BUN_NONE;
    3799             :                 }
    3800             :         }
    3801      129571 :         if (cs->uibid) {
    3802      129428 :                 temp_destroy(cs->uibid);
    3803      129455 :                 cs->uibid = 0;
    3804             :         }
    3805      129598 :         if (cs->uvbid) {
    3806      129455 :                 temp_destroy(cs->uvbid);
    3807      129455 :                 cs->uvbid = 0;
    3808             :         }
    3809      129598 :         cs->cleared = true;
    3810      129598 :         cs->ucnt = 0;
    3811      129598 :         return sz;
    3812             : }
    3813             : 
    3814             : static BUN
    3815      129448 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3816             : {
    3817      129448 :         bool update_conflict = false;
    3818      129448 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3819             : 
    3820      129448 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    3821           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3822      129448 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3823      129448 :         if (odelta != delta)
    3824      129453 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3825      129437 :         if (delta)
    3826      129437 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3827             :         return 0;
    3828             : }
    3829             : 
    3830             : static BUN
    3831          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3832             : {
    3833          21 :         bool update_conflict = false;
    3834          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3835             : 
    3836          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3837          15 :                 return 0;
    3838           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    3839           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3840           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3841           6 :         if (odelta != delta)
    3842           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3843           6 :         if (delta)
    3844           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3845             :         return 0;
    3846             : }
    3847             : 
    3848             : static int
    3849         141 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3850             : {
    3851         141 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
    3852             :                 return LOG_ERR;
    3853         141 :         if (s->segs)
    3854         141 :                 destroy_segments(s->segs);
    3855         141 :         if (!(s->segs = new_segments(tr, 0)))
    3856             :                 return LOG_ERR;
    3857             :         return LOG_OK;
    3858             : }
    3859             : 
    3860             : 
    3861             : /*
    3862             :  * Clear the table, in general this means replacing the storage,
    3863             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    3864             :  * all segments as deleted.
    3865             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    3866             :  */
    3867             : static BUN
    3868       41811 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3869             : {
    3870       41811 :         int clear = !in_transaction, ok = LOG_OK;
    3871       41811 :         bool conflict = false;
    3872       41811 :         storage *bat;
    3873             : 
    3874       41850 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    3875       15801 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3876             : 
    3877       26010 :         if (!clear) {
    3878          39 :                 lock_table(tr->store, t->base.id);
    3879          39 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3880          39 :                 unlock_table(tr->store, t->base.id);
    3881             :         }
    3882       26010 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3883       26010 :         if (!in_transaction)
    3884       25974 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3885       26010 :         if (ok == LOG_ERR)
    3886             :                 return BUN_NONE;
    3887       26010 :         if (ok == LOG_CONFLICT)
    3888           0 :                 return BUN_NONE - 1;
    3889             :         return LOG_OK;
    3890             : }
    3891             : 
    3892             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3893             : static BUN
    3894       41811 : clear_table(sql_trans *tr, sql_table *t)
    3895             : {
    3896       41811 :         node *n = ol_first_node(t->columns);
    3897       41811 :         sql_column *c = n->data;
    3898       41811 :         storage *d = tab_timestamp_storage(tr, t);
    3899       41811 :         int in_transaction, clear;
    3900       41811 :         BUN sz, clear_ok;
    3901             : 
    3902       41811 :         if (!d)
    3903             :                 return BUN_NONE;
    3904       41811 :         in_transaction = segments_in_transaction(tr, t);
    3905       41811 :         clear = !in_transaction;
    3906       41811 :         sz = count_col(tr, c, CNT_ACTIVE);
    3907       41811 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    3908             :                 return clear_ok;
    3909             : 
    3910       26010 :         if (in_transaction)
    3911             :                 return sz;
    3912             : 
    3913      155419 :         for (; n; n = n->next) {
    3914      129448 :                 c = n->data;
    3915             : 
    3916      129448 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3917           0 :                         return clear_ok;
    3918             :         }
    3919       25971 :         if (t->idxs) {
    3920       25992 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3921          21 :                         sql_idx *ci = n->data;
    3922             : 
    3923          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    3924          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3925           0 :                                 return clear_ok;
    3926             :                 }
    3927             :         }
    3928             :         return sz;
    3929             : }
    3930             : 
    3931             : static int
    3932      158729 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3933             : {
    3934      158729 :         sqlstore *store = tr->store;
    3935      158729 :         gdk_return ok = GDK_SUCCEED;
    3936             : 
    3937      158729 :         (void) t;
    3938      158729 :         (void) segs;
    3939      158729 :         if (GDKinmemory(0))
    3940             :                 return LOG_OK;
    3941             : 
    3942      158722 :         if (cs->cleared) {
    3943      155473 :                 assert(cs->ucnt == 0);
    3944      155473 :                 BAT *ins = temp_descriptor(cs->bid);
    3945      155473 :                 if (!ins)
    3946             :                         return LOG_ERR;
    3947      155473 :                 assert(!isEbat(ins));
    3948      155473 :                 bat_set_access(ins, BAT_READ);
    3949      155473 :                 ok = log_bat_persists(store->logger, ins, id);
    3950      155473 :                 bat_destroy(ins);
    3951      155473 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3952          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3953          56 :                         if (!ins)
    3954             :                                 return LOG_ERR;
    3955          56 :                         assert(!isEbat(ins));
    3956          56 :                         bat_set_access(ins, BAT_READ);
    3957          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3958          56 :                         bat_destroy(ins);
    3959             :                 }
    3960      155473 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3961             :         }
    3962             : 
    3963        3249 :         assert(!isTempTable(t));
    3964             : 
    3965        3249 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3966        2699 :                 BAT *ui = temp_descriptor(cs->uibid);
    3967        2699 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3968             :                 /* any updates */
    3969        2699 :                 if (ui == NULL || uv == NULL) {
    3970             :                         ok = GDK_FAIL;
    3971        2699 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3972        2699 :                         ok = log_delta(store->logger, ui, uv, id);
    3973        2699 :                 bat_destroy(ui);
    3974        2699 :                 bat_destroy(uv);
    3975             :         }
    3976        2699 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3977             : }
    3978             : 
    3979             : static inline int
    3980       56243 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3981       56243 :         sqlstore *store = tr->store;
    3982       56243 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3983             : }
    3984             : 
    3985             : static inline int
    3986       56243 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3987       56243 :         sqlstore *store = tr->store;
    3988       56243 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3989             : }
    3990             : 
    3991             : static int
    3992       56243 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3993             : {
    3994       56243 :         sqlstore *store = tr->store;
    3995       56243 :         gdk_return ok = GDK_SUCCEED;
    3996             : 
    3997       56243 :         size_t end = segs_end(segs, tr, t);
    3998             : 
    3999       56243 :         if (tr_log_table_start(tr, t) != LOG_OK)
    4000             :                 return LOG_ERR;
    4001             : 
    4002       56243 :         size_t nr_appends = 0;
    4003             : 
    4004       56243 :         lock_table(tr->store, t->base.id);
    4005      394417 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    4006      338174 :                 unlock_table(tr->store, t->base.id);
    4007             : 
    4008      338174 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    4009      107562 :                         if (!seg->deleted) {
    4010       44878 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    4011             :                                         return LOG_ERR;
    4012             : 
    4013       44878 :                                 nr_appends += (seg->end - seg->start);
    4014             :                         }
    4015             :                 }
    4016      338174 :                 lock_table(tr->store, t->base.id);
    4017             :         }
    4018       56243 :         unlock_table(tr->store, t->base.id);
    4019             : 
    4020      388866 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    4021      332623 :                 sql_column *c = n->data;
    4022      332623 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    4023             : 
    4024      332623 :                 if (cs->cleared) {
    4025           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4026           3 :                         continue;
    4027             :                 }
    4028             : 
    4029      332620 :                 lock_table(tr->store, t->base.id);
    4030      332620 :                 if (!cs->cleared) {
    4031     2300227 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4032     1967607 :                                 unlock_table(tr->store, t->base.id);
    4033     1967607 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4034             :                                         /* append col*/
    4035      253375 :                                         BAT *ins = temp_descriptor(cs->bid);
    4036      253375 :                                         if (ins == NULL)
    4037             :                                                 return LOG_ERR;
    4038      253375 :                                         assert(BATcount(ins) >= cur->end);
    4039      253375 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
    4040      253375 :                                         bat_destroy(ins);
    4041             :                                 }
    4042     1967607 :                                 lock_table(tr->store, t->base.id);
    4043             :                         }
    4044             :                 }
    4045      332620 :                 unlock_table(tr->store, t->base.id);
    4046             : 
    4047      332620 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    4048          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4049          19 :                         if (ins == NULL)
    4050             :                                 return LOG_ERR;
    4051          19 :                         if (BATcount(ins) > ins->batInserted)
    4052          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4053          19 :                         BATcommit(ins, BATcount(ins));
    4054          19 :                         bat_destroy(ins);
    4055             :                 }
    4056             :         }
    4057             : 
    4058       56243 :         if (t->idxs) {
    4059       60777 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4060        4534 :                         sql_idx *i = n->data;
    4061             : 
    4062        4534 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    4063        3765 :                                 continue;
    4064         769 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4065             : 
    4066         769 :                         if (cs) {
    4067         769 :                                 if (cs->cleared) {
    4068           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4069           0 :                                         continue;
    4070             :                                 }
    4071             : 
    4072         769 :                                 lock_table(tr->store, t->base.id);
    4073        2360 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4074        1591 :                                         unlock_table(tr->store, t->base.id);
    4075        1591 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4076             :                                                 /* append idx */
    4077         721 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4078         721 :                                                 if (ins == NULL)
    4079             :                                                         return LOG_ERR;
    4080         721 :                                                 assert(BATcount(ins) >= cur->end);
    4081         721 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4082         721 :                                                 bat_destroy(ins);
    4083             :                                         }
    4084        1591 :                                         lock_table(tr->store, t->base.id);
    4085             :                                 }
    4086         769 :                                 unlock_table(tr->store, t->base.id);
    4087             :                         }
    4088             :                 }
    4089             :         }
    4090             : 
    4091       56243 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
    4092           0 :                 return LOG_ERR;
    4093             : 
    4094             :         return LOG_OK;
    4095             : }
    4096             : 
    4097             : static int
    4098       82208 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4099             : {
    4100       82208 :         int ok = LOG_OK;
    4101       82208 :         bool cleared = s->cs.cleared;
    4102       82208 :         if (ok == LOG_OK && cleared)
    4103       25965 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4104       25965 :         if (ok == LOG_OK)
    4105       82208 :                 ok = log_segments(tr, s->segs, t->base.id);
    4106       82208 :         if (ok == LOG_OK && !cleared)
    4107       56243 :                 ok = log_table_append(tr, t, s->segs);
    4108       82208 :         return ok;
    4109             : }
    4110             : 
    4111             : static void
    4112      298415 : merge_cs( column_storage *cs, const char* caller)
    4113             : {
    4114      298415 :         if (cs->bid && cs->ucnt) {
    4115        2707 :                 BAT *cur = temp_descriptor(cs->bid);
    4116        2707 :                 BAT *ui = temp_descriptor(cs->uibid);
    4117        2707 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4118             : 
    4119        2707 :                 if (!cur || !ui || !uv) {
    4120           0 :                         bat_destroy(ui);
    4121           0 :                         bat_destroy(uv);
    4122           0 :                         bat_destroy(cur);
    4123           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4124             :                         return;
    4125             :                 }
    4126        2707 :                 assert(BATcount(ui) == BATcount(uv));
    4127             : 
    4128             :                 /* any updates */
    4129        2707 :                 assert(!isEbat(cur));
    4130        2707 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4131           0 :                         bat_destroy(ui);
    4132           0 :                         bat_destroy(uv);
    4133           0 :                         bat_destroy(cur);
    4134           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4135             :                         return;
    4136             :                 }
    4137             :                 /* cleanup the old deltas */
    4138        2707 :                 temp_destroy(cs->uibid);
    4139        2707 :                 temp_destroy(cs->uvbid);
    4140        2707 :                 cs->uibid = e_bat(TYPE_oid);
    4141        2707 :                 cs->uvbid = e_bat(cur->ttype);
    4142        2707 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4143        2707 :                 cs->ucnt = 0;
    4144        2707 :                 bat_destroy(ui);
    4145        2707 :                 bat_destroy(uv);
    4146        2707 :                 bat_destroy(cur);
    4147             :         }
    4148      298415 :         cs->cleared = false;
    4149      298415 :         cs->merged = true;
    4150      298415 :         return;
    4151             : }
    4152             : 
    4153             : static void
    4154      258552 : merge_delta( sql_delta *obat)
    4155             : {
    4156      258552 :         if (obat && obat->next && !obat->cs.merged)
    4157       50351 :                 merge_delta(obat->next);
    4158      258552 :         merge_cs(&obat->cs, __func__);
    4159      258552 : }
    4160             : 
    4161             : static void
    4162       39863 : merge_storage(storage *tdb)
    4163             : {
    4164       39863 :         merge_cs(&tdb->cs, __func__);
    4165             : 
    4166       39863 :         if (tdb->next) {
    4167         481 :                 destroy_storage(tdb->next);
    4168         481 :                 tdb->next = NULL;
    4169             :         }
    4170       39863 : }
    4171             : 
    4172             : static sql_delta *
    4173           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4174             : {
    4175             :         /* commit ie copy back to the parent transaction */
    4176           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4177           1 :                 sql_delta *od = delta->next;
    4178           1 :                 if (od->cs.ts == commit_ts) {
    4179           0 :                         sql_delta t = *od, *n = od->next;
    4180           0 :                         *od = *delta;
    4181           0 :                         od->next = n;
    4182           0 :                         *delta = t;
    4183           0 :                         delta->next = NULL;
    4184           0 :                         destroy_delta(delta, true);
    4185           0 :                         return od;
    4186             :                 }
    4187             :         }
    4188             :         return delta;
    4189             : }
    4190             : 
    4191             : static int
    4192      132735 : log_update_col( sql_trans *tr, sql_change *change)
    4193             : {
    4194      132735 :         sql_column *c = (sql_column*)change->obj;
    4195      132735 :         assert(!isTempTable(c->t));
    4196             : 
    4197      132735 :         if (isDeleted(c->t)) {
    4198           0 :                 change->handled = true;
    4199           0 :                 return LOG_OK;
    4200             :         }
    4201             : 
    4202      132735 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4203      132735 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4204      132735 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4205      132735 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4206             :         }
    4207             :         return LOG_OK;
    4208             : }
    4209             : 
    4210             : static int
    4211         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4212             : {
    4213         217 :         sqlstore *store = Store;
    4214             : 
    4215         217 :         sql_delta *d = (sql_delta*)change->data;
    4216         217 :         if (d->cs.ts < oldest) {
    4217          82 :                 destroy_delta(d, false);
    4218          82 :                 if (change->commit == &commit_update_idx)
    4219           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4220             :                 else
    4221          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4222          82 :                 return 1;
    4223             :         }
    4224         135 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4225          82 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4226             :         return 0;
    4227             : }
    4228             : 
    4229             : static int
    4230           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4231             : {
    4232           9 :         sqlstore *store = Store;
    4233             : 
    4234           9 :         storage *d = (storage*)change->data;
    4235           9 :         if (d->cs.ts < oldest) {
    4236           3 :                 destroy_storage(d);
    4237           3 :                 table_destroy(store, (sql_table*)change->obj);
    4238           3 :                 return 1;
    4239             :         }
    4240           6 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4241           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4242             :         return 0;
    4243             : }
    4244             : 
    4245             : static int
    4246      132853 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4247             : {
    4248      132853 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4249             : 
    4250      132853 :         sql_delta *delta = ATOMIC_PTR_GET(data);
    4251             : 
    4252      132853 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4253           3 :                 int ok = LOG_OK;
    4254           3 :                 assert(isTempTable(t));
    4255           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4256           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4257           3 :                 if (!tr->parent)
    4258           0 :                         t->base.new = base->new = 0;
    4259           3 :                 change->handled = true;
    4260           3 :                 return ok;
    4261             :         }
    4262             : 
    4263      132850 :         if (commit_ts)
    4264      132768 :                 delta->cs.ts = commit_ts;
    4265      132850 :         if (!commit_ts) { /* rollback */
    4266          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4267             : 
    4268          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4269             :                         return LOG_OK;
    4270          82 :                 if (o != d) {
    4271           0 :                         while(o && o->next != d)
    4272             :                                 o = o->next;
    4273             :                 }
    4274          82 :                 if (o == ATOMIC_PTR_GET(data))
    4275          82 :                         ATOMIC_PTR_SET(data, d->next);
    4276             :                 else
    4277           0 :                         o->next = d->next;
    4278          82 :                 d->next = NULL;
    4279          82 :                 change->cleanup = &tc_gc_rollbacked;
    4280      132768 :         } else if (!tr->parent) {
    4281             :                 /* merge deltas */
    4282      353355 :                 while (delta && delta->cs.ts > oldest)
    4283      220588 :                         delta = delta->next;
    4284      132767 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4285       30905 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4286       30905 :                         merge_delta(delta);
    4287       30905 :                         unlock_column(tr->store, base->id);
    4288             :                 }
    4289           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4290           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4291             :         return LOG_OK;
    4292             : }
    4293             : 
    4294             : static int
    4295      132825 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4296             : {
    4297             : 
    4298      132825 :         sql_column *c = (sql_column*)change->obj;
    4299      132825 :         sql_base* base = &c->base;
    4300      132825 :         sql_table* t = c->t;
    4301      132825 :         ATOMIC_PTR_TYPE* data = &c->data;
    4302      132825 :         int type = c->type.type->localtype;
    4303             : 
    4304      132825 :         if (change->handled || isDeleted(c->t))
    4305             :                 return LOG_OK;
    4306             : 
    4307      132825 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4308             : }
    4309             : 
    4310             : static int
    4311          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4312             : {
    4313          26 :         sql_idx *i = (sql_idx*)change->obj;
    4314          26 :         assert(!isTempTable(i->t));
    4315             : 
    4316          26 :         if (isDeleted(i->t)) {
    4317           0 :                 change->handled = true;
    4318           0 :                 return LOG_OK;
    4319             :         }
    4320             : 
    4321          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4322          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4323          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4324          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4325             :         }
    4326             :         return LOG_OK;
    4327             : }
    4328             : 
    4329             : static int
    4330          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4331             : {
    4332          28 :         sql_idx *i = (sql_idx*)change->obj;
    4333          28 :         sql_base* base = &i->base;
    4334          28 :         sql_table* t = i->t;
    4335          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4336          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    4337             : 
    4338          28 :         if (change->handled || isDeleted(i->t))
    4339             :                 return LOG_OK;
    4340             : 
    4341          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4342             : }
    4343             : 
    4344             : static storage *
    4345          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4346             : {
    4347          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4348           0 :                 assert(0);
    4349             :                 storage *od = dbat->next;
    4350             :                 if (od->cs.ts == commit_ts) {
    4351             :                         storage t = *od, *n = od->next;
    4352             :                         *od = *dbat;
    4353             :                         od->next = n;
    4354             :                         *dbat = t;
    4355             :                         dbat->next = NULL;
    4356             :                         destroy_storage(dbat);
    4357             :                         return od;
    4358             :                 }
    4359             :         }
    4360          26 :         return dbat;
    4361             : }
    4362             : 
    4363             : static int
    4364       82208 : log_update_del( sql_trans *tr, sql_change *change)
    4365             : {
    4366       82208 :         sql_table *t = (sql_table*)change->obj;
    4367       82208 :         assert(!isTempTable(t));
    4368             : 
    4369       82208 :         if (isDeleted(t)) {
    4370           0 :                 change->handled = true;
    4371           0 :                 return LOG_OK;
    4372             :         }
    4373             : 
    4374       82208 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4375       82208 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4376             :         return LOG_OK;
    4377             : }
    4378             : 
    4379             : static int
    4380       86678 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4381             : {
    4382       86678 :         int ok = LOG_OK;
    4383       86678 :         sql_table *t = (sql_table*)change->obj;
    4384       86678 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4385             : 
    4386       86678 :         if (change->handled || isDeleted(t))
    4387             :                 return ok;
    4388             : 
    4389       86678 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4390          22 :                 assert(isTempTable(t));
    4391          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4392          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4393          22 :                 change->handled = true;
    4394          22 :                 return ok;
    4395             :         }
    4396             : 
    4397       86656 :         lock_table(tr->store, t->base.id);
    4398       86656 :         if (!commit_ts) { /* rollback */
    4399        4148 :                 if (dbat->cs.ts == tr->tid) {
    4400           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4401           3 :                                 unlock_table(tr->store, t->base.id);
    4402           3 :                                 return ok;
    4403             :                         }
    4404           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4405             : 
    4406           3 :                         if (o != d) {
    4407           0 :                                 while(o && o->next != d)
    4408             :                                         o = o->next;
    4409             :                         }
    4410           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    4411           3 :                                 assert(d->next);
    4412           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4413             :                         } else
    4414           0 :                                 o->next = d->next;
    4415           3 :                         d->next = NULL;
    4416           3 :                         change->cleanup = &tc_gc_rollbacked_storage;
    4417             :                 } else
    4418        4142 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4419       82508 :         } else if (ok == LOG_OK && !tr->parent) {
    4420       82482 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4421       25967 :                         dbat->cs.ts = commit_ts;
    4422             : 
    4423       82482 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4424       82482 :                 if (ok == LOG_OK) {
    4425       82482 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4426       82482 :                         if (oldest == commit_ts)
    4427       39863 :                                 merge_storage(dbat);
    4428             :                 }
    4429       82482 :                 if (dbat)
    4430       82482 :                         dbat->cs.cleared = false;
    4431          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4432          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4433          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4434          26 :                 storage *s = change->data;
    4435          26 :                 if (s->cs.ts == tr->tid)
    4436           0 :                         s->cs.ts = commit_ts;
    4437             :         }
    4438       86653 :         unlock_table(tr->store, t->base.id);
    4439       86653 :         return ok;
    4440             : }
    4441             : 
    4442             : /* only rollback (content version) case for now */
    4443             : static int
    4444         215 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4445             : {
    4446         215 :         sqlstore *store = Store;
    4447         215 :         sql_column *c = (sql_column*)change->obj;
    4448             : 
    4449         215 :         if (!c) /* cleaned earlier */
    4450             :                 return 1;
    4451             : 
    4452         160 :         if (change->handled || isDeleted(c->t)) {
    4453           0 :                 column_destroy(store, c);
    4454           0 :                 return 1;
    4455             :         }
    4456             : 
    4457             :         /* savepoint commit (did it merge ?) */
    4458         160 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4459           0 :                 column_destroy(store, c);
    4460           0 :                 return 1;
    4461             :         }
    4462         160 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4463             :                 return 0;
    4464         159 :         sql_delta *d = (sql_delta*)change->data;
    4465         159 :         if (d && d->next) {
    4466             : 
    4467          62 :                 if (d->cs.ts > oldest)
    4468             :                         return LOG_OK; /* cannot cleanup yet */
    4469             : 
    4470             :                 // d is oldest reachable delta
    4471          59 :                 if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
    4472          58 :                         destroy_delta(d->next, true);
    4473             : 
    4474          59 :                 d->next = NULL;
    4475          59 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4476          59 :                 merge_delta(d);
    4477          59 :                 unlock_column(store, c->base.id);
    4478             :         }
    4479         156 :         column_destroy(store, c);
    4480         156 :         return 1;
    4481             : }
    4482             : 
    4483             : static int
    4484      861345 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4485             : {
    4486      861345 :         sqlstore *store = Store;
    4487      861345 :         sql_column *c = (sql_column*)change->obj;
    4488             : 
    4489      861345 :         if (!c) /* cleaned earlier */
    4490             :                 return 1;
    4491             : 
    4492      861345 :         if (change->handled || isDeleted(c->t)) {
    4493           3 :                 table_destroy(store, c->t);
    4494           3 :                 return 1;
    4495             :         }
    4496             : 
    4497             :         /* savepoint commit (did it merge ?) */
    4498      861342 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4499       69635 :                 table_destroy(store, c->t);
    4500       69635 :                 return 1;
    4501             :         }
    4502      791707 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4503             :                 return 0;
    4504      791706 :         sql_delta *d = (sql_delta*)change->data;
    4505      791706 :         if (d && d->next) {
    4506             : 
    4507      791706 :                 if (d->cs.ts > oldest)
    4508             :                         return LOG_OK; /* cannot cleanup yet */
    4509             : 
    4510             :                 // d is oldest reachable delta
    4511       63047 :                 if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
    4512        5245 :                         destroy_delta(d->next, true);
    4513             : 
    4514       63047 :                 d->next = NULL;
    4515       63047 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4516       63047 :                 merge_delta(d);
    4517       63047 :                 unlock_column(store, c->base.id);
    4518             :         }
    4519       63047 :         table_destroy(store, c->t);
    4520       63047 :         return 1;
    4521             : }
    4522             : 
    4523             : static int
    4524        1133 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4525             : {
    4526        1133 :         sqlstore *store = Store;
    4527        1133 :         sql_idx *i = (sql_idx*)change->obj;
    4528             : 
    4529        1133 :         if (!i) /* cleaned earlier */
    4530             :                 return 1;
    4531             : 
    4532         632 :         if (change->handled || isDeleted(i->t)) {
    4533           0 :                 idx_destroy(store, i);
    4534           0 :                 return 1;
    4535             :         }
    4536             : 
    4537             :         /* savepoint commit (did it merge ?) */
    4538         632 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4539           0 :                 idx_destroy(store, i);
    4540           0 :                 return 1;
    4541             :         }
    4542         632 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4543             :                 return 0;
    4544         632 :         sql_delta *d = (sql_delta*)change->data;
    4545         632 :         if (d->next) {
    4546             : 
    4547           0 :                 if (d->cs.ts > oldest)
    4548             :                         return LOG_OK; /* cannot cleanup yet */
    4549             : 
    4550             :                 // d is oldest reachable delta
    4551           0 :                 if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
    4552           0 :                         destroy_delta(d->next, true);
    4553             : 
    4554           0 :                 d->next = NULL;
    4555           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4556           0 :                 merge_delta(d);
    4557           0 :                 unlock_column(store, i->base.id);
    4558             :         }
    4559         632 :         idx_destroy(store, i);
    4560         632 :         return 1;
    4561             : }
    4562             : 
    4563             : static int
    4564          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4565             : {
    4566          26 :         sqlstore *store = Store;
    4567          26 :         sql_idx *i = (sql_idx*)change->obj;
    4568             : 
    4569          26 :         if (!i) /* cleaned earlier */
    4570             :                 return 1;
    4571             : 
    4572          26 :         if (change->handled || isDeleted(i->t)) {
    4573           0 :                 table_destroy(store, i->t);
    4574           0 :                 return 1;
    4575             :         }
    4576             : 
    4577             :         /* savepoint commit (did it merge ?) */
    4578          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4579           0 :                 table_destroy(store, i->t);
    4580           0 :                 return 1;
    4581             :         }
    4582          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4583             :                 return 0;
    4584          26 :         sql_delta *d = (sql_delta*)change->data;
    4585          26 :         if (d->next) {
    4586             : 
    4587          26 :                 if (d->cs.ts > oldest)
    4588             :                         return LOG_OK; /* cannot cleanup yet */
    4589             : 
    4590             :                 // d is oldest reachable delta
    4591          26 :                 if (d->cs.merged && d->next) // Unreachable can immediately be destroyed.
    4592          26 :                         destroy_delta(d->next, true);
    4593             : 
    4594          26 :                 d->next = NULL;
    4595          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4596          26 :                 merge_delta(d);
    4597          26 :                 unlock_column(store, i->base.id);
    4598             :         }
    4599          26 :         table_destroy(store, i->t);
    4600          26 :         return 1;
    4601             : }
    4602             : 
    4603             : static int
    4604      242883 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4605             : {
    4606      242883 :         sqlstore *store = Store;
    4607      242883 :         sql_table *t = (sql_table*)change->obj;
    4608             : 
    4609      242883 :         if (change->handled || isDeleted(t)) {
    4610        3333 :                 table_destroy(store, t);
    4611        3333 :                 return 1;
    4612             :         }
    4613             :         /* savepoint commit (did it merge ?) */
    4614      239550 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4615       13957 :                 table_destroy(store, t);
    4616       13957 :                 return 1;
    4617             :         }
    4618      225593 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4619             :                 return 0;
    4620      225565 :         storage *d = (storage*)change->data;
    4621      225565 :         if (d->next) {
    4622      156474 :                 if (d->cs.ts > oldest)
    4623             :                         return LOG_OK; /* cannot cleanup yet */
    4624             : 
    4625       11530 :                 destroy_storage(d->next);
    4626       11530 :                 d->next = NULL;
    4627             :         }
    4628       80621 :         table_destroy(store, t);
    4629       80621 :         return 1;
    4630             : }
    4631             : 
    4632             : static int
    4633       30308 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4634             : {
    4635       30308 :         if (nr == 0)
    4636             :                 return LOG_OK;
    4637       30308 :         assert (nr > 0);
    4638       30308 :         if ((!offsets || !*offsets) && nr == total) {
    4639       30279 :                 *offset = slot;
    4640       30279 :                 return LOG_OK;
    4641             :         }
    4642          29 :         if (!*offsets) {
    4643           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4644           7 :                 if (!*offsets)
    4645             :                         return LOG_ERR;
    4646             :         }
    4647          29 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    4648       16168 :         for(size_t i = 0; i < nr; i++)
    4649       16139 :                 dst[i] = slot + i;
    4650          29 :         (*offsets)->batCount += nr;
    4651          29 :         (*offsets)->theap->dirty = true;
    4652          29 :         return LOG_OK;
    4653             : }
    4654             : 
    4655             : static int
    4656       30286 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4657             : {
    4658       30286 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4659       30286 :         assert(s->segs);
    4660       30286 :         ulng oldest = store_oldest(tr->store, NULL);
    4661       30286 :         BUN slot = 0;
    4662       30286 :         size_t total = cnt;
    4663             : 
    4664       30286 :         if (!locked)
    4665       30286 :                 lock_table(tr->store, t->base.id);
    4666             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4667       60694 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4668       30410 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
    4669          35 :                         if ((seg->end - seg->start) >= cnt) {
    4670             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4671          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4672           2 :                                         slot = p->end;
    4673           2 :                                         p->end += cnt;
    4674           2 :                                         seg->start += cnt;
    4675           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4676             :                                                 ok = LOG_ERR;
    4677             :                                                 break;
    4678             :                                         }
    4679           2 :                                         cnt = 0;
    4680           2 :                                         break;
    4681             :                                 }
    4682             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4683          11 :                                 size_t rcnt = seg->end - seg->start;
    4684          11 :                                 if (rcnt > cnt)
    4685             :                                         rcnt = cnt;
    4686          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4687             :                                         ok = LOG_ERR;
    4688             :                                         break;
    4689             :                                 }
    4690             :                         }
    4691          33 :                         seg->ts = tr->tid;
    4692          33 :                         seg->deleted = false;
    4693          33 :                         slot = seg->start;
    4694          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4695             :                                 ok = LOG_ERR;
    4696             :                                 break;
    4697             :                         }
    4698          33 :                         cnt -= (seg->end - seg->start);
    4699             :                 }
    4700             :         }
    4701       30286 :         if (ok == LOG_OK && cnt) {
    4702       30273 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4703       29177 :                         slot = s->segs->t->end;
    4704       29177 :                         s->segs->t->end += cnt;
    4705             :                 } else {
    4706        1096 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4707             :                                 ok = LOG_ERR;
    4708             :                         } else {
    4709        1096 :                                 if (!s->segs->h)
    4710           0 :                                         s->segs->h = s->segs->t;
    4711        1096 :                                 slot = s->segs->t->start;
    4712             :                         }
    4713             :                 }
    4714       30273 :                 if (ok == LOG_OK)
    4715       30273 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4716             :         }
    4717       30286 :         if (!locked)
    4718       30286 :                 unlock_table(tr->store, t->base.id);
    4719             : 
    4720       30286 :         if (ok == LOG_OK) {
    4721             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4722       30286 :                 if (!in_transaction) {
    4723        1088 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4724        1088 :                         in_transaction = true;
    4725             :                 }
    4726       30286 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4727       30272 :                         tr->logchanges += (int) total;
    4728       30286 :                 if (*offsets) {
    4729           7 :                         BAT *pos = *offsets;
    4730           7 :                         assert(BATcount(pos) == total);
    4731           7 :                         BATsetcount(pos, total); /* set other properties */
    4732           7 :                         pos->tnil = false;
    4733           7 :                         pos->tnonil = true;
    4734           7 :                         pos->tkey = true;
    4735           7 :                         pos->tsorted = true;
    4736           7 :                         pos->trevsorted = false;
    4737             :                 }
    4738             :         }
    4739       30286 :         return ok;
    4740             : }
    4741             : 
    4742             : static int
    4743     2097182 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4744             : {
    4745     2097182 :         if (cnt > 1 && offsets)
    4746       30286 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4747     2066896 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4748     2066896 :         assert(s->segs);
    4749     2066896 :         ulng oldest = store_oldest(tr->store, NULL);
    4750     2066896 :         BUN slot = 0;
    4751     2066896 :         int reused = 0;
    4752             : 
    4753     2066896 :         if (!locked)
    4754     1951743 :                 lock_table(tr->store, t->base.id);
    4755             :         /* naive vacuum approach, iterator through segments, check for large enough deleted segments
    4756             :          * or create new segment at the end */
    4757     7001137 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4758     5000512 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
    4759             : 
    4760       66271 :                         if ((seg->end - seg->start) >= cnt) {
    4761             : 
    4762             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4763       66271 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4764       51892 :                                         slot = p->end;
    4765       51892 :                                         p->end += cnt;
    4766       51892 :                                         seg->start += cnt;
    4767       51892 :                                         reused = 1;
    4768       51892 :                                         break;
    4769             :                                 }
    4770             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4771       14379 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4772             :                                         ok = LOG_ERR;
    4773             :                                         break;
    4774             :                                 }
    4775             :                         }
    4776       14379 :                         seg->ts = tr->tid;
    4777       14379 :                         seg->deleted = false;
    4778       14379 :                         slot = seg->start;
    4779       14379 :                         reused = 1;
    4780       14379 :                         break;
    4781             :                 }
    4782             :         }
    4783     2066896 :         if (ok == LOG_OK && !reused) {
    4784     2000625 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4785     1966473 :                         slot = s->segs->t->end;
    4786     1966473 :                         s->segs->t->end += cnt;
    4787             :                 } else {
    4788       34152 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4789             :                                 ok = LOG_ERR;
    4790             :                         } else {
    4791       34152 :                                 if (!s->segs->h)
    4792           0 :                                         s->segs->h = s->segs->t;
    4793       34152 :                                 slot = s->segs->t->start;
    4794             :                         }
    4795             :                 }
    4796             :         }
    4797     2066896 :         if (!locked)
    4798     1951743 :                 unlock_table(tr->store, t->base.id);
    4799             : 
    4800     2066896 :         if (ok == LOG_OK) {
    4801             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4802     2066896 :                 if (!in_transaction) {
    4803       47381 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4804       47381 :                         in_transaction = true;
    4805             :                 }
    4806     2066896 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4807     2066469 :                         tr->logchanges += (int) cnt;
    4808     2066896 :                 *offset = slot;
    4809             :         }
    4810             :         return ok;
    4811             : }
    4812             : 
    4813             : /*
    4814             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4815             :  * of the global transaction and mark the newly added storage slots unused on the global
    4816             :  * level but used on the local transaction level. Besides this the local transaction needs
    4817             :  * to update (and mark unused) any slot inbetween the old end and new slots.
    4818             :  * */
    4819             : static int
    4820     1982029 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4821             : {
    4822     1982029 :         storage *s;
    4823             : 
    4824             :         /* we have a single segment structure for each persistent table
    4825             :          * for temporary tables each has its own */
    4826     1982029 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4827             :                 return LOG_ERR;
    4828             : 
    4829     1982029 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4830             : }
    4831             : 
    4832             : /* some tables cannot be updated concurrently (user/roles etc) */
    4833             : static int
    4834      115157 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4835             : {
    4836      115157 :         storage *s;
    4837      115157 :         int res = 0;
    4838             : 
    4839             :         /* we have a single segment structure for each persistent table
    4840             :          * for temporary tables each has its own */
    4841      115157 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4842             :                 /* TODO check for other inserts ! */
    4843             :                 return LOG_ERR;
    4844             : 
    4845      115157 :         lock_table(tr->store, t->base.id);
    4846      115157 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4847           4 :                 unlock_table(tr->store, t->base.id);
    4848           4 :                 return LOG_CONFLICT;
    4849             :         }
    4850      115153 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4851      115153 :         unlock_table(tr->store, t->base.id);
    4852      115153 :         return res;
    4853             : }
    4854             : 
    4855             : static int
    4856       13563 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4857             : {
    4858       13563 :         storage *s;
    4859       13563 :         int res = 0;
    4860             : 
    4861       13563 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4862             :                 return LOG_ERR;
    4863             : 
    4864       13563 :         lock_table(tr->store, t->base.id);
    4865       13563 :         res = segments_conflict(tr, s->segs, uncommitted);
    4866       13563 :         unlock_table(tr->store, t->base.id);
    4867       13563 :         return res ? LOG_CONFLICT : LOG_OK;
    4868             : }
    4869             : 
    4870             : static size_t
    4871     1315904 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4872             : {
    4873     1315904 :         size_t cnt = 0;
    4874             : 
    4875     1352318 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4876             :                 ;
    4877             : 
    4878     3304831 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4879     1988926 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4880       97950 :                         cnt += s->end - s->start;
    4881             :         }
    4882     1315905 :         return cnt;
    4883             : }
    4884             : 
    4885             : static BAT *
    4886     1315905 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4887             : {
    4888     1315905 :         lock_table(tr->store, t->base.id);
    4889     1315904 :         segment *s = S->segs->h;
    4890             :         /* step one no deletes -> dense range */
    4891     1315904 :         uint32_t cur = 0;
    4892     1315904 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4893     1315902 :         if (!dnr) {
    4894     1227484 :                 unlock_table(tr->store, t->base.id);
    4895     1227491 :                 return BATdense(start, start, end-start);
    4896             :         }
    4897             : 
    4898       88418 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4899       88418 :         if (!b) {
    4900           0 :                 unlock_table(tr->store, t->base.id);
    4901           0 :                 return NULL;
    4902             :         }
    4903             : 
    4904       88418 :         uint32_t *restrict dst = Tloc(b, 0);
    4905    57204414 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4906    57170407 :                 if (s->end < start)
    4907       17035 :                         continue;
    4908    57153372 :                 if (s->start >= end)
    4909             :                         break;
    4910    57098961 :                 msk m = (SEG_IS_VALID(s, tr));
    4911    57098961 :                 size_t lnr = s->end-s->start;
    4912    57098961 :                 if (s->start < start)
    4913        4575 :                         lnr -= (start - s->start);
    4914    57098961 :                 if (s->end > end)
    4915        3490 :                         lnr -= s->end - end;
    4916             : 
    4917    57098961 :                 if (m) {
    4918      617920 :                         size_t used = pos&31, end = 32;
    4919      617920 :                         if (used) {
    4920      515419 :                                 if (lnr < (32-used))
    4921      305932 :                                         end = used + lnr;
    4922      515419 :                                 assert(end > used);
    4923      515419 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4924      515419 :                                 lnr -= end - used;
    4925      515419 :                                 pos += end - used;
    4926      515419 :                                 if (end == 32) {
    4927      209487 :                                         *dst++ = cur;
    4928      209487 :                                         cur = 0;
    4929             :                                 }
    4930             :                         }
    4931      617920 :                         size_t full = lnr/32;
    4932      617920 :                         size_t rest = lnr%32;
    4933      617920 :                         if (full > 0) {
    4934      196247 :                                 memset(dst, ~0, full * sizeof(*dst));
    4935      196247 :                                 dst += full;
    4936      196247 :                                 lnr -= full * 32;
    4937      196247 :                                 pos += full * 32;
    4938             :                         }
    4939      617920 :                         if (rest > 0) {
    4940      296271 :                                 cur |= (1U << rest) - 1;
    4941      296271 :                                 lnr -= rest;
    4942      296271 :                                 pos += rest;
    4943             :                         }
    4944      617920 :                         assert(lnr==0);
    4945             :                 } else {
    4946    56481041 :                         size_t used = pos&31, end = 32;
    4947    56481041 :                         if (used) {
    4948    54712384 :                                 if (lnr < (32-used))
    4949    52881192 :                                         end = used + lnr;
    4950             : 
    4951    54712384 :                                 pos+= (end-used);
    4952    54712384 :                                 lnr-= (end-used);
    4953    54712384 :                                 if (end == 32) {
    4954     1831192 :                                         *dst++ = cur;
    4955     1831192 :                                         cur = 0;
    4956             :                                 }
    4957             :                         }
    4958    56481041 :                         size_t full = lnr/32;
    4959    56481041 :                         size_t rest = lnr%32;
    4960    56481041 :                         memset(dst, 0, full * sizeof(*dst));
    4961    56481041 :                         dst += full;
    4962    56481041 :                         lnr -= full * 32;
    4963    56481041 :                         pos += full * 32;
    4964    56481041 :                         pos+= rest;
    4965    56481041 :                         lnr-= rest;
    4966    56481041 :                         assert(lnr==0);
    4967             :                 }
    4968             :         }
    4969             : 
    4970       88418 :         unlock_table(tr->store, t->base.id);
    4971       88418 :         if (pos%32)
    4972       83945 :                 *dst=cur;
    4973       88418 :         BATsetcount(b, nr);
    4974       88418 :         bn = BATmaskedcands(start, nr, b, true);
    4975       88418 :         BBPreclaim(b);
    4976       88418 :         (void)pos;
    4977       88418 :         assert (pos == nr);
    4978             :         return bn;
    4979             : }
    4980             : 
    4981             : static void *                                   /* BAT * */
    4982     1323016 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4983             : {
    4984             :         /* with nr_of_parts - part_nr we can adjust parts */
    4985     1323016 :         storage *s = tab_timestamp_storage(tr, t);
    4986             : 
    4987     1323006 :         if (!s)
    4988             :                 return NULL;
    4989     1323006 :         size_t nr = segs_end(s->segs, tr, t);
    4990             : 
    4991     1322988 :         if (!nr)
    4992        7084 :                 return BATdense(0, 0, 0);
    4993             : 
    4994             :         /* compute proper part */
    4995     1315904 :         size_t part_size = nr/nr_of_parts;
    4996     1315904 :         size_t start = part_size * part_nr;
    4997     1315904 :         size_t end = start + part_size;
    4998     1315904 :         if (part_nr == (nr_of_parts-1))
    4999     1246826 :                 end = nr;
    5000     1315904 :         assert(end <= nr);
    5001     1315904 :         return segments2cands(s, tr, t, start, end);
    5002             : }
    5003             : 
    5004             : static int
    5005           1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    5006             : {
    5007           1 :         bool update_conflict = false;
    5008             : 
    5009           1 :         if (segments_in_transaction(tr, col->t))
    5010             :                 return LOG_CONFLICT;
    5011             : 
    5012           1 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5013             : 
    5014           1 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5015           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5016           1 :         assert(d && d->cs.ts == tr->tid);
    5017           1 :         if (odelta != d)
    5018           1 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    5019           1 :         if (d->cs.bid)
    5020           1 :                 temp_destroy(d->cs.bid);
    5021           1 :         if (d->cs.uibid)
    5022           1 :                 temp_destroy(d->cs.uibid);
    5023           1 :         if (d->cs.uvbid)
    5024           1 :                 temp_destroy(d->cs.uvbid);
    5025           1 :         bat_set_access(bn, BAT_READ);
    5026           1 :         d->cs.bid = temp_create(bn);
    5027           1 :         d->cs.uibid = 0;
    5028           1 :         d->cs.uvbid = 0;
    5029           1 :         d->cs.ucnt = 0;
    5030           1 :         d->cs.cleared = true;
    5031           1 :         d->cs.ts = tr->tid;
    5032           1 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5033           1 :         return LOG_OK;
    5034             : }
    5035             : 
    5036             : static int
    5037          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5038             : {
    5039          58 :         bool update_conflict = false;
    5040             : 
    5041          58 :         if (segments_in_transaction(tr, col->t))
    5042             :                 return LOG_CONFLICT;
    5043             : 
    5044          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5045             : 
    5046          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5047           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5048          58 :         assert(d && d->cs.ts == tr->tid);
    5049          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5050          58 :         if (odelta != d)
    5051          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5052             : 
    5053          58 :         d->cs.st = st;
    5054          58 :         d->cs.cleared = true;
    5055          58 :         if (d->cs.bid)
    5056          58 :                 temp_destroy(d->cs.bid);
    5057          58 :         o = transfer_to_systrans(o);
    5058          58 :         if (o == NULL)
    5059             :                 return LOG_ERR;
    5060          58 :         bat_set_access(o, BAT_READ);
    5061          58 :         d->cs.bid = temp_create(o);
    5062          58 :         if (u) {
    5063          53 :                 if (d->cs.ebid)
    5064           0 :                         temp_destroy(d->cs.ebid);
    5065          53 :                 u = transfer_to_systrans(u);
    5066          53 :                 if (u == NULL)
    5067             :                         return LOG_ERR;
    5068          53 :                 d->cs.ebid = temp_create(u);
    5069             :         }
    5070             :         return LOG_OK;
    5071             : }
    5072             : 
    5073             : void
    5074         336 : bat_storage_init( store_functions *sf)
    5075             : {
    5076         336 :         sf->bind_col = &bind_col;
    5077         336 :         sf->bind_updates = &bind_updates;
    5078         336 :         sf->bind_updates_idx = &bind_updates_idx;
    5079         336 :         sf->bind_idx = &bind_idx;
    5080         336 :         sf->bind_cands = &bind_cands;
    5081             : 
    5082         336 :         sf->claim_tab = &claim_tab;
    5083         336 :         sf->key_claim_tab = &key_claim_tab;
    5084         336 :         sf->tab_validate = &tab_validate;
    5085             : 
    5086         336 :         sf->append_col = &append_col;
    5087         336 :         sf->append_idx = &append_idx;
    5088             : 
    5089         336 :         sf->update_col = &update_col;
    5090         336 :         sf->update_idx = &update_idx;
    5091             : 
    5092         336 :         sf->delete_tab = &delete_tab;
    5093             : 
    5094         336 :         sf->count_del = &count_del;
    5095         336 :         sf->count_col = &count_col;
    5096         336 :         sf->count_idx = &count_idx;
    5097         336 :         sf->dcount_col = &dcount_col;
    5098         336 :         sf->min_max_col = &min_max_col;
    5099         336 :         sf->set_stats_col = &set_stats_col;
    5100         336 :         sf->sorted_col = &sorted_col;
    5101         336 :         sf->unique_col = &unique_col;
    5102         336 :         sf->double_elim_col = &double_elim_col;
    5103         336 :         sf->col_stats = &col_stats;
    5104         336 :         sf->col_set_range = &col_set_range;
    5105         336 :         sf->col_not_null = &col_not_null;
    5106             : 
    5107         336 :         sf->col_dup = &col_dup;
    5108         336 :         sf->idx_dup = &idx_dup;
    5109         336 :         sf->del_dup = &del_dup;
    5110             : 
    5111         336 :         sf->create_col = &create_col;    /* create and add to change list */
    5112         336 :         sf->create_idx = &create_idx;
    5113         336 :         sf->create_del = &create_del;
    5114             : 
    5115         336 :         sf->destroy_col = &destroy_col;  /* free resources */
    5116         336 :         sf->destroy_idx = &destroy_idx;
    5117         336 :         sf->destroy_del = &destroy_del;
    5118             : 
    5119         336 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5120         336 :         sf->drop_idx = &drop_idx;
    5121         336 :         sf->drop_del = &drop_del;
    5122             : 
    5123         336 :         sf->clear_table = &clear_table;
    5124             : 
    5125         336 :         sf->swap_bats = &swap_bats;
    5126         336 :         sf->col_compress = &col_compress;
    5127         336 : }
    5128             : 
    5129             : #if 0
    5130             : static lng
    5131             : log_get_nr_inserted(sql_column *fc, lng *offset)
    5132             : {
    5133             :         lng cnt = 0;
    5134             : 
    5135             :         if (!fc || GDKinmemory(0))
    5136             :                 return 0;
    5137             : 
    5138             :         if (fc->base.atime && fc->base.allocated) {
    5139             :                 sql_delta *fb = fc->data;
    5140             :                 BAT *ins = temp_descriptor(fb->cs.bid);
    5141             : 
    5142             :                 if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
    5143             :                         cnt = BATcount(ins) - ins->batInserted;
    5144             :                 }
    5145             :                 bat_destroy(ins);
    5146             :         }
    5147             :         return cnt;
    5148             : }
    5149             : 
    5150             : static lng
    5151             : log_get_nr_deleted(sql_table *ft, lng *offset)
    5152             : {
    5153             :         lng cnt = 0;
    5154             : 
    5155             :         if (!ft || GDKinmemory(0))
    5156             :                 return 0;
    5157             : 
    5158             :         if (ft->base.atime && ft->base.allocated) {
    5159             :                 storage *fdb = ft->data;
    5160             :                 BAT *db = temp_descriptor(fdb->cs.bid);
    5161             : 
    5162             :                 if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
    5163             :                         cnt = BATcount(db) - db->batInserted;
    5164             :                         *offset = db->batInserted;
    5165             :                 }
    5166             :                 bat_destroy(db);
    5167             :         }
    5168             :         return cnt;
    5169             : }
    5170             : #endif

Generated by: LCOV version 1.14