LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2451 3128 78.4 %
Date: 2024-11-13 19:37:10 Functions: 156 163 95.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "gdk_atoms.h"
      18             : #include "gdk_atoms.h"
      19             : #include "matomic.h"
      20             : 
      21             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      22             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      23             : 
      24             : static int log_update_col( sql_trans *tr, sql_change *c);
      25             : static int log_update_idx( sql_trans *tr, sql_change *c);
      26             : static int log_update_del( sql_trans *tr, sql_change *c);
      27             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      29             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      30             : static int log_create_col(sql_trans *tr, sql_change *change);
      31             : static int log_create_idx(sql_trans *tr, sql_change *change);
      32             : static int log_create_del(sql_trans *tr, sql_change *change);
      33             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      35             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      36             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      39             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      40             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      41             : 
      42             : static void merge_delta( sql_delta *obat);
      43             : 
      44             : /* valid
      45             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      46             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      47             :  */
      48             : 
      49             : #define VALID_4_READ(TS,tr) \
      50             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      51             : 
      52             : /* when changed, check if the old status is still valid */
      53             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      54             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      55             : 
      56             : #define SEG_VALID_4_DELETE(seg,tr) \
      57             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      58             : 
      59             : /* Delete (in current trans or by some other finished transaction, or re-used segment which used to be deleted */
      60             : #define SEG_IS_DELETED(seg,tr) \
      61             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      62             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      63             : 
      64             : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
      65             : #define SEG_IS_VALID(seg, tr) \
      66             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      67             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      68             : 
      69             : static inline BAT *
      70        5063 : transfer_to_systrans(BAT *b)
      71             : {
      72             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      73        5063 :         MT_lock_set(&b->theaplock);
      74        5063 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      75          17 :                 MT_lock_unset(&b->theaplock);
      76          17 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      77          17 :                 BBPreclaim(b);
      78          17 :                 return bn;
      79             :         }
      80        5046 :         if (b->theap->farmid == TRANSIENT ||
      81          14 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      82        4757 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      83        4757 :                 if (qc) {
      84        2463 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      85        2463 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      86        2463 :                                 b->theap->farmid = SYSTRANS;
      87        2463 :                                 b->batRole = SYSTRANS;
      88             :                         }
      89        2463 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      90        1067 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      91        1067 :                                 b->tvheap->farmid = SYSTRANS;
      92             :                         }
      93             :                 }
      94             :         }
      95        5046 :         MT_lock_unset(&b->theaplock);
      96        5046 :         return b;
      97             : }
      98             : 
      99             : static void
     100    27398229 : lock_table(sqlstore *store, sqlid id)
     101             : {
     102    27398229 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     103    27489105 : }
     104             : 
     105             : static void
     106    27489213 : unlock_table(sqlstore *store, sqlid id)
     107             : {
     108    27489213 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     109    27486672 : }
     110             : 
     111             : static void
     112    20073472 : lock_column(sqlstore *store, sqlid id)
     113             : {
     114    20073472 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     115    20091546 : }
     116             : 
     117             : static void
     118    20088800 : unlock_column(sqlstore *store, sqlid id)
     119             : {
     120    20088800 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     121    20098978 : }
     122             : 
     123             : static void
     124      114003 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     125             : {
     126      114003 :         assert(cleanup);
     127      114003 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     128      113999 : }
     129             : 
     130             : static void
     131      132827 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     132             : {
     133      132827 :         assert(cleanup);
     134      132827 :         dup_base(&t->base);
     135      132829 :         trans_add(tr, b, data, cleanup, commit, log);
     136      132825 : }
     137             : 
     138             : static int
     139       91894 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     140             : {
     141       91894 :         segment *s = change->data;
     142             : 
     143       91894 :         if (s->ts <= oldest) {
     144       33972 :                 while(s) {
     145       21198 :                         segment *n = s->prev;
     146       21198 :                         ATOMIC_PTR_DESTROY(&s->next);
     147       21198 :                         _DELETE(s);
     148       21198 :                         s = n;
     149             :                 }
     150       12774 :                 sqlstore *store = Store;
     151       12774 :                 table_destroy(store, (sql_table*)change->obj);
     152       12774 :                 return 1;
     153             :         }
     154             :         return LOG_OK;
     155             : }
     156             : 
     157             : static void
     158       21198 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     159             : {
     160             :         /* we can only be accessed by anything older then commit_ts */
     161       21198 :         if (c->cleanup == &tc_gc_seg)
     162        8424 :                 s->prev = c->data;
     163             :         else
     164       12774 :                 c->cleanup = &tc_gc_seg;
     165       21198 :         c->data = s;
     166       21198 :         s->ts = commit_ts;
     167       16796 : }
     168             : 
     169             : static segment *
     170       87530 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     171             : {
     172       87530 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     173             : 
     174       87530 :         assert(tr);
     175       87530 :         if (n) {
     176       87530 :                 *n = (segment) {
     177       87530 :                         .ts = tr->tid,
     178             :                         .oldts = 0,
     179             :                         .deleted = false,
     180             :                         .start = 0,
     181             :                         .end = cnt,
     182             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     183             :                         .prev = NULL,
     184             :                 };
     185       87530 :                 if (o) {
     186       36272 :                         n->start += o->end;
     187       36272 :                         n->end += o->end;
     188       36272 :                         ATOMIC_PTR_SET(&o->next, n);
     189             :                 }
     190             :         }
     191       87530 :         return n;
     192             : }
     193             : 
     194             : static segment *
     195       86105 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     196             : {
     197       86105 :         assert(tr);
     198       86105 :         if (o->start == start && o->end == start+cnt) {
     199       10007 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     200       10007 :                 o->oldts = o->ts;
     201       10007 :                 o->ts = tr->tid;
     202       10007 :                 o->deleted = deleted;
     203       10007 :                 return o;
     204             :         }
     205       76098 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     206             : 
     207       76098 :         if (!n)
     208             :                 return NULL;
     209       76098 :         n->prev = NULL;
     210             : 
     211       76098 :         if (o->ts == tr->tid) {
     212        3820 :                 n->oldts = 0;
     213        3820 :                 n->ts = 1;
     214        3820 :                 n->deleted = true;
     215             :         } else {
     216       72278 :                 n->oldts = o->ts;
     217       72278 :                 n->ts = tr->tid;
     218       72278 :                 n->deleted = deleted;
     219             :         }
     220       76098 :         if (start == o->start) {
     221             :                 /* 2-way split: o remains latter part of segment, new one is
     222             :                  * inserted before */
     223       60812 :                 n->start = o->start;
     224       60812 :                 n->end = n->start + cnt;
     225       60812 :                 ATOMIC_PTR_INIT(&n->next, o);
     226       60812 :                 if (segs->h == o)
     227         439 :                         segs->h = n;
     228       60812 :                 if (p)
     229       60373 :                         ATOMIC_PTR_SET(&p->next, n);
     230       60812 :                 o->start = n->end;
     231       15286 :         } else if (start+cnt == o->end) {
     232             :                 /* 2-way split: o remains first part of segment, new one is
     233             :                  * added after */
     234        5704 :                 n->start = o->end - cnt;
     235        5704 :                 n->end = o->end;
     236        5704 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     237        5704 :                 ATOMIC_PTR_SET(&o->next, n);
     238        5704 :                 if (segs->t == o)
     239         822 :                         segs->t = n;
     240        5704 :                 o->end = n->start;
     241             :         } else {
     242             :                 /* 3-way split: o remains first part of segment, two new ones
     243             :                  * are added after */
     244        9582 :                 segment *n2 = GDKmalloc(sizeof(segment));
     245        9582 :                 if (n2 == NULL) {
     246           0 :                         GDKfree(n);
     247           0 :                         return NULL;
     248             :                 }
     249        9582 :                 ATOMIC_PTR_INIT(&n->next, n2);
     250        9582 :                 n->start = start;
     251        9582 :                 n->end = start + cnt;
     252        9582 :                 *n2 = *o;
     253        9582 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     254        9582 :                 n2->start = n->end;
     255        9582 :                 n2->prev = NULL;
     256        9582 :                 if (segs->t == o)
     257        4035 :                         segs->t = n2;
     258        9582 :                 ATOMIC_PTR_SET(&o->next, n);
     259        9582 :                 o->end = start;
     260             :         }
     261             :         return n;
     262             : }
     263             : 
     264             : static void
     265        4190 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     266             : {
     267        4190 :         segment *cur = segs->h, *seg = NULL;
     268       17953 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     269       13763 :                 if (cur->ts == tr->tid) { /* revert */
     270        4687 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     271        4687 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     272        4687 :                         cur->oldts = 0;
     273             :                 }
     274       13763 :                 if (cur->ts <= oldest) { /* possibly merge range */
     275       12914 :                         if (!seg) { /* skip first */
     276             :                                 seg = cur;
     277        8724 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     278             :                                 /* merge with previous */
     279        4402 :                                 seg->end = cur->end;
     280        4402 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     281        4402 :                                 if (cur == segs->t)
     282        2853 :                                         segs->t = seg;
     283        4402 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     284        4402 :                                 cur = seg;
     285             :                         } else {
     286             :                                 seg = cur; /* begin of new merge */
     287             :                         }
     288             :                 }
     289             :         }
     290        4190 : }
     291             : 
     292             : static size_t
     293      103698 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     294             : {
     295      103698 :         size_t cnt = 0;
     296      103698 :         segment *s = segs->h, *l = NULL;
     297             : 
     298      498013 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     299      394315 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     300             :                                 l = s;
     301             :         }
     302      103698 :         if (l)
     303      103691 :                 cnt = l->end;
     304      103698 :         return cnt;
     305             : }
     306             : 
     307             : static int
     308      103698 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     309             : {
     310             :         /* set bits correctly */
     311      103698 :         BAT *b = temp_descriptor(cs->bid);
     312             : 
     313      103698 :         if (!b)
     314             :                 return LOG_ERR;
     315      103698 :         segment *s = segs->h;
     316             : 
     317      103698 :         size_t nr = segs_end_include_deleted(segs, tr);
     318      103698 :         size_t rounded_nr = ((nr+31)&~31);
     319      103698 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     320           0 :                 bat_destroy(b);
     321           0 :                 return LOG_ERR;
     322             :         }
     323             : 
     324             :         /* disable all properties here */
     325      103698 :         MT_lock_set(&b->theaplock);
     326      103698 :         b->tsorted = false;
     327      103698 :         b->trevsorted = false;
     328      103698 :         b->tnosorted = 0;
     329      103698 :         b->tnorevsorted = 0;
     330      103698 :         b->tseqbase = oid_nil;
     331      103698 :         b->tkey = false;
     332      103698 :         b->tnokey[0] = 0;
     333      103698 :         b->tnokey[1] = 0;
     334      103698 :         b->theap->dirty = true;
     335      103698 :         BUN cnt = BATcount(b);
     336             : 
     337      103698 :         uint32_t *restrict dst;
     338      433745 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     339      374512 :                 if (s->start >= nr)
     340             :                         break;
     341      330047 :                 if (s->ts == tr->tid && s->end != s->start) {
     342      143952 :                         if (cnt < s->start) { /* first mark as deleted ! */
     343        3297 :                                 size_t lnr = s->start-cnt;
     344        3297 :                                 size_t pos = cnt;
     345        3297 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     346        3297 :                                 uint32_t cur = 0;
     347             : 
     348        3297 :                                 size_t used = pos&31, end = 32;
     349        3297 :                                 if (used) {
     350        3199 :                                         if (lnr < (32-used))
     351        3026 :                                                 end = used + lnr;
     352        3199 :                                         assert(end > used);
     353        3199 :                                         cur |= ((1U << (end - used)) - 1) << used;
     354        3199 :                                         lnr -= end - used;
     355        3199 :                                         *dst++ |= cur;
     356        3199 :                                         cur = 0;
     357             :                                 }
     358        3297 :                                 size_t full = lnr/32;
     359        3297 :                                 size_t rest = lnr%32;
     360        3297 :                                 if (full > 0) {
     361           7 :                                         memset(dst, ~0, full * sizeof(*dst));
     362           7 :                                         dst += full;
     363           7 :                                         lnr -= full * 32;
     364             :                                 }
     365        3297 :                                 if (rest > 0) {
     366         160 :                                         cur |= (1U << rest) - 1;
     367         160 :                                         lnr -= rest;
     368         160 :                                         *dst |= cur;
     369             :                                 }
     370        3297 :                                 assert(lnr==0);
     371             :                         }
     372      143952 :                         size_t lnr = s->end-s->start;
     373      143952 :                         size_t pos = s->start;
     374      143952 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     375      143952 :                         uint32_t cur = 0;
     376      143952 :                         size_t used = pos&31, end = 32;
     377      143952 :                         if (used) {
     378      106151 :                                 if (lnr < (32-used))
     379      100993 :                                         end = used + lnr;
     380      106151 :                                 assert(end > used);
     381      106151 :                                 cur |= ((1U << (end - used)) - 1) << used;
     382      106151 :                                 lnr -= end - used;
     383      106151 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     384      106151 :                                 dst++;
     385      106151 :                                 cur = 0;
     386             :                         }
     387      143952 :                         size_t full = lnr/32;
     388      143952 :                         size_t rest = lnr%32;
     389      143952 :                         if (full > 0) {
     390        3522 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     391        3522 :                                 dst += full;
     392        3522 :                                 lnr -= full * 32;
     393             :                         }
     394      143952 :                         if (rest > 0) {
     395       40208 :                                 cur |= (1U << rest) - 1;
     396       40208 :                                 lnr -= rest;
     397       40208 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     398             :                         }
     399      143952 :                         assert(lnr==0);
     400      143952 :                         if (cnt < s->end)
     401      330047 :                                 cnt = s->end;
     402             :                 }
     403             :         }
     404      103698 :         if (nr > BATcount(b)) {
     405       62385 :                 BATsetcount(b, nr);
     406             :         }
     407      103698 :         MT_lock_unset(&b->theaplock);
     408             : 
     409      103698 :         bat_destroy(b);
     410      103698 :         return LOG_OK;
     411             : }
     412             : 
     413             : /* TODO return LOG_OK/ERR */
     414             : static void
     415      103724 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     416             : {
     417      103724 :         sqlstore* store = tr->store;
     418      103724 :         segment *cur = s->segs->h, *seg = NULL;
     419      498111 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     420      394387 :                 if (cur->ts == tr->tid) {
     421      158133 :                         if (!cur->deleted)
     422       91592 :                                 cur->oldts = 0;
     423      158133 :                         cur->ts = commit_ts;
     424             :                 }
     425      394387 :                 if (!seg) {
     426             :                         /* first segment */
     427             :                         seg = cur;
     428             :                 }
     429      290663 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     430             :                         /* possible merge since both deleted flags are equal */
     431      268469 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     432      204559 :                                 int merge = 1;
     433      204559 :                                 node *n = store->active->h;
     434      709388 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     435      612398 :                                         sql_trans* other = ((sql_trans*)n->data);
     436      612398 :                                         ulng active = other->ts;
     437      612398 :                                         if(other->active == 2)
     438       31811 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     439      580587 :                                         if (active == tr->ts)
     440      131723 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     441      448864 :                                         if (seg->ts < active && cur->ts < active)
     442             :                                                 break;
     443      436256 :                                         if (seg->ts > active && cur->ts > active)
     444      341295 :                                                 continue;
     445             : 
     446       94961 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     447             :                                         /* cannot safely merge since there is an active transaction between the segments */
     448             :                                         merge = false;
     449             :                                         break;
     450             :                                 }
     451             :                                 /* merge segments */
     452      219196 :                                 if (merge) {
     453      109598 :                                         seg->end = cur->end;
     454      109598 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     455      109598 :                                         if (cur == s->segs->t)
     456       27206 :                                                 s->segs->t = seg;
     457      109598 :                                         if (commit_ts == oldest) {
     458       92802 :                                                 ATOMIC_PTR_DESTROY(&cur->next);
     459       92802 :                                                 _DELETE(cur);
     460             :                                         } else
     461       33592 :                                                 mark4destroy(cur, change, commit_ts);
     462      109598 :                                         cur = seg;
     463      109598 :                                         continue;
     464             :                                 }
     465             :                         }
     466             :                 }
     467             :                 seg = cur;
     468             :         }
     469      103724 : }
     470             : 
     471             : static int
     472     2151474 : segments_in_transaction(sql_trans *tr, sql_table *t)
     473             : {
     474     2151474 :         storage *s = ATOMIC_PTR_GET(&t->data);
     475     2151474 :         segment *seg = s->segs->h;
     476             : 
     477     2151474 :         if (seg && s->segs->t->ts == tr->tid)
     478             :                 return 1;
     479      595584 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     480      490805 :                 if (seg->ts == tr->tid)
     481             :                         return 1;
     482             :         }
     483             :         return 0;
     484             : }
     485             : 
     486             : static size_t
     487    19528763 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     488             : {
     489    19528763 :         size_t cnt = 0;
     490             : 
     491             :         /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
     492             :          * keep all of the parts aligned */
     493    19528763 :         lock_table(tr->store, table->base.id);
     494    19598998 :         segment *s = segs->h, *l = NULL;
     495             : 
     496    19598998 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     497    15971276 :                 l = s = segs->t;
     498             : 
     499   222410191 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     500   202811366 :                 if (SEG_IS_VALID(s, tr))
     501             :                                 l = s;
     502             :         }
     503    19598825 :         if (l)
     504    19580431 :                 cnt = l->end;
     505    19598825 :         unlock_table(tr->store, table->base.id);
     506    19597337 :         return cnt;
     507             : }
     508             : 
     509             : static segments *
     510       51254 : new_segments(sql_trans *tr, size_t cnt)
     511             : {
     512       51254 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     513             : 
     514       51258 :         if (n) {
     515       51258 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     516       51258 :                 if (!n->h) {
     517           0 :                         GDKfree(n);
     518           0 :                         return NULL;
     519             :                 }
     520       51258 :                 sql_ref_init(&n->r);
     521             :         }
     522             :         return n;
     523             : }
     524             : 
     525             : static sql_delta *
     526    32441841 : timestamp_delta( sql_trans *tr, sql_delta *d)
     527             : {
     528    32495460 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     529       53619 :                 d = d->next;
     530    32448549 :         return d;
     531             : }
     532             : 
     533             : static sql_delta *
     534    32287734 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     535             : {
     536    32287734 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     537             : }
     538             : 
     539             : static sql_delta *
     540       27587 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     541             : {
     542       27587 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     543             : }
     544             : 
     545             : static storage *
     546    20019346 : timestamp_storage( sql_trans *tr, storage *d)
     547             : {
     548    20019346 :         if (!d)
     549             :                 return NULL;
     550    20092380 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     551       73034 :                 d = d->next;
     552             :         return d;
     553             : }
     554             : 
     555             : static storage *
     556    20002807 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     557             : {
     558    20002807 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     559             : }
     560             : 
     561             : static sql_delta*
     562       19306 : delta_dup(sql_delta *d)
     563             : {
     564       19306 :         ATOMIC_INC(&d->cs.refcnt);
     565       19306 :         return d;
     566             : }
     567             : 
     568             : static void *
     569       17940 : col_dup(sql_column *c)
     570             : {
     571       17940 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     572             : }
     573             : 
     574             : static void *
     575        2653 : idx_dup(sql_idx *i)
     576             : {
     577        2653 :         if (!ATOMIC_PTR_GET(&i->data))
     578             :                 return NULL;
     579        1366 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     580             : }
     581             : 
     582             : static storage*
     583        1539 : storage_dup(storage *d)
     584             : {
     585        1539 :         ATOMIC_INC(&d->cs.refcnt);
     586        1539 :         return d;
     587             : }
     588             : 
     589             : static void *
     590        1539 : del_dup(sql_table *t)
     591             : {
     592        1539 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     593             : }
     594             : 
     595             : static size_t
     596          17 : count_inserts( segment *s, sql_trans *tr)
     597             : {
     598          17 :         size_t cnt = 0;
     599             : 
     600          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     601          55 :                 if (!s->deleted && s->ts == tr->tid)
     602           4 :                         cnt += s->end - s->start;
     603             :         }
     604          17 :         return cnt;
     605             : }
     606             : 
     607             : static size_t
     608      841852 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     609             : {
     610      841852 :         size_t cnt = 0;
     611             : 
     612      988696 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     613             :                 ;
     614             : 
     615     4354789 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     616     3512981 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     617     1069703 :                         cnt += s->end - s->start;
     618             :         }
     619      841808 :         return cnt;
     620             : }
     621             : 
     622             : static size_t
     623          17 : count_deletes( segment *s, sql_trans *tr)
     624             : {
     625          17 :         size_t cnt = 0;
     626             : 
     627          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     628          55 :                 if (SEG_IS_DELETED(s, tr))
     629          17 :                         cnt += s->end - s->start;
     630             :         }
     631          17 :         return cnt;
     632             : }
     633             : 
     634             : #define CNT_ACTIVE 10
     635             : 
     636             : static size_t
     637    18297840 : count_col(sql_trans *tr, sql_column *c, int access)
     638             : {
     639    18297840 :         storage *d;
     640    18297840 :         sql_delta *ds;
     641             : 
     642    18297840 :         if (!isTable(c->t))
     643             :                 return 0;
     644    18297840 :         d = tab_timestamp_storage(tr, c->t);
     645    18307940 :         ds = col_timestamp_delta(tr, c);
     646    18312646 :         if (!d ||!ds)
     647             :                 return 0;
     648    18312646 :         if (access == 2)
     649      481592 :                 return ds?ds->cs.ucnt:0;
     650    17831054 :         if (access == 1)
     651          17 :                 return count_inserts(d->segs->h, tr);
     652    17831037 :         if (access == QUICK)
     653           0 :                 return d->segs->t?d->segs->t->end:0;
     654    17831037 :         if (access == CNT_ACTIVE) {
     655      841465 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     656      841948 :                 lock_table(tr->store, c->t->base.id);
     657      841885 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     658      841827 :                 unlock_table(tr->store, c->t->base.id);
     659      841827 :                 return cnt;
     660             :         }
     661    16989572 :         return segs_end(d->segs, tr, c->t);
     662             : }
     663             : 
     664             : static size_t
     665       23528 : count_idx(sql_trans *tr, sql_idx *i, int access)
     666             : {
     667       23528 :         storage *d;
     668       23528 :         sql_delta *ds;
     669             : 
     670       23528 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     671        6719 :                 return 0;
     672       16807 :         d = tab_timestamp_storage(tr, i->t);
     673       16822 :         ds = idx_timestamp_delta(tr, i);
     674       16828 :         if (!d || !ds)
     675             :                 return 0;
     676       16828 :         if (access == 2)
     677        2871 :                 return ds?ds->cs.ucnt:0;
     678       13957 :         if (access == 1)
     679           0 :                 return count_inserts(d->segs->h, tr);
     680       13957 :         if (access == QUICK)
     681        3550 :                 return d->segs->t?d->segs->t->end:0;
     682       10407 :         return segs_end(d->segs, tr, i->t);
     683             : }
     684             : 
     685             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     686             : static BAT *
     687    13594550 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     688             : {
     689    13594550 :         BAT *b;
     690             : 
     691    13594550 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     692             :         /* returns the updates for cs */
     693    13594550 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     694       12106 :                 if (access == RD_UPD_ID) {
     695        7301 :                         if (!(b = temp_descriptor(cs->uibid)))
     696             :                                 return NULL;
     697        7301 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     698        1681 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     699        5620 :                                         oid nil = oid_nil;
     700             :                                         /* less then cnt */
     701        5620 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
     702        5620 :                                         if (!s) {
     703           0 :                                                 bat_destroy(b);
     704           0 :                                                 return NULL;
     705             :                                         }
     706             : 
     707        5620 :                                         BAT *nb = BATproject(s, b);
     708        5620 :                                         bat_destroy(s);
     709        5620 :                                         bat_destroy(b);
     710        5620 :                                         b = nb;
     711             :                         }
     712             :                 } else {
     713        4805 :                         b = temp_descriptor(cs->uvbid);
     714             :                 }
     715             :         } else {
     716    22638045 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     717             :         }
     718             :         return b;
     719             : }
     720             : 
     721             : static BAT *
     722           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     723             : {
     724           0 :         int err = 0;
     725           0 :         BAT *uv = *UV;
     726           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     727           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     728           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     729             : 
     730           0 :         if (!ni || (uv && !nv)) {
     731           0 :                 bat_destroy(ni);
     732           0 :                 bat_destroy(nv);
     733           0 :                 bat_destroy(ui);
     734           0 :                 bat_destroy(uv);
     735           0 :                 bat_destroy(oi);
     736           0 :                 bat_destroy(ov);
     737           0 :                 return NULL;
     738             :         }
     739           0 :         BATiter uvi;
     740           0 :         BATiter ovi;
     741             : 
     742           0 :         if (uv) {
     743           0 :                 uvi = bat_iterator(uv);
     744           0 :                 ovi = bat_iterator(ov);
     745             :         }
     746             : 
     747             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     748           0 :         BUN uip = 0, uie = BATcount(ui);
     749           0 :         BUN oip = 0, oie = BATcount(oi);
     750             : 
     751           0 :         oid uiseqb = ui->tseqbase;
     752           0 :         oid oiseqb = oi->tseqbase;
     753           0 :         oid *uipt = NULL, *oipt = NULL;
     754           0 :         BATiter uii = bat_iterator(ui);
     755           0 :         BATiter oii = bat_iterator(oi);
     756           0 :         if (!BATtdensebi(&uii))
     757           0 :                 uipt = uii.base;
     758           0 :         if (!BATtdensebi(&oii))
     759           0 :                 oipt = oii.base;
     760           0 :         while (uip < uie && oip < oie && !err) {
     761           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     762           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     763             : 
     764           0 :                 if (uiid <= oiid) {
     765           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     766           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     767             :                                 err = 1;
     768           0 :                         uip++;
     769           0 :                         if (uiid == oiid)
     770           0 :                                 oip++;
     771             :                 } else { /* uiid > oiid */
     772           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     773           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     774             :                                 err = 1;
     775           0 :                         oip++;
     776             :                 }
     777             :         }
     778           0 :         while (uip < uie && !err) {
     779           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     780           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     781           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     782             :                         err = 1;
     783           0 :                 uip++;
     784             :         }
     785           0 :         while (oip < oie && !err) {
     786           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     787           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     788           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     789             :                         err = 1;
     790           0 :                 oip++;
     791             :         }
     792           0 :         if (uv) {
     793           0 :                 bat_iterator_end(&uvi);
     794           0 :                 bat_iterator_end(&ovi);
     795             :         }
     796           0 :         bat_iterator_end(&uii);
     797           0 :         bat_iterator_end(&oii);
     798           0 :         bat_destroy(ui);
     799           0 :         bat_destroy(uv);
     800           0 :         bat_destroy(oi);
     801           0 :         bat_destroy(ov);
     802           0 :         if (!err) {
     803           0 :                 if (nv)
     804           0 :                         *UV = nv;
     805           0 :                 return ni;
     806             :         }
     807           0 :         *UV = NULL;
     808           0 :         bat_destroy(ni);
     809           0 :         bat_destroy(nv);
     810           0 :         return NULL;
     811             : }
     812             : 
     813             : static sql_delta *
     814     9066581 : older_delta( sql_delta *d, sql_trans *tr)
     815             : {
     816     9066581 :         sql_delta *o = d->next;
     817             : 
     818     9074028 :         while (o && !o->cs.merged) {
     819        7407 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     820             :                         break;
     821             :                 else
     822        7447 :                         o = o->next;
     823             :         }
     824     9066621 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     825           0 :                 return o;
     826             :         return NULL;
     827             : }
     828             : 
     829             : static BAT *
     830     9062843 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     831             : {
     832     9062843 :         assert(tr->active);
     833     9062843 :         sql_delta *o = NULL;
     834     9062843 :         BAT *ui = NULL, *uv = NULL;
     835             : 
     836     9062843 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     837             :                 return NULL;
     838     9066228 :         if (access == RD_UPD_VAL) {
     839     4534577 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     840           0 :                         bat_destroy(ui);
     841           0 :                         return NULL;
     842             :                 }
     843             :         }
     844     9066496 :         while ((o = older_delta(d, tr)) != NULL) {
     845           0 :                 BAT *oui = NULL, *ouv = NULL;
     846           0 :                 if (!oui)
     847           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     848           0 :                 if (access == RD_UPD_VAL)
     849           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     850           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     851           0 :                         bat_destroy(ui);
     852           0 :                         bat_destroy(uv);
     853           0 :                         bat_destroy(oui);
     854           0 :                         bat_destroy(ouv);
     855           0 :                         return NULL;
     856             :                 }
     857           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     858             :                         return NULL;
     859             :                 d = o;
     860             :         }
     861     9066644 :         if (uv) {
     862     4534851 :                 bat_destroy(ui);
     863     4534851 :                 return uv;
     864             :         }
     865             :         return ui;
     866             : }
     867             : 
     868             : static BAT *
     869        2312 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     870             : {
     871        2312 :         lock_column(tr->store, c->base.id);
     872        2312 :         sql_delta *d = col_timestamp_delta(tr, c);
     873        2312 :         int type = c->type.type->localtype;
     874             : 
     875        2312 :         if (!d) {
     876           0 :                 unlock_column(tr->store, c->base.id);
     877           0 :                 return NULL;
     878             :         }
     879        2312 :         if (d->cs.st == ST_DICT) {
     880           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     881             : 
     882           0 :                 type = b->ttype;
     883             :         }
     884        2312 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     885        2312 :         unlock_column(tr->store, c->base.id);
     886        2312 :         return bn;
     887             : }
     888             : 
     889             : static BAT *
     890           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     891             : {
     892           0 :         lock_column(tr->store, i->base.id);
     893           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     894           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     895             : 
     896           0 :         if (!d) {
     897           0 :                 unlock_column(tr->store, i->base.id);
     898           0 :                 return NULL;
     899             :         }
     900           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     901           0 :         unlock_column(tr->store, i->base.id);
     902           0 :         return bn;
     903             : }
     904             : 
     905             : static BAT *
     906     9486517 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     907             : {
     908     9486517 :         BAT *b;
     909             : 
     910     9486517 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     911     9486517 :         assert(cs != NULL);
     912     9486517 :         if (access == QUICK)
     913      175751 :                 return quick_descriptor(cs->bid);
     914     9310766 :         if (access == RD_EXT)
     915         857 :                 return temp_descriptor(cs->ebid);
     916     9309909 :         assert(cs->bid);
     917     9309909 :         b = temp_descriptor(cs->bid);
     918     9310238 :         if (b == NULL)
     919             :                 return NULL;
     920     9310238 :         assert(b->batRestricted == BAT_READ);
     921             :         /* return slice */
     922     9310238 :         BAT *s = BATslice(b, 0, cnt);
     923     9296593 :         bat_destroy(b);
     924     9296593 :         return s;
     925             : }
     926             : 
     927             : static int
     928     4530054 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     929             : {
     930     4530054 :         lock_column(tr->store, c->base.id);
     931     4530701 :         size_t cnt = count_col(tr, c, 0);
     932     4532456 :         sql_delta *d = col_timestamp_delta(tr, c);
     933     4532482 :         int type = c->type.type->localtype;
     934             : 
     935     4532482 :         if (!d) {
     936           0 :                 unlock_column(tr->store, c->base.id);
     937           0 :                 return LOG_ERR;
     938             :         }
     939     4532482 :         if (d->cs.st == ST_DICT) {
     940           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     941             : 
     942           2 :                 type = b->ttype;
     943             :         }
     944             : 
     945     4532482 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     946     4532415 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     947             : 
     948     4532313 :         unlock_column(tr->store, c->base.id);
     949             : 
     950     4532120 :         if (*ui == NULL || *uv == NULL) {
     951           0 :                 bat_destroy(*ui);
     952           0 :                 bat_destroy(*uv);
     953           0 :                 return LOG_ERR;
     954             :         }
     955             :         return LOG_OK;
     956             : }
     957             : 
     958             : static int
     959          23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     960             : {
     961          23 :         lock_column(tr->store, i->base.id);
     962          23 :         size_t cnt = count_idx(tr, i, 0);
     963          23 :         sql_delta *d = idx_timestamp_delta(tr, i);
     964          23 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     965             : 
     966          23 :         if (!d) {
     967           0 :                 unlock_column(tr->store, i->base.id);
     968           0 :                 return LOG_ERR;
     969             :         }
     970             : 
     971          23 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     972          23 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     973             : 
     974          23 :         unlock_column(tr->store, i->base.id);
     975             : 
     976          23 :         if (*ui == NULL || *uv == NULL) {
     977           0 :                 bat_destroy(*ui);
     978           0 :                 bat_destroy(*uv);
     979           0 :                 return LOG_ERR;
     980             :         }
     981             :         return LOG_OK;
     982             : }
     983             : 
     984             : static void *                                   /* BAT * */
     985     9474190 : bind_col(sql_trans *tr, sql_column *c, int access)
     986             : {
     987     9474190 :         assert(access == QUICK || tr->active);
     988     9474190 :         if (!isTable(c->t))
     989             :                 return NULL;
     990     9474190 :         sql_delta *d = col_timestamp_delta(tr, c);
     991     9475971 :         if (!d)
     992             :                 return NULL;
     993     9475971 :         size_t cnt = count_col(tr, c, 0);
     994     9479360 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     995        2312 :                 return bind_ucol(tr, c, access, cnt);
     996     9477048 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
     997     9469423 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
     998             :         return b;
     999             : }
    1000             : 
    1001             : static void *                                   /* BAT * */
    1002       10772 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1003             : {
    1004       10772 :         assert(access == QUICK || tr->active);
    1005       10772 :         if (!isTable(i->t))
    1006             :                 return NULL;
    1007       10772 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1008       10774 :         if (!d)
    1009             :                 return NULL;
    1010       10774 :         size_t cnt = count_idx(tr, i, 0);
    1011       10776 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1012           0 :                 return bind_uidx(tr, i, access, cnt);
    1013       10776 :         return cs_bind_bat( &d->cs, access, cnt);
    1014             : }
    1015             : 
    1016             : static int
    1017        2296 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1018             : {
    1019        2296 :         if (!cs->uibid) {
    1020           0 :                 cs->uibid = e_bat(TYPE_oid);
    1021           0 :                 if (cs->uibid == BID_NIL)
    1022             :                         return LOG_ERR;
    1023             :         }
    1024        2296 :         if (!cs->uvbid) {
    1025           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1026           0 :                 if (!cur)
    1027             :                         return LOG_ERR;
    1028           0 :                 int type = cur->ttype;
    1029           0 :                 cs->uvbid = e_bat(type);
    1030           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1031             :                         return LOG_ERR;
    1032             :         }
    1033        2296 :         BAT *ui = temp_descriptor(cs->uibid);
    1034        2296 :         BAT *uv = temp_descriptor(cs->uvbid);
    1035             : 
    1036        2296 :         if (ui == NULL || uv == NULL) {
    1037           0 :                 bat_destroy(ui);
    1038           0 :                 bat_destroy(uv);
    1039           0 :                 return LOG_ERR;
    1040             :         }
    1041        2296 :         assert(ui && uv);
    1042        2296 :         if (isEbat(ui)){
    1043         392 :                 temp_destroy(cs->uibid);
    1044         392 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1045         392 :                 bat_destroy(ui);
    1046         392 :                 if (cs->uibid == BID_NIL ||
    1047         392 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1048           0 :                         bat_destroy(uv);
    1049           0 :                         return LOG_ERR;
    1050             :                 }
    1051             :         }
    1052        2296 :         if (isEbat(uv)){
    1053         392 :                 temp_destroy(cs->uvbid);
    1054         392 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1055         392 :                 bat_destroy(uv);
    1056         392 :                 if (cs->uvbid == BID_NIL ||
    1057         392 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1058           0 :                         bat_destroy(ui);
    1059           0 :                         return LOG_ERR;
    1060             :                 }
    1061             :         }
    1062        2296 :         *Ui = ui;
    1063        2296 :         *Uv = uv;
    1064        2296 :         return LOG_OK;
    1065             : }
    1066             : 
    1067             : static int
    1068        5116 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1069             : {
    1070       44691 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1071       44691 :                 if (s->start <= rid && s->end > rid) {
    1072        5116 :                         if (s->ts == tr->tid && !s->deleted) {
    1073        2874 :                                 return 1;
    1074             :                         }
    1075             :                         break;
    1076             :                 }
    1077             :         }
    1078             :         return 0;
    1079             : }
    1080             : 
    1081             : static int
    1082        2242 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1083             : {
    1084       38593 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1085       38593 :                 if (s->start <= rid && s->end > rid) {
    1086        2242 :                         if (s->ts >= tr->ts && s->deleted) {
    1087           0 :                                 return 1;
    1088             :                         }
    1089             :                         break;
    1090             :                 }
    1091             :         }
    1092             :         return 0;
    1093             : }
    1094             : 
    1095             : static sql_delta *
    1096           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1097             : {
    1098           0 :         sql_delta *n = ZNEW(sql_delta);
    1099           0 :         if (!n)
    1100             :                 return NULL;
    1101           0 :         *n = *bat;
    1102           0 :         n->next = NULL;
    1103           0 :         n->cs.ts = tr->tid;
    1104           0 :         return n;
    1105             : }
    1106             : 
    1107             : static BAT *
    1108          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1109             : {
    1110          17 :         BAT *newoffsets = NULL;
    1111          17 :         sql_delta *bat = *batp;
    1112          17 :         column_storage *cs = &bat->cs;
    1113          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1114             : 
    1115          17 :         if (!u)
    1116             :                 return NULL;
    1117          17 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1118          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1119           0 :                 bat_destroy(u);
    1120           0 :                 return NULL;
    1121             :         } else {
    1122          17 :                 int new = 0;
    1123             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1124          17 :                 if (BATcount(u) >= max_cnt) {
    1125           1 :                         if (max_cnt == 64*1024) { /* decompress */
    1126           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1127           0 :                                         bat_destroy(u);
    1128           0 :                                         return NULL;
    1129             :                                 }
    1130           0 :                                 if (cs->ucnt) {
    1131           0 :                                         BAT *ui = NULL, *uv = NULL;
    1132           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1133           0 :                                         bat_destroy(b);
    1134           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1135           0 :                                                 bat_destroy(nb);
    1136           0 :                                                 bat_destroy(u);
    1137           0 :                                                 return NULL;
    1138             :                                         }
    1139           0 :                                         b = nb;
    1140           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1141           0 :                                                 bat_destroy(ui);
    1142           0 :                                                 bat_destroy(uv);
    1143           0 :                                                 bat_destroy(b);
    1144           0 :                                                 bat_destroy(u);
    1145             :                                         }
    1146           0 :                                         bat_destroy(ui);
    1147           0 :                                         bat_destroy(uv);
    1148             :                                 }
    1149           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1150           0 :                                 bat_destroy(b);
    1151           0 :                                 assert(newoffsets == NULL);
    1152           0 :                                 if (!n) {
    1153           0 :                                         bat_destroy(u);
    1154           0 :                                         return NULL;
    1155             :                                 }
    1156           0 :                                 if (cs->ts != tr->tid) {
    1157           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1158           0 :                                                 bat_destroy(n);
    1159           0 :                                                 return NULL;
    1160             :                                         }
    1161           0 :                                         cs = &(*batp)->cs;
    1162           0 :                                         new = 1;
    1163             :                                 }
    1164           0 :                                 if (cs->bid && !new)
    1165           0 :                                         temp_destroy(cs->bid);
    1166           0 :                                 n = transfer_to_systrans(n);
    1167           0 :                                 if (n == NULL)
    1168             :                                         return NULL;
    1169           0 :                                 bat_set_access(n, BAT_READ);
    1170           0 :                                 cs->bid = temp_create(n);
    1171           0 :                                 bat_destroy(n);
    1172           0 :                                 if (cs->ebid && !new)
    1173           0 :                                         temp_destroy(cs->ebid);
    1174           0 :                                 cs->ebid = 0;
    1175           0 :                                 cs->ucnt = 0;
    1176           0 :                                 if (cs->uibid && !new)
    1177           0 :                                         temp_destroy(cs->uibid);
    1178           0 :                                 if (cs->uvbid && !new)
    1179           0 :                                         temp_destroy(cs->uvbid);
    1180           0 :                                 cs->uibid = cs->uvbid = 0;
    1181           0 :                                 cs->st = ST_DEFAULT;
    1182           0 :                                 cs->cleared = true;
    1183             :                         } else {
    1184           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1185           0 :                                         bat_destroy(newoffsets);
    1186           0 :                                         bat_destroy(u);
    1187           0 :                                         return NULL;
    1188             :                                 }
    1189           1 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
    1190           1 :                                 bat_destroy(b);
    1191           1 :                                 if (!n) {
    1192           0 :                                         bat_destroy(newoffsets);
    1193           0 :                                         bat_destroy(u);
    1194           0 :                                         return NULL;
    1195             :                                 }
    1196           1 :                                 if (cs->ts != tr->tid) {
    1197           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1198           0 :                                                 bat_destroy(n);
    1199           0 :                                                 return NULL;
    1200             :                                         }
    1201           0 :                                         cs = &(*batp)->cs;
    1202           0 :                                         new = 1;
    1203           0 :                                         temp_dup(cs->ebid);
    1204           0 :                                         if (cs->uibid) {
    1205           0 :                                                 temp_dup(cs->uibid);
    1206           0 :                                                 temp_dup(cs->uvbid);
    1207             :                                         }
    1208             :                                 }
    1209           1 :                                 if (cs->bid && !new)
    1210           1 :                                         temp_destroy(cs->bid);
    1211           1 :                                 n = transfer_to_systrans(n);
    1212           1 :                                 if (n == NULL)
    1213             :                                         return NULL;
    1214           1 :                                 bat_set_access(n, BAT_READ);
    1215           1 :                                 cs->bid = temp_create(n);
    1216           1 :                                 bat_destroy(n);
    1217           1 :                                 cs->cleared = true;
    1218           1 :                                 i = newoffsets;
    1219             :                         }
    1220             :                 } else { /* append */
    1221          16 :                         i = newoffsets;
    1222             :                 }
    1223             :         }
    1224          17 :         bat_destroy(u);
    1225          17 :         return i;
    1226             : }
    1227             : 
    1228             : static BAT *
    1229           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1230             : {
    1231           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1232           0 :         BAT *newoffsets = NULL;
    1233           0 :         BAT *b = NULL, *n = NULL;
    1234             : 
    1235           0 :         if (!(b = temp_descriptor(cs->bid)))
    1236             :                 return NULL;
    1237             : 
    1238           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1239           0 :                 bat_destroy(b);
    1240           0 :                 return NULL;
    1241             :         } else {
    1242             :                 /* returns new offset bat if values within min/max, else decompress */
    1243           0 :                 if (!newoffsets) { /* decompress */
    1244           0 :                         if (cs->ucnt) {
    1245           0 :                                 BAT *ui = NULL, *uv = NULL;
    1246           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1247           0 :                                 bat_destroy(b);
    1248           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1249           0 :                                         bat_destroy(nb);
    1250           0 :                                         return NULL;
    1251             :                                 }
    1252           0 :                                 b = nb;
    1253           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1254           0 :                                         bat_destroy(ui);
    1255           0 :                                         bat_destroy(uv);
    1256           0 :                                         bat_destroy(b);
    1257             :                                 }
    1258           0 :                                 bat_destroy(ui);
    1259           0 :                                 bat_destroy(uv);
    1260             :                         }
    1261           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1262           0 :                         bat_destroy(b);
    1263           0 :                         if (!n)
    1264             :                                 return NULL;
    1265           0 :                         if (cs->bid)
    1266           0 :                                 temp_destroy(cs->bid);
    1267           0 :                         n = transfer_to_systrans(n);
    1268           0 :                         if (n == NULL)
    1269             :                                 return NULL;
    1270           0 :                         bat_set_access(n, BAT_READ);
    1271           0 :                         cs->bid = temp_create(n);
    1272           0 :                         cs->ucnt = 0;
    1273           0 :                         if (cs->uibid)
    1274           0 :                                 temp_destroy(cs->uibid);
    1275           0 :                         if (cs->uvbid)
    1276           0 :                                 temp_destroy(cs->uvbid);
    1277           0 :                         cs->uibid = cs->uvbid = 0;
    1278           0 :                         cs->st = ST_DEFAULT;
    1279           0 :                         cs->cleared = true;
    1280           0 :                         b = n;
    1281             :                 } else { /* append */
    1282             :                         i = newoffsets;
    1283             :                 }
    1284             :         }
    1285           0 :         bat_destroy(b);
    1286           0 :         return i;
    1287             : }
    1288             : 
    1289             : /*
    1290             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1291             :  */
    1292             : static int
    1293        2912 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1294             : {
    1295        2912 :         int res = LOG_OK;
    1296        2912 :         sql_delta *bat = *batp;
    1297        2912 :         column_storage *cs = &bat->cs;
    1298        2912 :         BAT *otids = tids, *oupdates = updates;
    1299             : 
    1300        2912 :         if (!BATcount(tids))
    1301             :                 return LOG_OK;
    1302             : 
    1303        2912 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1304           6 :                 tids = BATunmask(tids);
    1305           6 :                 if (!tids)
    1306             :                         return LOG_ERR;
    1307             :         }
    1308        2912 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1309           0 :                 updates = BATunmask(updates);
    1310           0 :                 if (!updates) {
    1311           0 :                         if (otids != tids)
    1312           0 :                                 bat_destroy(tids);
    1313           0 :                         return LOG_ERR;
    1314             :                 }
    1315        2912 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1316          42 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1317          42 :                 if (!updates) {
    1318           0 :                         if (otids != tids)
    1319           0 :                                 bat_destroy(tids);
    1320           0 :                         return LOG_ERR;
    1321             :                 }
    1322             :         }
    1323             : 
    1324        2912 :         if (cs->st == ST_DICT) {
    1325             :                 /* possibly a new array is returned */
    1326           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1327           4 :                 bat = *batp;
    1328           4 :                 cs = &bat->cs;
    1329           4 :                 if (oupdates != updates)
    1330           0 :                         bat_destroy(updates);
    1331           4 :                 updates = nupdates;
    1332           4 :                 if (!updates) {
    1333           0 :                         if (otids != tids)
    1334           0 :                                 bat_destroy(tids);
    1335           0 :                         return LOG_ERR;
    1336             :                 }
    1337             :         }
    1338             : 
    1339             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1340             :         /* currently only one update delta is possible */
    1341        2912 :         lock_table(tr->store, t->base.id);
    1342        2912 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1343        2912 :         if (!is_new && !cs->cleared) {
    1344        2589 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1345           6 :                         BAT *sorted, *order;
    1346           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1347           0 :                                 if (otids != tids)
    1348           0 :                                         bat_destroy(tids);
    1349           0 :                                 if (oupdates != updates)
    1350           0 :                                         bat_destroy(updates);
    1351           0 :                                 unlock_table(tr->store, t->base.id);
    1352           0 :                                 return LOG_ERR;
    1353             :                         }
    1354           6 :                         if (otids != tids)
    1355           0 :                                 bat_destroy(tids);
    1356           6 :                         tids = sorted;
    1357           6 :                         BAT *nupdates = BATproject(order, updates);
    1358           6 :                         bat_destroy(order);
    1359           6 :                         if (oupdates != updates)
    1360           0 :                                 bat_destroy(updates);
    1361           6 :                         updates = nupdates;
    1362           6 :                         if (!updates) {
    1363           0 :                                 bat_destroy(tids);
    1364           0 :                                 unlock_table(tr->store, t->base.id);
    1365           0 :                                 return LOG_ERR;
    1366             :                         }
    1367             :                 }
    1368        2589 :                 assert(tids->tsorted);
    1369        2589 :                 BAT *ui = NULL, *uv = NULL;
    1370             : 
    1371             :                 /* handle updates on just inserted bits */
    1372             :                 /* handle updates on updates (within one transaction) */
    1373        2589 :                 BATiter upi = bat_iterator(updates);
    1374        2589 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1375        2589 :                 BAT *b, *ins = NULL;
    1376        2589 :                 int *msk = NULL;
    1377             : 
    1378        2589 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1379             :                         res = LOG_ERR;
    1380             : 
    1381        2589 :                 if (res == LOG_OK && BATtdense(tids)) {
    1382        2457 :                         oid start = tids->tseqbase, offset = start;
    1383        2457 :                         oid end = start + ucnt;
    1384             : 
    1385        8121 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1386        6264 :                                 if (seg->start <= start && seg->end > start) {
    1387             :                                         /* check for delete conflicts */
    1388        2457 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1389           0 :                                                 res = LOG_CONFLICT;
    1390           0 :                                                 continue;
    1391             :                                         }
    1392             : 
    1393             :                                         /* check for inplace updates */
    1394        2457 :                                         BUN lend = end < seg->end?end:seg->end;
    1395        2457 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1396          86 :                                                 if (!ins) {
    1397          86 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1398          86 :                                                         if (!ins)
    1399             :                                                                 res = LOG_ERR;
    1400             :                                                         else {
    1401          86 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1402          86 :                                                                 msk = (int*)Tloc(ins, 0);
    1403          86 :                                                                 BUN end = (ucnt+31)/32;
    1404          86 :                                                                 memset(msk, 0, end * sizeof(int));
    1405             :                                                         }
    1406             :                                                 }
    1407         361 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1408         275 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1409         275 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1410           0 :                                                                 res = LOG_ERR;
    1411             : 
    1412         275 :                                                         oid word = i/32;
    1413         275 :                                                         int pos = i%32;
    1414         275 :                                                         msk[word] |= 1U<<pos;
    1415         275 :                                                         cnt++;
    1416             :                                                 }
    1417             :                                         }
    1418             :                                 }
    1419        6264 :                                 if (end < seg->end)
    1420             :                                         break;
    1421             :                         }
    1422         135 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1423           3 :                         struct canditer ci;
    1424           3 :                         segment *seg = s->segs->h;
    1425           3 :                         canditer_init(&ci, NULL, tids);
    1426           3 :                         BUN i = 0;
    1427        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1428        1033 :                                 oid rid = canditer_next(&ci);
    1429        1033 :                                 if (seg->end <= rid)
    1430          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1431        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1432             :                                         /* check for delete conflicts */
    1433        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1434           0 :                                                 res = LOG_CONFLICT;
    1435           0 :                                                 continue;
    1436             :                                         }
    1437             : 
    1438             :                                         /* check for inplace updates */
    1439        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1440           0 :                                                 if (!ins) {
    1441           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1442           0 :                                                         if (!ins) {
    1443             :                                                                 res = LOG_ERR;
    1444             :                                                                 break;
    1445             :                                                         } else {
    1446           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1447           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1448           0 :                                                                 BUN end = (ucnt+31)/32;
    1449           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1450             :                                                         }
    1451             :                                                 }
    1452           0 :                                                 ptr upd = BUNtail(upi, i);
    1453           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1454           0 :                                                         res = LOG_ERR;
    1455             : 
    1456           0 :                                                 oid word = i/32;
    1457           0 :                                                 int pos = i%32;
    1458           0 :                                                 msk[word] |= 1U<<pos;
    1459           0 :                                                 cnt++;
    1460             :                                         }
    1461        1020 :                                         i++;
    1462             :                                 }
    1463             :                         }
    1464         129 :                 } else if (res == LOG_OK) {
    1465         129 :                         BUN i = 0;
    1466         129 :                         oid *rid = Tloc(tids,0);
    1467         129 :                         segment *seg = s->segs->h;
    1468       14304 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1469       14175 :                                 if (seg->end <= rid[i])
    1470        3719 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1471       10456 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1472             :                                         /* check for delete conflicts */
    1473       10456 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1474           0 :                                                 res = LOG_CONFLICT;
    1475           0 :                                                 continue;
    1476             :                                         }
    1477             : 
    1478             :                                         /* check for inplace updates */
    1479       10456 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1480         291 :                                                 if (!ins) {
    1481          28 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1482          28 :                                                         if (!ins) {
    1483             :                                                                 res = LOG_ERR;
    1484             :                                                                 break;
    1485             :                                                         } else {
    1486          28 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1487          28 :                                                                 msk = (int*)Tloc(ins, 0);
    1488          28 :                                                                 BUN end = (ucnt+31)/32;
    1489          28 :                                                                 memset(msk, 0, end * sizeof(int));
    1490             :                                                         }
    1491             :                                                 }
    1492         291 :                                                 const void *upd = BUNtail(upi, i);
    1493         291 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1494           0 :                                                         res = LOG_ERR;
    1495             : 
    1496         291 :                                                 oid word = i/32;
    1497         291 :                                                 int pos = i%32;
    1498         291 :                                                 msk[word] |= 1U<<pos;
    1499         291 :                                                 cnt++;
    1500             :                                         }
    1501       10456 :                                         i++;
    1502             :                                 }
    1503             :                         }
    1504             :                 }
    1505             : 
    1506        2589 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1507        2475 :                         if (cs->ucnt == 0) {
    1508        2421 :                                 if (cnt) {
    1509           0 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1510           0 :                                         if (nins) {
    1511           0 :                                                 ui = BATproject(nins, tids);
    1512           0 :                                                 uv = BATproject(nins, updates);
    1513           0 :                                                 bat_destroy(nins);
    1514             :                                         }
    1515             :                                 } else {
    1516        2421 :                                         ui = temp_descriptor(tids->batCacheid);
    1517        2421 :                                         uv = temp_descriptor(updates->batCacheid);
    1518             :                                 }
    1519        2421 :                                 if (!ui || !uv) {
    1520             :                                         res = LOG_ERR;
    1521             :                                 } else {
    1522        2421 :                                         temp_destroy(cs->uibid);
    1523        2421 :                                         temp_destroy(cs->uvbid);
    1524        2421 :                                         ui = transfer_to_systrans(ui);
    1525        2421 :                                         uv = transfer_to_systrans(uv);
    1526        2421 :                                         if (ui == NULL || uv == NULL) {
    1527           0 :                                                 BBPreclaim(ui);
    1528           0 :                                                 BBPreclaim(uv);
    1529             :                                                 res = LOG_ERR;
    1530             :                                         } else {
    1531        2421 :                                                 cs->uibid = temp_create(ui);
    1532        2421 :                                                 cs->uvbid = temp_create(uv);
    1533        2421 :                                                 cs->ucnt = BATcount(ui);
    1534             :                                         }
    1535             :                                 }
    1536             :                         } else {
    1537          54 :                                 BAT *nui = NULL, *nuv = NULL;
    1538             : 
    1539             :                                 /* merge taking msk of inserted into account */
    1540          54 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1541             :                                         res = LOG_ERR;
    1542             : 
    1543          54 :                                 if (res == LOG_OK) {
    1544          54 :                                         const void *upd = NULL;
    1545          54 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1546          54 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1547             : 
    1548          54 :                                         if (!nui || !nuv) {
    1549             :                                                 res = LOG_ERR;
    1550             :                                         } else {
    1551          54 :                                                 BATiter ovi = bat_iterator(uv);
    1552             : 
    1553             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1554          54 :                                                 BUN uip = 0, uie = BATcount(ui);
    1555          54 :                                                 BUN nip = 0, nie = BATcount(tids);
    1556          54 :                                                 oid uiseqb = ui->tseqbase;
    1557          54 :                                                 oid niseqb = tids->tseqbase;
    1558          54 :                                                 oid *uipt = NULL, *nipt = NULL;
    1559          54 :                                                 BATiter uii = bat_iterator(ui);
    1560          54 :                                                 BATiter tidsi = bat_iterator(tids);
    1561          54 :                                                 if (!BATtdensebi(&uii))
    1562          53 :                                                         uipt = uii.base;
    1563          54 :                                                 if (!BATtdensebi(&tidsi))
    1564          53 :                                                         nipt = tidsi.base;
    1565       14211 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1566       14157 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1567       14157 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1568             : 
    1569       14157 :                                                         if (uiv < niv) {
    1570        7916 :                                                                 upd = BUNtail(ovi, uip);
    1571       15832 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1572        7916 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1573             :                                                                         res = LOG_ERR;
    1574        7916 :                                                                 uip++;
    1575        6241 :                                                         } else if (uiv == niv) {
    1576             :                                                                 /* handle == */
    1577          18 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1578          18 :                                                                         upd = BUNtail(upi, nip);
    1579          36 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1580          18 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1581             :                                                                                 res = LOG_ERR;
    1582             :                                                                 } else {
    1583           0 :                                                                         upd = BUNtail(ovi, uip);
    1584           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1585           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1586             :                                                                                 res = LOG_ERR;
    1587             :                                                                 }
    1588          18 :                                                                 uip++;
    1589          18 :                                                                 nip++;
    1590             :                                                         } else { /* uiv > niv */
    1591        6223 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1592        6223 :                                                                         upd = BUNtail(upi, nip);
    1593       12446 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1594        6223 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1595             :                                                                                 res = LOG_ERR;
    1596             :                                                                 }
    1597        6223 :                                                                 nip++;
    1598             :                                                         }
    1599             :                                                 }
    1600         275 :                                                 while (uip < uie && res == LOG_OK) {
    1601         221 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1602         221 :                                                         upd = BUNtail(ovi, uip);
    1603         442 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1604         221 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1605             :                                                                 res = LOG_ERR;
    1606         221 :                                                         uip++;
    1607             :                                                 }
    1608         383 :                                                 while (nip < nie && res == LOG_OK) {
    1609         329 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1610         329 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1611         329 :                                                                 upd = BUNtail(upi, nip);
    1612         658 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1613         329 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1614             :                                                                         res = LOG_ERR;
    1615             :                                                         }
    1616         329 :                                                         nip++;
    1617             :                                                 }
    1618          54 :                                                 bat_iterator_end(&uii);
    1619          54 :                                                 bat_iterator_end(&tidsi);
    1620          54 :                                                 bat_iterator_end(&ovi);
    1621          54 :                                                 if (res == LOG_OK) {
    1622          54 :                                                         temp_destroy(cs->uibid);
    1623          54 :                                                         temp_destroy(cs->uvbid);
    1624          54 :                                                         nui = transfer_to_systrans(nui);
    1625          54 :                                                         nuv = transfer_to_systrans(nuv);
    1626          54 :                                                         if (nui == NULL || nuv == NULL) {
    1627             :                                                                 res = LOG_ERR;
    1628             :                                                         } else {
    1629          54 :                                                                 cs->uibid = temp_create(nui);
    1630          54 :                                                                 cs->uvbid = temp_create(nuv);
    1631          54 :                                                                 cs->ucnt = BATcount(nui);
    1632             :                                                         }
    1633             :                                                 }
    1634             :                                         }
    1635          54 :                                         bat_destroy(nui);
    1636          54 :                                         bat_destroy(nuv);
    1637             :                                 }
    1638             :                         }
    1639             :                 }
    1640        2589 :                 bat_iterator_end(&upi);
    1641        2589 :                 bat_destroy(b);
    1642        2589 :                 unlock_table(tr->store, t->base.id);
    1643        2589 :                 bat_destroy(ins);
    1644        2589 :                 bat_destroy(ui);
    1645        2589 :                 bat_destroy(uv);
    1646        2589 :                 if (otids != tids)
    1647          10 :                         bat_destroy(tids);
    1648        2589 :                 if (oupdates != updates)
    1649          11 :                         bat_destroy(updates);
    1650        2589 :                 return res;
    1651             :         } else if (is_new || cs->cleared) {
    1652         323 :                 BAT *b = temp_descriptor(cs->bid);
    1653             : 
    1654         323 :                 if (b == NULL) {
    1655             :                         res = LOG_ERR;
    1656             :                 } else {
    1657         323 :                         if (BATcount(b)==0) {
    1658           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1659           0 :                                         res = LOG_ERR;
    1660         322 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1661           0 :                                 res = LOG_ERR;
    1662         323 :                         BBPcold(b->batCacheid);
    1663         323 :                         bat_destroy(b);
    1664             :                 }
    1665             :         }
    1666         323 :         unlock_table(tr->store, t->base.id);
    1667         323 :         if (otids != tids)
    1668           2 :                 bat_destroy(tids);
    1669         323 :         if (oupdates != updates)
    1670          41 :                 bat_destroy(updates);
    1671             :         return res;
    1672             : }
    1673             : 
    1674             : static int
    1675        2912 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1676             : {
    1677        2912 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1678             : }
    1679             : 
    1680             : static void *
    1681           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1682             : {
    1683           4 :         void *newoffsets = NULL;
    1684           4 :         sql_delta *bat = *batp;
    1685           4 :         column_storage *cs = &bat->cs;
    1686           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1687             : 
    1688           4 :         if (!u)
    1689             :                 return NULL;
    1690           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1691           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1692           0 :                 bat_destroy(u);
    1693           0 :                 return NULL;
    1694             :         } else {
    1695           4 :                 int new = 0;
    1696             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1697           4 :                 if (BATcount(u) >= max_cnt) {
    1698           0 :                         if (max_cnt == 64*1024) { /* decompress */
    1699           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1700           0 :                                         bat_destroy(u);
    1701           0 :                                         return NULL;
    1702             :                                 }
    1703           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1704             :                                 /* TODO decompress updates if any */
    1705           0 :                                 bat_destroy(b);
    1706           0 :                                 assert(newoffsets == NULL);
    1707           0 :                                 if (!n) {
    1708           0 :                                         bat_destroy(u);
    1709           0 :                                         return NULL;
    1710             :                                 }
    1711           0 :                                 if (cs->ts != tr->tid) {
    1712           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1713           0 :                                                 bat_destroy(n);
    1714           0 :                                                 bat_destroy(u);
    1715           0 :                                                 return NULL;
    1716             :                                         }
    1717           0 :                                         cs = &(*batp)->cs;
    1718           0 :                                         new = 1;
    1719           0 :                                         cs->uibid = cs->uvbid = 0;
    1720             :                                 }
    1721           0 :                                 if (cs->bid && !new)
    1722           0 :                                         temp_destroy(cs->bid);
    1723           0 :                                 n = transfer_to_systrans(n);
    1724           0 :                                 if (n == NULL) {
    1725           0 :                                         bat_destroy(u);
    1726           0 :                                         return NULL;
    1727             :                                 }
    1728           0 :                                 bat_set_access(n, BAT_READ);
    1729           0 :                                 cs->bid = temp_create(n);
    1730           0 :                                 bat_destroy(n);
    1731           0 :                                 if (cs->ebid && !new)
    1732           0 :                                         temp_destroy(cs->ebid);
    1733           0 :                                 cs->ebid = 0;
    1734           0 :                                 cs->st = ST_DEFAULT;
    1735             :                                 /* at append_col the column's storage type is cleared */
    1736           0 :                                 cs->cleared = true;
    1737             :                         } else {
    1738           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1739           0 :                                         GDKfree(newoffsets);
    1740           0 :                                         bat_destroy(u);
    1741           0 :                                         return NULL;
    1742             :                                 }
    1743           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
    1744           0 :                                 bat_destroy(b);
    1745           0 :                                 if (!n) {
    1746           0 :                                         GDKfree(newoffsets);
    1747           0 :                                         bat_destroy(u);
    1748           0 :                                         return NULL;
    1749             :                                 }
    1750           0 :                                 if (cs->ts != tr->tid) {
    1751           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1752           0 :                                                 bat_destroy(u);
    1753           0 :                                                 bat_destroy(n);
    1754           0 :                                                 return NULL;
    1755             :                                         }
    1756           0 :                                         cs = &(*batp)->cs;
    1757           0 :                                         new = 1;
    1758           0 :                                         temp_dup(cs->ebid);
    1759           0 :                                         if (cs->uibid) {
    1760           0 :                                                 temp_dup(cs->uibid);
    1761           0 :                                                 temp_dup(cs->uvbid);
    1762             :                                         }
    1763             :                                 }
    1764           0 :                                 if (cs->bid)
    1765           0 :                                         temp_destroy(cs->bid);
    1766           0 :                                 n = transfer_to_systrans(n);
    1767           0 :                                 if (n == NULL) {
    1768           0 :                                         bat_destroy(u);
    1769           0 :                                         return NULL;
    1770             :                                 }
    1771           0 :                                 bat_set_access(n, BAT_READ);
    1772           0 :                                 cs->bid = temp_create(n);
    1773           0 :                                 bat_destroy(n);
    1774           0 :                                 cs->cleared = true;
    1775           0 :                                 i = newoffsets;
    1776             :                         }
    1777             :                 } else { /* append */
    1778           4 :                         i = newoffsets;
    1779             :                 }
    1780             :         }
    1781           4 :         bat_destroy(u);
    1782           4 :         return i;
    1783             : }
    1784             : 
    1785             : static void *
    1786           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1787             : {
    1788           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1789           1 :         void *newoffsets = NULL;
    1790           1 :         BAT *b = NULL, *n = NULL;
    1791             : 
    1792           1 :         if (!(b = temp_descriptor(cs->bid)))
    1793             :                 return NULL;
    1794             : 
    1795           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1796           0 :                 bat_destroy(b);
    1797           0 :                 return NULL;
    1798             :         } else {
    1799             :                 /* returns new offset bat if values within min/max, else decompress */
    1800           1 :                 if (!newoffsets) {
    1801           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1802           1 :                         bat_destroy(b);
    1803           1 :                         if (!n)
    1804             :                                 return NULL;
    1805             :                         /* TODO decompress updates if any */
    1806           1 :                         if (cs->bid)
    1807           1 :                                 temp_destroy(cs->bid);
    1808           1 :                         n = transfer_to_systrans(n);
    1809           1 :                         if (n == NULL)
    1810             :                                 return NULL;
    1811           1 :                         bat_set_access(n, BAT_READ);
    1812           1 :                         cs->bid = temp_create(n);
    1813           1 :                         cs->st = ST_DEFAULT;
    1814             :                         /* at append_col the column's storage type is cleared */
    1815           1 :                         cs->cleared = true;
    1816           1 :                         b = n;
    1817             :                 } else { /* append */
    1818             :                         i = newoffsets;
    1819             :                 }
    1820             :         }
    1821           1 :         bat_destroy(b);
    1822           1 :         return i;
    1823             : }
    1824             : 
    1825             : static int
    1826        5116 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1827             : {
    1828        5116 :         void *oupd = upd;
    1829        5116 :         sql_delta *bat = *batp;
    1830        5116 :         column_storage *cs = &bat->cs;
    1831        5116 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1832        5116 :         assert(!is_oid_nil(rid));
    1833        5116 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1834             : 
    1835        5116 :         if (cs->st == ST_DICT) {
    1836             :                 /* possibly a new array is returned */
    1837           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1838           0 :                 bat = *batp;
    1839           0 :                 cs = &bat->cs;
    1840           0 :                 if (!upd)
    1841             :                         return LOG_ERR;
    1842             :         }
    1843             : 
    1844             :         /* check if rid is insert ? */
    1845        5116 :         if (!inplace) {
    1846             :                 /* check conflict */
    1847        2242 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1848           0 :                         if (oupd != upd)
    1849           0 :                                 GDKfree(upd);
    1850           0 :                         return LOG_CONFLICT;
    1851             :                 }
    1852        2242 :                 BAT *ui, *uv;
    1853             : 
    1854             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1855             :                 /* currently only one update delta is possible */
    1856        2242 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1857           0 :                         if (oupd != upd)
    1858           0 :                                 GDKfree(upd);
    1859           0 :                         return LOG_ERR;
    1860             :                 }
    1861             : 
    1862        2242 :                 assert(uv->ttype);
    1863        2242 :                 assert(BATcount(ui) == BATcount(uv));
    1864        4484 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1865        2242 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1866           0 :                         if (oupd != upd)
    1867           0 :                                 GDKfree(upd);
    1868           0 :                         bat_destroy(ui);
    1869           0 :                         bat_destroy(uv);
    1870           0 :                         return LOG_ERR;
    1871             :                 }
    1872        2242 :                 assert(BATcount(ui) == BATcount(uv));
    1873        2242 :                 bat_destroy(ui);
    1874        2242 :                 bat_destroy(uv);
    1875        2242 :                 cs->ucnt++;
    1876             :         } else {
    1877        2874 :                 BAT *b = NULL;
    1878             : 
    1879        2874 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1880           0 :                         if (oupd != upd)
    1881           0 :                                 GDKfree(upd);
    1882           0 :                         return LOG_ERR;
    1883             :                 }
    1884        2874 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1885           0 :                         if (oupd != upd)
    1886           0 :                                 GDKfree(upd);
    1887           0 :                         bat_destroy(b);
    1888           0 :                         return LOG_ERR;
    1889             :                 }
    1890        2874 :                 bat_destroy(b);
    1891             :         }
    1892        5116 :         if (oupd != upd)
    1893           0 :                 GDKfree(upd);
    1894             :         return LOG_OK;
    1895             : }
    1896             : 
    1897             : static int
    1898        5116 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1899             : {
    1900        5116 :         int res = LOG_OK;
    1901        5116 :         lock_table(tr->store, t->base.id);
    1902        5116 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1903        5116 :         unlock_table(tr->store, t->base.id);
    1904        5116 :         return res;
    1905             : }
    1906             : 
    1907             : static int
    1908      158850 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1909             : {
    1910      158850 :         (void)tr;
    1911      158850 :         if (!ocs)
    1912             :                 return LOG_OK;
    1913      158850 :         cs->bid = ocs->bid;
    1914      158850 :         cs->ebid = ocs->ebid;
    1915      158850 :         cs->uibid = ocs->uibid;
    1916      158850 :         cs->uvbid = ocs->uvbid;
    1917      158850 :         cs->ucnt = ocs->ucnt;
    1918             : 
    1919      158850 :         if (temp) {
    1920       25979 :                 cs->bid = temp_copy(cs->bid, true, false);
    1921       25967 :                 if (cs->bid == BID_NIL)
    1922             :                         return LOG_ERR;
    1923             :         } else {
    1924      132871 :                 temp_dup(cs->bid);
    1925             :         }
    1926      158854 :         if (cs->ebid)
    1927           6 :                 temp_dup(cs->ebid);
    1928      158854 :         cs->ucnt = 0;
    1929      158854 :         cs->uibid = e_bat(TYPE_oid);
    1930      158875 :         cs->uvbid = e_bat(type);
    1931      158874 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1932             :                 return LOG_ERR;
    1933      158874 :         cs->st = ocs->st;
    1934      158874 :         return LOG_OK;
    1935             : }
    1936             : 
    1937             : static void
    1938      311472 : destroy_delta(sql_delta *b, bool recursive)
    1939             : {
    1940      311472 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1941             :                 return;
    1942      292166 :         if (recursive && b->next)
    1943      128426 :                 destroy_delta(b->next, true);
    1944      292166 :         if (b->cs.uibid)
    1945       93992 :                 temp_destroy(b->cs.uibid);
    1946      292166 :         if (b->cs.uvbid)
    1947       93992 :                 temp_destroy(b->cs.uvbid);
    1948      292166 :         if (b->cs.bid)
    1949      292166 :                 temp_destroy(b->cs.bid);
    1950      292166 :         if (b->cs.ebid)
    1951          61 :                 temp_destroy(b->cs.ebid);
    1952      292166 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1953      292166 :         _DELETE(b);
    1954             : }
    1955             : 
    1956             : static sql_delta *
    1957    15559847 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1958             : {
    1959    15559847 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1960             : 
    1961    15559847 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1962    15427010 :                 return obat;
    1963      132837 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1964             :                 /* abort */
    1965          12 :                 if (update_conflict)
    1966           4 :                         *update_conflict = true;
    1967           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1968           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1969           4 :                 return NULL;
    1970             :         }
    1971      132825 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1972             :                 return NULL;
    1973      132822 :         sql_delta* bat = ZNEW(sql_delta);
    1974      132850 :         if (!bat)
    1975             :                 return NULL;
    1976      132850 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1977      132850 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1978           0 :                 destroy_delta(bat, false);
    1979           0 :                 return NULL;
    1980             :         }
    1981      132863 :         bat->cs.ts = tr->tid;
    1982             :         /* only one writer else abort */
    1983      132863 :         bat->next = obat;
    1984      132863 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    1985           0 :                 bat->next = NULL;
    1986           0 :                 destroy_delta(bat, false);
    1987           0 :                 if (update_conflict)
    1988           0 :                         *update_conflict = true;
    1989           0 :                 return NULL;
    1990             :         }
    1991             :         return bat;
    1992             : }
    1993             : 
    1994             : static int
    1995        8028 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    1996             : {
    1997        8028 :         int ok = LOG_OK;
    1998             : 
    1999        8028 :         if (is_bat) {
    2000        2912 :                 BAT *tids = incoming_tids;
    2001        2912 :                 BAT *values = incoming_values;
    2002        2912 :                 if (BATcount(tids) == 0)
    2003             :                         return LOG_OK;
    2004        2912 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2005             :         } else {
    2006        5116 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2007             :         }
    2008             :         return ok;
    2009             : }
    2010             : 
    2011             : static int
    2012        8051 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2013             : {
    2014        8051 :         int res = LOG_OK;
    2015        8051 :         bool update_conflict = false;
    2016        8051 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2017             : 
    2018        8051 :         if (isbat) {
    2019        2932 :                 BAT *t = tids;
    2020        2932 :                 if (!BATcount(t))
    2021             :                         return LOG_OK;
    2022             :         }
    2023             : 
    2024        7753 :         if (c == NULL)
    2025             :                 return LOG_ERR;
    2026             : 
    2027        7753 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2028           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2029             : 
    2030        7749 :         assert(delta && delta->cs.ts == tr->tid);
    2031        7749 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2032        7749 :         if (odelta != delta)
    2033        3384 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2034             : 
    2035        7749 :         odelta = delta;
    2036        7749 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2037             :                 return res;
    2038        7749 :         assert(delta == odelta);
    2039        7749 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2040           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2041             :         return res;
    2042             : }
    2043             : 
    2044             : static sql_delta *
    2045        2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2046             : {
    2047        2457 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2048             : 
    2049        2457 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2050        2429 :                 return obat;
    2051          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2052             :                 /* abort */
    2053           0 :                 if (update_conflict)
    2054           0 :                         *update_conflict = true;
    2055           0 :                 return NULL;
    2056             :         }
    2057          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2058             :                 return NULL;
    2059          28 :         sql_delta* bat = ZNEW(sql_delta);
    2060          28 :         if (!bat)
    2061             :                 return NULL;
    2062          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2063          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2064           0 :                 destroy_delta(bat, false);
    2065           0 :                 return NULL;
    2066             :         }
    2067          28 :         bat->cs.ts = tr->tid;
    2068             :         /* only one writer else abort */
    2069          28 :         bat->next = obat;
    2070          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2071           0 :                 bat->next = NULL;
    2072           0 :                 destroy_delta(bat, false);
    2073           0 :                 if (update_conflict)
    2074           0 :                         *update_conflict = true;
    2075           0 :                 return NULL;
    2076             :         }
    2077             :         return bat;
    2078             : }
    2079             : 
    2080             : static int
    2081         782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2082             : {
    2083         782 :         int res = LOG_OK;
    2084         782 :         bool update_conflict = false;
    2085         782 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2086             : 
    2087         782 :         if (isbat) {
    2088         782 :                 BAT *t = tids;
    2089         782 :                 if (!BATcount(t))
    2090             :                         return LOG_OK;
    2091             :         }
    2092             : 
    2093         279 :         if (i == NULL)
    2094             :                 return LOG_ERR;
    2095             : 
    2096         279 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2097           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2098             : 
    2099         279 :         assert(delta && delta->cs.ts == tr->tid);
    2100         279 :         if (odelta != delta)
    2101          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2102             : 
    2103         279 :         odelta = delta;
    2104         279 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2105         279 :         assert(delta == odelta);
    2106             :         return res;
    2107             : }
    2108             : 
    2109             : static int
    2110      149305 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
    2111             : {
    2112      149305 :         BAT *b, *oi = i;
    2113      149305 :         int err = 0;
    2114      149305 :         sql_delta *bat = *batp;
    2115             : 
    2116      149305 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2117      149305 :         if (!BATcount(i))
    2118             :                 return LOG_OK;
    2119      149305 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2120             :                 return LOG_ERR;
    2121             : 
    2122      149305 :         lock_column(tr->store, id);
    2123      149994 :         if (bat->cs.st == ST_DICT) {
    2124          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2125          13 :                 bat = *batp;
    2126          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2127           0 :                         bat_destroy(oi);
    2128          13 :                 oi = ni;
    2129          13 :                 if (!oi) {
    2130           0 :                         unlock_column(tr->store, id);
    2131           0 :                         return LOG_ERR;
    2132             :                 }
    2133             :         }
    2134      149994 :         if (bat->cs.st == ST_FOR) {
    2135           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2136           0 :                 bat = *batp;
    2137           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2138           0 :                         bat_destroy(oi);
    2139           0 :                 oi = ni;
    2140           0 :                 if (!oi) {
    2141           0 :                         unlock_column(tr->store, id);
    2142           0 :                         return LOG_ERR;
    2143             :                 }
    2144             :         }
    2145             : 
    2146      149994 :         b = temp_descriptor(bat->cs.bid);
    2147      149543 :         if (b == NULL) {
    2148           0 :                 unlock_column(tr->store, id);
    2149           0 :                 if (oi != i)
    2150           0 :                         bat_destroy(oi);
    2151           0 :                 return LOG_ERR;
    2152             :         }
    2153      149543 :         if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2154      149355 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2155          48 :                         err = 1;
    2156         176 :         } else if (!offsets) {
    2157         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2158          48 :                         err = 1;
    2159          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2160           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2161          48 :                         err = 1;
    2162          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2163          48 :                         err = 1;
    2164             :         }
    2165      149723 :         bat_destroy(b);
    2166      150379 :         unlock_column(tr->store, id);
    2167             : 
    2168      150261 :         if (oi != i)
    2169          13 :                 bat_destroy(oi);
    2170      150261 :         return (err)?LOG_ERR:LOG_OK;
    2171             : }
    2172             : 
    2173             : // Look at the offsets and find where the replacements end and the appends begin.
    2174             : static BUN
    2175           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2176             : {
    2177           0 :         BUN ocnt = BATcount(offsets);
    2178           0 :         if (ocnt == 0)
    2179             :                 return 0;
    2180             : 
    2181           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2182           0 :         if (highest < bcnt)
    2183             :                 // all are replacements
    2184             :                 return ocnt;
    2185             : 
    2186             :         // reason backward to find the first append.
    2187             :         // Suppose offsets has 15 entries, bcnt == 100
    2188             :         // and the highest offset in offsets is 109.
    2189           0 :         BUN new_bcnt = highest + 1; // 110
    2190           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2191           0 :         BUN nreplacements = ocnt - nappends; // 5
    2192             : 
    2193             :         // The first append should be to position bcnt
    2194           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2195             : 
    2196             :         return nreplacements;
    2197             : }
    2198             : 
    2199             : 
    2200             : static int
    2201    15271713 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2202             : {
    2203    15271713 :         void *oi = i;
    2204    15271713 :         BAT *b;
    2205    15271713 :         lock_column(tr->store, id);
    2206    15282025 :         sql_delta *bat = *batp;
    2207             : 
    2208    15282025 :         if (bat->cs.st == ST_DICT) {
    2209             :                 /* possibly a new array is returned */
    2210           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2211           4 :                 bat = *batp;
    2212           4 :                 if (!i) {
    2213           0 :                         unlock_column(tr->store, id);
    2214           0 :                         return LOG_ERR;
    2215             :                 }
    2216             :         }
    2217    15282025 :         if (bat->cs.st == ST_FOR) {
    2218             :                 /* possibly a new array is returned */
    2219           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2220           1 :                 bat = *batp;
    2221           1 :                 if (!i) {
    2222           0 :                         unlock_column(tr->store, id);
    2223           0 :                         return LOG_ERR;
    2224             :                 }
    2225             :         }
    2226             : 
    2227    15282025 :         b = temp_descriptor(bat->cs.bid);
    2228    15282381 :         if (b == NULL) {
    2229           0 :                 if (i != oi)
    2230           0 :                         GDKfree(i);
    2231           0 :                 unlock_column(tr->store, id);
    2232           0 :                 return LOG_ERR;
    2233             :         }
    2234    15282381 :         BUN bcnt = BATcount(b);
    2235             : 
    2236    15282381 :         if (offsets) {
    2237             :                 // The first few might be replacements while later items might be appends.
    2238             :                 // Handle the replacements here while leaving the appends to the code below.
    2239           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2240             : 
    2241           0 :                 oid *start = Tloc(offsets, 0);
    2242           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2243           0 :                         bat_destroy(b);
    2244           0 :                         if (i != oi)
    2245           0 :                                 GDKfree(i);
    2246           0 :                         unlock_column(tr->store, id);
    2247           0 :                         return LOG_ERR;
    2248             :                 }
    2249             : 
    2250             :                 // Replacements have been handled. The rest are appends.
    2251           0 :                 assert(offset == oid_nil);
    2252           0 :                 offset = bcnt;
    2253           0 :                 cnt -= nreplacements;
    2254             :         }
    2255             : 
    2256    15282381 :         if (bcnt > offset){
    2257      481336 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2258      481336 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2259           0 :                         bat_destroy(b);
    2260           0 :                         if (i != oi)
    2261           0 :                                 GDKfree(i);
    2262           0 :                         unlock_column(tr->store, id);
    2263           0 :                         return LOG_ERR;
    2264             :                 }
    2265      481501 :                 cnt -= ccnt;
    2266      481501 :                 offset += ccnt;
    2267             :         }
    2268    15282546 :         if (cnt) {
    2269    14801031 :                 if (BATcount(b) < offset) { /* add space */
    2270        8464 :                         BUN d = offset - BATcount(b);
    2271        8464 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2272           0 :                                 bat_destroy(b);
    2273           0 :                                 if (i != oi)
    2274           0 :                                         GDKfree(i);
    2275           0 :                                 unlock_column(tr->store, id);
    2276           0 :                                 return LOG_ERR;
    2277             :                         }
    2278             :                 }
    2279    14801031 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2280           0 :                         bat_destroy(b);
    2281           0 :                         if (i != oi)
    2282           0 :                                 GDKfree(i);
    2283           0 :                         unlock_column(tr->store, id);
    2284           0 :                         return LOG_ERR;
    2285             :                 }
    2286             :         }
    2287    15283939 :         bat_destroy(b);
    2288    15277769 :         if (i != oi)
    2289           4 :                 GDKfree(i);
    2290    15277769 :         unlock_column(tr->store, id);
    2291    15277769 :         return LOG_OK;
    2292             : }
    2293             : 
    2294             : static int
    2295       25980 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2296             : {
    2297       25980 :         if (!(bat->segs = new_segments(tr, 0)))
    2298             :                 return LOG_ERR;
    2299       25980 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2300             : }
    2301             : 
    2302             : static int
    2303    15421139 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
    2304             : {
    2305    15421139 :         int ok = LOG_OK;
    2306             : 
    2307    15421139 :         if ((*delta)->cs.merged)
    2308       36393 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2309    15421139 :         if (isbat) {
    2310      149462 :                 BAT *bat = incoming_data;
    2311             : 
    2312      149462 :                 if (BATcount(bat))
    2313      149562 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
    2314             :         } else {
    2315    15271677 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2316             :         }
    2317    15435879 :         return ok;
    2318             : }
    2319             : 
    2320             : static int
    2321    15428172 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2322             : {
    2323    15428172 :         int res = LOG_OK;
    2324    15428172 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2325             : 
    2326    15428172 :         if (isbat) {
    2327      149595 :                 BAT *t = data;
    2328      149595 :                 if (!BATcount(t))
    2329             :                         return LOG_OK;
    2330             :         }
    2331             : 
    2332    15427392 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2333             :                 return LOG_ERR;
    2334             : 
    2335    15422307 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2336             : 
    2337    15422307 :         odelta = delta;
    2338    15422307 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
    2339             :                 return res;
    2340    15432742 :         if (odelta != delta) {
    2341           0 :                 delta->next = odelta;
    2342           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2343           0 :                         delta->next = NULL;
    2344           0 :                         destroy_delta(delta, false);
    2345           0 :                         return LOG_CONFLICT;
    2346             :                 }
    2347             :         }
    2348    15432742 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2349           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2350             :         return res;
    2351             : }
    2352             : 
    2353             : static int
    2354        2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2355             : {
    2356        2184 :         int res = LOG_OK;
    2357        2184 :         sql_delta *delta;
    2358             : 
    2359        2184 :         if (isbat) {
    2360        1010 :                 BAT *t = data;
    2361        1010 :                 if (!BATcount(t))
    2362             :                         return LOG_OK;
    2363             :         }
    2364             : 
    2365        2172 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2366             :                 return LOG_ERR;
    2367             : 
    2368        2172 :         assert(delta->cs.st == ST_DEFAULT);
    2369             : 
    2370        2172 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
    2371        2172 :         return res;
    2372             : }
    2373             : 
    2374             : static int
    2375       71037 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2376             : {
    2377       71037 :         int err = 0;
    2378             : 
    2379             :         /* TODO check for conflicting updates */
    2380       71037 :         (void)rid;
    2381       71037 :         (void)cnt;
    2382      536037 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2383      465000 :                 sql_column *c = n->data;
    2384      465000 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2385             : 
    2386             :                 /* check for active updates */
    2387      465000 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2388             :                         return 1;
    2389             :         }
    2390             :         return 0;
    2391             : }
    2392             : 
    2393             : static int
    2394       67571 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2395             : {
    2396       67571 :         int in_transaction = segments_in_transaction(tr, t);
    2397             : 
    2398       67571 :         lock_table(tr->store, t->base.id);
    2399             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2400       67571 :         segment *seg = s->segs->h, *p = NULL;
    2401    51386312 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2402    51386312 :                 if (seg->start <= rid && seg->end > rid) {
    2403       67571 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2404           4 :                                 unlock_table(tr->store, t->base.id);
    2405           4 :                                 return LOG_CONFLICT;
    2406             :                         }
    2407       67567 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2408           0 :                                 unlock_table(tr->store, t->base.id);
    2409           0 :                                 return LOG_CONFLICT;
    2410             :                         }
    2411       67567 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2412           0 :                                 unlock_table(tr->store, t->base.id);
    2413           0 :                                 return LOG_ERR;
    2414             :                         }
    2415             :                         break;
    2416             :                 }
    2417             :         }
    2418       67567 :         unlock_table(tr->store, t->base.id);
    2419       67567 :         if (!in_transaction)
    2420       12796 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2421             :         return LOG_OK;
    2422             : }
    2423             : 
    2424             : static int
    2425        3468 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2426             : {
    2427        3468 :         segment *seg = *Seg, *p = NULL;
    2428       11633 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2429       11631 :                 if (seg->start <= start && seg->end > start) {
    2430        3518 :                         size_t lcnt = cnt;
    2431        3518 :                         if (start+lcnt > seg->end)
    2432          54 :                                 lcnt = seg->end-start;
    2433        3518 :                         if (SEG_IS_DELETED(seg, tr)) {
    2434          47 :                                 start += lcnt;
    2435          47 :                                 cnt -= lcnt;
    2436          47 :                                 continue;
    2437        3471 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2438           1 :                                 return LOG_CONFLICT;
    2439        3470 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2440             :                                 return LOG_CONFLICT;
    2441        3470 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2442        3470 :                         if (!seg)
    2443             :                                 return LOG_ERR;
    2444        3470 :                         start += lcnt;
    2445        3470 :                         cnt -= lcnt;
    2446             :                 }
    2447       11583 :                 if (start+cnt <= seg->end)
    2448             :                         break;
    2449             :         }
    2450             :         return LOG_OK;
    2451             : }
    2452             : 
    2453             : static int
    2454         612 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2455             : {
    2456         612 :         segment *seg = s->segs->h;
    2457         612 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2458             : }
    2459             : 
    2460             : static int
    2461         288 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2462             : {
    2463         288 :         int in_transaction = segments_in_transaction(tr, t);
    2464         288 :         BAT *oi = i;    /* update ids */
    2465         288 :         int ok = LOG_OK;
    2466             : 
    2467         288 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2468             :                 return LOG_ERR;
    2469         288 :         if (BATcount(i)) {
    2470         515 :                 if (BATtdense(i)) {
    2471         227 :                         size_t start = i->tseqbase;
    2472         227 :                         size_t cnt = BATcount(i);
    2473             : 
    2474         227 :                         lock_table(tr->store, t->base.id);
    2475         227 :                         ok = delete_range(tr, t, s, start, cnt);
    2476         227 :                         unlock_table(tr->store, t->base.id);
    2477          61 :                 } else if (complex_cand(i)) {
    2478           0 :                         struct canditer ci;
    2479           0 :                         oid f = 0, l = 0, cur = 0;
    2480             : 
    2481           0 :                         canditer_init(&ci, NULL, i);
    2482           0 :                         cur = f = canditer_next(&ci);
    2483             : 
    2484           0 :                         lock_table(tr->store, t->base.id);
    2485           0 :                         if (!is_oid_nil(f)) {
    2486           0 :                                 segment *seg = s->segs->h;
    2487           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2488           0 :                                         if (cur+1 == l) {
    2489           0 :                                                 cur++;
    2490           0 :                                                 continue;
    2491             :                                         }
    2492           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2493           0 :                                         f = cur = l;
    2494             :                                 }
    2495           0 :                                 if (ok == LOG_OK)
    2496           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2497             :                         }
    2498           0 :                         unlock_table(tr->store, t->base.id);
    2499             :                 } else {
    2500          61 :                         if (!i->tsorted) {
    2501           0 :                                 assert(oi == i);
    2502           0 :                                 BAT *ni = NULL;
    2503           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2504           0 :                                         ok = LOG_ERR;
    2505           0 :                                 if (ni)
    2506           0 :                                         i = ni;
    2507             :                         }
    2508          61 :                         assert(i->tsorted);
    2509          61 :                         BUN icnt = BATcount(i);
    2510          61 :                         BATiter ii = bat_iterator(i);
    2511          61 :                         oid *o = ii.base, n = o[0]+1;
    2512          61 :                         size_t lcnt = 1;
    2513             : 
    2514          61 :                         lock_table(tr->store, t->base.id);
    2515          61 :                         segment *seg = s->segs->h;
    2516       17005 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2517       16944 :                                 if (o[i] == n) {
    2518       16634 :                                         lcnt++;
    2519       16634 :                                         n++;
    2520             :                                 } else {
    2521         310 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2522         310 :                                         lcnt = 0;
    2523             :                                 }
    2524       16944 :                                 if (!lcnt) {
    2525         310 :                                         n = o[i]+1;
    2526         310 :                                         lcnt = 1;
    2527             :                                 }
    2528             :                         }
    2529          61 :                         bat_iterator_end(&ii);
    2530          61 :                         if (lcnt && ok == LOG_OK)
    2531          61 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2532          61 :                         unlock_table(tr->store, t->base.id);
    2533             :                 }
    2534             :         }
    2535         288 :         if (i != oi)
    2536          25 :                 bat_destroy(i);
    2537             :         // assert
    2538         288 :         if (!in_transaction)
    2539         256 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2540             :         return ok;
    2541             : }
    2542             : 
    2543             : static void
    2544       51218 : destroy_segments(segments *s)
    2545             : {
    2546       51218 :         if (!s || sql_ref_dec(&s->r) > 0)
    2547           0 :                 return;
    2548       51218 :         segment *seg = s->h;
    2549      110386 :         while(seg) {
    2550       59168 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2551       59168 :                 ATOMIC_PTR_DESTROY(&seg->next);
    2552       59168 :                 _DELETE(seg);
    2553       59168 :                 seg = n;
    2554             :         }
    2555       51218 :         _DELETE(s);
    2556             : }
    2557             : 
    2558             : static void
    2559       52615 : destroy_storage(storage *bat)
    2560             : {
    2561       52615 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2562             :                 return;
    2563       51076 :         if (bat->next)
    2564        7349 :                 destroy_storage(bat->next);
    2565       51076 :         destroy_segments(bat->segs);
    2566       51076 :         if (bat->cs.uibid)
    2567       30233 :                 temp_destroy(bat->cs.uibid);
    2568       51076 :         if (bat->cs.uvbid)
    2569       30233 :                 temp_destroy(bat->cs.uvbid);
    2570       51076 :         if (bat->cs.bid)
    2571       51076 :                 temp_destroy(bat->cs.bid);
    2572       51076 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2573       51076 :         _DELETE(bat);
    2574             : }
    2575             : 
    2576             : static int
    2577      172383 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2578             : {
    2579      172383 :         if (uncommitted) {
    2580      437784 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2581      286794 :                         if (!VALID_4_READ(s->ts,tr))
    2582             :                                 return 1;
    2583             :         } else {
    2584      147944 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2585      127827 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2586             :                                 return 1;
    2587             :         }
    2588             : 
    2589             :         return 0;
    2590             : }
    2591             : 
    2592             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2593             : 
    2594             : storage *
    2595     2172328 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2596             : {
    2597     2172328 :         storage *obat;
    2598             : 
    2599     2172328 :         obat = ATOMIC_PTR_GET(&t->data);
    2600             : 
    2601     2172328 :         if (obat->cs.ts != tr->tid)
    2602     1498594 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2603     1498539 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2604             :                                 /* abort */
    2605       15413 :                                 if (clear)
    2606       15413 :                                         *clear = true;
    2607       15413 :                                 return NULL;
    2608             :                         }
    2609             : 
    2610     2156915 :         if (!clear)
    2611             :                 return obat;
    2612             : 
    2613             :         /* remainder is only to handle clear */
    2614       26358 :         if (segments_conflict(tr, obat->segs, 1)) {
    2615         379 :                 *clear = true;
    2616         379 :                 return NULL;
    2617             :         }
    2618       25979 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2619             :                 return NULL;
    2620       25979 :         storage *bat = ZNEW(storage);
    2621       25981 :         if (!bat)
    2622             :                 return NULL;
    2623       25981 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2624       25981 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2625           0 :                 destroy_storage(bat);
    2626           0 :                 return NULL;
    2627             :         }
    2628       25983 :         bat->cs.cleared = true;
    2629       25983 :         bat->cs.ts = tr->tid;
    2630             :         /* only one writer else abort */
    2631       25983 :         bat->next = obat;
    2632       25983 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2633          15 :                 bat->next = NULL;
    2634          15 :                 destroy_storage(bat);
    2635          15 :                 if (clear)
    2636          15 :                         *clear = true;
    2637          15 :                 return NULL;
    2638             :         }
    2639             :         return bat;
    2640             : }
    2641             : 
    2642             : static int
    2643       67904 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
    2644             : {
    2645       67904 :         int ok = LOG_OK;
    2646       67904 :         BAT *b = ib;
    2647       67904 :         storage *bat;
    2648             : 
    2649       67904 :         if (isbat && !BATcount(b))
    2650             :                 return ok;
    2651             : 
    2652       67859 :         if (t == NULL)
    2653             :                 return LOG_ERR;
    2654             : 
    2655       67859 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2656             :                 return LOG_ERR;
    2657             : 
    2658       67859 :         if (isbat)
    2659         288 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2660             :         else
    2661       67571 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2662             :         return ok;
    2663             : }
    2664             : 
    2665             : static size_t
    2666           0 : dcount_col(sql_trans *tr, sql_column *c)
    2667             : {
    2668           0 :         sql_delta *b;
    2669             : 
    2670           0 :         if (!isTable(c->t))
    2671             :                 return 0;
    2672           0 :         b = col_timestamp_delta(tr, c);
    2673           0 :         if (!b)
    2674             :                 return 1;
    2675             : 
    2676           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2677           0 :         if (!s || !s->segs->t)
    2678             :                 return 1;
    2679           0 :         size_t cnt = s->segs->t->end;
    2680           0 :         if (cnt) {
    2681           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2682           0 :                 size_t dcnt = 0;
    2683             : 
    2684           0 :                 if (v)
    2685           0 :                         dcnt = BATguess_uniques(v, NULL);
    2686           0 :                 return dcnt;
    2687             :         }
    2688             :         return cnt;
    2689             : }
    2690             : 
    2691             : static BAT *
    2692     3719334 : bind_no_view(BAT *b, bool quick)
    2693             : {
    2694     3719334 :         if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
    2695     3715718 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2696     3715718 :                 bat_destroy(b);
    2697     3715514 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2698             :                         return NULL;
    2699             :         }
    2700             :         return b;
    2701             : }
    2702             : 
    2703             : static int
    2704           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2705             : {
    2706           0 :         int ok = 0;
    2707           0 :         assert(tr->active);
    2708           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2709           0 :                 return 0;
    2710           0 :         lock_column(tr->store, c->base.id);
    2711           0 :         if (unique_est) {
    2712           0 :                 sql_delta *d;
    2713           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2714           0 :                         BAT *b;
    2715           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2716           0 :                                 MT_lock_set(&b->theaplock);
    2717           0 :                                 b->tunique_est = *unique_est;
    2718           0 :                                 MT_lock_unset(&b->theaplock);
    2719           0 :                                 bat_destroy(b);
    2720             :                         }
    2721             :                 }
    2722             :         }
    2723           0 :         if (min) {
    2724           0 :                 _DELETE(c->min);
    2725           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2726           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2727           0 :                         memcpy(c->min, min, minlen);
    2728           0 :                         ok = 1;
    2729             :                 }
    2730             :         }
    2731           0 :         if (max) {
    2732           0 :                 _DELETE(c->max);
    2733           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2734           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2735           0 :                         memcpy(c->max, max, maxlen);
    2736           0 :                         ok = 1;
    2737             :                 }
    2738             :         }
    2739           0 :         unlock_column(tr->store, c->base.id);
    2740           0 :         return ok;
    2741             : }
    2742             : 
    2743             : static int
    2744          19 : min_max_col(sql_trans *tr, sql_column *c)
    2745             : {
    2746          19 :         int ok = 0;
    2747          19 :         BAT *b = NULL;
    2748          19 :         sql_delta *d = NULL;
    2749             : 
    2750          19 :         assert(tr->active);
    2751          19 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2752           0 :                 return 0;
    2753          19 :         if (c->min && c->max)
    2754             :                 return 1;
    2755          19 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2756          19 :                 if (d->cs.st == ST_FOR)
    2757             :                         return 0;
    2758          19 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2759          19 :                 lock_column(tr->store, c->base.id);
    2760          19 :                 if (c->min && c->max) {
    2761           0 :                         unlock_column(tr->store, c->base.id);
    2762           0 :                         return 1;
    2763             :                 }
    2764          19 :                 _DELETE(c->min);
    2765          19 :                 _DELETE(c->max);
    2766          19 :                 if ((b = bind_col(tr, c, access))) {
    2767          19 :                         if (!(b = bind_no_view(b, false))) {
    2768           0 :                                 unlock_column(tr->store, c->base.id);
    2769           0 :                                 return 0;
    2770             :                         }
    2771          19 :                         BATiter bi = bat_iterator(b);
    2772          19 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2773          16 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2774          16 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2775             : 
    2776          16 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2777           0 :                                         _DELETE(c->min);
    2778           0 :                                         _DELETE(c->max);
    2779             :                                 } else {
    2780          16 :                                         memcpy(c->min, nmin, minlen);
    2781          16 :                                         memcpy(c->max, nmax, maxlen);
    2782          16 :                                         ok = 1;
    2783             :                                 }
    2784             :                         }
    2785          19 :                         bat_iterator_end(&bi);
    2786          19 :                         bat_destroy(b);
    2787             :                 }
    2788          19 :                 unlock_column(tr->store, c->base.id);
    2789             :         }
    2790             :         return ok;
    2791             : }
    2792             : 
    2793             : static size_t
    2794          17 : count_segs(segment *s)
    2795             : {
    2796          17 :         size_t nr = 0;
    2797             : 
    2798          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2799          55 :                 nr++;
    2800          17 :         return nr;
    2801             : }
    2802             : 
    2803             : static size_t
    2804          34 : count_del(sql_trans *tr, sql_table *t, int access)
    2805             : {
    2806          34 :         storage *d;
    2807             : 
    2808          34 :         if (!isTable(t))
    2809             :                 return 0;
    2810          34 :         d = tab_timestamp_storage(tr, t);
    2811          34 :         if (!d)
    2812             :                 return 0;
    2813          34 :         if (access == 2)
    2814           0 :                 return d->cs.ucnt;
    2815          34 :         if (access == 1)
    2816           0 :                 return count_inserts(d->segs->h, tr);
    2817          34 :         if (access == 10) /* special case for counting the number of segments */
    2818          17 :                 return count_segs(d->segs->h);
    2819          17 :         return count_deletes(d->segs->h, tr);
    2820             : }
    2821             : 
    2822             : static int
    2823       21496 : sorted_col(sql_trans *tr, sql_column *col)
    2824             : {
    2825       21496 :         int sorted = 0;
    2826             : 
    2827       21496 :         assert(tr->active);
    2828       21496 :         if (!isTable(col->t) || !col->t->s)
    2829             :                 return 0;
    2830             : 
    2831       21496 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2832       21477 :                 BAT *b = bind_col(tr, col, QUICK);
    2833             : 
    2834       21477 :                 if (b)
    2835       21477 :                         sorted = b->tsorted || b->trevsorted;
    2836             :         }
    2837             :         return sorted;
    2838             : }
    2839             : 
    2840             : static int
    2841        7814 : unique_col(sql_trans *tr, sql_column *col)
    2842             : {
    2843        7814 :         int distinct = 0;
    2844             : 
    2845        7814 :         assert(tr->active);
    2846        7814 :         if (!isTable(col->t) || !col->t->s)
    2847             :                 return 0;
    2848             : 
    2849        7814 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2850        7814 :                 BAT *b = bind_col(tr, col, QUICK);
    2851             : 
    2852        7814 :                 if (b)
    2853        7814 :                         distinct = b->tkey;
    2854             :         }
    2855             :         return distinct;
    2856             : }
    2857             : 
    2858             : static int
    2859        2048 : double_elim_col(sql_trans *tr, sql_column *col)
    2860             : {
    2861        2048 :         int de = 0;
    2862        2048 :         sql_delta *d;
    2863             : 
    2864        2048 :         assert(tr->active);
    2865        2048 :         if (!isTable(col->t) || !col->t->s)
    2866             :                 return 0;
    2867             : 
    2868        2048 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2869           6 :                 if (d->cs.st == ST_DICT) {
    2870           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2871           6 :                         if (b && b->ttype == TYPE_bte)
    2872             :                                 de = 1;
    2873           0 :                         else if (b && b->ttype == TYPE_sht)
    2874        2048 :                                 de = 2;
    2875             :                 }
    2876        2042 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2877        2042 :                 BAT *b = bind_col(tr, col, QUICK);
    2878             : 
    2879        2042 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2880        2042 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2881        2042 :                         if (de)
    2882        1812 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2883             :                 }
    2884        1812 :                 assert(de >= 0 && de <= 16);
    2885             :         }
    2886             :         return de;
    2887             : }
    2888             : 
    2889             : static int
    2890     3755061 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2891             : {
    2892     3755061 :         int ok = 0;
    2893     3755061 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2894     3755061 :         sql_delta *d = NULL;
    2895             : 
    2896     3755061 :         (void) tr;
    2897     3755061 :         assert(tr->active);
    2898     3755061 :         *nonil = false;
    2899     3755061 :         *unique = false;
    2900     3755061 :         *unique_est = 0.0;
    2901     3755061 :         if (!c || !isTable(c->t) || !c->t->s)
    2902             :                 return ok;
    2903             : 
    2904     3754823 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2905     3716445 :                 if (d->cs.st == ST_FOR) {
    2906          27 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2907          27 :                         return ok;
    2908             :                 }
    2909     3716418 :                 int eclass = c->type.type->eclass;
    2910     3716418 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2911     3716418 :                 if ((b = bind_col(tr, c, access))) {
    2912     3716514 :                         if (!(b = bind_no_view(b, false)))
    2913           0 :                                 return ok;
    2914     3716832 :                         BATiter bi = bat_iterator(b);
    2915     3716785 :                         *nonil = bi.nonil && !bi.nil;
    2916             : 
    2917     3716785 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2918     3428057 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
    2919     2238043 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2920             :                                         ok |= 1;
    2921     2237989 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2922     2229126 :                                         ok |= 1;
    2923     2238022 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2924          54 :                                         ok |= 2;
    2925     2237967 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2926     2222637 :                                         ok |= 2;
    2927             :                         }
    2928     3716727 :                         if (d->cs.ucnt == 0) {
    2929     3712931 :                                 if (d->cs.st == ST_DEFAULT) {
    2930     3712231 :                                         *unique = bi.key;
    2931     3712231 :                                         *unique_est = bi.unique_est;
    2932     3712231 :                                         if (*unique_est == 0)
    2933     1227475 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2934         700 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2935             :                                         /* for dict, check the offsets bat for uniqueness */
    2936         700 :                                         MT_lock_set(&off->theaplock);
    2937         700 :                                         *unique = off->tkey;
    2938         700 :                                         *unique_est = off->tunique_est;
    2939         700 :                                         MT_lock_unset(&off->theaplock);
    2940             :                                 }
    2941             :                         }
    2942     3717267 :                         bat_iterator_end(&bi);
    2943     3716658 :                         bat_destroy(b);
    2944     3716563 :                         if (*nonil && d->cs.ucnt > 0) {
    2945             :                                 /* This could use a quick descriptor */
    2946        2312 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2947           0 :                                         *nonil = false;
    2948             :                                 } else {
    2949        2312 :                                         MT_lock_set(&upv->theaplock);
    2950        2312 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2951        2312 :                                         MT_lock_unset(&upv->theaplock);
    2952        2312 :                                         bat_destroy(upv);
    2953             :                                 }
    2954             :                         }
    2955             :                 }
    2956             :         }
    2957             :         return ok;
    2958             : }
    2959             : 
    2960             : static int
    2961         257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2962             : {
    2963         257 :         assert(tr->active);
    2964         257 :         if (!isTable(col->t) || !col->t->s)
    2965             :                 return LOG_OK;
    2966             : 
    2967         252 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2968         252 :                 BAT *b = bind_col(tr, col, QUICK);
    2969             : 
    2970         252 :                 if (b) { /* add props for ranges [min, max> */
    2971         252 :                         MT_lock_set(&b->theaplock);
    2972         252 :                         if (add_range) {
    2973         179 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    2974         179 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    2975         103 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    2976             :                                 else
    2977          76 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    2978         179 :                                 if (!pt->with_nills || !col->null)
    2979         117 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    2980             :                         } else {
    2981          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    2982          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    2983          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    2984             :                         }
    2985         252 :                         MT_lock_unset(&b->theaplock);
    2986             :                 }
    2987             :         }
    2988             :         return LOG_OK;
    2989             : }
    2990             : 
    2991             : static int
    2992        3975 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    2993             : {
    2994        3975 :         assert(tr->active);
    2995        3975 :         if (!isTable(col->t) || !col->t->s)
    2996             :                 return LOG_OK;
    2997             : 
    2998        3945 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2999        3945 :                 BAT *b = bind_col(tr, col, QUICK);
    3000             : 
    3001        3945 :                 if (b) { /* add props for ranges [min, max> */
    3002        3945 :                         if (not_null) {
    3003        3943 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3004             :                         } else {
    3005           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3006             :                         }
    3007             :                 }
    3008             :         }
    3009             :         return LOG_OK;
    3010             : }
    3011             : 
    3012             : static int
    3013       26712 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3014             : {
    3015       26712 :         sqlstore *store = tr->store;
    3016       26712 :         int bid = log_find_bat(store->logger, id);
    3017       26712 :         if (bid <= 0)
    3018             :                 return LOG_ERR;
    3019       26712 :         cs->bid = temp_dup(bid);
    3020       26712 :         cs->ucnt = 0;
    3021       26712 :         cs->uibid = e_bat(TYPE_oid);
    3022       26712 :         cs->uvbid = e_bat(type);
    3023       26712 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3024             :                 return LOG_ERR;
    3025             :         return LOG_OK;
    3026             : }
    3027             : 
    3028             : static int
    3029       68280 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3030             : {
    3031       68280 :         int res = LOG_OK;
    3032       68280 :         gdk_return ok;
    3033       68280 :         BAT *b = temp_descriptor(bat->cs.bid);
    3034             : 
    3035       68280 :         if (b == NULL)
    3036             :                 return LOG_ERR;
    3037             : 
    3038       68280 :         if (!bat->cs.uibid)
    3039       68272 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3040       68280 :         if (!bat->cs.uvbid)
    3041       68272 :                 bat->cs.uvbid = e_bat(b->ttype);
    3042       68280 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3043           0 :                 res = LOG_ERR;
    3044       68280 :         if (GDKinmemory(0)) {
    3045         177 :                 bat_destroy(b);
    3046         177 :                 return res;
    3047             :         }
    3048             : 
    3049       68103 :         bat_set_access(b, BAT_READ);
    3050       68103 :         sqlstore *store = tr->store;
    3051       68103 :         ok = log_bat_persists(store->logger, b, id);
    3052       68103 :         bat_destroy(b);
    3053       68103 :         if(res != LOG_OK)
    3054             :                 return res;
    3055       68103 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3056             : }
    3057             : 
    3058             : static int
    3059           0 : new_persistent_delta( sql_delta *bat)
    3060             : {
    3061           0 :         bat->cs.ucnt = 0;
    3062           0 :         return LOG_OK;
    3063             : }
    3064             : 
    3065             : static void
    3066      129773 : create_delta( sql_delta *d, BAT *b)
    3067             : {
    3068      129773 :         bat_set_access(b, BAT_READ);
    3069      129773 :         d->cs.bid = temp_create(b);
    3070      129773 :         d->cs.uibid = d->cs.uvbid = 0;
    3071      129773 :         d->cs.ucnt = 0;
    3072      129773 : }
    3073             : 
    3074             : static bat
    3075        7293 : copyBat (bat i, int type, oid seq)
    3076             : {
    3077        7293 :         BAT *b, *tb;
    3078        7293 :         bat res;
    3079             : 
    3080        7293 :         if (!i)
    3081             :                 return i;
    3082        7293 :         tb = quick_descriptor(i);
    3083        7293 :         if (tb == NULL)
    3084             :                 return 0;
    3085        7293 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3086        7293 :         if (b == NULL)
    3087             :                 return 0;
    3088             : 
    3089        7293 :         bat_set_access(b, BAT_READ);
    3090             : 
    3091        7293 :         res = temp_create(b);
    3092        7293 :         bat_destroy(b);
    3093        7293 :         return res;
    3094             : }
    3095             : 
    3096             : static int
    3097      150234 : create_col(sql_trans *tr, sql_column *c)
    3098             : {
    3099      150234 :         int ok = LOG_OK, new = 0;
    3100      150234 :         int type = c->type.type->localtype;
    3101      150234 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3102             : 
    3103      150234 :         if (!bat) {
    3104      150234 :                 new = 1;
    3105      150234 :                 bat = ZNEW(sql_delta);
    3106      150234 :                 if (!bat)
    3107             :                         return LOG_ERR;
    3108      150234 :                 ATOMIC_PTR_SET(&c->data, bat);
    3109      150234 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3110             :         }
    3111             : 
    3112      150234 :         if (new)
    3113      150234 :                 bat->cs.ts = tr->tid;
    3114             : 
    3115      150234 :         if (!isNew(c)&& !isTempTable(c->t)){
    3116       20439 :                 bat->cs.ts = tr->ts;
    3117       20439 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3118       20439 :                 if (ok == LOG_OK && c->storage_type) {
    3119           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3120           2 :                                 sqlstore *store = tr->store;
    3121           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3122           2 :                                 if (bid <= 0)
    3123             :                                         return LOG_ERR;
    3124           2 :                                 bat->cs.ebid = temp_dup(bid);
    3125           2 :                                 bat->cs.st = ST_DICT;
    3126           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3127           2 :                                 bat->cs.st = ST_FOR;
    3128             :                         }
    3129             :                 }
    3130       20439 :                 return ok;
    3131      129795 :         } else if (bat && bat->cs.bid) {
    3132           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3133             :         } else {
    3134      129795 :                 sql_column *fc = NULL;
    3135      129795 :                 size_t cnt = 0;
    3136             : 
    3137             :                 /* alter ? */
    3138      129795 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3139       79846 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3140       79846 :                         if (s == NULL)
    3141             :                                 return LOG_ERR;
    3142       79846 :                         cnt = segs_end(s->segs, tr, c->t);
    3143             :                 }
    3144      129795 :                 if (cnt && fc != c) {
    3145          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3146             : 
    3147          22 :                         if (d->cs.bid) {
    3148          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3149          22 :                                 if(bat->cs.bid == BID_NIL)
    3150          22 :                                         ok = LOG_ERR;
    3151             :                         }
    3152          22 :                         if (d->cs.uibid) {
    3153          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3154          10 :                                 if (bat->cs.uibid == BID_NIL)
    3155          22 :                                         ok = LOG_ERR;
    3156             :                         }
    3157          22 :                         if (d->cs.uvbid) {
    3158          10 :                                 bat->cs.uvbid = e_bat(type);
    3159          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3160           0 :                                         ok = LOG_ERR;
    3161             :                         }
    3162             :                 } else {
    3163      129773 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3164      129773 :                         if (!b) {
    3165             :                                 ok = LOG_ERR;
    3166             :                         } else {
    3167      129773 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3168      129773 :                                 bat_destroy(b);
    3169             :                         }
    3170             : 
    3171      129773 :                         if (!new) {
    3172           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3173           0 :                                 if (bat->cs.uibid == BID_NIL)
    3174           0 :                                         ok = LOG_ERR;
    3175           0 :                                 bat->cs.uvbid = e_bat(type);
    3176           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3177           0 :                                         ok = LOG_ERR;
    3178             :                         }
    3179             :                 }
    3180      129795 :                 bat->cs.ucnt = 0;
    3181             : 
    3182      129795 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3183          91 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3184             :         }
    3185             :         return ok;
    3186             : }
    3187             : 
    3188             : static int
    3189       61912 : log_create_col_(sql_trans *tr, sql_column *c)
    3190             : {
    3191       61912 :         assert(!isTempTable(c->t));
    3192       61912 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3193             : }
    3194             : 
    3195             : static int
    3196          87 : log_create_col(sql_trans *tr, sql_change *change)
    3197             : {
    3198          87 :         return log_create_col_(tr, (sql_column*)change->obj);
    3199             : }
    3200             : 
    3201             : static int
    3202      117163 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3203             : {
    3204      117163 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3205      117163 :         (void)oldest;
    3206      117163 :         assert(delta->cs.ts == tr->tid);
    3207      117163 :         delta->cs.ts = commit_ts;
    3208             : 
    3209      117163 :         assert(delta->next == NULL);
    3210      117163 :         if (!delta->cs.merged)
    3211      117162 :                 merge_delta(delta);
    3212      117163 :         if (!tr->parent)
    3213      117159 :                 base->new = 0;
    3214      117163 :         return LOG_OK;
    3215             : }
    3216             : 
    3217             : static int
    3218          91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3219             : {
    3220          91 :         sql_column *c = (sql_column*)change->obj;
    3221          91 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3222          91 :         if (!tr->parent)
    3223          90 :                 c->base.new = 0;
    3224          91 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3225             : }
    3226             : 
    3227             : /* will be called for new idx's and when new index columns are created */
    3228             : static int
    3229        9260 : create_idx(sql_trans *tr, sql_idx *ni)
    3230             : {
    3231        9260 :         int ok = LOG_OK, new = 0;
    3232        9260 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3233        9260 :         int type = TYPE_lng;
    3234             : 
    3235        9260 :         if (oid_index(ni->type))
    3236         954 :                 type = TYPE_oid;
    3237             : 
    3238        9260 :         if (!bat) {
    3239        9260 :                 new = 1;
    3240        9260 :                 bat = ZNEW(sql_delta);
    3241        9260 :                 if (!bat)
    3242             :                         return LOG_ERR;
    3243        9260 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3244        9260 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3245             :         }
    3246             : 
    3247        9260 :         if (new)
    3248        9260 :                 bat->cs.ts = tr->tid;
    3249             : 
    3250        9260 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3251        1989 :                 bat->cs.ts = 1;
    3252        1989 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3253        7271 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3254           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3255             :         } else {
    3256        7271 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3257        7271 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3258             : 
    3259        7271 :                 if (d) {
    3260             :                         /* Here we also handle indices created through alter stmts */
    3261             :                         /* These need to be created aligned to the existing data */
    3262        7271 :                         if (d->cs.bid) {
    3263        7271 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3264        7271 :                                 if(bat->cs.bid == BID_NIL)
    3265        7271 :                                         ok = LOG_ERR;
    3266             :                         }
    3267             :                 } else {
    3268             :                         return LOG_ERR;
    3269             :                 }
    3270             : 
    3271        7271 :                 bat->cs.ucnt = 0;
    3272             : 
    3273        7271 :                 if (!new) {
    3274           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3275           0 :                         if (bat->cs.uibid == BID_NIL)
    3276           0 :                                 ok = LOG_ERR;
    3277           0 :                         bat->cs.uvbid = e_bat(type);
    3278           0 :                         if(bat->cs.uvbid == BID_NIL)
    3279           0 :                                 ok = LOG_ERR;
    3280             :                 }
    3281        7271 :                 bat->cs.ucnt = 0;
    3282        7271 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3283         631 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3284             :         }
    3285             :         return ok;
    3286             : }
    3287             : 
    3288             : static int
    3289        6368 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3290             : {
    3291        6368 :         assert(!isTempTable(i->t));
    3292        6368 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3293             : }
    3294             : 
    3295             : static int
    3296         617 : log_create_idx(sql_trans *tr, sql_change *change)
    3297             : {
    3298         617 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3299             : }
    3300             : 
    3301             : static int
    3302         631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3303             : {
    3304         631 :         sql_idx *i = (sql_idx*)change->obj;
    3305         631 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3306         631 :         if (!tr->parent)
    3307         631 :                 i->base.new = 0;
    3308         631 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3309             :         return LOG_OK;
    3310             : }
    3311             : 
    3312             : static int
    3313        4284 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3314             : {
    3315        4284 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3316        4284 :         BAT *b = NULL, *ib = NULL;
    3317             : 
    3318        4284 :         if (ok != LOG_OK)
    3319             :                 return ok;
    3320        4284 :         if (!(b = temp_descriptor(s->cs.bid)))
    3321             :                 return LOG_ERR;
    3322        4284 :         ib = b;
    3323             : 
    3324        4284 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3325           0 :                 bat_destroy(ib);
    3326           0 :                 return LOG_ERR;
    3327             :         }
    3328             : 
    3329        4284 :         if (BATcount(b)) {
    3330         334 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3331           0 :                         bat_destroy(ib);
    3332           0 :                         return LOG_ERR;
    3333             :                 }
    3334         495 :                 if (BATtdense(b)) {
    3335         161 :                         size_t start = b->tseqbase;
    3336         161 :                         size_t cnt = BATcount(b);
    3337         161 :                         ok = delete_range(tr, t, s, start, cnt);
    3338             :                 } else {
    3339         173 :                         assert(b->tsorted);
    3340         173 :                         BUN icnt = BATcount(b);
    3341         173 :                         BATiter bi = bat_iterator(b);
    3342         173 :                         size_t lcnt = 1;
    3343         173 :                         oid n;
    3344         173 :                         segment *seg = s->segs->h;
    3345         173 :                         if (complex_cand(b)) {
    3346           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3347           0 :                                 n = o + 1;
    3348           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3349           0 :                                         o = * (oid *) Tpos(&bi, i);
    3350           0 :                                         if (o == n) {
    3351           0 :                                                 lcnt++;
    3352           0 :                                                 n++;
    3353             :                                         } else {
    3354           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3355             :                                                         break;
    3356             :                                                 lcnt = 0;
    3357             :                                         }
    3358           0 :                                         if (!lcnt) {
    3359           0 :                                                 n = o + 1;
    3360           0 :                                                 lcnt = 1;
    3361             :                                         }
    3362             :                                 }
    3363             :                         } else {
    3364         173 :                                 oid *o = bi.base;
    3365         173 :                                 n = o[0]+1;
    3366      281583 :                                 for (size_t i=1; i<icnt; i++) {
    3367      281410 :                                         if (o[i] == n) {
    3368      278925 :                                                 lcnt++;
    3369      278925 :                                                 n++;
    3370             :                                         } else {
    3371        2485 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3372             :                                                         break;
    3373             :                                                 lcnt = 0;
    3374             :                                         }
    3375      281410 :                                         if (!lcnt) {
    3376        2485 :                                                 n = o[i]+1;
    3377        2485 :                                                 lcnt = 1;
    3378             :                                         }
    3379             :                                 }
    3380             :                         }
    3381         173 :                         if (lcnt && ok == LOG_OK)
    3382         173 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3383         173 :                         bat_iterator_end(&bi);
    3384             :                 }
    3385         334 :                 if (ok == LOG_OK)
    3386        6192 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3387        5858 :                                 if (seg->ts == tr->tid)
    3388        3043 :                                         seg->ts = 1;
    3389             :         } else {
    3390        3950 :                 if (ok == LOG_OK) {
    3391        3950 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3392             : 
    3393        3950 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3394             :                                 ok = LOG_ERR;
    3395             :                         } else {
    3396        3950 :                                 segment *seg = s->segs->h;
    3397        3950 :                                 if (seg->ts == tr->tid)
    3398        3950 :                                         seg->ts = 1;
    3399             :                         }
    3400             :                 }
    3401             :         }
    3402        4284 :         if (b != ib)
    3403        4284 :                 bat_destroy(b);
    3404        4284 :         bat_destroy(ib);
    3405             : 
    3406        4284 :         return ok;
    3407             : }
    3408             : 
    3409             : static int
    3410       25133 : create_del(sql_trans *tr, sql_table *t)
    3411             : {
    3412       25133 :         int ok = LOG_OK, new = 0;
    3413       25133 :         BAT *b;
    3414       25133 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3415             : 
    3416       25133 :         if (!bat) {
    3417       25133 :                 new = 1;
    3418       25133 :                 bat = ZNEW(storage);
    3419       25133 :                 if(!bat)
    3420             :                         return LOG_ERR;
    3421       25133 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3422       25133 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3423       25133 :                 bat->cs.ts = tr->tid;
    3424             :         }
    3425             : 
    3426       25133 :         if (!isNew(t) && !isTempTable(t)) {
    3427        4284 :                 bat->cs.ts = tr->ts;
    3428        4284 :                 return load_storage(tr, t, bat, t->base.id);
    3429       20849 :         } else if (bat->cs.bid) {
    3430             :                 return ok;
    3431             :         } else {
    3432       20849 :                 assert(!bat->segs);
    3433       20849 :                 if (!(bat->segs = new_segments(tr, 0)))
    3434             :                         return LOG_ERR;
    3435             : 
    3436       20849 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3437       20849 :                 if(b != NULL) {
    3438       20849 :                         bat_set_access(b, BAT_READ);
    3439       20849 :                         bat->cs.bid = temp_create(b);
    3440       20849 :                         bat_destroy(b);
    3441             :                 } else {
    3442             :                         return LOG_ERR;
    3443             :                 }
    3444       20849 :                 if (new)
    3445       27375 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3446             :         }
    3447             :         return ok;
    3448             : }
    3449             : 
    3450             : static int
    3451      189969 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3452             : {
    3453      189969 :         sqlstore *store = tr->store;
    3454      189969 :         msk m = s->deleted;
    3455      189969 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3456             : }
    3457             : 
    3458             : static int
    3459       97074 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3460             : {
    3461             :         /* log segments */
    3462       97074 :         lock_table(tr->store, id);
    3463      479967 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3464      382893 :                 unlock_table(tr->store, id);
    3465      382893 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3466      143443 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3467             :                                 return LOG_ERR;
    3468             :                         }
    3469             :                 }
    3470      382893 :                 lock_table(tr->store, id);
    3471             :         }
    3472       97074 :         unlock_table(tr->store, id);
    3473       97074 :         return LOG_OK;
    3474             : }
    3475             : 
    3476             : static int
    3477       12559 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3478             : {
    3479       12559 :         BAT *b;
    3480       12559 :         int ok = LOG_OK;
    3481             : 
    3482       12559 :         if (GDKinmemory(0))
    3483             :                 return LOG_OK;
    3484             : 
    3485       12526 :         b = temp_descriptor(bat->cs.bid);
    3486       12526 :         if (b == NULL)
    3487             :                 return LOG_ERR;
    3488             : 
    3489       12526 :         sqlstore *store = tr->store;
    3490       12526 :         bat_set_access(b, BAT_READ);
    3491       12526 :         if (ok == LOG_OK)
    3492       12526 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3493       12526 :         if (ok == LOG_OK)
    3494       12526 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3495       12526 :         bat_destroy(b);
    3496       12526 :         return ok;
    3497             : }
    3498             : 
    3499             : static int
    3500       12573 : log_create_del(sql_trans *tr, sql_change *change)
    3501             : {
    3502       12573 :         int ok = LOG_OK;
    3503       12573 :         sql_table *t = (sql_table*)change->obj;
    3504             : 
    3505       12573 :         if (t->base.deleted)
    3506             :                 return ok;
    3507       12559 :         assert(!isTempTable(t));
    3508       12559 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3509       12559 :         if (ok == LOG_OK) {
    3510       74384 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3511       61825 :                         sql_column *c = n->data;
    3512             : 
    3513       61825 :                         ok = log_create_col_(tr, c);
    3514             :                 }
    3515       12559 :                 if (t->idxs) {
    3516       18322 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3517        5763 :                                 sql_idx *i = n->data;
    3518             : 
    3519        5763 :                                 if (ATOMIC_PTR_GET(&i->data))
    3520        5751 :                                         ok = log_create_idx_(tr, i);
    3521             :                         }
    3522             :                 }
    3523             :         }
    3524             :         return ok;
    3525             : }
    3526             : 
    3527             : static int
    3528       20851 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3529             : {
    3530       20851 :         int ok = LOG_OK;
    3531       20851 :         sql_table *t = (sql_table*)change->obj;
    3532       20851 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3533             : 
    3534       20851 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3535         120 :                 assert(isTempTable(t));
    3536         120 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3537         120 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3538         120 :                 return ok;
    3539             :         }
    3540             : 
    3541       20731 :         if (!commit_ts) /* rollback handled by ? */
    3542             :                 return ok;
    3543       18876 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3544       18876 :         assert(ok == LOG_OK);
    3545       18876 :         if (ok != LOG_OK)
    3546             :                 return ok;
    3547       18876 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3548       18876 :         assert(dbat->cs.ts == tr->tid);
    3549       18876 :         dbat->cs.ts = commit_ts;
    3550       18876 :         if (ok == LOG_OK) {
    3551      129553 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3552      110677 :                         sql_column *c = n->data;
    3553      110677 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3554             : 
    3555      110677 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3556             :                 }
    3557       18876 :                 if (t->idxs) {
    3558       24652 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3559        5776 :                                 sql_idx *i = n->data;
    3560        5776 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3561             : 
    3562        5776 :                                 if (delta)
    3563        5764 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3564             :                         }
    3565             :                 }
    3566       18876 :                 if (!tr->parent)
    3567       18874 :                         t->base.new = 0;
    3568             :         }
    3569       18876 :         if (!tr->parent)
    3570       18874 :                 t->base.new = 0;
    3571             :         return ok;
    3572             : }
    3573             : 
    3574             : static int
    3575       19759 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3576             : {
    3577       19759 :         gdk_return ok = GDK_SUCCEED;
    3578             : 
    3579       19759 :         sqlstore *store = tr->store;
    3580       19759 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3581       18835 :                 ok = log_bat_transient(store->logger, id);
    3582       19759 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3583          25 :                 ok = log_bat_transient(store->logger, -id);
    3584       19759 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3585             : }
    3586             : 
    3587             : static int
    3588      167970 : destroy_col(sqlstore *store, sql_column *c)
    3589             : {
    3590      167970 :         (void)store;
    3591      167970 :         if (ATOMIC_PTR_GET(&c->data))
    3592      167970 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3593      167970 :         ATOMIC_PTR_SET(&c->data, NULL);
    3594      167970 :         return LOG_OK;
    3595             : }
    3596             : 
    3597             : static int
    3598       17876 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3599             : {
    3600       17876 :         int ok = LOG_OK;
    3601       17876 :         assert(!isTempTable(c->t));
    3602       17876 :         if (!tr->parent) /* don't write save point commits */
    3603       17876 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3604       17876 :         return ok;
    3605             : }
    3606             : 
    3607             : static int
    3608       17876 : log_destroy_col(sql_trans *tr, sql_change *change)
    3609             : {
    3610       17876 :         sql_column *c = (sql_column*)change->obj;
    3611       17876 :         int res = log_destroy_col_(tr, c);
    3612       17876 :         change->obj = NULL;
    3613       17876 :         column_destroy(tr->store, c);
    3614       17876 :         return res;
    3615             : }
    3616             : 
    3617             : static int
    3618       10609 : destroy_idx(sqlstore *store, sql_idx *i)
    3619             : {
    3620       10609 :         (void)store;
    3621       10609 :         if (ATOMIC_PTR_GET(&i->data))
    3622       10609 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3623       10609 :         ATOMIC_PTR_SET(&i->data, NULL);
    3624       10609 :         return LOG_OK;
    3625             : }
    3626             : 
    3627             : static int
    3628        1959 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3629             : {
    3630        1959 :         int ok = LOG_OK;
    3631        1959 :         assert(!isTempTable(i->t));
    3632        1959 :         if (ATOMIC_PTR_GET(&i->data)) {
    3633        1883 :                 if (!tr->parent) /* don't write save point commits */
    3634        1883 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3635             :         }
    3636        1959 :         return ok;
    3637             : }
    3638             : 
    3639             : static int
    3640        1959 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3641             : {
    3642        1959 :         sql_idx *i = (sql_idx*)change->obj;
    3643        1959 :         int res = log_destroy_idx_(tr, i);
    3644        1959 :         change->obj = NULL;
    3645        1959 :         idx_destroy(tr->store, i);
    3646        1959 :         return res;
    3647             : }
    3648             : 
    3649             : static int
    3650       26636 : destroy_del(sqlstore *store, sql_table *t)
    3651             : {
    3652       26636 :         (void)store;
    3653       26636 :         if (ATOMIC_PTR_GET(&t->data))
    3654       26632 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3655       26636 :         ATOMIC_PTR_SET(&t->data, NULL);
    3656       26636 :         return LOG_OK;
    3657             : }
    3658             : 
    3659             : static int
    3660        3258 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3661             : {
    3662        3258 :         gdk_return ok = GDK_SUCCEED;
    3663             : 
    3664        3258 :         sqlstore *store = tr->store;
    3665        3258 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3666        3258 :             bat && bat->cs.bid)
    3667        3258 :                 ok = log_bat_transient(store->logger, id);
    3668        3258 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3669             : }
    3670             : 
    3671             : static int
    3672        3258 : log_destroy_del(sql_trans *tr, sql_change *change)
    3673             : {
    3674        3258 :         int ok = LOG_OK;
    3675        3258 :         sql_table *t = (sql_table*)change->obj;
    3676             : 
    3677        3258 :         assert(!isTempTable(t));
    3678        3258 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3679        3258 :         return ok;
    3680             : }
    3681             : 
    3682             : static int
    3683       23181 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3684             : {
    3685       23181 :         (void)tr;
    3686       23181 :         (void)change;
    3687       23181 :         (void)commit_ts;
    3688       23181 :         (void)oldest;
    3689       23181 :         if (commit_ts)
    3690       23158 :                 change->handled = true;
    3691       23181 :         return 0;
    3692             : }
    3693             : 
    3694             : static int
    3695        3329 : drop_del(sql_trans *tr, sql_table *t)
    3696             : {
    3697        3329 :         int ok = LOG_OK;
    3698             : 
    3699        3329 :         if (!isNew(t)) {
    3700        3329 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3701        3394 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
    3702             :         }
    3703        3329 :         return ok;
    3704             : }
    3705             : 
    3706             : static int
    3707       17891 : drop_col(sql_trans *tr, sql_column *c)
    3708             : {
    3709       17891 :         assert(!isNew(c));
    3710       17891 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3711       17891 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
    3712       17891 :         return LOG_OK;
    3713             : }
    3714             : 
    3715             : static int
    3716        1961 : drop_idx(sql_trans *tr, sql_idx *i)
    3717             : {
    3718        1961 :         assert(!isNew(i));
    3719        1961 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3720        1961 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
    3721        1961 :         return LOG_OK;
    3722             : }
    3723             : 
    3724             : 
    3725             : static BUN
    3726      129559 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3727             : {
    3728      129559 :         BAT *b;
    3729      129559 :         BUN sz = 0;
    3730             : 
    3731      129559 :         (void)tr;
    3732      129559 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3733      129559 :         if (cs->bid && renew) {
    3734      129554 :                 b = quick_descriptor(cs->bid);
    3735      129522 :                 if (b) {
    3736      129522 :                         sz += BATcount(b);
    3737      129522 :                         if (cs->st == ST_DICT) {
    3738           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3739           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3740             : 
    3741           2 :                                 if (nebid == BID_NIL || !n) {
    3742           0 :                                         temp_destroy(nebid);
    3743           0 :                                         bat_destroy(n);
    3744           0 :                                         return BUN_NONE;
    3745             :                                 }
    3746           2 :                                 temp_destroy(cs->ebid);
    3747           2 :                                 cs->ebid = nebid;
    3748           2 :                                 if (!temp)
    3749           2 :                                         bat_set_access(n, BAT_READ);
    3750           2 :                                 temp_destroy(cs->bid);
    3751           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3752           2 :                                 bat_destroy(n);
    3753             :                         } else {
    3754      129520 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
    3755             : 
    3756      129445 :                                 if (nbid == BID_NIL)
    3757             :                                         return BUN_NONE;
    3758      129445 :                                 temp_destroy(cs->bid);
    3759      129514 :                                 cs->bid = nbid;
    3760             :                         }
    3761             :                 } else {
    3762             :                         return BUN_NONE;
    3763             :                 }
    3764             :         }
    3765      129521 :         if (cs->uibid) {
    3766      129377 :                 temp_destroy(cs->uibid);
    3767      129428 :                 cs->uibid = 0;
    3768             :         }
    3769      129572 :         if (cs->uvbid) {
    3770      129428 :                 temp_destroy(cs->uvbid);
    3771      129429 :                 cs->uvbid = 0;
    3772             :         }
    3773      129573 :         cs->cleared = true;
    3774      129573 :         cs->ucnt = 0;
    3775      129573 :         return sz;
    3776             : }
    3777             : 
    3778             : static BUN
    3779      129392 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3780             : {
    3781      129392 :         bool update_conflict = false;
    3782      129392 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3783             : 
    3784      129392 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    3785           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3786      129419 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3787      129419 :         if (odelta != delta)
    3788      129424 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3789      129417 :         if (delta)
    3790      129417 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3791             :         return 0;
    3792             : }
    3793             : 
    3794             : static BUN
    3795          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3796             : {
    3797          21 :         bool update_conflict = false;
    3798          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3799             : 
    3800          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3801          15 :                 return 0;
    3802           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    3803           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3804           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3805           6 :         if (odelta != delta)
    3806           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3807           6 :         if (delta)
    3808           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3809             :         return 0;
    3810             : }
    3811             : 
    3812             : static int
    3813         142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3814             : {
    3815         142 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
    3816             :                 return LOG_ERR;
    3817         142 :         if (s->segs)
    3818         142 :                 destroy_segments(s->segs);
    3819         142 :         if (!(s->segs = new_segments(tr, 0)))
    3820             :                 return LOG_ERR;
    3821             :         return LOG_OK;
    3822             : }
    3823             : 
    3824             : 
    3825             : /*
    3826             :  * Clear the table, in general this means replacing the storage,
    3827             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    3828             :  * all segments as deleted.
    3829             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    3830             :  */
    3831             : static BUN
    3832       41822 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3833             : {
    3834       41822 :         int clear = !in_transaction, ok = LOG_OK;
    3835       41822 :         bool conflict = false;
    3836       41822 :         storage *bat;
    3837             : 
    3838       41873 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    3839       15807 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3840             : 
    3841       26019 :         if (!clear) {
    3842          51 :                 lock_table(tr->store, t->base.id);
    3843          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3844          51 :                 unlock_table(tr->store, t->base.id);
    3845             :         }
    3846       26019 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3847       26019 :         if (!in_transaction)
    3848       25971 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3849       26015 :         if (ok == LOG_ERR)
    3850             :                 return BUN_NONE;
    3851       26015 :         if (ok == LOG_CONFLICT)
    3852           0 :                 return BUN_NONE - 1;
    3853             :         return LOG_OK;
    3854             : }
    3855             : 
    3856             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3857             : static BUN
    3858       41799 : clear_table(sql_trans *tr, sql_table *t)
    3859             : {
    3860       41799 :         node *n = ol_first_node(t->columns);
    3861       41799 :         sql_column *c = n->data;
    3862       41799 :         storage *d = tab_timestamp_storage(tr, t);
    3863       41822 :         int in_transaction, clear;
    3864       41822 :         BUN sz, clear_ok;
    3865             : 
    3866       41822 :         if (!d)
    3867             :                 return BUN_NONE;
    3868       41822 :         in_transaction = segments_in_transaction(tr, t);
    3869       41815 :         clear = !in_transaction;
    3870       41815 :         sz = count_col(tr, c, CNT_ACTIVE);
    3871       41821 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    3872             :                 return clear_ok;
    3873             : 
    3874       26015 :         if (in_transaction)
    3875             :                 return sz;
    3876             : 
    3877      155385 :         for (; n; n = n->next) {
    3878      129417 :                 c = n->data;
    3879             : 
    3880      129417 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3881           0 :                         return clear_ok;
    3882             :         }
    3883       25968 :         if (t->idxs) {
    3884       25989 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3885          21 :                         sql_idx *ci = n->data;
    3886             : 
    3887          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    3888          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3889           0 :                                 return clear_ok;
    3890             :                 }
    3891             :         }
    3892             :         return sz;
    3893             : }
    3894             : 
    3895             : static int
    3896      158767 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3897             : {
    3898      158767 :         sqlstore *store = tr->store;
    3899      158767 :         gdk_return ok = GDK_SUCCEED;
    3900             : 
    3901      158767 :         (void) t;
    3902      158767 :         (void) segs;
    3903      158767 :         if (GDKinmemory(0))
    3904             :                 return LOG_OK;
    3905             : 
    3906      158760 :         if (cs->cleared) {
    3907      155448 :                 assert(cs->ucnt == 0);
    3908      155448 :                 BAT *ins = temp_descriptor(cs->bid);
    3909      155448 :                 if (!ins)
    3910             :                         return LOG_ERR;
    3911      155448 :                 assert(!isEbat(ins));
    3912      155448 :                 bat_set_access(ins, BAT_READ);
    3913      155448 :                 ok = log_bat_persists(store->logger, ins, id);
    3914      155448 :                 bat_destroy(ins);
    3915      155448 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3916          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3917          56 :                         if (!ins)
    3918             :                                 return LOG_ERR;
    3919          56 :                         assert(!isEbat(ins));
    3920          56 :                         bat_set_access(ins, BAT_READ);
    3921          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3922          56 :                         bat_destroy(ins);
    3923             :                 }
    3924      155448 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3925             :         }
    3926             : 
    3927        3312 :         assert(!isTempTable(t));
    3928             : 
    3929        3312 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3930        2741 :                 BAT *ui = temp_descriptor(cs->uibid);
    3931        2741 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3932             :                 /* any updates */
    3933        2741 :                 if (ui == NULL || uv == NULL) {
    3934             :                         ok = GDK_FAIL;
    3935        2741 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3936        2741 :                         ok = log_delta(store->logger, ui, uv, id);
    3937        2741 :                 bat_destroy(ui);
    3938        2741 :                 bat_destroy(uv);
    3939             :         }
    3940        2741 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3941             : }
    3942             : 
    3943             : static inline int
    3944       58586 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3945       58586 :         sqlstore *store = tr->store;
    3946       58586 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3947             : }
    3948             : 
    3949             : static inline int
    3950       58586 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3951       58586 :         sqlstore *store = tr->store;
    3952       58586 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3953             : }
    3954             : 
    3955             : static int
    3956       58586 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3957             : {
    3958       58586 :         sqlstore *store = tr->store;
    3959       58586 :         gdk_return ok = GDK_SUCCEED;
    3960             : 
    3961       58586 :         size_t end = segs_end(segs, tr, t);
    3962             : 
    3963       58586 :         if (tr_log_table_start(tr, t) != LOG_OK)
    3964             :                 return LOG_ERR;
    3965             : 
    3966       58586 :         size_t nr_appends = 0;
    3967             : 
    3968       58586 :         lock_table(tr->store, t->base.id);
    3969      403161 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3970      344575 :                 unlock_table(tr->store, t->base.id);
    3971             : 
    3972      344575 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3973      112790 :                         if (!seg->deleted) {
    3974       46526 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    3975             :                                         return LOG_ERR;
    3976             : 
    3977       46526 :                                 nr_appends += (seg->end - seg->start);
    3978             :                         }
    3979             :                 }
    3980      344575 :                 lock_table(tr->store, t->base.id);
    3981             :         }
    3982       58586 :         unlock_table(tr->store, t->base.id);
    3983             : 
    3984      405950 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    3985      347364 :                 sql_column *c = n->data;
    3986      347364 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    3987             : 
    3988      347364 :                 if (cs->cleared) {
    3989           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    3990           3 :                         continue;
    3991             :                 }
    3992             : 
    3993      347361 :                 lock_table(tr->store, t->base.id);
    3994      347361 :                 if (!cs->cleared) {
    3995     2369596 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    3996     2022235 :                                 unlock_table(tr->store, t->base.id);
    3997     2022235 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    3998             :                                         /* append col*/
    3999      262748 :                                         BAT *ins = temp_descriptor(cs->bid);
    4000      262748 :                                         if (ins == NULL)
    4001             :                                                 return LOG_ERR;
    4002      262748 :                                         assert(BATcount(ins) >= cur->end);
    4003      262748 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
    4004      262748 :                                         bat_destroy(ins);
    4005             :                                 }
    4006     2022235 :                                 lock_table(tr->store, t->base.id);
    4007             :                         }
    4008             :                 }
    4009      347361 :                 unlock_table(tr->store, t->base.id);
    4010             : 
    4011      347361 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    4012          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4013          19 :                         if (ins == NULL)
    4014             :                                 return LOG_ERR;
    4015          19 :                         if (BATcount(ins) > ins->batInserted)
    4016          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4017          19 :                         BATcommit(ins, BATcount(ins));
    4018          19 :                         bat_destroy(ins);
    4019             :                 }
    4020             :         }
    4021             : 
    4022       58586 :         if (t->idxs) {
    4023       63768 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4024        5182 :                         sql_idx *i = n->data;
    4025             : 
    4026        5182 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    4027        4404 :                                 continue;
    4028         778 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4029             : 
    4030         778 :                         if (cs) {
    4031         778 :                                 if (cs->cleared) {
    4032           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4033           0 :                                         continue;
    4034             :                                 }
    4035             : 
    4036         778 :                                 lock_table(tr->store, t->base.id);
    4037        2387 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4038        1609 :                                         unlock_table(tr->store, t->base.id);
    4039        1609 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4040             :                                                 /* append idx */
    4041         730 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4042         730 :                                                 if (ins == NULL)
    4043             :                                                         return LOG_ERR;
    4044         730 :                                                 assert(BATcount(ins) >= cur->end);
    4045         730 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4046         730 :                                                 bat_destroy(ins);
    4047             :                                         }
    4048        1609 :                                         lock_table(tr->store, t->base.id);
    4049             :                                 }
    4050         778 :                                 unlock_table(tr->store, t->base.id);
    4051             :                         }
    4052             :                 }
    4053             :         }
    4054             : 
    4055       58586 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
    4056           0 :                 return LOG_ERR;
    4057             : 
    4058             :         return LOG_OK;
    4059             : }
    4060             : 
    4061             : static int
    4062       84548 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4063             : {
    4064       84548 :         int ok = LOG_OK;
    4065       84548 :         bool cleared = s->cs.cleared;
    4066       84548 :         if (ok == LOG_OK && cleared)
    4067       25962 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4068       25962 :         if (ok == LOG_OK)
    4069       84548 :                 ok = log_segments(tr, s->segs, t->base.id);
    4070       84548 :         if (ok == LOG_OK && !cleared)
    4071       58586 :                 ok = log_table_append(tr, t, s->segs);
    4072       84548 :         return ok;
    4073             : }
    4074             : 
    4075             : static void
    4076      418499 : merge_cs( column_storage *cs, const char* caller)
    4077             : {
    4078      418499 :         if (cs->bid && cs->ucnt) {
    4079        2750 :                 BAT *cur = temp_descriptor(cs->bid);
    4080        2750 :                 BAT *ui = temp_descriptor(cs->uibid);
    4081        2750 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4082             : 
    4083        2750 :                 if (!cur || !ui || !uv) {
    4084           0 :                         bat_destroy(ui);
    4085           0 :                         bat_destroy(uv);
    4086           0 :                         bat_destroy(cur);
    4087           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4088             :                         return;
    4089             :                 }
    4090        2750 :                 assert(BATcount(ui) == BATcount(uv));
    4091             : 
    4092             :                 /* any updates */
    4093        2750 :                 assert(!isEbat(cur));
    4094        2750 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4095           0 :                         bat_destroy(ui);
    4096           0 :                         bat_destroy(uv);
    4097           0 :                         bat_destroy(cur);
    4098           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4099             :                         return;
    4100             :                 }
    4101             :                 /* cleanup the old deltas */
    4102        2750 :                 temp_destroy(cs->uibid);
    4103        2750 :                 temp_destroy(cs->uvbid);
    4104        2750 :                 cs->uibid = e_bat(TYPE_oid);
    4105        2750 :                 cs->uvbid = e_bat(cur->ttype);
    4106        2750 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4107        2750 :                 cs->ucnt = 0;
    4108        2750 :                 bat_destroy(ui);
    4109        2750 :                 bat_destroy(uv);
    4110        2750 :                 bat_destroy(cur);
    4111             :         }
    4112      418499 :         cs->cleared = false;
    4113      418499 :         cs->merged = true;
    4114      418499 :         return;
    4115             : }
    4116             : 
    4117             : static void
    4118      376572 : merge_delta( sql_delta *obat)
    4119             : {
    4120      376572 :         if (obat && obat->next && !obat->cs.merged)
    4121      132812 :                 merge_delta(obat->next);
    4122      376572 :         merge_cs(&obat->cs, __func__);
    4123      376572 : }
    4124             : 
    4125             : static void
    4126       41927 : merge_storage(storage *tdb)
    4127             : {
    4128       41927 :         merge_cs(&tdb->cs, __func__);
    4129             : 
    4130       41927 :         if (tdb->next) {
    4131         281 :                 destroy_storage(tdb->next);
    4132         281 :                 tdb->next = NULL;
    4133             :         }
    4134       41927 : }
    4135             : 
    4136             : static sql_delta *
    4137           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4138             : {
    4139             :         /* commit ie copy back to the parent transaction */
    4140           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4141           1 :                 sql_delta *od = delta->next;
    4142           1 :                 if (od->cs.ts == commit_ts) {
    4143           0 :                         sql_delta t = *od, *n = od->next;
    4144           0 :                         *od = *delta;
    4145           0 :                         od->next = n;
    4146           0 :                         *delta = t;
    4147           0 :                         delta->next = NULL;
    4148           0 :                         destroy_delta(delta, true);
    4149           0 :                         return od;
    4150             :                 }
    4151             :         }
    4152             :         return delta;
    4153             : }
    4154             : 
    4155             : static int
    4156      132776 : log_update_col( sql_trans *tr, sql_change *change)
    4157             : {
    4158      132776 :         sql_column *c = (sql_column*)change->obj;
    4159      132776 :         assert(!isTempTable(c->t));
    4160             : 
    4161      132776 :         if (isDeleted(c->t)) {
    4162           0 :                 change->handled = true;
    4163           0 :                 return LOG_OK;
    4164             :         }
    4165             : 
    4166      132776 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4167      132776 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4168      132776 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4169      132776 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4170             :         }
    4171             :         return LOG_OK;
    4172             : }
    4173             : 
    4174             : static int
    4175         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4176             : {
    4177         217 :         sqlstore *store = Store;
    4178             : 
    4179         217 :         sql_delta *d = (sql_delta*)change->data;
    4180         217 :         if (d->cs.ts < oldest) {
    4181          82 :                 destroy_delta(d, false);
    4182          82 :                 if (change->commit == &commit_update_idx)
    4183           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4184             :                 else
    4185          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4186          82 :                 return 1;
    4187             :         }
    4188         135 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4189          82 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4190             :         return 0;
    4191             : }
    4192             : 
    4193             : static int
    4194           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4195             : {
    4196           9 :         sqlstore *store = Store;
    4197             : 
    4198           9 :         storage *d = (storage*)change->data;
    4199           9 :         if (d->cs.ts < oldest) {
    4200           3 :                 destroy_storage(d);
    4201           3 :                 table_destroy(store, (sql_table*)change->obj);
    4202           3 :                 return 1;
    4203             :         }
    4204           6 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4205           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4206             :         return 0;
    4207             : }
    4208             : 
    4209             : static int
    4210      132894 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4211             : {
    4212      132894 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4213             : 
    4214      132894 :         sql_delta *delta = ATOMIC_PTR_GET(data);
    4215             : 
    4216      132894 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4217           3 :                 int ok = LOG_OK;
    4218           3 :                 assert(isTempTable(t));
    4219           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4220           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4221           3 :                 if (!tr->parent)
    4222           0 :                         t->base.new = base->new = 0;
    4223           3 :                 change->handled = true;
    4224           3 :                 return ok;
    4225             :         }
    4226             : 
    4227      132891 :         if (commit_ts)
    4228      132809 :                 delta->cs.ts = commit_ts;
    4229      132891 :         if (!commit_ts) { /* rollback */
    4230          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4231             : 
    4232          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4233             :                         return LOG_OK;
    4234          82 :                 if (o != d) {
    4235           0 :                         while(o && o->next != d)
    4236             :                                 o = o->next;
    4237             :                 }
    4238          82 :                 if (o == ATOMIC_PTR_GET(data))
    4239          82 :                         ATOMIC_PTR_SET(data, d->next);
    4240             :                 else
    4241           0 :                         o->next = d->next;
    4242          82 :                 d->next = NULL;
    4243          82 :                 change->cleanup = &tc_gc_rollbacked;
    4244      132809 :         } else if (!tr->parent) {
    4245             :                 /* merge deltas */
    4246      277032 :                 while (delta && delta->cs.ts > oldest)
    4247      144224 :                         delta = delta->next;
    4248      132808 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4249       29950 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4250       29950 :                         merge_delta(delta);
    4251       29950 :                         unlock_column(tr->store, base->id);
    4252             :                 }
    4253           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4254           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4255             :         return LOG_OK;
    4256             : }
    4257             : 
    4258             : static int
    4259      132866 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4260             : {
    4261             : 
    4262      132866 :         sql_column *c = (sql_column*)change->obj;
    4263      132866 :         sql_base* base = &c->base;
    4264      132866 :         sql_table* t = c->t;
    4265      132866 :         ATOMIC_PTR_TYPE* data = &c->data;
    4266      132866 :         int type = c->type.type->localtype;
    4267             : 
    4268      132866 :         if (change->handled || isDeleted(c->t))
    4269             :                 return LOG_OK;
    4270             : 
    4271      132866 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4272             : }
    4273             : 
    4274             : static int
    4275          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4276             : {
    4277          26 :         sql_idx *i = (sql_idx*)change->obj;
    4278          26 :         assert(!isTempTable(i->t));
    4279             : 
    4280          26 :         if (isDeleted(i->t)) {
    4281           0 :                 change->handled = true;
    4282           0 :                 return LOG_OK;
    4283             :         }
    4284             : 
    4285          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4286          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4287          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4288          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4289             :         }
    4290             :         return LOG_OK;
    4291             : }
    4292             : 
    4293             : static int
    4294          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4295             : {
    4296          28 :         sql_idx *i = (sql_idx*)change->obj;
    4297          28 :         sql_base* base = &i->base;
    4298          28 :         sql_table* t = i->t;
    4299          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4300          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    4301             : 
    4302          28 :         if (change->handled || isDeleted(i->t))
    4303             :                 return LOG_OK;
    4304             : 
    4305          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4306             : }
    4307             : 
    4308             : static storage *
    4309          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4310             : {
    4311          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4312           0 :                 storage *od = dbat->next;
    4313           0 :                 if (od->cs.ts == commit_ts) {
    4314           0 :                         storage t = *od, *n = od->next;
    4315           0 :                         *od = *dbat;
    4316           0 :                         od->next = n;
    4317           0 :                         *dbat = t;
    4318           0 :                         dbat->next = NULL;
    4319           0 :                         destroy_storage(dbat);
    4320           0 :                         return od;
    4321             :                 }
    4322             :         }
    4323             :         return dbat;
    4324             : }
    4325             : 
    4326             : static int
    4327       84548 : log_update_del( sql_trans *tr, sql_change *change)
    4328             : {
    4329       84548 :         sql_table *t = (sql_table*)change->obj;
    4330       84548 :         assert(!isTempTable(t));
    4331             : 
    4332       84548 :         if (isDeleted(t)) {
    4333           0 :                 change->handled = true;
    4334           0 :                 return LOG_OK;
    4335             :         }
    4336             : 
    4337       84548 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4338       84548 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4339             :         return LOG_OK;
    4340             : }
    4341             : 
    4342             : static int
    4343       89066 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4344             : {
    4345       89066 :         int ok = LOG_OK;
    4346       89066 :         sql_table *t = (sql_table*)change->obj;
    4347       89066 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4348             : 
    4349       89066 :         if (change->handled || isDeleted(t))
    4350             :                 return ok;
    4351             : 
    4352       89066 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4353          22 :                 assert(isTempTable(t));
    4354          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4355          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4356          22 :                 change->handled = true;
    4357          22 :                 return ok;
    4358             :         }
    4359             : 
    4360       89044 :         lock_table(tr->store, t->base.id);
    4361       89044 :         if (!commit_ts) { /* rollback */
    4362        4196 :                 if (dbat->cs.ts == tr->tid) {
    4363           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4364           3 :                                 unlock_table(tr->store, t->base.id);
    4365           3 :                                 return ok;
    4366             :                         }
    4367           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4368             : 
    4369           3 :                         if (o != d) {
    4370           0 :                                 while(o && o->next != d)
    4371             :                                         o = o->next;
    4372             :                         }
    4373           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    4374           3 :                                 assert(d->next);
    4375           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4376             :                         } else
    4377           0 :                                 o->next = d->next;
    4378           3 :                         d->next = NULL;
    4379           3 :                         change->cleanup = &tc_gc_rollbacked_storage;
    4380             :                 } else
    4381        4190 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4382       84848 :         } else if (ok == LOG_OK && !tr->parent) {
    4383       84822 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4384       25964 :                         dbat->cs.ts = commit_ts;
    4385             : 
    4386       84822 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4387       84822 :                 if (ok == LOG_OK) {
    4388       84822 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4389       84822 :                         if (oldest == commit_ts)
    4390       41927 :                                 merge_storage(dbat);
    4391             :                 }
    4392       84822 :                 if (dbat)
    4393       84822 :                         dbat->cs.cleared = false;
    4394          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4395          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4396          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4397          26 :                 storage *s = change->data;
    4398          26 :                 if (s->cs.ts == tr->tid)
    4399           0 :                         s->cs.ts = commit_ts;
    4400             :         }
    4401       89041 :         unlock_table(tr->store, t->base.id);
    4402       89041 :         return ok;
    4403             : }
    4404             : 
    4405             : /* only rollback (content version) case for now */
    4406             : static int
    4407       18048 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4408             : {
    4409       18048 :         sqlstore *store = Store;
    4410       18048 :         sql_column *c = (sql_column*)change->obj;
    4411             : 
    4412       18048 :         if (!c) /* cleaned earlier */
    4413             :                 return 1;
    4414             : 
    4415         172 :         if (change->handled || isDeleted(c->t)) {
    4416           0 :                 column_destroy(store, c);
    4417           0 :                 return 1;
    4418             :         }
    4419             : 
    4420             :         /* savepoint commit (did it merge ?) */
    4421         172 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4422           0 :                 column_destroy(store, c);
    4423           0 :                 return 1;
    4424             :         }
    4425         172 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4426             :                 return 0;
    4427         171 :         sql_delta *d = (sql_delta*)change->data;
    4428         171 :         if (d && d->next) {
    4429             : 
    4430          66 :                 if (d->cs.ts > oldest)
    4431             :                         return LOG_OK; /* cannot cleanup yet */
    4432             : 
    4433             :                 // d is oldest reachable delta
    4434          63 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4435          62 :                         destroy_delta(d->next, true);
    4436          62 :                         d->next = NULL;
    4437             :                 }
    4438          63 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4439          63 :                 merge_delta(d);
    4440          63 :                 unlock_column(store, c->base.id);
    4441             :         }
    4442         168 :         column_destroy(store, c);
    4443         168 :         return 1;
    4444             : }
    4445             : 
    4446             : static int
    4447      804016 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4448             : {
    4449      804016 :         sqlstore *store = Store;
    4450      804016 :         sql_column *c = (sql_column*)change->obj;
    4451             : 
    4452      804016 :         if (!c) /* cleaned earlier */
    4453             :                 return 1;
    4454             : 
    4455      804016 :         if (change->handled || isDeleted(c->t)) {
    4456           3 :                 table_destroy(store, c->t);
    4457           3 :                 return 1;
    4458             :         }
    4459             : 
    4460             :         /* savepoint commit (did it merge ?) */
    4461      804013 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4462       36160 :                 table_destroy(store, c->t);
    4463       36160 :                 return 1;
    4464             :         }
    4465      767853 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4466             :                 return 0;
    4467      767852 :         sql_delta *d = (sql_delta*)change->data;
    4468      767852 :         if (d && d->next) {
    4469             : 
    4470      767852 :                 if (d->cs.ts > oldest)
    4471             :                         return LOG_OK; /* cannot cleanup yet */
    4472             : 
    4473             :                 // d is oldest reachable delta
    4474       96559 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4475        4297 :                         destroy_delta(d->next, true);
    4476        4297 :                         d->next = NULL;
    4477             :                 }
    4478       96559 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4479       96559 :                 merge_delta(d);
    4480       96559 :                 unlock_column(store, c->base.id);
    4481             :         }
    4482       96559 :         table_destroy(store, c->t);
    4483       96559 :         return 1;
    4484             : }
    4485             : 
    4486             : static int
    4487        2592 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4488             : {
    4489        2592 :         sqlstore *store = Store;
    4490        2592 :         sql_idx *i = (sql_idx*)change->obj;
    4491             : 
    4492        2592 :         if (!i) /* cleaned earlier */
    4493             :                 return 1;
    4494             : 
    4495         633 :         if (change->handled || isDeleted(i->t)) {
    4496           0 :                 idx_destroy(store, i);
    4497           0 :                 return 1;
    4498             :         }
    4499             : 
    4500             :         /* savepoint commit (did it merge ?) */
    4501         633 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4502           0 :                 idx_destroy(store, i);
    4503           0 :                 return 1;
    4504             :         }
    4505         633 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4506             :                 return 0;
    4507         633 :         sql_delta *d = (sql_delta*)change->data;
    4508         633 :         if (d->next) {
    4509             : 
    4510           0 :                 if (d->cs.ts > oldest)
    4511             :                         return LOG_OK; /* cannot cleanup yet */
    4512             : 
    4513             :                 // d is oldest reachable delta
    4514           0 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4515           0 :                         destroy_delta(d->next, true);
    4516           0 :                         d->next = NULL;
    4517             :                 }
    4518           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4519           0 :                 merge_delta(d);
    4520           0 :                 unlock_column(store, i->base.id);
    4521             :         }
    4522         633 :         idx_destroy(store, i);
    4523         633 :         return 1;
    4524             : }
    4525             : 
    4526             : static int
    4527          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4528             : {
    4529          26 :         sqlstore *store = Store;
    4530          26 :         sql_idx *i = (sql_idx*)change->obj;
    4531             : 
    4532          26 :         if (!i) /* cleaned earlier */
    4533             :                 return 1;
    4534             : 
    4535          26 :         if (change->handled || isDeleted(i->t)) {
    4536           0 :                 table_destroy(store, i->t);
    4537           0 :                 return 1;
    4538             :         }
    4539             : 
    4540             :         /* savepoint commit (did it merge ?) */
    4541          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4542           0 :                 table_destroy(store, i->t);
    4543           0 :                 return 1;
    4544             :         }
    4545          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4546             :                 return 0;
    4547          26 :         sql_delta *d = (sql_delta*)change->data;
    4548          26 :         if (d->next) {
    4549             : 
    4550          26 :                 if (d->cs.ts > oldest)
    4551             :                         return LOG_OK; /* cannot cleanup yet */
    4552             : 
    4553             :                 // d is oldest reachable delta
    4554          26 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4555          26 :                         destroy_delta(d->next, true);
    4556          26 :                         d->next = NULL;
    4557             :                 }
    4558          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4559          26 :                 merge_delta(d);
    4560          26 :                 unlock_column(store, i->base.id);
    4561             :         }
    4562          26 :         table_destroy(store, i->t);
    4563          26 :         return 1;
    4564             : }
    4565             : 
    4566             : static int
    4567      234046 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4568             : {
    4569      234046 :         sqlstore *store = Store;
    4570      234046 :         sql_table *t = (sql_table*)change->obj;
    4571             : 
    4572      234046 :         if (change->handled || isDeleted(t)) {
    4573        3503 :                 table_destroy(store, t);
    4574        3503 :                 return 1;
    4575             :         }
    4576             :         /* savepoint commit (did it merge ?) */
    4577      230543 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4578        7349 :                 table_destroy(store, t);
    4579        7349 :                 return 1;
    4580             :         }
    4581      223194 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4582             :                 return 0;
    4583      223166 :         storage *d = (storage*)change->data;
    4584      223166 :         if (d->next) {
    4585      151912 :                 if (d->cs.ts > oldest)
    4586             :                         return LOG_OK; /* cannot cleanup yet */
    4587             : 
    4588       18335 :                 destroy_storage(d->next);
    4589       18335 :                 d->next = NULL;
    4590             :         }
    4591       89589 :         table_destroy(store, t);
    4592       89589 :         return 1;
    4593             : }
    4594             : 
    4595             : static int
    4596       30459 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4597             : {
    4598       30459 :         if (nr == 0)
    4599             :                 return LOG_OK;
    4600       30459 :         assert (nr > 0);
    4601       30459 :         if ((!offsets || !*offsets) && nr == total) {
    4602       30405 :                 *offset = slot;
    4603       30405 :                 return LOG_OK;
    4604             :         }
    4605          54 :         if (!*offsets) {
    4606           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4607           7 :                 if (!*offsets)
    4608             :                         return LOG_ERR;
    4609             :         }
    4610          54 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    4611       16193 :         for(size_t i = 0; i < nr; i++)
    4612       16139 :                 dst[i] = slot + i;
    4613          54 :         (*offsets)->batCount += nr;
    4614          54 :         (*offsets)->theap->dirty = true;
    4615          54 :         return LOG_OK;
    4616             : }
    4617             : 
    4618             : static int
    4619       30359 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4620             : {
    4621       30359 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4622       30598 :         assert(s->segs);
    4623       30598 :         ulng oldest = store_oldest(tr->store, NULL);
    4624       30416 :         BUN slot = 0;
    4625       30416 :         size_t total = cnt;
    4626             : 
    4627       30416 :         if (!locked)
    4628       30414 :                 lock_table(tr->store, t->base.id);
    4629             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4630       61257 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4631       30716 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
    4632          35 :                         if ((seg->end - seg->start) >= cnt) {
    4633             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4634          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4635           2 :                                         slot = p->end;
    4636           2 :                                         p->end += cnt;
    4637           2 :                                         seg->start += cnt;
    4638           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4639             :                                                 ok = LOG_ERR;
    4640             :                                                 break;
    4641             :                                         }
    4642           2 :                                         cnt = 0;
    4643           2 :                                         break;
    4644             :                                 }
    4645             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4646          11 :                                 size_t rcnt = seg->end - seg->start;
    4647          11 :                                 if (rcnt > cnt)
    4648             :                                         rcnt = cnt;
    4649          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4650             :                                         ok = LOG_ERR;
    4651             :                                         break;
    4652             :                                 }
    4653             :                         }
    4654          33 :                         seg->ts = tr->tid;
    4655          33 :                         seg->deleted = false;
    4656          33 :                         slot = seg->start;
    4657          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4658             :                                 ok = LOG_ERR;
    4659             :                                 break;
    4660             :                         }
    4661           0 :                         cnt -= (seg->end - seg->start);
    4662             :                 }
    4663             :         }
    4664       30543 :         if (ok == LOG_OK && cnt) {
    4665       30530 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4666       29447 :                         slot = s->segs->t->end;
    4667       29447 :                         s->segs->t->end += cnt;
    4668             :                 } else {
    4669        1083 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4670             :                                 ok = LOG_ERR;
    4671             :                         } else {
    4672        1135 :                                 if (!s->segs->h)
    4673           0 :                                         s->segs->h = s->segs->t;
    4674        1135 :                                 slot = s->segs->t->start;
    4675             :                         }
    4676             :                 }
    4677       30582 :                 if (ok == LOG_OK)
    4678       30582 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4679             :         }
    4680       30408 :         if (!locked)
    4681       30411 :                 unlock_table(tr->store, t->base.id);
    4682             : 
    4683       30610 :         if (ok == LOG_OK) {
    4684             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4685       30613 :                 if (!in_transaction) {
    4686        1120 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4687        1120 :                         in_transaction = true;
    4688             :                 }
    4689       30613 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4690       30568 :                         tr->logchanges += (lng) total;
    4691       30558 :                 if (*offsets) {
    4692           0 :                         BAT *pos = *offsets;
    4693           0 :                         assert(BATcount(pos) == total);
    4694           0 :                         BATsetcount(pos, total); /* set other properties */
    4695           7 :                         pos->tnil = false;
    4696           7 :                         pos->tnonil = true;
    4697           7 :                         pos->tkey = true;
    4698           7 :                         pos->tsorted = true;
    4699           7 :                         pos->trevsorted = false;
    4700             :                 }
    4701             :         }
    4702       30585 :         return ok;
    4703             : }
    4704             : 
    4705             : static int
    4706     2041547 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4707             : {
    4708     2041547 :         if (cnt > 1 && offsets)
    4709       30359 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4710     2011188 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4711     2011245 :         assert(s->segs);
    4712     2011245 :         ulng oldest = store_oldest(tr->store, NULL);
    4713     2011200 :         BUN slot = 0;
    4714     2011200 :         int reused = 0;
    4715             : 
    4716     2011200 :         if (!locked)
    4717     1886193 :                 lock_table(tr->store, t->base.id);
    4718             :         /* naive vacuum approach, iterator through segments, check for large enough deleted segments
    4719             :          * or create new segment at the end */
    4720     7472304 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4721     5524493 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
    4722             : 
    4723       63444 :                         if ((seg->end - seg->start) >= cnt) {
    4724             : 
    4725             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4726       63444 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4727       48387 :                                         slot = p->end;
    4728       48387 :                                         p->end += cnt;
    4729       48387 :                                         seg->start += cnt;
    4730       48387 :                                         reused = 1;
    4731       48387 :                                         break;
    4732             :                                 }
    4733             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4734       15057 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4735             :                                         ok = LOG_ERR;
    4736             :                                         break;
    4737             :                                 }
    4738             :                         }
    4739       15057 :                         seg->ts = tr->tid;
    4740       15057 :                         seg->deleted = false;
    4741       15057 :                         slot = seg->start;
    4742       15057 :                         reused = 1;
    4743       15057 :                         break;
    4744             :                 }
    4745             :         }
    4746     2011255 :         if (ok == LOG_OK && !reused) {
    4747     1947817 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4748     1912680 :                         slot = s->segs->t->end;
    4749     1912680 :                         s->segs->t->end += cnt;
    4750             :                 } else {
    4751       35137 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4752             :                                 ok = LOG_ERR;
    4753             :                         } else {
    4754       35137 :                                 if (!s->segs->h)
    4755           0 :                                         s->segs->h = s->segs->t;
    4756       35137 :                                 slot = s->segs->t->start;
    4757             :                         }
    4758             :                 }
    4759             :         }
    4760     2011255 :         if (!locked)
    4761     1886248 :                 unlock_table(tr->store, t->base.id);
    4762             : 
    4763     2011255 :         if (ok == LOG_OK) {
    4764             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4765     2011261 :                 if (!in_transaction) {
    4766       49045 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4767       49045 :                         in_transaction = true;
    4768             :                 }
    4769     2011261 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4770     2010830 :                         tr->logchanges += (lng) cnt;
    4771     2011261 :                 *offset = slot;
    4772             :         }
    4773             :         return ok;
    4774             : }
    4775             : 
    4776             : /*
    4777             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4778             :  * of the global transaction and mark the newly added storage slots unused on the global
    4779             :  * level but used on the local transaction level. Besides this the local transaction needs
    4780             :  * to update (and mark unused) any slot in between the old end and new slots.
    4781             :  * */
    4782             : static int
    4783     1916563 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4784             : {
    4785     1916563 :         storage *s;
    4786             : 
    4787             :         /* we have a single segment structure for each persistent table
    4788             :          * for temporary tables each has its own */
    4789     1916563 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4790             :                 return LOG_ERR;
    4791             : 
    4792     1916642 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4793             : }
    4794             : 
    4795             : /* some tables cannot be updated concurrently (user/roles etc) */
    4796             : static int
    4797      125017 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4798             : {
    4799      125017 :         storage *s;
    4800      125017 :         int res = 0;
    4801             : 
    4802             :         /* we have a single segment structure for each persistent table
    4803             :          * for temporary tables each has its own */
    4804      125017 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4805             :                 /* TODO check for other inserts ! */
    4806             :                 return LOG_ERR;
    4807             : 
    4808      125017 :         lock_table(tr->store, t->base.id);
    4809      125017 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4810           4 :                 unlock_table(tr->store, t->base.id);
    4811           4 :                 return LOG_CONFLICT;
    4812             :         }
    4813      125013 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4814      125013 :         unlock_table(tr->store, t->base.id);
    4815      125013 :         return res;
    4816             : }
    4817             : 
    4818             : static int
    4819       21012 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4820             : {
    4821       21012 :         storage *s;
    4822       21012 :         int res = 0;
    4823             : 
    4824       21012 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4825             :                 return LOG_ERR;
    4826             : 
    4827       21012 :         lock_table(tr->store, t->base.id);
    4828       21012 :         res = segments_conflict(tr, s->segs, uncommitted);
    4829       21012 :         unlock_table(tr->store, t->base.id);
    4830       21012 :         return res ? LOG_CONFLICT : LOG_OK;
    4831             : }
    4832             : 
    4833             : static size_t
    4834     1567942 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4835             : {
    4836     1567942 :         size_t cnt = 0;
    4837             : 
    4838     1814661 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4839             :                 ;
    4840             : 
    4841     3843146 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4842     2275188 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4843      116255 :                         cnt += s->end - s->start;
    4844             :         }
    4845     1567958 :         return cnt;
    4846             : }
    4847             : 
    4848             : static BAT *
    4849     1567904 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4850             : {
    4851     1567904 :         lock_table(tr->store, t->base.id);
    4852     1567959 :         segment *s = S->segs->h;
    4853             :         /* step one no deletes -> dense range */
    4854     1567959 :         uint32_t cur = 0;
    4855     1567959 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4856     1567959 :         if (!dnr) {
    4857     1463210 :                 unlock_table(tr->store, t->base.id);
    4858     1463186 :                 return BATdense(start, start, end-start);
    4859             :         }
    4860             : 
    4861      104749 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4862      104749 :         if (!b) {
    4863           0 :                 unlock_table(tr->store, t->base.id);
    4864           0 :                 return NULL;
    4865             :         }
    4866             : 
    4867      104749 :         uint32_t *restrict dst = Tloc(b, 0);
    4868    58709724 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4869    58666530 :                 if (s->end < start)
    4870      180100 :                         continue;
    4871    58486430 :                 if (s->start >= end)
    4872             :                         break;
    4873    58424875 :                 msk m = (SEG_IS_VALID(s, tr));
    4874    58424875 :                 size_t lnr = s->end-s->start;
    4875    58424875 :                 if (s->start < start)
    4876       15334 :                         lnr -= (start - s->start);
    4877    58424875 :                 if (s->end > end)
    4878        9959 :                         lnr -= s->end - end;
    4879             : 
    4880    58424875 :                 if (m) {
    4881      889411 :                         size_t used = pos&31, end = 32;
    4882      889411 :                         if (used) {
    4883      761682 :                                 if (lnr < (32-used))
    4884      486797 :                                         end = used + lnr;
    4885      761682 :                                 assert(end > used);
    4886      761682 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4887      761682 :                                 lnr -= end - used;
    4888      761682 :                                 pos += end - used;
    4889      761682 :                                 if (end == 32) {
    4890      274885 :                                         *dst++ = cur;
    4891      274885 :                                         cur = 0;
    4892             :                                 }
    4893             :                         }
    4894      889411 :                         size_t full = lnr/32;
    4895      889411 :                         size_t rest = lnr%32;
    4896      889411 :                         if (full > 0) {
    4897      219270 :                                 memset(dst, ~0, full * sizeof(*dst));
    4898      219270 :                                 dst += full;
    4899      219270 :                                 lnr -= full * 32;
    4900      219270 :                                 pos += full * 32;
    4901             :                         }
    4902      889411 :                         if (rest > 0) {
    4903      381882 :                                 cur |= (1U << rest) - 1;
    4904      381882 :                                 lnr -= rest;
    4905      381882 :                                 pos += rest;
    4906             :                         }
    4907      889411 :                         assert(lnr==0);
    4908             :                 } else {
    4909    57535464 :                         size_t used = pos&31, end = 32;
    4910    57535464 :                         if (used) {
    4911    55736245 :                                 if (lnr < (32-used))
    4912    53840922 :                                         end = used + lnr;
    4913             : 
    4914    55736245 :                                 pos+= (end-used);
    4915    55736245 :                                 lnr-= (end-used);
    4916    55736245 :                                 if (end == 32) {
    4917     1895323 :                                         *dst++ = cur;
    4918     1895323 :                                         cur = 0;
    4919             :                                 }
    4920             :                         }
    4921    57535464 :                         size_t full = lnr/32;
    4922    57535464 :                         size_t rest = lnr%32;
    4923    57535464 :                         memset(dst, 0, full * sizeof(*dst));
    4924    57535464 :                         dst += full;
    4925    57535464 :                         lnr -= full * 32;
    4926    57535464 :                         pos += full * 32;
    4927    57535464 :                         pos+= rest;
    4928    57535464 :                         lnr-= rest;
    4929    57535464 :                         assert(lnr==0);
    4930             :                 }
    4931             :         }
    4932             : 
    4933      104749 :         unlock_table(tr->store, t->base.id);
    4934      104747 :         if (pos%32)
    4935      102142 :                 *dst=cur;
    4936      104747 :         BATsetcount(b, nr);
    4937      104749 :         bn = BATmaskedcands(start, nr, b, true);
    4938      104747 :         BBPreclaim(b);
    4939      104746 :         (void)pos;
    4940      104746 :         assert (pos == nr);
    4941             :         return bn;
    4942             : }
    4943             : 
    4944             : static void *                                   /* BAT * */
    4945     1573831 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4946             : {
    4947             :         /* with nr_of_parts - part_nr we can adjust parts */
    4948     1573831 :         storage *s = tab_timestamp_storage(tr, t);
    4949             : 
    4950     1573980 :         if (!s)
    4951             :                 return NULL;
    4952     1573980 :         size_t nr = segs_end(s->segs, tr, t);
    4953             : 
    4954     1574700 :         if (!nr)
    4955        6763 :                 return BATdense(0, 0, 0);
    4956             : 
    4957             :         /* compute proper part */
    4958     1567937 :         size_t part_size = nr/nr_of_parts;
    4959     1567937 :         size_t start = part_size * part_nr;
    4960     1567937 :         size_t end = start + part_size;
    4961     1567937 :         if (part_nr == (nr_of_parts-1))
    4962     1278876 :                 end = nr;
    4963     1567937 :         assert(end <= nr);
    4964     1567937 :         return segments2cands(s, tr, t, start, end);
    4965             : }
    4966             : 
    4967             : static int
    4968           5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    4969             : {
    4970           5 :         bool update_conflict = false;
    4971             : 
    4972           5 :         if (segments_in_transaction(tr, col->t))
    4973             :                 return LOG_CONFLICT;
    4974             : 
    4975           5 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    4976             : 
    4977           5 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    4978           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    4979           5 :         assert(d && d->cs.ts == tr->tid);
    4980           5 :         if (odelta != d)
    4981           5 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    4982           5 :         if (d->cs.bid)
    4983           5 :                 temp_destroy(d->cs.bid);
    4984           5 :         if (d->cs.uibid)
    4985           5 :                 temp_destroy(d->cs.uibid);
    4986           5 :         if (d->cs.uvbid)
    4987           5 :                 temp_destroy(d->cs.uvbid);
    4988           5 :         bat_set_access(bn, BAT_READ);
    4989           5 :         d->cs.bid = temp_create(bn);
    4990           5 :         d->cs.uibid = 0;
    4991           5 :         d->cs.uvbid = 0;
    4992           5 :         d->cs.ucnt = 0;
    4993           5 :         d->cs.cleared = true;
    4994           5 :         d->cs.ts = tr->tid;
    4995           5 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    4996           5 :         return LOG_OK;
    4997             : }
    4998             : 
    4999             : static int
    5000          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5001             : {
    5002          58 :         bool update_conflict = false;
    5003             : 
    5004          58 :         if (segments_in_transaction(tr, col->t))
    5005             :                 return LOG_CONFLICT;
    5006             : 
    5007          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5008             : 
    5009          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5010           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5011          58 :         assert(d && d->cs.ts == tr->tid);
    5012          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5013          58 :         if (odelta != d)
    5014          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5015             : 
    5016          58 :         d->cs.st = st;
    5017          58 :         d->cs.cleared = true;
    5018          58 :         if (d->cs.bid)
    5019          58 :                 temp_destroy(d->cs.bid);
    5020          58 :         o = transfer_to_systrans(o);
    5021          58 :         if (o == NULL)
    5022             :                 return LOG_ERR;
    5023          58 :         bat_set_access(o, BAT_READ);
    5024          58 :         d->cs.bid = temp_create(o);
    5025          58 :         if (u) {
    5026          53 :                 if (d->cs.ebid)
    5027           0 :                         temp_destroy(d->cs.ebid);
    5028          53 :                 u = transfer_to_systrans(u);
    5029          53 :                 if (u == NULL)
    5030             :                         return LOG_ERR;
    5031          53 :                 d->cs.ebid = temp_create(u);
    5032             :         }
    5033             :         return LOG_OK;
    5034             : }
    5035             : 
    5036             : void
    5037         328 : bat_storage_init( store_functions *sf)
    5038             : {
    5039         328 :         sf->bind_col = &bind_col;
    5040         328 :         sf->bind_updates = &bind_updates;
    5041         328 :         sf->bind_updates_idx = &bind_updates_idx;
    5042         328 :         sf->bind_idx = &bind_idx;
    5043         328 :         sf->bind_cands = &bind_cands;
    5044             : 
    5045         328 :         sf->claim_tab = &claim_tab;
    5046         328 :         sf->key_claim_tab = &key_claim_tab;
    5047         328 :         sf->tab_validate = &tab_validate;
    5048             : 
    5049         328 :         sf->append_col = &append_col;
    5050         328 :         sf->append_idx = &append_idx;
    5051             : 
    5052         328 :         sf->update_col = &update_col;
    5053         328 :         sf->update_idx = &update_idx;
    5054             : 
    5055         328 :         sf->delete_tab = &delete_tab;
    5056             : 
    5057         328 :         sf->count_del = &count_del;
    5058         328 :         sf->count_col = &count_col;
    5059         328 :         sf->count_idx = &count_idx;
    5060         328 :         sf->dcount_col = &dcount_col;
    5061         328 :         sf->min_max_col = &min_max_col;
    5062         328 :         sf->set_stats_col = &set_stats_col;
    5063         328 :         sf->sorted_col = &sorted_col;
    5064         328 :         sf->unique_col = &unique_col;
    5065         328 :         sf->double_elim_col = &double_elim_col;
    5066         328 :         sf->col_stats = &col_stats;
    5067         328 :         sf->col_set_range = &col_set_range;
    5068         328 :         sf->col_not_null = &col_not_null;
    5069             : 
    5070         328 :         sf->col_dup = &col_dup;
    5071         328 :         sf->idx_dup = &idx_dup;
    5072         328 :         sf->del_dup = &del_dup;
    5073             : 
    5074         328 :         sf->create_col = &create_col;    /* create and add to change list */
    5075         328 :         sf->create_idx = &create_idx;
    5076         328 :         sf->create_del = &create_del;
    5077             : 
    5078         328 :         sf->destroy_col = &destroy_col;  /* free resources */
    5079         328 :         sf->destroy_idx = &destroy_idx;
    5080         328 :         sf->destroy_del = &destroy_del;
    5081             : 
    5082         328 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5083         328 :         sf->drop_idx = &drop_idx;
    5084         328 :         sf->drop_del = &drop_del;
    5085             : 
    5086         328 :         sf->clear_table = &clear_table;
    5087             : 
    5088         328 :         sf->swap_bats = &swap_bats;
    5089         328 :         sf->col_compress = &col_compress;
    5090         328 : }
    5091             : 
    5092             : #if 0
    5093             : static lng
    5094             : log_get_nr_inserted(sql_column *fc, lng *offset)
    5095             : {
    5096             :         lng cnt = 0;
    5097             : 
    5098             :         if (!fc || GDKinmemory(0))
    5099             :                 return 0;
    5100             : 
    5101             :         if (fc->base.atime && fc->base.allocated) {
    5102             :                 sql_delta *fb = fc->data;
    5103             :                 BAT *ins = temp_descriptor(fb->cs.bid);
    5104             : 
    5105             :                 if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
    5106             :                         cnt = BATcount(ins) - ins->batInserted;
    5107             :                 }
    5108             :                 bat_destroy(ins);
    5109             :         }
    5110             :         return cnt;
    5111             : }
    5112             : 
    5113             : static lng
    5114             : log_get_nr_deleted(sql_table *ft, lng *offset)
    5115             : {
    5116             :         lng cnt = 0;
    5117             : 
    5118             :         if (!ft || GDKinmemory(0))
    5119             :                 return 0;
    5120             : 
    5121             :         if (ft->base.atime && ft->base.allocated) {
    5122             :                 storage *fdb = ft->data;
    5123             :                 BAT *db = temp_descriptor(fdb->cs.bid);
    5124             : 
    5125             :                 if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
    5126             :                         cnt = BATcount(db) - db->batInserted;
    5127             :                         *offset = db->batInserted;
    5128             :                 }
    5129             :                 bat_destroy(db);
    5130             :         }
    5131             :         return cnt;
    5132             : }
    5133             : #endif

Generated by: LCOV version 1.14