LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2481 3149 78.8 %
Date: 2025-03-24 23:16:36 Functions: 157 165 95.2 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024, 2025 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "matomic.h"
      18             : 
      19             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      20             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      21             : 
      22             : static int log_update_col( sql_trans *tr, sql_change *c);
      23             : static int log_update_idx( sql_trans *tr, sql_change *c);
      24             : static int log_update_del( sql_trans *tr, sql_change *c);
      25             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      26             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      27             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int log_create_col(sql_trans *tr, sql_change *change);
      29             : static int log_create_idx(sql_trans *tr, sql_change *change);
      30             : static int log_create_del(sql_trans *tr, sql_change *change);
      31             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      32             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      33             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      35             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      36             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      39             : 
      40             : static lng merge_delta( sql_delta *obat);
      41             : 
      42             : /* valid
      43             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      44             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      45             :  */
      46             : 
      47             : #define VALID_4_READ(TS,tr) \
      48             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      49             : 
      50             : /* when changed, check if the old status is still valid */
      51             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      52             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      53             : 
      54             : #define SEG_VALID_4_DELETE(seg,tr) \
      55             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      56             : 
      57             : /* Delete (in current trans or by some other finished transaction, or re-used segment which used to be deleted */
      58             : #define SEG_IS_DELETED(seg,tr) \
      59             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      60             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      61             : 
      62             : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
      63             : #define SEG_IS_VALID(seg, tr) \
      64             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      65             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      66             : 
      67             : static inline BAT *
      68        5299 : transfer_to_systrans(BAT *b)
      69             : {
      70             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      71        5299 :         MT_lock_set(&b->theaplock);
      72        5299 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      73          29 :                 MT_lock_unset(&b->theaplock);
      74          29 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      75          29 :                 BBPreclaim(b);
      76          29 :                 return bn;
      77             :         }
      78        5270 :         if (b->theap->farmid == TRANSIENT ||
      79          62 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      80        4885 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      81        4885 :                 if (qc) {
      82        2523 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      83        2523 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      84        2523 :                                 b->theap->farmid = SYSTRANS;
      85        2523 :                                 b->batRole = SYSTRANS;
      86             :                         }
      87        2523 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      88        1092 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      89        1092 :                                 b->tvheap->farmid = SYSTRANS;
      90             :                         }
      91             :                 }
      92             :         }
      93        5270 :         MT_lock_unset(&b->theaplock);
      94        5270 :         return b;
      95             : }
      96             : 
      97             : static void
      98    28799722 : lock_table(sqlstore *store, sqlid id)
      99             : {
     100    28799722 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     101    28889230 : }
     102             : 
     103             : static void
     104    28889076 : unlock_table(sqlstore *store, sqlid id)
     105             : {
     106    28889076 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     107    28886362 : }
     108             : 
     109             : static void
     110    21287348 : lock_column(sqlstore *store, sqlid id)
     111             : {
     112    21287348 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     113    21310616 : }
     114             : 
     115             : static void
     116    21308384 : unlock_column(sqlstore *store, sqlid id)
     117             : {
     118    21308384 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     119    21320715 : }
     120             : 
     121             : static void
     122      115247 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     123             : {
     124      115247 :         assert(cleanup);
     125      115247 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     126      115245 : }
     127             : 
     128             : static void
     129      132848 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     130             : {
     131      132848 :         assert(cleanup);
     132      132848 :         dup_base(&t->base);
     133      132846 :         trans_add(tr, b, data, cleanup, commit, log);
     134      132838 : }
     135             : 
     136             : static int
     137       76496 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     138             : {
     139       76496 :         segment *s = change->data;
     140             : 
     141       76496 :         if (s->ts <= oldest) {
     142       34571 :                 while(s) {
     143       21130 :                         segment *n = s->prev;
     144       21130 :                         _DELETE(s);
     145       21130 :                         s = n;
     146             :                 }
     147       13441 :                 sqlstore *store = Store;
     148       13441 :                 table_destroy(store, (sql_table*)change->obj);
     149       13441 :                 return 1;
     150             :         }
     151             :         return LOG_OK;
     152             : }
     153             : 
     154             : static void
     155       21130 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     156             : {
     157             :         /* we can only be accessed by anything older then commit_ts */
     158       21130 :         if (c->cleanup == &tc_gc_seg)
     159        7689 :                 s->prev = c->data;
     160             :         else
     161       13441 :                 c->cleanup = &tc_gc_seg;
     162       21130 :         c->data = s;
     163       21130 :         s->ts = commit_ts;
     164       16896 : }
     165             : 
     166             : static segment *
     167       89150 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     168             : {
     169       89150 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     170             : 
     171       89152 :         assert(tr);
     172       89152 :         if (n) {
     173       89152 :                 *n = (segment) {
     174       89152 :                         .ts = tr->tid,
     175             :                         .oldts = 0,
     176             :                         .deleted = false,
     177             :                         .start = 0,
     178             :                         .end = cnt,
     179             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     180             :                         .prev = NULL,
     181             :                 };
     182       89152 :                 if (o) {
     183       36368 :                         n->start += o->end;
     184       36368 :                         n->end += o->end;
     185       36368 :                         ATOMIC_PTR_SET(&o->next, n);
     186             :                 }
     187             :         }
     188       89152 :         return n;
     189             : }
     190             : 
     191             : static segment *
     192       96011 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     193             : {
     194       96011 :         assert(tr);
     195       96011 :         if (o->start == start && o->end == start+cnt) {
     196       11375 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     197       11375 :                 o->oldts = o->ts;
     198       11375 :                 o->ts = tr->tid;
     199       11375 :                 o->deleted = deleted;
     200       11375 :                 return o;
     201             :         }
     202       84636 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     203             : 
     204       84636 :         if (!n)
     205             :                 return NULL;
     206       84636 :         n->prev = NULL;
     207             : 
     208       84636 :         if (o->ts == tr->tid) {
     209        5921 :                 n->oldts = 0;
     210        5921 :                 n->ts = 1;
     211        5921 :                 n->deleted = true;
     212             :         } else {
     213       78715 :                 n->oldts = o->ts;
     214       78715 :                 n->ts = tr->tid;
     215       78715 :                 n->deleted = deleted;
     216             :         }
     217       84636 :         if (start == o->start) {
     218             :                 /* 2-way split: o remains latter part of segment, new one is
     219             :                  * inserted before */
     220       68203 :                 n->start = o->start;
     221       68203 :                 n->end = n->start + cnt;
     222       68203 :                 ATOMIC_PTR_INIT(&n->next, o);
     223       68203 :                 if (segs->h == o)
     224         464 :                         segs->h = n;
     225       68203 :                 if (p)
     226       67739 :                         ATOMIC_PTR_SET(&p->next, n);
     227       68203 :                 o->start = n->end;
     228       16433 :         } else if (start+cnt == o->end) {
     229             :                 /* 2-way split: o remains first part of segment, new one is
     230             :                  * added after */
     231        5801 :                 n->start = o->end - cnt;
     232        5801 :                 n->end = o->end;
     233        5801 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     234        5801 :                 ATOMIC_PTR_SET(&o->next, n);
     235        5801 :                 if (segs->t == o)
     236         901 :                         segs->t = n;
     237        5801 :                 o->end = n->start;
     238             :         } else {
     239             :                 /* 3-way split: o remains first part of segment, two new ones
     240             :                  * are added after */
     241       10632 :                 segment *n2 = GDKmalloc(sizeof(segment));
     242       10632 :                 if (n2 == NULL) {
     243           0 :                         GDKfree(n);
     244           0 :                         return NULL;
     245             :                 }
     246       10632 :                 ATOMIC_PTR_INIT(&n->next, n2);
     247       10632 :                 n->start = start;
     248       10632 :                 n->end = start + cnt;
     249       10632 :                 *n2 = *o;
     250       10632 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     251       10632 :                 n2->start = n->end;
     252       10632 :                 n2->prev = NULL;
     253       10632 :                 if (segs->t == o)
     254        4477 :                         segs->t = n2;
     255       10632 :                 ATOMIC_PTR_SET(&o->next, n);
     256       10632 :                 o->end = start;
     257             :         }
     258             :         return n;
     259             : }
     260             : 
     261             : static void
     262        4302 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     263             : {
     264        4302 :         segment *cur = segs->h, *seg = NULL;
     265       18313 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     266       14011 :                 if (cur->ts == tr->tid) { /* revert */
     267        4799 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     268        4799 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     269        4799 :                         cur->oldts = 0;
     270             :                 }
     271       14011 :                 if (cur->ts <= oldest) { /* possibly merge range */
     272       12940 :                         if (!seg) { /* skip first */
     273             :                                 seg = cur;
     274        8638 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     275             :                                 /* merge with previous */
     276        4234 :                                 seg->end = cur->end;
     277        4234 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     278        4234 :                                 if (cur == segs->t)
     279        2857 :                                         segs->t = seg;
     280        4234 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     281        4234 :                                 cur = seg;
     282             :                         } else {
     283             :                                 seg = cur; /* begin of new merge */
     284             :                         }
     285             :                 }
     286             :         }
     287        4302 : }
     288             : 
     289             : static size_t
     290      104773 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     291             : {
     292      104773 :         size_t cnt = 0;
     293      104773 :         segment *s = segs->h, *l = NULL;
     294             : 
     295      480103 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     296      375330 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     297             :                                 l = s;
     298             :         }
     299      104773 :         if (l)
     300      104766 :                 cnt = l->end;
     301      104773 :         return cnt;
     302             : }
     303             : 
     304             : static int
     305      104773 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     306             : {
     307             :         /* set bits correctly */
     308      104773 :         BAT *b = temp_descriptor(cs->bid);
     309             : 
     310      104773 :         if (!b)
     311             :                 return LOG_ERR;
     312      104773 :         segment *s = segs->h;
     313             : 
     314      104773 :         size_t nr = segs_end_include_deleted(segs, tr);
     315      104773 :         size_t rounded_nr = ((nr+31)&~31);
     316      104773 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     317           0 :                 bat_destroy(b);
     318           0 :                 return LOG_ERR;
     319             :         }
     320             : 
     321             :         /* disable all properties here */
     322      104773 :         MT_lock_set(&b->theaplock);
     323      104773 :         b->tsorted = false;
     324      104773 :         b->trevsorted = false;
     325      104773 :         b->tnosorted = 0;
     326      104773 :         b->tnorevsorted = 0;
     327      104773 :         b->tseqbase = oid_nil;
     328      104773 :         b->tkey = false;
     329      104773 :         b->tnokey[0] = 0;
     330      104773 :         b->tnokey[1] = 0;
     331      104773 :         BUN cnt = BATcount(b);
     332             : 
     333      104773 :         uint32_t *restrict dst;
     334      421885 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     335      360659 :                 if (s->start >= nr)
     336             :                         break;
     337      317112 :                 if (s->ts == tr->tid && s->end != s->start) {
     338      151852 :                         if (cnt < s->start) { /* first mark as deleted ! */
     339        2646 :                                 size_t lnr = s->start-cnt;
     340        2646 :                                 size_t pos = cnt;
     341        2646 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     342        2646 :                                 uint32_t cur = 0;
     343             : 
     344        2646 :                                 size_t used = pos&31, end = 32;
     345        2646 :                                 if (used) {
     346        2564 :                                         if (lnr < (32-used))
     347        2436 :                                                 end = used + lnr;
     348        2564 :                                         assert(end > used);
     349        2564 :                                         cur |= ((1U << (end - used)) - 1) << used;
     350        2564 :                                         lnr -= end - used;
     351        2564 :                                         *dst++ |= cur;
     352        2564 :                                         cur = 0;
     353             :                                 }
     354        2646 :                                 size_t full = lnr/32;
     355        2646 :                                 size_t rest = lnr%32;
     356        2646 :                                 if (full > 0) {
     357           7 :                                         memset(dst, ~0, full * sizeof(*dst));
     358           7 :                                         dst += full;
     359           7 :                                         lnr -= full * 32;
     360             :                                 }
     361        2646 :                                 if (rest > 0) {
     362         137 :                                         cur |= (1U << rest) - 1;
     363         137 :                                         lnr -= rest;
     364         137 :                                         *dst |= cur;
     365             :                                 }
     366        2646 :                                 assert(lnr==0);
     367             :                         }
     368      151852 :                         size_t lnr = s->end-s->start;
     369      151852 :                         size_t pos = s->start;
     370      151852 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     371      151852 :                         uint32_t cur = 0;
     372      151852 :                         size_t used = pos&31, end = 32;
     373      151852 :                         if (used) {
     374      113878 :                                 if (lnr < (32-used))
     375      108543 :                                         end = used + lnr;
     376      113878 :                                 assert(end > used);
     377      113878 :                                 cur |= ((1U << (end - used)) - 1) << used;
     378      113878 :                                 lnr -= end - used;
     379      113878 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     380      113878 :                                 dst++;
     381      113878 :                                 cur = 0;
     382             :                         }
     383      151852 :                         size_t full = lnr/32;
     384      151852 :                         size_t rest = lnr%32;
     385      151852 :                         if (full > 0) {
     386        3623 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     387        3623 :                                 dst += full;
     388        3623 :                                 lnr -= full * 32;
     389             :                         }
     390      151852 :                         if (rest > 0) {
     391       40697 :                                 cur |= (1U << rest) - 1;
     392       40697 :                                 lnr -= rest;
     393       40697 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     394             :                         }
     395      151852 :                         assert(lnr==0);
     396      151852 :                         if (cnt < s->end)
     397      317112 :                                 cnt = s->end;
     398             :                 }
     399             :         }
     400      104773 :         if (nr > BATcount(b)) {
     401       63597 :                 BATsetcount(b, nr);
     402             :         }
     403      104773 :         b->theap->dirty = true;
     404      104773 :         MT_lock_unset(&b->theaplock);
     405             : 
     406      104773 :         bat_destroy(b);
     407      104773 :         return LOG_OK;
     408             : }
     409             : 
     410             : /* TODO return LOG_OK/ERR */
     411             : static void
     412      104799 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     413             : {
     414      104799 :         sqlstore* store = tr->store;
     415      104799 :         segment *cur = s->segs->h, *seg = NULL;
     416      480201 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     417      375402 :                 if (cur->ts == tr->tid) {
     418      166492 :                         if (!cur->deleted)
     419       93579 :                                 cur->oldts = 0;
     420      166492 :                         cur->ts = commit_ts;
     421             :                 }
     422      375402 :                 if (!seg) {
     423             :                         /* first segment */
     424             :                         seg = cur;
     425             :                 }
     426      270603 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     427             :                         /* possible merge since both deleted flags are equal */
     428      253851 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     429      186625 :                                 int merge = 1;
     430      186625 :                                 node *n = store->active->h;
     431      573390 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     432      469117 :                                         sql_trans* other = ((sql_trans*)n->data);
     433      469117 :                                         ulng active = other->ts;
     434      469117 :                                         if(other->active == 2)
     435       21894 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     436      447223 :                                         if (active == tr->ts)
     437      132428 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     438      314795 :                                         if (seg->ts < active && cur->ts < active)
     439             :                                                 break;
     440      302831 :                                         if (seg->ts > active && cur->ts > active)
     441      232443 :                                                 continue;
     442             : 
     443       70388 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     444             :                                         /* cannot safely merge since there is an active transaction between the segments */
     445             :                                         merge = false;
     446             :                                         break;
     447             :                                 }
     448             :                                 /* merge segments */
     449      232474 :                                 if (merge) {
     450      116237 :                                         seg->end = cur->end;
     451      116237 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     452      116237 :                                         if (cur == s->segs->t)
     453       28190 :                                                 s->segs->t = seg;
     454      116237 :                                         if (commit_ts == oldest) {
     455       99341 :                                                 _DELETE(cur);
     456             :                                         } else
     457       33792 :                                                 mark4destroy(cur, change, commit_ts);
     458      116237 :                                         cur = seg;
     459      116237 :                                         continue;
     460             :                                 }
     461             :                         }
     462             :                 }
     463             :                 seg = cur;
     464             :         }
     465      104799 : }
     466             : 
     467             : static int
     468     2213459 : segments_in_transaction(sql_trans *tr, sql_table *t)
     469             : {
     470     2213459 :         storage *s = ATOMIC_PTR_GET(&t->data);
     471     2213459 :         segment *seg = s->segs->h;
     472             : 
     473     2213459 :         if (seg && s->segs->t->ts == tr->tid)
     474             :                 return 1;
     475      602712 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     476      497151 :                 if (seg->ts == tr->tid)
     477             :                         return 1;
     478             :         }
     479             :         return 0;
     480             : }
     481             : 
     482             : static size_t
     483    20857376 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     484             : {
     485    20857376 :         size_t cnt = 0;
     486             : 
     487             :         /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
     488             :          * keep all of the parts aligned */
     489    20857376 :         lock_table(tr->store, table->base.id);
     490    20924814 :         segment *s = segs->h, *l = NULL;
     491             : 
     492    20924814 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     493    17198856 :                 l = s = segs->t;
     494             : 
     495   220014372 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     496   199089755 :                 if (SEG_IS_VALID(s, tr))
     497             :                                 l = s;
     498             :         }
     499    20924617 :         if (l)
     500    20906199 :                 cnt = l->end;
     501    20924617 :         unlock_table(tr->store, table->base.id);
     502    20922993 :         return cnt;
     503             : }
     504             : 
     505             : static segments *
     506       52782 : new_segments(sql_trans *tr, size_t cnt)
     507             : {
     508       52782 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     509             : 
     510       52784 :         if (n) {
     511       52784 :                 n->nr_reused = 0;
     512       52784 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     513       52784 :                 if (!n->h) {
     514           0 :                         GDKfree(n);
     515           0 :                         return NULL;
     516             :                 }
     517       52784 :                 sql_ref_init(&n->r);
     518             :         }
     519             :         return n;
     520             : }
     521             : 
     522             : static sql_delta *
     523    34939879 : timestamp_delta( sql_trans *tr, sql_delta *d)
     524             : {
     525    34993622 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     526       53743 :                 d = d->next;
     527    34946493 :         return d;
     528             : }
     529             : 
     530             : static sql_delta *
     531    34775257 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     532             : {
     533    34775257 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     534             : }
     535             : 
     536             : static sql_delta *
     537       27332 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     538             : {
     539       27332 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     540             : }
     541             : 
     542             : static storage *
     543    21341504 : timestamp_storage( sql_trans *tr, storage *d)
     544             : {
     545    21341504 :         if (!d)
     546             :                 return NULL;
     547    21412708 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     548       71204 :                 d = d->next;
     549             :         return d;
     550             : }
     551             : 
     552             : static storage *
     553    21324623 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     554             : {
     555    21324623 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     556             : }
     557             : 
     558             : static sql_delta*
     559       21406 : delta_dup(sql_delta *d)
     560             : {
     561       21406 :         ATOMIC_INC(&d->cs.refcnt);
     562       21406 :         return d;
     563             : }
     564             : 
     565             : static void *
     566       19590 : col_dup(sql_column *c)
     567             : {
     568       19590 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     569             : }
     570             : 
     571             : static void *
     572        3103 : idx_dup(sql_idx *i)
     573             : {
     574        3103 :         if (!ATOMIC_PTR_GET(&i->data))
     575             :                 return NULL;
     576        1816 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     577             : }
     578             : 
     579             : static storage*
     580        1607 : storage_dup(storage *d)
     581             : {
     582        1607 :         ATOMIC_INC(&d->cs.refcnt);
     583        1607 :         return d;
     584             : }
     585             : 
     586             : static void *
     587        1607 : del_dup(sql_table *t)
     588             : {
     589        1607 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     590             : }
     591             : 
     592             : static size_t
     593          17 : count_inserts( segment *s, sql_trans *tr)
     594             : {
     595          17 :         size_t cnt = 0;
     596             : 
     597          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     598          55 :                 if (!s->deleted && s->ts == tr->tid)
     599           4 :                         cnt += s->end - s->start;
     600             :         }
     601          17 :         return cnt;
     602             : }
     603             : 
     604             : static size_t
     605      879870 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     606             : {
     607      879870 :         size_t cnt = 0;
     608             : 
     609     1031095 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     610             :                 ;
     611             : 
     612     5614068 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     613     4734224 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     614     1628529 :                         cnt += s->end - s->start;
     615             :         }
     616      879844 :         return cnt;
     617             : }
     618             : 
     619             : static size_t
     620          17 : count_deletes( segment *s, sql_trans *tr)
     621             : {
     622          17 :         size_t cnt = 0;
     623             : 
     624          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     625          55 :                 if (SEG_IS_DELETED(s, tr))
     626          17 :                         cnt += s->end - s->start;
     627             :         }
     628          17 :         return cnt;
     629             : }
     630             : 
     631             : #define CNT_ACTIVE 10
     632             : 
     633             : static size_t
     634    19573890 : count_col(sql_trans *tr, sql_column *c, int access)
     635             : {
     636    19573890 :         storage *d;
     637    19573890 :         sql_delta *ds;
     638             : 
     639    19573890 :         if (!isTable(c->t))
     640             :                 return 0;
     641    19573890 :         d = tab_timestamp_storage(tr, c->t);
     642    19582515 :         ds = col_timestamp_delta(tr, c);
     643    19605764 :         if (!d ||!ds)
     644             :                 return 0;
     645    19605764 :         if (access == 2)
     646      491250 :                 return ds?ds->cs.ucnt:0;
     647    19114514 :         if (access == 1)
     648          17 :                 return count_inserts(d->segs->h, tr);
     649    19114497 :         if (access == QUICK)
     650           0 :                 return d->segs->t?d->segs->t->end:0;
     651    19114497 :         if (access == CNT_ACTIVE) {
     652      879719 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     653      880003 :                 lock_table(tr->store, c->t->base.id);
     654      879900 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     655      879866 :                 unlock_table(tr->store, c->t->base.id);
     656      879866 :                 return cnt;
     657             :         }
     658    18234778 :         return segs_end(d->segs, tr, c->t);
     659             : }
     660             : 
     661             : static size_t
     662       23334 : count_idx(sql_trans *tr, sql_idx *i, int access)
     663             : {
     664       23334 :         storage *d;
     665       23334 :         sql_delta *ds;
     666             : 
     667       23334 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     668        6719 :                 return 0;
     669       16614 :         d = tab_timestamp_storage(tr, i->t);
     670       16641 :         ds = idx_timestamp_delta(tr, i);
     671       16653 :         if (!d || !ds)
     672             :                 return 0;
     673       16653 :         if (access == 2)
     674        2871 :                 return ds?ds->cs.ucnt:0;
     675       13782 :         if (access == 1)
     676           0 :                 return count_inserts(d->segs->h, tr);
     677       13782 :         if (access == QUICK)
     678        3529 :                 return d->segs->t?d->segs->t->end:0;
     679       10253 :         return segs_end(d->segs, tr, i->t);
     680             : }
     681             : 
     682             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     683             : static BAT *
     684    15056756 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     685             : {
     686    15056756 :         BAT *b;
     687             : 
     688    15056756 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     689             :         /* returns the updates for cs */
     690    15056756 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     691       14444 :                 if (access == RD_UPD_ID) {
     692        8683 :                         if (!(b = temp_descriptor(cs->uibid)))
     693             :                                 return NULL;
     694        8682 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     695        2317 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     696        6365 :                                         oid nil = oid_nil;
     697             :                                         /* less then cnt */
     698        6365 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
     699        6366 :                                         if (!s) {
     700           0 :                                                 bat_destroy(b);
     701           0 :                                                 return NULL;
     702             :                                         }
     703             : 
     704        6366 :                                         BAT *nb = BATproject(s, b);
     705        6366 :                                         bat_destroy(s);
     706        6366 :                                         bat_destroy(b);
     707        6366 :                                         b = nb;
     708             :                         }
     709             :                 } else {
     710        5761 :                         b = temp_descriptor(cs->uvbid);
     711             :                 }
     712             :         } else {
     713    25071816 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     714             :         }
     715             :         return b;
     716             : }
     717             : 
     718             : static BAT *
     719           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     720             : {
     721           0 :         int err = 0;
     722           0 :         BAT *uv = *UV;
     723           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     724           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     725           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     726             : 
     727           0 :         if (!ni || (uv && !nv)) {
     728           0 :                 bat_destroy(ni);
     729           0 :                 bat_destroy(nv);
     730           0 :                 bat_destroy(ui);
     731           0 :                 bat_destroy(uv);
     732           0 :                 bat_destroy(oi);
     733           0 :                 bat_destroy(ov);
     734           0 :                 return NULL;
     735             :         }
     736           0 :         BATiter uvi;
     737           0 :         BATiter ovi;
     738             : 
     739           0 :         if (uv) {
     740           0 :                 uvi = bat_iterator(uv);
     741           0 :                 ovi = bat_iterator(ov);
     742             :         }
     743             : 
     744             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     745           0 :         BUN uip = 0, uie = BATcount(ui);
     746           0 :         BUN oip = 0, oie = BATcount(oi);
     747             : 
     748           0 :         oid uiseqb = ui->tseqbase;
     749           0 :         oid oiseqb = oi->tseqbase;
     750           0 :         oid *uipt = NULL, *oipt = NULL;
     751           0 :         BATiter uii = bat_iterator(ui);
     752           0 :         BATiter oii = bat_iterator(oi);
     753           0 :         if (!BATtdensebi(&uii))
     754           0 :                 uipt = uii.base;
     755           0 :         if (!BATtdensebi(&oii))
     756           0 :                 oipt = oii.base;
     757           0 :         while (uip < uie && oip < oie && !err) {
     758           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     759           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     760             : 
     761           0 :                 if (uiid <= oiid) {
     762           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     763           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     764             :                                 err = 1;
     765           0 :                         uip++;
     766           0 :                         if (uiid == oiid)
     767           0 :                                 oip++;
     768             :                 } else { /* uiid > oiid */
     769           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     770           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     771             :                                 err = 1;
     772           0 :                         oip++;
     773             :                 }
     774             :         }
     775           0 :         while (uip < uie && !err) {
     776           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     777           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     778           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     779             :                         err = 1;
     780           0 :                 uip++;
     781             :         }
     782           0 :         while (oip < oie && !err) {
     783           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     784           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     785           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     786             :                         err = 1;
     787           0 :                 oip++;
     788             :         }
     789           0 :         if (uv) {
     790           0 :                 bat_iterator_end(&uvi);
     791           0 :                 bat_iterator_end(&ovi);
     792             :         }
     793           0 :         bat_iterator_end(&uii);
     794           0 :         bat_iterator_end(&oii);
     795           0 :         bat_destroy(ui);
     796           0 :         bat_destroy(uv);
     797           0 :         bat_destroy(oi);
     798           0 :         bat_destroy(ov);
     799           0 :         if (!err) {
     800           0 :                 if (nv)
     801           0 :                         *UV = nv;
     802           0 :                 return ni;
     803             :         }
     804           0 :         *UV = NULL;
     805           0 :         bat_destroy(ni);
     806           0 :         bat_destroy(nv);
     807           0 :         return NULL;
     808             : }
     809             : 
     810             : static sql_delta *
     811    10041181 : older_delta( sql_delta *d, sql_trans *tr)
     812             : {
     813    10041181 :         sql_delta *o = d->next;
     814             : 
     815    10050290 :         while (o && !o->cs.merged) {
     816        9104 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     817             :                         break;
     818             :                 else
     819        9109 :                         o = o->next;
     820             :         }
     821    10041186 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     822           0 :                 return o;
     823             :         return NULL;
     824             : }
     825             : 
     826             : static BAT *
     827    10038034 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     828             : {
     829    10038034 :         assert(tr->active);
     830    10038034 :         sql_delta *o = NULL;
     831    10038034 :         BAT *ui = NULL, *uv = NULL;
     832             : 
     833    10038034 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     834             :                 return NULL;
     835    10040920 :         if (access == RD_UPD_VAL) {
     836     5022068 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     837           0 :                         bat_destroy(ui);
     838           0 :                         return NULL;
     839             :                 }
     840             :         }
     841    10041121 :         while ((o = older_delta(d, tr)) != NULL) {
     842           0 :                 BAT *oui = NULL, *ouv = NULL;
     843           0 :                 if (!oui)
     844           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     845           0 :                 if (access == RD_UPD_VAL)
     846           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     847           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     848           0 :                         bat_destroy(ui);
     849           0 :                         bat_destroy(uv);
     850           0 :                         bat_destroy(oui);
     851           0 :                         bat_destroy(ouv);
     852           0 :                         return NULL;
     853             :                 }
     854           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     855             :                         return NULL;
     856             :                 d = o;
     857             :         }
     858    10041220 :         if (uv) {
     859     5022276 :                 bat_destroy(ui);
     860     5022276 :                 return uv;
     861             :         }
     862             :         return ui;
     863             : }
     864             : 
     865             : static BAT *
     866        2842 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     867             : {
     868        2842 :         lock_column(tr->store, c->base.id);
     869        2842 :         sql_delta *d = col_timestamp_delta(tr, c);
     870        2842 :         int type = c->type.type->localtype;
     871             : 
     872        2842 :         if (!d) {
     873           0 :                 unlock_column(tr->store, c->base.id);
     874           0 :                 return NULL;
     875             :         }
     876        2842 :         if (d->cs.st == ST_DICT) {
     877           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     878             : 
     879           0 :                 type = b->ttype;
     880             :         }
     881        2842 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     882        2842 :         unlock_column(tr->store, c->base.id);
     883        2842 :         return bn;
     884             : }
     885             : 
     886             : static BAT *
     887           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     888             : {
     889           0 :         lock_column(tr->store, i->base.id);
     890           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     891           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     892             : 
     893           0 :         if (!d) {
     894           0 :                 unlock_column(tr->store, i->base.id);
     895           0 :                 return NULL;
     896             :         }
     897           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     898           0 :         unlock_column(tr->store, i->base.id);
     899           0 :         return bn;
     900             : }
     901             : 
     902             : static BAT *
     903    10200581 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     904             : {
     905    10200581 :         BAT *b;
     906             : 
     907    10200581 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     908    10200581 :         assert(cs != NULL);
     909    10200581 :         if (access == QUICK)
     910      173355 :                 return quick_descriptor(cs->bid);
     911    10027226 :         if (access == RD_EXT)
     912         857 :                 return temp_descriptor(cs->ebid);
     913    10026369 :         assert(cs->bid);
     914    10026369 :         b = temp_descriptor(cs->bid);
     915    10027451 :         if (b == NULL)
     916             :                 return NULL;
     917    10027451 :         assert(b->batRestricted == BAT_READ);
     918             :         /* return slice */
     919    10027451 :         BAT *s = BATslice(b, 0, cnt);
     920    10015468 :         bat_destroy(b);
     921    10015468 :         return s;
     922             : }
     923             : 
     924             : static int
     925     5018215 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     926             : {
     927     5018215 :         lock_column(tr->store, c->base.id);
     928     5017919 :         size_t cnt = count_col(tr, c, 0);
     929     5019338 :         sql_delta *d = col_timestamp_delta(tr, c);
     930     5018797 :         int type = c->type.type->localtype;
     931             : 
     932     5018797 :         if (!d) {
     933           0 :                 unlock_column(tr->store, c->base.id);
     934           0 :                 return LOG_ERR;
     935             :         }
     936     5018797 :         if (d->cs.st == ST_DICT) {
     937           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     938             : 
     939           2 :                 type = b->ttype;
     940             :         }
     941             : 
     942     5018797 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     943     5019317 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     944             : 
     945     5019283 :         unlock_column(tr->store, c->base.id);
     946             : 
     947     5019021 :         if (*ui == NULL || *uv == NULL) {
     948           0 :                 bat_destroy(*ui);
     949           0 :                 bat_destroy(*uv);
     950           0 :                 return LOG_ERR;
     951             :         }
     952             :         return LOG_OK;
     953             : }
     954             : 
     955             : static int
     956          23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     957             : {
     958          23 :         lock_column(tr->store, i->base.id);
     959          23 :         size_t cnt = count_idx(tr, i, 0);
     960          23 :         sql_delta *d = idx_timestamp_delta(tr, i);
     961          23 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     962             : 
     963          23 :         if (!d) {
     964           0 :                 unlock_column(tr->store, i->base.id);
     965           0 :                 return LOG_ERR;
     966             :         }
     967             : 
     968          23 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     969          23 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     970             : 
     971          23 :         unlock_column(tr->store, i->base.id);
     972             : 
     973          23 :         if (*ui == NULL || *uv == NULL) {
     974           0 :                 bat_destroy(*ui);
     975           0 :                 bat_destroy(*uv);
     976           0 :                 return LOG_ERR;
     977             :         }
     978             :         return LOG_OK;
     979             : }
     980             : 
     981             : static void *                                   /* BAT * */
     982    10188008 : bind_col(sql_trans *tr, sql_column *c, int access)
     983             : {
     984    10188008 :         assert(access == QUICK || tr->active);
     985    10188008 :         if (!isTable(c->t))
     986             :                 return NULL;
     987    10188008 :         sql_delta *d = col_timestamp_delta(tr, c);
     988    10190053 :         if (!d)
     989             :                 return NULL;
     990    10190053 :         size_t cnt = count_col(tr, c, 0);
     991    10193827 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     992        2842 :                 return bind_ucol(tr, c, access, cnt);
     993    10190985 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
     994    10185428 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
     995             :         return b;
     996             : }
     997             : 
     998             : static void *                                   /* BAT * */
     999       10699 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1000             : {
    1001       10699 :         assert(access == QUICK || tr->active);
    1002       10699 :         if (!isTable(i->t))
    1003             :                 return NULL;
    1004       10699 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1005       10701 :         if (!d)
    1006             :                 return NULL;
    1007       10701 :         size_t cnt = count_idx(tr, i, 0);
    1008       10706 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1009           0 :                 return bind_uidx(tr, i, access, cnt);
    1010       10706 :         return cs_bind_bat( &d->cs, access, cnt);
    1011             : }
    1012             : 
    1013             : static int
    1014        4152 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1015             : {
    1016        4152 :         if (!cs->uibid) {
    1017           0 :                 cs->uibid = e_bat(TYPE_oid);
    1018           0 :                 if (cs->uibid == BID_NIL)
    1019             :                         return LOG_ERR;
    1020             :         }
    1021        4152 :         if (!cs->uvbid) {
    1022           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1023           0 :                 if (!cur)
    1024             :                         return LOG_ERR;
    1025           0 :                 int type = cur->ttype;
    1026           0 :                 cs->uvbid = e_bat(type);
    1027           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1028             :                         return LOG_ERR;
    1029             :         }
    1030        4152 :         BAT *ui = temp_descriptor(cs->uibid);
    1031        4152 :         BAT *uv = temp_descriptor(cs->uvbid);
    1032             : 
    1033        4152 :         if (ui == NULL || uv == NULL) {
    1034           0 :                 bat_destroy(ui);
    1035           0 :                 bat_destroy(uv);
    1036           0 :                 return LOG_ERR;
    1037             :         }
    1038        4152 :         assert(ui && uv);
    1039        4152 :         if (isEbat(ui)){
    1040         400 :                 temp_destroy(cs->uibid);
    1041         400 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1042         400 :                 bat_destroy(ui);
    1043         400 :                 if (cs->uibid == BID_NIL ||
    1044         400 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1045           0 :                         bat_destroy(uv);
    1046           0 :                         return LOG_ERR;
    1047             :                 }
    1048             :         }
    1049        4152 :         if (isEbat(uv)){
    1050         400 :                 temp_destroy(cs->uvbid);
    1051         400 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1052         400 :                 bat_destroy(uv);
    1053         400 :                 if (cs->uvbid == BID_NIL ||
    1054         400 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1055           0 :                         bat_destroy(ui);
    1056           0 :                         return LOG_ERR;
    1057             :                 }
    1058             :         }
    1059        4152 :         *Ui = ui;
    1060        4152 :         *Uv = uv;
    1061        4152 :         return LOG_OK;
    1062             : }
    1063             : 
    1064             : static int
    1065        6972 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1066             : {
    1067      102649 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1068      102649 :                 if (s->start <= rid && s->end > rid) {
    1069        6972 :                         if (s->ts == tr->tid && !s->deleted) {
    1070        2922 :                                 return 1;
    1071             :                         }
    1072             :                         break;
    1073             :                 }
    1074             :         }
    1075             :         return 0;
    1076             : }
    1077             : 
    1078             : static int
    1079        4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1080             : {
    1081       96455 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1082       96455 :                 if (s->start <= rid && s->end > rid) {
    1083        4050 :                         if (s->ts >= tr->ts && s->deleted) {
    1084           0 :                                 return 1;
    1085             :                         }
    1086             :                         break;
    1087             :                 }
    1088             :         }
    1089             :         return 0;
    1090             : }
    1091             : 
    1092             : static sql_delta *
    1093           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1094             : {
    1095           0 :         sql_delta *n = ZNEW(sql_delta);
    1096           0 :         if (!n)
    1097             :                 return NULL;
    1098           0 :         *n = *bat;
    1099           0 :         n->next = NULL;
    1100           0 :         n->cs.ts = tr->tid;
    1101           0 :         return n;
    1102             : }
    1103             : 
    1104             : static BAT *
    1105          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1106             : {
    1107          17 :         BAT *newoffsets = NULL;
    1108          17 :         sql_delta *bat = *batp;
    1109          17 :         column_storage *cs = &bat->cs;
    1110          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1111             : 
    1112          17 :         if (!u)
    1113             :                 return NULL;
    1114          17 :         BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
    1115          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1116           0 :                 bat_destroy(u);
    1117           0 :                 return NULL;
    1118             :         } else {
    1119          17 :                 int new = 0;
    1120             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1121          17 :                 if (BATcount(u) >= max_cnt) {
    1122           1 :                         if (max_cnt == INT_MAX) { /* decompress */
    1123           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1124           0 :                                         bat_destroy(u);
    1125           0 :                                         return NULL;
    1126             :                                 }
    1127           0 :                                 if (cs->ucnt) {
    1128           0 :                                         BAT *ui = NULL, *uv = NULL;
    1129           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1130           0 :                                         bat_destroy(b);
    1131           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1132           0 :                                                 bat_destroy(nb);
    1133           0 :                                                 bat_destroy(u);
    1134           0 :                                                 return NULL;
    1135             :                                         }
    1136           0 :                                         b = nb;
    1137           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1138           0 :                                                 bat_destroy(ui);
    1139           0 :                                                 bat_destroy(uv);
    1140           0 :                                                 bat_destroy(b);
    1141           0 :                                                 bat_destroy(u);
    1142             :                                         }
    1143           0 :                                         bat_destroy(ui);
    1144           0 :                                         bat_destroy(uv);
    1145             :                                 }
    1146           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1147           0 :                                 bat_destroy(b);
    1148           0 :                                 assert(newoffsets == NULL);
    1149           0 :                                 if (!n) {
    1150           0 :                                         bat_destroy(u);
    1151           0 :                                         return NULL;
    1152             :                                 }
    1153           0 :                                 if (cs->ts != tr->tid) {
    1154           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1155           0 :                                                 bat_destroy(n);
    1156           0 :                                                 return NULL;
    1157             :                                         }
    1158           0 :                                         cs = &(*batp)->cs;
    1159           0 :                                         new = 1;
    1160             :                                 }
    1161           0 :                                 if (cs->bid && !new)
    1162           0 :                                         temp_destroy(cs->bid);
    1163           0 :                                 n = transfer_to_systrans(n);
    1164           0 :                                 if (n == NULL)
    1165             :                                         return NULL;
    1166           0 :                                 bat_set_access(n, BAT_READ);
    1167           0 :                                 cs->bid = temp_create(n);
    1168           0 :                                 bat_destroy(n);
    1169           0 :                                 if (cs->ebid && !new)
    1170           0 :                                         temp_destroy(cs->ebid);
    1171           0 :                                 cs->ebid = 0;
    1172           0 :                                 cs->ucnt = 0;
    1173           0 :                                 if (cs->uibid && !new)
    1174           0 :                                         temp_destroy(cs->uibid);
    1175           0 :                                 if (cs->uvbid && !new)
    1176           0 :                                         temp_destroy(cs->uvbid);
    1177           0 :                                 cs->uibid = cs->uvbid = 0;
    1178           0 :                                 cs->st = ST_DEFAULT;
    1179           0 :                                 cs->cleared = true;
    1180             :                         } else {
    1181           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1182           0 :                                         bat_destroy(newoffsets);
    1183           0 :                                         bat_destroy(u);
    1184           0 :                                         return NULL;
    1185             :                                 }
    1186           2 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1187           1 :                                 bat_destroy(b);
    1188           1 :                                 if (!n) {
    1189           0 :                                         bat_destroy(newoffsets);
    1190           0 :                                         bat_destroy(u);
    1191           0 :                                         return NULL;
    1192             :                                 }
    1193           1 :                                 if (cs->ts != tr->tid) {
    1194           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1195           0 :                                                 bat_destroy(n);
    1196           0 :                                                 return NULL;
    1197             :                                         }
    1198           0 :                                         cs = &(*batp)->cs;
    1199           0 :                                         new = 1;
    1200           0 :                                         temp_dup(cs->ebid);
    1201           0 :                                         if (cs->uibid) {
    1202           0 :                                                 temp_dup(cs->uibid);
    1203           0 :                                                 temp_dup(cs->uvbid);
    1204             :                                         }
    1205             :                                 }
    1206           1 :                                 if (cs->bid && !new)
    1207           1 :                                         temp_destroy(cs->bid);
    1208           1 :                                 n = transfer_to_systrans(n);
    1209           1 :                                 if (n == NULL)
    1210             :                                         return NULL;
    1211           1 :                                 bat_set_access(n, BAT_READ);
    1212           1 :                                 cs->bid = temp_create(n);
    1213           1 :                                 bat_destroy(n);
    1214           1 :                                 cs->cleared = true;
    1215           1 :                                 i = newoffsets;
    1216             :                         }
    1217             :                 } else { /* append */
    1218          16 :                         i = newoffsets;
    1219             :                 }
    1220             :         }
    1221          17 :         bat_destroy(u);
    1222          17 :         return i;
    1223             : }
    1224             : 
    1225             : static BAT *
    1226           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1227             : {
    1228           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1229           0 :         BAT *newoffsets = NULL;
    1230           0 :         BAT *b = NULL, *n = NULL;
    1231             : 
    1232           0 :         if (!(b = temp_descriptor(cs->bid)))
    1233             :                 return NULL;
    1234             : 
    1235           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1236           0 :                 bat_destroy(b);
    1237           0 :                 return NULL;
    1238             :         } else {
    1239             :                 /* returns new offset bat if values within min/max, else decompress */
    1240           0 :                 if (!newoffsets) { /* decompress */
    1241           0 :                         if (cs->ucnt) {
    1242           0 :                                 BAT *ui = NULL, *uv = NULL;
    1243           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1244           0 :                                 bat_destroy(b);
    1245           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1246           0 :                                         bat_destroy(nb);
    1247           0 :                                         return NULL;
    1248             :                                 }
    1249           0 :                                 b = nb;
    1250           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1251           0 :                                         bat_destroy(ui);
    1252           0 :                                         bat_destroy(uv);
    1253           0 :                                         bat_destroy(b);
    1254             :                                 }
    1255           0 :                                 bat_destroy(ui);
    1256           0 :                                 bat_destroy(uv);
    1257             :                         }
    1258           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1259           0 :                         bat_destroy(b);
    1260           0 :                         if (!n)
    1261             :                                 return NULL;
    1262           0 :                         if (cs->bid)
    1263           0 :                                 temp_destroy(cs->bid);
    1264           0 :                         n = transfer_to_systrans(n);
    1265           0 :                         if (n == NULL)
    1266             :                                 return NULL;
    1267           0 :                         bat_set_access(n, BAT_READ);
    1268           0 :                         cs->bid = temp_create(n);
    1269           0 :                         cs->ucnt = 0;
    1270           0 :                         if (cs->uibid)
    1271           0 :                                 temp_destroy(cs->uibid);
    1272           0 :                         if (cs->uvbid)
    1273           0 :                                 temp_destroy(cs->uvbid);
    1274           0 :                         cs->uibid = cs->uvbid = 0;
    1275           0 :                         cs->st = ST_DEFAULT;
    1276           0 :                         cs->cleared = true;
    1277           0 :                         b = n;
    1278             :                 } else { /* append */
    1279             :                         i = newoffsets;
    1280             :                 }
    1281             :         }
    1282           0 :         bat_destroy(b);
    1283           0 :         return i;
    1284             : }
    1285             : 
    1286             : /*
    1287             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1288             :  */
    1289             : static int
    1290        3158 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1291             : {
    1292        3158 :         int res = LOG_OK;
    1293        3158 :         sql_delta *bat = *batp;
    1294        3158 :         column_storage *cs = &bat->cs;
    1295        3158 :         BAT *otids = tids, *oupdates = updates;
    1296             : 
    1297        3158 :         if (!BATcount(tids))
    1298             :                 return LOG_OK;
    1299             : 
    1300        3158 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1301           6 :                 tids = BATunmask(tids);
    1302           6 :                 if (!tids)
    1303             :                         return LOG_ERR;
    1304             :         }
    1305        3158 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1306           0 :                 updates = BATunmask(updates);
    1307           0 :                 if (!updates) {
    1308           0 :                         if (otids != tids)
    1309           0 :                                 bat_destroy(tids);
    1310           0 :                         return LOG_ERR;
    1311             :                 }
    1312        3158 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1313          42 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1314          42 :                 if (!updates) {
    1315           0 :                         if (otids != tids)
    1316           0 :                                 bat_destroy(tids);
    1317           0 :                         return LOG_ERR;
    1318             :                 }
    1319             :         }
    1320             : 
    1321        3158 :         if (cs->st == ST_DICT) {
    1322             :                 /* possibly a new array is returned */
    1323           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1324           4 :                 bat = *batp;
    1325           4 :                 cs = &bat->cs;
    1326           4 :                 if (oupdates != updates)
    1327           0 :                         bat_destroy(updates);
    1328           4 :                 updates = nupdates;
    1329           4 :                 if (!updates) {
    1330           0 :                         if (otids != tids)
    1331           0 :                                 bat_destroy(tids);
    1332           0 :                         return LOG_ERR;
    1333             :                 }
    1334             :         }
    1335             : 
    1336             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1337             :         /* currently only one update delta is possible */
    1338        3158 :         lock_table(tr->store, t->base.id);
    1339        3158 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1340        3158 :         if (!is_new && !cs->cleared) {
    1341        2835 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1342           6 :                         BAT *sorted, *order;
    1343           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1344           0 :                                 if (otids != tids)
    1345           0 :                                         bat_destroy(tids);
    1346           0 :                                 if (oupdates != updates)
    1347           0 :                                         bat_destroy(updates);
    1348           0 :                                 unlock_table(tr->store, t->base.id);
    1349           0 :                                 return LOG_ERR;
    1350             :                         }
    1351           6 :                         if (otids != tids)
    1352           0 :                                 bat_destroy(tids);
    1353           6 :                         tids = sorted;
    1354           6 :                         BAT *nupdates = BATproject(order, updates);
    1355           6 :                         bat_destroy(order);
    1356           6 :                         if (oupdates != updates)
    1357           0 :                                 bat_destroy(updates);
    1358           6 :                         updates = nupdates;
    1359           6 :                         if (!updates) {
    1360           0 :                                 bat_destroy(tids);
    1361           0 :                                 unlock_table(tr->store, t->base.id);
    1362           0 :                                 return LOG_ERR;
    1363             :                         }
    1364             :                 }
    1365        2835 :                 assert(tids->tsorted);
    1366        2835 :                 BAT *ui = NULL, *uv = NULL;
    1367             : 
    1368             :                 /* handle updates on just inserted bits */
    1369             :                 /* handle updates on updates (within one transaction) */
    1370        2835 :                 BATiter upi = bat_iterator(updates);
    1371        2835 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1372        2835 :                 BAT *b, *ins = NULL;
    1373        2835 :                 int *msk = NULL;
    1374             : 
    1375        2835 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1376             :                         res = LOG_ERR;
    1377             : 
    1378        2835 :                 if (res == LOG_OK && BATtdense(tids)) {
    1379        2631 :                         oid start = tids->tseqbase, offset = start;
    1380        2631 :                         oid end = start + ucnt;
    1381             : 
    1382       13272 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1383       11322 :                                 if (seg->start <= start && seg->end > start) {
    1384             :                                         /* check for delete conflicts */
    1385        2631 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1386           0 :                                                 res = LOG_CONFLICT;
    1387           0 :                                                 continue;
    1388             :                                         }
    1389             : 
    1390             :                                         /* check for inplace updates */
    1391        2631 :                                         BUN lend = end < seg->end?end:seg->end;
    1392        2631 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1393         198 :                                                 if (!ins) {
    1394         198 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1395         198 :                                                         if (!ins)
    1396             :                                                                 res = LOG_ERR;
    1397             :                                                         else {
    1398         198 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1399         198 :                                                                 msk = (int*)Tloc(ins, 0);
    1400         198 :                                                                 BUN end = (ucnt+31)/32;
    1401         198 :                                                                 memset(msk, 0, end * sizeof(int));
    1402             :                                                         }
    1403             :                                                 }
    1404         681 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1405         483 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1406         483 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1407           0 :                                                                 res = LOG_ERR;
    1408             : 
    1409         483 :                                                         oid word = i/32;
    1410         483 :                                                         int pos = i%32;
    1411         483 :                                                         msk[word] |= 1U<<pos;
    1412         483 :                                                         cnt++;
    1413             :                                                 }
    1414             :                                         }
    1415             :                                 }
    1416       11322 :                                 if (end < seg->end)
    1417             :                                         break;
    1418             :                         }
    1419         207 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1420           3 :                         struct canditer ci;
    1421           3 :                         segment *seg = s->segs->h;
    1422           3 :                         canditer_init(&ci, NULL, tids);
    1423           3 :                         BUN i = 0;
    1424        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1425        1033 :                                 oid rid = canditer_next(&ci);
    1426        1033 :                                 if (seg->end <= rid)
    1427          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1428        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1429             :                                         /* check for delete conflicts */
    1430        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1431           0 :                                                 res = LOG_CONFLICT;
    1432           0 :                                                 continue;
    1433             :                                         }
    1434             : 
    1435             :                                         /* check for inplace updates */
    1436        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1437           0 :                                                 if (!ins) {
    1438           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1439           0 :                                                         if (!ins) {
    1440             :                                                                 res = LOG_ERR;
    1441             :                                                                 break;
    1442             :                                                         } else {
    1443           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1444           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1445           0 :                                                                 BUN end = (ucnt+31)/32;
    1446           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1447             :                                                         }
    1448             :                                                 }
    1449           0 :                                                 ptr upd = BUNtail(upi, i);
    1450           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1451           0 :                                                         res = LOG_ERR;
    1452             : 
    1453           0 :                                                 oid word = i/32;
    1454           0 :                                                 int pos = i%32;
    1455           0 :                                                 msk[word] |= 1U<<pos;
    1456           0 :                                                 cnt++;
    1457             :                                         }
    1458        1020 :                                         i++;
    1459             :                                 }
    1460             :                         }
    1461         201 :                 } else if (res == LOG_OK) {
    1462         201 :                         BUN i = 0;
    1463         201 :                         oid *rid = Tloc(tids,0);
    1464         201 :                         segment *seg = s->segs->h;
    1465       21958 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1466       21757 :                                 if (seg->end <= rid[i])
    1467        8325 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1468       13432 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1469             :                                         /* check for delete conflicts */
    1470       13432 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1471           0 :                                                 res = LOG_CONFLICT;
    1472           0 :                                                 continue;
    1473             :                                         }
    1474             : 
    1475             :                                         /* check for inplace updates */
    1476       13432 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1477         995 :                                                 if (!ins) {
    1478          80 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1479          80 :                                                         if (!ins) {
    1480             :                                                                 res = LOG_ERR;
    1481             :                                                                 break;
    1482             :                                                         } else {
    1483          80 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1484          80 :                                                                 msk = (int*)Tloc(ins, 0);
    1485          80 :                                                                 BUN end = (ucnt+31)/32;
    1486          80 :                                                                 memset(msk, 0, end * sizeof(int));
    1487             :                                                         }
    1488             :                                                 }
    1489         995 :                                                 const void *upd = BUNtail(upi, i);
    1490         995 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1491           0 :                                                         res = LOG_ERR;
    1492             : 
    1493         995 :                                                 oid word = i/32;
    1494         995 :                                                 int pos = i%32;
    1495         995 :                                                 msk[word] |= 1U<<pos;
    1496         995 :                                                 cnt++;
    1497             :                                         }
    1498       13432 :                                         i++;
    1499             :                                 }
    1500             :                         }
    1501             :                 }
    1502             : 
    1503        2835 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1504        2593 :                         if (cs->ucnt == 0) {
    1505        2491 :                                 if (cnt) {
    1506          12 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1507          12 :                                         if (nins) {
    1508          12 :                                                 ui = BATproject(nins, tids);
    1509          12 :                                                 uv = BATproject(nins, updates);
    1510          12 :                                                 bat_destroy(nins);
    1511             :                                         }
    1512             :                                 } else {
    1513        2479 :                                         ui = temp_descriptor(tids->batCacheid);
    1514        2479 :                                         uv = temp_descriptor(updates->batCacheid);
    1515             :                                 }
    1516        2491 :                                 if (!ui || !uv) {
    1517             :                                         res = LOG_ERR;
    1518             :                                 } else {
    1519        2491 :                                         temp_destroy(cs->uibid);
    1520        2491 :                                         temp_destroy(cs->uvbid);
    1521        2491 :                                         ui = transfer_to_systrans(ui);
    1522        2491 :                                         uv = transfer_to_systrans(uv);
    1523        2491 :                                         if (ui == NULL || uv == NULL) {
    1524           0 :                                                 BBPreclaim(ui);
    1525           0 :                                                 BBPreclaim(uv);
    1526             :                                                 res = LOG_ERR;
    1527             :                                         } else {
    1528        2491 :                                                 cs->uibid = temp_create(ui);
    1529        2491 :                                                 cs->uvbid = temp_create(uv);
    1530        2491 :                                                 cs->ucnt = BATcount(ui);
    1531             :                                         }
    1532             :                                 }
    1533             :                         } else {
    1534         102 :                                 BAT *nui = NULL, *nuv = NULL;
    1535             : 
    1536             :                                 /* merge taking msk of inserted into account */
    1537         102 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1538             :                                         res = LOG_ERR;
    1539             : 
    1540         102 :                                 if (res == LOG_OK) {
    1541         102 :                                         const void *upd = NULL;
    1542         102 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1543         102 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1544             : 
    1545         102 :                                         if (!nui || !nuv) {
    1546             :                                                 res = LOG_ERR;
    1547             :                                         } else {
    1548         102 :                                                 BATiter ovi = bat_iterator(uv);
    1549             : 
    1550             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1551         102 :                                                 BUN uip = 0, uie = BATcount(ui);
    1552         102 :                                                 BUN nip = 0, nie = BATcount(tids);
    1553         102 :                                                 oid uiseqb = ui->tseqbase;
    1554         102 :                                                 oid niseqb = tids->tseqbase;
    1555         102 :                                                 oid *uipt = NULL, *nipt = NULL;
    1556         102 :                                                 BATiter uii = bat_iterator(ui);
    1557         102 :                                                 BATiter tidsi = bat_iterator(tids);
    1558         102 :                                                 if (!BATtdensebi(&uii))
    1559          97 :                                                         uipt = uii.base;
    1560         102 :                                                 if (!BATtdensebi(&tidsi))
    1561          93 :                                                         nipt = tidsi.base;
    1562       16091 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1563       15989 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1564       15989 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1565             : 
    1566       15989 :                                                         if (uiv < niv) {
    1567        8068 :                                                                 upd = BUNtail(ovi, uip);
    1568       16136 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1569        8068 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1570             :                                                                         res = LOG_ERR;
    1571        8068 :                                                                 uip++;
    1572        7921 :                                                         } else if (uiv == niv) {
    1573             :                                                                 /* handle == */
    1574        1522 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1575        1522 :                                                                         upd = BUNtail(upi, nip);
    1576        3044 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1577        1522 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1578             :                                                                                 res = LOG_ERR;
    1579             :                                                                 } else {
    1580           0 :                                                                         upd = BUNtail(ovi, uip);
    1581           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1582           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1583             :                                                                                 res = LOG_ERR;
    1584             :                                                                 }
    1585        1522 :                                                                 uip++;
    1586        1522 :                                                                 nip++;
    1587             :                                                         } else { /* uiv > niv */
    1588        6399 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1589        6255 :                                                                         upd = BUNtail(upi, nip);
    1590       12510 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1591        6255 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1592             :                                                                                 res = LOG_ERR;
    1593             :                                                                 }
    1594        6399 :                                                                 nip++;
    1595             :                                                         }
    1596             :                                                 }
    1597         923 :                                                 while (uip < uie && res == LOG_OK) {
    1598         821 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1599         821 :                                                         upd = BUNtail(ovi, uip);
    1600        1642 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1601         821 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1602             :                                                                 res = LOG_ERR;
    1603         821 :                                                         uip++;
    1604             :                                                 }
    1605         703 :                                                 while (nip < nie && res == LOG_OK) {
    1606         601 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1607         601 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1608         329 :                                                                 upd = BUNtail(upi, nip);
    1609         658 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1610         329 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1611             :                                                                         res = LOG_ERR;
    1612             :                                                         }
    1613         601 :                                                         nip++;
    1614             :                                                 }
    1615         102 :                                                 bat_iterator_end(&uii);
    1616         102 :                                                 bat_iterator_end(&tidsi);
    1617         102 :                                                 bat_iterator_end(&ovi);
    1618         102 :                                                 if (res == LOG_OK) {
    1619         102 :                                                         temp_destroy(cs->uibid);
    1620         102 :                                                         temp_destroy(cs->uvbid);
    1621         102 :                                                         nui = transfer_to_systrans(nui);
    1622         102 :                                                         nuv = transfer_to_systrans(nuv);
    1623         102 :                                                         if (nui == NULL || nuv == NULL) {
    1624             :                                                                 res = LOG_ERR;
    1625             :                                                         } else {
    1626         102 :                                                                 cs->uibid = temp_create(nui);
    1627         102 :                                                                 cs->uvbid = temp_create(nuv);
    1628         102 :                                                                 cs->ucnt = BATcount(nui);
    1629             :                                                         }
    1630             :                                                 }
    1631             :                                         }
    1632         102 :                                         bat_destroy(nui);
    1633         102 :                                         bat_destroy(nuv);
    1634             :                                 }
    1635             :                         }
    1636             :                 }
    1637        2835 :                 bat_iterator_end(&upi);
    1638        2835 :                 bat_destroy(b);
    1639        2835 :                 unlock_table(tr->store, t->base.id);
    1640        2835 :                 bat_destroy(ins);
    1641        2835 :                 bat_destroy(ui);
    1642        2835 :                 bat_destroy(uv);
    1643        2835 :                 if (otids != tids)
    1644          10 :                         bat_destroy(tids);
    1645        2835 :                 if (oupdates != updates)
    1646          11 :                         bat_destroy(updates);
    1647        2835 :                 return res;
    1648             :         } else if (is_new || cs->cleared) {
    1649         323 :                 BAT *b = temp_descriptor(cs->bid);
    1650             : 
    1651         323 :                 if (b == NULL) {
    1652             :                         res = LOG_ERR;
    1653             :                 } else {
    1654         323 :                         if (BATcount(b)==0) {
    1655           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1656           0 :                                         res = LOG_ERR;
    1657         322 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1658           0 :                                 res = LOG_ERR;
    1659         323 :                         BBPcold(b->batCacheid);
    1660         323 :                         bat_destroy(b);
    1661             :                 }
    1662             :         }
    1663         323 :         unlock_table(tr->store, t->base.id);
    1664         323 :         if (otids != tids)
    1665           2 :                 bat_destroy(tids);
    1666         323 :         if (oupdates != updates)
    1667          41 :                 bat_destroy(updates);
    1668             :         return res;
    1669             : }
    1670             : 
    1671             : static int
    1672        3158 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1673             : {
    1674        3158 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1675             : }
    1676             : 
    1677             : static void *
    1678           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1679             : {
    1680           4 :         void *newoffsets = NULL;
    1681           4 :         sql_delta *bat = *batp;
    1682           4 :         column_storage *cs = &bat->cs;
    1683           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1684             : 
    1685           4 :         if (!u)
    1686             :                 return NULL;
    1687           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1688           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1689           0 :                 bat_destroy(u);
    1690           0 :                 return NULL;
    1691             :         } else {
    1692           4 :                 int new = 0;
    1693             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1694           4 :                 if (BATcount(u) >= max_cnt) {
    1695           0 :                         if (max_cnt == INT_MAX) { /* decompress */
    1696             :                                 if (!(b = temp_descriptor(cs->bid))) {
    1697             :                                         bat_destroy(u);
    1698             :                                         return NULL;
    1699             :                                 }
    1700             :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1701             :                                 /* TODO decompress updates if any */
    1702             :                                 bat_destroy(b);
    1703             :                                 assert(newoffsets == NULL);
    1704             :                                 if (!n) {
    1705             :                                         bat_destroy(u);
    1706             :                                         return NULL;
    1707             :                                 }
    1708             :                                 if (cs->ts != tr->tid) {
    1709             :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1710             :                                                 bat_destroy(n);
    1711             :                                                 bat_destroy(u);
    1712             :                                                 return NULL;
    1713             :                                         }
    1714             :                                         cs = &(*batp)->cs;
    1715             :                                         new = 1;
    1716             :                                         cs->uibid = cs->uvbid = 0;
    1717             :                                 }
    1718             :                                 if (cs->bid && !new)
    1719             :                                         temp_destroy(cs->bid);
    1720             :                                 n = transfer_to_systrans(n);
    1721             :                                 if (n == NULL) {
    1722             :                                         bat_destroy(u);
    1723             :                                         return NULL;
    1724             :                                 }
    1725             :                                 bat_set_access(n, BAT_READ);
    1726             :                                 cs->bid = temp_create(n);
    1727             :                                 bat_destroy(n);
    1728             :                                 if (cs->ebid && !new)
    1729             :                                         temp_destroy(cs->ebid);
    1730             :                                 cs->ebid = 0;
    1731             :                                 cs->st = ST_DEFAULT;
    1732             :                                 /* at append_col the column's storage type is cleared */
    1733             :                                 cs->cleared = true;
    1734             :                         } else {
    1735           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1736           0 :                                         GDKfree(newoffsets);
    1737           0 :                                         bat_destroy(u);
    1738           0 :                                         return NULL;
    1739             :                                 }
    1740           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1741           0 :                                 bat_destroy(b);
    1742           0 :                                 if (!n) {
    1743           0 :                                         GDKfree(newoffsets);
    1744           0 :                                         bat_destroy(u);
    1745           0 :                                         return NULL;
    1746             :                                 }
    1747           0 :                                 if (cs->ts != tr->tid) {
    1748           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1749           0 :                                                 bat_destroy(u);
    1750           0 :                                                 bat_destroy(n);
    1751           0 :                                                 return NULL;
    1752             :                                         }
    1753           0 :                                         cs = &(*batp)->cs;
    1754           0 :                                         new = 1;
    1755           0 :                                         temp_dup(cs->ebid);
    1756           0 :                                         if (cs->uibid) {
    1757           0 :                                                 temp_dup(cs->uibid);
    1758           0 :                                                 temp_dup(cs->uvbid);
    1759             :                                         }
    1760             :                                 }
    1761           0 :                                 if (cs->bid)
    1762           0 :                                         temp_destroy(cs->bid);
    1763           0 :                                 n = transfer_to_systrans(n);
    1764           0 :                                 if (n == NULL) {
    1765           0 :                                         bat_destroy(u);
    1766           0 :                                         return NULL;
    1767             :                                 }
    1768           0 :                                 bat_set_access(n, BAT_READ);
    1769           0 :                                 cs->bid = temp_create(n);
    1770           0 :                                 bat_destroy(n);
    1771           0 :                                 cs->cleared = true;
    1772           0 :                                 i = newoffsets;
    1773             :                         }
    1774             :                 } else { /* append */
    1775           4 :                         i = newoffsets;
    1776             :                 }
    1777             :         }
    1778           4 :         bat_destroy(u);
    1779           4 :         return i;
    1780             : }
    1781             : 
    1782             : static void *
    1783           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1784             : {
    1785           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1786           1 :         void *newoffsets = NULL;
    1787           1 :         BAT *b = NULL, *n = NULL;
    1788             : 
    1789           1 :         if (!(b = temp_descriptor(cs->bid)))
    1790             :                 return NULL;
    1791             : 
    1792           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1793           0 :                 bat_destroy(b);
    1794           0 :                 return NULL;
    1795             :         } else {
    1796             :                 /* returns new offset bat if values within min/max, else decompress */
    1797           1 :                 if (!newoffsets) {
    1798           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1799           1 :                         bat_destroy(b);
    1800           1 :                         if (!n)
    1801             :                                 return NULL;
    1802             :                         /* TODO decompress updates if any */
    1803           1 :                         if (cs->bid)
    1804           1 :                                 temp_destroy(cs->bid);
    1805           1 :                         n = transfer_to_systrans(n);
    1806           1 :                         if (n == NULL)
    1807             :                                 return NULL;
    1808           1 :                         bat_set_access(n, BAT_READ);
    1809           1 :                         cs->bid = temp_create(n);
    1810           1 :                         cs->st = ST_DEFAULT;
    1811             :                         /* at append_col the column's storage type is cleared */
    1812           1 :                         cs->cleared = true;
    1813           1 :                         b = n;
    1814             :                 } else { /* append */
    1815             :                         i = newoffsets;
    1816             :                 }
    1817             :         }
    1818           1 :         bat_destroy(b);
    1819           1 :         return i;
    1820             : }
    1821             : 
    1822             : static int
    1823        6972 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1824             : {
    1825        6972 :         void *oupd = upd;
    1826        6972 :         sql_delta *bat = *batp;
    1827        6972 :         column_storage *cs = &bat->cs;
    1828        6972 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1829        6972 :         assert(!is_oid_nil(rid));
    1830        6972 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1831             : 
    1832        6972 :         if (cs->st == ST_DICT) {
    1833             :                 /* possibly a new array is returned */
    1834           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1835           0 :                 bat = *batp;
    1836           0 :                 cs = &bat->cs;
    1837           0 :                 if (!upd)
    1838             :                         return LOG_ERR;
    1839             :         }
    1840             : 
    1841             :         /* check if rid is insert ? */
    1842        6972 :         if (!inplace) {
    1843             :                 /* check conflict */
    1844        4050 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1845           0 :                         if (oupd != upd)
    1846           0 :                                 GDKfree(upd);
    1847           0 :                         return LOG_CONFLICT;
    1848             :                 }
    1849        4050 :                 BAT *ui, *uv;
    1850             : 
    1851             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1852             :                 /* currently only one update delta is possible */
    1853        4050 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1854           0 :                         if (oupd != upd)
    1855           0 :                                 GDKfree(upd);
    1856           0 :                         return LOG_ERR;
    1857             :                 }
    1858             : 
    1859        4050 :                 assert(uv->ttype);
    1860        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1861        8100 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1862        4050 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1863           0 :                         if (oupd != upd)
    1864           0 :                                 GDKfree(upd);
    1865           0 :                         bat_destroy(ui);
    1866           0 :                         bat_destroy(uv);
    1867           0 :                         return LOG_ERR;
    1868             :                 }
    1869        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1870        4050 :                 bat_destroy(ui);
    1871        4050 :                 bat_destroy(uv);
    1872        4050 :                 cs->ucnt++;
    1873             :         } else {
    1874        2922 :                 BAT *b = NULL;
    1875             : 
    1876        2922 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1877           0 :                         if (oupd != upd)
    1878           0 :                                 GDKfree(upd);
    1879           0 :                         return LOG_ERR;
    1880             :                 }
    1881        2922 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1882           0 :                         if (oupd != upd)
    1883           0 :                                 GDKfree(upd);
    1884           0 :                         bat_destroy(b);
    1885           0 :                         return LOG_ERR;
    1886             :                 }
    1887        2922 :                 bat_destroy(b);
    1888             :         }
    1889        6972 :         if (oupd != upd)
    1890           0 :                 GDKfree(upd);
    1891             :         return LOG_OK;
    1892             : }
    1893             : 
    1894             : static int
    1895        6972 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1896             : {
    1897        6972 :         int res = LOG_OK;
    1898        6972 :         lock_table(tr->store, t->base.id);
    1899        6972 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1900        6972 :         unlock_table(tr->store, t->base.id);
    1901        6972 :         return res;
    1902             : }
    1903             : 
    1904             : static int
    1905      158860 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1906             : {
    1907      158860 :         (void)tr;
    1908      158860 :         if (!ocs)
    1909             :                 return LOG_OK;
    1910      158860 :         cs->bid = ocs->bid;
    1911      158860 :         cs->ebid = ocs->ebid;
    1912      158860 :         cs->uibid = ocs->uibid;
    1913      158860 :         cs->uvbid = ocs->uvbid;
    1914      158860 :         cs->ucnt = ocs->ucnt;
    1915             : 
    1916      158860 :         if (temp) {
    1917       25963 :                 cs->bid = temp_copy(cs->bid, true, false);
    1918       25945 :                 if (cs->bid == BID_NIL)
    1919             :                         return LOG_ERR;
    1920             :         } else {
    1921      132897 :                 temp_dup(cs->bid);
    1922             :         }
    1923      158839 :         if (cs->ebid)
    1924           6 :                 temp_dup(cs->ebid);
    1925      158839 :         cs->ucnt = 0;
    1926      158839 :         cs->uibid = e_bat(TYPE_oid);
    1927      158876 :         cs->uvbid = e_bat(type);
    1928      158875 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1929             :                 return LOG_ERR;
    1930      158875 :         cs->st = ocs->st;
    1931      158875 :         return LOG_OK;
    1932             : }
    1933             : 
    1934             : static void
    1935      322614 : destroy_delta(sql_delta *b, bool recursive)
    1936             : {
    1937      322614 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1938             :                 return;
    1939      301208 :         if (recursive && b->next)
    1940      127511 :                 destroy_delta(b->next, true);
    1941      301208 :         if (b->cs.uibid)
    1942      100579 :                 temp_destroy(b->cs.uibid);
    1943      301208 :         if (b->cs.uvbid)
    1944      100579 :                 temp_destroy(b->cs.uvbid);
    1945      301208 :         if (b->cs.bid)
    1946      301208 :                 temp_destroy(b->cs.bid);
    1947      301208 :         if (b->cs.ebid)
    1948          61 :                 temp_destroy(b->cs.ebid);
    1949      301208 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1950      301208 :         _DELETE(b);
    1951             : }
    1952             : 
    1953             : static sql_delta *
    1954    16287369 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1955             : {
    1956    16287369 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1957             : 
    1958    16287369 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1959    16154527 :                 return obat;
    1960      132842 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1961             :                 /* abort */
    1962          12 :                 if (update_conflict)
    1963           4 :                         *update_conflict = true;
    1964           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1965           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1966           4 :                 return NULL;
    1967             :         }
    1968      132830 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1969             :                 return NULL;
    1970      132830 :         sql_delta* bat = ZNEW(sql_delta);
    1971      132879 :         if (!bat)
    1972             :                 return NULL;
    1973      132879 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1974      132879 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1975           0 :                 destroy_delta(bat, false);
    1976           0 :                 return NULL;
    1977             :         }
    1978      132882 :         bat->cs.ts = tr->tid;
    1979             :         /* only one writer else abort */
    1980      132882 :         bat->next = obat;
    1981      132882 :         if (obat)
    1982      132882 :                 bat->nr_updates = obat->nr_updates;
    1983      132882 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    1984           0 :                 bat->next = NULL;
    1985           0 :                 destroy_delta(bat, false);
    1986           0 :                 if (update_conflict)
    1987           0 :                         *update_conflict = true;
    1988           0 :                 return NULL;
    1989             :         }
    1990             :         return bat;
    1991             : }
    1992             : 
    1993             : static int
    1994       10130 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    1995             : {
    1996       10130 :         int ok = LOG_OK;
    1997             : 
    1998       10130 :         if (is_bat) {
    1999        3158 :                 BAT *tids = incoming_tids;
    2000        3158 :                 BAT *values = incoming_values;
    2001        3158 :                 if (BATcount(tids) == 0)
    2002             :                         return LOG_OK;
    2003        3158 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2004             :         } else {
    2005        6972 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2006             :         }
    2007             :         return ok;
    2008             : }
    2009             : 
    2010             : static int
    2011       10169 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2012             : {
    2013       10169 :         int res = LOG_OK;
    2014       10169 :         bool update_conflict = false;
    2015       10169 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2016             : 
    2017       10169 :         if (isbat) {
    2018        3194 :                 BAT *t = tids;
    2019        3194 :                 if (!BATcount(t))
    2020             :                         return LOG_OK;
    2021             :         }
    2022             : 
    2023        9855 :         if (c == NULL)
    2024             :                 return LOG_ERR;
    2025             : 
    2026        9855 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2027           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2028             : 
    2029        9851 :         assert(delta && delta->cs.ts == tr->tid);
    2030        9851 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2031        9851 :         if (odelta != delta)
    2032        3482 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2033             : 
    2034        9851 :         odelta = delta;
    2035        9851 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2036             :                 return res;
    2037        9851 :         assert(delta == odelta);
    2038        9851 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2039           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2040             :         return res;
    2041             : }
    2042             : 
    2043             : static sql_delta *
    2044        2462 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2045             : {
    2046        2462 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2047             : 
    2048        2462 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2049        2434 :                 return obat;
    2050          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2051             :                 /* abort */
    2052           0 :                 if (update_conflict)
    2053           0 :                         *update_conflict = true;
    2054           0 :                 return NULL;
    2055             :         }
    2056          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2057             :                 return NULL;
    2058          28 :         sql_delta* bat = ZNEW(sql_delta);
    2059          28 :         if (!bat)
    2060             :                 return NULL;
    2061          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2062          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2063           0 :                 destroy_delta(bat, false);
    2064           0 :                 return NULL;
    2065             :         }
    2066          28 :         bat->cs.ts = tr->tid;
    2067             :         /* only one writer else abort */
    2068          28 :         bat->next = obat;
    2069          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2070           0 :                 bat->next = NULL;
    2071           0 :                 destroy_delta(bat, false);
    2072           0 :                 if (update_conflict)
    2073           0 :                         *update_conflict = true;
    2074           0 :                 return NULL;
    2075             :         }
    2076             :         return bat;
    2077             : }
    2078             : 
    2079             : static int
    2080         782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2081             : {
    2082         782 :         int res = LOG_OK;
    2083         782 :         bool update_conflict = false;
    2084         782 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2085             : 
    2086         782 :         if (isbat) {
    2087         782 :                 BAT *t = tids;
    2088         782 :                 if (!BATcount(t))
    2089             :                         return LOG_OK;
    2090             :         }
    2091             : 
    2092         279 :         if (i == NULL)
    2093             :                 return LOG_ERR;
    2094             : 
    2095         279 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2096           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2097             : 
    2098         279 :         assert(delta && delta->cs.ts == tr->tid);
    2099         279 :         if (odelta != delta)
    2100          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2101             : 
    2102         279 :         odelta = delta;
    2103         279 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2104         279 :         assert(delta == odelta);
    2105             :         return res;
    2106             : }
    2107             : 
    2108             : static int
    2109      149217 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
    2110             : {
    2111      149217 :         BAT *b, *oi = i;
    2112      149217 :         int err = 0;
    2113      149217 :         sql_delta *bat = *batp;
    2114             : 
    2115      149217 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2116      149217 :         if (!BATcount(i))
    2117             :                 return LOG_OK;
    2118      149217 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2119             :                 return LOG_ERR;
    2120             : 
    2121      149217 :         lock_column(tr->store, id);
    2122      150076 :         if (bat->cs.st == ST_DICT) {
    2123          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2124          13 :                 bat = *batp;
    2125          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2126           0 :                         bat_destroy(oi);
    2127          13 :                 oi = ni;
    2128          13 :                 if (!oi) {
    2129           0 :                         unlock_column(tr->store, id);
    2130           0 :                         return LOG_ERR;
    2131             :                 }
    2132             :         }
    2133      150076 :         if (bat->cs.st == ST_FOR) {
    2134           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2135           0 :                 bat = *batp;
    2136           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2137           0 :                         bat_destroy(oi);
    2138           0 :                 oi = ni;
    2139           0 :                 if (!oi) {
    2140           0 :                         unlock_column(tr->store, id);
    2141           0 :                         return LOG_ERR;
    2142             :                 }
    2143             :         }
    2144             : 
    2145      150076 :         b = temp_descriptor(bat->cs.bid);
    2146      149772 :         if (b == NULL) {
    2147           0 :                 unlock_column(tr->store, id);
    2148           0 :                 if (oi != i)
    2149           0 :                         bat_destroy(oi);
    2150           0 :                 return LOG_ERR;
    2151             :         }
    2152      149772 :         if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2153      149584 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2154           0 :                         err = 1;
    2155         176 :         } else if (!offsets) {
    2156         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2157           0 :                         err = 1;
    2158          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2159           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2160           0 :                         err = 1;
    2161          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2162           0 :                         err = 1;
    2163             :         }
    2164      149554 :         bat_destroy(b);
    2165      150382 :         unlock_column(tr->store, id);
    2166             : 
    2167      150393 :         if (oi != i)
    2168          13 :                 bat_destroy(oi);
    2169      150393 :         return (err)?LOG_ERR:LOG_OK;
    2170             : }
    2171             : 
    2172             : // Look at the offsets and find where the replacements end and the appends begin.
    2173             : static BUN
    2174           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2175             : {
    2176           0 :         BUN ocnt = BATcount(offsets);
    2177           0 :         if (ocnt == 0)
    2178             :                 return 0;
    2179             : 
    2180           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2181           0 :         if (highest < bcnt)
    2182             :                 // all are replacements
    2183             :                 return ocnt;
    2184             : 
    2185             :         // reason backward to find the first append.
    2186             :         // Suppose offsets has 15 entries, bcnt == 100
    2187             :         // and the highest offset in offsets is 109.
    2188           0 :         BUN new_bcnt = highest + 1; // 110
    2189           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2190           0 :         BUN nreplacements = ocnt - nappends; // 5
    2191             : 
    2192             :         // The first append should be to position bcnt
    2193           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2194             : 
    2195             :         return nreplacements;
    2196             : }
    2197             : 
    2198             : 
    2199             : static int
    2200    15997552 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2201             : {
    2202    15997552 :         void *oi = i;
    2203    15997552 :         BAT *b;
    2204    15997552 :         lock_column(tr->store, id);
    2205    16013865 :         sql_delta *bat = *batp;
    2206             : 
    2207    16013865 :         if (bat->cs.st == ST_DICT) {
    2208             :                 /* possibly a new array is returned */
    2209           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2210           4 :                 bat = *batp;
    2211           4 :                 if (!i) {
    2212           0 :                         unlock_column(tr->store, id);
    2213           0 :                         return LOG_ERR;
    2214             :                 }
    2215             :         }
    2216    16013865 :         if (bat->cs.st == ST_FOR) {
    2217             :                 /* possibly a new array is returned */
    2218           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2219           1 :                 bat = *batp;
    2220           1 :                 if (!i) {
    2221           0 :                         unlock_column(tr->store, id);
    2222           0 :                         return LOG_ERR;
    2223             :                 }
    2224             :         }
    2225             : 
    2226    16013865 :         b = temp_descriptor(bat->cs.bid);
    2227    16009651 :         if (b == NULL) {
    2228           0 :                 if (i != oi)
    2229           0 :                         GDKfree(i);
    2230           0 :                 unlock_column(tr->store, id);
    2231           0 :                 return LOG_ERR;
    2232             :         }
    2233    16009651 :         BUN bcnt = BATcount(b);
    2234             : 
    2235    16009651 :         if (offsets) {
    2236             :                 // The first few might be replacements while later items might be appends.
    2237             :                 // Handle the replacements here while leaving the appends to the code below.
    2238           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2239             : 
    2240           0 :                 oid *start = Tloc(offsets, 0);
    2241           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2242           0 :                         bat_destroy(b);
    2243           0 :                         if (i != oi)
    2244           0 :                                 GDKfree(i);
    2245           0 :                         unlock_column(tr->store, id);
    2246           0 :                         return LOG_ERR;
    2247             :                 }
    2248             : 
    2249             :                 // Replacements have been handled. The rest are appends.
    2250           0 :                 assert(offset == oid_nil);
    2251           0 :                 offset = bcnt;
    2252           0 :                 cnt -= nreplacements;
    2253             :         }
    2254             : 
    2255    16009651 :         if (bcnt > offset){
    2256      550208 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2257      550208 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2258           0 :                         bat_destroy(b);
    2259           0 :                         if (i != oi)
    2260           0 :                                 GDKfree(i);
    2261           0 :                         unlock_column(tr->store, id);
    2262           0 :                         return LOG_ERR;
    2263             :                 }
    2264      554036 :                 cnt -= ccnt;
    2265      554036 :                 offset += ccnt;
    2266             :         }
    2267    16013479 :         if (cnt) {
    2268    15459409 :                 if (BATcount(b) < offset) { /* add space */
    2269        6621 :                         BUN d = offset - BATcount(b);
    2270        6621 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2271           0 :                                 bat_destroy(b);
    2272           0 :                                 if (i != oi)
    2273           0 :                                         GDKfree(i);
    2274           0 :                                 unlock_column(tr->store, id);
    2275           0 :                                 return LOG_ERR;
    2276             :                         }
    2277             :                 }
    2278    15459407 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2279           0 :                         bat_destroy(b);
    2280           0 :                         if (i != oi)
    2281           0 :                                 GDKfree(i);
    2282           0 :                         unlock_column(tr->store, id);
    2283           0 :                         return LOG_ERR;
    2284             :                 }
    2285             :         }
    2286    16017003 :         bat_destroy(b);
    2287    16010773 :         if (i != oi)
    2288           4 :                 GDKfree(i);
    2289    16010773 :         unlock_column(tr->store, id);
    2290    16010773 :         return LOG_OK;
    2291             : }
    2292             : 
    2293             : static int
    2294       25965 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2295             : {
    2296       25965 :         if (!(bat->segs = new_segments(tr, 0)))
    2297             :                 return LOG_ERR;
    2298       25963 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2299             : }
    2300             : 
    2301             : static int
    2302    16146756 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
    2303             : {
    2304    16146756 :         int ok = LOG_OK;
    2305             : 
    2306    16146756 :         if ((*delta)->cs.merged)
    2307       37049 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2308    16146756 :         if (isbat) {
    2309      149404 :                 BAT *bat = incoming_data;
    2310             : 
    2311      149404 :                 if (BATcount(bat))
    2312      149545 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
    2313             :         } else {
    2314    15997352 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2315             :         }
    2316    16170250 :         return ok;
    2317             : }
    2318             : 
    2319             : static int
    2320    16153788 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2321             : {
    2322    16153788 :         int res = LOG_OK;
    2323    16153788 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2324             : 
    2325    16153788 :         if (isbat) {
    2326      149363 :                 BAT *t = data;
    2327      149363 :                 if (!BATcount(t))
    2328             :                         return LOG_OK;
    2329             :         }
    2330             : 
    2331    16153013 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2332             :                 return LOG_ERR;
    2333             : 
    2334    16148185 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2335             : 
    2336    16148185 :         odelta = delta;
    2337    16148185 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
    2338             :                 return res;
    2339    16166859 :         if (odelta != delta) {
    2340           0 :                 delta->next = odelta;
    2341           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2342           0 :                         delta->next = NULL;
    2343           0 :                         destroy_delta(delta, false);
    2344           0 :                         return LOG_CONFLICT;
    2345             :                 }
    2346             :         }
    2347    16166859 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2348           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2349             :         return res;
    2350             : }
    2351             : 
    2352             : static int
    2353        2189 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2354             : {
    2355        2189 :         int res = LOG_OK;
    2356        2189 :         sql_delta *delta;
    2357             : 
    2358        2189 :         if (isbat) {
    2359        1015 :                 BAT *t = data;
    2360        1015 :                 if (!BATcount(t))
    2361             :                         return LOG_OK;
    2362             :         }
    2363             : 
    2364        2177 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2365             :                 return LOG_ERR;
    2366             : 
    2367        2177 :         assert(delta->cs.st == ST_DEFAULT);
    2368             : 
    2369        2177 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
    2370        2177 :         return res;
    2371             : }
    2372             : 
    2373             : static int
    2374       79515 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2375             : {
    2376       79515 :         int err = 0;
    2377             : 
    2378             :         /* TODO check for conflicting updates */
    2379       79515 :         (void)rid;
    2380       79515 :         (void)cnt;
    2381      584710 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2382      505195 :                 sql_column *c = n->data;
    2383      505195 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2384             : 
    2385             :                 /* check for active updates */
    2386      505195 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2387             :                         return 1;
    2388             :         }
    2389             :         return 0;
    2390             : }
    2391             : 
    2392             : static int
    2393       75607 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2394             : {
    2395       75607 :         lock_table(tr->store, t->base.id);
    2396             : 
    2397       75607 :         int in_transaction = segments_in_transaction(tr, t);
    2398             : 
    2399             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2400       75607 :         segment *seg = s->segs->h, *p = NULL;
    2401    52567024 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2402    52567024 :                 if (seg->start <= rid && seg->end > rid) {
    2403       75607 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2404           4 :                                 unlock_table(tr->store, t->base.id);
    2405           4 :                                 return LOG_CONFLICT;
    2406             :                         }
    2407       75603 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2408           0 :                                 unlock_table(tr->store, t->base.id);
    2409           0 :                                 return LOG_CONFLICT;
    2410             :                         }
    2411       75603 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2412           0 :                                 unlock_table(tr->store, t->base.id);
    2413           0 :                                 return LOG_ERR;
    2414             :                         }
    2415             :                         break;
    2416             :                 }
    2417             :         }
    2418       75603 :         unlock_table(tr->store, t->base.id);
    2419       75603 :         if (!in_transaction)
    2420       13169 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2421             :         return LOG_OK;
    2422             : }
    2423             : 
    2424             : static int
    2425        3910 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2426             : {
    2427        3910 :         segment *seg = *Seg, *p = NULL;
    2428       13341 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2429       13339 :                 if (seg->start <= start && seg->end > start) {
    2430        3960 :                         size_t lcnt = cnt;
    2431        3960 :                         if (start+lcnt > seg->end)
    2432          54 :                                 lcnt = seg->end-start;
    2433        3960 :                         if (SEG_IS_DELETED(seg, tr)) {
    2434          47 :                                 start += lcnt;
    2435          47 :                                 cnt -= lcnt;
    2436          47 :                                 continue;
    2437        3913 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2438           1 :                                 return LOG_CONFLICT;
    2439        3912 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2440             :                                 return LOG_CONFLICT;
    2441        3912 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2442        3912 :                         if (!seg)
    2443             :                                 return LOG_ERR;
    2444        3912 :                         start += lcnt;
    2445        3912 :                         cnt -= lcnt;
    2446             :                 }
    2447       13291 :                 if (start+cnt <= seg->end)
    2448             :                         break;
    2449             :         }
    2450             :         return LOG_OK;
    2451             : }
    2452             : 
    2453             : static int
    2454         735 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2455             : {
    2456         735 :         segment *seg = s->segs->h;
    2457         735 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2458             : }
    2459             : 
    2460             : static int
    2461         319 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2462             : {
    2463         319 :         int in_transaction = segments_in_transaction(tr, t);
    2464         319 :         BAT *oi = i;    /* update ids */
    2465         319 :         int ok = LOG_OK;
    2466             : 
    2467         319 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2468             :                 return LOG_ERR;
    2469         319 :         if (BATcount(i)) {
    2470         568 :                 if (BATtdense(i)) {
    2471         249 :                         size_t start = i->tseqbase;
    2472         249 :                         size_t cnt = BATcount(i);
    2473             : 
    2474         249 :                         lock_table(tr->store, t->base.id);
    2475         249 :                         ok = delete_range(tr, t, s, start, cnt);
    2476         249 :                         unlock_table(tr->store, t->base.id);
    2477          70 :                 } else if (complex_cand(i)) {
    2478           0 :                         struct canditer ci;
    2479           0 :                         oid f = 0, l = 0, cur = 0;
    2480             : 
    2481           0 :                         canditer_init(&ci, NULL, i);
    2482           0 :                         cur = f = canditer_next(&ci);
    2483             : 
    2484           0 :                         lock_table(tr->store, t->base.id);
    2485           0 :                         if (!is_oid_nil(f)) {
    2486           0 :                                 segment *seg = s->segs->h;
    2487           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2488           0 :                                         if (cur+1 == l) {
    2489           0 :                                                 cur++;
    2490           0 :                                                 continue;
    2491             :                                         }
    2492           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2493           0 :                                         f = cur = l;
    2494             :                                 }
    2495           0 :                                 if (ok == LOG_OK)
    2496           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2497             :                         }
    2498           0 :                         unlock_table(tr->store, t->base.id);
    2499             :                 } else {
    2500          70 :                         if (!i->tsorted) {
    2501           0 :                                 assert(oi == i);
    2502           0 :                                 BAT *ni = NULL;
    2503           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2504           0 :                                         ok = LOG_ERR;
    2505           0 :                                 if (ni)
    2506           0 :                                         i = ni;
    2507             :                         }
    2508          70 :                         assert(i->tsorted);
    2509          70 :                         BUN icnt = BATcount(i);
    2510          70 :                         BATiter ii = bat_iterator(i);
    2511          70 :                         oid *o = ii.base, n = o[0]+1;
    2512          70 :                         size_t lcnt = 1;
    2513             : 
    2514          70 :                         lock_table(tr->store, t->base.id);
    2515          70 :                         segment *seg = s->segs->h;
    2516       23233 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2517       23163 :                                 if (o[i] == n) {
    2518       22803 :                                         lcnt++;
    2519       22803 :                                         n++;
    2520             :                                 } else {
    2521         360 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2522         360 :                                         lcnt = 0;
    2523             :                                 }
    2524       23163 :                                 if (!lcnt) {
    2525         360 :                                         n = o[i]+1;
    2526         360 :                                         lcnt = 1;
    2527             :                                 }
    2528             :                         }
    2529          70 :                         bat_iterator_end(&ii);
    2530          70 :                         if (lcnt && ok == LOG_OK)
    2531          70 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2532          70 :                         unlock_table(tr->store, t->base.id);
    2533             :                 }
    2534             :         }
    2535         319 :         if (i != oi)
    2536          25 :                 bat_destroy(i);
    2537             :         // assert
    2538         319 :         if (!in_transaction)
    2539         271 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2540             :         return ok;
    2541             : }
    2542             : 
    2543             : static void
    2544       52745 : destroy_segments(segments *s)
    2545             : {
    2546       52745 :         if (!s || sql_ref_dec(&s->r) > 0)
    2547           0 :                 return;
    2548       52745 :         segment *seg = s->h;
    2549      116653 :         while(seg) {
    2550       63908 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2551       63908 :                 _DELETE(seg);
    2552       63908 :                 seg = n;
    2553             :         }
    2554       52745 :         _DELETE(s);
    2555             : }
    2556             : 
    2557             : static void
    2558       54210 : destroy_storage(storage *bat)
    2559             : {
    2560       54210 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2561             :                 return;
    2562       52603 :         if (bat->next)
    2563        6325 :                 destroy_storage(bat->next);
    2564       52603 :         destroy_segments(bat->segs);
    2565       52603 :         if (bat->cs.uibid)
    2566       31221 :                 temp_destroy(bat->cs.uibid);
    2567       52603 :         if (bat->cs.uvbid)
    2568       31221 :                 temp_destroy(bat->cs.uvbid);
    2569       52603 :         if (bat->cs.bid)
    2570       52603 :                 temp_destroy(bat->cs.bid);
    2571       52603 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2572       52603 :         _DELETE(bat);
    2573             : }
    2574             : 
    2575             : static int
    2576      174882 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2577             : {
    2578      174882 :         if (uncommitted) {
    2579      455136 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2580      301059 :                         if (!VALID_4_READ(s->ts,tr))
    2581             :                                 return 1;
    2582             :         } else {
    2583      298791 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2584      279365 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2585             :                                 return 1;
    2586             :         }
    2587             : 
    2588             :         return 0;
    2589             : }
    2590             : 
    2591             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2592             : 
    2593             : storage *
    2594     2233552 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2595             : {
    2596     2233552 :         storage *obat;
    2597             : 
    2598     2233552 :         obat = ATOMIC_PTR_GET(&t->data);
    2599             : 
    2600     2233552 :         if (obat->cs.ts != tr->tid)
    2601     1546660 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2602     1546605 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2603             :                                 /* abort */
    2604       15444 :                                 if (clear)
    2605       15444 :                                         *clear = true;
    2606       15444 :                                 return NULL;
    2607             :                         }
    2608             : 
    2609     2218108 :         if (!clear)
    2610             :                 return obat;
    2611             : 
    2612             :         /* remainder is only to handle clear */
    2613       26334 :         if (segments_conflict(tr, obat->segs, 1)) {
    2614         371 :                 *clear = true;
    2615         371 :                 return NULL;
    2616             :         }
    2617       25963 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2618             :                 return NULL;
    2619       25963 :         storage *bat = ZNEW(storage);
    2620       25966 :         if (!bat)
    2621             :                 return NULL;
    2622       25966 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2623       25966 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2624           0 :                 destroy_storage(bat);
    2625           0 :                 return NULL;
    2626             :         }
    2627       25966 :         bat->cs.cleared = true;
    2628       25966 :         bat->cs.ts = tr->tid;
    2629             :         /* only one writer else abort */
    2630       25966 :         bat->next = obat;
    2631       25966 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2632          10 :                 bat->next = NULL;
    2633          10 :                 destroy_storage(bat);
    2634          10 :                 if (clear)
    2635          10 :                         *clear = true;
    2636          10 :                 return NULL;
    2637             :         }
    2638             :         return bat;
    2639             : }
    2640             : 
    2641             : static int
    2642       75974 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
    2643             : {
    2644       75974 :         int ok = LOG_OK;
    2645       75974 :         BAT *b = ib;
    2646       75974 :         storage *bat;
    2647             : 
    2648       75974 :         if (isbat && !BATcount(b))
    2649             :                 return ok;
    2650             : 
    2651       75926 :         if (t == NULL)
    2652             :                 return LOG_ERR;
    2653             : 
    2654       75926 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2655             :                 return LOG_ERR;
    2656             : 
    2657       75926 :         if (isbat)
    2658         319 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2659             :         else
    2660       75607 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2661             :         return ok;
    2662             : }
    2663             : 
    2664             : static size_t
    2665           0 : dcount_col(sql_trans *tr, sql_column *c)
    2666             : {
    2667           0 :         sql_delta *b;
    2668             : 
    2669           0 :         if (!isTable(c->t))
    2670             :                 return 0;
    2671           0 :         b = col_timestamp_delta(tr, c);
    2672           0 :         if (!b)
    2673             :                 return 1;
    2674             : 
    2675           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2676           0 :         if (!s || !s->segs->t)
    2677             :                 return 1;
    2678           0 :         size_t cnt = s->segs->t->end;
    2679           0 :         if (cnt) {
    2680           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2681           0 :                 size_t dcnt = 0;
    2682             : 
    2683           0 :                 if (v)
    2684           0 :                         dcnt = BATguess_uniques(v, NULL);
    2685           0 :                 return dcnt;
    2686             :         }
    2687             :         return cnt;
    2688             : }
    2689             : 
    2690             : static BAT *
    2691     3942853 : bind_no_view(BAT *b, bool quick)
    2692             : {
    2693     3942853 :         if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
    2694     3938715 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2695     3938715 :                 bat_destroy(b);
    2696     3938812 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2697             :                         return NULL;
    2698             :         }
    2699             :         return b;
    2700             : }
    2701             : 
    2702             : static int
    2703           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2704             : {
    2705           0 :         int ok = 0;
    2706           0 :         assert(tr->active);
    2707           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2708           0 :                 return 0;
    2709           0 :         lock_column(tr->store, c->base.id);
    2710           0 :         if (unique_est) {
    2711           0 :                 sql_delta *d;
    2712           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2713           0 :                         BAT *b;
    2714           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2715           0 :                                 MT_lock_set(&b->theaplock);
    2716           0 :                                 b->tunique_est = *unique_est;
    2717           0 :                                 MT_lock_unset(&b->theaplock);
    2718           0 :                                 bat_destroy(b);
    2719             :                         }
    2720             :                 }
    2721             :         }
    2722           0 :         if (min) {
    2723           0 :                 _DELETE(c->min);
    2724           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2725           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2726           0 :                         memcpy(c->min, min, minlen);
    2727           0 :                         ok = 1;
    2728             :                 }
    2729             :         }
    2730           0 :         if (max) {
    2731           0 :                 _DELETE(c->max);
    2732           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2733           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2734           0 :                         memcpy(c->max, max, maxlen);
    2735           0 :                         ok = 1;
    2736             :                 }
    2737             :         }
    2738           0 :         unlock_column(tr->store, c->base.id);
    2739           0 :         return ok;
    2740             : }
    2741             : 
    2742             : static int
    2743          19 : min_max_col(sql_trans *tr, sql_column *c)
    2744             : {
    2745          19 :         int ok = 0;
    2746          19 :         BAT *b = NULL;
    2747          19 :         sql_delta *d = NULL;
    2748             : 
    2749          19 :         assert(tr->active);
    2750          19 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2751           0 :                 return 0;
    2752          19 :         if (c->min && c->max)
    2753             :                 return 1;
    2754          19 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2755          19 :                 if (d->cs.st == ST_FOR)
    2756             :                         return 0;
    2757          19 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2758          19 :                 lock_column(tr->store, c->base.id);
    2759          19 :                 if (c->min && c->max) {
    2760           0 :                         unlock_column(tr->store, c->base.id);
    2761           0 :                         return 1;
    2762             :                 }
    2763          19 :                 _DELETE(c->min);
    2764          19 :                 _DELETE(c->max);
    2765          19 :                 if ((b = bind_col(tr, c, access))) {
    2766          19 :                         if (!(b = bind_no_view(b, false))) {
    2767           0 :                                 unlock_column(tr->store, c->base.id);
    2768           0 :                                 return 0;
    2769             :                         }
    2770          19 :                         BATiter bi = bat_iterator(b);
    2771          19 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2772          16 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2773          16 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2774             : 
    2775          16 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2776           0 :                                         _DELETE(c->min);
    2777           0 :                                         _DELETE(c->max);
    2778             :                                 } else {
    2779          16 :                                         memcpy(c->min, nmin, minlen);
    2780          16 :                                         memcpy(c->max, nmax, maxlen);
    2781          16 :                                         ok = 1;
    2782             :                                 }
    2783             :                         }
    2784          19 :                         bat_iterator_end(&bi);
    2785          19 :                         bat_destroy(b);
    2786             :                 }
    2787          19 :                 unlock_column(tr->store, c->base.id);
    2788             :         }
    2789             :         return ok;
    2790             : }
    2791             : 
    2792             : static size_t
    2793          17 : count_segs(segment *s)
    2794             : {
    2795          17 :         size_t nr = 0;
    2796             : 
    2797          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2798          55 :                 nr++;
    2799          17 :         return nr;
    2800             : }
    2801             : 
    2802             : static size_t
    2803          34 : count_del(sql_trans *tr, sql_table *t, int access)
    2804             : {
    2805          34 :         storage *d;
    2806             : 
    2807          34 :         if (!isTable(t))
    2808             :                 return 0;
    2809          34 :         d = tab_timestamp_storage(tr, t);
    2810          34 :         if (!d)
    2811             :                 return 0;
    2812          34 :         if (access == 2)
    2813           0 :                 return d->cs.ucnt;
    2814          34 :         if (access == 1)
    2815           0 :                 return count_inserts(d->segs->h, tr);
    2816          34 :         if (access == 10) /* special case for counting the number of segments */
    2817          17 :                 return count_segs(d->segs->h);
    2818          17 :         return count_deletes(d->segs->h, tr);
    2819             : }
    2820             : 
    2821             : static int
    2822       18316 : sorted_col(sql_trans *tr, sql_column *col)
    2823             : {
    2824       18316 :         int sorted = 0;
    2825             : 
    2826       18316 :         assert(tr->active);
    2827       18316 :         if (!isTable(col->t) || !col->t->s)
    2828             :                 return 0;
    2829             : 
    2830       18316 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2831       18298 :                 BAT *b = bind_col(tr, col, QUICK);
    2832             : 
    2833       18298 :                 if (b)
    2834       18298 :                         sorted = b->tsorted || b->trevsorted;
    2835             :         }
    2836             :         return sorted;
    2837             : }
    2838             : 
    2839             : static int
    2840        6909 : unique_col(sql_trans *tr, sql_column *col)
    2841             : {
    2842        6909 :         int distinct = 0;
    2843             : 
    2844        6909 :         assert(tr->active);
    2845        6909 :         if (!isTable(col->t) || !col->t->s)
    2846             :                 return 0;
    2847             : 
    2848        6909 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2849        6909 :                 BAT *b = bind_col(tr, col, QUICK);
    2850             : 
    2851        6909 :                 if (b)
    2852        6909 :                         distinct = b->tkey;
    2853             :         }
    2854             :         return distinct;
    2855             : }
    2856             : 
    2857             : static int
    2858        1868 : double_elim_col(sql_trans *tr, sql_column *col)
    2859             : {
    2860        1868 :         int de = 0;
    2861        1868 :         sql_delta *d;
    2862             : 
    2863        1868 :         assert(tr->active);
    2864        1868 :         if (!isTable(col->t) || !col->t->s)
    2865             :                 return 0;
    2866             : 
    2867        1868 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2868           6 :                 if (d->cs.st == ST_DICT) {
    2869           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2870           6 :                         if (b && b->ttype == TYPE_bte)
    2871             :                                 de = 1;
    2872           0 :                         else if (b && b->ttype == TYPE_sht)
    2873        1868 :                                 de = 2;
    2874             :                 }
    2875        1862 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2876        1862 :                 BAT *b = bind_col(tr, col, QUICK);
    2877             : 
    2878        1862 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2879        1862 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2880        1862 :                         if (de)
    2881        1750 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2882             :                 }
    2883        1750 :                 assert(de >= 0 && de <= 16);
    2884             :         }
    2885             :         return de;
    2886             : }
    2887             : 
    2888             : static int
    2889     3977758 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2890             : {
    2891     3977758 :         int ok = 0;
    2892     3977758 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2893     3977758 :         sql_delta *d = NULL;
    2894             : 
    2895     3977758 :         (void) tr;
    2896     3977758 :         assert(tr->active);
    2897     3977758 :         *nonil = false;
    2898     3977758 :         *unique = false;
    2899     3977758 :         *unique_est = 0.0;
    2900     3977758 :         if (!c || !isTable(c->t) || !c->t->s)
    2901             :                 return ok;
    2902             : 
    2903     3977520 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2904     3938944 :                 if (d->cs.st == ST_FOR) {
    2905          27 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2906          27 :                         return ok;
    2907             :                 }
    2908     3938917 :                 int eclass = c->type.type->eclass;
    2909     3938917 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2910     3938917 :                 if ((b = bind_col(tr, c, access))) {
    2911     3939656 :                         if (!(b = bind_no_view(b, false)))
    2912           0 :                                 return ok;
    2913     3940052 :                         BATiter bi = bat_iterator(b);
    2914     3939291 :                         *nonil = bi.nonil && !bi.nil;
    2915             : 
    2916     3939291 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2917     3629505 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
    2918     2342323 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2919             :                                         ok |= 1;
    2920     2342269 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2921     2332391 :                                         ok |= 1;
    2922     2342239 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2923          54 :                                         ok |= 2;
    2924     2342196 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2925     2326096 :                                         ok |= 2;
    2926             :                         }
    2927     3939163 :                         if (d->cs.ucnt == 0) {
    2928     3935046 :                                 if (d->cs.st == ST_DEFAULT) {
    2929     3934346 :                                         *unique = bi.key;
    2930     3934346 :                                         *unique_est = bi.unique_est;
    2931     3934346 :                                         if (*unique_est == 0)
    2932     1263865 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2933         700 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2934             :                                         /* for dict, check the offsets bat for uniqueness */
    2935         700 :                                         MT_lock_set(&off->theaplock);
    2936         700 :                                         *unique = off->tkey;
    2937         700 :                                         *unique_est = off->tunique_est;
    2938         700 :                                         MT_lock_unset(&off->theaplock);
    2939             :                                 }
    2940             :                         }
    2941     3939551 :                         bat_iterator_end(&bi);
    2942     3939815 :                         bat_destroy(b);
    2943     3939894 :                         if (*nonil && d->cs.ucnt > 0) {
    2944             :                                 /* This could use a quick descriptor */
    2945        2842 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2946           0 :                                         *nonil = false;
    2947             :                                 } else {
    2948        2842 :                                         MT_lock_set(&upv->theaplock);
    2949        2842 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2950        2842 :                                         MT_lock_unset(&upv->theaplock);
    2951        2842 :                                         bat_destroy(upv);
    2952             :                                 }
    2953             :                         }
    2954             :                 }
    2955             :         }
    2956             :         return ok;
    2957             : }
    2958             : 
    2959             : static int
    2960         257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2961             : {
    2962         257 :         assert(tr->active);
    2963         257 :         if (!isTable(col->t) || !col->t->s)
    2964             :                 return LOG_OK;
    2965             : 
    2966         252 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2967         252 :                 BAT *b = bind_col(tr, col, QUICK);
    2968             : 
    2969         252 :                 if (b) { /* add props for ranges [min, max> */
    2970         252 :                         MT_lock_set(&b->theaplock);
    2971         252 :                         if (add_range) {
    2972         179 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    2973         179 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    2974         103 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    2975             :                                 else
    2976          76 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    2977         179 :                                 if (!pt->with_nills || !col->null)
    2978         117 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    2979             :                         } else {
    2980          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    2981          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    2982          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    2983             :                         }
    2984         252 :                         MT_lock_unset(&b->theaplock);
    2985             :                 }
    2986             :         }
    2987             :         return LOG_OK;
    2988             : }
    2989             : 
    2990             : static int
    2991        4809 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    2992             : {
    2993        4809 :         assert(tr->active);
    2994        4809 :         if (!isTable(col->t) || !col->t->s)
    2995             :                 return LOG_OK;
    2996             : 
    2997        4779 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2998        4779 :                 BAT *b = bind_col(tr, col, QUICK);
    2999             : 
    3000        4779 :                 if (b) { /* add props for ranges [min, max> */
    3001        4779 :                         if (not_null) {
    3002        4777 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3003             :                         } else {
    3004           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3005             :                         }
    3006             :                 }
    3007             :         }
    3008             :         return LOG_OK;
    3009             : }
    3010             : 
    3011             : static int
    3012       33030 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3013             : {
    3014       33030 :         sqlstore *store = tr->store;
    3015       33030 :         int bid = log_find_bat(store->logger, id);
    3016       33030 :         if (bid <= 0)
    3017             :                 return LOG_ERR;
    3018       33030 :         cs->bid = temp_dup(bid);
    3019       33030 :         cs->ucnt = 0;
    3020       33030 :         cs->uibid = e_bat(TYPE_oid);
    3021       33030 :         cs->uvbid = e_bat(type);
    3022       33030 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3023             :                 return LOG_ERR;
    3024             :         return LOG_OK;
    3025             : }
    3026             : 
    3027             : static int
    3028       69457 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3029             : {
    3030       69457 :         int res = LOG_OK;
    3031       69457 :         gdk_return ok;
    3032       69457 :         BAT *b = temp_descriptor(bat->cs.bid);
    3033             : 
    3034       69457 :         if (b == NULL)
    3035             :                 return LOG_ERR;
    3036             : 
    3037       69457 :         if (!bat->cs.uibid)
    3038       69449 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3039       69457 :         if (!bat->cs.uvbid)
    3040       69449 :                 bat->cs.uvbid = e_bat(b->ttype);
    3041       69457 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3042           0 :                 res = LOG_ERR;
    3043       69457 :         if (GDKinmemory(0)) {
    3044         178 :                 bat_destroy(b);
    3045         178 :                 return res;
    3046             :         }
    3047             : 
    3048       69279 :         bat_set_access(b, BAT_READ);
    3049       69279 :         sqlstore *store = tr->store;
    3050       69279 :         ok = log_bat_persists(store->logger, b, id);
    3051       69279 :         bat_destroy(b);
    3052       69279 :         if(res != LOG_OK)
    3053             :                 return res;
    3054       69279 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3055             : }
    3056             : 
    3057             : static int
    3058           0 : new_persistent_delta( sql_delta *bat)
    3059             : {
    3060           0 :         bat->cs.ucnt = 0;
    3061           0 :         return LOG_OK;
    3062             : }
    3063             : 
    3064             : static void
    3065      133395 : create_delta( sql_delta *d, BAT *b)
    3066             : {
    3067      133395 :         bat_set_access(b, BAT_READ);
    3068      133395 :         d->cs.bid = temp_create(b);
    3069      133395 :         d->cs.uibid = d->cs.uvbid = 0;
    3070      133395 :         d->cs.ucnt = 0;
    3071      133395 : }
    3072             : 
    3073             : static bat
    3074        7381 : copyBat (bat i, int type, oid seq)
    3075             : {
    3076        7381 :         BAT *b, *tb;
    3077        7381 :         bat res;
    3078             : 
    3079        7381 :         if (!i)
    3080             :                 return i;
    3081        7381 :         tb = quick_descriptor(i);
    3082        7381 :         if (tb == NULL)
    3083             :                 return 0;
    3084        7381 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3085        7381 :         if (b == NULL)
    3086             :                 return 0;
    3087             : 
    3088        7381 :         bat_set_access(b, BAT_READ);
    3089             : 
    3090        7381 :         res = temp_create(b);
    3091        7381 :         bat_destroy(b);
    3092        7381 :         return res;
    3093             : }
    3094             : 
    3095             : static int
    3096      158716 : create_col(sql_trans *tr, sql_column *c)
    3097             : {
    3098      158716 :         int ok = LOG_OK, new = 0;
    3099      158716 :         int type = c->type.type->localtype;
    3100      158716 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3101             : 
    3102      158716 :         if (!bat) {
    3103      158716 :                 new = 1;
    3104      158716 :                 bat = ZNEW(sql_delta);
    3105      158716 :                 if (!bat)
    3106             :                         return LOG_ERR;
    3107      158716 :                 ATOMIC_PTR_SET(&c->data, bat);
    3108      158716 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3109             :         }
    3110             : 
    3111      158716 :         if (new)
    3112      158716 :                 bat->cs.ts = tr->tid;
    3113             : 
    3114      158716 :         if (!isNew(c)&& !isTempTable(c->t)){
    3115       25299 :                 bat->cs.ts = tr->ts;
    3116       25299 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3117       25299 :                 if (ok == LOG_OK && c->storage_type) {
    3118           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3119           2 :                                 sqlstore *store = tr->store;
    3120           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3121           2 :                                 if (bid <= 0)
    3122             :                                         return LOG_ERR;
    3123           2 :                                 bat->cs.ebid = temp_dup(bid);
    3124           2 :                                 bat->cs.st = ST_DICT;
    3125           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3126           2 :                                 bat->cs.st = ST_FOR;
    3127             :                         }
    3128             :                 }
    3129       25299 :                 return ok;
    3130      133417 :         } else if (bat && bat->cs.bid) {
    3131           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3132             :         } else {
    3133      133417 :                 sql_column *fc = NULL;
    3134      133417 :                 size_t cnt = 0;
    3135             : 
    3136             :                 /* alter ? */
    3137      133417 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3138       80935 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3139       80935 :                         if (s == NULL)
    3140             :                                 return LOG_ERR;
    3141       80935 :                         cnt = segs_end(s->segs, tr, c->t);
    3142             :                 }
    3143      133417 :                 if (cnt && fc != c) {
    3144          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3145             : 
    3146          22 :                         if (d->cs.bid) {
    3147          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3148          22 :                                 if(bat->cs.bid == BID_NIL)
    3149          22 :                                         ok = LOG_ERR;
    3150             :                         }
    3151          22 :                         if (d->cs.uibid) {
    3152          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3153          10 :                                 if (bat->cs.uibid == BID_NIL)
    3154          22 :                                         ok = LOG_ERR;
    3155             :                         }
    3156          22 :                         if (d->cs.uvbid) {
    3157          10 :                                 bat->cs.uvbid = e_bat(type);
    3158          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3159           0 :                                         ok = LOG_ERR;
    3160             :                         }
    3161             :                 } else {
    3162      133395 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3163      133395 :                         if (!b) {
    3164             :                                 ok = LOG_ERR;
    3165             :                         } else {
    3166      133395 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3167      133395 :                                 bat_destroy(b);
    3168             :                         }
    3169             : 
    3170      133395 :                         if (!new) {
    3171           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3172           0 :                                 if (bat->cs.uibid == BID_NIL)
    3173           0 :                                         ok = LOG_ERR;
    3174           0 :                                 bat->cs.uvbid = e_bat(type);
    3175           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3176           0 :                                         ok = LOG_ERR;
    3177             :                         }
    3178             :                 }
    3179      133417 :                 bat->cs.ucnt = 0;
    3180             : 
    3181      133417 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3182          94 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3183             :         }
    3184             :         return ok;
    3185             : }
    3186             : 
    3187             : static int
    3188       63001 : log_create_col_(sql_trans *tr, sql_column *c)
    3189             : {
    3190       63001 :         assert(!isTempTable(c->t));
    3191       63001 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3192             : }
    3193             : 
    3194             : static int
    3195          90 : log_create_col(sql_trans *tr, sql_change *change)
    3196             : {
    3197          90 :         return log_create_col_(tr, (sql_column*)change->obj);
    3198             : }
    3199             : 
    3200             : static int
    3201      120873 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3202             : {
    3203      120873 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3204      120873 :         (void)oldest;
    3205      120873 :         assert(delta->cs.ts == tr->tid);
    3206      120873 :         delta->cs.ts = commit_ts;
    3207             : 
    3208      120873 :         assert(delta->next == NULL);
    3209      120873 :         if (!delta->cs.merged)
    3210      120872 :                 delta->nr_updates += merge_delta(delta);
    3211      120873 :         if (!tr->parent)
    3212      120869 :                 base->new = 0;
    3213      120873 :         return LOG_OK;
    3214             : }
    3215             : 
    3216             : static int
    3217          94 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3218             : {
    3219          94 :         sql_column *c = (sql_column*)change->obj;
    3220          94 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3221          94 :         if (!tr->parent)
    3222          93 :                 c->base.new = 0;
    3223          94 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3224             : }
    3225             : 
    3226             : /* will be called for new idx's and when new index columns are created */
    3227             : static int
    3228        9801 : create_idx(sql_trans *tr, sql_idx *ni)
    3229             : {
    3230        9801 :         int ok = LOG_OK, new = 0;
    3231        9801 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3232        9801 :         int type = TYPE_lng;
    3233             : 
    3234        9801 :         if (oid_index(ni->type))
    3235         954 :                 type = TYPE_oid;
    3236             : 
    3237        9801 :         if (!bat) {
    3238        9801 :                 new = 1;
    3239        9801 :                 bat = ZNEW(sql_delta);
    3240        9801 :                 if (!bat)
    3241             :                         return LOG_ERR;
    3242        9801 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3243        9801 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3244             :         }
    3245             : 
    3246        9801 :         if (new)
    3247        9801 :                 bat->cs.ts = tr->tid;
    3248             : 
    3249        9801 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3250        2442 :                 bat->cs.ts = 1;
    3251        2442 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3252        7359 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3253           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3254             :         } else {
    3255        7359 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3256        7359 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3257             : 
    3258        7359 :                 if (d) {
    3259             :                         /* Here we also handle indices created through alter stmts */
    3260             :                         /* These need to be created aligned to the existing data */
    3261        7359 :                         if (d->cs.bid) {
    3262        7359 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3263        7359 :                                 if(bat->cs.bid == BID_NIL)
    3264        7359 :                                         ok = LOG_ERR;
    3265             :                         }
    3266             :                 } else {
    3267             :                         return LOG_ERR;
    3268             :                 }
    3269             : 
    3270        7359 :                 bat->cs.ucnt = 0;
    3271             : 
    3272        7359 :                 if (!new) {
    3273           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3274           0 :                         if (bat->cs.uibid == BID_NIL)
    3275           0 :                                 ok = LOG_ERR;
    3276           0 :                         bat->cs.uvbid = e_bat(type);
    3277           0 :                         if(bat->cs.uvbid == BID_NIL)
    3278           0 :                                 ok = LOG_ERR;
    3279             :                 }
    3280        7359 :                 bat->cs.ucnt = 0;
    3281        7359 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3282         632 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3283             :         }
    3284             :         return ok;
    3285             : }
    3286             : 
    3287             : static int
    3288        6456 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3289             : {
    3290        6456 :         assert(!isTempTable(i->t));
    3291        6456 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3292             : }
    3293             : 
    3294             : static int
    3295         618 : log_create_idx(sql_trans *tr, sql_change *change)
    3296             : {
    3297         618 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3298             : }
    3299             : 
    3300             : static int
    3301         632 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3302             : {
    3303         632 :         sql_idx *i = (sql_idx*)change->obj;
    3304         632 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3305         632 :         if (!tr->parent)
    3306         632 :                 i->base.new = 0;
    3307         632 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3308             :         return LOG_OK;
    3309             : }
    3310             : 
    3311             : static int
    3312        5289 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3313             : {
    3314        5289 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3315        5289 :         BAT *b = NULL, *ib = NULL;
    3316             : 
    3317        5289 :         if (ok != LOG_OK)
    3318             :                 return ok;
    3319        5289 :         if (!(b = temp_descriptor(s->cs.bid)))
    3320             :                 return LOG_ERR;
    3321        5289 :         ib = b;
    3322             : 
    3323        5289 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3324           0 :                 bat_destroy(ib);
    3325           0 :                 return LOG_ERR;
    3326             :         }
    3327             : 
    3328        5289 :         if (BATcount(b)) {
    3329         435 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3330           0 :                         bat_destroy(ib);
    3331           0 :                         return LOG_ERR;
    3332             :                 }
    3333         673 :                 if (BATtdense(b)) {
    3334         238 :                         size_t start = b->tseqbase;
    3335         238 :                         size_t cnt = BATcount(b);
    3336         238 :                         ok = delete_range(tr, t, s, start, cnt);
    3337             :                 } else {
    3338         197 :                         assert(b->tsorted);
    3339         197 :                         BUN icnt = BATcount(b);
    3340         197 :                         BATiter bi = bat_iterator(b);
    3341         197 :                         size_t lcnt = 1;
    3342         197 :                         oid n;
    3343         197 :                         segment *seg = s->segs->h;
    3344         197 :                         if (complex_cand(b)) {
    3345           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3346           0 :                                 n = o + 1;
    3347           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3348           0 :                                         o = * (oid *) Tpos(&bi, i);
    3349           0 :                                         if (o == n) {
    3350           0 :                                                 lcnt++;
    3351           0 :                                                 n++;
    3352             :                                         } else {
    3353           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3354             :                                                         break;
    3355             :                                                 lcnt = 0;
    3356             :                                         }
    3357           0 :                                         if (!lcnt) {
    3358           0 :                                                 n = o + 1;
    3359           0 :                                                 lcnt = 1;
    3360             :                                         }
    3361             :                                 }
    3362             :                         } else {
    3363         197 :                                 oid *o = bi.base;
    3364         197 :                                 n = o[0]+1;
    3365      294794 :                                 for (size_t i=1; i<icnt; i++) {
    3366      294597 :                                         if (o[i] == n) {
    3367      291852 :                                                 lcnt++;
    3368      291852 :                                                 n++;
    3369             :                                         } else {
    3370        2745 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3371             :                                                         break;
    3372             :                                                 lcnt = 0;
    3373             :                                         }
    3374      294597 :                                         if (!lcnt) {
    3375        2745 :                                                 n = o[i]+1;
    3376        2745 :                                                 lcnt = 1;
    3377             :                                         }
    3378             :                                 }
    3379             :                         }
    3380         197 :                         if (lcnt && ok == LOG_OK)
    3381         197 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3382         197 :                         bat_iterator_end(&bi);
    3383             :                 }
    3384         435 :                 if (ok == LOG_OK)
    3385        7042 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3386        6607 :                                 if (seg->ts == tr->tid)
    3387        3436 :                                         seg->ts = 1;
    3388             :         } else {
    3389        4854 :                 if (ok == LOG_OK) {
    3390        4854 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3391             : 
    3392        4854 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3393             :                                 ok = LOG_ERR;
    3394             :                         } else {
    3395        4854 :                                 segment *seg = s->segs->h;
    3396        4854 :                                 if (seg->ts == tr->tid)
    3397        4854 :                                         seg->ts = 1;
    3398             :                         }
    3399             :                 }
    3400             :         }
    3401        5289 :         if (b != ib)
    3402        5289 :                 bat_destroy(b);
    3403        5289 :         bat_destroy(ib);
    3404             : 
    3405        5289 :         return ok;
    3406             : }
    3407             : 
    3408             : static int
    3409       26677 : create_del(sql_trans *tr, sql_table *t)
    3410             : {
    3411       26677 :         int ok = LOG_OK, new = 0;
    3412       26677 :         BAT *b;
    3413       26677 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3414             : 
    3415       26677 :         if (!bat) {
    3416       26677 :                 new = 1;
    3417       26677 :                 bat = ZNEW(storage);
    3418       26677 :                 if(!bat)
    3419             :                         return LOG_ERR;
    3420       26677 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3421       26677 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3422       26677 :                 bat->cs.ts = tr->tid;
    3423             :         }
    3424             : 
    3425       26677 :         if (!isNew(t) && !isTempTable(t)) {
    3426        5289 :                 bat->cs.ts = tr->ts;
    3427        5289 :                 return load_storage(tr, t, bat, t->base.id);
    3428       21388 :         } else if (bat->cs.bid) {
    3429             :                 return ok;
    3430             :         } else {
    3431       21388 :                 assert(!bat->segs);
    3432       21388 :                 if (!(bat->segs = new_segments(tr, 0)))
    3433             :                         return LOG_ERR;
    3434             : 
    3435       21388 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3436       21388 :                 if(b != NULL) {
    3437       21388 :                         bat_set_access(b, BAT_READ);
    3438       21388 :                         bat->cs.bid = temp_create(b);
    3439       21388 :                         bat_destroy(b);
    3440             :                 } else {
    3441             :                         return LOG_ERR;
    3442             :                 }
    3443       21388 :                 if (new)
    3444       28249 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3445             :         }
    3446             :         return ok;
    3447             : }
    3448             : 
    3449             : static int
    3450      199328 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3451             : {
    3452      199328 :         sqlstore *store = tr->store;
    3453      199328 :         msk m = s->deleted;
    3454      199328 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3455             : }
    3456             : 
    3457             : static int
    3458       97813 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3459             : {
    3460             :         /* log segments */
    3461       97813 :         lock_table(tr->store, id);
    3462      462397 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3463      364584 :                 unlock_table(tr->store, id);
    3464      364584 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3465      151343 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3466             :                                 return LOG_ERR;
    3467             :                         }
    3468             :                 }
    3469      364584 :                 lock_table(tr->store, id);
    3470             :         }
    3471       97813 :         unlock_table(tr->store, id);
    3472       97813 :         return LOG_OK;
    3473             : }
    3474             : 
    3475             : static int
    3476       12763 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3477             : {
    3478       12763 :         BAT *b;
    3479       12763 :         int ok = LOG_OK;
    3480             : 
    3481       12763 :         if (GDKinmemory(0))
    3482             :                 return LOG_OK;
    3483             : 
    3484       12730 :         b = temp_descriptor(bat->cs.bid);
    3485       12730 :         if (b == NULL)
    3486             :                 return LOG_ERR;
    3487             : 
    3488       12730 :         sqlstore *store = tr->store;
    3489       12730 :         bat_set_access(b, BAT_READ);
    3490       12730 :         if (ok == LOG_OK)
    3491       12730 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3492       12730 :         if (ok == LOG_OK)
    3493       12730 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3494       12730 :         bat_destroy(b);
    3495       12730 :         return ok;
    3496             : }
    3497             : 
    3498             : static int
    3499       12777 : log_create_del(sql_trans *tr, sql_change *change)
    3500             : {
    3501       12777 :         int ok = LOG_OK;
    3502       12777 :         sql_table *t = (sql_table*)change->obj;
    3503             : 
    3504       12777 :         if (t->base.deleted)
    3505             :                 return ok;
    3506       12763 :         assert(!isTempTable(t));
    3507       12763 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3508       12763 :         if (ok == LOG_OK) {
    3509       75674 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3510       62911 :                         sql_column *c = n->data;
    3511             : 
    3512       62911 :                         ok = log_create_col_(tr, c);
    3513             :                 }
    3514       12763 :                 if (t->idxs) {
    3515       18613 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3516        5850 :                                 sql_idx *i = n->data;
    3517             : 
    3518        5850 :                                 if (ATOMIC_PTR_GET(&i->data))
    3519        5838 :                                         ok = log_create_idx_(tr, i);
    3520             :                         }
    3521             :                 }
    3522             :         }
    3523             :         return ok;
    3524             : }
    3525             : 
    3526             : static int
    3527       21390 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3528             : {
    3529       21390 :         int ok = LOG_OK;
    3530       21390 :         sql_table *t = (sql_table*)change->obj;
    3531       21390 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3532             : 
    3533       21390 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3534         120 :                 assert(isTempTable(t));
    3535         120 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3536         120 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3537         120 :                 return ok;
    3538             :         }
    3539             : 
    3540       21270 :         if (!commit_ts) /* rollback handled by ? */
    3541             :                 return ok;
    3542       19415 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3543       19415 :         assert(ok == LOG_OK);
    3544       19415 :         if (ok != LOG_OK)
    3545             :                 return ok;
    3546       19415 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3547       19415 :         assert(dbat->cs.ts == tr->tid);
    3548       19415 :         dbat->cs.ts = commit_ts;
    3549       19415 :         if (ok == LOG_OK) {
    3550      133711 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3551      114296 :                         sql_column *c = n->data;
    3552      114296 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3553             : 
    3554      114296 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3555             :                 }
    3556       19415 :                 if (t->idxs) {
    3557       25278 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3558        5863 :                                 sql_idx *i = n->data;
    3559        5863 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3560             : 
    3561        5863 :                                 if (delta)
    3562        5851 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3563             :                         }
    3564             :                 }
    3565       19415 :                 if (!tr->parent)
    3566       19413 :                         t->base.new = 0;
    3567             :         }
    3568       19415 :         if (!tr->parent)
    3569       19413 :                 t->base.new = 0;
    3570             :         return ok;
    3571             : }
    3572             : 
    3573             : static int
    3574       21350 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3575             : {
    3576       21350 :         gdk_return ok = GDK_SUCCEED;
    3577             : 
    3578       21350 :         sqlstore *store = tr->store;
    3579       21350 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3580       18966 :                 ok = log_bat_transient(store->logger, id);
    3581       21350 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3582          25 :                 ok = log_bat_transient(store->logger, -id);
    3583       21350 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3584             : }
    3585             : 
    3586             : static int
    3587      178101 : destroy_col(sqlstore *store, sql_column *c)
    3588             : {
    3589      178101 :         (void)store;
    3590      178101 :         if (ATOMIC_PTR_GET(&c->data))
    3591      178101 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3592      178101 :         ATOMIC_PTR_SET(&c->data, NULL);
    3593      178101 :         return LOG_OK;
    3594             : }
    3595             : 
    3596             : static int
    3597       19451 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3598             : {
    3599       19451 :         int ok = LOG_OK;
    3600       19451 :         assert(!isTempTable(c->t));
    3601       19451 :         if (!tr->parent) /* don't write save point commits */
    3602       19451 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3603       19451 :         return ok;
    3604             : }
    3605             : 
    3606             : static int
    3607       19451 : log_destroy_col(sql_trans *tr, sql_change *change)
    3608             : {
    3609       19451 :         sql_column *c = (sql_column*)change->obj;
    3610       19451 :         int res = log_destroy_col_(tr, c);
    3611       19451 :         change->obj = NULL;
    3612       19451 :         column_destroy(tr->store, c);
    3613       19451 :         return res;
    3614             : }
    3615             : 
    3616             : static int
    3617       11600 : destroy_idx(sqlstore *store, sql_idx *i)
    3618             : {
    3619       11600 :         (void)store;
    3620       11600 :         if (ATOMIC_PTR_GET(&i->data))
    3621       11600 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3622       11600 :         ATOMIC_PTR_SET(&i->data, NULL);
    3623       11600 :         return LOG_OK;
    3624             : }
    3625             : 
    3626             : static int
    3627        1975 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3628             : {
    3629        1975 :         int ok = LOG_OK;
    3630        1975 :         assert(!isTempTable(i->t));
    3631        1975 :         if (ATOMIC_PTR_GET(&i->data)) {
    3632        1899 :                 if (!tr->parent) /* don't write save point commits */
    3633        1899 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3634             :         }
    3635        1975 :         return ok;
    3636             : }
    3637             : 
    3638             : static int
    3639        1975 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3640             : {
    3641        1975 :         sql_idx *i = (sql_idx*)change->obj;
    3642        1975 :         int res = log_destroy_idx_(tr, i);
    3643        1975 :         change->obj = NULL;
    3644        1975 :         idx_destroy(tr->store, i);
    3645        1975 :         return res;
    3646             : }
    3647             : 
    3648             : static int
    3649       28248 : destroy_del(sqlstore *store, sql_table *t)
    3650             : {
    3651       28248 :         (void)store;
    3652       28248 :         if (ATOMIC_PTR_GET(&t->data))
    3653       28244 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3654       28248 :         ATOMIC_PTR_SET(&t->data, NULL);
    3655       28248 :         return LOG_OK;
    3656             : }
    3657             : 
    3658             : static int
    3659        3311 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3660             : {
    3661        3311 :         gdk_return ok = GDK_SUCCEED;
    3662             : 
    3663        3311 :         sqlstore *store = tr->store;
    3664        3311 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3665        3311 :             bat && bat->cs.bid)
    3666        3311 :                 ok = log_bat_transient(store->logger, id);
    3667        3311 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3668             : }
    3669             : 
    3670             : static int
    3671        3311 : log_destroy_del(sql_trans *tr, sql_change *change)
    3672             : {
    3673        3311 :         int ok = LOG_OK;
    3674        3311 :         sql_table *t = (sql_table*)change->obj;
    3675             : 
    3676        3311 :         assert(!isTempTable(t));
    3677        3311 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3678        3311 :         return ok;
    3679             : }
    3680             : 
    3681             : static int
    3682       24825 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3683             : {
    3684       24825 :         (void)tr;
    3685       24825 :         (void)change;
    3686       24825 :         (void)commit_ts;
    3687       24825 :         (void)oldest;
    3688       24825 :         if (commit_ts)
    3689       24802 :                 change->handled = true;
    3690       24825 :         return 0;
    3691             : }
    3692             : 
    3693             : static int
    3694        3382 : drop_del(sql_trans *tr, sql_table *t)
    3695             : {
    3696        3382 :         int ok = LOG_OK;
    3697             : 
    3698        3382 :         if (!isNew(t)) {
    3699        3382 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3700        3447 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
    3701             :         }
    3702        3382 :         return ok;
    3703             : }
    3704             : 
    3705             : static int
    3706       19466 : drop_col(sql_trans *tr, sql_column *c)
    3707             : {
    3708       19466 :         assert(!isNew(c));
    3709       19466 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3710       19466 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
    3711       19466 :         return LOG_OK;
    3712             : }
    3713             : 
    3714             : static int
    3715        1977 : drop_idx(sql_trans *tr, sql_idx *i)
    3716             : {
    3717        1977 :         assert(!isNew(i));
    3718        1977 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3719        1977 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
    3720        1977 :         return LOG_OK;
    3721             : }
    3722             : 
    3723             : 
    3724             : static BUN
    3725      129461 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3726             : {
    3727      129461 :         BAT *b;
    3728      129461 :         BUN sz = 0;
    3729             : 
    3730      129461 :         (void)tr;
    3731      129461 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3732      129461 :         if (cs->bid && renew) {
    3733      129457 :                 b = quick_descriptor(cs->bid);
    3734      129490 :                 if (b) {
    3735      129490 :                         sz += BATcount(b);
    3736      129490 :                         if (cs->st == ST_DICT) {
    3737           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3738           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3739             : 
    3740           2 :                                 if (nebid == BID_NIL || !n) {
    3741           0 :                                         temp_destroy(nebid);
    3742           0 :                                         bat_destroy(n);
    3743           0 :                                         return BUN_NONE;
    3744             :                                 }
    3745           2 :                                 temp_destroy(cs->ebid);
    3746           2 :                                 cs->ebid = nebid;
    3747           2 :                                 if (!temp)
    3748           2 :                                         bat_set_access(n, BAT_READ);
    3749           2 :                                 temp_destroy(cs->bid);
    3750           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3751           2 :                                 bat_destroy(n);
    3752             :                         } else {
    3753      129488 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
    3754             : 
    3755      129270 :                                 if (nbid == BID_NIL)
    3756             :                                         return BUN_NONE;
    3757      129270 :                                 temp_destroy(cs->bid);
    3758      129410 :                                 cs->bid = nbid;
    3759             :                         }
    3760             :                 } else {
    3761             :                         return BUN_NONE;
    3762             :                 }
    3763             :         }
    3764      129416 :         if (cs->uibid) {
    3765      129273 :                 temp_destroy(cs->uibid);
    3766      129347 :                 cs->uibid = 0;
    3767             :         }
    3768      129490 :         if (cs->uvbid) {
    3769      129346 :                 temp_destroy(cs->uvbid);
    3770      129350 :                 cs->uvbid = 0;
    3771             :         }
    3772      129494 :         cs->cleared = true;
    3773      129494 :         cs->ucnt = 0;
    3774      129494 :         return sz;
    3775             : }
    3776             : 
    3777             : static BUN
    3778      129305 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3779             : {
    3780      129305 :         bool update_conflict = false;
    3781      129305 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3782             : 
    3783      129305 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    3784           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3785      129342 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3786      129342 :         if (odelta != delta)
    3787      129347 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3788      129332 :         if (delta)
    3789      129332 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3790             :         return 0;
    3791             : }
    3792             : 
    3793             : static BUN
    3794          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3795             : {
    3796          21 :         bool update_conflict = false;
    3797          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3798             : 
    3799          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3800          15 :                 return 0;
    3801           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    3802           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3803           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3804           6 :         if (odelta != delta)
    3805           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3806           6 :         if (delta)
    3807           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3808             :         return 0;
    3809             : }
    3810             : 
    3811             : static int
    3812         142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3813             : {
    3814         142 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
    3815             :                 return LOG_ERR;
    3816         142 :         if (s->segs)
    3817         142 :                 destroy_segments(s->segs);
    3818         142 :         if (!(s->segs = new_segments(tr, 0)))
    3819             :                 return LOG_ERR;
    3820             :         return LOG_OK;
    3821             : }
    3822             : 
    3823             : 
    3824             : /*
    3825             :  * Clear the table, in general this means replacing the storage,
    3826             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    3827             :  * all segments as deleted.
    3828             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    3829             :  */
    3830             : static BUN
    3831       41831 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3832             : {
    3833       41831 :         int clear = !in_transaction, ok = LOG_OK;
    3834       41831 :         bool conflict = false;
    3835       41831 :         storage *bat;
    3836             : 
    3837       41882 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    3838       15825 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3839             : 
    3840       26007 :         if (!clear) {
    3841          51 :                 lock_table(tr->store, t->base.id);
    3842          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3843          51 :                 unlock_table(tr->store, t->base.id);
    3844             :         }
    3845       26007 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3846       26007 :         if (!in_transaction)
    3847       25959 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3848       26005 :         if (ok == LOG_ERR)
    3849             :                 return BUN_NONE;
    3850       26005 :         if (ok == LOG_CONFLICT)
    3851           0 :                 return BUN_NONE - 1;
    3852             :         return LOG_OK;
    3853             : }
    3854             : 
    3855             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3856             : static BUN
    3857       41811 : clear_table(sql_trans *tr, sql_table *t)
    3858             : {
    3859       41811 :         node *n = ol_first_node(t->columns);
    3860       41811 :         sql_column *c = n->data;
    3861       41811 :         storage *d = tab_timestamp_storage(tr, t);
    3862       41827 :         int in_transaction, clear;
    3863       41827 :         BUN sz, clear_ok;
    3864             : 
    3865       41827 :         if (!d)
    3866             :                 return BUN_NONE;
    3867       41827 :         lock_table(tr->store, t->base.id);
    3868       41831 :         in_transaction = segments_in_transaction(tr, t);
    3869       41832 :         unlock_table(tr->store, t->base.id);
    3870       41831 :         clear = !in_transaction;
    3871       41831 :         sz = count_col(tr, c, CNT_ACTIVE);
    3872       41831 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    3873             :                 return clear_ok;
    3874             : 
    3875       26005 :         if (in_transaction)
    3876             :                 return sz;
    3877             : 
    3878      155296 :         for (; n; n = n->next) {
    3879      129340 :                 c = n->data;
    3880             : 
    3881      129340 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3882           0 :                         return clear_ok;
    3883             :         }
    3884       25956 :         if (t->idxs) {
    3885       25977 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3886          21 :                         sql_idx *ci = n->data;
    3887             : 
    3888          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    3889          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3890           0 :                                 return clear_ok;
    3891             :                 }
    3892             :         }
    3893       25956 :         if (clear)
    3894       25956 :                 d->segs->nr_reused = 0;
    3895       25956 :         return sz;
    3896             : }
    3897             : 
    3898             : static int
    3899      158775 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3900             : {
    3901      158775 :         sqlstore *store = tr->store;
    3902      158775 :         gdk_return ok = GDK_SUCCEED;
    3903             : 
    3904      158775 :         (void) t;
    3905      158775 :         (void) segs;
    3906      158775 :         if (GDKinmemory(0))
    3907             :                 return LOG_OK;
    3908             : 
    3909      158768 :         if (cs->cleared) {
    3910      155358 :                 assert(cs->ucnt == 0);
    3911      155358 :                 BAT *ins = temp_descriptor(cs->bid);
    3912      155358 :                 if (!ins)
    3913             :                         return LOG_ERR;
    3914      155358 :                 assert(!isEbat(ins));
    3915      155358 :                 bat_set_access(ins, BAT_READ);
    3916      155358 :                 ok = log_bat_persists(store->logger, ins, id);
    3917      155358 :                 bat_destroy(ins);
    3918      155358 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3919          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3920          56 :                         if (!ins)
    3921             :                                 return LOG_ERR;
    3922          56 :                         assert(!isEbat(ins));
    3923          56 :                         bat_set_access(ins, BAT_READ);
    3924          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3925          56 :                         bat_destroy(ins);
    3926             :                 }
    3927      155358 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3928             :         }
    3929             : 
    3930        3410 :         assert(!isTempTable(t));
    3931             : 
    3932        3410 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3933        2819 :                 BAT *ui = temp_descriptor(cs->uibid);
    3934        2819 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3935             :                 /* any updates */
    3936        2819 :                 if (ui == NULL || uv == NULL) {
    3937             :                         ok = GDK_FAIL;
    3938        2819 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3939        2819 :                         ok = log_delta(store->logger, ui, uv, id);
    3940        2819 :                 bat_destroy(ui);
    3941        2819 :                 bat_destroy(uv);
    3942             :         }
    3943        2819 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3944             : }
    3945             : 
    3946             : static inline int
    3947       59133 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3948       59133 :         sqlstore *store = tr->store;
    3949       59133 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3950             : }
    3951             : 
    3952             : static inline int
    3953       59133 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3954       59133 :         sqlstore *store = tr->store;
    3955       59133 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3956             : }
    3957             : 
    3958             : static int
    3959       59133 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3960             : {
    3961       59133 :         sqlstore *store = tr->store;
    3962       59133 :         gdk_return ok = GDK_SUCCEED;
    3963             : 
    3964       59133 :         size_t end = segs_end(segs, tr, t);
    3965             : 
    3966       59133 :         if (tr_log_table_start(tr, t) != LOG_OK)
    3967             :                 return LOG_ERR;
    3968             : 
    3969       59133 :         size_t nr_appends = 0;
    3970             : 
    3971       59133 :         lock_table(tr->store, t->base.id);
    3972      385143 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3973      326010 :                 unlock_table(tr->store, t->base.id);
    3974             : 
    3975      326010 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3976      120621 :                         if (!seg->deleted) {
    3977       47985 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    3978             :                                         return LOG_ERR;
    3979             : 
    3980       47985 :                                 nr_appends += (seg->end - seg->start);
    3981             :                         }
    3982             :                 }
    3983      326010 :                 lock_table(tr->store, t->base.id);
    3984             :         }
    3985       59133 :         unlock_table(tr->store, t->base.id);
    3986             : 
    3987      411567 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    3988      352434 :                 sql_column *c = n->data;
    3989      352434 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    3990             : 
    3991      352434 :                 if (cs->cleared) {
    3992           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    3993           3 :                         continue;
    3994             :                 }
    3995             : 
    3996      352431 :                 lock_table(tr->store, t->base.id);
    3997      352431 :                 if (!cs->cleared) {
    3998     2290491 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    3999     1938060 :                                 unlock_table(tr->store, t->base.id);
    4000     1938060 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4001             :                                         /* append col*/
    4002      271681 :                                         BAT *ins = temp_descriptor(cs->bid);
    4003      271681 :                                         if (ins == NULL)
    4004             :                                                 return LOG_ERR;
    4005      271681 :                                         assert(BATcount(ins) >= cur->end);
    4006      271681 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
    4007      271681 :                                         bat_destroy(ins);
    4008             :                                 }
    4009     1938060 :                                 lock_table(tr->store, t->base.id);
    4010             :                         }
    4011             :                 }
    4012      352431 :                 unlock_table(tr->store, t->base.id);
    4013             : 
    4014      352431 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    4015          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4016          19 :                         if (ins == NULL)
    4017             :                                 return LOG_ERR;
    4018          19 :                         if (BATcount(ins) > ins->batInserted)
    4019          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4020          19 :                         BATcommit(ins, BATcount(ins));
    4021          19 :                         bat_destroy(ins);
    4022             :                 }
    4023             :         }
    4024             : 
    4025       59133 :         if (t->idxs) {
    4026       64320 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4027        5187 :                         sql_idx *i = n->data;
    4028             : 
    4029        5187 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    4030        4409 :                                 continue;
    4031         778 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4032             : 
    4033         778 :                         if (cs) {
    4034         778 :                                 if (cs->cleared) {
    4035           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4036           0 :                                         continue;
    4037             :                                 }
    4038             : 
    4039         778 :                                 lock_table(tr->store, t->base.id);
    4040        2387 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4041        1609 :                                         unlock_table(tr->store, t->base.id);
    4042        1609 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4043             :                                                 /* append idx */
    4044         730 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4045         730 :                                                 if (ins == NULL)
    4046             :                                                         return LOG_ERR;
    4047         730 :                                                 assert(BATcount(ins) >= cur->end);
    4048         730 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4049         730 :                                                 bat_destroy(ins);
    4050             :                                         }
    4051        1609 :                                         lock_table(tr->store, t->base.id);
    4052             :                                 }
    4053         778 :                                 unlock_table(tr->store, t->base.id);
    4054             :                         }
    4055             :                 }
    4056             :         }
    4057             : 
    4058       59133 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
    4059           0 :                 return LOG_ERR;
    4060             : 
    4061             :         return LOG_OK;
    4062             : }
    4063             : 
    4064             : static int
    4065       85083 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4066             : {
    4067       85083 :         int ok = LOG_OK;
    4068       85083 :         bool cleared = s->cs.cleared;
    4069       85083 :         if (ok == LOG_OK && cleared)
    4070       25950 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4071       25950 :         if (ok == LOG_OK)
    4072       85083 :                 ok = log_segments(tr, s->segs, t->base.id);
    4073       85083 :         if (ok == LOG_OK && !cleared)
    4074       59133 :                 ok = log_table_append(tr, t, s->segs);
    4075       85083 :         return ok;
    4076             : }
    4077             : 
    4078             : static void
    4079      422625 : merge_cs( column_storage *cs, const char* caller)
    4080             : {
    4081      422625 :         if (cs->bid && cs->ucnt) {
    4082        2828 :                 BAT *cur = temp_descriptor(cs->bid);
    4083        2828 :                 BAT *ui = temp_descriptor(cs->uibid);
    4084        2828 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4085             : 
    4086        2828 :                 if (!cur || !ui || !uv) {
    4087           0 :                         bat_destroy(ui);
    4088           0 :                         bat_destroy(uv);
    4089           0 :                         bat_destroy(cur);
    4090           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4091             :                         return;
    4092             :                 }
    4093        2828 :                 assert(BATcount(ui) == BATcount(uv));
    4094             : 
    4095             :                 /* any updates */
    4096        2828 :                 assert(!isEbat(cur));
    4097        2828 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4098           0 :                         bat_destroy(ui);
    4099           0 :                         bat_destroy(uv);
    4100           0 :                         bat_destroy(cur);
    4101           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4102             :                         return;
    4103             :                 }
    4104             :                 /* cleanup the old deltas */
    4105        2828 :                 temp_destroy(cs->uibid);
    4106        2828 :                 temp_destroy(cs->uvbid);
    4107        2828 :                 cs->uibid = e_bat(TYPE_oid);
    4108        2828 :                 cs->uvbid = e_bat(cur->ttype);
    4109        2828 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4110        2828 :                 cs->ucnt = 0;
    4111        2828 :                 bat_destroy(ui);
    4112        2828 :                 bat_destroy(uv);
    4113        2828 :                 bat_destroy(cur);
    4114             :         }
    4115      422625 :         cs->cleared = false;
    4116      422625 :         cs->merged = true;
    4117      422625 :         return;
    4118             : }
    4119             : 
    4120             : static lng
    4121      379936 : merge_delta( sql_delta *obat)
    4122             : {
    4123      379936 :         lng res = 0;
    4124      379936 :         if (obat && obat->next && !obat->cs.merged)
    4125      132832 :                 res += merge_delta(obat->next);
    4126      379936 :         res += obat->cs.ucnt;
    4127      379936 :         merge_cs(&obat->cs, __func__);
    4128      379936 :         return res;
    4129             : }
    4130             : 
    4131             : static void
    4132       42689 : merge_storage(storage *tdb)
    4133             : {
    4134       42689 :         merge_cs(&tdb->cs, __func__);
    4135             : 
    4136       42689 :         if (tdb->next) {
    4137         452 :                 destroy_storage(tdb->next);
    4138         452 :                 tdb->next = NULL;
    4139             :         }
    4140       42689 : }
    4141             : 
    4142             : static sql_delta *
    4143           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4144             : {
    4145             :         /* commit ie copy back to the parent transaction */
    4146           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4147           1 :                 sql_delta *od = delta->next;
    4148           1 :                 if (od->cs.ts == commit_ts) {
    4149           0 :                         sql_delta t = *od, *n = od->next;
    4150           0 :                         *od = *delta;
    4151           0 :                         od->next = n;
    4152           0 :                         *delta = t;
    4153           0 :                         delta->next = NULL;
    4154           0 :                         destroy_delta(delta, true);
    4155           0 :                         return od;
    4156             :                 }
    4157             :         }
    4158             :         return delta;
    4159             : }
    4160             : 
    4161             : static int
    4162      132796 : log_update_col( sql_trans *tr, sql_change *change)
    4163             : {
    4164      132796 :         sql_column *c = (sql_column*)change->obj;
    4165      132796 :         assert(!isTempTable(c->t));
    4166             : 
    4167      132796 :         if (isDeleted(c->t)) {
    4168           0 :                 change->handled = true;
    4169           0 :                 return LOG_OK;
    4170             :         }
    4171             : 
    4172      132796 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4173      132796 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4174      132796 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4175      132796 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4176             :         }
    4177             :         return LOG_OK;
    4178             : }
    4179             : 
    4180             : static int
    4181         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4182             : {
    4183         217 :         sqlstore *store = Store;
    4184             : 
    4185         217 :         sql_delta *d = (sql_delta*)change->data;
    4186         217 :         if (d->cs.ts < oldest) {
    4187          82 :                 destroy_delta(d, false);
    4188          82 :                 if (change->commit == &commit_update_idx)
    4189           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4190             :                 else
    4191          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4192          82 :                 return 1;
    4193             :         }
    4194         135 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4195          82 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4196             :         return 0;
    4197             : }
    4198             : 
    4199             : static int
    4200           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4201             : {
    4202           9 :         sqlstore *store = Store;
    4203             : 
    4204           9 :         storage *d = (storage*)change->data;
    4205           9 :         if (d->cs.ts < oldest) {
    4206           3 :                 destroy_storage(d);
    4207           3 :                 table_destroy(store, (sql_table*)change->obj);
    4208           3 :                 return 1;
    4209             :         }
    4210           6 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4211           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4212             :         return 0;
    4213             : }
    4214             : 
    4215             : static int
    4216      132914 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4217             : {
    4218      132914 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4219             : 
    4220      132914 :         sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
    4221             : 
    4222      132914 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4223           3 :                 int ok = LOG_OK;
    4224           3 :                 assert(isTempTable(t));
    4225           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4226           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4227           3 :                 if (!tr->parent)
    4228           0 :                         t->base.new = base->new = 0;
    4229           3 :                 change->handled = true;
    4230           3 :                 return ok;
    4231             :         }
    4232             : 
    4233      132911 :         if (commit_ts)
    4234      132829 :                 delta->cs.ts = commit_ts;
    4235      132911 :         if (!commit_ts) { /* rollback */
    4236          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4237             : 
    4238          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4239             :                         return LOG_OK;
    4240          82 :                 if (o != d) {
    4241           0 :                         while(o && o->next != d)
    4242             :                                 o = o->next;
    4243             :                 }
    4244          82 :                 if (o == ATOMIC_PTR_GET(data))
    4245          82 :                         ATOMIC_PTR_SET(data, d->next);
    4246             :                 else
    4247           0 :                         o->next = d->next;
    4248          82 :                 d->next = NULL;
    4249          82 :                 change->cleanup = &tc_gc_rollbacked;
    4250      132829 :         } else if (!tr->parent) {
    4251             :                 /* merge deltas */
    4252      276067 :                 while (delta && delta->cs.ts > oldest)
    4253      143239 :                         delta = delta->next;
    4254      132828 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4255       24738 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4256       24738 :                         idelta->nr_updates += merge_delta(delta);
    4257       24738 :                         unlock_column(tr->store, base->id);
    4258             :                 }
    4259           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4260           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4261             :         return LOG_OK;
    4262             : }
    4263             : 
    4264             : static int
    4265      132886 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4266             : {
    4267             : 
    4268      132886 :         sql_column *c = (sql_column*)change->obj;
    4269      132886 :         sql_base* base = &c->base;
    4270      132886 :         sql_table* t = c->t;
    4271      132886 :         ATOMIC_PTR_TYPE* data = &c->data;
    4272      132886 :         int type = c->type.type->localtype;
    4273             : 
    4274      132886 :         if (change->handled || isDeleted(c->t))
    4275             :                 return LOG_OK;
    4276             : 
    4277      132886 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4278             : }
    4279             : 
    4280             : static int
    4281          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4282             : {
    4283          26 :         sql_idx *i = (sql_idx*)change->obj;
    4284          26 :         assert(!isTempTable(i->t));
    4285             : 
    4286          26 :         if (isDeleted(i->t)) {
    4287           0 :                 change->handled = true;
    4288           0 :                 return LOG_OK;
    4289             :         }
    4290             : 
    4291          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4292          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4293          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4294          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4295             :         }
    4296             :         return LOG_OK;
    4297             : }
    4298             : 
    4299             : static int
    4300          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4301             : {
    4302          28 :         sql_idx *i = (sql_idx*)change->obj;
    4303          28 :         sql_base* base = &i->base;
    4304          28 :         sql_table* t = i->t;
    4305          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4306          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    4307             : 
    4308          28 :         if (change->handled || isDeleted(i->t))
    4309             :                 return LOG_OK;
    4310             : 
    4311          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4312             : }
    4313             : 
    4314             : static storage *
    4315          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4316             : {
    4317          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4318           0 :                 storage *od = dbat->next;
    4319           0 :                 if (od->cs.ts == commit_ts) {
    4320           0 :                         storage t = *od, *n = od->next;
    4321           0 :                         *od = *dbat;
    4322           0 :                         od->next = n;
    4323           0 :                         *dbat = t;
    4324           0 :                         dbat->next = NULL;
    4325           0 :                         destroy_storage(dbat);
    4326           0 :                         return od;
    4327             :                 }
    4328             :         }
    4329             :         return dbat;
    4330             : }
    4331             : 
    4332             : static int
    4333       85083 : log_update_del( sql_trans *tr, sql_change *change)
    4334             : {
    4335       85083 :         sql_table *t = (sql_table*)change->obj;
    4336       85083 :         assert(!isTempTable(t));
    4337             : 
    4338       85083 :         if (isDeleted(t)) {
    4339           0 :                 change->handled = true;
    4340           0 :                 return LOG_OK;
    4341             :         }
    4342             : 
    4343       85083 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4344       85083 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4345             :         return LOG_OK;
    4346             : }
    4347             : 
    4348             : static int
    4349       89714 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4350             : {
    4351       89714 :         int ok = LOG_OK;
    4352       89714 :         sql_table *t = (sql_table*)change->obj;
    4353       89714 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4354             : 
    4355       89714 :         if (change->handled || isDeleted(t))
    4356             :                 return ok;
    4357             : 
    4358       89714 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4359          22 :                 assert(isTempTable(t));
    4360          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4361          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4362          22 :                 change->handled = true;
    4363          22 :                 return ok;
    4364             :         }
    4365             : 
    4366       89692 :         lock_table(tr->store, t->base.id);
    4367       89692 :         if (!commit_ts) { /* rollback */
    4368        4308 :                 if (dbat->cs.ts == tr->tid) {
    4369           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4370           3 :                                 unlock_table(tr->store, t->base.id);
    4371           3 :                                 return ok;
    4372             :                         }
    4373           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4374             : 
    4375           3 :                         if (o != d) {
    4376           0 :                                 while(o && o->next != d)
    4377             :                                         o = o->next;
    4378             :                         }
    4379           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    4380           3 :                                 assert(d->next);
    4381           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4382             :                         } else
    4383           0 :                                 o->next = d->next;
    4384           3 :                         d->next = NULL;
    4385           3 :                         change->cleanup = &tc_gc_rollbacked_storage;
    4386             :                 } else
    4387        4302 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4388       85384 :         } else if (ok == LOG_OK && !tr->parent) {
    4389       85358 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4390       25952 :                         dbat->cs.ts = commit_ts;
    4391             : 
    4392       85358 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4393       85358 :                 if (ok == LOG_OK) {
    4394       85358 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4395       85358 :                         if (oldest == commit_ts)
    4396       42689 :                                 merge_storage(dbat);
    4397             :                 }
    4398       85358 :                 if (dbat)
    4399       85358 :                         dbat->cs.cleared = false;
    4400          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4401          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4402          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4403          26 :                 storage *s = change->data;
    4404          26 :                 if (s->cs.ts == tr->tid)
    4405           0 :                         s->cs.ts = commit_ts;
    4406             :         }
    4407       89689 :         unlock_table(tr->store, t->base.id);
    4408       89689 :         return ok;
    4409             : }
    4410             : 
    4411             : /* only rollback (content version) case for now */
    4412             : static int
    4413       19626 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4414             : {
    4415       19626 :         sqlstore *store = Store;
    4416       19626 :         sql_column *c = (sql_column*)change->obj;
    4417             : 
    4418       19626 :         if (!c) /* cleaned earlier */
    4419             :                 return 1;
    4420             : 
    4421         175 :         if (change->handled || isDeleted(c->t)) {
    4422           0 :                 column_destroy(store, c);
    4423           0 :                 return 1;
    4424             :         }
    4425             : 
    4426             :         /* savepoint commit (did it merge ?) */
    4427         175 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4428           0 :                 column_destroy(store, c);
    4429           0 :                 return 1;
    4430             :         }
    4431         175 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4432             :                 return 0;
    4433         174 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4434         174 :         if (d && d->next) {
    4435             : 
    4436          66 :                 if (d->cs.ts > oldest)
    4437             :                         return LOG_OK; /* cannot cleanup yet */
    4438             : 
    4439             :                 // d is oldest reachable delta
    4440          63 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4441          62 :                         destroy_delta(d->next, true);
    4442          62 :                         d->next = NULL;
    4443             :                 }
    4444          63 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4445          63 :                 id->nr_updates += merge_delta(d);
    4446          63 :                 unlock_column(store, c->base.id);
    4447             :         }
    4448         171 :         column_destroy(store, c);
    4449         171 :         return 1;
    4450             : }
    4451             : 
    4452             : static int
    4453      743856 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4454             : {
    4455      743856 :         sqlstore *store = Store;
    4456      743856 :         sql_column *c = (sql_column*)change->obj;
    4457             : 
    4458      743856 :         if (!c) /* cleaned earlier */
    4459             :                 return 1;
    4460             : 
    4461      743856 :         if (change->handled || isDeleted(c->t)) {
    4462           3 :                 table_destroy(store, c->t);
    4463           3 :                 return 1;
    4464             :         }
    4465             : 
    4466             :         /* savepoint commit (did it merge ?) */
    4467      743853 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4468       31334 :                 table_destroy(store, c->t);
    4469       31334 :                 return 1;
    4470             :         }
    4471      712519 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4472             :                 return 0;
    4473      712518 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4474      712518 :         if (d && d->next) {
    4475             : 
    4476      712518 :                 if (d->cs.ts > oldest)
    4477             :                         return LOG_OK; /* cannot cleanup yet */
    4478             : 
    4479             :                 // d is oldest reachable delta
    4480      101405 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4481        5232 :                         destroy_delta(d->next, true);
    4482        5232 :                         d->next = NULL;
    4483             :                 }
    4484      101405 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4485      101405 :                 id->nr_updates += merge_delta(d);
    4486      101405 :                 unlock_column(store, c->base.id);
    4487             :         }
    4488      101405 :         table_destroy(store, c->t);
    4489      101405 :         return 1;
    4490             : }
    4491             : 
    4492             : static int
    4493        2609 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4494             : {
    4495        2609 :         sqlstore *store = Store;
    4496        2609 :         sql_idx *i = (sql_idx*)change->obj;
    4497             : 
    4498        2609 :         if (!i) /* cleaned earlier */
    4499             :                 return 1;
    4500             : 
    4501         634 :         if (change->handled || isDeleted(i->t)) {
    4502           0 :                 idx_destroy(store, i);
    4503           0 :                 return 1;
    4504             :         }
    4505             : 
    4506             :         /* savepoint commit (did it merge ?) */
    4507         634 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4508           0 :                 idx_destroy(store, i);
    4509           0 :                 return 1;
    4510             :         }
    4511         634 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4512             :                 return 0;
    4513         634 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4514         634 :         if (d && d->next) {
    4515           0 :                 if (d->cs.ts > oldest)
    4516             :                         return LOG_OK; /* cannot cleanup yet */
    4517             : 
    4518             :                 // d is oldest reachable delta
    4519           0 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4520           0 :                         destroy_delta(d->next, true);
    4521           0 :                         d->next = NULL;
    4522             :                 }
    4523           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4524           0 :                 id->nr_updates += merge_delta(d);
    4525           0 :                 unlock_column(store, i->base.id);
    4526             :         }
    4527         634 :         idx_destroy(store, i);
    4528         634 :         return 1;
    4529             : }
    4530             : 
    4531             : static int
    4532          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4533             : {
    4534          26 :         sqlstore *store = Store;
    4535          26 :         sql_idx *i = (sql_idx*)change->obj;
    4536             : 
    4537          26 :         if (!i) /* cleaned earlier */
    4538             :                 return 1;
    4539             : 
    4540          26 :         if (change->handled || isDeleted(i->t)) {
    4541           0 :                 table_destroy(store, i->t);
    4542           0 :                 return 1;
    4543             :         }
    4544             : 
    4545             :         /* savepoint commit (did it merge ?) */
    4546          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4547           0 :                 table_destroy(store, i->t);
    4548           0 :                 return 1;
    4549             :         }
    4550          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4551             :                 return 0;
    4552          26 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4553          26 :         if (d && d->next) {
    4554          26 :                 if (d->cs.ts > oldest)
    4555             :                         return LOG_OK; /* cannot cleanup yet */
    4556             : 
    4557             :                 // d is oldest reachable delta
    4558          26 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4559          26 :                         destroy_delta(d->next, true);
    4560          26 :                         d->next = NULL;
    4561             :                 }
    4562          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4563          26 :                 id->nr_updates += merge_delta(d);
    4564          26 :                 unlock_column(store, i->base.id);
    4565             :         }
    4566          26 :         table_destroy(store, i->t);
    4567          26 :         return 1;
    4568             : }
    4569             : 
    4570             : static int
    4571      222604 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4572             : {
    4573      222604 :         sqlstore *store = Store;
    4574      222604 :         sql_table *t = (sql_table*)change->obj;
    4575             : 
    4576      222604 :         if (change->handled || isDeleted(t)) {
    4577        3556 :                 table_destroy(store, t);
    4578        3556 :                 return 1;
    4579             :         }
    4580             :         /* savepoint commit (did it merge ?) */
    4581      219048 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4582        6325 :                 table_destroy(store, t);
    4583        6325 :                 return 1;
    4584             :         }
    4585      212723 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4586             :                 return 0;
    4587      212695 :         storage *d = (storage*)change->data;
    4588      212695 :         if (d->next) {
    4589      140738 :                 if (d->cs.ts > oldest)
    4590             :                         return LOG_OK; /* cannot cleanup yet */
    4591             : 
    4592       19176 :                 destroy_storage(d->next);
    4593       19176 :                 d->next = NULL;
    4594             :         }
    4595       91133 :         table_destroy(store, t);
    4596       91133 :         return 1;
    4597             : }
    4598             : 
    4599             : static int
    4600       30511 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4601             : {
    4602       30511 :         if (nr == 0)
    4603             :                 return LOG_OK;
    4604       30511 :         assert (nr > 0);
    4605       30511 :         if ((!offsets || !*offsets) && nr == total) {
    4606       30476 :                 *offset = slot;
    4607       30476 :                 return LOG_OK;
    4608             :         }
    4609          35 :         if (!*offsets) {
    4610           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4611           7 :                 if (!*offsets)
    4612             :                         return LOG_ERR;
    4613             :         }
    4614          35 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    4615       16174 :         for(size_t i = 0; i < nr; i++)
    4616       16139 :                 dst[i] = slot + i;
    4617          35 :         (*offsets)->batCount += nr;
    4618          35 :         (*offsets)->theap->dirty = true;
    4619          35 :         return LOG_OK;
    4620             : }
    4621             : 
    4622             : static int
    4623       30340 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4624             : {
    4625       30340 :         assert(s->segs);
    4626       30340 :         ulng oldest = store_oldest(tr->store, NULL);
    4627       30347 :         BUN slot = 0;
    4628       30347 :         size_t total = cnt;
    4629             : 
    4630       30347 :         if (!locked)
    4631       30404 :                 lock_table(tr->store, t->base.id);
    4632       30554 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4633             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4634       61428 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4635       30798 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
    4636          35 :                         if ((seg->end - seg->start) >= cnt) {
    4637             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4638          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4639           2 :                                         slot = p->end;
    4640           2 :                                         p->end += cnt;
    4641           2 :                                         seg->start += cnt;
    4642           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4643             :                                                 ok = LOG_ERR;
    4644             :                                                 break;
    4645             :                                         }
    4646           2 :                                         s->segs->nr_reused += cnt;
    4647           2 :                                         cnt = 0;
    4648           2 :                                         break;
    4649             :                                 }
    4650             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4651          11 :                                 size_t rcnt = seg->end - seg->start;
    4652          11 :                                 if (rcnt > cnt)
    4653             :                                         rcnt = cnt;
    4654          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4655             :                                         ok = LOG_ERR;
    4656             :                                         break;
    4657             :                                 }
    4658             :                         }
    4659          33 :                         seg->ts = tr->tid;
    4660          33 :                         seg->deleted = false;
    4661          33 :                         slot = seg->start;
    4662          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4663             :                                 ok = LOG_ERR;
    4664             :                                 break;
    4665             :                         }
    4666           1 :                         s->segs->nr_reused += (seg->end - seg->start);
    4667           1 :                         cnt -= (seg->end - seg->start);
    4668             :                 }
    4669             :         }
    4670       30632 :         if (ok == LOG_OK && cnt) {
    4671       30619 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4672       29477 :                         slot = s->segs->t->end;
    4673       29477 :                         s->segs->t->end += cnt;
    4674             :                 } else {
    4675        1142 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4676             :                                 ok = LOG_ERR;
    4677             :                         } else {
    4678        1159 :                                 if (!s->segs->h)
    4679           0 :                                         s->segs->h = s->segs->t;
    4680        1159 :                                 slot = s->segs->t->start;
    4681             :                         }
    4682             :                 }
    4683       30636 :                 if (ok == LOG_OK)
    4684       30636 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4685             :         }
    4686       30479 :         if (!locked)
    4687       30519 :                 unlock_table(tr->store, t->base.id);
    4688             : 
    4689       30622 :         if (ok == LOG_OK) {
    4690             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4691       30667 :                 if (!in_transaction) {
    4692        1144 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4693        1144 :                         in_transaction = true;
    4694             :                 }
    4695       30667 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4696       30612 :                         tr->logchanges += (lng) total;
    4697       30648 :                 if (*offsets) {
    4698          28 :                         BAT *pos = *offsets;
    4699          28 :                         assert(BATcount(pos) == total);
    4700          28 :                         BATsetcount(pos, total); /* set other properties */
    4701           7 :                         pos->tnil = false;
    4702           7 :                         pos->tnonil = true;
    4703           7 :                         pos->tkey = true;
    4704           7 :                         pos->tsorted = true;
    4705           7 :                         pos->trevsorted = false;
    4706             :                 }
    4707             :         }
    4708       30582 :         return ok;
    4709             : }
    4710             : 
    4711             : static int
    4712     2095269 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4713             : {
    4714     2095269 :         if (cnt > 1 && offsets)
    4715       30341 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4716     2064928 :         assert(s->segs);
    4717     2064928 :         ulng oldest = store_oldest(tr->store, NULL);
    4718     2064941 :         BUN slot = 0;
    4719     2064941 :         int reused = 0;
    4720             : 
    4721     2064941 :         if (!locked)
    4722     1936839 :                 lock_table(tr->store, t->base.id);
    4723     2064983 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4724             :         /* naive vacuum approach, iterator through segments, check for large enough deleted segments
    4725             :          * or create new segment at the end */
    4726     9952547 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4727     7960574 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
    4728             : 
    4729       73022 :                         if ((seg->end - seg->start) >= cnt) {
    4730             : 
    4731             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4732       73022 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4733       56537 :                                         slot = p->end;
    4734       56537 :                                         p->end += cnt;
    4735       56537 :                                         seg->start += cnt;
    4736       56537 :                                         s->segs->nr_reused += cnt;
    4737       56537 :                                         reused = 1;
    4738       56537 :                                         break;
    4739             :                                 }
    4740             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4741       16485 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4742             :                                         ok = LOG_ERR;
    4743             :                                         break;
    4744             :                                 }
    4745             :                         }
    4746       16485 :                         seg->ts = tr->tid;
    4747       16485 :                         seg->deleted = false;
    4748       16485 :                         slot = seg->start;
    4749       16485 :                         s->segs->nr_reused += cnt;
    4750       16485 :                         reused = 1;
    4751       16485 :                         break;
    4752             :                 }
    4753             :         }
    4754     2064995 :         if (ok == LOG_OK && !reused) {
    4755     1991973 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4756     1956764 :                         slot = s->segs->t->end;
    4757     1956764 :                         s->segs->t->end += cnt;
    4758             :                 } else {
    4759       35209 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4760             :                                 ok = LOG_ERR;
    4761             :                         } else {
    4762       35209 :                                 if (!s->segs->h)
    4763           0 :                                         s->segs->h = s->segs->t;
    4764       35209 :                                 slot = s->segs->t->start;
    4765             :                         }
    4766             :                 }
    4767             :         }
    4768     2064995 :         if (!locked)
    4769     1936881 :                 unlock_table(tr->store, t->base.id);
    4770             : 
    4771     2064995 :         if (ok == LOG_OK) {
    4772             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4773     2064995 :                 if (!in_transaction) {
    4774       49293 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4775       49293 :                         in_transaction = true;
    4776             :                 }
    4777     2064995 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4778     2064564 :                         tr->logchanges += (lng) cnt;
    4779     2064995 :                 *offset = slot;
    4780             :         }
    4781             :         return ok;
    4782             : }
    4783             : 
    4784             : /*
    4785             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4786             :  * of the global transaction and mark the newly added storage slots unused on the global
    4787             :  * level but used on the local transaction level. Besides this the local transaction needs
    4788             :  * to update (and mark unused) any slot in between the old end and new slots.
    4789             :  * */
    4790             : static int
    4791     1967182 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4792             : {
    4793     1967182 :         storage *s;
    4794             : 
    4795             :         /* we have a single segment structure for each persistent table
    4796             :          * for temporary tables each has its own */
    4797     1967182 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4798             :                 return LOG_ERR;
    4799             : 
    4800     1967275 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4801             : }
    4802             : 
    4803             : /* some tables cannot be updated concurrently (user/roles etc) */
    4804             : static int
    4805      128118 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4806             : {
    4807      128118 :         storage *s;
    4808      128118 :         int res = 0;
    4809             : 
    4810             :         /* we have a single segment structure for each persistent table
    4811             :          * for temporary tables each has its own */
    4812      128118 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4813             :                 /* TODO check for other inserts ! */
    4814             :                 return LOG_ERR;
    4815             : 
    4816      128118 :         lock_table(tr->store, t->base.id);
    4817      128118 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4818           4 :                 unlock_table(tr->store, t->base.id);
    4819           4 :                 return LOG_CONFLICT;
    4820             :         }
    4821      128114 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4822      128114 :         unlock_table(tr->store, t->base.id);
    4823      128114 :         return res;
    4824             : }
    4825             : 
    4826             : static int
    4827       20430 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4828             : {
    4829       20430 :         storage *s;
    4830       20430 :         int res = 0;
    4831             : 
    4832       20430 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4833             :                 return LOG_ERR;
    4834             : 
    4835       20430 :         lock_table(tr->store, t->base.id);
    4836       20430 :         res = segments_conflict(tr, s->segs, uncommitted);
    4837       20430 :         unlock_table(tr->store, t->base.id);
    4838       20430 :         return res ? LOG_CONFLICT : LOG_OK;
    4839             : }
    4840             : 
    4841             : static size_t
    4842     1612685 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4843             : {
    4844     1612685 :         size_t cnt = 0;
    4845             : 
    4846     1959479 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4847             :                 ;
    4848             : 
    4849     3989476 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4850     2376780 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4851      156019 :                         cnt += s->end - s->start;
    4852             :         }
    4853     1612696 :         return cnt;
    4854             : }
    4855             : 
    4856             : static BAT *
    4857     1612654 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4858             : {
    4859     1612654 :         lock_table(tr->store, t->base.id);
    4860     1612697 :         segment *s = S->segs->h;
    4861             :         /* step one no deletes -> dense range */
    4862     1612697 :         uint32_t cur = 0;
    4863     1612697 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4864     1612698 :         if (!dnr) {
    4865     1483466 :                 unlock_table(tr->store, t->base.id);
    4866     1483434 :                 return BATdense(start, start, end-start);
    4867             :         }
    4868             : 
    4869      129232 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4870      129232 :         if (!b) {
    4871           0 :                 unlock_table(tr->store, t->base.id);
    4872           0 :                 return NULL;
    4873             :         }
    4874             : 
    4875      129232 :         uint32_t *restrict dst = Tloc(b, 0);
    4876    62275287 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4877    62211495 :                 if (s->end < start)
    4878      260705 :                         continue;
    4879    61950790 :                 if (s->start >= end)
    4880             :                         break;
    4881    61885350 :                 msk m = (SEG_IS_VALID(s, tr));
    4882    61885350 :                 size_t lnr = s->end-s->start;
    4883    61885350 :                 if (s->start < start)
    4884       20047 :                         lnr -= (start - s->start);
    4885    61885350 :                 if (s->end > end)
    4886       15145 :                         lnr -= s->end - end;
    4887             : 
    4888    61885350 :                 if (m) {
    4889     1386613 :                         size_t used = pos&31, end = 32;
    4890     1386613 :                         if (used) {
    4891     1221350 :                                 if (lnr < (32-used))
    4892      746049 :                                         end = used + lnr;
    4893     1221350 :                                 assert(end > used);
    4894     1221350 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4895     1221350 :                                 lnr -= end - used;
    4896     1221350 :                                 pos += end - used;
    4897     1221350 :                                 if (end == 32) {
    4898      475326 :                                         *dst++ = cur;
    4899      475326 :                                         cur = 0;
    4900             :                                 }
    4901             :                         }
    4902     1386613 :                         size_t full = lnr/32;
    4903     1386613 :                         size_t rest = lnr%32;
    4904     1386613 :                         if (full > 0) {
    4905      352417 :                                 memset(dst, ~0, full * sizeof(*dst));
    4906      352417 :                                 dst += full;
    4907      352417 :                                 lnr -= full * 32;
    4908      352417 :                                 pos += full * 32;
    4909             :                         }
    4910     1386613 :                         if (rest > 0) {
    4911      607229 :                                 cur |= (1U << rest) - 1;
    4912      607229 :                                 lnr -= rest;
    4913      607229 :                                 pos += rest;
    4914             :                         }
    4915     1386613 :                         assert(lnr==0);
    4916             :                 } else {
    4917    60498737 :                         size_t used = pos&31, end = 32;
    4918    60498737 :                         if (used) {
    4919    58626116 :                                 if (lnr < (32-used))
    4920    56651560 :                                         end = used + lnr;
    4921             : 
    4922    58626116 :                                 pos+= (end-used);
    4923    58626116 :                                 lnr-= (end-used);
    4924    58626116 :                                 if (end == 32) {
    4925     1974578 :                                         *dst++ = cur;
    4926     1974578 :                                         cur = 0;
    4927             :                                 }
    4928             :                         }
    4929    60498737 :                         size_t full = lnr/32;
    4930    60498737 :                         size_t rest = lnr%32;
    4931    60498737 :                         memset(dst, 0, full * sizeof(*dst));
    4932    60498737 :                         dst += full;
    4933    60498737 :                         lnr -= full * 32;
    4934    60498737 :                         pos += full * 32;
    4935    60498737 :                         pos+= rest;
    4936    60498737 :                         lnr-= rest;
    4937    60498737 :                         assert(lnr==0);
    4938             :                 }
    4939             :         }
    4940             : 
    4941      129232 :         unlock_table(tr->store, t->base.id);
    4942      129232 :         if (pos%32)
    4943      127246 :                 *dst=cur;
    4944      129232 :         BATsetcount(b, nr);
    4945      129232 :         bn = BATmaskedcands(start, nr, b, true);
    4946      129223 :         BBPreclaim(b);
    4947      129225 :         (void)pos;
    4948      129225 :         assert (pos == nr);
    4949             :         return bn;
    4950             : }
    4951             : 
    4952             : static void *                                   /* BAT * */
    4953     1618375 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4954             : {
    4955             :         /* with nr_of_parts - part_nr we can adjust parts */
    4956     1618375 :         storage *s = tab_timestamp_storage(tr, t);
    4957             : 
    4958     1618643 :         if (!s)
    4959             :                 return NULL;
    4960     1618643 :         size_t nr = segs_end(s->segs, tr, t);
    4961             : 
    4962     1619682 :         if (!nr)
    4963        7002 :                 return BATdense(0, 0, 0);
    4964             : 
    4965             :         /* compute proper part */
    4966     1612680 :         size_t part_size = nr/nr_of_parts;
    4967     1612680 :         size_t start = part_size * part_nr;
    4968     1612680 :         size_t end = start + part_size;
    4969     1612680 :         if (part_nr == (nr_of_parts-1))
    4970     1321646 :                 end = nr;
    4971     1612680 :         assert(end <= nr);
    4972     1612680 :         return segments2cands(s, tr, t, start, end);
    4973             : }
    4974             : 
    4975             : static int
    4976           5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    4977             : {
    4978           5 :         bool update_conflict = false;
    4979             : 
    4980           5 :         if (segments_in_transaction(tr, col->t))
    4981             :                 return LOG_CONFLICT;
    4982             : 
    4983           5 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    4984             : 
    4985           5 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    4986           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    4987           5 :         assert(d && d->cs.ts == tr->tid);
    4988           5 :         if (odelta != d)
    4989           5 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    4990           5 :         if (d->cs.bid)
    4991           5 :                 temp_destroy(d->cs.bid);
    4992           5 :         if (d->cs.uibid)
    4993           5 :                 temp_destroy(d->cs.uibid);
    4994           5 :         if (d->cs.uvbid)
    4995           5 :                 temp_destroy(d->cs.uvbid);
    4996           5 :         bat_set_access(bn, BAT_READ);
    4997           5 :         d->cs.bid = temp_create(bn);
    4998           5 :         d->cs.uibid = 0;
    4999           5 :         d->cs.uvbid = 0;
    5000           5 :         d->cs.ucnt = 0;
    5001           5 :         d->cs.cleared = true;
    5002           5 :         d->cs.ts = tr->tid;
    5003           5 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5004           5 :         return LOG_OK;
    5005             : }
    5006             : 
    5007             : static int
    5008           5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
    5009             : {
    5010           5 :         if (segments_in_transaction(tr, c->t))
    5011             :                 return LOG_CONFLICT;
    5012             : 
    5013           5 :         sql_delta *d = NULL;
    5014             : 
    5015             :         /* do we have enough to clean */
    5016           5 :         if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5017             :                 return LOG_CONFLICT;
    5018             : 
    5019             :         /* do we have enough to clean */
    5020           5 :         if (!force && (d->nr_updates) < 1024)
    5021             :                 return LOG_OK;
    5022             : 
    5023           5 :         BAT *b = NULL, *bn = NULL;;
    5024           5 :         if ((b = bind_col(tr, c, 0)) == NULL)
    5025             :                 return LOG_ERR;
    5026           5 :         if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5027           0 :                 BBPreclaim(b);
    5028           0 :                 return LOG_ERR;
    5029             :         }
    5030           5 :         int res = swap_bats(tr, c, bn);
    5031           5 :         d->nr_updates = 0;
    5032           5 :         BBPreclaim(b);
    5033           5 :         BBPreclaim(bn);
    5034           5 :         return res;
    5035             : }
    5036             : 
    5037             : static int
    5038           0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
    5039             : {
    5040           0 :         if (segments_in_transaction(tr, t))
    5041             :                 return LOG_CONFLICT;
    5042             : 
    5043           0 :         storage *s;
    5044           0 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    5045             :                 return LOG_ERR;
    5046             : 
    5047           0 :         for( node *n = ol_first_node(t->columns); n; n = n->next) {
    5048           0 :                 sql_column *c = n->data;
    5049             : 
    5050           0 :                 if (!ATOMvarsized(c->type.type->localtype))
    5051           0 :                         continue;
    5052           0 :                 sql_delta *d = NULL;
    5053             : 
    5054             :                 /* do we have enough to clean */
    5055           0 :                 if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5056             :                         return LOG_CONFLICT;
    5057             : 
    5058             :                 /* do we have enough to clean */
    5059           0 :                 if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
    5060           0 :                         continue;
    5061             : 
    5062           0 :                 BAT *b = NULL, *bn = NULL;;
    5063           0 :                 if ((b = bind_col(tr, c, 0)) == NULL)
    5064             :                         return LOG_ERR;
    5065           0 :                 if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5066           0 :                         BBPreclaim(b);
    5067           0 :                         return LOG_ERR;
    5068             :                 }
    5069           0 :                 int res = swap_bats(tr, c, bn);
    5070           0 :                 d->nr_updates = 0;
    5071           0 :                 BBPreclaim(b);
    5072           0 :                 BBPreclaim(bn);
    5073           0 :                 if (res != LOG_OK)
    5074           0 :                         return res;
    5075             :         }
    5076           0 :         s->segs->nr_reused = 0;
    5077           0 :         return LOG_OK;
    5078             : }
    5079             : 
    5080             : 
    5081             : static int
    5082          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5083             : {
    5084          58 :         bool update_conflict = false;
    5085             : 
    5086          58 :         if (segments_in_transaction(tr, col->t))
    5087             :                 return LOG_CONFLICT;
    5088             : 
    5089          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5090             : 
    5091          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5092           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5093          58 :         assert(d && d->cs.ts == tr->tid);
    5094          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5095          58 :         if (odelta != d)
    5096          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5097             : 
    5098          58 :         d->cs.st = st;
    5099          58 :         d->cs.cleared = true;
    5100          58 :         if (d->cs.bid)
    5101          58 :                 temp_destroy(d->cs.bid);
    5102          58 :         o = transfer_to_systrans(o);
    5103          58 :         if (o == NULL)
    5104             :                 return LOG_ERR;
    5105          58 :         bat_set_access(o, BAT_READ);
    5106          58 :         d->cs.bid = temp_create(o);
    5107          58 :         if (u) {
    5108          53 :                 if (d->cs.ebid)
    5109           0 :                         temp_destroy(d->cs.ebid);
    5110          53 :                 u = transfer_to_systrans(u);
    5111          53 :                 if (u == NULL)
    5112             :                         return LOG_ERR;
    5113          53 :                 d->cs.ebid = temp_create(u);
    5114             :         }
    5115             :         return LOG_OK;
    5116             : }
    5117             : 
    5118             : void
    5119         357 : bat_storage_init( store_functions *sf)
    5120             : {
    5121         357 :         sf->bind_col = &bind_col;
    5122         357 :         sf->bind_updates = &bind_updates;
    5123         357 :         sf->bind_updates_idx = &bind_updates_idx;
    5124         357 :         sf->bind_idx = &bind_idx;
    5125         357 :         sf->bind_cands = &bind_cands;
    5126             : 
    5127         357 :         sf->claim_tab = &claim_tab;
    5128         357 :         sf->key_claim_tab = &key_claim_tab;
    5129         357 :         sf->tab_validate = &tab_validate;
    5130             : 
    5131         357 :         sf->append_col = &append_col;
    5132         357 :         sf->append_idx = &append_idx;
    5133             : 
    5134         357 :         sf->update_col = &update_col;
    5135         357 :         sf->update_idx = &update_idx;
    5136             : 
    5137         357 :         sf->delete_tab = &delete_tab;
    5138             : 
    5139         357 :         sf->count_del = &count_del;
    5140         357 :         sf->count_col = &count_col;
    5141         357 :         sf->count_idx = &count_idx;
    5142         357 :         sf->dcount_col = &dcount_col;
    5143         357 :         sf->min_max_col = &min_max_col;
    5144         357 :         sf->set_stats_col = &set_stats_col;
    5145         357 :         sf->sorted_col = &sorted_col;
    5146         357 :         sf->unique_col = &unique_col;
    5147         357 :         sf->double_elim_col = &double_elim_col;
    5148         357 :         sf->col_stats = &col_stats;
    5149         357 :         sf->col_set_range = &col_set_range;
    5150         357 :         sf->col_not_null = &col_not_null;
    5151             : 
    5152         357 :         sf->col_dup = &col_dup;
    5153         357 :         sf->idx_dup = &idx_dup;
    5154         357 :         sf->del_dup = &del_dup;
    5155             : 
    5156         357 :         sf->create_col = &create_col;    /* create and add to change list */
    5157         357 :         sf->create_idx = &create_idx;
    5158         357 :         sf->create_del = &create_del;
    5159             : 
    5160         357 :         sf->destroy_col = &destroy_col;  /* free resources */
    5161         357 :         sf->destroy_idx = &destroy_idx;
    5162         357 :         sf->destroy_del = &destroy_del;
    5163             : 
    5164         357 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5165         357 :         sf->drop_idx = &drop_idx;
    5166         357 :         sf->drop_del = &drop_del;
    5167             : 
    5168         357 :         sf->clear_table = &clear_table;
    5169             : 
    5170         357 :         sf->vacuum_col = &vacuum_col;
    5171         357 :         sf->vacuum_tab = &vacuum_tab;
    5172         357 :         sf->col_compress = &col_compress;
    5173         357 : }

Generated by: LCOV version 1.14