LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2456 3150 78.0 %
Date: 2024-11-15 19:37:45 Functions: 157 165 95.2 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "gdk_atoms.h"
      18             : #include "gdk_atoms.h"
      19             : #include "matomic.h"
      20             : 
      21             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      22             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      23             : 
      24             : static int log_update_col( sql_trans *tr, sql_change *c);
      25             : static int log_update_idx( sql_trans *tr, sql_change *c);
      26             : static int log_update_del( sql_trans *tr, sql_change *c);
      27             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      29             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      30             : static int log_create_col(sql_trans *tr, sql_change *change);
      31             : static int log_create_idx(sql_trans *tr, sql_change *change);
      32             : static int log_create_del(sql_trans *tr, sql_change *change);
      33             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      35             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      36             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      39             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      40             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      41             : 
      42             : static lng merge_delta( sql_delta *obat);
      43             : 
      44             : /* valid
      45             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      46             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      47             :  */
      48             : 
      49             : #define VALID_4_READ(TS,tr) \
      50             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      51             : 
      52             : /* when changed, check if the old status is still valid */
      53             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      54             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      55             : 
      56             : #define SEG_VALID_4_DELETE(seg,tr) \
      57             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      58             : 
      59             : /* Delete (in current trans or by some other finished transaction, or re-used segment which used to be deleted */
      60             : #define SEG_IS_DELETED(seg,tr) \
      61             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      62             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      63             : 
      64             : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
      65             : #define SEG_IS_VALID(seg, tr) \
      66             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      67             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      68             : 
      69             : static inline BAT *
      70        4955 : transfer_to_systrans(BAT *b)
      71             : {
      72             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      73        4955 :         MT_lock_set(&b->theaplock);
      74        4955 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      75          17 :                 MT_lock_unset(&b->theaplock);
      76          17 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      77          17 :                 BBPreclaim(b);
      78          17 :                 return bn;
      79             :         }
      80        4938 :         if (b->theap->farmid == TRANSIENT ||
      81          14 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      82        4753 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      83        4753 :                 if (qc) {
      84        2523 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      85        2523 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      86        2523 :                                 b->theap->farmid = SYSTRANS;
      87        2523 :                                 b->batRole = SYSTRANS;
      88             :                         }
      89        2523 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      90        1092 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      91        1092 :                                 b->tvheap->farmid = SYSTRANS;
      92             :                         }
      93             :                 }
      94             :         }
      95        4938 :         MT_lock_unset(&b->theaplock);
      96        4938 :         return b;
      97             : }
      98             : 
      99             : static void
     100    25641912 : lock_table(sqlstore *store, sqlid id)
     101             : {
     102    25641912 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     103    25723955 : }
     104             : 
     105             : static void
     106    25724036 : unlock_table(sqlstore *store, sqlid id)
     107             : {
     108    25724036 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     109    25721901 : }
     110             : 
     111             : static void
     112    19817211 : lock_column(sqlstore *store, sqlid id)
     113             : {
     114    19817211 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     115    19822342 : }
     116             : 
     117             : static void
     118    19822787 : unlock_column(sqlstore *store, sqlid id)
     119             : {
     120    19822787 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     121    19825434 : }
     122             : 
     123             : static void
     124      113873 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     125             : {
     126      113873 :         assert(cleanup);
     127      113873 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     128      113868 : }
     129             : 
     130             : static void
     131      132763 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     132             : {
     133      132763 :         assert(cleanup);
     134      132763 :         dup_base(&t->base);
     135      132764 :         trans_add(tr, b, data, cleanup, commit, log);
     136      132745 : }
     137             : 
     138             : static int
     139       92672 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     140             : {
     141       92672 :         segment *s = change->data;
     142             : 
     143       92672 :         if (s->ts <= oldest) {
     144       34090 :                 while(s) {
     145       21266 :                         segment *n = s->prev;
     146       21266 :                         ATOMIC_PTR_DESTROY(&s->next);
     147       21266 :                         _DELETE(s);
     148       21266 :                         s = n;
     149             :                 }
     150       12824 :                 sqlstore *store = Store;
     151       12824 :                 table_destroy(store, (sql_table*)change->obj);
     152       12824 :                 return 1;
     153             :         }
     154             :         return LOG_OK;
     155             : }
     156             : 
     157             : static void
     158       21266 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     159             : {
     160             :         /* we can only be accessed by anything older then commit_ts */
     161       21266 :         if (c->cleanup == &tc_gc_seg)
     162        8442 :                 s->prev = c->data;
     163             :         else
     164       12824 :                 c->cleanup = &tc_gc_seg;
     165       21266 :         c->data = s;
     166       21266 :         s->ts = commit_ts;
     167       16868 : }
     168             : 
     169             : static segment *
     170       86647 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     171             : {
     172       86647 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     173             : 
     174       86645 :         assert(tr);
     175       86645 :         if (n) {
     176       86645 :                 *n = (segment) {
     177       86645 :                         .ts = tr->tid,
     178             :                         .oldts = 0,
     179             :                         .deleted = false,
     180             :                         .start = 0,
     181             :                         .end = cnt,
     182             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     183             :                         .prev = NULL,
     184             :                 };
     185       86645 :                 if (o) {
     186       36210 :                         n->start += o->end;
     187       36210 :                         n->end += o->end;
     188       36210 :                         ATOMIC_PTR_SET(&o->next, n);
     189             :                 }
     190             :         }
     191       86645 :         return n;
     192             : }
     193             : 
     194             : static segment *
     195       81065 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     196             : {
     197       81065 :         assert(tr);
     198       81065 :         if (o->start == start && o->end == start+cnt) {
     199       10001 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     200       10001 :                 o->oldts = o->ts;
     201       10001 :                 o->ts = tr->tid;
     202       10001 :                 o->deleted = deleted;
     203       10001 :                 return o;
     204             :         }
     205       71064 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     206             : 
     207       71064 :         if (!n)
     208             :                 return NULL;
     209       71064 :         n->prev = NULL;
     210             : 
     211       71064 :         if (o->ts == tr->tid) {
     212        2240 :                 n->oldts = 0;
     213        2240 :                 n->ts = 1;
     214        2240 :                 n->deleted = true;
     215             :         } else {
     216       68824 :                 n->oldts = o->ts;
     217       68824 :                 n->ts = tr->tid;
     218       68824 :                 n->deleted = deleted;
     219             :         }
     220       71064 :         if (start == o->start) {
     221             :                 /* 2-way split: o remains latter part of segment, new one is
     222             :                  * inserted before */
     223       57917 :                 n->start = o->start;
     224       57917 :                 n->end = n->start + cnt;
     225       57917 :                 ATOMIC_PTR_INIT(&n->next, o);
     226       57917 :                 if (segs->h == o)
     227         418 :                         segs->h = n;
     228       57917 :                 if (p)
     229       57499 :                         ATOMIC_PTR_SET(&p->next, n);
     230       57917 :                 o->start = n->end;
     231       13147 :         } else if (start+cnt == o->end) {
     232             :                 /* 2-way split: o remains first part of segment, new one is
     233             :                  * added after */
     234        5588 :                 n->start = o->end - cnt;
     235        5588 :                 n->end = o->end;
     236        5588 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     237        5588 :                 ATOMIC_PTR_SET(&o->next, n);
     238        5588 :                 if (segs->t == o)
     239         827 :                         segs->t = n;
     240        5588 :                 o->end = n->start;
     241             :         } else {
     242             :                 /* 3-way split: o remains first part of segment, two new ones
     243             :                  * are added after */
     244        7559 :                 segment *n2 = GDKmalloc(sizeof(segment));
     245        7559 :                 if (n2 == NULL) {
     246           0 :                         GDKfree(n);
     247           0 :                         return NULL;
     248             :                 }
     249        7559 :                 ATOMIC_PTR_INIT(&n->next, n2);
     250        7559 :                 n->start = start;
     251        7559 :                 n->end = start + cnt;
     252        7559 :                 *n2 = *o;
     253        7559 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     254        7559 :                 n2->start = n->end;
     255        7559 :                 n2->prev = NULL;
     256        7559 :                 if (segs->t == o)
     257        2453 :                         segs->t = n2;
     258        7559 :                 ATOMIC_PTR_SET(&o->next, n);
     259        7559 :                 o->end = start;
     260             :         }
     261             :         return n;
     262             : }
     263             : 
     264             : static void
     265        4306 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     266             : {
     267        4306 :         segment *cur = segs->h, *seg = NULL;
     268       18334 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     269       14028 :                 if (cur->ts == tr->tid) { /* revert */
     270        4803 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     271        4803 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     272        4803 :                         cur->oldts = 0;
     273             :                 }
     274       14028 :                 if (cur->ts <= oldest) { /* possibly merge range */
     275       13113 :                         if (!seg) { /* skip first */
     276             :                                 seg = cur;
     277        8807 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     278             :                                 /* merge with previous */
     279        4398 :                                 seg->end = cur->end;
     280        4398 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     281        4398 :                                 if (cur == segs->t)
     282        2861 :                                         segs->t = seg;
     283        4398 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     284        4398 :                                 cur = seg;
     285             :                         } else {
     286             :                                 seg = cur; /* begin of new merge */
     287             :                         }
     288             :                 }
     289             :         }
     290        4306 : }
     291             : 
     292             : static size_t
     293      103438 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     294             : {
     295      103438 :         size_t cnt = 0;
     296      103438 :         segment *s = segs->h, *l = NULL;
     297             : 
     298      491927 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     299      388489 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     300             :                                 l = s;
     301             :         }
     302      103438 :         if (l)
     303      103431 :                 cnt = l->end;
     304      103438 :         return cnt;
     305             : }
     306             : 
     307             : static int
     308      103438 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     309             : {
     310             :         /* set bits correctly */
     311      103438 :         BAT *b = temp_descriptor(cs->bid);
     312             : 
     313      103438 :         if (!b)
     314             :                 return LOG_ERR;
     315      103438 :         segment *s = segs->h;
     316             : 
     317      103438 :         size_t nr = segs_end_include_deleted(segs, tr);
     318      103438 :         size_t rounded_nr = ((nr+31)&~31);
     319      103438 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     320           0 :                 bat_destroy(b);
     321           0 :                 return LOG_ERR;
     322             :         }
     323             : 
     324             :         /* disable all properties here */
     325      103438 :         MT_lock_set(&b->theaplock);
     326      103438 :         b->tsorted = false;
     327      103438 :         b->trevsorted = false;
     328      103438 :         b->tnosorted = 0;
     329      103438 :         b->tnorevsorted = 0;
     330      103438 :         b->tseqbase = oid_nil;
     331      103438 :         b->tkey = false;
     332      103438 :         b->tnokey[0] = 0;
     333      103438 :         b->tnokey[1] = 0;
     334      103438 :         b->theap->dirty = true;
     335      103438 :         BUN cnt = BATcount(b);
     336             : 
     337      103438 :         uint32_t *restrict dst;
     338      428207 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     339      368906 :                 if (s->start >= nr)
     340             :                         break;
     341      324769 :                 if (s->ts == tr->tid && s->end != s->start) {
     342      139970 :                         if (cnt < s->start) { /* first mark as deleted ! */
     343        3282 :                                 size_t lnr = s->start-cnt;
     344        3282 :                                 size_t pos = cnt;
     345        3282 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     346        3282 :                                 uint32_t cur = 0;
     347             : 
     348        3282 :                                 size_t used = pos&31, end = 32;
     349        3282 :                                 if (used) {
     350        3180 :                                         if (lnr < (32-used))
     351        3006 :                                                 end = used + lnr;
     352        3180 :                                         assert(end > used);
     353        3180 :                                         cur |= ((1U << (end - used)) - 1) << used;
     354        3180 :                                         lnr -= end - used;
     355        3180 :                                         *dst++ |= cur;
     356        3180 :                                         cur = 0;
     357             :                                 }
     358        3282 :                                 size_t full = lnr/32;
     359        3282 :                                 size_t rest = lnr%32;
     360        3282 :                                 if (full > 0) {
     361           7 :                                         memset(dst, ~0, full * sizeof(*dst));
     362           7 :                                         dst += full;
     363           7 :                                         lnr -= full * 32;
     364             :                                 }
     365        3282 :                                 if (rest > 0) {
     366         163 :                                         cur |= (1U << rest) - 1;
     367         163 :                                         lnr -= rest;
     368         163 :                                         *dst |= cur;
     369             :                                 }
     370        3282 :                                 assert(lnr==0);
     371             :                         }
     372      139970 :                         size_t lnr = s->end-s->start;
     373      139970 :                         size_t pos = s->start;
     374      139970 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     375      139970 :                         uint32_t cur = 0;
     376      139970 :                         size_t used = pos&31, end = 32;
     377      139970 :                         if (used) {
     378      102486 :                                 if (lnr < (32-used))
     379       97798 :                                         end = used + lnr;
     380      102486 :                                 assert(end > used);
     381      102486 :                                 cur |= ((1U << (end - used)) - 1) << used;
     382      102486 :                                 lnr -= end - used;
     383      102486 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     384      102486 :                                 dst++;
     385      102486 :                                 cur = 0;
     386             :                         }
     387      139970 :                         size_t full = lnr/32;
     388      139970 :                         size_t rest = lnr%32;
     389      139970 :                         if (full > 0) {
     390        3413 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     391        3413 :                                 dst += full;
     392        3413 :                                 lnr -= full * 32;
     393             :                         }
     394      139970 :                         if (rest > 0) {
     395       39862 :                                 cur |= (1U << rest) - 1;
     396       39862 :                                 lnr -= rest;
     397       39862 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     398             :                         }
     399      139970 :                         assert(lnr==0);
     400      139970 :                         if (cnt < s->end)
     401      324769 :                                 cnt = s->end;
     402             :                 }
     403             :         }
     404      103438 :         if (nr > BATcount(b)) {
     405       62254 :                 BATsetcount(b, nr);
     406             :         }
     407      103438 :         MT_lock_unset(&b->theaplock);
     408             : 
     409      103438 :         bat_destroy(b);
     410      103438 :         return LOG_OK;
     411             : }
     412             : 
     413             : /* TODO return LOG_OK/ERR */
     414             : static void
     415      103464 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     416             : {
     417      103464 :         sqlstore* store = tr->store;
     418      103464 :         segment *cur = s->segs->h, *seg = NULL;
     419      492025 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     420      388561 :                 if (cur->ts == tr->tid) {
     421      154018 :                         if (!cur->deleted)
     422       90974 :                                 cur->oldts = 0;
     423      154018 :                         cur->ts = commit_ts;
     424             :                 }
     425      388561 :                 if (!seg) {
     426             :                         /* first segment */
     427             :                         seg = cur;
     428             :                 }
     429      285097 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     430             :                         /* possible merge since both deleted flags are equal */
     431      262953 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     432      203134 :                                 int merge = 1;
     433      203134 :                                 node *n = store->active->h;
     434      708086 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     435      614514 :                                         sql_trans* other = ((sql_trans*)n->data);
     436      614514 :                                         ulng active = other->ts;
     437      614514 :                                         if(other->active == 2)
     438       31753 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     439      582761 :                                         if (active == tr->ts)
     440      127731 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     441      455030 :                                         if (seg->ts < active && cur->ts < active)
     442             :                                                 break;
     443      442297 :                                         if (seg->ts > active && cur->ts > active)
     444      345468 :                                                 continue;
     445             : 
     446       96829 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     447             :                                         /* cannot safely merge since there is an active transaction between the segments */
     448             :                                         merge = false;
     449             :                                         break;
     450             :                                 }
     451             :                                 /* merge segments */
     452      212610 :                                 if (merge) {
     453      106305 :                                         seg->end = cur->end;
     454      106305 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     455      106305 :                                         if (cur == s->segs->t)
     456       27176 :                                                 s->segs->t = seg;
     457      106305 :                                         if (commit_ts == oldest) {
     458       89437 :                                                 ATOMIC_PTR_DESTROY(&cur->next);
     459       89437 :                                                 _DELETE(cur);
     460             :                                         } else
     461       33736 :                                                 mark4destroy(cur, change, commit_ts);
     462      106305 :                                         cur = seg;
     463      106305 :                                         continue;
     464             :                                 }
     465             :                         }
     466             :                 }
     467             :                 seg = cur;
     468             :         }
     469      103464 : }
     470             : 
     471             : static int
     472     2113749 : segments_in_transaction(sql_trans *tr, sql_table *t)
     473             : {
     474     2113749 :         storage *s = ATOMIC_PTR_GET(&t->data);
     475     2113749 :         segment *seg = s->segs->h;
     476             : 
     477     2113749 :         if (seg && s->segs->t->ts == tr->tid)
     478             :                 return 1;
     479      564728 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     480      459850 :                 if (seg->ts == tr->tid)
     481             :                         return 1;
     482             :         }
     483             :         return 0;
     484             : }
     485             : 
     486             : static size_t
     487    17977373 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     488             : {
     489    17977373 :         size_t cnt = 0;
     490             : 
     491             :         /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
     492             :          * keep all of the parts aligned */
     493    17977373 :         lock_table(tr->store, table->base.id);
     494    18040420 :         segment *s = segs->h, *l = NULL;
     495             : 
     496    18040420 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     497    14344591 :                 l = s = segs->t;
     498             : 
     499   219471494 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     500   201431245 :                 if (SEG_IS_VALID(s, tr))
     501             :                                 l = s;
     502             :         }
     503    18040249 :         if (l)
     504    18021848 :                 cnt = l->end;
     505    18040249 :         unlock_table(tr->store, table->base.id);
     506    18039124 :         return cnt;
     507             : }
     508             : 
     509             : static segments *
     510       50434 : new_segments(sql_trans *tr, size_t cnt)
     511             : {
     512       50434 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     513             : 
     514       50439 :         if (n) {
     515       50439 :                 n->nr_reused = 0;
     516       50439 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     517       50435 :                 if (!n->h) {
     518           0 :                         GDKfree(n);
     519           0 :                         return NULL;
     520             :                 }
     521       50435 :                 sql_ref_init(&n->r);
     522             :         }
     523             :         return n;
     524             : }
     525             : 
     526             : static sql_delta *
     527    29774722 : timestamp_delta( sql_trans *tr, sql_delta *d)
     528             : {
     529    29827495 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     530       52773 :                 d = d->next;
     531    29780561 :         return d;
     532             : }
     533             : 
     534             : static sql_delta *
     535    29622658 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     536             : {
     537    29622658 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     538             : }
     539             : 
     540             : static sql_delta *
     541       23407 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     542             : {
     543       23407 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     544             : }
     545             : 
     546             : static storage *
     547    18418545 : timestamp_storage( sql_trans *tr, storage *d)
     548             : {
     549    18418545 :         if (!d)
     550             :                 return NULL;
     551    18488610 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     552       70065 :                 d = d->next;
     553             :         return d;
     554             : }
     555             : 
     556             : static storage *
     557    18396931 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     558             : {
     559    18396931 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     560             : }
     561             : 
     562             : static sql_delta*
     563       19306 : delta_dup(sql_delta *d)
     564             : {
     565       19306 :         ATOMIC_INC(&d->cs.refcnt);
     566       19306 :         return d;
     567             : }
     568             : 
     569             : static void *
     570       17940 : col_dup(sql_column *c)
     571             : {
     572       17940 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     573             : }
     574             : 
     575             : static void *
     576        2653 : idx_dup(sql_idx *i)
     577             : {
     578        2653 :         if (!ATOMIC_PTR_GET(&i->data))
     579             :                 return NULL;
     580        1366 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     581             : }
     582             : 
     583             : static storage*
     584        1539 : storage_dup(storage *d)
     585             : {
     586        1539 :         ATOMIC_INC(&d->cs.refcnt);
     587        1539 :         return d;
     588             : }
     589             : 
     590             : static void *
     591        1539 : del_dup(sql_table *t)
     592             : {
     593        1539 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     594             : }
     595             : 
     596             : static size_t
     597          17 : count_inserts( segment *s, sql_trans *tr)
     598             : {
     599          17 :         size_t cnt = 0;
     600             : 
     601          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     602          55 :                 if (!s->deleted && s->ts == tr->tid)
     603           4 :                         cnt += s->end - s->start;
     604             :         }
     605          17 :         return cnt;
     606             : }
     607             : 
     608             : static size_t
     609      780889 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     610             : {
     611      780889 :         size_t cnt = 0;
     612             : 
     613      922793 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     614             :                 ;
     615             : 
     616     1929429 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     617     1148592 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     618       23992 :                         cnt += s->end - s->start;
     619             :         }
     620      780837 :         return cnt;
     621             : }
     622             : 
     623             : static size_t
     624          17 : count_deletes( segment *s, sql_trans *tr)
     625             : {
     626          17 :         size_t cnt = 0;
     627             : 
     628          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     629          55 :                 if (SEG_IS_DELETED(s, tr))
     630          17 :                         cnt += s->end - s->start;
     631             :         }
     632          17 :         return cnt;
     633             : }
     634             : 
     635             : #define CNT_ACTIVE 10
     636             : 
     637             : static size_t
     638    16746668 : count_col(sql_trans *tr, sql_column *c, int access)
     639             : {
     640    16746668 :         storage *d;
     641    16746668 :         sql_delta *ds;
     642             : 
     643    16746668 :         if (!isTable(c->t))
     644             :                 return 0;
     645    16746668 :         d = tab_timestamp_storage(tr, c->t);
     646    16756873 :         ds = col_timestamp_delta(tr, c);
     647    16762303 :         if (!d ||!ds)
     648             :                 return 0;
     649    16762303 :         if (access == 2)
     650      433532 :                 return ds?ds->cs.ucnt:0;
     651    16328771 :         if (access == 1)
     652          17 :                 return count_inserts(d->segs->h, tr);
     653    16328754 :         if (access == QUICK)
     654           0 :                 return d->segs->t?d->segs->t->end:0;
     655    16328754 :         if (access == CNT_ACTIVE) {
     656      780416 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     657      780976 :                 lock_table(tr->store, c->t->base.id);
     658      780905 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     659      780852 :                 unlock_table(tr->store, c->t->base.id);
     660      780852 :                 return cnt;
     661             :         }
     662    15548338 :         return segs_end(d->segs, tr, c->t);
     663             : }
     664             : 
     665             : static size_t
     666       19352 : count_idx(sql_trans *tr, sql_idx *i, int access)
     667             : {
     668       19352 :         storage *d;
     669       19352 :         sql_delta *ds;
     670             : 
     671       19352 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     672        2543 :                 return 0;
     673       16809 :         d = tab_timestamp_storage(tr, i->t);
     674       16823 :         ds = idx_timestamp_delta(tr, i);
     675       16828 :         if (!d || !ds)
     676             :                 return 0;
     677       16828 :         if (access == 2)
     678        2871 :                 return ds?ds->cs.ucnt:0;
     679       13957 :         if (access == 1)
     680           0 :                 return count_inserts(d->segs->h, tr);
     681       13957 :         if (access == QUICK)
     682        3550 :                 return d->segs->t?d->segs->t->end:0;
     683       10407 :         return segs_end(d->segs, tr, i->t);
     684             : }
     685             : 
     686             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     687             : static BAT *
     688    12754931 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     689             : {
     690    12754931 :         BAT *b;
     691             : 
     692    12754931 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     693             :         /* returns the updates for cs */
     694    12754931 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     695       12460 :                 if (access == RD_UPD_ID) {
     696        7615 :                         if (!(b = temp_descriptor(cs->uibid)))
     697             :                                 return NULL;
     698        7615 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     699        1337 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     700        6278 :                                         oid nil = oid_nil;
     701             :                                         /* less then cnt */
     702        6278 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
     703        6278 :                                         if (!s) {
     704           0 :                                                 bat_destroy(b);
     705           0 :                                                 return NULL;
     706             :                                         }
     707             : 
     708        6278 :                                         BAT *nb = BATproject(s, b);
     709        6278 :                                         bat_destroy(s);
     710        6278 :                                         bat_destroy(b);
     711        6278 :                                         b = nb;
     712             :                         }
     713             :                 } else {
     714        4845 :                         b = temp_descriptor(cs->uvbid);
     715             :                 }
     716             :         } else {
     717    21238227 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     718             :         }
     719             :         return b;
     720             : }
     721             : 
     722             : static BAT *
     723           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     724             : {
     725           0 :         int err = 0;
     726           0 :         BAT *uv = *UV;
     727           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     728           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     729           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     730             : 
     731           0 :         if (!ni || (uv && !nv)) {
     732           0 :                 bat_destroy(ni);
     733           0 :                 bat_destroy(nv);
     734           0 :                 bat_destroy(ui);
     735           0 :                 bat_destroy(uv);
     736           0 :                 bat_destroy(oi);
     737           0 :                 bat_destroy(ov);
     738           0 :                 return NULL;
     739             :         }
     740           0 :         BATiter uvi;
     741           0 :         BATiter ovi;
     742             : 
     743           0 :         if (uv) {
     744           0 :                 uvi = bat_iterator(uv);
     745           0 :                 ovi = bat_iterator(ov);
     746             :         }
     747             : 
     748             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     749           0 :         BUN uip = 0, uie = BATcount(ui);
     750           0 :         BUN oip = 0, oie = BATcount(oi);
     751             : 
     752           0 :         oid uiseqb = ui->tseqbase;
     753           0 :         oid oiseqb = oi->tseqbase;
     754           0 :         oid *uipt = NULL, *oipt = NULL;
     755           0 :         BATiter uii = bat_iterator(ui);
     756           0 :         BATiter oii = bat_iterator(oi);
     757           0 :         if (!BATtdensebi(&uii))
     758           0 :                 uipt = uii.base;
     759           0 :         if (!BATtdensebi(&oii))
     760           0 :                 oipt = oii.base;
     761           0 :         while (uip < uie && oip < oie && !err) {
     762           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     763           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     764             : 
     765           0 :                 if (uiid <= oiid) {
     766           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     767           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     768             :                                 err = 1;
     769           0 :                         uip++;
     770           0 :                         if (uiid == oiid)
     771           0 :                                 oip++;
     772             :                 } else { /* uiid > oiid */
     773           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     774           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     775             :                                 err = 1;
     776           0 :                         oip++;
     777             :                 }
     778             :         }
     779           0 :         while (uip < uie && !err) {
     780           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     781           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     782           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     783             :                         err = 1;
     784           0 :                 uip++;
     785             :         }
     786           0 :         while (oip < oie && !err) {
     787           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     788           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     789           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     790             :                         err = 1;
     791           0 :                 oip++;
     792             :         }
     793           0 :         if (uv) {
     794           0 :                 bat_iterator_end(&uvi);
     795           0 :                 bat_iterator_end(&ovi);
     796             :         }
     797           0 :         bat_iterator_end(&uii);
     798           0 :         bat_iterator_end(&oii);
     799           0 :         bat_destroy(ui);
     800           0 :         bat_destroy(uv);
     801           0 :         bat_destroy(oi);
     802           0 :         bat_destroy(ov);
     803           0 :         if (!err) {
     804           0 :                 if (nv)
     805           0 :                         *UV = nv;
     806           0 :                 return ni;
     807             :         }
     808           0 :         *UV = NULL;
     809           0 :         bat_destroy(ni);
     810           0 :         bat_destroy(nv);
     811           0 :         return NULL;
     812             : }
     813             : 
     814             : static sql_delta *
     815     8506905 : older_delta( sql_delta *d, sql_trans *tr)
     816             : {
     817     8506905 :         sql_delta *o = d->next;
     818             : 
     819     8514518 :         while (o && !o->cs.merged) {
     820        7602 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     821             :                         break;
     822             :                 else
     823        7613 :                         o = o->next;
     824             :         }
     825     8506916 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     826           0 :                 return o;
     827             :         return NULL;
     828             : }
     829             : 
     830             : static BAT *
     831     8503215 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     832             : {
     833     8503215 :         assert(tr->active);
     834     8503215 :         sql_delta *o = NULL;
     835     8503215 :         BAT *ui = NULL, *uv = NULL;
     836             : 
     837     8503215 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     838             :                 return NULL;
     839     8506555 :         if (access == RD_UPD_VAL) {
     840     4254488 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     841           0 :                         bat_destroy(ui);
     842           0 :                         return NULL;
     843             :                 }
     844             :         }
     845     8506829 :         while ((o = older_delta(d, tr)) != NULL) {
     846           0 :                 BAT *oui = NULL, *ouv = NULL;
     847           0 :                 if (!oui)
     848           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     849           0 :                 if (access == RD_UPD_VAL)
     850           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     851           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     852           0 :                         bat_destroy(ui);
     853           0 :                         bat_destroy(uv);
     854           0 :                         bat_destroy(oui);
     855           0 :                         bat_destroy(ouv);
     856           0 :                         return NULL;
     857             :                 }
     858           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     859             :                         return NULL;
     860             :                 d = o;
     861             :         }
     862     8506925 :         if (uv) {
     863     4254764 :                 bat_destroy(ui);
     864     4254764 :                 return uv;
     865             :         }
     866             :         return ui;
     867             : }
     868             : 
     869             : static BAT *
     870        2078 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     871             : {
     872        2078 :         lock_column(tr->store, c->base.id);
     873        2078 :         sql_delta *d = col_timestamp_delta(tr, c);
     874        2078 :         int type = c->type.type->localtype;
     875             : 
     876        2078 :         if (!d) {
     877           0 :                 unlock_column(tr->store, c->base.id);
     878           0 :                 return NULL;
     879             :         }
     880        2078 :         if (d->cs.st == ST_DICT) {
     881           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     882             : 
     883           0 :                 type = b->ttype;
     884             :         }
     885        2078 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     886        2078 :         unlock_column(tr->store, c->base.id);
     887        2078 :         return bn;
     888             : }
     889             : 
     890             : static BAT *
     891           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     892             : {
     893           0 :         lock_column(tr->store, i->base.id);
     894           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     895           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     896             : 
     897           0 :         if (!d) {
     898           0 :                 unlock_column(tr->store, i->base.id);
     899           0 :                 return NULL;
     900             :         }
     901           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     902           0 :         unlock_column(tr->store, i->base.id);
     903           0 :         return bn;
     904             : }
     905             : 
     906             : static BAT *
     907     8646523 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     908             : {
     909     8646523 :         BAT *b;
     910             : 
     911     8646523 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     912     8646523 :         assert(cs != NULL);
     913     8646523 :         if (access == QUICK)
     914       87679 :                 return quick_descriptor(cs->bid);
     915     8558844 :         if (access == RD_EXT)
     916         857 :                 return temp_descriptor(cs->ebid);
     917     8557987 :         assert(cs->bid);
     918     8557987 :         b = temp_descriptor(cs->bid);
     919     8559315 :         if (b == NULL)
     920             :                 return NULL;
     921     8559315 :         assert(b->batRestricted == BAT_READ);
     922             :         /* return slice */
     923     8559315 :         BAT *s = BATslice(b, 0, cnt);
     924     8544275 :         bat_destroy(b);
     925     8544275 :         return s;
     926             : }
     927             : 
     928             : static int
     929     4250782 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     930             : {
     931     4250782 :         lock_column(tr->store, c->base.id);
     932     4250956 :         size_t cnt = count_col(tr, c, 0);
     933     4252573 :         sql_delta *d = col_timestamp_delta(tr, c);
     934     4252594 :         int type = c->type.type->localtype;
     935             : 
     936     4252594 :         if (!d) {
     937           0 :                 unlock_column(tr->store, c->base.id);
     938           0 :                 return LOG_ERR;
     939             :         }
     940     4252594 :         if (d->cs.st == ST_DICT) {
     941           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     942             : 
     943           2 :                 type = b->ttype;
     944             :         }
     945             : 
     946     4252594 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     947     4252609 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     948             : 
     949     4252521 :         unlock_column(tr->store, c->base.id);
     950             : 
     951     4252311 :         if (*ui == NULL || *uv == NULL) {
     952           0 :                 bat_destroy(*ui);
     953           0 :                 bat_destroy(*uv);
     954           0 :                 return LOG_ERR;
     955             :         }
     956             :         return LOG_OK;
     957             : }
     958             : 
     959             : static int
     960          23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     961             : {
     962          23 :         lock_column(tr->store, i->base.id);
     963          23 :         size_t cnt = count_idx(tr, i, 0);
     964          23 :         sql_delta *d = idx_timestamp_delta(tr, i);
     965          23 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     966             : 
     967          23 :         if (!d) {
     968           0 :                 unlock_column(tr->store, i->base.id);
     969           0 :                 return LOG_ERR;
     970             :         }
     971             : 
     972          23 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     973          23 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     974             : 
     975          23 :         unlock_column(tr->store, i->base.id);
     976             : 
     977          23 :         if (*ui == NULL || *uv == NULL) {
     978           0 :                 bat_destroy(*ui);
     979           0 :                 bat_destroy(*uv);
     980           0 :                 return LOG_ERR;
     981             :         }
     982             :         return LOG_OK;
     983             : }
     984             : 
     985             : static void *                                   /* BAT * */
     986     8637377 : bind_col(sql_trans *tr, sql_column *c, int access)
     987             : {
     988     8637377 :         assert(access == QUICK || tr->active);
     989     8637377 :         if (!isTable(c->t))
     990             :                 return NULL;
     991     8637377 :         sql_delta *d = col_timestamp_delta(tr, c);
     992     8639359 :         if (!d)
     993             :                 return NULL;
     994     8639359 :         size_t cnt = count_col(tr, c, 0);
     995     8643299 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     996        2078 :                 return bind_ucol(tr, c, access, cnt);
     997     8641221 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
     998     8636261 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
     999             :         return b;
    1000             : }
    1001             : 
    1002             : static void *                                   /* BAT * */
    1003        6594 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1004             : {
    1005        6594 :         assert(access == QUICK || tr->active);
    1006        6594 :         if (!isTable(i->t))
    1007             :                 return NULL;
    1008        6594 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1009        6598 :         if (!d)
    1010             :                 return NULL;
    1011        6598 :         size_t cnt = count_idx(tr, i, 0);
    1012        6599 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1013           0 :                 return bind_uidx(tr, i, access, cnt);
    1014        6599 :         return cs_bind_bat( &d->cs, access, cnt);
    1015             : }
    1016             : 
    1017             : static int
    1018        4052 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1019             : {
    1020        4052 :         if (!cs->uibid) {
    1021           0 :                 cs->uibid = e_bat(TYPE_oid);
    1022           0 :                 if (cs->uibid == BID_NIL)
    1023             :                         return LOG_ERR;
    1024             :         }
    1025        4052 :         if (!cs->uvbid) {
    1026           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1027           0 :                 if (!cur)
    1028             :                         return LOG_ERR;
    1029           0 :                 int type = cur->ttype;
    1030           0 :                 cs->uvbid = e_bat(type);
    1031           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1032             :                         return LOG_ERR;
    1033             :         }
    1034        4052 :         BAT *ui = temp_descriptor(cs->uibid);
    1035        4052 :         BAT *uv = temp_descriptor(cs->uvbid);
    1036             : 
    1037        4052 :         if (ui == NULL || uv == NULL) {
    1038           0 :                 bat_destroy(ui);
    1039           0 :                 bat_destroy(uv);
    1040           0 :                 return LOG_ERR;
    1041             :         }
    1042        4052 :         assert(ui && uv);
    1043        4052 :         if (isEbat(ui)){
    1044         400 :                 temp_destroy(cs->uibid);
    1045         400 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1046         400 :                 bat_destroy(ui);
    1047         400 :                 if (cs->uibid == BID_NIL ||
    1048         400 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1049           0 :                         bat_destroy(uv);
    1050           0 :                         return LOG_ERR;
    1051             :                 }
    1052             :         }
    1053        4052 :         if (isEbat(uv)){
    1054         400 :                 temp_destroy(cs->uvbid);
    1055         400 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1056         400 :                 bat_destroy(uv);
    1057         400 :                 if (cs->uvbid == BID_NIL ||
    1058         400 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1059           0 :                         bat_destroy(ui);
    1060           0 :                         return LOG_ERR;
    1061             :                 }
    1062             :         }
    1063        4052 :         *Ui = ui;
    1064        4052 :         *Uv = uv;
    1065        4052 :         return LOG_OK;
    1066             : }
    1067             : 
    1068             : static int
    1069        6908 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1070             : {
    1071      102221 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1072      102221 :                 if (s->start <= rid && s->end > rid) {
    1073        6908 :                         if (s->ts == tr->tid && !s->deleted) {
    1074        2858 :                                 return 1;
    1075             :                         }
    1076             :                         break;
    1077             :                 }
    1078             :         }
    1079             :         return 0;
    1080             : }
    1081             : 
    1082             : static int
    1083        4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1084             : {
    1085       96455 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1086       96455 :                 if (s->start <= rid && s->end > rid) {
    1087        4050 :                         if (s->ts >= tr->ts && s->deleted) {
    1088           0 :                                 return 1;
    1089             :                         }
    1090             :                         break;
    1091             :                 }
    1092             :         }
    1093             :         return 0;
    1094             : }
    1095             : 
    1096             : static sql_delta *
    1097           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1098             : {
    1099           0 :         sql_delta *n = ZNEW(sql_delta);
    1100           0 :         if (!n)
    1101             :                 return NULL;
    1102           0 :         *n = *bat;
    1103           0 :         n->next = NULL;
    1104           0 :         n->cs.ts = tr->tid;
    1105           0 :         return n;
    1106             : }
    1107             : 
    1108             : static BAT *
    1109          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1110             : {
    1111          17 :         BAT *newoffsets = NULL;
    1112          17 :         sql_delta *bat = *batp;
    1113          17 :         column_storage *cs = &bat->cs;
    1114          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1115             : 
    1116          17 :         if (!u)
    1117             :                 return NULL;
    1118          17 :         BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
    1119          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1120           0 :                 bat_destroy(u);
    1121           0 :                 return NULL;
    1122             :         } else {
    1123          17 :                 int new = 0;
    1124             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1125          17 :                 if (BATcount(u) >= max_cnt) {
    1126           1 :                         if (max_cnt == INT_MAX) { /* decompress */
    1127           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1128           0 :                                         bat_destroy(u);
    1129           0 :                                         return NULL;
    1130             :                                 }
    1131           0 :                                 if (cs->ucnt) {
    1132           0 :                                         BAT *ui = NULL, *uv = NULL;
    1133           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1134           0 :                                         bat_destroy(b);
    1135           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1136           0 :                                                 bat_destroy(nb);
    1137           0 :                                                 bat_destroy(u);
    1138           0 :                                                 return NULL;
    1139             :                                         }
    1140           0 :                                         b = nb;
    1141           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1142           0 :                                                 bat_destroy(ui);
    1143           0 :                                                 bat_destroy(uv);
    1144           0 :                                                 bat_destroy(b);
    1145           0 :                                                 bat_destroy(u);
    1146             :                                         }
    1147           0 :                                         bat_destroy(ui);
    1148           0 :                                         bat_destroy(uv);
    1149             :                                 }
    1150           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1151           0 :                                 bat_destroy(b);
    1152           0 :                                 assert(newoffsets == NULL);
    1153           0 :                                 if (!n) {
    1154           0 :                                         bat_destroy(u);
    1155           0 :                                         return NULL;
    1156             :                                 }
    1157           0 :                                 if (cs->ts != tr->tid) {
    1158           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1159           0 :                                                 bat_destroy(n);
    1160           0 :                                                 return NULL;
    1161             :                                         }
    1162           0 :                                         cs = &(*batp)->cs;
    1163           0 :                                         new = 1;
    1164             :                                 }
    1165           0 :                                 if (cs->bid && !new)
    1166           0 :                                         temp_destroy(cs->bid);
    1167           0 :                                 n = transfer_to_systrans(n);
    1168           0 :                                 if (n == NULL)
    1169             :                                         return NULL;
    1170           0 :                                 bat_set_access(n, BAT_READ);
    1171           0 :                                 cs->bid = temp_create(n);
    1172           0 :                                 bat_destroy(n);
    1173           0 :                                 if (cs->ebid && !new)
    1174           0 :                                         temp_destroy(cs->ebid);
    1175           0 :                                 cs->ebid = 0;
    1176           0 :                                 cs->ucnt = 0;
    1177           0 :                                 if (cs->uibid && !new)
    1178           0 :                                         temp_destroy(cs->uibid);
    1179           0 :                                 if (cs->uvbid && !new)
    1180           0 :                                         temp_destroy(cs->uvbid);
    1181           0 :                                 cs->uibid = cs->uvbid = 0;
    1182           0 :                                 cs->st = ST_DEFAULT;
    1183           0 :                                 cs->cleared = true;
    1184             :                         } else {
    1185           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1186           0 :                                         bat_destroy(newoffsets);
    1187           0 :                                         bat_destroy(u);
    1188           0 :                                         return NULL;
    1189             :                                 }
    1190           2 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1191           1 :                                 bat_destroy(b);
    1192           1 :                                 if (!n) {
    1193           0 :                                         bat_destroy(newoffsets);
    1194           0 :                                         bat_destroy(u);
    1195           0 :                                         return NULL;
    1196             :                                 }
    1197           1 :                                 if (cs->ts != tr->tid) {
    1198           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1199           0 :                                                 bat_destroy(n);
    1200           0 :                                                 return NULL;
    1201             :                                         }
    1202           0 :                                         cs = &(*batp)->cs;
    1203           0 :                                         new = 1;
    1204           0 :                                         temp_dup(cs->ebid);
    1205           0 :                                         if (cs->uibid) {
    1206           0 :                                                 temp_dup(cs->uibid);
    1207           0 :                                                 temp_dup(cs->uvbid);
    1208             :                                         }
    1209             :                                 }
    1210           1 :                                 if (cs->bid && !new)
    1211           1 :                                         temp_destroy(cs->bid);
    1212           1 :                                 n = transfer_to_systrans(n);
    1213           1 :                                 if (n == NULL)
    1214             :                                         return NULL;
    1215           1 :                                 bat_set_access(n, BAT_READ);
    1216           1 :                                 cs->bid = temp_create(n);
    1217           1 :                                 bat_destroy(n);
    1218           1 :                                 cs->cleared = true;
    1219           1 :                                 i = newoffsets;
    1220             :                         }
    1221             :                 } else { /* append */
    1222          16 :                         i = newoffsets;
    1223             :                 }
    1224             :         }
    1225          17 :         bat_destroy(u);
    1226          17 :         return i;
    1227             : }
    1228             : 
    1229             : static BAT *
    1230           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1231             : {
    1232           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1233           0 :         BAT *newoffsets = NULL;
    1234           0 :         BAT *b = NULL, *n = NULL;
    1235             : 
    1236           0 :         if (!(b = temp_descriptor(cs->bid)))
    1237             :                 return NULL;
    1238             : 
    1239           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1240           0 :                 bat_destroy(b);
    1241           0 :                 return NULL;
    1242             :         } else {
    1243             :                 /* returns new offset bat if values within min/max, else decompress */
    1244           0 :                 if (!newoffsets) { /* decompress */
    1245           0 :                         if (cs->ucnt) {
    1246           0 :                                 BAT *ui = NULL, *uv = NULL;
    1247           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1248           0 :                                 bat_destroy(b);
    1249           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1250           0 :                                         bat_destroy(nb);
    1251           0 :                                         return NULL;
    1252             :                                 }
    1253           0 :                                 b = nb;
    1254           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1255           0 :                                         bat_destroy(ui);
    1256           0 :                                         bat_destroy(uv);
    1257           0 :                                         bat_destroy(b);
    1258             :                                 }
    1259           0 :                                 bat_destroy(ui);
    1260           0 :                                 bat_destroy(uv);
    1261             :                         }
    1262           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1263           0 :                         bat_destroy(b);
    1264           0 :                         if (!n)
    1265             :                                 return NULL;
    1266           0 :                         if (cs->bid)
    1267           0 :                                 temp_destroy(cs->bid);
    1268           0 :                         n = transfer_to_systrans(n);
    1269           0 :                         if (n == NULL)
    1270             :                                 return NULL;
    1271           0 :                         bat_set_access(n, BAT_READ);
    1272           0 :                         cs->bid = temp_create(n);
    1273           0 :                         cs->ucnt = 0;
    1274           0 :                         if (cs->uibid)
    1275           0 :                                 temp_destroy(cs->uibid);
    1276           0 :                         if (cs->uvbid)
    1277           0 :                                 temp_destroy(cs->uvbid);
    1278           0 :                         cs->uibid = cs->uvbid = 0;
    1279           0 :                         cs->st = ST_DEFAULT;
    1280           0 :                         cs->cleared = true;
    1281           0 :                         b = n;
    1282             :                 } else { /* append */
    1283             :                         i = newoffsets;
    1284             :                 }
    1285             :         }
    1286           0 :         bat_destroy(b);
    1287           0 :         return i;
    1288             : }
    1289             : 
    1290             : /*
    1291             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1292             :  */
    1293             : static int
    1294        2756 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1295             : {
    1296        2756 :         int res = LOG_OK;
    1297        2756 :         sql_delta *bat = *batp;
    1298        2756 :         column_storage *cs = &bat->cs;
    1299        2756 :         BAT *otids = tids, *oupdates = updates;
    1300             : 
    1301        2756 :         if (!BATcount(tids))
    1302             :                 return LOG_OK;
    1303             : 
    1304        2756 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1305           6 :                 tids = BATunmask(tids);
    1306           6 :                 if (!tids)
    1307             :                         return LOG_ERR;
    1308             :         }
    1309        2756 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1310           0 :                 updates = BATunmask(updates);
    1311           0 :                 if (!updates) {
    1312           0 :                         if (otids != tids)
    1313           0 :                                 bat_destroy(tids);
    1314           0 :                         return LOG_ERR;
    1315             :                 }
    1316        2756 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1317          42 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1318          42 :                 if (!updates) {
    1319           0 :                         if (otids != tids)
    1320           0 :                                 bat_destroy(tids);
    1321           0 :                         return LOG_ERR;
    1322             :                 }
    1323             :         }
    1324             : 
    1325        2756 :         if (cs->st == ST_DICT) {
    1326             :                 /* possibly a new array is returned */
    1327           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1328           4 :                 bat = *batp;
    1329           4 :                 cs = &bat->cs;
    1330           4 :                 if (oupdates != updates)
    1331           0 :                         bat_destroy(updates);
    1332           4 :                 updates = nupdates;
    1333           4 :                 if (!updates) {
    1334           0 :                         if (otids != tids)
    1335           0 :                                 bat_destroy(tids);
    1336           0 :                         return LOG_ERR;
    1337             :                 }
    1338             :         }
    1339             : 
    1340             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1341             :         /* currently only one update delta is possible */
    1342        2756 :         lock_table(tr->store, t->base.id);
    1343        2756 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1344        2756 :         if (!is_new && !cs->cleared) {
    1345        2433 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1346           6 :                         BAT *sorted, *order;
    1347           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1348           0 :                                 if (otids != tids)
    1349           0 :                                         bat_destroy(tids);
    1350           0 :                                 if (oupdates != updates)
    1351           0 :                                         bat_destroy(updates);
    1352           0 :                                 unlock_table(tr->store, t->base.id);
    1353           0 :                                 return LOG_ERR;
    1354             :                         }
    1355           6 :                         if (otids != tids)
    1356           0 :                                 bat_destroy(tids);
    1357           6 :                         tids = sorted;
    1358           6 :                         BAT *nupdates = BATproject(order, updates);
    1359           6 :                         bat_destroy(order);
    1360           6 :                         if (oupdates != updates)
    1361           0 :                                 bat_destroy(updates);
    1362           6 :                         updates = nupdates;
    1363           6 :                         if (!updates) {
    1364           0 :                                 bat_destroy(tids);
    1365           0 :                                 unlock_table(tr->store, t->base.id);
    1366           0 :                                 return LOG_ERR;
    1367             :                         }
    1368             :                 }
    1369        2433 :                 assert(tids->tsorted);
    1370        2433 :                 BAT *ui = NULL, *uv = NULL;
    1371             : 
    1372             :                 /* handle updates on just inserted bits */
    1373             :                 /* handle updates on updates (within one transaction) */
    1374        2433 :                 BATiter upi = bat_iterator(updates);
    1375        2433 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1376        2433 :                 BAT *b, *ins = NULL;
    1377        2433 :                 int *msk = NULL;
    1378             : 
    1379        2433 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1380             :                         res = LOG_ERR;
    1381             : 
    1382        2433 :                 if (res == LOG_OK && BATtdense(tids)) {
    1383        2412 :                         oid start = tids->tseqbase, offset = start;
    1384        2412 :                         oid end = start + ucnt;
    1385             : 
    1386        5184 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1387        3332 :                                 if (seg->start <= start && seg->end > start) {
    1388             :                                         /* check for delete conflicts */
    1389        2412 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1390           0 :                                                 res = LOG_CONFLICT;
    1391           0 :                                                 continue;
    1392             :                                         }
    1393             : 
    1394             :                                         /* check for inplace updates */
    1395        2412 :                                         BUN lend = end < seg->end?end:seg->end;
    1396        2412 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1397          11 :                                                 if (!ins) {
    1398          11 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1399          11 :                                                         if (!ins)
    1400             :                                                                 res = LOG_ERR;
    1401             :                                                         else {
    1402          11 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1403          11 :                                                                 msk = (int*)Tloc(ins, 0);
    1404          11 :                                                                 BUN end = (ucnt+31)/32;
    1405          11 :                                                                 memset(msk, 0, end * sizeof(int));
    1406             :                                                         }
    1407             :                                                 }
    1408          26 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1409          15 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1410          15 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1411           0 :                                                                 res = LOG_ERR;
    1412             : 
    1413          15 :                                                         oid word = i/32;
    1414          15 :                                                         int pos = i%32;
    1415          15 :                                                         msk[word] |= 1U<<pos;
    1416          15 :                                                         cnt++;
    1417             :                                                 }
    1418             :                                         }
    1419             :                                 }
    1420        3332 :                                 if (end < seg->end)
    1421             :                                         break;
    1422             :                         }
    1423          24 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1424           3 :                         struct canditer ci;
    1425           3 :                         segment *seg = s->segs->h;
    1426           3 :                         canditer_init(&ci, NULL, tids);
    1427           3 :                         BUN i = 0;
    1428        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1429        1033 :                                 oid rid = canditer_next(&ci);
    1430        1033 :                                 if (seg->end <= rid)
    1431          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1432        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1433             :                                         /* check for delete conflicts */
    1434        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1435           0 :                                                 res = LOG_CONFLICT;
    1436           0 :                                                 continue;
    1437             :                                         }
    1438             : 
    1439             :                                         /* check for inplace updates */
    1440        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1441           0 :                                                 if (!ins) {
    1442           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1443           0 :                                                         if (!ins) {
    1444             :                                                                 res = LOG_ERR;
    1445             :                                                                 break;
    1446             :                                                         } else {
    1447           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1448           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1449           0 :                                                                 BUN end = (ucnt+31)/32;
    1450           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1451             :                                                         }
    1452             :                                                 }
    1453           0 :                                                 ptr upd = BUNtail(upi, i);
    1454           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1455           0 :                                                         res = LOG_ERR;
    1456             : 
    1457           0 :                                                 oid word = i/32;
    1458           0 :                                                 int pos = i%32;
    1459           0 :                                                 msk[word] |= 1U<<pos;
    1460           0 :                                                 cnt++;
    1461             :                                         }
    1462        1020 :                                         i++;
    1463             :                                 }
    1464             :                         }
    1465          18 :                 } else if (res == LOG_OK) {
    1466          18 :                         BUN i = 0;
    1467          18 :                         oid *rid = Tloc(tids,0);
    1468          18 :                         segment *seg = s->segs->h;
    1469          92 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1470          74 :                                 if (seg->end <= rid[i])
    1471          18 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1472          56 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1473             :                                         /* check for delete conflicts */
    1474          56 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1475           0 :                                                 res = LOG_CONFLICT;
    1476           0 :                                                 continue;
    1477             :                                         }
    1478             : 
    1479             :                                         /* check for inplace updates */
    1480          56 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1481           3 :                                                 if (!ins) {
    1482           1 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1483           1 :                                                         if (!ins) {
    1484             :                                                                 res = LOG_ERR;
    1485             :                                                                 break;
    1486             :                                                         } else {
    1487           1 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1488           1 :                                                                 msk = (int*)Tloc(ins, 0);
    1489           1 :                                                                 BUN end = (ucnt+31)/32;
    1490           1 :                                                                 memset(msk, 0, end * sizeof(int));
    1491             :                                                         }
    1492             :                                                 }
    1493           3 :                                                 const void *upd = BUNtail(upi, i);
    1494           3 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1495           0 :                                                         res = LOG_ERR;
    1496             : 
    1497           3 :                                                 oid word = i/32;
    1498           3 :                                                 int pos = i%32;
    1499           3 :                                                 msk[word] |= 1U<<pos;
    1500           3 :                                                 cnt++;
    1501             :                                         }
    1502          56 :                                         i++;
    1503             :                                 }
    1504             :                         }
    1505             :                 }
    1506             : 
    1507        2433 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1508        2421 :                         if (cs->ucnt == 0) {
    1509        2419 :                                 if (cnt) {
    1510           0 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1511           0 :                                         if (nins) {
    1512           0 :                                                 ui = BATproject(nins, tids);
    1513           0 :                                                 uv = BATproject(nins, updates);
    1514           0 :                                                 bat_destroy(nins);
    1515             :                                         }
    1516             :                                 } else {
    1517        2419 :                                         ui = temp_descriptor(tids->batCacheid);
    1518        2419 :                                         uv = temp_descriptor(updates->batCacheid);
    1519             :                                 }
    1520        2419 :                                 if (!ui || !uv) {
    1521             :                                         res = LOG_ERR;
    1522             :                                 } else {
    1523        2419 :                                         temp_destroy(cs->uibid);
    1524        2419 :                                         temp_destroy(cs->uvbid);
    1525        2419 :                                         ui = transfer_to_systrans(ui);
    1526        2419 :                                         uv = transfer_to_systrans(uv);
    1527        2419 :                                         if (ui == NULL || uv == NULL) {
    1528           0 :                                                 BBPreclaim(ui);
    1529           0 :                                                 BBPreclaim(uv);
    1530             :                                                 res = LOG_ERR;
    1531             :                                         } else {
    1532        2419 :                                                 cs->uibid = temp_create(ui);
    1533        2419 :                                                 cs->uvbid = temp_create(uv);
    1534        2419 :                                                 cs->ucnt = BATcount(ui);
    1535             :                                         }
    1536             :                                 }
    1537             :                         } else {
    1538           2 :                                 BAT *nui = NULL, *nuv = NULL;
    1539             : 
    1540             :                                 /* merge taking msk of inserted into account */
    1541           2 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1542             :                                         res = LOG_ERR;
    1543             : 
    1544           2 :                                 if (res == LOG_OK) {
    1545           2 :                                         const void *upd = NULL;
    1546           2 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1547           2 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1548             : 
    1549           2 :                                         if (!nui || !nuv) {
    1550             :                                                 res = LOG_ERR;
    1551             :                                         } else {
    1552           2 :                                                 BATiter ovi = bat_iterator(uv);
    1553             : 
    1554             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1555           2 :                                                 BUN uip = 0, uie = BATcount(ui);
    1556           2 :                                                 BUN nip = 0, nie = BATcount(tids);
    1557           2 :                                                 oid uiseqb = ui->tseqbase;
    1558           2 :                                                 oid niseqb = tids->tseqbase;
    1559           2 :                                                 oid *uipt = NULL, *nipt = NULL;
    1560           2 :                                                 BATiter uii = bat_iterator(ui);
    1561           2 :                                                 BATiter tidsi = bat_iterator(tids);
    1562           2 :                                                 if (!BATtdensebi(&uii))
    1563           1 :                                                         uipt = uii.base;
    1564           2 :                                                 if (!BATtdensebi(&tidsi))
    1565           1 :                                                         nipt = tidsi.base;
    1566          20 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1567          18 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1568          18 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1569             : 
    1570          18 :                                                         if (uiv < niv) {
    1571           0 :                                                                 upd = BUNtail(ovi, uip);
    1572           0 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1573           0 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1574             :                                                                         res = LOG_ERR;
    1575           0 :                                                                 uip++;
    1576          18 :                                                         } else if (uiv == niv) {
    1577             :                                                                 /* handle == */
    1578          18 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1579          18 :                                                                         upd = BUNtail(upi, nip);
    1580          36 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1581          18 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1582             :                                                                                 res = LOG_ERR;
    1583             :                                                                 } else {
    1584           0 :                                                                         upd = BUNtail(ovi, uip);
    1585           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1586           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1587             :                                                                                 res = LOG_ERR;
    1588             :                                                                 }
    1589          18 :                                                                 uip++;
    1590          18 :                                                                 nip++;
    1591             :                                                         } else { /* uiv > niv */
    1592           0 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1593           0 :                                                                         upd = BUNtail(upi, nip);
    1594           0 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1595           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1596             :                                                                                 res = LOG_ERR;
    1597             :                                                                 }
    1598           0 :                                                                 nip++;
    1599             :                                                         }
    1600             :                                                 }
    1601           2 :                                                 while (uip < uie && res == LOG_OK) {
    1602           0 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1603           0 :                                                         upd = BUNtail(ovi, uip);
    1604           0 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1605           0 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1606             :                                                                 res = LOG_ERR;
    1607           0 :                                                         uip++;
    1608             :                                                 }
    1609           2 :                                                 while (nip < nie && res == LOG_OK) {
    1610           0 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1611           0 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1612           0 :                                                                 upd = BUNtail(upi, nip);
    1613           0 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1614           0 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1615             :                                                                         res = LOG_ERR;
    1616             :                                                         }
    1617           0 :                                                         nip++;
    1618             :                                                 }
    1619           2 :                                                 bat_iterator_end(&uii);
    1620           2 :                                                 bat_iterator_end(&tidsi);
    1621           2 :                                                 bat_iterator_end(&ovi);
    1622           2 :                                                 if (res == LOG_OK) {
    1623           2 :                                                         temp_destroy(cs->uibid);
    1624           2 :                                                         temp_destroy(cs->uvbid);
    1625           2 :                                                         nui = transfer_to_systrans(nui);
    1626           2 :                                                         nuv = transfer_to_systrans(nuv);
    1627           2 :                                                         if (nui == NULL || nuv == NULL) {
    1628             :                                                                 res = LOG_ERR;
    1629             :                                                         } else {
    1630           2 :                                                                 cs->uibid = temp_create(nui);
    1631           2 :                                                                 cs->uvbid = temp_create(nuv);
    1632           2 :                                                                 cs->ucnt = BATcount(nui);
    1633             :                                                         }
    1634             :                                                 }
    1635             :                                         }
    1636           2 :                                         bat_destroy(nui);
    1637           2 :                                         bat_destroy(nuv);
    1638             :                                 }
    1639             :                         }
    1640             :                 }
    1641        2433 :                 bat_iterator_end(&upi);
    1642        2433 :                 bat_destroy(b);
    1643        2433 :                 unlock_table(tr->store, t->base.id);
    1644        2433 :                 bat_destroy(ins);
    1645        2433 :                 bat_destroy(ui);
    1646        2433 :                 bat_destroy(uv);
    1647        2433 :                 if (otids != tids)
    1648          10 :                         bat_destroy(tids);
    1649        2433 :                 if (oupdates != updates)
    1650          11 :                         bat_destroy(updates);
    1651        2433 :                 return res;
    1652             :         } else if (is_new || cs->cleared) {
    1653         323 :                 BAT *b = temp_descriptor(cs->bid);
    1654             : 
    1655         323 :                 if (b == NULL) {
    1656             :                         res = LOG_ERR;
    1657             :                 } else {
    1658         323 :                         if (BATcount(b)==0) {
    1659           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1660           0 :                                         res = LOG_ERR;
    1661         322 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1662           0 :                                 res = LOG_ERR;
    1663         323 :                         BBPcold(b->batCacheid);
    1664         323 :                         bat_destroy(b);
    1665             :                 }
    1666             :         }
    1667         323 :         unlock_table(tr->store, t->base.id);
    1668         323 :         if (otids != tids)
    1669           2 :                 bat_destroy(tids);
    1670         323 :         if (oupdates != updates)
    1671          41 :                 bat_destroy(updates);
    1672             :         return res;
    1673             : }
    1674             : 
    1675             : static int
    1676        2756 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1677             : {
    1678        2756 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1679             : }
    1680             : 
    1681             : static void *
    1682           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1683             : {
    1684           4 :         void *newoffsets = NULL;
    1685           4 :         sql_delta *bat = *batp;
    1686           4 :         column_storage *cs = &bat->cs;
    1687           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1688             : 
    1689           4 :         if (!u)
    1690             :                 return NULL;
    1691           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1692           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1693           0 :                 bat_destroy(u);
    1694           0 :                 return NULL;
    1695             :         } else {
    1696           4 :                 int new = 0;
    1697             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1698           4 :                 if (BATcount(u) >= max_cnt) {
    1699           0 :                         if (max_cnt == INT_MAX) { /* decompress */
    1700             :                                 if (!(b = temp_descriptor(cs->bid))) {
    1701             :                                         bat_destroy(u);
    1702             :                                         return NULL;
    1703             :                                 }
    1704             :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1705             :                                 /* TODO decompress updates if any */
    1706             :                                 bat_destroy(b);
    1707             :                                 assert(newoffsets == NULL);
    1708             :                                 if (!n) {
    1709             :                                         bat_destroy(u);
    1710             :                                         return NULL;
    1711             :                                 }
    1712             :                                 if (cs->ts != tr->tid) {
    1713             :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1714             :                                                 bat_destroy(n);
    1715             :                                                 bat_destroy(u);
    1716             :                                                 return NULL;
    1717             :                                         }
    1718             :                                         cs = &(*batp)->cs;
    1719             :                                         new = 1;
    1720             :                                         cs->uibid = cs->uvbid = 0;
    1721             :                                 }
    1722             :                                 if (cs->bid && !new)
    1723             :                                         temp_destroy(cs->bid);
    1724             :                                 n = transfer_to_systrans(n);
    1725             :                                 if (n == NULL) {
    1726             :                                         bat_destroy(u);
    1727             :                                         return NULL;
    1728             :                                 }
    1729             :                                 bat_set_access(n, BAT_READ);
    1730             :                                 cs->bid = temp_create(n);
    1731             :                                 bat_destroy(n);
    1732             :                                 if (cs->ebid && !new)
    1733             :                                         temp_destroy(cs->ebid);
    1734             :                                 cs->ebid = 0;
    1735             :                                 cs->st = ST_DEFAULT;
    1736             :                                 /* at append_col the column's storage type is cleared */
    1737             :                                 cs->cleared = true;
    1738             :                         } else {
    1739           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1740           0 :                                         GDKfree(newoffsets);
    1741           0 :                                         bat_destroy(u);
    1742           0 :                                         return NULL;
    1743             :                                 }
    1744           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1745           0 :                                 bat_destroy(b);
    1746           0 :                                 if (!n) {
    1747           0 :                                         GDKfree(newoffsets);
    1748           0 :                                         bat_destroy(u);
    1749           0 :                                         return NULL;
    1750             :                                 }
    1751           0 :                                 if (cs->ts != tr->tid) {
    1752           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1753           0 :                                                 bat_destroy(u);
    1754           0 :                                                 bat_destroy(n);
    1755           0 :                                                 return NULL;
    1756             :                                         }
    1757           0 :                                         cs = &(*batp)->cs;
    1758           0 :                                         new = 1;
    1759           0 :                                         temp_dup(cs->ebid);
    1760           0 :                                         if (cs->uibid) {
    1761           0 :                                                 temp_dup(cs->uibid);
    1762           0 :                                                 temp_dup(cs->uvbid);
    1763             :                                         }
    1764             :                                 }
    1765           0 :                                 if (cs->bid)
    1766           0 :                                         temp_destroy(cs->bid);
    1767           0 :                                 n = transfer_to_systrans(n);
    1768           0 :                                 if (n == NULL) {
    1769           0 :                                         bat_destroy(u);
    1770           0 :                                         return NULL;
    1771             :                                 }
    1772           0 :                                 bat_set_access(n, BAT_READ);
    1773           0 :                                 cs->bid = temp_create(n);
    1774           0 :                                 bat_destroy(n);
    1775           0 :                                 cs->cleared = true;
    1776           0 :                                 i = newoffsets;
    1777             :                         }
    1778             :                 } else { /* append */
    1779           4 :                         i = newoffsets;
    1780             :                 }
    1781             :         }
    1782           4 :         bat_destroy(u);
    1783           4 :         return i;
    1784             : }
    1785             : 
    1786             : static void *
    1787           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1788             : {
    1789           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1790           1 :         void *newoffsets = NULL;
    1791           1 :         BAT *b = NULL, *n = NULL;
    1792             : 
    1793           1 :         if (!(b = temp_descriptor(cs->bid)))
    1794             :                 return NULL;
    1795             : 
    1796           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1797           0 :                 bat_destroy(b);
    1798           0 :                 return NULL;
    1799             :         } else {
    1800             :                 /* returns new offset bat if values within min/max, else decompress */
    1801           1 :                 if (!newoffsets) {
    1802           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1803           1 :                         bat_destroy(b);
    1804           1 :                         if (!n)
    1805             :                                 return NULL;
    1806             :                         /* TODO decompress updates if any */
    1807           1 :                         if (cs->bid)
    1808           1 :                                 temp_destroy(cs->bid);
    1809           1 :                         n = transfer_to_systrans(n);
    1810           1 :                         if (n == NULL)
    1811             :                                 return NULL;
    1812           1 :                         bat_set_access(n, BAT_READ);
    1813           1 :                         cs->bid = temp_create(n);
    1814           1 :                         cs->st = ST_DEFAULT;
    1815             :                         /* at append_col the column's storage type is cleared */
    1816           1 :                         cs->cleared = true;
    1817           1 :                         b = n;
    1818             :                 } else { /* append */
    1819             :                         i = newoffsets;
    1820             :                 }
    1821             :         }
    1822           1 :         bat_destroy(b);
    1823           1 :         return i;
    1824             : }
    1825             : 
    1826             : static int
    1827        6908 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1828             : {
    1829        6908 :         void *oupd = upd;
    1830        6908 :         sql_delta *bat = *batp;
    1831        6908 :         column_storage *cs = &bat->cs;
    1832        6908 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1833        6908 :         assert(!is_oid_nil(rid));
    1834        6908 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1835             : 
    1836        6908 :         if (cs->st == ST_DICT) {
    1837             :                 /* possibly a new array is returned */
    1838           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1839           0 :                 bat = *batp;
    1840           0 :                 cs = &bat->cs;
    1841           0 :                 if (!upd)
    1842             :                         return LOG_ERR;
    1843             :         }
    1844             : 
    1845             :         /* check if rid is insert ? */
    1846        6908 :         if (!inplace) {
    1847             :                 /* check conflict */
    1848        4050 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1849           0 :                         if (oupd != upd)
    1850           0 :                                 GDKfree(upd);
    1851           0 :                         return LOG_CONFLICT;
    1852             :                 }
    1853        4050 :                 BAT *ui, *uv;
    1854             : 
    1855             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1856             :                 /* currently only one update delta is possible */
    1857        4050 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1858           0 :                         if (oupd != upd)
    1859           0 :                                 GDKfree(upd);
    1860           0 :                         return LOG_ERR;
    1861             :                 }
    1862             : 
    1863        4050 :                 assert(uv->ttype);
    1864        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1865        8100 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1866        4050 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1867           0 :                         if (oupd != upd)
    1868           0 :                                 GDKfree(upd);
    1869           0 :                         bat_destroy(ui);
    1870           0 :                         bat_destroy(uv);
    1871           0 :                         return LOG_ERR;
    1872             :                 }
    1873        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1874        4050 :                 bat_destroy(ui);
    1875        4050 :                 bat_destroy(uv);
    1876        4050 :                 cs->ucnt++;
    1877             :         } else {
    1878        2858 :                 BAT *b = NULL;
    1879             : 
    1880        2858 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1881           0 :                         if (oupd != upd)
    1882           0 :                                 GDKfree(upd);
    1883           0 :                         return LOG_ERR;
    1884             :                 }
    1885        2858 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1886           0 :                         if (oupd != upd)
    1887           0 :                                 GDKfree(upd);
    1888           0 :                         bat_destroy(b);
    1889           0 :                         return LOG_ERR;
    1890             :                 }
    1891        2858 :                 bat_destroy(b);
    1892             :         }
    1893        6908 :         if (oupd != upd)
    1894           0 :                 GDKfree(upd);
    1895             :         return LOG_OK;
    1896             : }
    1897             : 
    1898             : static int
    1899        6908 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1900             : {
    1901        6908 :         int res = LOG_OK;
    1902        6908 :         lock_table(tr->store, t->base.id);
    1903        6908 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1904        6908 :         unlock_table(tr->store, t->base.id);
    1905        6908 :         return res;
    1906             : }
    1907             : 
    1908             : static int
    1909      158761 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1910             : {
    1911      158761 :         (void)tr;
    1912      158761 :         if (!ocs)
    1913             :                 return LOG_OK;
    1914      158761 :         cs->bid = ocs->bid;
    1915      158761 :         cs->ebid = ocs->ebid;
    1916      158761 :         cs->uibid = ocs->uibid;
    1917      158761 :         cs->uvbid = ocs->uvbid;
    1918      158761 :         cs->ucnt = ocs->ucnt;
    1919             : 
    1920      158761 :         if (temp) {
    1921       25967 :                 cs->bid = temp_copy(cs->bid, true, false);
    1922       25954 :                 if (cs->bid == BID_NIL)
    1923             :                         return LOG_ERR;
    1924             :         } else {
    1925      132794 :                 temp_dup(cs->bid);
    1926             :         }
    1927      158770 :         if (cs->ebid)
    1928           6 :                 temp_dup(cs->ebid);
    1929      158770 :         cs->ucnt = 0;
    1930      158770 :         cs->uibid = e_bat(TYPE_oid);
    1931      158798 :         cs->uvbid = e_bat(type);
    1932      158795 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1933             :                 return LOG_ERR;
    1934      158795 :         cs->st = ocs->st;
    1935      158795 :         return LOG_OK;
    1936             : }
    1937             : 
    1938             : static void
    1939      302284 : destroy_delta(sql_delta *b, bool recursive)
    1940             : {
    1941      302284 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1942             :                 return;
    1943      282978 :         if (recursive && b->next)
    1944      128416 :                 destroy_delta(b->next, true);
    1945      282978 :         if (b->cs.uibid)
    1946       86894 :                 temp_destroy(b->cs.uibid);
    1947      282978 :         if (b->cs.uvbid)
    1948       86894 :                 temp_destroy(b->cs.uvbid);
    1949      282978 :         if (b->cs.bid)
    1950      282978 :                 temp_destroy(b->cs.bid);
    1951      282978 :         if (b->cs.ebid)
    1952          61 :                 temp_destroy(b->cs.ebid);
    1953      282978 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1954      282978 :         _DELETE(b);
    1955             : }
    1956             : 
    1957             : static sql_delta *
    1958    15576026 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1959             : {
    1960    15576026 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1961             : 
    1962    15576026 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1963    15443288 :                 return obat;
    1964      132738 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1965             :                 /* abort */
    1966          12 :                 if (update_conflict)
    1967           4 :                         *update_conflict = true;
    1968           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1969           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1970           4 :                 return NULL;
    1971             :         }
    1972      132726 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1973             :                 return NULL;
    1974      132744 :         sql_delta* bat = ZNEW(sql_delta);
    1975      132779 :         if (!bat)
    1976             :                 return NULL;
    1977      132779 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1978      132779 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1979           0 :                 destroy_delta(bat, false);
    1980           0 :                 return NULL;
    1981             :         }
    1982      132797 :         bat->cs.ts = tr->tid;
    1983             :         /* only one writer else abort */
    1984      132797 :         bat->next = obat;
    1985      132797 :         if (obat)
    1986      132797 :                 bat->nr_updates = obat->nr_updates;
    1987      132797 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    1988           0 :                 bat->next = NULL;
    1989           0 :                 destroy_delta(bat, false);
    1990           0 :                 if (update_conflict)
    1991           0 :                         *update_conflict = true;
    1992           0 :                 return NULL;
    1993             :         }
    1994             :         return bat;
    1995             : }
    1996             : 
    1997             : static int
    1998        9664 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    1999             : {
    2000        9664 :         int ok = LOG_OK;
    2001             : 
    2002        9664 :         if (is_bat) {
    2003        2756 :                 BAT *tids = incoming_tids;
    2004        2756 :                 BAT *values = incoming_values;
    2005        2756 :                 if (BATcount(tids) == 0)
    2006             :                         return LOG_OK;
    2007        2756 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2008             :         } else {
    2009        6908 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2010             :         }
    2011             :         return ok;
    2012             : }
    2013             : 
    2014             : static int
    2015        9675 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2016             : {
    2017        9675 :         int res = LOG_OK;
    2018        9675 :         bool update_conflict = false;
    2019        9675 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2020             : 
    2021        9675 :         if (isbat) {
    2022        2764 :                 BAT *t = tids;
    2023        2764 :                 if (!BATcount(t))
    2024             :                         return LOG_OK;
    2025             :         }
    2026             : 
    2027        9389 :         if (c == NULL)
    2028             :                 return LOG_ERR;
    2029             : 
    2030        9389 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2031           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2032             : 
    2033        9385 :         assert(delta && delta->cs.ts == tr->tid);
    2034        9385 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2035        9385 :         if (odelta != delta)
    2036        3362 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2037             : 
    2038        9385 :         odelta = delta;
    2039        9385 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2040             :                 return res;
    2041        9385 :         assert(delta == odelta);
    2042        9385 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2043           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2044             :         return res;
    2045             : }
    2046             : 
    2047             : static sql_delta *
    2048        2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2049             : {
    2050        2457 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2051             : 
    2052        2457 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2053        2429 :                 return obat;
    2054          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2055             :                 /* abort */
    2056           0 :                 if (update_conflict)
    2057           0 :                         *update_conflict = true;
    2058           0 :                 return NULL;
    2059             :         }
    2060          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2061             :                 return NULL;
    2062          28 :         sql_delta* bat = ZNEW(sql_delta);
    2063          28 :         if (!bat)
    2064             :                 return NULL;
    2065          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2066          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2067           0 :                 destroy_delta(bat, false);
    2068           0 :                 return NULL;
    2069             :         }
    2070          28 :         bat->cs.ts = tr->tid;
    2071             :         /* only one writer else abort */
    2072          28 :         bat->next = obat;
    2073          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2074           0 :                 bat->next = NULL;
    2075           0 :                 destroy_delta(bat, false);
    2076           0 :                 if (update_conflict)
    2077           0 :                         *update_conflict = true;
    2078           0 :                 return NULL;
    2079             :         }
    2080             :         return bat;
    2081             : }
    2082             : 
    2083             : static int
    2084         782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2085             : {
    2086         782 :         int res = LOG_OK;
    2087         782 :         bool update_conflict = false;
    2088         782 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2089             : 
    2090         782 :         if (isbat) {
    2091         782 :                 BAT *t = tids;
    2092         782 :                 if (!BATcount(t))
    2093             :                         return LOG_OK;
    2094             :         }
    2095             : 
    2096         279 :         if (i == NULL)
    2097             :                 return LOG_ERR;
    2098             : 
    2099         279 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2100           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2101             : 
    2102         279 :         assert(delta && delta->cs.ts == tr->tid);
    2103         279 :         if (odelta != delta)
    2104          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2105             : 
    2106         279 :         odelta = delta;
    2107         279 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2108         279 :         assert(delta == odelta);
    2109             :         return res;
    2110             : }
    2111             : 
    2112             : static int
    2113      149187 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
    2114             : {
    2115      149187 :         BAT *b, *oi = i;
    2116      149187 :         int err = 0;
    2117      149187 :         sql_delta *bat = *batp;
    2118             : 
    2119      149187 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2120      149187 :         if (!BATcount(i))
    2121             :                 return LOG_OK;
    2122      149187 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2123             :                 return LOG_ERR;
    2124             : 
    2125      149187 :         lock_column(tr->store, id);
    2126      150003 :         if (bat->cs.st == ST_DICT) {
    2127          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2128          13 :                 bat = *batp;
    2129          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2130           0 :                         bat_destroy(oi);
    2131          13 :                 oi = ni;
    2132          13 :                 if (!oi) {
    2133           0 :                         unlock_column(tr->store, id);
    2134           0 :                         return LOG_ERR;
    2135             :                 }
    2136             :         }
    2137      150003 :         if (bat->cs.st == ST_FOR) {
    2138           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2139           0 :                 bat = *batp;
    2140           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2141           0 :                         bat_destroy(oi);
    2142           0 :                 oi = ni;
    2143           0 :                 if (!oi) {
    2144           0 :                         unlock_column(tr->store, id);
    2145           0 :                         return LOG_ERR;
    2146             :                 }
    2147             :         }
    2148             : 
    2149      150003 :         b = temp_descriptor(bat->cs.bid);
    2150      149863 :         if (b == NULL) {
    2151           0 :                 unlock_column(tr->store, id);
    2152           0 :                 if (oi != i)
    2153           0 :                         bat_destroy(oi);
    2154           0 :                 return LOG_ERR;
    2155             :         }
    2156      149863 :         if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2157      149675 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2158         785 :                         err = 1;
    2159         176 :         } else if (!offsets) {
    2160         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2161         785 :                         err = 1;
    2162          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2163           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2164         785 :                         err = 1;
    2165          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2166         785 :                         err = 1;
    2167             :         }
    2168      149966 :         bat_destroy(b);
    2169      150213 :         unlock_column(tr->store, id);
    2170             : 
    2171      150467 :         if (oi != i)
    2172          13 :                 bat_destroy(oi);
    2173      150467 :         return (err)?LOG_ERR:LOG_OK;
    2174             : }
    2175             : 
    2176             : // Look at the offsets and find where the replacements end and the appends begin.
    2177             : static BUN
    2178           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2179             : {
    2180           0 :         BUN ocnt = BATcount(offsets);
    2181           0 :         if (ocnt == 0)
    2182             :                 return 0;
    2183             : 
    2184           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2185           0 :         if (highest < bcnt)
    2186             :                 // all are replacements
    2187             :                 return ocnt;
    2188             : 
    2189             :         // reason backward to find the first append.
    2190             :         // Suppose offsets has 15 entries, bcnt == 100
    2191             :         // and the highest offset in offsets is 109.
    2192           0 :         BUN new_bcnt = highest + 1; // 110
    2193           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2194           0 :         BUN nreplacements = ocnt - nappends; // 5
    2195             : 
    2196             :         // The first append should be to position bcnt
    2197           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2198             : 
    2199             :         return nreplacements;
    2200             : }
    2201             : 
    2202             : 
    2203             : static int
    2204    15288589 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2205             : {
    2206    15288589 :         void *oi = i;
    2207    15288589 :         BAT *b;
    2208    15288589 :         lock_column(tr->store, id);
    2209    15291548 :         sql_delta *bat = *batp;
    2210             : 
    2211    15291548 :         if (bat->cs.st == ST_DICT) {
    2212             :                 /* possibly a new array is returned */
    2213           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2214           4 :                 bat = *batp;
    2215           4 :                 if (!i) {
    2216           0 :                         unlock_column(tr->store, id);
    2217           0 :                         return LOG_ERR;
    2218             :                 }
    2219             :         }
    2220    15291548 :         if (bat->cs.st == ST_FOR) {
    2221             :                 /* possibly a new array is returned */
    2222           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2223           1 :                 bat = *batp;
    2224           1 :                 if (!i) {
    2225           0 :                         unlock_column(tr->store, id);
    2226           0 :                         return LOG_ERR;
    2227             :                 }
    2228             :         }
    2229             : 
    2230    15291548 :         b = temp_descriptor(bat->cs.bid);
    2231    15289813 :         if (b == NULL) {
    2232           0 :                 if (i != oi)
    2233           0 :                         GDKfree(i);
    2234           0 :                 unlock_column(tr->store, id);
    2235           0 :                 return LOG_ERR;
    2236             :         }
    2237    15289813 :         BUN bcnt = BATcount(b);
    2238             : 
    2239    15289813 :         if (offsets) {
    2240             :                 // The first few might be replacements while later items might be appends.
    2241             :                 // Handle the replacements here while leaving the appends to the code below.
    2242           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2243             : 
    2244           0 :                 oid *start = Tloc(offsets, 0);
    2245           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2246           0 :                         bat_destroy(b);
    2247           0 :                         if (i != oi)
    2248           0 :                                 GDKfree(i);
    2249           0 :                         unlock_column(tr->store, id);
    2250           0 :                         return LOG_ERR;
    2251             :                 }
    2252             : 
    2253             :                 // Replacements have been handled. The rest are appends.
    2254           0 :                 assert(offset == oid_nil);
    2255           0 :                 offset = bcnt;
    2256           0 :                 cnt -= nreplacements;
    2257             :         }
    2258             : 
    2259    15289813 :         if (bcnt > offset){
    2260      329290 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2261      329290 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2262           0 :                         bat_destroy(b);
    2263           0 :                         if (i != oi)
    2264           0 :                                 GDKfree(i);
    2265           0 :                         unlock_column(tr->store, id);
    2266           0 :                         return LOG_ERR;
    2267             :                 }
    2268      329439 :                 cnt -= ccnt;
    2269      329439 :                 offset += ccnt;
    2270             :         }
    2271    15289962 :         if (cnt) {
    2272    14960505 :                 if (BATcount(b) < offset) { /* add space */
    2273        8308 :                         BUN d = offset - BATcount(b);
    2274        8308 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2275           0 :                                 bat_destroy(b);
    2276           0 :                                 if (i != oi)
    2277           0 :                                         GDKfree(i);
    2278           0 :                                 unlock_column(tr->store, id);
    2279           0 :                                 return LOG_ERR;
    2280             :                         }
    2281             :                 }
    2282    14960503 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2283           0 :                         bat_destroy(b);
    2284           0 :                         if (i != oi)
    2285           0 :                                 GDKfree(i);
    2286           0 :                         unlock_column(tr->store, id);
    2287           0 :                         return LOG_ERR;
    2288             :                 }
    2289             :         }
    2290    15290825 :         bat_destroy(b);
    2291    15290355 :         if (i != oi)
    2292           4 :                 GDKfree(i);
    2293    15290355 :         unlock_column(tr->store, id);
    2294    15290355 :         return LOG_OK;
    2295             : }
    2296             : 
    2297             : static int
    2298       25970 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2299             : {
    2300       25970 :         if (!(bat->segs = new_segments(tr, 0)))
    2301             :                 return LOG_ERR;
    2302       25967 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2303             : }
    2304             : 
    2305             : static int
    2306    15437837 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
    2307             : {
    2308    15437837 :         int ok = LOG_OK;
    2309             : 
    2310    15437837 :         if ((*delta)->cs.merged)
    2311       36526 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2312    15437837 :         if (isbat) {
    2313      149127 :                 BAT *bat = incoming_data;
    2314             : 
    2315      149127 :                 if (BATcount(bat))
    2316      149255 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
    2317             :         } else {
    2318    15288710 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2319             :         }
    2320    15442435 :         return ok;
    2321             : }
    2322             : 
    2323             : static int
    2324    15437593 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2325             : {
    2326    15437593 :         int res = LOG_OK;
    2327    15437593 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2328             : 
    2329    15437593 :         if (isbat) {
    2330      149272 :                 BAT *t = data;
    2331      149272 :                 if (!BATcount(t))
    2332             :                         return LOG_OK;
    2333             :         }
    2334             : 
    2335    15436816 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2336             :                 return LOG_ERR;
    2337             : 
    2338    15437466 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2339             : 
    2340    15437466 :         odelta = delta;
    2341    15437466 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
    2342             :                 return res;
    2343    15440418 :         if (odelta != delta) {
    2344           0 :                 delta->next = odelta;
    2345           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2346           0 :                         delta->next = NULL;
    2347           0 :                         destroy_delta(delta, false);
    2348           0 :                         return LOG_CONFLICT;
    2349             :                 }
    2350             :         }
    2351    15440418 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2352           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2353             :         return res;
    2354             : }
    2355             : 
    2356             : static int
    2357        2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2358             : {
    2359        2184 :         int res = LOG_OK;
    2360        2184 :         sql_delta *delta;
    2361             : 
    2362        2184 :         if (isbat) {
    2363        1010 :                 BAT *t = data;
    2364        1010 :                 if (!BATcount(t))
    2365             :                         return LOG_OK;
    2366             :         }
    2367             : 
    2368        2172 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2369             :                 return LOG_ERR;
    2370             : 
    2371        2172 :         assert(delta->cs.st == ST_DEFAULT);
    2372             : 
    2373        2172 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
    2374        2172 :         return res;
    2375             : }
    2376             : 
    2377             : static int
    2378       66259 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2379             : {
    2380       66259 :         int err = 0;
    2381             : 
    2382             :         /* TODO check for conflicting updates */
    2383       66259 :         (void)rid;
    2384       66259 :         (void)cnt;
    2385      516677 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2386      450418 :                 sql_column *c = n->data;
    2387      450418 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2388             : 
    2389             :                 /* check for active updates */
    2390      450418 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2391             :                         return 1;
    2392             :         }
    2393             :         return 0;
    2394             : }
    2395             : 
    2396             : static int
    2397       64311 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2398             : {
    2399       64311 :         int in_transaction = segments_in_transaction(tr, t);
    2400             : 
    2401       64311 :         lock_table(tr->store, t->base.id);
    2402             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2403       64311 :         segment *seg = s->segs->h, *p = NULL;
    2404    51205442 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2405    51205442 :                 if (seg->start <= rid && seg->end > rid) {
    2406       64311 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2407           4 :                                 unlock_table(tr->store, t->base.id);
    2408           4 :                                 return LOG_CONFLICT;
    2409             :                         }
    2410       64307 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2411           0 :                                 unlock_table(tr->store, t->base.id);
    2412           0 :                                 return LOG_CONFLICT;
    2413             :                         }
    2414       64307 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2415           0 :                                 unlock_table(tr->store, t->base.id);
    2416           0 :                                 return LOG_ERR;
    2417             :                         }
    2418             :                         break;
    2419             :                 }
    2420             :         }
    2421       64307 :         unlock_table(tr->store, t->base.id);
    2422       64307 :         if (!in_transaction)
    2423       12783 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2424             :         return LOG_OK;
    2425             : }
    2426             : 
    2427             : static int
    2428        1950 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2429             : {
    2430        1950 :         segment *seg = *Seg, *p = NULL;
    2431        6143 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2432        6141 :                 if (seg->start <= start && seg->end > start) {
    2433        2000 :                         size_t lcnt = cnt;
    2434        2000 :                         if (start+lcnt > seg->end)
    2435          54 :                                 lcnt = seg->end-start;
    2436        2000 :                         if (SEG_IS_DELETED(seg, tr)) {
    2437          47 :                                 start += lcnt;
    2438          47 :                                 cnt -= lcnt;
    2439          47 :                                 continue;
    2440        1953 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2441           1 :                                 return LOG_CONFLICT;
    2442        1952 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2443             :                                 return LOG_CONFLICT;
    2444        1952 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2445        1952 :                         if (!seg)
    2446             :                                 return LOG_ERR;
    2447        1952 :                         start += lcnt;
    2448        1952 :                         cnt -= lcnt;
    2449             :                 }
    2450        6093 :                 if (start+cnt <= seg->end)
    2451             :                         break;
    2452             :         }
    2453             :         return LOG_OK;
    2454             : }
    2455             : 
    2456             : static int
    2457         486 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2458             : {
    2459         486 :         segment *seg = s->segs->h;
    2460         486 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2461             : }
    2462             : 
    2463             : static int
    2464         302 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2465             : {
    2466         302 :         int in_transaction = segments_in_transaction(tr, t);
    2467         302 :         BAT *oi = i;    /* update ids */
    2468         302 :         int ok = LOG_OK;
    2469             : 
    2470         302 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2471             :                 return LOG_ERR;
    2472         302 :         if (BATcount(i)) {
    2473         539 :                 if (BATtdense(i)) {
    2474         237 :                         size_t start = i->tseqbase;
    2475         237 :                         size_t cnt = BATcount(i);
    2476             : 
    2477         237 :                         lock_table(tr->store, t->base.id);
    2478         237 :                         ok = delete_range(tr, t, s, start, cnt);
    2479         237 :                         unlock_table(tr->store, t->base.id);
    2480          65 :                 } else if (complex_cand(i)) {
    2481           0 :                         struct canditer ci;
    2482           0 :                         oid f = 0, l = 0, cur = 0;
    2483             : 
    2484           0 :                         canditer_init(&ci, NULL, i);
    2485           0 :                         cur = f = canditer_next(&ci);
    2486             : 
    2487           0 :                         lock_table(tr->store, t->base.id);
    2488           0 :                         if (!is_oid_nil(f)) {
    2489           0 :                                 segment *seg = s->segs->h;
    2490           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2491           0 :                                         if (cur+1 == l) {
    2492           0 :                                                 cur++;
    2493           0 :                                                 continue;
    2494             :                                         }
    2495           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2496           0 :                                         f = cur = l;
    2497             :                                 }
    2498           0 :                                 if (ok == LOG_OK)
    2499           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2500             :                         }
    2501           0 :                         unlock_table(tr->store, t->base.id);
    2502             :                 } else {
    2503          65 :                         if (!i->tsorted) {
    2504           0 :                                 assert(oi == i);
    2505           0 :                                 BAT *ni = NULL;
    2506           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2507           0 :                                         ok = LOG_ERR;
    2508           0 :                                 if (ni)
    2509           0 :                                         i = ni;
    2510             :                         }
    2511          65 :                         assert(i->tsorted);
    2512          65 :                         BUN icnt = BATcount(i);
    2513          65 :                         BATiter ii = bat_iterator(i);
    2514          65 :                         oid *o = ii.base, n = o[0]+1;
    2515          65 :                         size_t lcnt = 1;
    2516             : 
    2517          65 :                         lock_table(tr->store, t->base.id);
    2518          65 :                         segment *seg = s->segs->h;
    2519       22227 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2520       22162 :                                 if (o[i] == n) {
    2521       21807 :                                         lcnt++;
    2522       21807 :                                         n++;
    2523             :                                 } else {
    2524         355 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2525         355 :                                         lcnt = 0;
    2526             :                                 }
    2527       22162 :                                 if (!lcnt) {
    2528         355 :                                         n = o[i]+1;
    2529         355 :                                         lcnt = 1;
    2530             :                                 }
    2531             :                         }
    2532          65 :                         bat_iterator_end(&ii);
    2533          65 :                         if (lcnt && ok == LOG_OK)
    2534          65 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2535          65 :                         unlock_table(tr->store, t->base.id);
    2536             :                 }
    2537             :         }
    2538         302 :         if (i != oi)
    2539          25 :                 bat_destroy(i);
    2540             :         // assert
    2541         302 :         if (!in_transaction)
    2542         270 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2543             :         return ok;
    2544             : }
    2545             : 
    2546             : static void
    2547       49565 : destroy_segments(segments *s)
    2548             : {
    2549       49565 :         if (!s || sql_ref_dec(&s->r) > 0)
    2550           0 :                 return;
    2551       49565 :         segment *seg = s->h;
    2552      100326 :         while(seg) {
    2553       50761 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2554       50761 :                 ATOMIC_PTR_DESTROY(&seg->next);
    2555       50761 :                 _DELETE(seg);
    2556       50761 :                 seg = n;
    2557             :         }
    2558       49565 :         _DELETE(s);
    2559             : }
    2560             : 
    2561             : static void
    2562       50962 : destroy_storage(storage *bat)
    2563             : {
    2564       50962 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2565             :                 return;
    2566       49423 :         if (bat->next)
    2567        6587 :                 destroy_storage(bat->next);
    2568       49423 :         destroy_segments(bat->segs);
    2569       49423 :         if (bat->cs.uibid)
    2570       28863 :                 temp_destroy(bat->cs.uibid);
    2571       49423 :         if (bat->cs.uvbid)
    2572       28863 :                 temp_destroy(bat->cs.uvbid);
    2573       49423 :         if (bat->cs.bid)
    2574       49423 :                 temp_destroy(bat->cs.bid);
    2575       49423 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2576       49423 :         _DELETE(bat);
    2577             : }
    2578             : 
    2579             : static int
    2580      171151 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2581             : {
    2582      171151 :         if (uncommitted) {
    2583      419396 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2584      268765 :                         if (!VALID_4_READ(s->ts,tr))
    2585             :                                 return 1;
    2586             :         } else {
    2587       55221 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2588       36121 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2589             :                                 return 1;
    2590             :         }
    2591             : 
    2592             :         return 0;
    2593             : }
    2594             : 
    2595             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2596             : 
    2597             : storage *
    2598     2133702 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2599             : {
    2600     2133702 :         storage *obat;
    2601             : 
    2602     2133702 :         obat = ATOMIC_PTR_GET(&t->data);
    2603             : 
    2604     2133702 :         if (obat->cs.ts != tr->tid)
    2605     1457954 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2606     1457899 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2607             :                                 /* abort */
    2608       15392 :                                 if (clear)
    2609       15392 :                                         *clear = true;
    2610       15392 :                                 return NULL;
    2611             :                         }
    2612             : 
    2613     2118310 :         if (!clear)
    2614             :                 return obat;
    2615             : 
    2616             :         /* remainder is only to handle clear */
    2617       26384 :         if (segments_conflict(tr, obat->segs, 1)) {
    2618         417 :                 *clear = true;
    2619         417 :                 return NULL;
    2620             :         }
    2621       25966 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2622             :                 return NULL;
    2623       25967 :         storage *bat = ZNEW(storage);
    2624       25972 :         if (!bat)
    2625             :                 return NULL;
    2626       25972 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2627       25972 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2628           0 :                 destroy_storage(bat);
    2629           0 :                 return NULL;
    2630             :         }
    2631       25972 :         bat->cs.cleared = true;
    2632       25972 :         bat->cs.ts = tr->tid;
    2633             :         /* only one writer else abort */
    2634       25972 :         bat->next = obat;
    2635       25972 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2636           9 :                 bat->next = NULL;
    2637           9 :                 destroy_storage(bat);
    2638           9 :                 if (clear)
    2639           9 :                         *clear = true;
    2640           9 :                 return NULL;
    2641             :         }
    2642             :         return bat;
    2643             : }
    2644             : 
    2645             : static int
    2646       64662 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
    2647             : {
    2648       64662 :         int ok = LOG_OK;
    2649       64662 :         BAT *b = ib;
    2650       64662 :         storage *bat;
    2651             : 
    2652       64662 :         if (isbat && !BATcount(b))
    2653             :                 return ok;
    2654             : 
    2655       64613 :         if (t == NULL)
    2656             :                 return LOG_ERR;
    2657             : 
    2658       64613 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2659             :                 return LOG_ERR;
    2660             : 
    2661       64613 :         if (isbat)
    2662         302 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2663             :         else
    2664       64311 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2665             :         return ok;
    2666             : }
    2667             : 
    2668             : static size_t
    2669           0 : dcount_col(sql_trans *tr, sql_column *c)
    2670             : {
    2671           0 :         sql_delta *b;
    2672             : 
    2673           0 :         if (!isTable(c->t))
    2674             :                 return 0;
    2675           0 :         b = col_timestamp_delta(tr, c);
    2676           0 :         if (!b)
    2677             :                 return 1;
    2678             : 
    2679           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2680           0 :         if (!s || !s->segs->t)
    2681             :                 return 1;
    2682           0 :         size_t cnt = s->segs->t->end;
    2683           0 :         if (cnt) {
    2684           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2685           0 :                 size_t dcnt = 0;
    2686             : 
    2687           0 :                 if (v)
    2688           0 :                         dcnt = BATguess_uniques(v, NULL);
    2689           0 :                 return dcnt;
    2690             :         }
    2691             :         return cnt;
    2692             : }
    2693             : 
    2694             : static BAT *
    2695     3255267 : bind_no_view(BAT *b, bool quick)
    2696             : {
    2697     3255267 :         if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
    2698     3251865 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2699     3251865 :                 bat_destroy(b);
    2700     3251869 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2701             :                         return NULL;
    2702             :         }
    2703             :         return b;
    2704             : }
    2705             : 
    2706             : static int
    2707           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2708             : {
    2709           0 :         int ok = 0;
    2710           0 :         assert(tr->active);
    2711           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2712           0 :                 return 0;
    2713           0 :         lock_column(tr->store, c->base.id);
    2714           0 :         if (unique_est) {
    2715           0 :                 sql_delta *d;
    2716           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2717           0 :                         BAT *b;
    2718           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2719           0 :                                 MT_lock_set(&b->theaplock);
    2720           0 :                                 b->tunique_est = *unique_est;
    2721           0 :                                 MT_lock_unset(&b->theaplock);
    2722           0 :                                 bat_destroy(b);
    2723             :                         }
    2724             :                 }
    2725             :         }
    2726           0 :         if (min) {
    2727           0 :                 _DELETE(c->min);
    2728           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2729           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2730           0 :                         memcpy(c->min, min, minlen);
    2731           0 :                         ok = 1;
    2732             :                 }
    2733             :         }
    2734           0 :         if (max) {
    2735           0 :                 _DELETE(c->max);
    2736           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2737           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2738           0 :                         memcpy(c->max, max, maxlen);
    2739           0 :                         ok = 1;
    2740             :                 }
    2741             :         }
    2742           0 :         unlock_column(tr->store, c->base.id);
    2743           0 :         return ok;
    2744             : }
    2745             : 
    2746             : static int
    2747          19 : min_max_col(sql_trans *tr, sql_column *c)
    2748             : {
    2749          19 :         int ok = 0;
    2750          19 :         BAT *b = NULL;
    2751          19 :         sql_delta *d = NULL;
    2752             : 
    2753          19 :         assert(tr->active);
    2754          19 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2755           0 :                 return 0;
    2756          19 :         if (c->min && c->max)
    2757             :                 return 1;
    2758          19 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2759          19 :                 if (d->cs.st == ST_FOR)
    2760             :                         return 0;
    2761          19 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2762          19 :                 lock_column(tr->store, c->base.id);
    2763          19 :                 if (c->min && c->max) {
    2764           0 :                         unlock_column(tr->store, c->base.id);
    2765           0 :                         return 1;
    2766             :                 }
    2767          19 :                 _DELETE(c->min);
    2768          19 :                 _DELETE(c->max);
    2769          19 :                 if ((b = bind_col(tr, c, access))) {
    2770          19 :                         if (!(b = bind_no_view(b, false))) {
    2771           0 :                                 unlock_column(tr->store, c->base.id);
    2772           0 :                                 return 0;
    2773             :                         }
    2774          19 :                         BATiter bi = bat_iterator(b);
    2775          19 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2776          16 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2777          16 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2778             : 
    2779          16 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2780           0 :                                         _DELETE(c->min);
    2781           0 :                                         _DELETE(c->max);
    2782             :                                 } else {
    2783          16 :                                         memcpy(c->min, nmin, minlen);
    2784          16 :                                         memcpy(c->max, nmax, maxlen);
    2785          16 :                                         ok = 1;
    2786             :                                 }
    2787             :                         }
    2788          19 :                         bat_iterator_end(&bi);
    2789          19 :                         bat_destroy(b);
    2790             :                 }
    2791          19 :                 unlock_column(tr->store, c->base.id);
    2792             :         }
    2793             :         return ok;
    2794             : }
    2795             : 
    2796             : static size_t
    2797          17 : count_segs(segment *s)
    2798             : {
    2799          17 :         size_t nr = 0;
    2800             : 
    2801          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2802          55 :                 nr++;
    2803          17 :         return nr;
    2804             : }
    2805             : 
    2806             : static size_t
    2807          34 : count_del(sql_trans *tr, sql_table *t, int access)
    2808             : {
    2809          34 :         storage *d;
    2810             : 
    2811          34 :         if (!isTable(t))
    2812             :                 return 0;
    2813          34 :         d = tab_timestamp_storage(tr, t);
    2814          34 :         if (!d)
    2815             :                 return 0;
    2816          34 :         if (access == 2)
    2817           0 :                 return d->cs.ucnt;
    2818          34 :         if (access == 1)
    2819           0 :                 return count_inserts(d->segs->h, tr);
    2820          34 :         if (access == 10) /* special case for counting the number of segments */
    2821          17 :                 return count_segs(d->segs->h);
    2822          17 :         return count_deletes(d->segs->h, tr);
    2823             : }
    2824             : 
    2825             : static int
    2826       19380 : sorted_col(sql_trans *tr, sql_column *col)
    2827             : {
    2828       19380 :         int sorted = 0;
    2829             : 
    2830       19380 :         assert(tr->active);
    2831       19380 :         if (!isTable(col->t) || !col->t->s)
    2832             :                 return 0;
    2833             : 
    2834       19380 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2835       19361 :                 BAT *b = bind_col(tr, col, QUICK);
    2836             : 
    2837       19361 :                 if (b)
    2838       19361 :                         sorted = b->tsorted || b->trevsorted;
    2839             :         }
    2840             :         return sorted;
    2841             : }
    2842             : 
    2843             : static int
    2844        7468 : unique_col(sql_trans *tr, sql_column *col)
    2845             : {
    2846        7468 :         int distinct = 0;
    2847             : 
    2848        7468 :         assert(tr->active);
    2849        7468 :         if (!isTable(col->t) || !col->t->s)
    2850             :                 return 0;
    2851             : 
    2852        7468 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2853        7468 :                 BAT *b = bind_col(tr, col, QUICK);
    2854             : 
    2855        7468 :                 if (b)
    2856        7468 :                         distinct = b->tkey;
    2857             :         }
    2858             :         return distinct;
    2859             : }
    2860             : 
    2861             : static int
    2862        1849 : double_elim_col(sql_trans *tr, sql_column *col)
    2863             : {
    2864        1849 :         int de = 0;
    2865        1849 :         sql_delta *d;
    2866             : 
    2867        1849 :         assert(tr->active);
    2868        1849 :         if (!isTable(col->t) || !col->t->s)
    2869             :                 return 0;
    2870             : 
    2871        1849 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2872           6 :                 if (d->cs.st == ST_DICT) {
    2873           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2874           6 :                         if (b && b->ttype == TYPE_bte)
    2875             :                                 de = 1;
    2876           0 :                         else if (b && b->ttype == TYPE_sht)
    2877        1849 :                                 de = 2;
    2878             :                 }
    2879        1843 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2880        1843 :                 BAT *b = bind_col(tr, col, QUICK);
    2881             : 
    2882        1843 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2883        1843 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2884        1843 :                         if (de)
    2885        1621 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2886             :                 }
    2887        1621 :                 assert(de >= 0 && de <= 16);
    2888             :         }
    2889             :         return de;
    2890             : }
    2891             : 
    2892             : static int
    2893     3290926 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2894             : {
    2895     3290926 :         int ok = 0;
    2896     3290926 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2897     3290926 :         sql_delta *d = NULL;
    2898             : 
    2899     3290926 :         (void) tr;
    2900     3290926 :         assert(tr->active);
    2901     3290926 :         *nonil = false;
    2902     3290926 :         *unique = false;
    2903     3290926 :         *unique_est = 0.0;
    2904     3290926 :         if (!c || !isTable(c->t) || !c->t->s)
    2905             :                 return ok;
    2906             : 
    2907     3290688 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2908     3252263 :                 if (d->cs.st == ST_FOR) {
    2909          27 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2910          27 :                         return ok;
    2911             :                 }
    2912     3252236 :                 int eclass = c->type.type->eclass;
    2913     3252236 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2914     3252236 :                 if ((b = bind_col(tr, c, access))) {
    2915     3252660 :                         if (!(b = bind_no_view(b, false)))
    2916           0 :                                 return ok;
    2917     3253111 :                         BATiter bi = bat_iterator(b);
    2918     3252838 :                         *nonil = bi.nonil && !bi.nil;
    2919             : 
    2920     3252838 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2921     3024495 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
    2922     2053842 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2923             :                                         ok |= 1;
    2924     2053788 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2925     2045837 :                                         ok |= 1;
    2926     2053815 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2927          54 :                                         ok |= 2;
    2928     2053772 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2929     2038367 :                                         ok |= 2;
    2930             :                         }
    2931     3252772 :                         if (d->cs.ucnt == 0) {
    2932     3250208 :                                 if (d->cs.st == ST_DEFAULT) {
    2933     3249508 :                                         *unique = bi.key;
    2934     3249508 :                                         *unique_est = bi.unique_est;
    2935     3249508 :                                         if (*unique_est == 0)
    2936     1172283 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2937         700 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2938             :                                         /* for dict, check the offsets bat for uniqueness */
    2939         700 :                                         MT_lock_set(&off->theaplock);
    2940         700 :                                         *unique = off->tkey;
    2941         700 :                                         *unique_est = off->tunique_est;
    2942         700 :                                         MT_lock_unset(&off->theaplock);
    2943             :                                 }
    2944             :                         }
    2945     3253454 :                         bat_iterator_end(&bi);
    2946     3252667 :                         bat_destroy(b);
    2947     3252808 :                         if (*nonil && d->cs.ucnt > 0) {
    2948             :                                 /* This could use a quick descriptor */
    2949        2078 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2950           0 :                                         *nonil = false;
    2951             :                                 } else {
    2952        2078 :                                         MT_lock_set(&upv->theaplock);
    2953        2078 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2954        2078 :                                         MT_lock_unset(&upv->theaplock);
    2955        2078 :                                         bat_destroy(upv);
    2956             :                                 }
    2957             :                         }
    2958             :                 }
    2959             :         }
    2960             :         return ok;
    2961             : }
    2962             : 
    2963             : static int
    2964         233 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2965             : {
    2966         233 :         assert(tr->active);
    2967         233 :         if (!isTable(col->t) || !col->t->s)
    2968             :                 return LOG_OK;
    2969             : 
    2970         228 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2971         228 :                 BAT *b = bind_col(tr, col, QUICK);
    2972             : 
    2973         228 :                 if (b) { /* add props for ranges [min, max> */
    2974         228 :                         MT_lock_set(&b->theaplock);
    2975         228 :                         if (add_range) {
    2976         155 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    2977         155 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    2978          95 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    2979             :                                 else
    2980          60 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    2981         155 :                                 if (!pt->with_nills || !col->null)
    2982         105 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    2983             :                         } else {
    2984          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    2985          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    2986          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    2987             :                         }
    2988         228 :                         MT_lock_unset(&b->theaplock);
    2989             :                 }
    2990             :         }
    2991             :         return LOG_OK;
    2992             : }
    2993             : 
    2994             : static int
    2995        3492 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    2996             : {
    2997        3492 :         assert(tr->active);
    2998        3492 :         if (!isTable(col->t) || !col->t->s)
    2999             :                 return LOG_OK;
    3000             : 
    3001        3474 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    3002        3474 :                 BAT *b = bind_col(tr, col, QUICK);
    3003             : 
    3004        3474 :                 if (b) { /* add props for ranges [min, max> */
    3005        3474 :                         if (not_null) {
    3006        3472 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3007             :                         } else {
    3008           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3009             :                         }
    3010             :                 }
    3011             :         }
    3012             :         return LOG_OK;
    3013             : }
    3014             : 
    3015             : static int
    3016       22819 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3017             : {
    3018       22819 :         sqlstore *store = tr->store;
    3019       22819 :         int bid = log_find_bat(store->logger, id);
    3020       22819 :         if (bid <= 0)
    3021             :                 return LOG_ERR;
    3022       22819 :         cs->bid = temp_dup(bid);
    3023       22819 :         cs->ucnt = 0;
    3024       22819 :         cs->uibid = e_bat(TYPE_oid);
    3025       22819 :         cs->uvbid = e_bat(type);
    3026       22819 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3027             :                 return LOG_ERR;
    3028             :         return LOG_OK;
    3029             : }
    3030             : 
    3031             : static int
    3032       68467 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3033             : {
    3034       68467 :         int res = LOG_OK;
    3035       68467 :         gdk_return ok;
    3036       68467 :         BAT *b = temp_descriptor(bat->cs.bid);
    3037             : 
    3038       68467 :         if (b == NULL)
    3039             :                 return LOG_ERR;
    3040             : 
    3041       68467 :         if (!bat->cs.uibid)
    3042       68459 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3043       68467 :         if (!bat->cs.uvbid)
    3044       68459 :                 bat->cs.uvbid = e_bat(b->ttype);
    3045       68467 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3046           0 :                 res = LOG_ERR;
    3047       68467 :         if (GDKinmemory(0)) {
    3048         178 :                 bat_destroy(b);
    3049         178 :                 return res;
    3050             :         }
    3051             : 
    3052       68289 :         bat_set_access(b, BAT_READ);
    3053       68289 :         sqlstore *store = tr->store;
    3054       68289 :         ok = log_bat_persists(store->logger, b, id);
    3055       68289 :         bat_destroy(b);
    3056       68289 :         if(res != LOG_OK)
    3057             :                 return res;
    3058       68289 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3059             : }
    3060             : 
    3061             : static int
    3062           0 : new_persistent_delta( sql_delta *bat)
    3063             : {
    3064           0 :         bat->cs.ucnt = 0;
    3065           0 :         return LOG_OK;
    3066             : }
    3067             : 
    3068             : static void
    3069      128617 : create_delta( sql_delta *d, BAT *b)
    3070             : {
    3071      128617 :         bat_set_access(b, BAT_READ);
    3072      128617 :         d->cs.bid = temp_create(b);
    3073      128617 :         d->cs.uibid = d->cs.uvbid = 0;
    3074      128617 :         d->cs.ucnt = 0;
    3075      128617 : }
    3076             : 
    3077             : static bat
    3078        7277 : copyBat (bat i, int type, oid seq)
    3079             : {
    3080        7277 :         BAT *b, *tb;
    3081        7277 :         bat res;
    3082             : 
    3083        7277 :         if (!i)
    3084             :                 return i;
    3085        7277 :         tb = quick_descriptor(i);
    3086        7277 :         if (tb == NULL)
    3087             :                 return 0;
    3088        7277 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3089        7277 :         if (b == NULL)
    3090             :                 return 0;
    3091             : 
    3092        7277 :         bat_set_access(b, BAT_READ);
    3093             : 
    3094        7277 :         res = temp_create(b);
    3095        7277 :         bat_destroy(b);
    3096        7277 :         return res;
    3097             : }
    3098             : 
    3099             : static int
    3100      146106 : create_col(sql_trans *tr, sql_column *c)
    3101             : {
    3102      146106 :         int ok = LOG_OK, new = 0;
    3103      146106 :         int type = c->type.type->localtype;
    3104      146106 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3105             : 
    3106      146106 :         if (!bat) {
    3107      146106 :                 new = 1;
    3108      146106 :                 bat = ZNEW(sql_delta);
    3109      146106 :                 if (!bat)
    3110             :                         return LOG_ERR;
    3111      146106 :                 ATOMIC_PTR_SET(&c->data, bat);
    3112      146106 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3113             :         }
    3114             : 
    3115      146106 :         if (new)
    3116      146106 :                 bat->cs.ts = tr->tid;
    3117             : 
    3118      146106 :         if (!isNew(c)&& !isTempTable(c->t)){
    3119       17467 :                 bat->cs.ts = tr->ts;
    3120       17467 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3121       17467 :                 if (ok == LOG_OK && c->storage_type) {
    3122           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3123           2 :                                 sqlstore *store = tr->store;
    3124           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3125           2 :                                 if (bid <= 0)
    3126             :                                         return LOG_ERR;
    3127           2 :                                 bat->cs.ebid = temp_dup(bid);
    3128           2 :                                 bat->cs.st = ST_DICT;
    3129           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3130           2 :                                 bat->cs.st = ST_FOR;
    3131             :                         }
    3132             :                 }
    3133       17467 :                 return ok;
    3134      128639 :         } else if (bat && bat->cs.bid) {
    3135           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3136             :         } else {
    3137      128639 :                 sql_column *fc = NULL;
    3138      128639 :                 size_t cnt = 0;
    3139             : 
    3140             :                 /* alter ? */
    3141      128639 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3142       80049 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3143       80049 :                         if (s == NULL)
    3144             :                                 return LOG_ERR;
    3145       80049 :                         cnt = segs_end(s->segs, tr, c->t);
    3146             :                 }
    3147      128639 :                 if (cnt && fc != c) {
    3148          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3149             : 
    3150          22 :                         if (d->cs.bid) {
    3151          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3152          22 :                                 if(bat->cs.bid == BID_NIL)
    3153          22 :                                         ok = LOG_ERR;
    3154             :                         }
    3155          22 :                         if (d->cs.uibid) {
    3156          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3157          10 :                                 if (bat->cs.uibid == BID_NIL)
    3158          22 :                                         ok = LOG_ERR;
    3159             :                         }
    3160          22 :                         if (d->cs.uvbid) {
    3161          10 :                                 bat->cs.uvbid = e_bat(type);
    3162          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3163           0 :                                         ok = LOG_ERR;
    3164             :                         }
    3165             :                 } else {
    3166      128617 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3167      128617 :                         if (!b) {
    3168             :                                 ok = LOG_ERR;
    3169             :                         } else {
    3170      128617 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3171      128617 :                                 bat_destroy(b);
    3172             :                         }
    3173             : 
    3174      128617 :                         if (!new) {
    3175           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3176           0 :                                 if (bat->cs.uibid == BID_NIL)
    3177           0 :                                         ok = LOG_ERR;
    3178           0 :                                 bat->cs.uvbid = e_bat(type);
    3179           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3180           0 :                                         ok = LOG_ERR;
    3181             :                         }
    3182             :                 }
    3183      128639 :                 bat->cs.ucnt = 0;
    3184             : 
    3185      128639 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3186          91 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3187             :         }
    3188             :         return ok;
    3189             : }
    3190             : 
    3191             : static int
    3192       62115 : log_create_col_(sql_trans *tr, sql_column *c)
    3193             : {
    3194       62115 :         assert(!isTempTable(c->t));
    3195       62115 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3196             : }
    3197             : 
    3198             : static int
    3199          87 : log_create_col(sql_trans *tr, sql_change *change)
    3200             : {
    3201          87 :         return log_create_col_(tr, (sql_column*)change->obj);
    3202             : }
    3203             : 
    3204             : static int
    3205      116279 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3206             : {
    3207      116279 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3208      116279 :         (void)oldest;
    3209      116279 :         assert(delta->cs.ts == tr->tid);
    3210      116279 :         delta->cs.ts = commit_ts;
    3211             : 
    3212      116279 :         assert(delta->next == NULL);
    3213      116279 :         if (!delta->cs.merged)
    3214      116278 :                 delta->nr_updates += merge_delta(delta);
    3215      116279 :         if (!tr->parent)
    3216      116275 :                 base->new = 0;
    3217      116279 :         return LOG_OK;
    3218             : }
    3219             : 
    3220             : static int
    3221          91 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3222             : {
    3223          91 :         sql_column *c = (sql_column*)change->obj;
    3224          91 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3225          91 :         if (!tr->parent)
    3226          90 :                 c->base.new = 0;
    3227          91 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3228             : }
    3229             : 
    3230             : /* will be called for new idx's and when new index columns are created */
    3231             : static int
    3232        8944 : create_idx(sql_trans *tr, sql_idx *ni)
    3233             : {
    3234        8944 :         int ok = LOG_OK, new = 0;
    3235        8944 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3236        8944 :         int type = TYPE_lng;
    3237             : 
    3238        8944 :         if (oid_index(ni->type))
    3239         938 :                 type = TYPE_oid;
    3240             : 
    3241        8944 :         if (!bat) {
    3242        8944 :                 new = 1;
    3243        8944 :                 bat = ZNEW(sql_delta);
    3244        8944 :                 if (!bat)
    3245             :                         return LOG_ERR;
    3246        8944 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3247        8944 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3248             :         }
    3249             : 
    3250        8944 :         if (new)
    3251        8944 :                 bat->cs.ts = tr->tid;
    3252             : 
    3253        8944 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3254        1689 :                 bat->cs.ts = 1;
    3255        1689 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3256        7255 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3257           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3258             :         } else {
    3259        7255 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3260        7255 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3261             : 
    3262        7255 :                 if (d) {
    3263             :                         /* Here we also handle indices created through alter stmts */
    3264             :                         /* These need to be created aligned to the existing data */
    3265        7255 :                         if (d->cs.bid) {
    3266        7255 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3267        7255 :                                 if(bat->cs.bid == BID_NIL)
    3268        7255 :                                         ok = LOG_ERR;
    3269             :                         }
    3270             :                 } else {
    3271             :                         return LOG_ERR;
    3272             :                 }
    3273             : 
    3274        7255 :                 bat->cs.ucnt = 0;
    3275             : 
    3276        7255 :                 if (!new) {
    3277           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3278           0 :                         if (bat->cs.uibid == BID_NIL)
    3279           0 :                                 ok = LOG_ERR;
    3280           0 :                         bat->cs.uvbid = e_bat(type);
    3281           0 :                         if(bat->cs.uvbid == BID_NIL)
    3282           0 :                                 ok = LOG_ERR;
    3283             :                 }
    3284        7255 :                 bat->cs.ucnt = 0;
    3285        7255 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3286         631 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3287             :         }
    3288             :         return ok;
    3289             : }
    3290             : 
    3291             : static int
    3292        6352 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3293             : {
    3294        6352 :         assert(!isTempTable(i->t));
    3295        6352 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3296             : }
    3297             : 
    3298             : static int
    3299         617 : log_create_idx(sql_trans *tr, sql_change *change)
    3300             : {
    3301         617 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3302             : }
    3303             : 
    3304             : static int
    3305         631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3306             : {
    3307         631 :         sql_idx *i = (sql_idx*)change->obj;
    3308         631 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3309         631 :         if (!tr->parent)
    3310         631 :                 i->base.new = 0;
    3311         631 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3312             :         return LOG_OK;
    3313             : }
    3314             : 
    3315             : static int
    3316        3663 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3317             : {
    3318        3663 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3319        3663 :         BAT *b = NULL, *ib = NULL;
    3320             : 
    3321        3663 :         if (ok != LOG_OK)
    3322             :                 return ok;
    3323        3663 :         if (!(b = temp_descriptor(s->cs.bid)))
    3324             :                 return LOG_ERR;
    3325        3663 :         ib = b;
    3326             : 
    3327        3663 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3328           0 :                 bat_destroy(ib);
    3329           0 :                 return LOG_ERR;
    3330             :         }
    3331             : 
    3332        3663 :         if (BATcount(b)) {
    3333         198 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3334           0 :                         bat_destroy(ib);
    3335           0 :                         return LOG_ERR;
    3336             :                 }
    3337         327 :                 if (BATtdense(b)) {
    3338         129 :                         size_t start = b->tseqbase;
    3339         129 :                         size_t cnt = BATcount(b);
    3340         129 :                         ok = delete_range(tr, t, s, start, cnt);
    3341             :                 } else {
    3342          69 :                         assert(b->tsorted);
    3343          69 :                         BUN icnt = BATcount(b);
    3344          69 :                         BATiter bi = bat_iterator(b);
    3345          69 :                         size_t lcnt = 1;
    3346          69 :                         oid n;
    3347          69 :                         segment *seg = s->segs->h;
    3348          69 :                         if (complex_cand(b)) {
    3349           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3350           0 :                                 n = o + 1;
    3351           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3352           0 :                                         o = * (oid *) Tpos(&bi, i);
    3353           0 :                                         if (o == n) {
    3354           0 :                                                 lcnt++;
    3355           0 :                                                 n++;
    3356             :                                         } else {
    3357           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3358             :                                                         break;
    3359             :                                                 lcnt = 0;
    3360             :                                         }
    3361           0 :                                         if (!lcnt) {
    3362           0 :                                                 n = o + 1;
    3363           0 :                                                 lcnt = 1;
    3364             :                                         }
    3365             :                                 }
    3366             :                         } else {
    3367          69 :                                 oid *o = bi.base;
    3368          69 :                                 n = o[0]+1;
    3369      133331 :                                 for (size_t i=1; i<icnt; i++) {
    3370      133262 :                                         if (o[i] == n) {
    3371      132218 :                                                 lcnt++;
    3372      132218 :                                                 n++;
    3373             :                                         } else {
    3374        1044 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3375             :                                                         break;
    3376             :                                                 lcnt = 0;
    3377             :                                         }
    3378      133262 :                                         if (!lcnt) {
    3379        1044 :                                                 n = o[i]+1;
    3380        1044 :                                                 lcnt = 1;
    3381             :                                         }
    3382             :                                 }
    3383             :                         }
    3384          69 :                         if (lcnt && ok == LOG_OK)
    3385          69 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3386          69 :                         bat_iterator_end(&bi);
    3387             :                 }
    3388         198 :                 if (ok == LOG_OK)
    3389        2775 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3390        2577 :                                 if (seg->ts == tr->tid)
    3391        1342 :                                         seg->ts = 1;
    3392             :         } else {
    3393        3465 :                 if (ok == LOG_OK) {
    3394        3465 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3395             : 
    3396        3465 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3397             :                                 ok = LOG_ERR;
    3398             :                         } else {
    3399        3465 :                                 segment *seg = s->segs->h;
    3400        3465 :                                 if (seg->ts == tr->tid)
    3401        3465 :                                         seg->ts = 1;
    3402             :                         }
    3403             :                 }
    3404             :         }
    3405        3663 :         if (b != ib)
    3406        3663 :                 bat_destroy(b);
    3407        3663 :         bat_destroy(ib);
    3408             : 
    3409        3663 :         return ok;
    3410             : }
    3411             : 
    3412             : static int
    3413       24325 : create_del(sql_trans *tr, sql_table *t)
    3414             : {
    3415       24325 :         int ok = LOG_OK, new = 0;
    3416       24325 :         BAT *b;
    3417       24325 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3418             : 
    3419       24325 :         if (!bat) {
    3420       24325 :                 new = 1;
    3421       24325 :                 bat = ZNEW(storage);
    3422       24325 :                 if(!bat)
    3423             :                         return LOG_ERR;
    3424       24325 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3425       24325 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3426       24325 :                 bat->cs.ts = tr->tid;
    3427             :         }
    3428             : 
    3429       24325 :         if (!isNew(t) && !isTempTable(t)) {
    3430        3663 :                 bat->cs.ts = tr->ts;
    3431        3663 :                 return load_storage(tr, t, bat, t->base.id);
    3432       20662 :         } else if (bat->cs.bid) {
    3433             :                 return ok;
    3434             :         } else {
    3435       20662 :                 assert(!bat->segs);
    3436       20662 :                 if (!(bat->segs = new_segments(tr, 0)))
    3437             :                         return LOG_ERR;
    3438             : 
    3439       20662 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3440       20662 :                 if(b != NULL) {
    3441       20662 :                         bat_set_access(b, BAT_READ);
    3442       20662 :                         bat->cs.bid = temp_create(b);
    3443       20662 :                         bat_destroy(b);
    3444             :                 } else {
    3445             :                         return LOG_ERR;
    3446             :                 }
    3447       20662 :                 if (new)
    3448       27012 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3449             :         }
    3450             :         return ok;
    3451             : }
    3452             : 
    3453             : static int
    3454      185529 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3455             : {
    3456      185529 :         sqlstore *store = tr->store;
    3457      185529 :         msk m = s->deleted;
    3458      185529 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3459             : }
    3460             : 
    3461             : static int
    3462       96958 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3463             : {
    3464             :         /* log segments */
    3465       96958 :         lock_table(tr->store, id);
    3466      474416 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3467      377458 :                 unlock_table(tr->store, id);
    3468      377458 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3469      139461 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3470             :                                 return LOG_ERR;
    3471             :                         }
    3472             :                 }
    3473      377458 :                 lock_table(tr->store, id);
    3474             :         }
    3475       96958 :         unlock_table(tr->store, id);
    3476       96958 :         return LOG_OK;
    3477             : }
    3478             : 
    3479             : static int
    3480       12548 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3481             : {
    3482       12548 :         BAT *b;
    3483       12548 :         int ok = LOG_OK;
    3484             : 
    3485       12548 :         if (GDKinmemory(0))
    3486             :                 return LOG_OK;
    3487             : 
    3488       12515 :         b = temp_descriptor(bat->cs.bid);
    3489       12515 :         if (b == NULL)
    3490             :                 return LOG_ERR;
    3491             : 
    3492       12515 :         sqlstore *store = tr->store;
    3493       12515 :         bat_set_access(b, BAT_READ);
    3494       12515 :         if (ok == LOG_OK)
    3495       12515 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3496       12515 :         if (ok == LOG_OK)
    3497       12515 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3498       12515 :         bat_destroy(b);
    3499       12515 :         return ok;
    3500             : }
    3501             : 
    3502             : static int
    3503       12562 : log_create_del(sql_trans *tr, sql_change *change)
    3504             : {
    3505       12562 :         int ok = LOG_OK;
    3506       12562 :         sql_table *t = (sql_table*)change->obj;
    3507             : 
    3508       12562 :         if (t->base.deleted)
    3509             :                 return ok;
    3510       12548 :         assert(!isTempTable(t));
    3511       12548 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3512       12548 :         if (ok == LOG_OK) {
    3513       74576 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3514       62028 :                         sql_column *c = n->data;
    3515             : 
    3516       62028 :                         ok = log_create_col_(tr, c);
    3517             :                 }
    3518       12548 :                 if (t->idxs) {
    3519       18295 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3520        5747 :                                 sql_idx *i = n->data;
    3521             : 
    3522        5747 :                                 if (ATOMIC_PTR_GET(&i->data))
    3523        5735 :                                         ok = log_create_idx_(tr, i);
    3524             :                         }
    3525             :                 }
    3526             :         }
    3527             :         return ok;
    3528             : }
    3529             : 
    3530             : static int
    3531       20664 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3532             : {
    3533       20664 :         int ok = LOG_OK;
    3534       20664 :         sql_table *t = (sql_table*)change->obj;
    3535       20664 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3536             : 
    3537       20664 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3538         120 :                 assert(isTempTable(t));
    3539         120 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3540         120 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3541         120 :                 return ok;
    3542             :         }
    3543             : 
    3544       20544 :         if (!commit_ts) /* rollback handled by ? */
    3545             :                 return ok;
    3546       18721 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3547       18721 :         assert(ok == LOG_OK);
    3548       18721 :         if (ok != LOG_OK)
    3549             :                 return ok;
    3550       18721 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3551       18721 :         assert(dbat->cs.ts == tr->tid);
    3552       18721 :         dbat->cs.ts = commit_ts;
    3553       18721 :         if (ok == LOG_OK) {
    3554      128530 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3555      109809 :                         sql_column *c = n->data;
    3556      109809 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3557             : 
    3558      109809 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3559             :                 }
    3560       18721 :                 if (t->idxs) {
    3561       24481 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3562        5760 :                                 sql_idx *i = n->data;
    3563        5760 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3564             : 
    3565        5760 :                                 if (delta)
    3566        5748 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3567             :                         }
    3568             :                 }
    3569       18721 :                 if (!tr->parent)
    3570       18719 :                         t->base.new = 0;
    3571             :         }
    3572       18721 :         if (!tr->parent)
    3573       18719 :                 t->base.new = 0;
    3574             :         return ok;
    3575             : }
    3576             : 
    3577             : static int
    3578       19376 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3579             : {
    3580       19376 :         gdk_return ok = GDK_SUCCEED;
    3581             : 
    3582       19376 :         sqlstore *store = tr->store;
    3583       19376 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3584       18811 :                 ok = log_bat_transient(store->logger, id);
    3585       19376 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3586          25 :                 ok = log_bat_transient(store->logger, -id);
    3587       19376 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3588             : }
    3589             : 
    3590             : static int
    3591      159517 : destroy_col(sqlstore *store, sql_column *c)
    3592             : {
    3593      159517 :         (void)store;
    3594      159517 :         if (ATOMIC_PTR_GET(&c->data))
    3595      159517 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3596      159517 :         ATOMIC_PTR_SET(&c->data, NULL);
    3597      159517 :         return LOG_OK;
    3598             : }
    3599             : 
    3600             : static int
    3601       17509 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3602             : {
    3603       17509 :         int ok = LOG_OK;
    3604       17509 :         assert(!isTempTable(c->t));
    3605       17509 :         if (!tr->parent) /* don't write save point commits */
    3606       17509 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3607       17509 :         return ok;
    3608             : }
    3609             : 
    3610             : static int
    3611       17509 : log_destroy_col(sql_trans *tr, sql_change *change)
    3612             : {
    3613       17509 :         sql_column *c = (sql_column*)change->obj;
    3614       17509 :         int res = log_destroy_col_(tr, c);
    3615       17509 :         change->obj = NULL;
    3616       17509 :         column_destroy(tr->store, c);
    3617       17509 :         return res;
    3618             : }
    3619             : 
    3620             : static int
    3621        9939 : destroy_idx(sqlstore *store, sql_idx *i)
    3622             : {
    3623        9939 :         (void)store;
    3624        9939 :         if (ATOMIC_PTR_GET(&i->data))
    3625        9939 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3626        9939 :         ATOMIC_PTR_SET(&i->data, NULL);
    3627        9939 :         return LOG_OK;
    3628             : }
    3629             : 
    3630             : static int
    3631        1943 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3632             : {
    3633        1943 :         int ok = LOG_OK;
    3634        1943 :         assert(!isTempTable(i->t));
    3635        1943 :         if (ATOMIC_PTR_GET(&i->data)) {
    3636        1867 :                 if (!tr->parent) /* don't write save point commits */
    3637        1867 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3638             :         }
    3639        1943 :         return ok;
    3640             : }
    3641             : 
    3642             : static int
    3643        1943 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3644             : {
    3645        1943 :         sql_idx *i = (sql_idx*)change->obj;
    3646        1943 :         int res = log_destroy_idx_(tr, i);
    3647        1943 :         change->obj = NULL;
    3648        1943 :         idx_destroy(tr->store, i);
    3649        1943 :         return res;
    3650             : }
    3651             : 
    3652             : static int
    3653       24994 : destroy_del(sqlstore *store, sql_table *t)
    3654             : {
    3655       24994 :         (void)store;
    3656       24994 :         if (ATOMIC_PTR_GET(&t->data))
    3657       24990 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3658       24994 :         ATOMIC_PTR_SET(&t->data, NULL);
    3659       24994 :         return LOG_OK;
    3660             : }
    3661             : 
    3662             : static int
    3663        3256 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3664             : {
    3665        3256 :         gdk_return ok = GDK_SUCCEED;
    3666             : 
    3667        3256 :         sqlstore *store = tr->store;
    3668        3256 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3669        3256 :             bat && bat->cs.bid)
    3670        3256 :                 ok = log_bat_transient(store->logger, id);
    3671        3256 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3672             : }
    3673             : 
    3674             : static int
    3675        3256 : log_destroy_del(sql_trans *tr, sql_change *change)
    3676             : {
    3677        3256 :         int ok = LOG_OK;
    3678        3256 :         sql_table *t = (sql_table*)change->obj;
    3679             : 
    3680        3256 :         assert(!isTempTable(t));
    3681        3256 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3682        3256 :         return ok;
    3683             : }
    3684             : 
    3685             : static int
    3686       22796 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3687             : {
    3688       22796 :         (void)tr;
    3689       22796 :         (void)change;
    3690       22796 :         (void)commit_ts;
    3691       22796 :         (void)oldest;
    3692       22796 :         if (commit_ts)
    3693       22773 :                 change->handled = true;
    3694       22796 :         return 0;
    3695             : }
    3696             : 
    3697             : static int
    3698        3327 : drop_del(sql_trans *tr, sql_table *t)
    3699             : {
    3700        3327 :         int ok = LOG_OK;
    3701             : 
    3702        3327 :         if (!isNew(t)) {
    3703        3327 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3704        3392 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
    3705             :         }
    3706        3327 :         return ok;
    3707             : }
    3708             : 
    3709             : static int
    3710       17524 : drop_col(sql_trans *tr, sql_column *c)
    3711             : {
    3712       17524 :         assert(!isNew(c));
    3713       17524 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3714       17524 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
    3715       17524 :         return LOG_OK;
    3716             : }
    3717             : 
    3718             : static int
    3719        1945 : drop_idx(sql_trans *tr, sql_idx *i)
    3720             : {
    3721        1945 :         assert(!isNew(i));
    3722        1945 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3723        1945 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
    3724        1945 :         return LOG_OK;
    3725             : }
    3726             : 
    3727             : 
    3728             : static BUN
    3729      129457 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3730             : {
    3731      129457 :         BAT *b;
    3732      129457 :         BUN sz = 0;
    3733             : 
    3734      129457 :         (void)tr;
    3735      129457 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3736      129457 :         if (cs->bid && renew) {
    3737      129500 :                 b = quick_descriptor(cs->bid);
    3738      129463 :                 if (b) {
    3739      129463 :                         sz += BATcount(b);
    3740      129463 :                         if (cs->st == ST_DICT) {
    3741           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3742           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3743             : 
    3744           2 :                                 if (nebid == BID_NIL || !n) {
    3745           0 :                                         temp_destroy(nebid);
    3746           0 :                                         bat_destroy(n);
    3747           0 :                                         return BUN_NONE;
    3748             :                                 }
    3749           2 :                                 temp_destroy(cs->ebid);
    3750           2 :                                 cs->ebid = nebid;
    3751           2 :                                 if (!temp)
    3752           2 :                                         bat_set_access(n, BAT_READ);
    3753           2 :                                 temp_destroy(cs->bid);
    3754           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3755           2 :                                 bat_destroy(n);
    3756             :                         } else {
    3757      129461 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
    3758             : 
    3759      129380 :                                 if (nbid == BID_NIL)
    3760             :                                         return BUN_NONE;
    3761      129380 :                                 temp_destroy(cs->bid);
    3762      129423 :                                 cs->bid = nbid;
    3763             :                         }
    3764             :                 } else {
    3765             :                         return BUN_NONE;
    3766             :                 }
    3767             :         }
    3768      129382 :         if (cs->uibid) {
    3769      129239 :                 temp_destroy(cs->uibid);
    3770      129385 :                 cs->uibid = 0;
    3771             :         }
    3772      129528 :         if (cs->uvbid) {
    3773      129384 :                 temp_destroy(cs->uvbid);
    3774      129386 :                 cs->uvbid = 0;
    3775             :         }
    3776      129530 :         cs->cleared = true;
    3777      129530 :         cs->ucnt = 0;
    3778      129530 :         return sz;
    3779             : }
    3780             : 
    3781             : static BUN
    3782      129342 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3783             : {
    3784      129342 :         bool update_conflict = false;
    3785      129342 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3786             : 
    3787      129342 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    3788           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3789      129377 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3790      129377 :         if (odelta != delta)
    3791      129382 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3792      129359 :         if (delta)
    3793      129359 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3794             :         return 0;
    3795             : }
    3796             : 
    3797             : static BUN
    3798          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3799             : {
    3800          21 :         bool update_conflict = false;
    3801          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3802             : 
    3803          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3804          15 :                 return 0;
    3805           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    3806           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3807           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3808           6 :         if (odelta != delta)
    3809           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3810           6 :         if (delta)
    3811           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3812             :         return 0;
    3813             : }
    3814             : 
    3815             : static int
    3816         142 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3817             : {
    3818         142 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
    3819             :                 return LOG_ERR;
    3820         142 :         if (s->segs)
    3821         142 :                 destroy_segments(s->segs);
    3822         142 :         if (!(s->segs = new_segments(tr, 0)))
    3823             :                 return LOG_ERR;
    3824             :         return LOG_OK;
    3825             : }
    3826             : 
    3827             : 
    3828             : /*
    3829             :  * Clear the table, in general this means replacing the storage,
    3830             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    3831             :  * all segments as deleted.
    3832             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    3833             :  */
    3834             : static BUN
    3835       41831 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3836             : {
    3837       41831 :         int clear = !in_transaction, ok = LOG_OK;
    3838       41831 :         bool conflict = false;
    3839       41831 :         storage *bat;
    3840             : 
    3841       41882 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    3842       15818 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3843             : 
    3844       26014 :         if (!clear) {
    3845          51 :                 lock_table(tr->store, t->base.id);
    3846          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3847          51 :                 unlock_table(tr->store, t->base.id);
    3848             :         }
    3849       26014 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3850       26014 :         if (!in_transaction)
    3851       25966 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3852       26009 :         if (ok == LOG_ERR)
    3853             :                 return BUN_NONE;
    3854       26009 :         if (ok == LOG_CONFLICT)
    3855           0 :                 return BUN_NONE - 1;
    3856             :         return LOG_OK;
    3857             : }
    3858             : 
    3859             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3860             : static BUN
    3861       41799 : clear_table(sql_trans *tr, sql_table *t)
    3862             : {
    3863       41799 :         node *n = ol_first_node(t->columns);
    3864       41799 :         sql_column *c = n->data;
    3865       41799 :         storage *d = tab_timestamp_storage(tr, t);
    3866       41831 :         int in_transaction, clear;
    3867       41831 :         BUN sz, clear_ok;
    3868             : 
    3869       41831 :         if (!d)
    3870             :                 return BUN_NONE;
    3871       41831 :         in_transaction = segments_in_transaction(tr, t);
    3872       41827 :         clear = !in_transaction;
    3873       41827 :         sz = count_col(tr, c, CNT_ACTIVE);
    3874       41832 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    3875             :                 return clear_ok;
    3876             : 
    3877       26011 :         if (in_transaction)
    3878             :                 return sz;
    3879             : 
    3880      155339 :         for (; n; n = n->next) {
    3881      129376 :                 c = n->data;
    3882             : 
    3883      129376 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3884           0 :                         return clear_ok;
    3885             :         }
    3886       25963 :         if (t->idxs) {
    3887       25984 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3888          21 :                         sql_idx *ci = n->data;
    3889             : 
    3890          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    3891          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3892           0 :                                 return clear_ok;
    3893             :                 }
    3894             :         }
    3895       25963 :         if (clear)
    3896       25963 :                 d->segs->nr_reused = 0;
    3897       25963 :         return sz;
    3898             : }
    3899             : 
    3900             : static int
    3901      158697 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3902             : {
    3903      158697 :         sqlstore *store = tr->store;
    3904      158697 :         gdk_return ok = GDK_SUCCEED;
    3905             : 
    3906      158697 :         (void) t;
    3907      158697 :         (void) segs;
    3908      158697 :         if (GDKinmemory(0))
    3909             :                 return LOG_OK;
    3910             : 
    3911      158690 :         if (cs->cleared) {
    3912      155400 :                 assert(cs->ucnt == 0);
    3913      155400 :                 BAT *ins = temp_descriptor(cs->bid);
    3914      155400 :                 if (!ins)
    3915             :                         return LOG_ERR;
    3916      155400 :                 assert(!isEbat(ins));
    3917      155400 :                 bat_set_access(ins, BAT_READ);
    3918      155400 :                 ok = log_bat_persists(store->logger, ins, id);
    3919      155400 :                 bat_destroy(ins);
    3920      155400 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3921          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3922          56 :                         if (!ins)
    3923             :                                 return LOG_ERR;
    3924          56 :                         assert(!isEbat(ins));
    3925          56 :                         bat_set_access(ins, BAT_READ);
    3926          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3927          56 :                         bat_destroy(ins);
    3928             :                 }
    3929      155400 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3930             :         }
    3931             : 
    3932        3290 :         assert(!isTempTable(t));
    3933             : 
    3934        3290 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3935        2747 :                 BAT *ui = temp_descriptor(cs->uibid);
    3936        2747 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3937             :                 /* any updates */
    3938        2747 :                 if (ui == NULL || uv == NULL) {
    3939             :                         ok = GDK_FAIL;
    3940        2747 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3941        2747 :                         ok = log_delta(store->logger, ui, uv, id);
    3942        2747 :                 bat_destroy(ui);
    3943        2747 :                 bat_destroy(uv);
    3944             :         }
    3945        2747 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3946             : }
    3947             : 
    3948             : static inline int
    3949       58486 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3950       58486 :         sqlstore *store = tr->store;
    3951       58486 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3952             : }
    3953             : 
    3954             : static inline int
    3955       58486 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3956       58486 :         sqlstore *store = tr->store;
    3957       58486 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3958             : }
    3959             : 
    3960             : static int
    3961       58486 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3962             : {
    3963       58486 :         sqlstore *store = tr->store;
    3964       58486 :         gdk_return ok = GDK_SUCCEED;
    3965             : 
    3966       58486 :         size_t end = segs_end(segs, tr, t);
    3967             : 
    3968       58486 :         if (tr_log_table_start(tr, t) != LOG_OK)
    3969             :                 return LOG_ERR;
    3970             : 
    3971       58486 :         size_t nr_appends = 0;
    3972             : 
    3973       58486 :         lock_table(tr->store, t->base.id);
    3974      397664 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3975      339178 :                 unlock_table(tr->store, t->base.id);
    3976             : 
    3977      339178 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3978      108835 :                         if (!seg->deleted) {
    3979       46068 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    3980             :                                         return LOG_ERR;
    3981             : 
    3982       46068 :                                 nr_appends += (seg->end - seg->start);
    3983             :                         }
    3984             :                 }
    3985      339178 :                 lock_table(tr->store, t->base.id);
    3986             :         }
    3987       58486 :         unlock_table(tr->store, t->base.id);
    3988             : 
    3989      406107 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    3990      347621 :                 sql_column *c = n->data;
    3991      347621 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    3992             : 
    3993      347621 :                 if (cs->cleared) {
    3994           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    3995           3 :                         continue;
    3996             :                 }
    3997             : 
    3998      347618 :                 lock_table(tr->store, t->base.id);
    3999      347618 :                 if (!cs->cleared) {
    4000     2326848 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4001     1979230 :                                 unlock_table(tr->store, t->base.id);
    4002     1979230 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4003             :                                         /* append col*/
    4004      259800 :                                         BAT *ins = temp_descriptor(cs->bid);
    4005      259800 :                                         if (ins == NULL)
    4006             :                                                 return LOG_ERR;
    4007      259800 :                                         assert(BATcount(ins) >= cur->end);
    4008      259800 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
    4009      259800 :                                         bat_destroy(ins);
    4010             :                                 }
    4011     1979230 :                                 lock_table(tr->store, t->base.id);
    4012             :                         }
    4013             :                 }
    4014      347618 :                 unlock_table(tr->store, t->base.id);
    4015             : 
    4016      347618 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    4017          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4018          19 :                         if (ins == NULL)
    4019             :                                 return LOG_ERR;
    4020          19 :                         if (BATcount(ins) > ins->batInserted)
    4021          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4022          19 :                         BATcommit(ins, BATcount(ins));
    4023          19 :                         bat_destroy(ins);
    4024             :                 }
    4025             :         }
    4026             : 
    4027       58486 :         if (t->idxs) {
    4028       63668 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4029        5182 :                         sql_idx *i = n->data;
    4030             : 
    4031        5182 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    4032        4404 :                                 continue;
    4033         778 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4034             : 
    4035         778 :                         if (cs) {
    4036         778 :                                 if (cs->cleared) {
    4037           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4038           0 :                                         continue;
    4039             :                                 }
    4040             : 
    4041         778 :                                 lock_table(tr->store, t->base.id);
    4042        2387 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4043        1609 :                                         unlock_table(tr->store, t->base.id);
    4044        1609 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4045             :                                                 /* append idx */
    4046         730 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4047         730 :                                                 if (ins == NULL)
    4048             :                                                         return LOG_ERR;
    4049         730 :                                                 assert(BATcount(ins) >= cur->end);
    4050         730 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4051         730 :                                                 bat_destroy(ins);
    4052             :                                         }
    4053        1609 :                                         lock_table(tr->store, t->base.id);
    4054             :                                 }
    4055         778 :                                 unlock_table(tr->store, t->base.id);
    4056             :                         }
    4057             :                 }
    4058             :         }
    4059             : 
    4060       58486 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
    4061           0 :                 return LOG_ERR;
    4062             : 
    4063             :         return LOG_OK;
    4064             : }
    4065             : 
    4066             : static int
    4067       84443 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4068             : {
    4069       84443 :         int ok = LOG_OK;
    4070       84443 :         bool cleared = s->cs.cleared;
    4071       84443 :         if (ok == LOG_OK && cleared)
    4072       25957 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4073       25957 :         if (ok == LOG_OK)
    4074       84443 :                 ok = log_segments(tr, s->segs, t->base.id);
    4075       84443 :         if (ok == LOG_OK && !cleared)
    4076       58486 :                 ok = log_table_append(tr, t, s->segs);
    4077       84443 :         return ok;
    4078             : }
    4079             : 
    4080             : static void
    4081      418846 : merge_cs( column_storage *cs, const char* caller)
    4082             : {
    4083      418846 :         if (cs->bid && cs->ucnt) {
    4084        2756 :                 BAT *cur = temp_descriptor(cs->bid);
    4085        2756 :                 BAT *ui = temp_descriptor(cs->uibid);
    4086        2756 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4087             : 
    4088        2756 :                 if (!cur || !ui || !uv) {
    4089           0 :                         bat_destroy(ui);
    4090           0 :                         bat_destroy(uv);
    4091           0 :                         bat_destroy(cur);
    4092           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4093             :                         return;
    4094             :                 }
    4095        2756 :                 assert(BATcount(ui) == BATcount(uv));
    4096             : 
    4097             :                 /* any updates */
    4098        2756 :                 assert(!isEbat(cur));
    4099        2756 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4100           0 :                         bat_destroy(ui);
    4101           0 :                         bat_destroy(uv);
    4102           0 :                         bat_destroy(cur);
    4103           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4104             :                         return;
    4105             :                 }
    4106             :                 /* cleanup the old deltas */
    4107        2756 :                 temp_destroy(cs->uibid);
    4108        2756 :                 temp_destroy(cs->uvbid);
    4109        2756 :                 cs->uibid = e_bat(TYPE_oid);
    4110        2756 :                 cs->uvbid = e_bat(cur->ttype);
    4111        2756 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4112        2756 :                 cs->ucnt = 0;
    4113        2756 :                 bat_destroy(ui);
    4114        2756 :                 bat_destroy(uv);
    4115        2756 :                 bat_destroy(cur);
    4116             :         }
    4117      418846 :         cs->cleared = false;
    4118      418846 :         cs->merged = true;
    4119      418846 :         return;
    4120             : }
    4121             : 
    4122             : static lng
    4123      377091 : merge_delta( sql_delta *obat)
    4124             : {
    4125      377091 :         lng res = 0;
    4126      377091 :         if (obat && obat->next && !obat->cs.merged)
    4127      132747 :                 res += merge_delta(obat->next);
    4128      377091 :         res += obat->cs.ucnt;
    4129      377091 :         merge_cs(&obat->cs, __func__);
    4130      377091 :         return res;
    4131             : }
    4132             : 
    4133             : static void
    4134       41755 : merge_storage(storage *tdb)
    4135             : {
    4136       41755 :         merge_cs(&tdb->cs, __func__);
    4137             : 
    4138       41755 :         if (tdb->next) {
    4139         278 :                 destroy_storage(tdb->next);
    4140         278 :                 tdb->next = NULL;
    4141             :         }
    4142       41755 : }
    4143             : 
    4144             : static sql_delta *
    4145           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4146             : {
    4147             :         /* commit ie copy back to the parent transaction */
    4148           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4149           1 :                 sql_delta *od = delta->next;
    4150           1 :                 if (od->cs.ts == commit_ts) {
    4151           0 :                         sql_delta t = *od, *n = od->next;
    4152           0 :                         *od = *delta;
    4153           0 :                         od->next = n;
    4154           0 :                         *delta = t;
    4155           0 :                         delta->next = NULL;
    4156           0 :                         destroy_delta(delta, true);
    4157           0 :                         return od;
    4158             :                 }
    4159             :         }
    4160             :         return delta;
    4161             : }
    4162             : 
    4163             : static int
    4164      132711 : log_update_col( sql_trans *tr, sql_change *change)
    4165             : {
    4166      132711 :         sql_column *c = (sql_column*)change->obj;
    4167      132711 :         assert(!isTempTable(c->t));
    4168             : 
    4169      132711 :         if (isDeleted(c->t)) {
    4170           0 :                 change->handled = true;
    4171           0 :                 return LOG_OK;
    4172             :         }
    4173             : 
    4174      132711 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4175      132711 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4176      132711 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4177      132711 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4178             :         }
    4179             :         return LOG_OK;
    4180             : }
    4181             : 
    4182             : static int
    4183         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4184             : {
    4185         217 :         sqlstore *store = Store;
    4186             : 
    4187         217 :         sql_delta *d = (sql_delta*)change->data;
    4188         217 :         if (d->cs.ts < oldest) {
    4189          82 :                 destroy_delta(d, false);
    4190          82 :                 if (change->commit == &commit_update_idx)
    4191           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4192             :                 else
    4193          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4194          82 :                 return 1;
    4195             :         }
    4196         135 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4197          82 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4198             :         return 0;
    4199             : }
    4200             : 
    4201             : static int
    4202           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4203             : {
    4204           9 :         sqlstore *store = Store;
    4205             : 
    4206           9 :         storage *d = (storage*)change->data;
    4207           9 :         if (d->cs.ts < oldest) {
    4208           3 :                 destroy_storage(d);
    4209           3 :                 table_destroy(store, (sql_table*)change->obj);
    4210           3 :                 return 1;
    4211             :         }
    4212           6 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4213           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4214             :         return 0;
    4215             : }
    4216             : 
    4217             : static int
    4218      132829 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4219             : {
    4220      132829 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4221             : 
    4222      132829 :         sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
    4223             : 
    4224      132829 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4225           3 :                 int ok = LOG_OK;
    4226           3 :                 assert(isTempTable(t));
    4227           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4228           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4229           3 :                 if (!tr->parent)
    4230           0 :                         t->base.new = base->new = 0;
    4231           3 :                 change->handled = true;
    4232           3 :                 return ok;
    4233             :         }
    4234             : 
    4235      132826 :         if (commit_ts)
    4236      132744 :                 delta->cs.ts = commit_ts;
    4237      132826 :         if (!commit_ts) { /* rollback */
    4238          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4239             : 
    4240          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4241             :                         return LOG_OK;
    4242          82 :                 if (o != d) {
    4243           0 :                         while(o && o->next != d)
    4244             :                                 o = o->next;
    4245             :                 }
    4246          82 :                 if (o == ATOMIC_PTR_GET(data))
    4247          82 :                         ATOMIC_PTR_SET(data, d->next);
    4248             :                 else
    4249           0 :                         o->next = d->next;
    4250          82 :                 d->next = NULL;
    4251          82 :                 change->cleanup = &tc_gc_rollbacked;
    4252      132744 :         } else if (!tr->parent) {
    4253             :                 /* merge deltas */
    4254      274662 :                 while (delta && delta->cs.ts > oldest)
    4255      141919 :                         delta = delta->next;
    4256      132743 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4257       27678 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4258       27678 :                         idelta->nr_updates += merge_delta(delta);
    4259       27678 :                         unlock_column(tr->store, base->id);
    4260             :                 }
    4261           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4262           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4263             :         return LOG_OK;
    4264             : }
    4265             : 
    4266             : static int
    4267      132801 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4268             : {
    4269             : 
    4270      132801 :         sql_column *c = (sql_column*)change->obj;
    4271      132801 :         sql_base* base = &c->base;
    4272      132801 :         sql_table* t = c->t;
    4273      132801 :         ATOMIC_PTR_TYPE* data = &c->data;
    4274      132801 :         int type = c->type.type->localtype;
    4275             : 
    4276      132801 :         if (change->handled || isDeleted(c->t))
    4277             :                 return LOG_OK;
    4278             : 
    4279      132801 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4280             : }
    4281             : 
    4282             : static int
    4283          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4284             : {
    4285          26 :         sql_idx *i = (sql_idx*)change->obj;
    4286          26 :         assert(!isTempTable(i->t));
    4287             : 
    4288          26 :         if (isDeleted(i->t)) {
    4289           0 :                 change->handled = true;
    4290           0 :                 return LOG_OK;
    4291             :         }
    4292             : 
    4293          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4294          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4295          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4296          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4297             :         }
    4298             :         return LOG_OK;
    4299             : }
    4300             : 
    4301             : static int
    4302          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4303             : {
    4304          28 :         sql_idx *i = (sql_idx*)change->obj;
    4305          28 :         sql_base* base = &i->base;
    4306          28 :         sql_table* t = i->t;
    4307          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4308          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    4309             : 
    4310          28 :         if (change->handled || isDeleted(i->t))
    4311             :                 return LOG_OK;
    4312             : 
    4313          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4314             : }
    4315             : 
    4316             : static storage *
    4317          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4318             : {
    4319          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4320           0 :                 storage *od = dbat->next;
    4321           0 :                 if (od->cs.ts == commit_ts) {
    4322           0 :                         storage t = *od, *n = od->next;
    4323           0 :                         *od = *dbat;
    4324           0 :                         od->next = n;
    4325           0 :                         *dbat = t;
    4326           0 :                         dbat->next = NULL;
    4327           0 :                         destroy_storage(dbat);
    4328           0 :                         return od;
    4329             :                 }
    4330             :         }
    4331             :         return dbat;
    4332             : }
    4333             : 
    4334             : static int
    4335       84443 : log_update_del( sql_trans *tr, sql_change *change)
    4336             : {
    4337       84443 :         sql_table *t = (sql_table*)change->obj;
    4338       84443 :         assert(!isTempTable(t));
    4339             : 
    4340       84443 :         if (isDeleted(t)) {
    4341           0 :                 change->handled = true;
    4342           0 :                 return LOG_OK;
    4343             :         }
    4344             : 
    4345       84443 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4346       84443 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4347             :         return LOG_OK;
    4348             : }
    4349             : 
    4350             : static int
    4351       89077 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4352             : {
    4353       89077 :         int ok = LOG_OK;
    4354       89077 :         sql_table *t = (sql_table*)change->obj;
    4355       89077 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4356             : 
    4357       89077 :         if (change->handled || isDeleted(t))
    4358             :                 return ok;
    4359             : 
    4360       89077 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4361          22 :                 assert(isTempTable(t));
    4362          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4363          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4364          22 :                 change->handled = true;
    4365          22 :                 return ok;
    4366             :         }
    4367             : 
    4368       89055 :         lock_table(tr->store, t->base.id);
    4369       89055 :         if (!commit_ts) { /* rollback */
    4370        4312 :                 if (dbat->cs.ts == tr->tid) {
    4371           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4372           3 :                                 unlock_table(tr->store, t->base.id);
    4373           3 :                                 return ok;
    4374             :                         }
    4375           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4376             : 
    4377           3 :                         if (o != d) {
    4378           0 :                                 while(o && o->next != d)
    4379             :                                         o = o->next;
    4380             :                         }
    4381           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    4382           3 :                                 assert(d->next);
    4383           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4384             :                         } else
    4385           0 :                                 o->next = d->next;
    4386           3 :                         d->next = NULL;
    4387           3 :                         change->cleanup = &tc_gc_rollbacked_storage;
    4388             :                 } else
    4389        4306 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4390       84743 :         } else if (ok == LOG_OK && !tr->parent) {
    4391       84717 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4392       25959 :                         dbat->cs.ts = commit_ts;
    4393             : 
    4394       84717 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4395       84717 :                 if (ok == LOG_OK) {
    4396       84717 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4397       84717 :                         if (oldest == commit_ts)
    4398       41755 :                                 merge_storage(dbat);
    4399             :                 }
    4400       84717 :                 if (dbat)
    4401       84717 :                         dbat->cs.cleared = false;
    4402          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4403          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4404          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4405          26 :                 storage *s = change->data;
    4406          26 :                 if (s->cs.ts == tr->tid)
    4407           0 :                         s->cs.ts = commit_ts;
    4408             :         }
    4409       89052 :         unlock_table(tr->store, t->base.id);
    4410       89052 :         return ok;
    4411             : }
    4412             : 
    4413             : /* only rollback (content version) case for now */
    4414             : static int
    4415       17681 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4416             : {
    4417       17681 :         sqlstore *store = Store;
    4418       17681 :         sql_column *c = (sql_column*)change->obj;
    4419             : 
    4420       17681 :         if (!c) /* cleaned earlier */
    4421             :                 return 1;
    4422             : 
    4423         172 :         if (change->handled || isDeleted(c->t)) {
    4424           0 :                 column_destroy(store, c);
    4425           0 :                 return 1;
    4426             :         }
    4427             : 
    4428             :         /* savepoint commit (did it merge ?) */
    4429         172 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4430           0 :                 column_destroy(store, c);
    4431           0 :                 return 1;
    4432             :         }
    4433         172 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4434             :                 return 0;
    4435         171 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4436         171 :         if (d && d->next) {
    4437             : 
    4438          66 :                 if (d->cs.ts > oldest)
    4439             :                         return LOG_OK; /* cannot cleanup yet */
    4440             : 
    4441             :                 // d is oldest reachable delta
    4442          63 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4443          62 :                         destroy_delta(d->next, true);
    4444          62 :                         d->next = NULL;
    4445             :                 }
    4446          63 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4447          63 :                 id->nr_updates += merge_delta(d);
    4448          63 :                 unlock_column(store, c->base.id);
    4449             :         }
    4450         168 :         column_destroy(store, c);
    4451         168 :         return 1;
    4452             : }
    4453             : 
    4454             : static int
    4455      790309 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4456             : {
    4457      790309 :         sqlstore *store = Store;
    4458      790309 :         sql_column *c = (sql_column*)change->obj;
    4459             : 
    4460      790309 :         if (!c) /* cleaned earlier */
    4461             :                 return 1;
    4462             : 
    4463      790309 :         if (change->handled || isDeleted(c->t)) {
    4464           3 :                 table_destroy(store, c->t);
    4465           3 :                 return 1;
    4466             :         }
    4467             : 
    4468             :         /* savepoint commit (did it merge ?) */
    4469      790306 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4470       32355 :                 table_destroy(store, c->t);
    4471       32355 :                 return 1;
    4472             :         }
    4473      757951 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4474             :                 return 0;
    4475      757950 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4476      757950 :         if (d && d->next) {
    4477             : 
    4478      757950 :                 if (d->cs.ts > oldest)
    4479             :                         return LOG_OK; /* cannot cleanup yet */
    4480             : 
    4481             :                 // d is oldest reachable delta
    4482      100299 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4483        4242 :                         destroy_delta(d->next, true);
    4484        4242 :                         d->next = NULL;
    4485             :                 }
    4486      100299 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4487      100299 :                 id->nr_updates += merge_delta(d);
    4488      100299 :                 unlock_column(store, c->base.id);
    4489             :         }
    4490      100299 :         table_destroy(store, c->t);
    4491      100299 :         return 1;
    4492             : }
    4493             : 
    4494             : static int
    4495        2576 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4496             : {
    4497        2576 :         sqlstore *store = Store;
    4498        2576 :         sql_idx *i = (sql_idx*)change->obj;
    4499             : 
    4500        2576 :         if (!i) /* cleaned earlier */
    4501             :                 return 1;
    4502             : 
    4503         633 :         if (change->handled || isDeleted(i->t)) {
    4504           0 :                 idx_destroy(store, i);
    4505           0 :                 return 1;
    4506             :         }
    4507             : 
    4508             :         /* savepoint commit (did it merge ?) */
    4509         633 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4510           0 :                 idx_destroy(store, i);
    4511           0 :                 return 1;
    4512             :         }
    4513         633 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4514             :                 return 0;
    4515         633 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4516         633 :         if (d && d->next) {
    4517           0 :                 if (d->cs.ts > oldest)
    4518             :                         return LOG_OK; /* cannot cleanup yet */
    4519             : 
    4520             :                 // d is oldest reachable delta
    4521           0 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4522           0 :                         destroy_delta(d->next, true);
    4523           0 :                         d->next = NULL;
    4524             :                 }
    4525           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4526           0 :                 id->nr_updates += merge_delta(d);
    4527           0 :                 unlock_column(store, i->base.id);
    4528             :         }
    4529         633 :         idx_destroy(store, i);
    4530         633 :         return 1;
    4531             : }
    4532             : 
    4533             : static int
    4534          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4535             : {
    4536          26 :         sqlstore *store = Store;
    4537          26 :         sql_idx *i = (sql_idx*)change->obj;
    4538             : 
    4539          26 :         if (!i) /* cleaned earlier */
    4540             :                 return 1;
    4541             : 
    4542          26 :         if (change->handled || isDeleted(i->t)) {
    4543           0 :                 table_destroy(store, i->t);
    4544           0 :                 return 1;
    4545             :         }
    4546             : 
    4547             :         /* savepoint commit (did it merge ?) */
    4548          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4549           0 :                 table_destroy(store, i->t);
    4550           0 :                 return 1;
    4551             :         }
    4552          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4553             :                 return 0;
    4554          26 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4555          26 :         if (d && d->next) {
    4556          26 :                 if (d->cs.ts > oldest)
    4557             :                         return LOG_OK; /* cannot cleanup yet */
    4558             : 
    4559             :                 // d is oldest reachable delta
    4560          26 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4561          26 :                         destroy_delta(d->next, true);
    4562          26 :                         d->next = NULL;
    4563             :                 }
    4564          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4565          26 :                 id->nr_updates += merge_delta(d);
    4566          26 :                 unlock_column(store, i->base.id);
    4567             :         }
    4568          26 :         table_destroy(store, i->t);
    4569          26 :         return 1;
    4570             : }
    4571             : 
    4572             : static int
    4573      231174 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4574             : {
    4575      231174 :         sqlstore *store = Store;
    4576      231174 :         sql_table *t = (sql_table*)change->obj;
    4577             : 
    4578      231174 :         if (change->handled || isDeleted(t)) {
    4579        3501 :                 table_destroy(store, t);
    4580        3501 :                 return 1;
    4581             :         }
    4582             :         /* savepoint commit (did it merge ?) */
    4583      227673 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4584        6587 :                 table_destroy(store, t);
    4585        6587 :                 return 1;
    4586             :         }
    4587      221086 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4588             :                 return 0;
    4589      221058 :         storage *d = (storage*)change->data;
    4590      221058 :         if (d->next) {
    4591      150028 :                 if (d->cs.ts > oldest)
    4592             :                         return LOG_OK; /* cannot cleanup yet */
    4593             : 
    4594       19095 :                 destroy_storage(d->next);
    4595       19095 :                 d->next = NULL;
    4596             :         }
    4597       90125 :         table_destroy(store, t);
    4598       90125 :         return 1;
    4599             : }
    4600             : 
    4601             : static int
    4602       30467 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4603             : {
    4604       30467 :         if (nr == 0)
    4605             :                 return LOG_OK;
    4606       30467 :         assert (nr > 0);
    4607       30467 :         if ((!offsets || !*offsets) && nr == total) {
    4608       30471 :                 *offset = slot;
    4609       30471 :                 return LOG_OK;
    4610             :         }
    4611           0 :         if (!*offsets) {
    4612           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4613           7 :                 if (!*offsets)
    4614             :                         return LOG_ERR;
    4615             :         }
    4616           0 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    4617       16135 :         for(size_t i = 0; i < nr; i++)
    4618       16139 :                 dst[i] = slot + i;
    4619           0 :         (*offsets)->batCount += nr;
    4620           0 :         (*offsets)->theap->dirty = true;
    4621           0 :         return LOG_OK;
    4622             : }
    4623             : 
    4624             : static int
    4625       30336 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4626             : {
    4627       30336 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4628       30593 :         assert(s->segs);
    4629       30593 :         ulng oldest = store_oldest(tr->store, NULL);
    4630       30395 :         BUN slot = 0;
    4631       30395 :         size_t total = cnt;
    4632             : 
    4633       30395 :         if (!locked)
    4634       30404 :                 lock_table(tr->store, t->base.id);
    4635             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4636       61224 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4637       30694 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
    4638          35 :                         if ((seg->end - seg->start) >= cnt) {
    4639             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4640          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4641           2 :                                         slot = p->end;
    4642           2 :                                         p->end += cnt;
    4643           2 :                                         seg->start += cnt;
    4644           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4645             :                                                 ok = LOG_ERR;
    4646             :                                                 break;
    4647             :                                         }
    4648           2 :                                         s->segs->nr_reused += cnt;
    4649           2 :                                         cnt = 0;
    4650           2 :                                         break;
    4651             :                                 }
    4652             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4653          11 :                                 size_t rcnt = seg->end - seg->start;
    4654          11 :                                 if (rcnt > cnt)
    4655             :                                         rcnt = cnt;
    4656          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4657             :                                         ok = LOG_ERR;
    4658             :                                         break;
    4659             :                                 }
    4660             :                         }
    4661          33 :                         seg->ts = tr->tid;
    4662          33 :                         seg->deleted = false;
    4663          33 :                         slot = seg->start;
    4664          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4665             :                                 ok = LOG_ERR;
    4666             :                                 break;
    4667             :                         }
    4668          10 :                         s->segs->nr_reused += (seg->end - seg->start);
    4669          10 :                         cnt -= (seg->end - seg->start);
    4670             :                 }
    4671             :         }
    4672       30532 :         if (ok == LOG_OK && cnt) {
    4673       30519 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4674       29410 :                         slot = s->segs->t->end;
    4675       29410 :                         s->segs->t->end += cnt;
    4676             :                 } else {
    4677        1109 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4678             :                                 ok = LOG_ERR;
    4679             :                         } else {
    4680        1150 :                                 if (!s->segs->h)
    4681           0 :                                         s->segs->h = s->segs->t;
    4682        1150 :                                 slot = s->segs->t->start;
    4683             :                         }
    4684             :                 }
    4685       30560 :                 if (ok == LOG_OK)
    4686       30560 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4687             :         }
    4688       30479 :         if (!locked)
    4689       30426 :                 unlock_table(tr->store, t->base.id);
    4690             : 
    4691       30644 :         if (ok == LOG_OK) {
    4692             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4693       30598 :                 if (!in_transaction) {
    4694        1135 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4695        1135 :                         in_transaction = true;
    4696             :                 }
    4697       30598 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4698       30586 :                         tr->logchanges += (lng) total;
    4699       30598 :                 if (*offsets) {
    4700          15 :                         BAT *pos = *offsets;
    4701          15 :                         assert(BATcount(pos) == total);
    4702          15 :                         BATsetcount(pos, total); /* set other properties */
    4703           7 :                         pos->tnil = false;
    4704           7 :                         pos->tnonil = true;
    4705           7 :                         pos->tkey = true;
    4706           7 :                         pos->tsorted = true;
    4707           7 :                         pos->trevsorted = false;
    4708             :                 }
    4709             :         }
    4710       30636 :         return ok;
    4711             : }
    4712             : 
    4713             : static int
    4714     2007044 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4715             : {
    4716     2007044 :         if (cnt > 1 && offsets)
    4717       30336 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4718     1976708 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4719     1976743 :         assert(s->segs);
    4720     1976743 :         ulng oldest = store_oldest(tr->store, NULL);
    4721     1976712 :         BUN slot = 0;
    4722     1976712 :         int reused = 0;
    4723             : 
    4724     1976712 :         if (!locked)
    4725     1852051 :                 lock_table(tr->store, t->base.id);
    4726             :         /* naive vacuum approach, iterator through segments, check for large enough deleted segments
    4727             :          * or create new segment at the end */
    4728     5469595 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4729     3539076 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
    4730             : 
    4731       46234 :                         if ((seg->end - seg->start) >= cnt) {
    4732             : 
    4733             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4734       46234 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4735       31439 :                                         slot = p->end;
    4736       31439 :                                         p->end += cnt;
    4737       31439 :                                         seg->start += cnt;
    4738       31439 :                                         s->segs->nr_reused += cnt;
    4739       31439 :                                         reused = 1;
    4740       31439 :                                         break;
    4741             :                                 }
    4742             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4743       14795 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4744             :                                         ok = LOG_ERR;
    4745             :                                         break;
    4746             :                                 }
    4747             :                         }
    4748       14795 :                         seg->ts = tr->tid;
    4749       14795 :                         seg->deleted = false;
    4750       14795 :                         slot = seg->start;
    4751       14795 :                         s->segs->nr_reused += cnt;
    4752       14795 :                         reused = 1;
    4753       14795 :                         break;
    4754             :                 }
    4755             :         }
    4756     1976753 :         if (ok == LOG_OK && !reused) {
    4757     1930520 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4758     1895460 :                         slot = s->segs->t->end;
    4759     1895460 :                         s->segs->t->end += cnt;
    4760             :                 } else {
    4761       35060 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4762             :                                 ok = LOG_ERR;
    4763             :                         } else {
    4764       35060 :                                 if (!s->segs->h)
    4765           0 :                                         s->segs->h = s->segs->t;
    4766       35060 :                                 slot = s->segs->t->start;
    4767             :                         }
    4768             :                 }
    4769             :         }
    4770     1976753 :         if (!locked)
    4771     1852092 :                 unlock_table(tr->store, t->base.id);
    4772             : 
    4773     1976753 :         if (ok == LOG_OK) {
    4774             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4775     1976754 :                 if (!in_transaction) {
    4776       49093 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4777       49093 :                         in_transaction = true;
    4778             :                 }
    4779     1976754 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4780     1976323 :                         tr->logchanges += (lng) cnt;
    4781     1976754 :                 *offset = slot;
    4782             :         }
    4783             :         return ok;
    4784             : }
    4785             : 
    4786             : /*
    4787             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4788             :  * of the global transaction and mark the newly added storage slots unused on the global
    4789             :  * level but used on the local transaction level. Besides this the local transaction needs
    4790             :  * to update (and mark unused) any slot in between the old end and new slots.
    4791             :  * */
    4792             : static int
    4793     1882391 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4794             : {
    4795     1882391 :         storage *s;
    4796             : 
    4797             :         /* we have a single segment structure for each persistent table
    4798             :          * for temporary tables each has its own */
    4799     1882391 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4800             :                 return LOG_ERR;
    4801             : 
    4802     1882505 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4803             : }
    4804             : 
    4805             : /* some tables cannot be updated concurrently (user/roles etc) */
    4806             : static int
    4807      124666 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4808             : {
    4809      124666 :         storage *s;
    4810      124666 :         int res = 0;
    4811             : 
    4812             :         /* we have a single segment structure for each persistent table
    4813             :          * for temporary tables each has its own */
    4814      124666 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4815             :                 /* TODO check for other inserts ! */
    4816             :                 return LOG_ERR;
    4817             : 
    4818      124666 :         lock_table(tr->store, t->base.id);
    4819      124666 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4820           4 :                 unlock_table(tr->store, t->base.id);
    4821           4 :                 return LOG_CONFLICT;
    4822             :         }
    4823      124662 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4824      124662 :         unlock_table(tr->store, t->base.id);
    4825      124662 :         return res;
    4826             : }
    4827             : 
    4828             : static int
    4829       20103 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4830             : {
    4831       20103 :         storage *s;
    4832       20103 :         int res = 0;
    4833             : 
    4834       20103 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4835             :                 return LOG_ERR;
    4836             : 
    4837       20103 :         lock_table(tr->store, t->base.id);
    4838       20103 :         res = segments_conflict(tr, s->segs, uncommitted);
    4839       20103 :         unlock_table(tr->store, t->base.id);
    4840       20103 :         return res ? LOG_CONFLICT : LOG_OK;
    4841             : }
    4842             : 
    4843             : static size_t
    4844     1513024 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4845             : {
    4846     1513024 :         size_t cnt = 0;
    4847             : 
    4848     1524368 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4849             :                 ;
    4850             : 
    4851     3700702 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4852     2187667 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4853       84830 :                         cnt += s->end - s->start;
    4854             :         }
    4855     1513035 :         return cnt;
    4856             : }
    4857             : 
    4858             : static BAT *
    4859     1512982 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4860             : {
    4861     1512982 :         lock_table(tr->store, t->base.id);
    4862     1513041 :         segment *s = S->segs->h;
    4863             :         /* step one no deletes -> dense range */
    4864     1513041 :         uint32_t cur = 0;
    4865     1513041 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4866     1513046 :         if (!dnr) {
    4867     1437068 :                 unlock_table(tr->store, t->base.id);
    4868     1437064 :                 return BATdense(start, start, end-start);
    4869             :         }
    4870             : 
    4871       75978 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4872       75978 :         if (!b) {
    4873           0 :                 unlock_table(tr->store, t->base.id);
    4874           0 :                 return NULL;
    4875             :         }
    4876             : 
    4877       75978 :         uint32_t *restrict dst = Tloc(b, 0);
    4878    56749670 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4879    56724826 :                 if (s->end < start)
    4880         143 :                         continue;
    4881    56724683 :                 if (s->start >= end)
    4882             :                         break;
    4883    56673549 :                 msk m = (SEG_IS_VALID(s, tr));
    4884    56673549 :                 size_t lnr = s->end-s->start;
    4885    56673549 :                 if (s->start < start)
    4886        3716 :                         lnr -= (start - s->start);
    4887    56673549 :                 if (s->end > end)
    4888          51 :                         lnr -= s->end - end;
    4889             : 
    4890    56673549 :                 if (m) {
    4891      475045 :                         size_t used = pos&31, end = 32;
    4892      475045 :                         if (used) {
    4893      386461 :                                 if (lnr < (32-used))
    4894      245645 :                                         end = used + lnr;
    4895      386461 :                                 assert(end > used);
    4896      386461 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4897      386461 :                                 lnr -= end - used;
    4898      386461 :                                 pos += end - used;
    4899      386461 :                                 if (end == 32) {
    4900      140816 :                                         *dst++ = cur;
    4901      140816 :                                         cur = 0;
    4902             :                                 }
    4903             :                         }
    4904      475045 :                         size_t full = lnr/32;
    4905      475045 :                         size_t rest = lnr%32;
    4906      475045 :                         if (full > 0) {
    4907      151280 :                                 memset(dst, ~0, full * sizeof(*dst));
    4908      151280 :                                 dst += full;
    4909      151280 :                                 lnr -= full * 32;
    4910      151280 :                                 pos += full * 32;
    4911             :                         }
    4912      475045 :                         if (rest > 0) {
    4913      218147 :                                 cur |= (1U << rest) - 1;
    4914      218147 :                                 lnr -= rest;
    4915      218147 :                                 pos += rest;
    4916             :                         }
    4917      475045 :                         assert(lnr==0);
    4918             :                 } else {
    4919    56198504 :                         size_t used = pos&31, end = 32;
    4920    56198504 :                         if (used) {
    4921    54447512 :                                 if (lnr < (32-used))
    4922    52647348 :                                         end = used + lnr;
    4923             : 
    4924    54447512 :                                 pos+= (end-used);
    4925    54447512 :                                 lnr-= (end-used);
    4926    54447512 :                                 if (end == 32) {
    4927     1800164 :                                         *dst++ = cur;
    4928     1800164 :                                         cur = 0;
    4929             :                                 }
    4930             :                         }
    4931    56198504 :                         size_t full = lnr/32;
    4932    56198504 :                         size_t rest = lnr%32;
    4933    56198504 :                         memset(dst, 0, full * sizeof(*dst));
    4934    56198504 :                         dst += full;
    4935    56198504 :                         lnr -= full * 32;
    4936    56198504 :                         pos += full * 32;
    4937    56198504 :                         pos+= rest;
    4938    56198504 :                         lnr-= rest;
    4939    56198504 :                         assert(lnr==0);
    4940             :                 }
    4941             :         }
    4942             : 
    4943       75978 :         unlock_table(tr->store, t->base.id);
    4944       75978 :         if (pos%32)
    4945       74103 :                 *dst=cur;
    4946       75978 :         BATsetcount(b, nr);
    4947       75978 :         bn = BATmaskedcands(start, nr, b, true);
    4948       75975 :         BBPreclaim(b);
    4949       75977 :         (void)pos;
    4950       75977 :         assert (pos == nr);
    4951             :         return bn;
    4952             : }
    4953             : 
    4954             : static void *                                   /* BAT * */
    4955     1518285 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4956             : {
    4957             :         /* with nr_of_parts - part_nr we can adjust parts */
    4958     1518285 :         storage *s = tab_timestamp_storage(tr, t);
    4959             : 
    4960     1518464 :         if (!s)
    4961             :                 return NULL;
    4962     1518464 :         size_t nr = segs_end(s->segs, tr, t);
    4963             : 
    4964     1519178 :         if (!nr)
    4965        6186 :                 return BATdense(0, 0, 0);
    4966             : 
    4967             :         /* compute proper part */
    4968     1512992 :         size_t part_size = nr/nr_of_parts;
    4969     1512992 :         size_t start = part_size * part_nr;
    4970     1512992 :         size_t end = start + part_size;
    4971     1512992 :         if (part_nr == (nr_of_parts-1))
    4972     1249800 :                 end = nr;
    4973     1512992 :         assert(end <= nr);
    4974     1512992 :         return segments2cands(s, tr, t, start, end);
    4975             : }
    4976             : 
    4977             : static int
    4978           5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    4979             : {
    4980           5 :         bool update_conflict = false;
    4981             : 
    4982           5 :         if (segments_in_transaction(tr, col->t))
    4983             :                 return LOG_CONFLICT;
    4984             : 
    4985           5 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    4986             : 
    4987           5 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    4988           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    4989           5 :         assert(d && d->cs.ts == tr->tid);
    4990           5 :         if (odelta != d)
    4991           5 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    4992           5 :         if (d->cs.bid)
    4993           5 :                 temp_destroy(d->cs.bid);
    4994           5 :         if (d->cs.uibid)
    4995           5 :                 temp_destroy(d->cs.uibid);
    4996           5 :         if (d->cs.uvbid)
    4997           5 :                 temp_destroy(d->cs.uvbid);
    4998           5 :         bat_set_access(bn, BAT_READ);
    4999           5 :         d->cs.bid = temp_create(bn);
    5000           5 :         d->cs.uibid = 0;
    5001           5 :         d->cs.uvbid = 0;
    5002           5 :         d->cs.ucnt = 0;
    5003           5 :         d->cs.cleared = true;
    5004           5 :         d->cs.ts = tr->tid;
    5005           5 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5006           5 :         return LOG_OK;
    5007             : }
    5008             : 
    5009             : static int
    5010           5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
    5011             : {
    5012           5 :         if (segments_in_transaction(tr, c->t))
    5013             :                 return LOG_CONFLICT;
    5014             : 
    5015           5 :         sql_delta *d = NULL;
    5016             : 
    5017             :         /* do we have enough to clean */
    5018           5 :         if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5019             :                 return LOG_CONFLICT;
    5020             : 
    5021             :         /* do we have enough to clean */
    5022           5 :         if (!force && (d->nr_updates) < 1024)
    5023             :                 return LOG_OK;
    5024             : 
    5025           5 :         BAT *b = NULL, *bn = NULL;;
    5026           5 :         if ((b = bind_col(tr, c, 0)) == NULL)
    5027             :                 return LOG_ERR;
    5028           5 :         if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5029           0 :                 BBPreclaim(b);
    5030           0 :                 return LOG_ERR;
    5031             :         }
    5032           5 :         int res = swap_bats(tr, c, bn);
    5033           5 :         d->nr_updates = 0;
    5034           5 :         BBPreclaim(b);
    5035           5 :         BBPreclaim(bn);
    5036           5 :         return res;
    5037             : }
    5038             : 
    5039             : static int
    5040           0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
    5041             : {
    5042           0 :         if (segments_in_transaction(tr, t))
    5043             :                 return LOG_CONFLICT;
    5044             : 
    5045           0 :         storage *s;
    5046           0 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    5047             :                 return LOG_ERR;
    5048             : 
    5049           0 :         for( node *n = ol_first_node(t->columns); n; n = n->next) {
    5050           0 :                 sql_column *c = n->data;
    5051             : 
    5052           0 :                 if (!ATOMvarsized(c->type.type->localtype))
    5053           0 :                         continue;
    5054           0 :                 sql_delta *d = NULL;
    5055             : 
    5056             :                 /* do we have enough to clean */
    5057           0 :                 if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5058             :                         return LOG_CONFLICT;
    5059             : 
    5060             :                 /* do we have enough to clean */
    5061           0 :                 if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
    5062           0 :                         continue;
    5063             : 
    5064           0 :                 BAT *b = NULL, *bn = NULL;;
    5065           0 :                 if ((b = bind_col(tr, c, 0)) == NULL)
    5066             :                         return LOG_ERR;
    5067           0 :                 if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5068           0 :                         BBPreclaim(b);
    5069           0 :                         return LOG_ERR;
    5070             :                 }
    5071           0 :                 int res = swap_bats(tr, c, bn);
    5072           0 :                 d->nr_updates = 0;
    5073           0 :                 BBPreclaim(b);
    5074           0 :                 BBPreclaim(bn);
    5075           0 :                 if (res != LOG_OK)
    5076           0 :                         return res;
    5077             :         }
    5078           0 :         s->segs->nr_reused = 0;
    5079           0 :         return LOG_OK;
    5080             : }
    5081             : 
    5082             : 
    5083             : static int
    5084          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5085             : {
    5086          58 :         bool update_conflict = false;
    5087             : 
    5088          58 :         if (segments_in_transaction(tr, col->t))
    5089             :                 return LOG_CONFLICT;
    5090             : 
    5091          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5092             : 
    5093          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5094           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5095          58 :         assert(d && d->cs.ts == tr->tid);
    5096          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5097          58 :         if (odelta != d)
    5098          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5099             : 
    5100          58 :         d->cs.st = st;
    5101          58 :         d->cs.cleared = true;
    5102          58 :         if (d->cs.bid)
    5103          58 :                 temp_destroy(d->cs.bid);
    5104          58 :         o = transfer_to_systrans(o);
    5105          58 :         if (o == NULL)
    5106             :                 return LOG_ERR;
    5107          58 :         bat_set_access(o, BAT_READ);
    5108          58 :         d->cs.bid = temp_create(o);
    5109          58 :         if (u) {
    5110          53 :                 if (d->cs.ebid)
    5111           0 :                         temp_destroy(d->cs.ebid);
    5112          53 :                 u = transfer_to_systrans(u);
    5113          53 :                 if (u == NULL)
    5114             :                         return LOG_ERR;
    5115          53 :                 d->cs.ebid = temp_create(u);
    5116             :         }
    5117             :         return LOG_OK;
    5118             : }
    5119             : 
    5120             : void
    5121         315 : bat_storage_init( store_functions *sf)
    5122             : {
    5123         315 :         sf->bind_col = &bind_col;
    5124         315 :         sf->bind_updates = &bind_updates;
    5125         315 :         sf->bind_updates_idx = &bind_updates_idx;
    5126         315 :         sf->bind_idx = &bind_idx;
    5127         315 :         sf->bind_cands = &bind_cands;
    5128             : 
    5129         315 :         sf->claim_tab = &claim_tab;
    5130         315 :         sf->key_claim_tab = &key_claim_tab;
    5131         315 :         sf->tab_validate = &tab_validate;
    5132             : 
    5133         315 :         sf->append_col = &append_col;
    5134         315 :         sf->append_idx = &append_idx;
    5135             : 
    5136         315 :         sf->update_col = &update_col;
    5137         315 :         sf->update_idx = &update_idx;
    5138             : 
    5139         315 :         sf->delete_tab = &delete_tab;
    5140             : 
    5141         315 :         sf->count_del = &count_del;
    5142         315 :         sf->count_col = &count_col;
    5143         315 :         sf->count_idx = &count_idx;
    5144         315 :         sf->dcount_col = &dcount_col;
    5145         315 :         sf->min_max_col = &min_max_col;
    5146         315 :         sf->set_stats_col = &set_stats_col;
    5147         315 :         sf->sorted_col = &sorted_col;
    5148         315 :         sf->unique_col = &unique_col;
    5149         315 :         sf->double_elim_col = &double_elim_col;
    5150         315 :         sf->col_stats = &col_stats;
    5151         315 :         sf->col_set_range = &col_set_range;
    5152         315 :         sf->col_not_null = &col_not_null;
    5153             : 
    5154         315 :         sf->col_dup = &col_dup;
    5155         315 :         sf->idx_dup = &idx_dup;
    5156         315 :         sf->del_dup = &del_dup;
    5157             : 
    5158         315 :         sf->create_col = &create_col;    /* create and add to change list */
    5159         315 :         sf->create_idx = &create_idx;
    5160         315 :         sf->create_del = &create_del;
    5161             : 
    5162         315 :         sf->destroy_col = &destroy_col;  /* free resources */
    5163         315 :         sf->destroy_idx = &destroy_idx;
    5164         315 :         sf->destroy_del = &destroy_del;
    5165             : 
    5166         315 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5167         315 :         sf->drop_idx = &drop_idx;
    5168         315 :         sf->drop_del = &drop_del;
    5169             : 
    5170         315 :         sf->clear_table = &clear_table;
    5171             : 
    5172         315 :         sf->vacuum_col = &vacuum_col;
    5173         315 :         sf->vacuum_tab = &vacuum_tab;
    5174         315 :         sf->col_compress = &col_compress;
    5175         315 : }

Generated by: LCOV version 1.14