LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2494 3165 78.8 %
Date: 2025-03-24 21:28:01 Functions: 156 164 95.1 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024, 2025 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "matomic.h"
      18             : 
      19             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      20             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      21             : 
      22             : static int log_update_col( sql_trans *tr, sql_change *c);
      23             : static int log_update_idx( sql_trans *tr, sql_change *c);
      24             : static int log_update_del( sql_trans *tr, sql_change *c);
      25             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      26             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      27             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int log_create_col(sql_trans *tr, sql_change *change);
      29             : static int log_create_idx(sql_trans *tr, sql_change *change);
      30             : static int log_create_del(sql_trans *tr, sql_change *change);
      31             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      32             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      33             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      35             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      36             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      39             : 
      40             : static lng merge_delta( sql_delta *obat);
      41             : 
      42             : /* valid
      43             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      44             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      45             :  */
      46             : 
      47             : #define VALID_4_READ(TS,tr) \
      48             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      49             : 
      50             : /* when changed, check if the old status is still valid */
      51             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      52             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      53             : 
      54             : #define SEG_VALID_4_DELETE(seg,tr) \
      55             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      56             : 
      57             : /* Delete (in current trans or by some other finished transaction, or re-used segment which used to be deleted */
      58             : #define SEG_IS_DELETED(seg,tr) \
      59             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      60             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      61             : 
      62             : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
      63             : #define SEG_IS_VALID(seg, tr) \
      64             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      65             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      66             : 
      67             : static inline BAT *
      68        5299 : transfer_to_systrans(BAT *b)
      69             : {
      70             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      71        5299 :         MT_lock_set(&b->theaplock);
      72        5299 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      73          29 :                 MT_lock_unset(&b->theaplock);
      74          29 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      75          29 :                 BBPreclaim(b);
      76          29 :                 return bn;
      77             :         }
      78        5270 :         if (b->theap->farmid == TRANSIENT ||
      79          62 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      80        4885 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      81        4885 :                 if (qc) {
      82        2523 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      83        2523 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      84        2523 :                                 b->theap->farmid = SYSTRANS;
      85        2523 :                                 b->batRole = SYSTRANS;
      86             :                         }
      87        2523 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      88        1092 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      89        1092 :                                 b->tvheap->farmid = SYSTRANS;
      90             :                         }
      91             :                 }
      92             :         }
      93        5270 :         MT_lock_unset(&b->theaplock);
      94        5270 :         return b;
      95             : }
      96             : 
      97             : static void
      98    29534725 : lock_table(sqlstore *store, sqlid id)
      99             : {
     100    29534725 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     101    29620213 : }
     102             : 
     103             : static void
     104    29619629 : unlock_table(sqlstore *store, sqlid id)
     105             : {
     106    29619629 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     107    29617063 : }
     108             : 
     109             : static void
     110    22993474 : lock_column(sqlstore *store, sqlid id)
     111             : {
     112    22993474 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     113    23019101 : }
     114             : 
     115             : static void
     116    23016401 : unlock_column(sqlstore *store, sqlid id)
     117             : {
     118    23016401 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     119    23029520 : }
     120             : 
     121             : static void
     122      115253 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     123             : {
     124      115253 :         assert(cleanup);
     125      115253 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     126      115250 : }
     127             : 
     128             : static void
     129      132874 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     130             : {
     131      132874 :         assert(cleanup);
     132      132874 :         dup_base(&t->base);
     133      132875 :         trans_add(tr, b, data, cleanup, commit, log);
     134      132861 : }
     135             : 
     136             : static int
     137       72718 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     138             : {
     139       72718 :         segment *s = change->data;
     140             : 
     141       72718 :         if (s->ts <= oldest) {
     142       33704 :                 while(s) {
     143       20713 :                         segment *n = s->prev;
     144       20713 :                         _DELETE(s);
     145       20713 :                         s = n;
     146             :                 }
     147       12991 :                 sqlstore *store = Store;
     148       12991 :                 table_destroy(store, (sql_table*)change->obj);
     149       12991 :                 return 1;
     150             :         }
     151             :         return LOG_OK;
     152             : }
     153             : 
     154             : static void
     155       20713 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     156             : {
     157             :         /* we can only be accessed by anything older then commit_ts */
     158       20713 :         if (c->cleanup == &tc_gc_seg)
     159        7722 :                 s->prev = c->data;
     160             :         else
     161       12991 :                 c->cleanup = &tc_gc_seg;
     162       20713 :         c->data = s;
     163       20713 :         s->ts = commit_ts;
     164       15841 : }
     165             : 
     166             : static segment *
     167       89206 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     168             : {
     169       89206 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     170             : 
     171       89211 :         assert(tr);
     172       89211 :         if (n) {
     173       89211 :                 *n = (segment) {
     174       89211 :                         .ts = tr->tid,
     175             :                         .oldts = 0,
     176             :                         .deleted = false,
     177             :                         .start = 0,
     178             :                         .end = cnt,
     179             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     180             :                         .prev = NULL,
     181             :                 };
     182       89211 :                 if (o) {
     183       36322 :                         n->start += o->end;
     184       36322 :                         n->end += o->end;
     185       36322 :                         ATOMIC_PTR_SET(&o->next, n);
     186             :                 }
     187             :         }
     188       89211 :         return n;
     189             : }
     190             : 
     191             : static segment *
     192       96241 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     193             : {
     194       96241 :         assert(tr);
     195       96241 :         if (o->start == start && o->end == start+cnt) {
     196       11199 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     197       11199 :                 o->oldts = o->ts;
     198       11199 :                 o->ts = tr->tid;
     199       11199 :                 o->deleted = deleted;
     200       11199 :                 return o;
     201             :         }
     202       85042 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     203             : 
     204       85042 :         if (!n)
     205             :                 return NULL;
     206       85042 :         n->prev = NULL;
     207             : 
     208       85042 :         if (o->ts == tr->tid) {
     209        5930 :                 n->oldts = 0;
     210        5930 :                 n->ts = 1;
     211        5930 :                 n->deleted = true;
     212             :         } else {
     213       79112 :                 n->oldts = o->ts;
     214       79112 :                 n->ts = tr->tid;
     215       79112 :                 n->deleted = deleted;
     216             :         }
     217       85042 :         if (start == o->start) {
     218             :                 /* 2-way split: o remains latter part of segment, new one is
     219             :                  * inserted before */
     220       68404 :                 n->start = o->start;
     221       68404 :                 n->end = n->start + cnt;
     222       68404 :                 ATOMIC_PTR_INIT(&n->next, o);
     223       68404 :                 if (segs->h == o)
     224         476 :                         segs->h = n;
     225       68404 :                 if (p)
     226       67928 :                         ATOMIC_PTR_SET(&p->next, n);
     227       68404 :                 o->start = n->end;
     228       16638 :         } else if (start+cnt == o->end) {
     229             :                 /* 2-way split: o remains first part of segment, new one is
     230             :                  * added after */
     231        5870 :                 n->start = o->end - cnt;
     232        5870 :                 n->end = o->end;
     233        5870 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     234        5870 :                 ATOMIC_PTR_SET(&o->next, n);
     235        5870 :                 if (segs->t == o)
     236         906 :                         segs->t = n;
     237        5870 :                 o->end = n->start;
     238             :         } else {
     239             :                 /* 3-way split: o remains first part of segment, two new ones
     240             :                  * are added after */
     241       10768 :                 segment *n2 = GDKmalloc(sizeof(segment));
     242       10768 :                 if (n2 == NULL) {
     243           0 :                         GDKfree(n);
     244           0 :                         return NULL;
     245             :                 }
     246       10768 :                 ATOMIC_PTR_INIT(&n->next, n2);
     247       10768 :                 n->start = start;
     248       10768 :                 n->end = start + cnt;
     249       10768 :                 *n2 = *o;
     250       10768 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     251       10768 :                 n2->start = n->end;
     252       10768 :                 n2->prev = NULL;
     253       10768 :                 if (segs->t == o)
     254        4488 :                         segs->t = n2;
     255       10768 :                 ATOMIC_PTR_SET(&o->next, n);
     256       10768 :                 o->end = start;
     257             :         }
     258             :         return n;
     259             : }
     260             : 
     261             : static void
     262        3978 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     263             : {
     264        3978 :         segment *cur = segs->h, *seg = NULL;
     265       17158 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     266       13180 :                 if (cur->ts == tr->tid) { /* revert */
     267        4475 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     268        4475 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     269        4475 :                         cur->oldts = 0;
     270             :                 }
     271       13180 :                 if (cur->ts <= oldest) { /* possibly merge range */
     272       12901 :                         if (!seg) { /* skip first */
     273             :                                 seg = cur;
     274        8923 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     275             :                                 /* merge with previous */
     276        4872 :                                 seg->end = cur->end;
     277        4872 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     278        4872 :                                 if (cur == segs->t)
     279        2855 :                                         segs->t = seg;
     280        4872 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     281        4872 :                                 cur = seg;
     282             :                         } else {
     283             :                                 seg = cur; /* begin of new merge */
     284             :                         }
     285             :                 }
     286             :         }
     287        3978 : }
     288             : 
     289             : static size_t
     290      105059 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     291             : {
     292      105059 :         size_t cnt = 0;
     293      105059 :         segment *s = segs->h, *l = NULL;
     294             : 
     295      479103 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     296      374044 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     297             :                                 l = s;
     298             :         }
     299      105059 :         if (l)
     300      105052 :                 cnt = l->end;
     301      105059 :         return cnt;
     302             : }
     303             : 
     304             : static int
     305      105059 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     306             : {
     307             :         /* set bits correctly */
     308      105059 :         BAT *b = temp_descriptor(cs->bid);
     309             : 
     310      105059 :         if (!b)
     311             :                 return LOG_ERR;
     312      105059 :         segment *s = segs->h;
     313             : 
     314      105059 :         size_t nr = segs_end_include_deleted(segs, tr);
     315      105059 :         size_t rounded_nr = ((nr+31)&~31);
     316      105059 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     317           0 :                 bat_destroy(b);
     318           0 :                 return LOG_ERR;
     319             :         }
     320             : 
     321             :         /* disable all properties here */
     322      105059 :         MT_lock_set(&b->theaplock);
     323      105059 :         b->tsorted = false;
     324      105059 :         b->trevsorted = false;
     325      105059 :         b->tnosorted = 0;
     326      105059 :         b->tnorevsorted = 0;
     327      105059 :         b->tseqbase = oid_nil;
     328      105059 :         b->tkey = false;
     329      105059 :         b->tnokey[0] = 0;
     330      105059 :         b->tnokey[1] = 0;
     331      105059 :         BUN cnt = BATcount(b);
     332             : 
     333      105059 :         uint32_t *restrict dst;
     334      421694 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     335      359654 :                 if (s->start >= nr)
     336             :                         break;
     337      316635 :                 if (s->ts == tr->tid && s->end != s->start) {
     338      152354 :                         if (cnt < s->start) { /* first mark as deleted ! */
     339        2805 :                                 size_t lnr = s->start-cnt;
     340        2805 :                                 size_t pos = cnt;
     341        2805 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     342        2805 :                                 uint32_t cur = 0;
     343             : 
     344        2805 :                                 size_t used = pos&31, end = 32;
     345        2805 :                                 if (used) {
     346        2711 :                                         if (lnr < (32-used))
     347        2575 :                                                 end = used + lnr;
     348        2711 :                                         assert(end > used);
     349        2711 :                                         cur |= ((1U << (end - used)) - 1) << used;
     350        2711 :                                         lnr -= end - used;
     351        2711 :                                         *dst++ |= cur;
     352        2711 :                                         cur = 0;
     353             :                                 }
     354        2805 :                                 size_t full = lnr/32;
     355        2805 :                                 size_t rest = lnr%32;
     356        2805 :                                 if (full > 0) {
     357           9 :                                         memset(dst, ~0, full * sizeof(*dst));
     358           9 :                                         dst += full;
     359           9 :                                         lnr -= full * 32;
     360             :                                 }
     361        2805 :                                 if (rest > 0) {
     362         143 :                                         cur |= (1U << rest) - 1;
     363         143 :                                         lnr -= rest;
     364         143 :                                         *dst |= cur;
     365             :                                 }
     366        2805 :                                 assert(lnr==0);
     367             :                         }
     368      152354 :                         size_t lnr = s->end-s->start;
     369      152354 :                         size_t pos = s->start;
     370      152354 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     371      152354 :                         uint32_t cur = 0;
     372      152354 :                         size_t used = pos&31, end = 32;
     373      152354 :                         if (used) {
     374      113973 :                                 if (lnr < (32-used))
     375      108518 :                                         end = used + lnr;
     376      113973 :                                 assert(end > used);
     377      113973 :                                 cur |= ((1U << (end - used)) - 1) << used;
     378      113973 :                                 lnr -= end - used;
     379      113973 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     380      113973 :                                 dst++;
     381      113973 :                                 cur = 0;
     382             :                         }
     383      152354 :                         size_t full = lnr/32;
     384      152354 :                         size_t rest = lnr%32;
     385      152354 :                         if (full > 0) {
     386        3858 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     387        3858 :                                 dst += full;
     388        3858 :                                 lnr -= full * 32;
     389             :                         }
     390      152354 :                         if (rest > 0) {
     391       40918 :                                 cur |= (1U << rest) - 1;
     392       40918 :                                 lnr -= rest;
     393       40918 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     394             :                         }
     395      152354 :                         assert(lnr==0);
     396      152354 :                         if (cnt < s->end)
     397      316635 :                                 cnt = s->end;
     398             :                 }
     399             :         }
     400      105059 :         if (nr > BATcount(b)) {
     401       63420 :                 BATsetcount(b, nr);
     402             :         }
     403      105059 :         b->theap->dirty = true;
     404      105059 :         MT_lock_unset(&b->theaplock);
     405             : 
     406      105059 :         bat_destroy(b);
     407      105059 :         return LOG_OK;
     408             : }
     409             : 
     410             : /* TODO return LOG_OK/ERR */
     411             : static void
     412      105085 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     413             : {
     414      105085 :         sqlstore* store = tr->store;
     415      105085 :         segment *cur = s->segs->h, *seg = NULL;
     416      479201 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     417      374116 :                 if (cur->ts == tr->tid) {
     418      167039 :                         if (!cur->deleted)
     419       93703 :                                 cur->oldts = 0;
     420      167039 :                         cur->ts = commit_ts;
     421             :                 }
     422      374116 :                 if (!seg) {
     423             :                         /* first segment */
     424             :                         seg = cur;
     425             :                 }
     426      269031 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     427             :                         /* possible merge since both deleted flags are equal */
     428      252505 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     429      185345 :                                 int merge = 1;
     430      185345 :                                 node *n = store->active->h;
     431      569237 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     432      463958 :                                         sql_trans* other = ((sql_trans*)n->data);
     433      463958 :                                         ulng active = other->ts;
     434      463958 :                                         if(other->active == 2)
     435       21942 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     436      442016 :                                         if (active == tr->ts)
     437      131926 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     438      310090 :                                         if (seg->ts < active && cur->ts < active)
     439             :                                                 break;
     440      299307 :                                         if (seg->ts > active && cur->ts > active)
     441      230024 :                                                 continue;
     442             : 
     443       69283 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     444             :                                         /* cannot safely merge since there is an active transaction between the segments */
     445             :                                         merge = false;
     446             :                                         break;
     447             :                                 }
     448             :                                 /* merge segments */
     449      232124 :                                 if (merge) {
     450      116062 :                                         seg->end = cur->end;
     451      116062 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     452      116062 :                                         if (cur == s->segs->t)
     453       28563 :                                                 s->segs->t = seg;
     454      116062 :                                         if (commit_ts == oldest) {
     455      100221 :                                                 _DELETE(cur);
     456             :                                         } else
     457       31682 :                                                 mark4destroy(cur, change, commit_ts);
     458      116062 :                                         cur = seg;
     459      116062 :                                         continue;
     460             :                                 }
     461             :                         }
     462             :                 }
     463             :                 seg = cur;
     464             :         }
     465      105085 : }
     466             : 
     467             : static int
     468     2219337 : segments_in_transaction(sql_trans *tr, sql_table *t)
     469             : {
     470     2219337 :         storage *s = ATOMIC_PTR_GET(&t->data);
     471     2219337 :         segment *seg = s->segs->h;
     472             : 
     473     2219337 :         if (seg && s->segs->t->ts == tr->tid)
     474             :                 return 1;
     475      600445 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     476      494955 :                 if (seg->ts == tr->tid)
     477             :                         return 1;
     478             :         }
     479             :         return 0;
     480             : }
     481             : 
     482             : static size_t
     483    21449547 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     484             : {
     485    21449547 :         size_t cnt = 0;
     486             : 
     487             :         /* because a table can grow rows over the time a transaction is running, we need to find the last valid segment, to
     488             :          * keep all of the parts aligned */
     489    21449547 :         lock_table(tr->store, table->base.id);
     490    21511803 :         segment *s = segs->h, *l = NULL;
     491             : 
     492    21511803 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     493    17747860 :                 l = s = segs->t;
     494             : 
     495   221131727 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     496   199620184 :                 if (SEG_IS_VALID(s, tr))
     497             :                                 l = s;
     498             :         }
     499    21511543 :         if (l)
     500    21492875 :                 cnt = l->end;
     501    21511543 :         unlock_table(tr->store, table->base.id);
     502    21509695 :         return cnt;
     503             : }
     504             : 
     505             : static segments *
     506       52888 : new_segments(sql_trans *tr, size_t cnt)
     507             : {
     508       52888 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     509             : 
     510       52889 :         if (n) {
     511       52889 :                 n->nr_reused = 0;
     512       52889 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     513       52889 :                 if (!n->h) {
     514           0 :                         GDKfree(n);
     515           0 :                         return NULL;
     516             :                 }
     517       52889 :                 sql_ref_init(&n->r);
     518             :         }
     519             :         return n;
     520             : }
     521             : 
     522             : static sql_delta *
     523    34611437 : timestamp_delta( sql_trans *tr, sql_delta *d)
     524             : {
     525    34634669 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     526       23232 :                 d = d->next;
     527    34618455 :         return d;
     528             : }
     529             : 
     530             : static sql_delta *
     531    34448271 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     532             : {
     533    34448271 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     534             : }
     535             : 
     536             : static sql_delta *
     537       27324 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     538             : {
     539       27324 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     540             : }
     541             : 
     542             : static storage *
     543    21939663 : timestamp_storage( sql_trans *tr, storage *d)
     544             : {
     545    21939663 :         if (!d)
     546             :                 return NULL;
     547    22015447 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     548       75784 :                 d = d->next;
     549             :         return d;
     550             : }
     551             : 
     552             : static storage *
     553    21927188 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     554             : {
     555    21927188 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     556             : }
     557             : 
     558             : static sql_delta*
     559       21406 : delta_dup(sql_delta *d)
     560             : {
     561       21406 :         ATOMIC_INC(&d->cs.refcnt);
     562       21406 :         return d;
     563             : }
     564             : 
     565             : static void *
     566       19590 : col_dup(sql_column *c)
     567             : {
     568       19590 :         if (!ATOMIC_PTR_GET(&c->data))
     569             :                 return NULL;
     570       19590 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     571             : 
     572             : }
     573             : 
     574             : static void *
     575        3103 : idx_dup(sql_idx *i)
     576             : {
     577        3103 :         if (!ATOMIC_PTR_GET(&i->data))
     578             :                 return NULL;
     579        1816 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     580             : }
     581             : 
     582             : static storage*
     583        1607 : storage_dup(storage *d)
     584             : {
     585        1607 :         ATOMIC_INC(&d->cs.refcnt);
     586        1607 :         return d;
     587             : }
     588             : 
     589             : static void *
     590        1607 : del_dup(sql_table *t)
     591             : {
     592        1607 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     593             : }
     594             : 
     595             : static size_t
     596          17 : count_inserts( segment *s, sql_trans *tr)
     597             : {
     598          17 :         size_t cnt = 0;
     599             : 
     600          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     601          55 :                 if (!s->deleted && s->ts == tr->tid)
     602           4 :                         cnt += s->end - s->start;
     603             :         }
     604          17 :         return cnt;
     605             : }
     606             : 
     607             : static size_t
     608      880813 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     609             : {
     610      880813 :         size_t cnt = 0;
     611             : 
     612     1032107 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     613             :                 ;
     614             : 
     615     5627625 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     616     4746823 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     617     1634651 :                         cnt += s->end - s->start;
     618             :         }
     619      880802 :         return cnt;
     620             : }
     621             : 
     622             : static size_t
     623             : count_deletes( segment *s, sql_trans *tr)
     624             : {
     625             :         size_t cnt = 0;
     626             : 
     627             :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     628             :                 if (SEG_IS_DELETED(s, tr))
     629             :                         cnt += s->end - s->start;
     630             :         }
     631             :         return cnt;
     632             : }
     633             : 
     634             : static size_t
     635    18696852 : count_col(sql_trans *tr, sql_column *c, int access)
     636             : {
     637    18696852 :         storage *d;
     638    18696852 :         sql_delta *ds;
     639             : 
     640    18696852 :         if (!isTable(c->t))
     641             :                 return 0;
     642    18696852 :         d = tab_timestamp_storage(tr, c->t);
     643    18705224 :         ds = col_timestamp_delta(tr, c);
     644    18725696 :         if (!d ||!ds)
     645             :                 return 0;
     646    18725696 :         if (access == 2)
     647      495087 :                 return ds?ds->cs.ucnt:0;
     648    18230609 :         if (access == 1)
     649          17 :                 return count_inserts(d->segs->h, tr);
     650    18230592 :         if (access == QUICK)
     651           0 :                 return d->segs->t?d->segs->t->end:0;
     652    18230592 :         if (access == CNT_ACTIVE) {
     653       42511 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     654       42511 :                 lock_table(tr->store, c->t->base.id);
     655       42511 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     656       42509 :                 unlock_table(tr->store, c->t->base.id);
     657       42509 :                 return cnt;
     658             :         }
     659    18188081 :         return segs_end(d->segs, tr, c->t);
     660             : }
     661             : 
     662             : static size_t
     663       23336 : count_idx(sql_trans *tr, sql_idx *i, int access)
     664             : {
     665       23336 :         storage *d;
     666       23336 :         sql_delta *ds;
     667             : 
     668       23336 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     669        6719 :                 return 0;
     670       16616 :         d = tab_timestamp_storage(tr, i->t);
     671       16628 :         ds = idx_timestamp_delta(tr, i);
     672       16668 :         if (!d || !ds)
     673             :                 return 0;
     674       16668 :         if (access == 2)
     675        2871 :                 return ds?ds->cs.ucnt:0;
     676       13797 :         if (access == 1)
     677           0 :                 return count_inserts(d->segs->h, tr);
     678       13797 :         if (access == QUICK)
     679        3529 :                 return d->segs->t?d->segs->t->end:0;
     680       10268 :         return segs_end(d->segs, tr, i->t);
     681             : }
     682             : 
     683             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     684             : static BAT *
     685    15736672 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     686             : {
     687    15736672 :         BAT *b;
     688             : 
     689    15736672 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     690             :         /* returns the updates for cs */
     691    15736672 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     692       14636 :                 if (access == RD_UPD_ID) {
     693        8811 :                         if (!(b = temp_descriptor(cs->uibid)))
     694             :                                 return NULL;
     695        8811 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     696        2317 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     697        6494 :                                         oid nil = oid_nil;
     698             :                                         /* less then cnt */
     699        6494 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false, false);
     700        6494 :                                         if (!s) {
     701           0 :                                                 bat_destroy(b);
     702           0 :                                                 return NULL;
     703             :                                         }
     704             : 
     705        6494 :                                         BAT *nb = BATproject(s, b);
     706        6494 :                                         bat_destroy(s);
     707        6494 :                                         bat_destroy(b);
     708        6494 :                                         b = nb;
     709             :                         }
     710             :                 } else {
     711        5825 :                         b = temp_descriptor(cs->uvbid);
     712             :                 }
     713             :         } else {
     714    26204326 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     715             :         }
     716             :         return b;
     717             : }
     718             : 
     719             : static BAT *
     720           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     721             : {
     722           0 :         int err = 0;
     723           0 :         BAT *uv = *UV;
     724           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     725           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     726           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     727             : 
     728           0 :         if (!ni || (uv && !nv)) {
     729           0 :                 bat_destroy(ni);
     730           0 :                 bat_destroy(nv);
     731           0 :                 bat_destroy(ui);
     732           0 :                 bat_destroy(uv);
     733           0 :                 bat_destroy(oi);
     734           0 :                 bat_destroy(ov);
     735           0 :                 return NULL;
     736             :         }
     737           0 :         BATiter uvi;
     738           0 :         BATiter ovi;
     739             : 
     740           0 :         if (uv) {
     741           0 :                 uvi = bat_iterator(uv);
     742           0 :                 ovi = bat_iterator(ov);
     743             :         }
     744             : 
     745             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     746           0 :         BUN uip = 0, uie = BATcount(ui);
     747           0 :         BUN oip = 0, oie = BATcount(oi);
     748             : 
     749           0 :         oid uiseqb = ui->tseqbase;
     750           0 :         oid oiseqb = oi->tseqbase;
     751           0 :         oid *uipt = NULL, *oipt = NULL;
     752           0 :         BATiter uii = bat_iterator(ui);
     753           0 :         BATiter oii = bat_iterator(oi);
     754           0 :         if (!BATtdensebi(&uii))
     755           0 :                 uipt = uii.base;
     756           0 :         if (!BATtdensebi(&oii))
     757           0 :                 oipt = oii.base;
     758           0 :         while (uip < uie && oip < oie && !err) {
     759           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     760           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     761             : 
     762           0 :                 if (uiid <= oiid) {
     763           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     764           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     765             :                                 err = 1;
     766           0 :                         uip++;
     767           0 :                         if (uiid == oiid)
     768           0 :                                 oip++;
     769             :                 } else { /* uiid > oiid */
     770           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     771           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     772             :                                 err = 1;
     773           0 :                         oip++;
     774             :                 }
     775             :         }
     776           0 :         while (uip < uie && !err) {
     777           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     778           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     779           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     780             :                         err = 1;
     781           0 :                 uip++;
     782             :         }
     783           0 :         while (oip < oie && !err) {
     784           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     785           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     786           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     787             :                         err = 1;
     788           0 :                 oip++;
     789             :         }
     790           0 :         if (uv) {
     791           0 :                 bat_iterator_end(&uvi);
     792           0 :                 bat_iterator_end(&ovi);
     793             :         }
     794           0 :         bat_iterator_end(&uii);
     795           0 :         bat_iterator_end(&oii);
     796           0 :         bat_destroy(ui);
     797           0 :         bat_destroy(uv);
     798           0 :         bat_destroy(oi);
     799           0 :         bat_destroy(ov);
     800           0 :         if (!err) {
     801           0 :                 if (nv)
     802           0 :                         *UV = nv;
     803           0 :                 return ni;
     804             :         }
     805           0 :         *UV = NULL;
     806           0 :         bat_destroy(ni);
     807           0 :         bat_destroy(nv);
     808           0 :         return NULL;
     809             : }
     810             : 
     811             : static sql_delta *
     812    10494334 : older_delta( sql_delta *d, sql_trans *tr)
     813             : {
     814    10494334 :         sql_delta *o = d->next;
     815             : 
     816    10503571 :         while (o && !o->cs.merged) {
     817        9210 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     818             :                         break;
     819             :                 else
     820        9237 :                         o = o->next;
     821             :         }
     822    10494361 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     823           0 :                 return o;
     824             :         return NULL;
     825             : }
     826             : 
     827             : static BAT *
     828    10490806 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     829             : {
     830    10490806 :         assert(tr->active);
     831    10490806 :         sql_delta *o = NULL;
     832    10490806 :         BAT *ui = NULL, *uv = NULL;
     833             : 
     834    10490806 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     835             :                 return NULL;
     836    10494088 :         if (access == RD_UPD_VAL) {
     837     5248725 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     838           0 :                         bat_destroy(ui);
     839           0 :                         return NULL;
     840             :                 }
     841             :         }
     842    10494291 :         while ((o = older_delta(d, tr)) != NULL) {
     843           0 :                 BAT *oui = NULL, *ouv = NULL;
     844           0 :                 if (!oui)
     845           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     846           0 :                 if (access == RD_UPD_VAL)
     847           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     848           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     849           0 :                         bat_destroy(ui);
     850           0 :                         bat_destroy(uv);
     851           0 :                         bat_destroy(oui);
     852           0 :                         bat_destroy(ouv);
     853           0 :                         return NULL;
     854             :                 }
     855           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     856             :                         return NULL;
     857             :                 d = o;
     858             :         }
     859    10494377 :         if (uv) {
     860     5248917 :                 bat_destroy(ui);
     861     5248917 :                 return uv;
     862             :         }
     863             :         return ui;
     864             : }
     865             : 
     866             : static BAT *
     867        2842 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     868             : {
     869        2842 :         lock_column(tr->store, c->base.id);
     870        2842 :         sql_delta *d = col_timestamp_delta(tr, c);
     871        2842 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
     872             : 
     873        2842 :         if (!d) {
     874           0 :                 unlock_column(tr->store, c->base.id);
     875           0 :                 return NULL;
     876             :         }
     877        2842 :         if (d->cs.st == ST_DICT) {
     878           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     879             : 
     880           0 :                 type = b->ttype;
     881             :         }
     882        2842 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     883        2842 :         unlock_column(tr->store, c->base.id);
     884        2842 :         return bn;
     885             : }
     886             : 
     887             : static BAT *
     888           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     889             : {
     890           0 :         lock_column(tr->store, i->base.id);
     891           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     892           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     893             : 
     894           0 :         if (!d) {
     895           0 :                 unlock_column(tr->store, i->base.id);
     896           0 :                 return NULL;
     897             :         }
     898           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     899           0 :         unlock_column(tr->store, i->base.id);
     900           0 :         return bn;
     901             : }
     902             : 
     903             : static BAT *
     904    10519833 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     905             : {
     906    10519833 :         BAT *b;
     907             : 
     908    10519833 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     909    10519833 :         assert(cs != NULL);
     910    10519833 :         if (access == QUICK)
     911      176695 :                 return quick_descriptor(cs->bid);
     912    10343138 :         if (access == RD_EXT)
     913         858 :                 return temp_descriptor(cs->ebid);
     914    10342280 :         assert(cs->bid);
     915    10342280 :         b = temp_descriptor(cs->bid);
     916    10343034 :         if (b == NULL)
     917             :                 return NULL;
     918    10343034 :         assert(b->batRestricted == BAT_READ);
     919             :         /* return slice */
     920    10343034 :         BAT *s = BATslice(b, 0, cnt);
     921    10331335 :         bat_destroy(b);
     922    10331335 :         return s;
     923             : }
     924             : 
     925             : static int
     926     5244747 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     927             : {
     928     5244747 :         lock_column(tr->store, c->base.id);
     929     5244672 :         size_t cnt = count_col(tr, c, 0);
     930     5246019 :         sql_delta *d = col_timestamp_delta(tr, c);
     931     5246048 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
     932             : 
     933     5246048 :         if (!d) {
     934           0 :                 unlock_column(tr->store, c->base.id);
     935           0 :                 return LOG_ERR;
     936             :         }
     937     5246048 :         if (d->cs.st == ST_DICT) {
     938           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     939             : 
     940           2 :                 type = b->ttype;
     941             :         }
     942             : 
     943     5246048 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     944     5245995 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     945             : 
     946     5245954 :         unlock_column(tr->store, c->base.id);
     947             : 
     948     5245691 :         if (*ui == NULL || *uv == NULL) {
     949           0 :                 bat_destroy(*ui);
     950           0 :                 bat_destroy(*uv);
     951           0 :                 return LOG_ERR;
     952             :         }
     953             :         return LOG_OK;
     954             : }
     955             : 
     956             : static int
     957          23 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     958             : {
     959          23 :         lock_column(tr->store, i->base.id);
     960          23 :         size_t cnt = count_idx(tr, i, 0);
     961          23 :         sql_delta *d = idx_timestamp_delta(tr, i);
     962          23 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     963             : 
     964          23 :         if (!d) {
     965           0 :                 unlock_column(tr->store, i->base.id);
     966           0 :                 return LOG_ERR;
     967             :         }
     968             : 
     969          23 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     970          23 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     971             : 
     972          23 :         unlock_column(tr->store, i->base.id);
     973             : 
     974          23 :         if (*ui == NULL || *uv == NULL) {
     975           0 :                 bat_destroy(*ui);
     976           0 :                 bat_destroy(*uv);
     977           0 :                 return LOG_ERR;
     978             :         }
     979             :         return LOG_OK;
     980             : }
     981             : 
     982             : static void *                                   /* BAT * */
     983    10507735 : bind_col(sql_trans *tr, sql_column *c, int access)
     984             : {
     985    10507735 :         assert(access == QUICK || tr->active);
     986    10507735 :         if (!isTable(c->t))
     987             :                 return NULL;
     988    10507735 :         sql_delta *d = col_timestamp_delta(tr, c);
     989    10509700 :         if (!d)
     990             :                 return NULL;
     991    10509700 :         size_t cnt = count_col(tr, c, 0);
     992    10513207 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     993        2842 :                 return bind_ucol(tr, c, access, cnt);
     994    10510365 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
     995    10507188 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
     996    10507188 :         (void)type;
     997    10507188 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == type) || (access == QUICK && b->ttype < 0));
     998             :         return b;
     999             : }
    1000             : 
    1001             : static void *                                   /* BAT * */
    1002       10691 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1003             : {
    1004       10691 :         assert(access == QUICK || tr->active);
    1005       10691 :         if (!isTable(i->t))
    1006             :                 return NULL;
    1007       10691 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1008       10701 :         if (!d)
    1009             :                 return NULL;
    1010       10701 :         size_t cnt = count_idx(tr, i, 0);
    1011       10706 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1012           0 :                 return bind_uidx(tr, i, access, cnt);
    1013       10706 :         return cs_bind_bat( &d->cs, access, cnt);
    1014             : }
    1015             : 
    1016             : static int
    1017        4152 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1018             : {
    1019        4152 :         if (!cs->uibid) {
    1020           0 :                 cs->uibid = e_bat(TYPE_oid);
    1021           0 :                 if (cs->uibid == BID_NIL)
    1022             :                         return LOG_ERR;
    1023             :         }
    1024        4152 :         if (!cs->uvbid) {
    1025           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1026           0 :                 if (!cur)
    1027             :                         return LOG_ERR;
    1028           0 :                 int type = cur->ttype;
    1029           0 :                 cs->uvbid = e_bat(type);
    1030           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1031             :                         return LOG_ERR;
    1032             :         }
    1033        4152 :         BAT *ui = temp_descriptor(cs->uibid);
    1034        4152 :         BAT *uv = temp_descriptor(cs->uvbid);
    1035             : 
    1036        4152 :         if (ui == NULL || uv == NULL) {
    1037           0 :                 bat_destroy(ui);
    1038           0 :                 bat_destroy(uv);
    1039           0 :                 return LOG_ERR;
    1040             :         }
    1041        4152 :         assert(ui && uv);
    1042        4152 :         if (isEbat(ui)){
    1043         400 :                 temp_destroy(cs->uibid);
    1044         400 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1045         400 :                 bat_destroy(ui);
    1046         400 :                 if (cs->uibid == BID_NIL ||
    1047         400 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1048           0 :                         bat_destroy(uv);
    1049           0 :                         return LOG_ERR;
    1050             :                 }
    1051             :         }
    1052        4152 :         if (isEbat(uv)){
    1053         400 :                 temp_destroy(cs->uvbid);
    1054         400 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1055         400 :                 bat_destroy(uv);
    1056         400 :                 if (cs->uvbid == BID_NIL ||
    1057         400 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1058           0 :                         bat_destroy(ui);
    1059           0 :                         return LOG_ERR;
    1060             :                 }
    1061             :         }
    1062        4152 :         *Ui = ui;
    1063        4152 :         *Uv = uv;
    1064        4152 :         return LOG_OK;
    1065             : }
    1066             : 
    1067             : static int
    1068        6972 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1069             : {
    1070      102649 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1071      102649 :                 if (s->start <= rid && s->end > rid) {
    1072        6972 :                         if (s->ts == tr->tid && !s->deleted) {
    1073        2922 :                                 return 1;
    1074             :                         }
    1075             :                         break;
    1076             :                 }
    1077             :         }
    1078             :         return 0;
    1079             : }
    1080             : 
    1081             : static int
    1082        4050 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1083             : {
    1084       96455 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1085       96455 :                 if (s->start <= rid && s->end > rid) {
    1086        4050 :                         if (s->ts >= tr->ts && s->deleted) {
    1087           0 :                                 return 1;
    1088             :                         }
    1089             :                         break;
    1090             :                 }
    1091             :         }
    1092             :         return 0;
    1093             : }
    1094             : 
    1095             : static sql_delta *
    1096           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1097             : {
    1098           0 :         sql_delta *n = ZNEW(sql_delta);
    1099           0 :         if (!n)
    1100             :                 return NULL;
    1101           0 :         *n = *bat;
    1102           0 :         n->next = NULL;
    1103           0 :         n->cs.ts = tr->tid;
    1104           0 :         return n;
    1105             : }
    1106             : 
    1107             : static BAT *
    1108          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1109             : {
    1110          17 :         BAT *newoffsets = NULL;
    1111          17 :         sql_delta *bat = *batp;
    1112          17 :         column_storage *cs = &bat->cs;
    1113          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1114             : 
    1115          17 :         if (!u)
    1116             :                 return NULL;
    1117          17 :         BUN max_cnt = (BATcount(u) < 256)?256:(BATcount(u)<65536)?65536:INT_MAX;
    1118          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1119           0 :                 bat_destroy(u);
    1120           0 :                 return NULL;
    1121             :         } else {
    1122          17 :                 int new = 0;
    1123             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1124          17 :                 if (BATcount(u) >= max_cnt) {
    1125           1 :                         if (max_cnt == INT_MAX) { /* decompress */
    1126           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1127           0 :                                         bat_destroy(u);
    1128           0 :                                         return NULL;
    1129             :                                 }
    1130           0 :                                 if (cs->ucnt) {
    1131           0 :                                         BAT *ui = NULL, *uv = NULL;
    1132           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1133           0 :                                         bat_destroy(b);
    1134           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1135           0 :                                                 bat_destroy(nb);
    1136           0 :                                                 bat_destroy(u);
    1137           0 :                                                 return NULL;
    1138             :                                         }
    1139           0 :                                         b = nb;
    1140           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1141           0 :                                                 bat_destroy(ui);
    1142           0 :                                                 bat_destroy(uv);
    1143           0 :                                                 bat_destroy(b);
    1144           0 :                                                 bat_destroy(u);
    1145             :                                         }
    1146           0 :                                         bat_destroy(ui);
    1147           0 :                                         bat_destroy(uv);
    1148             :                                 }
    1149           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1150           0 :                                 bat_destroy(b);
    1151           0 :                                 assert(newoffsets == NULL);
    1152           0 :                                 if (!n) {
    1153           0 :                                         bat_destroy(u);
    1154           0 :                                         return NULL;
    1155             :                                 }
    1156           0 :                                 if (cs->ts != tr->tid) {
    1157           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1158           0 :                                                 bat_destroy(n);
    1159           0 :                                                 return NULL;
    1160             :                                         }
    1161           0 :                                         cs = &(*batp)->cs;
    1162           0 :                                         new = 1;
    1163             :                                 }
    1164           0 :                                 if (cs->bid && !new)
    1165           0 :                                         temp_destroy(cs->bid);
    1166           0 :                                 n = transfer_to_systrans(n);
    1167           0 :                                 if (n == NULL)
    1168             :                                         return NULL;
    1169           0 :                                 bat_set_access(n, BAT_READ);
    1170           0 :                                 cs->bid = temp_create(n);
    1171           0 :                                 bat_destroy(n);
    1172           0 :                                 if (cs->ebid && !new)
    1173           0 :                                         temp_destroy(cs->ebid);
    1174           0 :                                 cs->ebid = 0;
    1175           0 :                                 cs->ucnt = 0;
    1176           0 :                                 if (cs->uibid && !new)
    1177           0 :                                         temp_destroy(cs->uibid);
    1178           0 :                                 if (cs->uvbid && !new)
    1179           0 :                                         temp_destroy(cs->uvbid);
    1180           0 :                                 cs->uibid = cs->uvbid = 0;
    1181           0 :                                 cs->st = ST_DEFAULT;
    1182           0 :                                 cs->cleared = true;
    1183             :                         } else {
    1184           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1185           0 :                                         bat_destroy(newoffsets);
    1186           0 :                                         bat_destroy(u);
    1187           0 :                                         return NULL;
    1188             :                                 }
    1189           2 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1190           1 :                                 bat_destroy(b);
    1191           1 :                                 if (!n) {
    1192           0 :                                         bat_destroy(newoffsets);
    1193           0 :                                         bat_destroy(u);
    1194           0 :                                         return NULL;
    1195             :                                 }
    1196           1 :                                 if (cs->ts != tr->tid) {
    1197           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1198           0 :                                                 bat_destroy(n);
    1199           0 :                                                 return NULL;
    1200             :                                         }
    1201           0 :                                         cs = &(*batp)->cs;
    1202           0 :                                         new = 1;
    1203           0 :                                         temp_dup(cs->ebid);
    1204           0 :                                         if (cs->uibid) {
    1205           0 :                                                 temp_dup(cs->uibid);
    1206           0 :                                                 temp_dup(cs->uvbid);
    1207             :                                         }
    1208             :                                 }
    1209           1 :                                 if (cs->bid && !new)
    1210           1 :                                         temp_destroy(cs->bid);
    1211           1 :                                 n = transfer_to_systrans(n);
    1212           1 :                                 if (n == NULL)
    1213             :                                         return NULL;
    1214           1 :                                 bat_set_access(n, BAT_READ);
    1215           1 :                                 cs->bid = temp_create(n);
    1216           1 :                                 bat_destroy(n);
    1217           1 :                                 cs->cleared = true;
    1218           1 :                                 i = newoffsets;
    1219             :                         }
    1220             :                 } else { /* append */
    1221          16 :                         i = newoffsets;
    1222             :                 }
    1223             :         }
    1224          17 :         bat_destroy(u);
    1225          17 :         return i;
    1226             : }
    1227             : 
    1228             : static BAT *
    1229           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1230             : {
    1231           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1232           0 :         BAT *newoffsets = NULL;
    1233           0 :         BAT *b = NULL, *n = NULL;
    1234             : 
    1235           0 :         if (!(b = temp_descriptor(cs->bid)))
    1236             :                 return NULL;
    1237             : 
    1238           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1239           0 :                 bat_destroy(b);
    1240           0 :                 return NULL;
    1241             :         } else {
    1242             :                 /* returns new offset bat if values within min/max, else decompress */
    1243           0 :                 if (!newoffsets) { /* decompress */
    1244           0 :                         if (cs->ucnt) {
    1245           0 :                                 BAT *ui = NULL, *uv = NULL;
    1246           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1247           0 :                                 bat_destroy(b);
    1248           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1249           0 :                                         bat_destroy(nb);
    1250           0 :                                         return NULL;
    1251             :                                 }
    1252           0 :                                 b = nb;
    1253           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1254           0 :                                         bat_destroy(ui);
    1255           0 :                                         bat_destroy(uv);
    1256           0 :                                         bat_destroy(b);
    1257             :                                 }
    1258           0 :                                 bat_destroy(ui);
    1259           0 :                                 bat_destroy(uv);
    1260             :                         }
    1261           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1262           0 :                         bat_destroy(b);
    1263           0 :                         if (!n)
    1264             :                                 return NULL;
    1265           0 :                         if (cs->bid)
    1266           0 :                                 temp_destroy(cs->bid);
    1267           0 :                         n = transfer_to_systrans(n);
    1268           0 :                         if (n == NULL)
    1269             :                                 return NULL;
    1270           0 :                         bat_set_access(n, BAT_READ);
    1271           0 :                         cs->bid = temp_create(n);
    1272           0 :                         cs->ucnt = 0;
    1273           0 :                         if (cs->uibid)
    1274           0 :                                 temp_destroy(cs->uibid);
    1275           0 :                         if (cs->uvbid)
    1276           0 :                                 temp_destroy(cs->uvbid);
    1277           0 :                         cs->uibid = cs->uvbid = 0;
    1278           0 :                         cs->st = ST_DEFAULT;
    1279           0 :                         cs->cleared = true;
    1280           0 :                         b = n;
    1281             :                 } else { /* append */
    1282             :                         i = newoffsets;
    1283             :                 }
    1284             :         }
    1285           0 :         bat_destroy(b);
    1286           0 :         return i;
    1287             : }
    1288             : 
    1289             : /*
    1290             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1291             :  */
    1292             : static int
    1293        3158 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1294             : {
    1295        3158 :         int res = LOG_OK;
    1296        3158 :         sql_delta *bat = *batp;
    1297        3158 :         column_storage *cs = &bat->cs;
    1298        3158 :         BAT *otids = tids, *oupdates = updates;
    1299             : 
    1300        3158 :         if (!BATcount(tids))
    1301             :                 return LOG_OK;
    1302             : 
    1303        3158 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1304           6 :                 tids = BATunmask(tids);
    1305           6 :                 if (!tids)
    1306             :                         return LOG_ERR;
    1307             :         }
    1308        3158 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1309           0 :                 updates = BATunmask(updates);
    1310           0 :                 if (!updates) {
    1311           0 :                         if (otids != tids)
    1312           0 :                                 bat_destroy(tids);
    1313           0 :                         return LOG_ERR;
    1314             :                 }
    1315        3158 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1316          42 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1317          42 :                 if (!updates) {
    1318           0 :                         if (otids != tids)
    1319           0 :                                 bat_destroy(tids);
    1320           0 :                         return LOG_ERR;
    1321             :                 }
    1322             :         }
    1323             : 
    1324        3158 :         if (cs->st == ST_DICT) {
    1325             :                 /* possibly a new array is returned */
    1326           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1327           4 :                 bat = *batp;
    1328           4 :                 cs = &bat->cs;
    1329           4 :                 if (oupdates != updates)
    1330           0 :                         bat_destroy(updates);
    1331           4 :                 updates = nupdates;
    1332           4 :                 if (!updates) {
    1333           0 :                         if (otids != tids)
    1334           0 :                                 bat_destroy(tids);
    1335           0 :                         return LOG_ERR;
    1336             :                 }
    1337             :         }
    1338             : 
    1339             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1340             :         /* currently only one update delta is possible */
    1341        3158 :         lock_table(tr->store, t->base.id);
    1342        3158 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1343        3158 :         if (!is_new && !cs->cleared) {
    1344        2835 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1345           6 :                         BAT *sorted, *order;
    1346           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1347           0 :                                 if (otids != tids)
    1348           0 :                                         bat_destroy(tids);
    1349           0 :                                 if (oupdates != updates)
    1350           0 :                                         bat_destroy(updates);
    1351           0 :                                 unlock_table(tr->store, t->base.id);
    1352           0 :                                 return LOG_ERR;
    1353             :                         }
    1354           6 :                         if (otids != tids)
    1355           0 :                                 bat_destroy(tids);
    1356           6 :                         tids = sorted;
    1357           6 :                         BAT *nupdates = BATproject(order, updates);
    1358           6 :                         bat_destroy(order);
    1359           6 :                         if (oupdates != updates)
    1360           0 :                                 bat_destroy(updates);
    1361           6 :                         updates = nupdates;
    1362           6 :                         if (!updates) {
    1363           0 :                                 bat_destroy(tids);
    1364           0 :                                 unlock_table(tr->store, t->base.id);
    1365           0 :                                 return LOG_ERR;
    1366             :                         }
    1367             :                 }
    1368        2835 :                 assert(tids->tsorted);
    1369        2835 :                 BAT *ui = NULL, *uv = NULL;
    1370             : 
    1371             :                 /* handle updates on just inserted bits */
    1372             :                 /* handle updates on updates (within one transaction) */
    1373        2835 :                 BATiter upi = bat_iterator(updates);
    1374        2835 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1375        2835 :                 BAT *b, *ins = NULL;
    1376        2835 :                 int *msk = NULL;
    1377             : 
    1378        2835 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1379             :                         res = LOG_ERR;
    1380             : 
    1381        2835 :                 if (res == LOG_OK && BATtdense(tids)) {
    1382        2632 :                         oid start = tids->tseqbase, offset = start;
    1383        2632 :                         oid end = start + ucnt;
    1384             : 
    1385       13379 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1386       11429 :                                 if (seg->start <= start && seg->end > start) {
    1387             :                                         /* check for delete conflicts */
    1388        2632 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1389           0 :                                                 res = LOG_CONFLICT;
    1390           0 :                                                 continue;
    1391             :                                         }
    1392             : 
    1393             :                                         /* check for inplace updates */
    1394        2632 :                                         BUN lend = end < seg->end?end:seg->end;
    1395        2632 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1396         199 :                                                 if (!ins) {
    1397         199 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1398         199 :                                                         if (!ins)
    1399             :                                                                 res = LOG_ERR;
    1400             :                                                         else {
    1401         199 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1402         199 :                                                                 msk = (int*)Tloc(ins, 0);
    1403         199 :                                                                 BUN end = (ucnt+31)/32;
    1404         199 :                                                                 memset(msk, 0, end * sizeof(int));
    1405             :                                                         }
    1406             :                                                 }
    1407         684 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1408         485 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1409         485 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1410           0 :                                                                 res = LOG_ERR;
    1411             : 
    1412         485 :                                                         oid word = i/32;
    1413         485 :                                                         int pos = i%32;
    1414         485 :                                                         msk[word] |= 1U<<pos;
    1415         485 :                                                         cnt++;
    1416             :                                                 }
    1417             :                                         }
    1418             :                                 }
    1419       11429 :                                 if (end < seg->end)
    1420             :                                         break;
    1421             :                         }
    1422         206 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1423           3 :                         struct canditer ci;
    1424           3 :                         segment *seg = s->segs->h;
    1425           3 :                         canditer_init(&ci, NULL, tids);
    1426           3 :                         BUN i = 0;
    1427        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1428        1033 :                                 oid rid = canditer_next(&ci);
    1429        1033 :                                 if (seg->end <= rid)
    1430          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1431        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1432             :                                         /* check for delete conflicts */
    1433        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1434           0 :                                                 res = LOG_CONFLICT;
    1435           0 :                                                 continue;
    1436             :                                         }
    1437             : 
    1438             :                                         /* check for inplace updates */
    1439        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1440           0 :                                                 if (!ins) {
    1441           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1442           0 :                                                         if (!ins) {
    1443             :                                                                 res = LOG_ERR;
    1444             :                                                                 break;
    1445             :                                                         } else {
    1446           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1447           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1448           0 :                                                                 BUN end = (ucnt+31)/32;
    1449           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1450             :                                                         }
    1451             :                                                 }
    1452           0 :                                                 ptr upd = BUNtail(upi, i);
    1453           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1454           0 :                                                         res = LOG_ERR;
    1455             : 
    1456           0 :                                                 oid word = i/32;
    1457           0 :                                                 int pos = i%32;
    1458           0 :                                                 msk[word] |= 1U<<pos;
    1459           0 :                                                 cnt++;
    1460             :                                         }
    1461        1020 :                                         i++;
    1462             :                                 }
    1463             :                         }
    1464         200 :                 } else if (res == LOG_OK) {
    1465         200 :                         BUN i = 0;
    1466         200 :                         oid *rid = Tloc(tids,0);
    1467         200 :                         segment *seg = s->segs->h;
    1468       21863 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1469       21663 :                                 if (seg->end <= rid[i])
    1470        8233 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1471       13430 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1472             :                                         /* check for delete conflicts */
    1473       13430 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1474           0 :                                                 res = LOG_CONFLICT;
    1475           0 :                                                 continue;
    1476             :                                         }
    1477             : 
    1478             :                                         /* check for inplace updates */
    1479       13430 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1480         993 :                                                 if (!ins) {
    1481          79 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1482          79 :                                                         if (!ins) {
    1483             :                                                                 res = LOG_ERR;
    1484             :                                                                 break;
    1485             :                                                         } else {
    1486          79 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1487          79 :                                                                 msk = (int*)Tloc(ins, 0);
    1488          79 :                                                                 BUN end = (ucnt+31)/32;
    1489          79 :                                                                 memset(msk, 0, end * sizeof(int));
    1490             :                                                         }
    1491             :                                                 }
    1492         993 :                                                 const void *upd = BUNtail(upi, i);
    1493         993 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1494           0 :                                                         res = LOG_ERR;
    1495             : 
    1496         993 :                                                 oid word = i/32;
    1497         993 :                                                 int pos = i%32;
    1498         993 :                                                 msk[word] |= 1U<<pos;
    1499         993 :                                                 cnt++;
    1500             :                                         }
    1501       13430 :                                         i++;
    1502             :                                 }
    1503             :                         }
    1504             :                 }
    1505             : 
    1506        2835 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1507        2593 :                         if (cs->ucnt == 0) {
    1508        2491 :                                 if (cnt) {
    1509          12 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1510          12 :                                         if (nins) {
    1511          12 :                                                 ui = BATproject(nins, tids);
    1512          12 :                                                 uv = BATproject(nins, updates);
    1513          12 :                                                 bat_destroy(nins);
    1514             :                                         }
    1515             :                                 } else {
    1516        2479 :                                         ui = temp_descriptor(tids->batCacheid);
    1517        2479 :                                         uv = temp_descriptor(updates->batCacheid);
    1518             :                                 }
    1519        2491 :                                 if (!ui || !uv) {
    1520             :                                         res = LOG_ERR;
    1521             :                                 } else {
    1522        2491 :                                         temp_destroy(cs->uibid);
    1523        2491 :                                         temp_destroy(cs->uvbid);
    1524        2491 :                                         ui = transfer_to_systrans(ui);
    1525        2491 :                                         uv = transfer_to_systrans(uv);
    1526        2491 :                                         if (ui == NULL || uv == NULL) {
    1527           0 :                                                 BBPreclaim(ui);
    1528           0 :                                                 BBPreclaim(uv);
    1529             :                                                 res = LOG_ERR;
    1530             :                                         } else {
    1531        2491 :                                                 cs->uibid = temp_create(ui);
    1532        2491 :                                                 cs->uvbid = temp_create(uv);
    1533        2491 :                                                 cs->ucnt = BATcount(ui);
    1534             :                                         }
    1535             :                                 }
    1536             :                         } else {
    1537         102 :                                 BAT *nui = NULL, *nuv = NULL;
    1538             : 
    1539             :                                 /* merge taking msk of inserted into account */
    1540         102 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1541             :                                         res = LOG_ERR;
    1542             : 
    1543         102 :                                 if (res == LOG_OK) {
    1544         102 :                                         const void *upd = NULL;
    1545         102 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1546         102 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1547             : 
    1548         102 :                                         if (!nui || !nuv) {
    1549             :                                                 res = LOG_ERR;
    1550             :                                         } else {
    1551         102 :                                                 BATiter ovi = bat_iterator(uv);
    1552             : 
    1553             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1554         102 :                                                 BUN uip = 0, uie = BATcount(ui);
    1555         102 :                                                 BUN nip = 0, nie = BATcount(tids);
    1556         102 :                                                 oid uiseqb = ui->tseqbase;
    1557         102 :                                                 oid niseqb = tids->tseqbase;
    1558         102 :                                                 oid *uipt = NULL, *nipt = NULL;
    1559         102 :                                                 BATiter uii = bat_iterator(ui);
    1560         102 :                                                 BATiter tidsi = bat_iterator(tids);
    1561         102 :                                                 if (!BATtdensebi(&uii))
    1562          97 :                                                         uipt = uii.base;
    1563         102 :                                                 if (!BATtdensebi(&tidsi))
    1564          93 :                                                         nipt = tidsi.base;
    1565       16091 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1566       15989 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1567       15989 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1568             : 
    1569       15989 :                                                         if (uiv < niv) {
    1570        8068 :                                                                 upd = BUNtail(ovi, uip);
    1571       16136 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1572        8068 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1573             :                                                                         res = LOG_ERR;
    1574        8068 :                                                                 uip++;
    1575        7921 :                                                         } else if (uiv == niv) {
    1576             :                                                                 /* handle == */
    1577        1522 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1578        1522 :                                                                         upd = BUNtail(upi, nip);
    1579        3044 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1580        1522 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1581             :                                                                                 res = LOG_ERR;
    1582             :                                                                 } else {
    1583           0 :                                                                         upd = BUNtail(ovi, uip);
    1584           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1585           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1586             :                                                                                 res = LOG_ERR;
    1587             :                                                                 }
    1588        1522 :                                                                 uip++;
    1589        1522 :                                                                 nip++;
    1590             :                                                         } else { /* uiv > niv */
    1591        6399 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1592        6255 :                                                                         upd = BUNtail(upi, nip);
    1593       12510 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1594        6255 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1595             :                                                                                 res = LOG_ERR;
    1596             :                                                                 }
    1597        6399 :                                                                 nip++;
    1598             :                                                         }
    1599             :                                                 }
    1600         923 :                                                 while (uip < uie && res == LOG_OK) {
    1601         821 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1602         821 :                                                         upd = BUNtail(ovi, uip);
    1603        1642 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1604         821 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1605             :                                                                 res = LOG_ERR;
    1606         821 :                                                         uip++;
    1607             :                                                 }
    1608         703 :                                                 while (nip < nie && res == LOG_OK) {
    1609         601 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1610         601 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1611         329 :                                                                 upd = BUNtail(upi, nip);
    1612         658 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1613         329 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1614             :                                                                         res = LOG_ERR;
    1615             :                                                         }
    1616         601 :                                                         nip++;
    1617             :                                                 }
    1618         102 :                                                 bat_iterator_end(&uii);
    1619         102 :                                                 bat_iterator_end(&tidsi);
    1620         102 :                                                 bat_iterator_end(&ovi);
    1621         102 :                                                 if (res == LOG_OK) {
    1622         102 :                                                         temp_destroy(cs->uibid);
    1623         102 :                                                         temp_destroy(cs->uvbid);
    1624         102 :                                                         nui = transfer_to_systrans(nui);
    1625         102 :                                                         nuv = transfer_to_systrans(nuv);
    1626         102 :                                                         if (nui == NULL || nuv == NULL) {
    1627             :                                                                 res = LOG_ERR;
    1628             :                                                         } else {
    1629         102 :                                                                 cs->uibid = temp_create(nui);
    1630         102 :                                                                 cs->uvbid = temp_create(nuv);
    1631         102 :                                                                 cs->ucnt = BATcount(nui);
    1632             :                                                         }
    1633             :                                                 }
    1634             :                                         }
    1635         102 :                                         bat_destroy(nui);
    1636         102 :                                         bat_destroy(nuv);
    1637             :                                 }
    1638             :                         }
    1639             :                 }
    1640        2835 :                 bat_iterator_end(&upi);
    1641        2835 :                 bat_destroy(b);
    1642        2835 :                 unlock_table(tr->store, t->base.id);
    1643        2835 :                 bat_destroy(ins);
    1644        2835 :                 bat_destroy(ui);
    1645        2835 :                 bat_destroy(uv);
    1646        2835 :                 if (otids != tids)
    1647          10 :                         bat_destroy(tids);
    1648        2835 :                 if (oupdates != updates)
    1649          11 :                         bat_destroy(updates);
    1650        2835 :                 return res;
    1651             :         } else if (is_new || cs->cleared) {
    1652         323 :                 BAT *b = temp_descriptor(cs->bid);
    1653             : 
    1654         323 :                 if (b == NULL) {
    1655             :                         res = LOG_ERR;
    1656             :                 } else {
    1657         323 :                         if (BATcount(b)==0) {
    1658           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1659           0 :                                         res = LOG_ERR;
    1660         322 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1661           0 :                                 res = LOG_ERR;
    1662         323 :                         BBPcold(b->batCacheid);
    1663         323 :                         bat_destroy(b);
    1664             :                 }
    1665             :         }
    1666         323 :         unlock_table(tr->store, t->base.id);
    1667         323 :         if (otids != tids)
    1668           2 :                 bat_destroy(tids);
    1669         323 :         if (oupdates != updates)
    1670          41 :                 bat_destroy(updates);
    1671             :         return res;
    1672             : }
    1673             : 
    1674             : static int
    1675        3158 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1676             : {
    1677        3158 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1678             : }
    1679             : 
    1680             : static void *
    1681           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1682             : {
    1683           4 :         void *newoffsets = NULL;
    1684           4 :         sql_delta *bat = *batp;
    1685           4 :         column_storage *cs = &bat->cs;
    1686           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1687             : 
    1688           4 :         if (!u)
    1689             :                 return NULL;
    1690           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1691           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1692           0 :                 bat_destroy(u);
    1693           0 :                 return NULL;
    1694             :         } else {
    1695           4 :                 int new = 0;
    1696             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1697           4 :                 if (BATcount(u) >= max_cnt) {
    1698           0 :                         if (max_cnt == INT_MAX) { /* decompress */
    1699             :                                 if (!(b = temp_descriptor(cs->bid))) {
    1700             :                                         bat_destroy(u);
    1701             :                                         return NULL;
    1702             :                                 }
    1703             :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1704             :                                 /* TODO decompress updates if any */
    1705             :                                 bat_destroy(b);
    1706             :                                 assert(newoffsets == NULL);
    1707             :                                 if (!n) {
    1708             :                                         bat_destroy(u);
    1709             :                                         return NULL;
    1710             :                                 }
    1711             :                                 if (cs->ts != tr->tid) {
    1712             :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1713             :                                                 bat_destroy(n);
    1714             :                                                 bat_destroy(u);
    1715             :                                                 return NULL;
    1716             :                                         }
    1717             :                                         cs = &(*batp)->cs;
    1718             :                                         new = 1;
    1719             :                                         cs->uibid = cs->uvbid = 0;
    1720             :                                 }
    1721             :                                 if (cs->bid && !new)
    1722             :                                         temp_destroy(cs->bid);
    1723             :                                 n = transfer_to_systrans(n);
    1724             :                                 if (n == NULL) {
    1725             :                                         bat_destroy(u);
    1726             :                                         return NULL;
    1727             :                                 }
    1728             :                                 bat_set_access(n, BAT_READ);
    1729             :                                 cs->bid = temp_create(n);
    1730             :                                 bat_destroy(n);
    1731             :                                 if (cs->ebid && !new)
    1732             :                                         temp_destroy(cs->ebid);
    1733             :                                 cs->ebid = 0;
    1734             :                                 cs->st = ST_DEFAULT;
    1735             :                                 /* at append_col the column's storage type is cleared */
    1736             :                                 cs->cleared = true;
    1737             :                         } else {
    1738           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1739           0 :                                         GDKfree(newoffsets);
    1740           0 :                                         bat_destroy(u);
    1741           0 :                                         return NULL;
    1742             :                                 }
    1743           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, (BATcount(u)>65536)?TYPE_int:TYPE_sht, PERSISTENT);
    1744           0 :                                 bat_destroy(b);
    1745           0 :                                 if (!n) {
    1746           0 :                                         GDKfree(newoffsets);
    1747           0 :                                         bat_destroy(u);
    1748           0 :                                         return NULL;
    1749             :                                 }
    1750           0 :                                 if (cs->ts != tr->tid) {
    1751           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1752           0 :                                                 bat_destroy(u);
    1753           0 :                                                 bat_destroy(n);
    1754           0 :                                                 return NULL;
    1755             :                                         }
    1756           0 :                                         cs = &(*batp)->cs;
    1757           0 :                                         new = 1;
    1758           0 :                                         temp_dup(cs->ebid);
    1759           0 :                                         if (cs->uibid) {
    1760           0 :                                                 temp_dup(cs->uibid);
    1761           0 :                                                 temp_dup(cs->uvbid);
    1762             :                                         }
    1763             :                                 }
    1764           0 :                                 if (cs->bid)
    1765           0 :                                         temp_destroy(cs->bid);
    1766           0 :                                 n = transfer_to_systrans(n);
    1767           0 :                                 if (n == NULL) {
    1768           0 :                                         bat_destroy(u);
    1769           0 :                                         return NULL;
    1770             :                                 }
    1771           0 :                                 bat_set_access(n, BAT_READ);
    1772           0 :                                 cs->bid = temp_create(n);
    1773           0 :                                 bat_destroy(n);
    1774           0 :                                 cs->cleared = true;
    1775           0 :                                 i = newoffsets;
    1776             :                         }
    1777             :                 } else { /* append */
    1778           4 :                         i = newoffsets;
    1779             :                 }
    1780             :         }
    1781           4 :         bat_destroy(u);
    1782           4 :         return i;
    1783             : }
    1784             : 
    1785             : static void *
    1786           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1787             : {
    1788           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1789           1 :         void *newoffsets = NULL;
    1790           1 :         BAT *b = NULL, *n = NULL;
    1791             : 
    1792           1 :         if (!(b = temp_descriptor(cs->bid)))
    1793             :                 return NULL;
    1794             : 
    1795           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1796           0 :                 bat_destroy(b);
    1797           0 :                 return NULL;
    1798             :         } else {
    1799             :                 /* returns new offset bat if values within min/max, else decompress */
    1800           1 :                 if (!newoffsets) {
    1801           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1802           1 :                         bat_destroy(b);
    1803           1 :                         if (!n)
    1804             :                                 return NULL;
    1805             :                         /* TODO decompress updates if any */
    1806           1 :                         if (cs->bid)
    1807           1 :                                 temp_destroy(cs->bid);
    1808           1 :                         n = transfer_to_systrans(n);
    1809           1 :                         if (n == NULL)
    1810             :                                 return NULL;
    1811           1 :                         bat_set_access(n, BAT_READ);
    1812           1 :                         cs->bid = temp_create(n);
    1813           1 :                         cs->st = ST_DEFAULT;
    1814             :                         /* at append_col the column's storage type is cleared */
    1815           1 :                         cs->cleared = true;
    1816           1 :                         b = n;
    1817             :                 } else { /* append */
    1818             :                         i = newoffsets;
    1819             :                 }
    1820             :         }
    1821           1 :         bat_destroy(b);
    1822           1 :         return i;
    1823             : }
    1824             : 
    1825             : static int
    1826        6972 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1827             : {
    1828        6972 :         void *oupd = upd;
    1829        6972 :         sql_delta *bat = *batp;
    1830        6972 :         column_storage *cs = &bat->cs;
    1831        6972 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1832        6972 :         assert(!is_oid_nil(rid));
    1833        6972 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1834             : 
    1835        6972 :         if (cs->st == ST_DICT) {
    1836             :                 /* possibly a new array is returned */
    1837           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1838           0 :                 bat = *batp;
    1839           0 :                 cs = &bat->cs;
    1840           0 :                 if (!upd)
    1841             :                         return LOG_ERR;
    1842             :         }
    1843             : 
    1844             :         /* check if rid is insert ? */
    1845        6972 :         if (!inplace) {
    1846             :                 /* check conflict */
    1847        4050 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1848           0 :                         if (oupd != upd)
    1849           0 :                                 GDKfree(upd);
    1850           0 :                         return LOG_CONFLICT;
    1851             :                 }
    1852        4050 :                 BAT *ui, *uv;
    1853             : 
    1854             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1855             :                 /* currently only one update delta is possible */
    1856        4050 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1857           0 :                         if (oupd != upd)
    1858           0 :                                 GDKfree(upd);
    1859           0 :                         return LOG_ERR;
    1860             :                 }
    1861             : 
    1862        4050 :                 assert(uv->ttype);
    1863        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1864        8100 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1865        4050 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1866           0 :                         if (oupd != upd)
    1867           0 :                                 GDKfree(upd);
    1868           0 :                         bat_destroy(ui);
    1869           0 :                         bat_destroy(uv);
    1870           0 :                         return LOG_ERR;
    1871             :                 }
    1872        4050 :                 assert(BATcount(ui) == BATcount(uv));
    1873        4050 :                 bat_destroy(ui);
    1874        4050 :                 bat_destroy(uv);
    1875        4050 :                 cs->ucnt++;
    1876             :         } else {
    1877        2922 :                 BAT *b = NULL;
    1878             : 
    1879        2922 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1880           0 :                         if (oupd != upd)
    1881           0 :                                 GDKfree(upd);
    1882           0 :                         return LOG_ERR;
    1883             :                 }
    1884        2922 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1885           0 :                         if (oupd != upd)
    1886           0 :                                 GDKfree(upd);
    1887           0 :                         bat_destroy(b);
    1888           0 :                         return LOG_ERR;
    1889             :                 }
    1890        2922 :                 bat_destroy(b);
    1891             :         }
    1892        6972 :         if (oupd != upd)
    1893           0 :                 GDKfree(upd);
    1894             :         return LOG_OK;
    1895             : }
    1896             : 
    1897             : static int
    1898        6972 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1899             : {
    1900        6972 :         int res = LOG_OK;
    1901        6972 :         lock_table(tr->store, t->base.id);
    1902        6972 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1903        6972 :         unlock_table(tr->store, t->base.id);
    1904        6972 :         return res;
    1905             : }
    1906             : 
    1907             : static int
    1908      158875 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1909             : {
    1910      158875 :         (void)tr;
    1911      158875 :         if (!ocs)
    1912             :                 return LOG_OK;
    1913      158875 :         cs->bid = ocs->bid;
    1914      158875 :         cs->ebid = ocs->ebid;
    1915      158875 :         cs->uibid = ocs->uibid;
    1916      158875 :         cs->uvbid = ocs->uvbid;
    1917      158875 :         cs->ucnt = ocs->ucnt;
    1918             : 
    1919      158875 :         if (temp) {
    1920       25960 :                 cs->bid = temp_copy(cs->bid, true, false);
    1921       25947 :                 if (cs->bid == BID_NIL)
    1922             :                         return LOG_ERR;
    1923             :         } else {
    1924      132915 :                 temp_dup(cs->bid);
    1925             :         }
    1926      158869 :         if (cs->ebid)
    1927           6 :                 temp_dup(cs->ebid);
    1928      158869 :         cs->ucnt = 0;
    1929      158869 :         cs->uibid = e_bat(TYPE_oid);
    1930      158906 :         cs->uvbid = e_bat(type);
    1931      158906 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1932             :                 return LOG_ERR;
    1933      158906 :         cs->st = ocs->st;
    1934      158906 :         return LOG_OK;
    1935             : }
    1936             : 
    1937             : static void
    1938      326170 : destroy_delta(sql_delta *b, bool recursive)
    1939             : {
    1940      326170 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1941             :                 return;
    1942      304764 :         if (recursive && b->next)
    1943      127526 :                 destroy_delta(b->next, true);
    1944      304764 :         if (b->cs.uibid)
    1945      101978 :                 temp_destroy(b->cs.uibid);
    1946      304764 :         if (b->cs.uvbid)
    1947      101978 :                 temp_destroy(b->cs.uvbid);
    1948      304764 :         if (b->cs.bid)
    1949      304764 :                 temp_destroy(b->cs.bid);
    1950      304764 :         if (b->cs.ebid)
    1951          61 :                 temp_destroy(b->cs.ebid);
    1952      304764 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1953      304764 :         _DELETE(b);
    1954             : }
    1955             : 
    1956             : static sql_delta *
    1957    17773898 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1958             : {
    1959    17773898 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1960             : 
    1961    17773898 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1962    17641044 :                 return obat;
    1963      132854 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1964             :                 /* abort */
    1965          12 :                 if (update_conflict)
    1966           4 :                         *update_conflict = true;
    1967           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1968           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1969           4 :                 return NULL;
    1970             :         }
    1971      132842 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1972             :                 return NULL;
    1973      132844 :         sql_delta* bat = ZNEW(sql_delta);
    1974      132900 :         if (!bat)
    1975             :                 return NULL;
    1976      132900 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1977      132900 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
    1978      132900 :         if (dup_cs(tr, &obat->cs, &bat->cs, type, 0) != LOG_OK) {
    1979           0 :                 destroy_delta(bat, false);
    1980           0 :                 return NULL;
    1981             :         }
    1982      132909 :         bat->cs.ts = tr->tid;
    1983             :         /* only one writer else abort */
    1984      132909 :         bat->next = obat;
    1985      132909 :         if (obat)
    1986      132909 :                 bat->nr_updates = obat->nr_updates;
    1987      132909 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    1988           0 :                 bat->next = NULL;
    1989           0 :                 destroy_delta(bat, false);
    1990           0 :                 if (update_conflict)
    1991           0 :                         *update_conflict = true;
    1992           0 :                 return NULL;
    1993             :         }
    1994             :         return bat;
    1995             : }
    1996             : 
    1997             : static int
    1998       10130 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    1999             : {
    2000       10130 :         int ok = LOG_OK;
    2001             : 
    2002       10130 :         if (is_bat) {
    2003        3158 :                 BAT *tids = incoming_tids;
    2004        3158 :                 BAT *values = incoming_values;
    2005        3158 :                 if (BATcount(tids) == 0)
    2006             :                         return LOG_OK;
    2007        3158 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2008             :         } else {
    2009        6972 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2010             :         }
    2011             :         return ok;
    2012             : }
    2013             : 
    2014             : static int
    2015       10169 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2016             : {
    2017       10169 :         int res = LOG_OK;
    2018       10169 :         bool update_conflict = false;
    2019       10169 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2020             : 
    2021       10169 :         if (isbat) {
    2022        3194 :                 BAT *t = tids;
    2023        3194 :                 if (!BATcount(t))
    2024             :                         return LOG_OK;
    2025             :         }
    2026             : 
    2027        9855 :         if (c == NULL)
    2028             :                 return LOG_ERR;
    2029             : 
    2030        9855 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2031           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2032             : 
    2033        9851 :         assert(delta && delta->cs.ts == tr->tid);
    2034        9851 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2035        9851 :         if (odelta != delta)
    2036        3482 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2037             : 
    2038        9851 :         odelta = delta;
    2039        9851 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2040             :                 return res;
    2041        9851 :         assert(delta == odelta);
    2042        9851 :         if (delta->cs.st == ST_DEFAULT && !c->type.multiset && c->storage_type)
    2043           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2044             :         return res;
    2045             : }
    2046             : 
    2047             : static sql_delta *
    2048        2462 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2049             : {
    2050        2462 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2051             : 
    2052        2462 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2053        2434 :                 return obat;
    2054          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2055             :                 /* abort */
    2056           0 :                 if (update_conflict)
    2057           0 :                         *update_conflict = true;
    2058           0 :                 return NULL;
    2059             :         }
    2060          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2061             :                 return NULL;
    2062          28 :         sql_delta* bat = ZNEW(sql_delta);
    2063          28 :         if (!bat)
    2064             :                 return NULL;
    2065          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2066          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2067           0 :                 destroy_delta(bat, false);
    2068           0 :                 return NULL;
    2069             :         }
    2070          28 :         bat->cs.ts = tr->tid;
    2071             :         /* only one writer else abort */
    2072          28 :         bat->next = obat;
    2073          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2074           0 :                 bat->next = NULL;
    2075           0 :                 destroy_delta(bat, false);
    2076           0 :                 if (update_conflict)
    2077           0 :                         *update_conflict = true;
    2078           0 :                 return NULL;
    2079             :         }
    2080             :         return bat;
    2081             : }
    2082             : 
    2083             : static int
    2084         782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2085             : {
    2086         782 :         int res = LOG_OK;
    2087         782 :         bool update_conflict = false;
    2088         782 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2089             : 
    2090         782 :         if (isbat) {
    2091         782 :                 BAT *t = tids;
    2092         782 :                 if (!BATcount(t))
    2093             :                         return LOG_OK;
    2094             :         }
    2095             : 
    2096         279 :         if (i == NULL)
    2097             :                 return LOG_ERR;
    2098             : 
    2099         279 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2100           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2101             : 
    2102         279 :         assert(delta && delta->cs.ts == tr->tid);
    2103         279 :         if (odelta != delta)
    2104          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2105             : 
    2106         279 :         odelta = delta;
    2107         279 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2108         279 :         assert(delta == odelta);
    2109             :         return res;
    2110             : }
    2111             : 
    2112             : static int
    2113      149694 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type)
    2114             : {
    2115      149694 :         BAT *b, *oi = i;
    2116      149694 :         int err = 0;
    2117      149694 :         sql_delta *bat = *batp;
    2118             : 
    2119      149694 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2120      149694 :         if (!BATcount(i))
    2121             :                 return LOG_OK;
    2122      149694 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2123             :                 return LOG_ERR;
    2124             : 
    2125      149694 :         lock_column(tr->store, id);
    2126      150360 :         if (bat->cs.st == ST_DICT) {
    2127          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2128          13 :                 bat = *batp;
    2129          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2130           0 :                         bat_destroy(oi);
    2131          13 :                 oi = ni;
    2132          13 :                 if (!oi) {
    2133           0 :                         unlock_column(tr->store, id);
    2134           0 :                         return LOG_ERR;
    2135             :                 }
    2136             :         }
    2137      150360 :         if (bat->cs.st == ST_FOR) {
    2138           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2139           0 :                 bat = *batp;
    2140           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2141           0 :                         bat_destroy(oi);
    2142           0 :                 oi = ni;
    2143           0 :                 if (!oi) {
    2144           0 :                         unlock_column(tr->store, id);
    2145           0 :                         return LOG_ERR;
    2146             :                 }
    2147             :         }
    2148             : 
    2149      150360 :         b = temp_descriptor(bat->cs.bid);
    2150      150181 :         if (b == NULL) {
    2151           0 :                 unlock_column(tr->store, id);
    2152           0 :                 if (oi != i)
    2153           0 :                         bat_destroy(oi);
    2154           0 :                 return LOG_ERR;
    2155             :         }
    2156      150181 :         if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2157      149993 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2158          23 :                         err = 1;
    2159         176 :         } else if (!offsets) {
    2160         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2161          23 :                         err = 1;
    2162          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2163           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2164          23 :                         err = 1;
    2165          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2166          23 :                         err = 1;
    2167             :         }
    2168      149771 :         bat_destroy(b);
    2169      150689 :         unlock_column(tr->store, id);
    2170             : 
    2171      150838 :         if (oi != i)
    2172          13 :                 bat_destroy(oi);
    2173      150838 :         return (err)?LOG_ERR:LOG_OK;
    2174             : }
    2175             : 
    2176             : // Look at the offsets and find where the replacements end and the appends begin.
    2177             : static BUN
    2178           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2179             : {
    2180           0 :         BUN ocnt = BATcount(offsets);
    2181           0 :         if (ocnt == 0)
    2182             :                 return 0;
    2183             : 
    2184           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2185           0 :         if (highest < bcnt)
    2186             :                 // all are replacements
    2187             :                 return ocnt;
    2188             : 
    2189             :         // reason backward to find the first append.
    2190             :         // Suppose offsets has 15 entries, bcnt == 100
    2191             :         // and the highest offset in offsets is 109.
    2192           0 :         BUN new_bcnt = highest + 1; // 110
    2193           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2194           0 :         BUN nreplacements = ocnt - nappends; // 5
    2195             : 
    2196             :         // The first append should be to position bcnt
    2197           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2198             : 
    2199             :         return nreplacements;
    2200             : }
    2201             : 
    2202             : 
    2203             : static int
    2204    17475320 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2205             : {
    2206    17475320 :         void *oi = i;
    2207    17475320 :         BAT *b;
    2208    17475320 :         lock_column(tr->store, id);
    2209    17495108 :         sql_delta *bat = *batp;
    2210             : 
    2211    17495108 :         if (bat->cs.st == ST_DICT) {
    2212             :                 /* possibly a new array is returned */
    2213           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2214           4 :                 bat = *batp;
    2215           4 :                 if (!i) {
    2216           0 :                         unlock_column(tr->store, id);
    2217           0 :                         return LOG_ERR;
    2218             :                 }
    2219             :         }
    2220    17495108 :         if (bat->cs.st == ST_FOR) {
    2221             :                 /* possibly a new array is returned */
    2222           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2223           1 :                 bat = *batp;
    2224           1 :                 if (!i) {
    2225           0 :                         unlock_column(tr->store, id);
    2226           0 :                         return LOG_ERR;
    2227             :                 }
    2228             :         }
    2229             : 
    2230    17495108 :         b = temp_descriptor(bat->cs.bid);
    2231    17495646 :         if (b == NULL) {
    2232           0 :                 if (i != oi)
    2233           0 :                         GDKfree(i);
    2234           0 :                 unlock_column(tr->store, id);
    2235           0 :                 return LOG_ERR;
    2236             :         }
    2237    17495646 :         BUN bcnt = BATcount(b);
    2238             : 
    2239    17495646 :         if (offsets) {
    2240             :                 // The first few might be replacements while later items might be appends.
    2241             :                 // Handle the replacements here while leaving the appends to the code below.
    2242           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2243             : 
    2244           0 :                 oid *start = Tloc(offsets, 0);
    2245           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2246           0 :                         bat_destroy(b);
    2247           0 :                         if (i != oi)
    2248           0 :                                 GDKfree(i);
    2249           0 :                         unlock_column(tr->store, id);
    2250           0 :                         return LOG_ERR;
    2251             :                 }
    2252             : 
    2253             :                 // Replacements have been handled. The rest are appends.
    2254           0 :                 assert(offset == oid_nil);
    2255           0 :                 offset = bcnt;
    2256           0 :                 cnt -= nreplacements;
    2257             :         }
    2258             : 
    2259    17495646 :         if (bcnt > offset){
    2260      612379 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2261      612379 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2262           0 :                         bat_destroy(b);
    2263           0 :                         if (i != oi)
    2264           0 :                                 GDKfree(i);
    2265           0 :                         unlock_column(tr->store, id);
    2266           0 :                         return LOG_ERR;
    2267             :                 }
    2268      613537 :                 cnt -= ccnt;
    2269      613537 :                 offset += ccnt;
    2270             :         }
    2271    17496804 :         if (cnt) {
    2272    16882995 :                 if (BATcount(b) < offset) { /* add space */
    2273        6558 :                         BUN d = offset - BATcount(b);
    2274        6558 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2275           0 :                                 bat_destroy(b);
    2276           0 :                                 if (i != oi)
    2277           0 :                                         GDKfree(i);
    2278           0 :                                 unlock_column(tr->store, id);
    2279           0 :                                 return LOG_ERR;
    2280             :                         }
    2281             :                 }
    2282    16882995 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2283           0 :                         bat_destroy(b);
    2284           0 :                         if (i != oi)
    2285           0 :                                 GDKfree(i);
    2286           0 :                         unlock_column(tr->store, id);
    2287           0 :                         return LOG_ERR;
    2288             :                 }
    2289             :         }
    2290    17497108 :         bat_destroy(b);
    2291    17491009 :         if (i != oi)
    2292           4 :                 GDKfree(i);
    2293    17491009 :         unlock_column(tr->store, id);
    2294    17491009 :         return LOG_OK;
    2295             : }
    2296             : 
    2297             : static int
    2298       25972 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2299             : {
    2300       25972 :         if (!(bat->segs = new_segments(tr, 0)))
    2301             :                 return LOG_ERR;
    2302       25961 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2303             : }
    2304             : 
    2305             : static int
    2306    17624821 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type)
    2307             : {
    2308    17624821 :         int ok = LOG_OK;
    2309             : 
    2310    17624821 :         if ((*delta)->cs.merged)
    2311       37852 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2312    17624821 :         if (isbat) {
    2313      149593 :                 BAT *bat = incoming_data;
    2314             : 
    2315      149593 :                 if (BATcount(bat))
    2316      149771 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type);
    2317             :         } else {
    2318    17475228 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2319             :         }
    2320    17653099 :         return ok;
    2321             : }
    2322             : 
    2323             : static int
    2324    17639968 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2325             : {
    2326    17639968 :         int res = LOG_OK;
    2327    17639968 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2328             : 
    2329    17639968 :         if (isbat) {
    2330      150016 :                 BAT *t = data;
    2331      150016 :                 if (!BATcount(t))
    2332             :                         return LOG_OK;
    2333             :         }
    2334             : 
    2335    17639195 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2336             :                 return LOG_ERR;
    2337             : 
    2338    17633456 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2339             : 
    2340    17633456 :         odelta = delta;
    2341    17633456 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type)) != LOG_OK)
    2342             :                 return res;
    2343    17649211 :         if (odelta != delta) {
    2344           0 :                 delta->next = odelta;
    2345           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2346           0 :                         delta->next = NULL;
    2347           0 :                         destroy_delta(delta, false);
    2348           0 :                         return LOG_CONFLICT;
    2349             :                 }
    2350             :         }
    2351    17649211 :         if (delta->cs.st == ST_DEFAULT && !c->type.multiset && c->storage_type)
    2352           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2353             :         return res;
    2354             : }
    2355             : 
    2356             : static int
    2357        2189 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2358             : {
    2359        2189 :         int res = LOG_OK;
    2360        2189 :         sql_delta *delta;
    2361             : 
    2362        2189 :         if (isbat) {
    2363        1015 :                 BAT *t = data;
    2364        1015 :                 if (!BATcount(t))
    2365             :                         return LOG_OK;
    2366             :         }
    2367             : 
    2368        2177 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2369             :                 return LOG_ERR;
    2370             : 
    2371        2177 :         assert(delta->cs.st == ST_DEFAULT);
    2372             : 
    2373        2177 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL);
    2374        2177 :         return res;
    2375             : }
    2376             : 
    2377             : static int
    2378       79947 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2379             : {
    2380       79947 :         int err = 0;
    2381             : 
    2382             :         /* TODO check for conflicting updates */
    2383       79947 :         (void)rid;
    2384       79947 :         (void)cnt;
    2385      652772 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2386      572825 :                 sql_column *c = n->data;
    2387      572825 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2388             : 
    2389             :                 /* check for active updates */
    2390      572825 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2391             :                         return 1;
    2392             :         }
    2393             :         return 0;
    2394             : }
    2395             : 
    2396             : static int
    2397       76038 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2398             : {
    2399       76038 :         lock_table(tr->store, t->base.id);
    2400             : 
    2401       76038 :         int in_transaction = segments_in_transaction(tr, t);
    2402             : 
    2403             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2404       76038 :         segment *seg = s->segs->h, *p = NULL;
    2405    52573235 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2406    52573235 :                 if (seg->start <= rid && seg->end > rid) {
    2407       76038 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2408           4 :                                 unlock_table(tr->store, t->base.id);
    2409           4 :                                 return LOG_CONFLICT;
    2410             :                         }
    2411       76034 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2412           0 :                                 unlock_table(tr->store, t->base.id);
    2413           0 :                                 return LOG_CONFLICT;
    2414             :                         }
    2415       76034 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2416           0 :                                 unlock_table(tr->store, t->base.id);
    2417           0 :                                 return LOG_ERR;
    2418             :                         }
    2419             :                         break;
    2420             :                 }
    2421             :         }
    2422       76034 :         unlock_table(tr->store, t->base.id);
    2423       76034 :         if (!in_transaction)
    2424       13323 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2425             :         return LOG_OK;
    2426             : }
    2427             : 
    2428             : static int
    2429        3911 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2430             : {
    2431        3911 :         segment *seg = *Seg, *p = NULL;
    2432       13342 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2433       13340 :                 if (seg->start <= start && seg->end > start) {
    2434        3961 :                         size_t lcnt = cnt;
    2435        3961 :                         if (start+lcnt > seg->end)
    2436          54 :                                 lcnt = seg->end-start;
    2437        3961 :                         if (SEG_IS_DELETED(seg, tr)) {
    2438          47 :                                 start += lcnt;
    2439          47 :                                 cnt -= lcnt;
    2440          47 :                                 continue;
    2441        3914 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2442           1 :                                 return LOG_CONFLICT;
    2443        3913 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2444             :                                 return LOG_CONFLICT;
    2445        3913 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2446        3913 :                         if (!seg)
    2447             :                                 return LOG_ERR;
    2448        3913 :                         start += lcnt;
    2449        3913 :                         cnt -= lcnt;
    2450             :                 }
    2451       13292 :                 if (start+cnt <= seg->end)
    2452             :                         break;
    2453             :         }
    2454             :         return LOG_OK;
    2455             : }
    2456             : 
    2457             : static int
    2458         736 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2459             : {
    2460         736 :         segment *seg = s->segs->h;
    2461         736 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2462             : }
    2463             : 
    2464             : static int
    2465         319 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2466             : {
    2467         319 :         int in_transaction = segments_in_transaction(tr, t);
    2468         319 :         BAT *oi = i;    /* update ids */
    2469         319 :         int ok = LOG_OK;
    2470             : 
    2471         319 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2472             :                 return LOG_ERR;
    2473         319 :         if (BATcount(i)) {
    2474         568 :                 if (BATtdense(i)) {
    2475         249 :                         size_t start = i->tseqbase;
    2476         249 :                         size_t cnt = BATcount(i);
    2477             : 
    2478         249 :                         lock_table(tr->store, t->base.id);
    2479         249 :                         ok = delete_range(tr, t, s, start, cnt);
    2480         249 :                         unlock_table(tr->store, t->base.id);
    2481          70 :                 } else if (complex_cand(i)) {
    2482           0 :                         struct canditer ci;
    2483           0 :                         oid f = 0, l = 0, cur = 0;
    2484             : 
    2485           0 :                         canditer_init(&ci, NULL, i);
    2486           0 :                         cur = f = canditer_next(&ci);
    2487             : 
    2488           0 :                         lock_table(tr->store, t->base.id);
    2489           0 :                         if (!is_oid_nil(f)) {
    2490           0 :                                 segment *seg = s->segs->h;
    2491           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2492           0 :                                         if (cur+1 == l) {
    2493           0 :                                                 cur++;
    2494           0 :                                                 continue;
    2495             :                                         }
    2496           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2497           0 :                                         f = cur = l;
    2498             :                                 }
    2499           0 :                                 if (ok == LOG_OK)
    2500           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2501             :                         }
    2502           0 :                         unlock_table(tr->store, t->base.id);
    2503             :                 } else {
    2504          70 :                         if (!i->tsorted) {
    2505           0 :                                 assert(oi == i);
    2506           0 :                                 BAT *ni = NULL;
    2507           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2508           0 :                                         ok = LOG_ERR;
    2509           0 :                                 if (ni)
    2510           0 :                                         i = ni;
    2511             :                         }
    2512          70 :                         assert(i->tsorted);
    2513          70 :                         BUN icnt = BATcount(i);
    2514          70 :                         BATiter ii = bat_iterator(i);
    2515          70 :                         oid *o = ii.base, n = o[0]+1;
    2516          70 :                         size_t lcnt = 1;
    2517             : 
    2518          70 :                         lock_table(tr->store, t->base.id);
    2519          70 :                         segment *seg = s->segs->h;
    2520       23233 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2521       23163 :                                 if (o[i] == n) {
    2522       22803 :                                         lcnt++;
    2523       22803 :                                         n++;
    2524             :                                 } else {
    2525         360 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2526         360 :                                         lcnt = 0;
    2527             :                                 }
    2528       23163 :                                 if (!lcnt) {
    2529         360 :                                         n = o[i]+1;
    2530         360 :                                         lcnt = 1;
    2531             :                                 }
    2532             :                         }
    2533          70 :                         bat_iterator_end(&ii);
    2534          70 :                         if (lcnt && ok == LOG_OK)
    2535          70 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2536          70 :                         unlock_table(tr->store, t->base.id);
    2537             :                 }
    2538             :         }
    2539         319 :         if (i != oi)
    2540          25 :                 bat_destroy(i);
    2541             :         // assert
    2542         319 :         if (!in_transaction)
    2543         271 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2544             :         return ok;
    2545             : }
    2546             : 
    2547             : static void
    2548       52850 : destroy_segments(segments *s)
    2549             : {
    2550       52850 :         if (!s || sql_ref_dec(&s->r) > 0)
    2551           0 :                 return;
    2552       52850 :         segment *seg = s->h;
    2553      116896 :         while(seg) {
    2554       64046 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2555       64046 :                 _DELETE(seg);
    2556       64046 :                 seg = n;
    2557             :         }
    2558       52850 :         _DELETE(s);
    2559             : }
    2560             : 
    2561             : static void
    2562       54310 : destroy_storage(storage *bat)
    2563             : {
    2564       54310 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2565             :                 return;
    2566       52703 :         if (bat->next)
    2567        6239 :                 destroy_storage(bat->next);
    2568       52703 :         destroy_segments(bat->segs);
    2569       52703 :         if (bat->cs.uibid)
    2570       31273 :                 temp_destroy(bat->cs.uibid);
    2571       52703 :         if (bat->cs.uvbid)
    2572       31273 :                 temp_destroy(bat->cs.uvbid);
    2573       52703 :         if (bat->cs.bid)
    2574       52703 :                 temp_destroy(bat->cs.bid);
    2575       52703 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2576       52703 :         _DELETE(bat);
    2577             : }
    2578             : 
    2579             : static int
    2580      174614 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2581             : {
    2582      174614 :         if (uncommitted) {
    2583      455153 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2584      301072 :                         if (!VALID_4_READ(s->ts,tr))
    2585             :                                 return 1;
    2586             :         } else {
    2587      300524 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2588      281056 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2589             :                                 return 1;
    2590             :         }
    2591             : 
    2592             :         return 0;
    2593             : }
    2594             : 
    2595             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2596             : 
    2597             : storage *
    2598     2239212 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2599             : {
    2600     2239212 :         storage *obat;
    2601             : 
    2602     2239212 :         obat = ATOMIC_PTR_GET(&t->data);
    2603             : 
    2604     2239212 :         if (obat->cs.ts != tr->tid)
    2605     1548815 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2606     1548760 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2607             :                                 /* abort */
    2608       15423 :                                 if (clear)
    2609       15423 :                                         *clear = true;
    2610       15423 :                                 return NULL;
    2611             :                         }
    2612             : 
    2613     2223789 :         if (!clear)
    2614             :                 return obat;
    2615             : 
    2616             :         /* remainder is only to handle clear */
    2617       26347 :         if (segments_conflict(tr, obat->segs, 1)) {
    2618         385 :                 *clear = true;
    2619         385 :                 return NULL;
    2620             :         }
    2621       25965 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2622             :                 return NULL;
    2623       25966 :         storage *bat = ZNEW(storage);
    2624       25973 :         if (!bat)
    2625             :                 return NULL;
    2626       25973 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2627       25973 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2628           0 :                 destroy_storage(bat);
    2629           0 :                 return NULL;
    2630             :         }
    2631       25972 :         bat->cs.cleared = true;
    2632       25972 :         bat->cs.ts = tr->tid;
    2633             :         /* only one writer else abort */
    2634       25972 :         bat->next = obat;
    2635       25972 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2636          12 :                 bat->next = NULL;
    2637          12 :                 destroy_storage(bat);
    2638          12 :                 if (clear)
    2639          12 :                         *clear = true;
    2640          12 :                 return NULL;
    2641             :         }
    2642             :         return bat;
    2643             : }
    2644             : 
    2645             : static int
    2646       76405 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
    2647             : {
    2648       76405 :         int ok = LOG_OK;
    2649       76405 :         BAT *b = ib;
    2650       76405 :         storage *bat;
    2651             : 
    2652       76405 :         if (isbat && !BATcount(b))
    2653             :                 return ok;
    2654             : 
    2655       76357 :         if (t == NULL)
    2656             :                 return LOG_ERR;
    2657             : 
    2658       76357 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2659             :                 return LOG_ERR;
    2660             : 
    2661       76357 :         if (isbat)
    2662         319 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2663             :         else
    2664       76038 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2665             :         return ok;
    2666             : }
    2667             : 
    2668             : static size_t
    2669           0 : dcount_col(sql_trans *tr, sql_column *c)
    2670             : {
    2671           0 :         sql_delta *b;
    2672             : 
    2673           0 :         if (!isTable(c->t))
    2674             :                 return 0;
    2675           0 :         b = col_timestamp_delta(tr, c);
    2676           0 :         if (!b)
    2677             :                 return 1;
    2678             : 
    2679           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2680           0 :         if (!s || !s->segs->t)
    2681             :                 return 1;
    2682           0 :         size_t cnt = s->segs->t->end;
    2683           0 :         if (cnt) {
    2684           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2685           0 :                 size_t dcnt = 0;
    2686             : 
    2687           0 :                 if (v)
    2688           0 :                         dcnt = BATguess_uniques(v, NULL);
    2689           0 :                 return dcnt;
    2690             :         }
    2691             :         return cnt;
    2692             : }
    2693             : 
    2694             : static BAT *
    2695     4028016 : bind_no_view(BAT *b, bool quick)
    2696             : {
    2697     4028016 :         if (VIEWtparent(b)) { /* If it is a view get the parent BAT */
    2698     4023890 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2699     4023890 :                 bat_destroy(b);
    2700     4024005 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2701             :                         return NULL;
    2702             :         }
    2703             :         return b;
    2704             : }
    2705             : 
    2706             : static int
    2707           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2708             : {
    2709           0 :         int ok = 0;
    2710           0 :         assert(tr->active);
    2711           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2712           0 :                 return 0;
    2713           0 :         lock_column(tr->store, c->base.id);
    2714           0 :         if (unique_est) {
    2715           0 :                 sql_delta *d;
    2716           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2717           0 :                         BAT *b;
    2718           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2719           0 :                                 MT_lock_set(&b->theaplock);
    2720           0 :                                 b->tunique_est = *unique_est;
    2721           0 :                                 MT_lock_unset(&b->theaplock);
    2722           0 :                                 bat_destroy(b);
    2723             :                         }
    2724             :                 }
    2725             :         }
    2726           0 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
    2727           0 :         if (min) {
    2728           0 :                 _DELETE(c->min);
    2729           0 :                 size_t minlen = ATOMlen(type, min);
    2730           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2731           0 :                         memcpy(c->min, min, minlen);
    2732           0 :                         ok = 1;
    2733             :                 }
    2734             :         }
    2735           0 :         if (max) {
    2736           0 :                 _DELETE(c->max);
    2737           0 :                 size_t maxlen = ATOMlen(type, max);
    2738           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2739           0 :                         memcpy(c->max, max, maxlen);
    2740           0 :                         ok = 1;
    2741             :                 }
    2742             :         }
    2743           0 :         unlock_column(tr->store, c->base.id);
    2744           0 :         return ok;
    2745             : }
    2746             : 
    2747             : static int
    2748          19 : min_max_col(sql_trans *tr, sql_column *c)
    2749             : {
    2750          19 :         int ok = 0;
    2751          19 :         BAT *b = NULL;
    2752          19 :         sql_delta *d = NULL;
    2753             : 
    2754          19 :         assert(tr->active);
    2755          19 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2756           0 :                 return 0;
    2757          19 :         if (c->min && c->max)
    2758             :                 return 1;
    2759          19 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2760          19 :                 if (d->cs.st == ST_FOR)
    2761             :                         return 0;
    2762          19 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2763          19 :                 lock_column(tr->store, c->base.id);
    2764          19 :                 if (c->min && c->max) {
    2765           0 :                         unlock_column(tr->store, c->base.id);
    2766           0 :                         return 1;
    2767             :                 }
    2768          19 :                 _DELETE(c->min);
    2769          19 :                 _DELETE(c->max);
    2770          19 :                 if ((b = bind_col(tr, c, access))) {
    2771          19 :                         if (!(b = bind_no_view(b, false))) {
    2772           0 :                                 unlock_column(tr->store, c->base.id);
    2773           0 :                                 return 0;
    2774             :                         }
    2775          19 :                         BATiter bi = bat_iterator(b);
    2776          19 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2777          16 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2778          16 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2779             : 
    2780          16 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2781           0 :                                         _DELETE(c->min);
    2782           0 :                                         _DELETE(c->max);
    2783             :                                 } else {
    2784          16 :                                         memcpy(c->min, nmin, minlen);
    2785          16 :                                         memcpy(c->max, nmax, maxlen);
    2786          16 :                                         ok = 1;
    2787             :                                 }
    2788             :                         }
    2789          19 :                         bat_iterator_end(&bi);
    2790          19 :                         bat_destroy(b);
    2791             :                 }
    2792          19 :                 unlock_column(tr->store, c->base.id);
    2793             :         }
    2794             :         return ok;
    2795             : }
    2796             : 
    2797             : static size_t
    2798          17 : count_segs(segment *s)
    2799             : {
    2800          17 :         size_t nr = 0;
    2801             : 
    2802          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2803          55 :                 nr++;
    2804          17 :         return nr;
    2805             : }
    2806             : 
    2807             : static size_t
    2808     1475612 : count_del(sql_trans *tr, sql_table *t, int access)
    2809             : {
    2810     1475612 :         storage *d;
    2811             : 
    2812     1475612 :         if (!isTable(t))
    2813             :                 return 0;
    2814     1475612 :         d = tab_timestamp_storage(tr, t);
    2815     1476141 :         if (!d)
    2816             :                 return 0;
    2817     1476141 :         if (access == 2)
    2818           0 :                 return d->cs.ucnt;
    2819     1476141 :         if (access == 1)
    2820           0 :                 return count_inserts(d->segs->h, tr);
    2821     1476141 :         if (access == CNT_ACTIVE) {
    2822      838136 :                 size_t cnt = segs_end(d->segs, tr, t);
    2823      838524 :                 lock_table(tr->store, t->base.id);
    2824      838429 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
    2825      838356 :                 unlock_table(tr->store, t->base.id);
    2826      838356 :                 return cnt;
    2827             :         }
    2828      638005 :         if (access == CNT_SEGS) /* special case for counting the number of segments */
    2829          17 :                 return count_segs(d->segs->h);
    2830      637988 :         if (CNT_RDONLY)
    2831      637988 :                 return segs_end(d->segs, tr, t);
    2832             :         return count_deletes(d->segs->h, tr);
    2833             : }
    2834             : 
    2835             : static int
    2836       18797 : sorted_col(sql_trans *tr, sql_column *col)
    2837             : {
    2838       18797 :         int sorted = 0;
    2839             : 
    2840       18797 :         assert(tr->active);
    2841       18797 :         if (!isTable(col->t) || !col->t->s)
    2842             :                 return 0;
    2843             : 
    2844       18797 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2845       18779 :                 BAT *b = bind_col(tr, col, QUICK);
    2846             : 
    2847       18779 :                 if (b)
    2848       18779 :                         sorted = b->tsorted || b->trevsorted;
    2849             :         }
    2850             :         return sorted;
    2851             : }
    2852             : 
    2853             : static int
    2854        6810 : unique_col(sql_trans *tr, sql_column *col)
    2855             : {
    2856        6810 :         int distinct = 0;
    2857             : 
    2858        6810 :         assert(tr->active);
    2859        6810 :         if (!isTable(col->t) || !col->t->s)
    2860             :                 return 0;
    2861             : 
    2862        6810 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2863        6810 :                 BAT *b = bind_col(tr, col, QUICK);
    2864             : 
    2865        6810 :                 if (b)
    2866        6810 :                         distinct = b->tkey;
    2867             :         }
    2868             :         return distinct;
    2869             : }
    2870             : 
    2871             : static int
    2872        1868 : double_elim_col(sql_trans *tr, sql_column *col)
    2873             : {
    2874        1868 :         int de = 0;
    2875        1868 :         sql_delta *d;
    2876             : 
    2877        1868 :         assert(tr->active);
    2878        1868 :         if (!isTable(col->t) || !col->t->s)
    2879             :                 return 0;
    2880             : 
    2881        1868 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2882           6 :                 if (d->cs.st == ST_DICT) {
    2883           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2884           6 :                         if (b && b->ttype == TYPE_bte)
    2885             :                                 de = 1;
    2886           0 :                         else if (b && b->ttype == TYPE_sht)
    2887        1868 :                                 de = 2;
    2888             :                 }
    2889        1862 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2890        1862 :                 BAT *b = bind_col(tr, col, QUICK);
    2891             : 
    2892        1862 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2893        1862 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2894        1862 :                         if (de)
    2895        1750 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2896             :                 }
    2897        1750 :                 assert(de >= 0 && de <= 16);
    2898             :         }
    2899             :         return de;
    2900             : }
    2901             : 
    2902             : static int
    2903     4063234 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2904             : {
    2905     4063234 :         int ok = 0;
    2906     4063234 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2907     4063234 :         sql_delta *d = NULL;
    2908             : 
    2909     4063234 :         (void) tr;
    2910     4063234 :         assert(tr->active);
    2911     4063234 :         *nonil = false;
    2912     4063234 :         *unique = false;
    2913     4063234 :         *unique_est = 0.0;
    2914     4063234 :         if (!c || !isTable(c->t) || !c->t->s || c->type.type->composite)
    2915             :                 return ok;
    2916             : 
    2917     4063056 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2918     4024273 :                 if (d->cs.st == ST_FOR) {
    2919          27 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2920          27 :                         return ok;
    2921             :                 }
    2922     4024246 :                 int eclass = c->type.type->eclass;
    2923     4024246 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2924     4024246 :                 if ((b = bind_col(tr, c, access))) {
    2925     4024719 :                         if (!(b = bind_no_view(b, false)))
    2926           0 :                                 return ok;
    2927     4025043 :                         BATiter bi = bat_iterator(b);
    2928     4024666 :                         *nonil = bi.nonil && !bi.nil;
    2929             : 
    2930     4024666 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2931     3714642 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
    2932     2371862 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2933             :                                         ok |= 1;
    2934     2371808 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2935     2361931 :                                         ok |= 1;
    2936     2371790 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2937          54 :                                         ok |= 2;
    2938     2371736 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2939     2355725 :                                         ok |= 2;
    2940             :                         }
    2941     4024575 :                         if (d->cs.ucnt == 0) {
    2942     4020223 :                                 if (d->cs.st == ST_DEFAULT) {
    2943     4019522 :                                         *unique = bi.key;
    2944     4019522 :                                         *unique_est = bi.unique_est;
    2945     4019522 :                                         if (*unique_est == 0)
    2946     1295422 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2947         701 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2948             :                                         /* for dict, check the offsets bat for uniqueness */
    2949         701 :                                         MT_lock_set(&off->theaplock);
    2950         701 :                                         *unique = off->tkey;
    2951         701 :                                         *unique_est = off->tunique_est;
    2952         701 :                                         MT_lock_unset(&off->theaplock);
    2953             :                                 }
    2954             :                         }
    2955     4024913 :                         bat_iterator_end(&bi);
    2956     4024818 :                         bat_destroy(b);
    2957     4024955 :                         if (*nonil && d->cs.ucnt > 0) {
    2958             :                                 /* This could use a quick descriptor */
    2959        2842 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2960           0 :                                         *nonil = false;
    2961             :                                 } else {
    2962        2842 :                                         MT_lock_set(&upv->theaplock);
    2963        2842 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2964        2842 :                                         MT_lock_unset(&upv->theaplock);
    2965        2842 :                                         bat_destroy(upv);
    2966             :                                 }
    2967             :                         }
    2968             :                 }
    2969             :         }
    2970             :         return ok;
    2971             : }
    2972             : 
    2973             : static int
    2974         257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2975             : {
    2976         257 :         assert(tr->active);
    2977         257 :         if (!isTable(col->t) || !col->t->s)
    2978             :                 return LOG_OK;
    2979             : 
    2980         252 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2981         252 :                 BAT *b = bind_col(tr, col, QUICK);
    2982             : 
    2983         252 :                 if (b) { /* add props for ranges [min, max> */
    2984         252 :                         MT_lock_set(&b->theaplock);
    2985         252 :                         if (add_range) {
    2986         179 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    2987         179 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    2988         103 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    2989             :                                 else
    2990          76 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    2991         179 :                                 if (!pt->with_nills || !col->null)
    2992         117 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    2993             :                         } else {
    2994          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    2995          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    2996          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    2997             :                         }
    2998         252 :                         MT_lock_unset(&b->theaplock);
    2999             :                 }
    3000             :         }
    3001             :         return LOG_OK;
    3002             : }
    3003             : 
    3004             : static int
    3005        4842 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    3006             : {
    3007        4842 :         assert(tr->active);
    3008        4842 :         if (!isTable(col->t) || !col->t->s)
    3009             :                 return LOG_OK;
    3010             : 
    3011        4812 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    3012        4812 :                 BAT *b = bind_col(tr, col, QUICK);
    3013             : 
    3014        4812 :                 if (b) { /* add props for ranges [min, max> */
    3015        4812 :                         if (not_null) {
    3016        4810 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3017             :                         } else {
    3018           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3019             :                         }
    3020             :                 }
    3021             :         }
    3022             :         return LOG_OK;
    3023             : }
    3024             : 
    3025             : static int
    3026       33697 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3027             : {
    3028       33697 :         sqlstore *store = tr->store;
    3029       33697 :         int bid = log_find_bat(store->logger, id);
    3030       33697 :         if (bid <= 0)
    3031             :                 return LOG_ERR;
    3032       33697 :         cs->bid = temp_dup(bid);
    3033       33697 :         cs->ucnt = 0;
    3034       33697 :         cs->uibid = e_bat(TYPE_oid);
    3035       33697 :         cs->uvbid = e_bat(type);
    3036       33697 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3037             :                 return LOG_ERR;
    3038             :         return LOG_OK;
    3039             : }
    3040             : 
    3041             : static int
    3042       70237 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3043             : {
    3044       70237 :         int res = LOG_OK;
    3045       70237 :         gdk_return ok;
    3046       70237 :         BAT *b = temp_descriptor(bat->cs.bid);
    3047             : 
    3048       70237 :         if (b == NULL)
    3049             :                 return LOG_ERR;
    3050             : 
    3051       70237 :         if (!bat->cs.uibid)
    3052       70229 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3053       70237 :         if (!bat->cs.uvbid)
    3054       70229 :                 bat->cs.uvbid = e_bat(b->ttype);
    3055       70237 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3056           0 :                 res = LOG_ERR;
    3057       70237 :         if (GDKinmemory(0)) {
    3058         181 :                 bat_destroy(b);
    3059         181 :                 return res;
    3060             :         }
    3061             : 
    3062       70056 :         bat_set_access(b, BAT_READ);
    3063       70056 :         sqlstore *store = tr->store;
    3064       70056 :         ok = log_bat_persists(store->logger, b, id);
    3065       70056 :         bat_destroy(b);
    3066       70056 :         if(res != LOG_OK)
    3067             :                 return res;
    3068       70056 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3069             : }
    3070             : 
    3071             : static int
    3072           0 : new_persistent_delta( sql_delta *bat)
    3073             : {
    3074           0 :         bat->cs.ucnt = 0;
    3075           0 :         return LOG_OK;
    3076             : }
    3077             : 
    3078             : static void
    3079      136309 : create_delta( sql_delta *d, BAT *b)
    3080             : {
    3081      136309 :         bat_set_access(b, BAT_READ);
    3082      136309 :         d->cs.bid = temp_create(b);
    3083      136309 :         d->cs.uibid = d->cs.uvbid = 0;
    3084      136309 :         d->cs.ucnt = 0;
    3085      136309 : }
    3086             : 
    3087             : static bat
    3088        7381 : copyBat (bat i, int type, oid seq)
    3089             : {
    3090        7381 :         BAT *b, *tb;
    3091        7381 :         bat res;
    3092             : 
    3093        7381 :         if (!i)
    3094             :                 return i;
    3095        7381 :         tb = quick_descriptor(i);
    3096        7381 :         if (tb == NULL)
    3097             :                 return 0;
    3098        7381 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3099        7381 :         if (b == NULL)
    3100             :                 return 0;
    3101             : 
    3102        7381 :         bat_set_access(b, BAT_READ);
    3103             : 
    3104        7381 :         res = temp_create(b);
    3105        7381 :         bat_destroy(b);
    3106        7381 :         return res;
    3107             : }
    3108             : 
    3109             : static int
    3110      162234 : create_col(sql_trans *tr, sql_column *c)
    3111             : {
    3112      162234 :         int ok = LOG_OK, new = 0;
    3113      162234 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
    3114      162234 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3115             : 
    3116      162234 :         if (!bat) {
    3117      162234 :                 new = 1;
    3118      162234 :                 bat = ZNEW(sql_delta);
    3119      162234 :                 if (!bat)
    3120             :                         return LOG_ERR;
    3121      162234 :                 ATOMIC_PTR_SET(&c->data, bat);
    3122      162234 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3123             :         }
    3124             : 
    3125      162234 :         if (new)
    3126      162234 :                 bat->cs.ts = tr->tid;
    3127             : 
    3128      162234 :         if (!isNew(c)&& !isTempTable(c->t)){
    3129       25903 :                 bat->cs.ts = tr->ts;
    3130       25903 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3131       25903 :                 if (ok == LOG_OK && c->storage_type) {
    3132           9 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3133           2 :                                 sqlstore *store = tr->store;
    3134           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3135           2 :                                 if (bid <= 0)
    3136             :                                         return LOG_ERR;
    3137           2 :                                 bat->cs.ebid = temp_dup(bid);
    3138           2 :                                 bat->cs.st = ST_DICT;
    3139           7 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3140           2 :                                 bat->cs.st = ST_FOR;
    3141             :                         }
    3142             :                 }
    3143       25903 :                 return ok;
    3144      136331 :         } else if (bat && bat->cs.bid) {
    3145           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3146             :         } else {
    3147      136331 :                 sql_column *fc = NULL;
    3148      136331 :                 size_t cnt = 0;
    3149             : 
    3150             :                 /* alter ? */
    3151      136331 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3152       81715 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3153       81715 :                         if (s == NULL)
    3154             :                                 return LOG_ERR;
    3155       81715 :                         cnt = segs_end(s->segs, tr, c->t);
    3156             :                 }
    3157      136331 :                 if (cnt && fc != c) {
    3158          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3159             : 
    3160          22 :                         if (d->cs.bid) {
    3161          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3162          22 :                                 if(bat->cs.bid == BID_NIL)
    3163          22 :                                         ok = LOG_ERR;
    3164             :                         }
    3165          22 :                         if (d->cs.uibid) {
    3166          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3167          10 :                                 if (bat->cs.uibid == BID_NIL)
    3168          22 :                                         ok = LOG_ERR;
    3169             :                         }
    3170          22 :                         if (d->cs.uvbid) {
    3171          10 :                                 bat->cs.uvbid = e_bat(type);
    3172          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3173           0 :                                         ok = LOG_ERR;
    3174             :                         }
    3175             :                 } else {
    3176      136309 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3177      136309 :                         if (!b) {
    3178             :                                 ok = LOG_ERR;
    3179             :                         } else {
    3180      136309 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3181      136309 :                                 bat_destroy(b);
    3182             :                         }
    3183             : 
    3184      136309 :                         if (!new) {
    3185           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3186           0 :                                 if (bat->cs.uibid == BID_NIL)
    3187           0 :                                         ok = LOG_ERR;
    3188           0 :                                 bat->cs.uvbid = e_bat(type);
    3189           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3190           0 :                                         ok = LOG_ERR;
    3191             :                         }
    3192             :                 }
    3193      136331 :                 bat->cs.ucnt = 0;
    3194             : 
    3195      136331 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3196          94 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3197             :         }
    3198             :         return ok;
    3199             : }
    3200             : 
    3201             : static int
    3202       63781 : log_create_col_(sql_trans *tr, sql_column *c)
    3203             : {
    3204       63781 :         assert(!isTempTable(c->t));
    3205       63781 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3206             : }
    3207             : 
    3208             : static int
    3209          90 : log_create_col(sql_trans *tr, sql_change *change)
    3210             : {
    3211          90 :         return log_create_col_(tr, (sql_column*)change->obj);
    3212             : }
    3213             : 
    3214             : static int
    3215      123654 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3216             : {
    3217      123654 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3218      123654 :         (void)oldest;
    3219      123654 :         assert(delta->cs.ts == tr->tid);
    3220      123654 :         delta->cs.ts = commit_ts;
    3221             : 
    3222      123654 :         assert(delta->next == NULL);
    3223      123654 :         if (!delta->cs.merged)
    3224      123653 :                 delta->nr_updates += merge_delta(delta);
    3225      123654 :         if (!tr->parent)
    3226      123650 :                 base->new = 0;
    3227      123654 :         return LOG_OK;
    3228             : }
    3229             : 
    3230             : static int
    3231          94 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3232             : {
    3233          94 :         sql_column *c = (sql_column*)change->obj;
    3234          94 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3235          94 :         if (!tr->parent)
    3236          93 :                 c->base.new = 0;
    3237          94 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3238             : }
    3239             : 
    3240             : /* will be called for new idx's and when new index columns are created */
    3241             : static int
    3242        9819 : create_idx(sql_trans *tr, sql_idx *ni)
    3243             : {
    3244        9819 :         int ok = LOG_OK, new = 0;
    3245        9819 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3246        9819 :         int type = TYPE_lng;
    3247             : 
    3248        9819 :         if (oid_index(ni->type))
    3249         954 :                 type = TYPE_oid;
    3250             : 
    3251        9819 :         if (!bat) {
    3252        9819 :                 new = 1;
    3253        9819 :                 bat = ZNEW(sql_delta);
    3254        9819 :                 if (!bat)
    3255             :                         return LOG_ERR;
    3256        9819 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3257        9819 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3258             :         }
    3259             : 
    3260        9819 :         if (new)
    3261        9819 :                 bat->cs.ts = tr->tid;
    3262             : 
    3263        9819 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3264        2460 :                 bat->cs.ts = 1;
    3265        2460 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3266        7359 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3267           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3268             :         } else {
    3269        7359 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3270        7359 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3271             : 
    3272        7359 :                 if (d) {
    3273             :                         /* Here we also handle indices created through alter stmts */
    3274             :                         /* These need to be created aligned to the existing data */
    3275        7359 :                         if (d->cs.bid) {
    3276        7359 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3277        7359 :                                 if(bat->cs.bid == BID_NIL)
    3278        7359 :                                         ok = LOG_ERR;
    3279             :                         }
    3280             :                 } else {
    3281             :                         return LOG_ERR;
    3282             :                 }
    3283             : 
    3284        7359 :                 bat->cs.ucnt = 0;
    3285             : 
    3286        7359 :                 if (!new) {
    3287           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3288           0 :                         if (bat->cs.uibid == BID_NIL)
    3289           0 :                                 ok = LOG_ERR;
    3290           0 :                         bat->cs.uvbid = e_bat(type);
    3291           0 :                         if(bat->cs.uvbid == BID_NIL)
    3292           0 :                                 ok = LOG_ERR;
    3293             :                 }
    3294        7359 :                 bat->cs.ucnt = 0;
    3295        7359 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3296         632 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3297             :         }
    3298             :         return ok;
    3299             : }
    3300             : 
    3301             : static int
    3302        6456 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3303             : {
    3304        6456 :         assert(!isTempTable(i->t));
    3305        6456 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3306             : }
    3307             : 
    3308             : static int
    3309         618 : log_create_idx(sql_trans *tr, sql_change *change)
    3310             : {
    3311         618 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3312             : }
    3313             : 
    3314             : static int
    3315         632 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3316             : {
    3317         632 :         sql_idx *i = (sql_idx*)change->obj;
    3318         632 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3319         632 :         if (!tr->parent)
    3320         632 :                 i->base.new = 0;
    3321         632 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3322             :         return LOG_OK;
    3323             : }
    3324             : 
    3325             : static int
    3326        5334 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3327             : {
    3328        5334 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3329        5334 :         BAT *b = NULL, *ib = NULL;
    3330             : 
    3331        5334 :         if (ok != LOG_OK)
    3332             :                 return ok;
    3333        5334 :         if (!(b = temp_descriptor(s->cs.bid)))
    3334             :                 return LOG_ERR;
    3335        5334 :         ib = b;
    3336             : 
    3337        5334 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3338           0 :                 bat_destroy(ib);
    3339           0 :                 return LOG_ERR;
    3340             :         }
    3341             : 
    3342        5334 :         if (BATcount(b)) {
    3343         436 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3344           0 :                         bat_destroy(ib);
    3345           0 :                         return LOG_ERR;
    3346             :                 }
    3347         675 :                 if (BATtdense(b)) {
    3348         239 :                         size_t start = b->tseqbase;
    3349         239 :                         size_t cnt = BATcount(b);
    3350         239 :                         ok = delete_range(tr, t, s, start, cnt);
    3351             :                 } else {
    3352         197 :                         assert(b->tsorted);
    3353         197 :                         BUN icnt = BATcount(b);
    3354         197 :                         BATiter bi = bat_iterator(b);
    3355         197 :                         size_t lcnt = 1;
    3356         197 :                         oid n;
    3357         197 :                         segment *seg = s->segs->h;
    3358         197 :                         if (complex_cand(b)) {
    3359           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3360           0 :                                 n = o + 1;
    3361           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3362           0 :                                         o = * (oid *) Tpos(&bi, i);
    3363           0 :                                         if (o == n) {
    3364           0 :                                                 lcnt++;
    3365           0 :                                                 n++;
    3366             :                                         } else {
    3367           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3368             :                                                         break;
    3369             :                                                 lcnt = 0;
    3370             :                                         }
    3371           0 :                                         if (!lcnt) {
    3372           0 :                                                 n = o + 1;
    3373           0 :                                                 lcnt = 1;
    3374             :                                         }
    3375             :                                 }
    3376             :                         } else {
    3377         197 :                                 oid *o = bi.base;
    3378         197 :                                 n = o[0]+1;
    3379      294714 :                                 for (size_t i=1; i<icnt; i++) {
    3380      294517 :                                         if (o[i] == n) {
    3381      291772 :                                                 lcnt++;
    3382      291772 :                                                 n++;
    3383             :                                         } else {
    3384        2745 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3385             :                                                         break;
    3386             :                                                 lcnt = 0;
    3387             :                                         }
    3388      294517 :                                         if (!lcnt) {
    3389        2745 :                                                 n = o[i]+1;
    3390        2745 :                                                 lcnt = 1;
    3391             :                                         }
    3392             :                                 }
    3393             :                         }
    3394         197 :                         if (lcnt && ok == LOG_OK)
    3395         197 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3396         197 :                         bat_iterator_end(&bi);
    3397             :                 }
    3398         436 :                 if (ok == LOG_OK)
    3399        7045 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3400        6609 :                                 if (seg->ts == tr->tid)
    3401        3437 :                                         seg->ts = 1;
    3402             :         } else {
    3403        4898 :                 if (ok == LOG_OK) {
    3404        4898 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3405             : 
    3406        4898 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3407             :                                 ok = LOG_ERR;
    3408             :                         } else {
    3409        4898 :                                 segment *seg = s->segs->h;
    3410        4898 :                                 if (seg->ts == tr->tid)
    3411        4898 :                                         seg->ts = 1;
    3412             :                         }
    3413             :                 }
    3414             :         }
    3415        5334 :         if (b != ib)
    3416        5334 :                 bat_destroy(b);
    3417        5334 :         bat_destroy(ib);
    3418             : 
    3419        5334 :         return ok;
    3420             : }
    3421             : 
    3422             : static int
    3423       26770 : create_del(sql_trans *tr, sql_table *t)
    3424             : {
    3425       26770 :         int ok = LOG_OK, new = 0;
    3426       26770 :         BAT *b;
    3427       26770 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3428             : 
    3429       26770 :         if (!bat) {
    3430       26770 :                 new = 1;
    3431       26770 :                 bat = ZNEW(storage);
    3432       26770 :                 if(!bat)
    3433             :                         return LOG_ERR;
    3434       26770 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3435       26770 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3436       26770 :                 bat->cs.ts = tr->tid;
    3437             :         }
    3438             : 
    3439       26770 :         if (!isNew(t) && !isTempTable(t)) {
    3440        5334 :                 bat->cs.ts = tr->ts;
    3441        5334 :                 return load_storage(tr, t, bat, t->base.id);
    3442       21436 :         } else if (bat->cs.bid) {
    3443             :                 return ok;
    3444             :         } else {
    3445       21436 :                 assert(!bat->segs);
    3446       21436 :                 if (!(bat->segs = new_segments(tr, 0)))
    3447             :                         return LOG_ERR;
    3448             : 
    3449       21436 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3450       21436 :                 if(b != NULL) {
    3451       21436 :                         bat_set_access(b, BAT_READ);
    3452       21436 :                         bat->cs.bid = temp_create(b);
    3453       21436 :                         bat_destroy(b);
    3454             :                 } else {
    3455             :                         return LOG_ERR;
    3456             :                 }
    3457       21436 :                 if (new)
    3458       28317 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3459             :         }
    3460             :         return ok;
    3461             : }
    3462             : 
    3463             : static int
    3464      199880 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3465             : {
    3466      199880 :         sqlstore *store = tr->store;
    3467      199880 :         msk m = s->deleted;
    3468      199880 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3469             : }
    3470             : 
    3471             : static int
    3472       98080 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3473             : {
    3474             :         /* log segments */
    3475       98080 :         lock_table(tr->store, id);
    3476      461519 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3477      363439 :                 unlock_table(tr->store, id);
    3478      363439 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3479      151821 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3480             :                                 return LOG_ERR;
    3481             :                         }
    3482             :                 }
    3483      363439 :                 lock_table(tr->store, id);
    3484             :         }
    3485       98080 :         unlock_table(tr->store, id);
    3486       98080 :         return LOG_OK;
    3487             : }
    3488             : 
    3489             : static int
    3490       12791 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3491             : {
    3492       12791 :         BAT *b;
    3493       12791 :         int ok = LOG_OK;
    3494             : 
    3495       12791 :         if (GDKinmemory(0))
    3496             :                 return LOG_OK;
    3497             : 
    3498       12758 :         b = temp_descriptor(bat->cs.bid);
    3499       12758 :         if (b == NULL)
    3500             :                 return LOG_ERR;
    3501             : 
    3502       12758 :         sqlstore *store = tr->store;
    3503       12758 :         bat_set_access(b, BAT_READ);
    3504       12758 :         if (ok == LOG_OK)
    3505       12758 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3506       12758 :         if (ok == LOG_OK)
    3507       12758 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3508       12758 :         bat_destroy(b);
    3509       12758 :         return ok;
    3510             : }
    3511             : 
    3512             : static int
    3513       12805 : log_create_del(sql_trans *tr, sql_change *change)
    3514             : {
    3515       12805 :         int ok = LOG_OK;
    3516       12805 :         sql_table *t = (sql_table*)change->obj;
    3517             : 
    3518       12805 :         if (t->base.deleted)
    3519             :                 return ok;
    3520       12791 :         assert(!isTempTable(t));
    3521       12791 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3522       12791 :         if (ok == LOG_OK) {
    3523       76497 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3524       63706 :                         sql_column *c = n->data;
    3525             : 
    3526       63706 :                         if (c->data)
    3527       63691 :                                 ok = log_create_col_(tr, c);
    3528             :                 }
    3529       12791 :                 if (t->idxs) {
    3530       18641 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3531        5850 :                                 sql_idx *i = n->data;
    3532             : 
    3533        5850 :                                 if (ATOMIC_PTR_GET(&i->data))
    3534        5838 :                                         ok = log_create_idx_(tr, i);
    3535             :                         }
    3536             :                 }
    3537             :         }
    3538             :         return ok;
    3539             : }
    3540             : 
    3541             : static int
    3542       21438 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3543             : {
    3544       21438 :         int ok = LOG_OK;
    3545       21438 :         sql_table *t = (sql_table*)change->obj;
    3546       21438 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3547             : 
    3548       21438 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3549         125 :                 assert(isTempTable(t));
    3550         125 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3551         125 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3552         125 :                 return ok;
    3553             :         }
    3554             : 
    3555       21313 :         if (!commit_ts) /* rollback handled by ? */
    3556             :                 return ok;
    3557       19458 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3558       19458 :         assert(ok == LOG_OK);
    3559       19458 :         if (ok != LOG_OK)
    3560             :                 return ok;
    3561       19458 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3562       19458 :         assert(dbat->cs.ts == tr->tid);
    3563       19458 :         dbat->cs.ts = commit_ts;
    3564       19458 :         if (ok == LOG_OK) {
    3565      136550 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3566      117092 :                         sql_column *c = n->data;
    3567      117092 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3568             : 
    3569      117092 :                         if (delta)
    3570      117077 :                                 ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3571             :                 }
    3572       19458 :                 if (t->idxs) {
    3573       25321 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3574        5863 :                                 sql_idx *i = n->data;
    3575        5863 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3576             : 
    3577        5863 :                                 if (delta)
    3578        5851 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3579             :                         }
    3580             :                 }
    3581       19458 :                 if (!tr->parent)
    3582       19456 :                         t->base.new = 0;
    3583             :         }
    3584       19458 :         if (!tr->parent)
    3585       19456 :                 t->base.new = 0;
    3586             :         return ok;
    3587             : }
    3588             : 
    3589             : static int
    3590       21487 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3591             : {
    3592       21487 :         gdk_return ok = GDK_SUCCEED;
    3593             : 
    3594       21487 :         sqlstore *store = tr->store;
    3595       21487 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3596       19091 :                 ok = log_bat_transient(store->logger, id);
    3597       21487 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3598          25 :                 ok = log_bat_transient(store->logger, -id);
    3599       21487 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3600             : }
    3601             : 
    3602             : static int
    3603      181614 : destroy_col(sqlstore *store, sql_column *c)
    3604             : {
    3605      181614 :         (void)store;
    3606      181614 :         if (ATOMIC_PTR_GET(&c->data))
    3607      181614 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3608      181614 :         ATOMIC_PTR_SET(&c->data, NULL);
    3609      181614 :         return LOG_OK;
    3610             : }
    3611             : 
    3612             : static int
    3613       19588 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3614             : {
    3615       19588 :         int ok = LOG_OK;
    3616       19588 :         assert(!isTempTable(c->t));
    3617       19588 :         if (!tr->parent) /* don't write save point commits */
    3618       19588 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3619       19588 :         return ok;
    3620             : }
    3621             : 
    3622             : static int
    3623       19588 : log_destroy_col(sql_trans *tr, sql_change *change)
    3624             : {
    3625       19588 :         sql_column *c = (sql_column*)change->obj;
    3626       19588 :         int res = log_destroy_col_(tr, c);
    3627       19588 :         change->obj = NULL;
    3628       19588 :         column_destroy(tr->store, c);
    3629       19588 :         return res;
    3630             : }
    3631             : 
    3632             : static int
    3633       11618 : destroy_idx(sqlstore *store, sql_idx *i)
    3634             : {
    3635       11618 :         (void)store;
    3636       11618 :         if (ATOMIC_PTR_GET(&i->data))
    3637       11618 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3638       11618 :         ATOMIC_PTR_SET(&i->data, NULL);
    3639       11618 :         return LOG_OK;
    3640             : }
    3641             : 
    3642             : static int
    3643        1975 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3644             : {
    3645        1975 :         int ok = LOG_OK;
    3646        1975 :         assert(!isTempTable(i->t));
    3647        1975 :         if (ATOMIC_PTR_GET(&i->data)) {
    3648        1899 :                 if (!tr->parent) /* don't write save point commits */
    3649        1899 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3650             :         }
    3651        1975 :         return ok;
    3652             : }
    3653             : 
    3654             : static int
    3655        1975 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3656             : {
    3657        1975 :         sql_idx *i = (sql_idx*)change->obj;
    3658        1975 :         int res = log_destroy_idx_(tr, i);
    3659        1975 :         change->obj = NULL;
    3660        1975 :         idx_destroy(tr->store, i);
    3661        1975 :         return res;
    3662             : }
    3663             : 
    3664             : static int
    3665       28341 : destroy_del(sqlstore *store, sql_table *t)
    3666             : {
    3667       28341 :         (void)store;
    3668       28341 :         if (ATOMIC_PTR_GET(&t->data))
    3669       28337 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3670       28341 :         ATOMIC_PTR_SET(&t->data, NULL);
    3671       28341 :         return LOG_OK;
    3672             : }
    3673             : 
    3674             : static int
    3675        3345 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3676             : {
    3677        3345 :         gdk_return ok = GDK_SUCCEED;
    3678             : 
    3679        3345 :         sqlstore *store = tr->store;
    3680        3345 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3681        3345 :             bat && bat->cs.bid)
    3682        3345 :                 ok = log_bat_transient(store->logger, id);
    3683        3345 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3684             : }
    3685             : 
    3686             : static int
    3687        3345 : log_destroy_del(sql_trans *tr, sql_change *change)
    3688             : {
    3689        3345 :         int ok = LOG_OK;
    3690        3345 :         sql_table *t = (sql_table*)change->obj;
    3691             : 
    3692        3345 :         assert(!isTempTable(t));
    3693        3345 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3694        3345 :         return ok;
    3695             : }
    3696             : 
    3697             : static int
    3698       25001 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3699             : {
    3700       25001 :         (void)tr;
    3701       25001 :         (void)change;
    3702       25001 :         (void)commit_ts;
    3703       25001 :         (void)oldest;
    3704       25001 :         if (commit_ts)
    3705       24978 :                 change->handled = true;
    3706       25001 :         return 0;
    3707             : }
    3708             : 
    3709             : static int
    3710        3421 : drop_del(sql_trans *tr, sql_table *t)
    3711             : {
    3712        3421 :         int ok = LOG_OK;
    3713             : 
    3714        3421 :         if (!isNew(t)) {
    3715        3421 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3716        3491 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, isTempTable(t) ? NULL : &log_destroy_del);
    3717             :         }
    3718        3421 :         return ok;
    3719             : }
    3720             : 
    3721             : static int
    3722       19603 : drop_col(sql_trans *tr, sql_column *c)
    3723             : {
    3724       19603 :         assert(!isNew(c));
    3725       19603 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3726       19603 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, isTempTable(c->t) ? NULL : &log_destroy_col);
    3727       19603 :         return LOG_OK;
    3728             : }
    3729             : 
    3730             : static int
    3731        1977 : drop_idx(sql_trans *tr, sql_idx *i)
    3732             : {
    3733        1977 :         assert(!isNew(i));
    3734        1977 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3735        1977 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, isTempTable(i->t) ? NULL : &log_destroy_idx);
    3736        1977 :         return LOG_OK;
    3737             : }
    3738             : 
    3739             : 
    3740             : static BUN
    3741      129465 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3742             : {
    3743      129465 :         BAT *b;
    3744      129465 :         BUN sz = 0;
    3745             : 
    3746      129465 :         (void)tr;
    3747      129465 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3748      129465 :         if (cs->bid && renew) {
    3749      129493 :                 b = quick_descriptor(cs->bid);
    3750      129511 :                 if (b) {
    3751      129511 :                         sz += BATcount(b);
    3752      129511 :                         if (cs->st == ST_DICT) {
    3753           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3754           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3755             : 
    3756           2 :                                 if (nebid == BID_NIL || !n) {
    3757           0 :                                         temp_destroy(nebid);
    3758           0 :                                         bat_destroy(n);
    3759           0 :                                         return BUN_NONE;
    3760             :                                 }
    3761           2 :                                 temp_destroy(cs->ebid);
    3762           2 :                                 cs->ebid = nebid;
    3763           2 :                                 if (!temp)
    3764           2 :                                         bat_set_access(n, BAT_READ);
    3765           2 :                                 temp_destroy(cs->bid);
    3766           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3767           2 :                                 bat_destroy(n);
    3768             :                         } else {
    3769      129509 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
    3770             : 
    3771      129298 :                                 if (nbid == BID_NIL)
    3772             :                                         return BUN_NONE;
    3773      129298 :                                 temp_destroy(cs->bid);
    3774      129435 :                                 cs->bid = nbid;
    3775             :                         }
    3776             :                 } else {
    3777             :                         return BUN_NONE;
    3778             :                 }
    3779             :         }
    3780      129409 :         if (cs->uibid) {
    3781      129258 :                 temp_destroy(cs->uibid);
    3782      129372 :                 cs->uibid = 0;
    3783             :         }
    3784      129523 :         if (cs->uvbid) {
    3785      129374 :                 temp_destroy(cs->uvbid);
    3786      129374 :                 cs->uvbid = 0;
    3787             :         }
    3788      129523 :         cs->cleared = true;
    3789      129523 :         cs->ucnt = 0;
    3790      129523 :         return sz;
    3791             : }
    3792             : 
    3793             : static BUN
    3794      129334 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3795             : {
    3796      129334 :         bool update_conflict = false;
    3797      129334 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3798             : 
    3799      129334 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    3800           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3801      129368 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3802      129368 :         if (odelta != delta)
    3803      129373 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3804      129355 :         if (delta)
    3805      129355 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3806             :         return 0;
    3807             : }
    3808             : 
    3809             : static BUN
    3810          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3811             : {
    3812          21 :         bool update_conflict = false;
    3813          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3814             : 
    3815          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3816          15 :                 return 0;
    3817           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    3818           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3819           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3820           6 :         if (odelta != delta)
    3821           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3822           6 :         if (delta)
    3823           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3824             :         return 0;
    3825             : }
    3826             : 
    3827             : static int
    3828         147 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3829             : {
    3830         147 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
    3831             :                 return LOG_ERR;
    3832         147 :         if (s->segs)
    3833         147 :                 destroy_segments(s->segs);
    3834         147 :         if (!(s->segs = new_segments(tr, 0)))
    3835             :                 return LOG_ERR;
    3836             :         return LOG_OK;
    3837             : }
    3838             : 
    3839             : 
    3840             : /*
    3841             :  * Clear the table, in general this means replacing the storage,
    3842             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    3843             :  * all segments as deleted.
    3844             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    3845             :  */
    3846             : static BUN
    3847       41827 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3848             : {
    3849       41827 :         int clear = !in_transaction, ok = LOG_OK;
    3850       41827 :         bool conflict = false;
    3851       41827 :         storage *bat;
    3852             : 
    3853       41878 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    3854       15820 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3855             : 
    3856       26012 :         if (!clear) {
    3857          51 :                 lock_table(tr->store, t->base.id);
    3858          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3859          51 :                 unlock_table(tr->store, t->base.id);
    3860             :         }
    3861       26012 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3862       26012 :         if (!in_transaction)
    3863       25964 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3864       26009 :         if (ok == LOG_ERR)
    3865             :                 return BUN_NONE;
    3866       26009 :         if (ok == LOG_CONFLICT)
    3867           0 :                 return BUN_NONE - 1;
    3868             :         return LOG_OK;
    3869             : }
    3870             : 
    3871             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3872             : static BUN
    3873       41796 : clear_table(sql_trans *tr, sql_table *t)
    3874             : {
    3875       41796 :         node *n = ol_first_node(t->columns);
    3876       41796 :         storage *d = tab_timestamp_storage(tr, t);
    3877       41820 :         int in_transaction, clear;
    3878       41820 :         BUN sz, clear_ok;
    3879             : 
    3880       41820 :         sql_column *c = n->data;
    3881       41820 :         while (c && c->type.type->composite && !c->type.multiset) {
    3882           0 :                 n = n->next;
    3883           0 :                 c = n?n->data:NULL;
    3884             :         }
    3885       41820 :         if (!d || !c)
    3886             :                 return BUN_NONE;
    3887       41824 :         lock_table(tr->store, t->base.id);
    3888       41832 :         in_transaction = segments_in_transaction(tr, t);
    3889       41831 :         unlock_table(tr->store, t->base.id);
    3890       41832 :         clear = !in_transaction;
    3891       41832 :         sz = count_col(tr, c, CNT_ACTIVE);
    3892       41831 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    3893             :                 return clear_ok;
    3894             : 
    3895       26006 :         if (in_transaction)
    3896             :                 return sz;
    3897             : 
    3898      155298 :         for (; n; n = n->next) {
    3899      129337 :                 c = n->data;
    3900             : 
    3901      129337 :                 if (c->type.type->composite && !c->type.multiset)
    3902           0 :                         continue;
    3903             : 
    3904      129337 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3905           0 :                         return clear_ok;
    3906             :         }
    3907       25961 :         if (t->idxs) {
    3908       25982 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3909          21 :                         sql_idx *ci = n->data;
    3910             : 
    3911          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    3912          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3913           0 :                                 return clear_ok;
    3914             :                 }
    3915             :         }
    3916       25961 :         if (clear)
    3917       25961 :                 d->segs->nr_reused = 0;
    3918       25961 :         return sz;
    3919             : }
    3920             : 
    3921             : static int
    3922      158805 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3923             : {
    3924      158805 :         sqlstore *store = tr->store;
    3925      158805 :         gdk_return ok = GDK_SUCCEED;
    3926             : 
    3927      158805 :         (void) t;
    3928      158805 :         (void) segs;
    3929      158805 :         if (GDKinmemory(0))
    3930             :                 return LOG_OK;
    3931             : 
    3932      158798 :         if (cs->cleared) {
    3933      155388 :                 assert(cs->ucnt == 0);
    3934      155388 :                 BAT *ins = temp_descriptor(cs->bid);
    3935      155388 :                 if (!ins)
    3936             :                         return LOG_ERR;
    3937      155388 :                 assert(!isEbat(ins));
    3938      155388 :                 bat_set_access(ins, BAT_READ);
    3939      155388 :                 ok = log_bat_persists(store->logger, ins, id);
    3940      155388 :                 bat_destroy(ins);
    3941      155388 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3942          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3943          56 :                         if (!ins)
    3944             :                                 return LOG_ERR;
    3945          56 :                         assert(!isEbat(ins));
    3946          56 :                         bat_set_access(ins, BAT_READ);
    3947          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3948          56 :                         bat_destroy(ins);
    3949             :                 }
    3950      155388 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3951             :         }
    3952             : 
    3953        3410 :         assert(!isTempTable(t));
    3954             : 
    3955        3410 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3956        2819 :                 BAT *ui = temp_descriptor(cs->uibid);
    3957        2819 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3958             :                 /* any updates */
    3959        2819 :                 if (ui == NULL || uv == NULL) {
    3960             :                         ok = GDK_FAIL;
    3961        2819 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3962        2819 :                         ok = log_delta(store->logger, ui, uv, id);
    3963        2819 :                 bat_destroy(ui);
    3964        2819 :                 bat_destroy(uv);
    3965             :         }
    3966        2819 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3967             : }
    3968             : 
    3969             : static inline int
    3970       59367 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3971       59367 :         sqlstore *store = tr->store;
    3972       59367 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3973             : }
    3974             : 
    3975             : static inline int
    3976       59367 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3977       59367 :         sqlstore *store = tr->store;
    3978       59367 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3979             : }
    3980             : 
    3981             : static int
    3982       59367 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3983             : {
    3984       59367 :         sqlstore *store = tr->store;
    3985       59367 :         gdk_return ok = GDK_SUCCEED;
    3986             : 
    3987       59367 :         size_t end = segs_end(segs, tr, t);
    3988             : 
    3989       59367 :         if (tr_log_table_start(tr, t) != LOG_OK)
    3990             :                 return LOG_ERR;
    3991             : 
    3992       59367 :         size_t nr_appends = 0;
    3993             : 
    3994       59367 :         lock_table(tr->store, t->base.id);
    3995      384217 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3996      324850 :                 unlock_table(tr->store, t->base.id);
    3997             : 
    3998      324850 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3999      121096 :                         if (!seg->deleted) {
    4000       48059 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    4001             :                                         return LOG_ERR;
    4002             : 
    4003       48059 :                                 nr_appends += (seg->end - seg->start);
    4004             :                         }
    4005             :                 }
    4006      324850 :                 lock_table(tr->store, t->base.id);
    4007             :         }
    4008       59367 :         unlock_table(tr->store, t->base.id);
    4009             : 
    4010      429935 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    4011      370568 :                 sql_column *c = n->data;
    4012      370568 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    4013             : 
    4014      370568 :                 if (c->type.type->composite && !c->type.multiset)
    4015          34 :                         continue;
    4016      370534 :                 if (cs->cleared) {
    4017           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4018           3 :                         continue;
    4019             :                 }
    4020             : 
    4021      370531 :                 lock_table(tr->store, t->base.id);
    4022      370531 :                 if (!cs->cleared) {
    4023     2424476 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4024     2053945 :                                 unlock_table(tr->store, t->base.id);
    4025     2053945 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4026             :                                         /* append col*/
    4027      282212 :                                         BAT *ins = temp_descriptor(cs->bid);
    4028      282212 :                                         if (ins == NULL)
    4029             :                                                 return LOG_ERR;
    4030      282212 :                                         assert(BATcount(ins) >= cur->end);
    4031      282212 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
    4032      282212 :                                         bat_destroy(ins);
    4033             :                                 }
    4034     2053945 :                                 lock_table(tr->store, t->base.id);
    4035             :                         }
    4036             :                 }
    4037      370531 :                 unlock_table(tr->store, t->base.id);
    4038             : 
    4039      370531 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    4040          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4041          19 :                         if (ins == NULL)
    4042             :                                 return LOG_ERR;
    4043          19 :                         if (BATcount(ins) > ins->batInserted)
    4044          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4045          19 :                         BATcommit(ins, BATcount(ins));
    4046          19 :                         bat_destroy(ins);
    4047             :                 }
    4048             :         }
    4049             : 
    4050       59367 :         if (t->idxs) {
    4051       64554 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4052        5187 :                         sql_idx *i = n->data;
    4053             : 
    4054        5187 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    4055        4409 :                                 continue;
    4056         778 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4057             : 
    4058         778 :                         if (cs) {
    4059         778 :                                 if (cs->cleared) {
    4060           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4061           0 :                                         continue;
    4062             :                                 }
    4063             : 
    4064         778 :                                 lock_table(tr->store, t->base.id);
    4065        2387 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4066        1609 :                                         unlock_table(tr->store, t->base.id);
    4067        1609 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4068             :                                                 /* append idx */
    4069         730 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4070         730 :                                                 if (ins == NULL)
    4071             :                                                         return LOG_ERR;
    4072         730 :                                                 assert(BATcount(ins) >= cur->end);
    4073         730 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4074         730 :                                                 bat_destroy(ins);
    4075             :                                         }
    4076        1609 :                                         lock_table(tr->store, t->base.id);
    4077             :                                 }
    4078         778 :                                 unlock_table(tr->store, t->base.id);
    4079             :                         }
    4080             :                 }
    4081             :         }
    4082             : 
    4083       59367 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
    4084           0 :                 return LOG_ERR;
    4085             : 
    4086             :         return LOG_OK;
    4087             : }
    4088             : 
    4089             : static int
    4090       85322 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4091             : {
    4092       85322 :         int ok = LOG_OK;
    4093       85322 :         bool cleared = s->cs.cleared;
    4094       85322 :         if (ok == LOG_OK && cleared)
    4095       25955 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4096       25955 :         if (ok == LOG_OK)
    4097       85322 :                 ok = log_segments(tr, s->segs, t->base.id);
    4098       85322 :         if (ok == LOG_OK && !cleared)
    4099       59367 :                 ok = log_table_append(tr, t, s->segs);
    4100       85322 :         return ok;
    4101             : }
    4102             : 
    4103             : static void
    4104      426189 : merge_cs( column_storage *cs, const char* caller)
    4105             : {
    4106      426189 :         if (cs->bid && cs->ucnt) {
    4107        2828 :                 BAT *cur = temp_descriptor(cs->bid);
    4108        2828 :                 BAT *ui = temp_descriptor(cs->uibid);
    4109        2828 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4110             : 
    4111        2828 :                 if (!cur || !ui || !uv) {
    4112           0 :                         bat_destroy(ui);
    4113           0 :                         bat_destroy(uv);
    4114           0 :                         bat_destroy(cur);
    4115           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4116             :                         return;
    4117             :                 }
    4118        2828 :                 assert(BATcount(ui) == BATcount(uv));
    4119             : 
    4120             :                 /* any updates */
    4121        2828 :                 assert(!isEbat(cur));
    4122        2828 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4123           0 :                         bat_destroy(ui);
    4124           0 :                         bat_destroy(uv);
    4125           0 :                         bat_destroy(cur);
    4126           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4127             :                         return;
    4128             :                 }
    4129             :                 /* cleanup the old deltas */
    4130        2828 :                 temp_destroy(cs->uibid);
    4131        2828 :                 temp_destroy(cs->uvbid);
    4132        2828 :                 cs->uibid = e_bat(TYPE_oid);
    4133        2828 :                 cs->uvbid = e_bat(cur->ttype);
    4134        2828 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4135        2828 :                 cs->ucnt = 0;
    4136        2828 :                 bat_destroy(ui);
    4137        2828 :                 bat_destroy(uv);
    4138        2828 :                 bat_destroy(cur);
    4139             :         }
    4140      426189 :         cs->cleared = false;
    4141      426189 :         cs->merged = true;
    4142      426189 :         return;
    4143             : }
    4144             : 
    4145             : static lng
    4146      382854 : merge_delta( sql_delta *obat)
    4147             : {
    4148      382854 :         lng res = 0;
    4149      382854 :         if (obat && obat->next && !obat->cs.merged)
    4150      132857 :                 res += merge_delta(obat->next);
    4151      382854 :         res += obat->cs.ucnt;
    4152      382854 :         merge_cs(&obat->cs, __func__);
    4153      382854 :         return res;
    4154             : }
    4155             : 
    4156             : static void
    4157       43335 : merge_storage(storage *tdb)
    4158             : {
    4159       43335 :         merge_cs(&tdb->cs, __func__);
    4160             : 
    4161       43335 :         if (tdb->next) {
    4162         454 :                 destroy_storage(tdb->next);
    4163         454 :                 tdb->next = NULL;
    4164             :         }
    4165       43335 : }
    4166             : 
    4167             : static sql_delta *
    4168           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4169             : {
    4170             :         /* commit ie copy back to the parent transaction */
    4171           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4172           1 :                 sql_delta *od = delta->next;
    4173           1 :                 if (od->cs.ts == commit_ts) {
    4174           0 :                         sql_delta t = *od, *n = od->next;
    4175           0 :                         *od = *delta;
    4176           0 :                         od->next = n;
    4177           0 :                         *delta = t;
    4178           0 :                         delta->next = NULL;
    4179           0 :                         destroy_delta(delta, true);
    4180           0 :                         return od;
    4181             :                 }
    4182             :         }
    4183             :         return delta;
    4184             : }
    4185             : 
    4186             : static int
    4187      132821 : log_update_col( sql_trans *tr, sql_change *change)
    4188             : {
    4189      132821 :         sql_column *c = (sql_column*)change->obj;
    4190      132821 :         assert(!isTempTable(c->t));
    4191             : 
    4192      132821 :         if (isDeleted(c->t)) {
    4193           0 :                 change->handled = true;
    4194           0 :                 return LOG_OK;
    4195             :         }
    4196             : 
    4197      132821 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4198      132821 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4199      132821 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4200      132821 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4201             :         }
    4202             :         return LOG_OK;
    4203             : }
    4204             : 
    4205             : static int
    4206         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4207             : {
    4208         217 :         sqlstore *store = Store;
    4209             : 
    4210         217 :         sql_delta *d = (sql_delta*)change->data;
    4211         217 :         if (d->cs.ts < oldest) {
    4212          82 :                 destroy_delta(d, false);
    4213          82 :                 if (change->commit == &commit_update_idx)
    4214           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4215             :                 else
    4216          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4217          82 :                 return 1;
    4218             :         }
    4219         135 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4220          82 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4221             :         return 0;
    4222             : }
    4223             : 
    4224             : static int
    4225           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4226             : {
    4227           9 :         sqlstore *store = Store;
    4228             : 
    4229           9 :         storage *d = (storage*)change->data;
    4230           9 :         if (d->cs.ts < oldest) {
    4231           3 :                 destroy_storage(d);
    4232           3 :                 table_destroy(store, (sql_table*)change->obj);
    4233           3 :                 return 1;
    4234             :         }
    4235           6 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    4236           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4237             :         return 0;
    4238             : }
    4239             : 
    4240             : static int
    4241      132939 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4242             : {
    4243      132939 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4244             : 
    4245      132939 :         sql_delta *delta = ATOMIC_PTR_GET(data), *idelta = delta;
    4246             : 
    4247      132939 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4248           3 :                 int ok = LOG_OK;
    4249           3 :                 assert(isTempTable(t));
    4250           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4251           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4252           3 :                 if (!tr->parent)
    4253           0 :                         t->base.new = base->new = 0;
    4254           3 :                 change->handled = true;
    4255           3 :                 return ok;
    4256             :         }
    4257             : 
    4258      132936 :         if (commit_ts)
    4259      132854 :                 delta->cs.ts = commit_ts;
    4260      132936 :         if (!commit_ts) { /* rollback */
    4261          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4262             : 
    4263          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4264             :                         return LOG_OK;
    4265          82 :                 if (o != d) {
    4266           0 :                         while(o && o->next != d)
    4267             :                                 o = o->next;
    4268             :                 }
    4269          82 :                 if (o == ATOMIC_PTR_GET(data))
    4270          82 :                         ATOMIC_PTR_SET(data, d->next);
    4271             :                 else
    4272           0 :                         o->next = d->next;
    4273          82 :                 d->next = NULL;
    4274          82 :                 change->cleanup = &tc_gc_rollbacked;
    4275      132854 :         } else if (!tr->parent) {
    4276             :                 /* merge deltas */
    4277      276397 :                 while (delta && delta->cs.ts > oldest)
    4278      143544 :                         delta = delta->next;
    4279      132853 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4280       24388 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4281       24388 :                         idelta->nr_updates += merge_delta(delta);
    4282       24388 :                         unlock_column(tr->store, base->id);
    4283             :                 }
    4284           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4285           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4286             :         return LOG_OK;
    4287             : }
    4288             : 
    4289             : static int
    4290      132911 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4291             : {
    4292             : 
    4293      132911 :         sql_column *c = (sql_column*)change->obj;
    4294      132911 :         sql_base* base = &c->base;
    4295      132911 :         sql_table* t = c->t;
    4296      132911 :         ATOMIC_PTR_TYPE* data = &c->data;
    4297      132911 :         int type = c->type.multiset?TYPE_int:c->type.type->localtype;
    4298             : 
    4299      132911 :         if (change->handled || isDeleted(c->t))
    4300             :                 return LOG_OK;
    4301             : 
    4302      132911 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4303             : }
    4304             : 
    4305             : static int
    4306          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4307             : {
    4308          26 :         sql_idx *i = (sql_idx*)change->obj;
    4309          26 :         assert(!isTempTable(i->t));
    4310             : 
    4311          26 :         if (isDeleted(i->t)) {
    4312           0 :                 change->handled = true;
    4313           0 :                 return LOG_OK;
    4314             :         }
    4315             : 
    4316          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4317          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4318          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4319          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4320             :         }
    4321             :         return LOG_OK;
    4322             : }
    4323             : 
    4324             : static int
    4325          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4326             : {
    4327          28 :         sql_idx *i = (sql_idx*)change->obj;
    4328          28 :         sql_base* base = &i->base;
    4329          28 :         sql_table* t = i->t;
    4330          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4331          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    4332             : 
    4333          28 :         if (change->handled || isDeleted(i->t))
    4334             :                 return LOG_OK;
    4335             : 
    4336          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4337             : }
    4338             : 
    4339             : static storage *
    4340          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4341             : {
    4342          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4343           0 :                 storage *od = dbat->next;
    4344           0 :                 if (od->cs.ts == commit_ts) {
    4345           0 :                         storage t = *od, *n = od->next;
    4346           0 :                         *od = *dbat;
    4347           0 :                         od->next = n;
    4348           0 :                         *dbat = t;
    4349           0 :                         dbat->next = NULL;
    4350           0 :                         destroy_storage(dbat);
    4351           0 :                         return od;
    4352             :                 }
    4353             :         }
    4354             :         return dbat;
    4355             : }
    4356             : 
    4357             : static int
    4358       85322 : log_update_del( sql_trans *tr, sql_change *change)
    4359             : {
    4360       85322 :         sql_table *t = (sql_table*)change->obj;
    4361       85322 :         assert(!isTempTable(t));
    4362             : 
    4363       85322 :         if (isDeleted(t)) {
    4364           0 :                 change->handled = true;
    4365           0 :                 return LOG_OK;
    4366             :         }
    4367             : 
    4368       85322 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4369       85322 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4370             :         return LOG_OK;
    4371             : }
    4372             : 
    4373             : static int
    4374       89633 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4375             : {
    4376       89633 :         int ok = LOG_OK;
    4377       89633 :         sql_table *t = (sql_table*)change->obj;
    4378       89633 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4379             : 
    4380       89633 :         if (change->handled || isDeleted(t))
    4381             :                 return ok;
    4382             : 
    4383       89633 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4384          22 :                 assert(isTempTable(t));
    4385          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4386          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4387          22 :                 change->handled = true;
    4388          22 :                 return ok;
    4389             :         }
    4390             : 
    4391       89611 :         lock_table(tr->store, t->base.id);
    4392       89611 :         if (!commit_ts) { /* rollback */
    4393        3984 :                 if (dbat->cs.ts == tr->tid) {
    4394           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4395           3 :                                 unlock_table(tr->store, t->base.id);
    4396           3 :                                 return ok;
    4397             :                         }
    4398           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4399             : 
    4400           3 :                         if (o != d) {
    4401           0 :                                 while(o && o->next != d)
    4402             :                                         o = o->next;
    4403             :                         }
    4404           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    4405           3 :                                 assert(d->next);
    4406           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4407             :                         } else
    4408           0 :                                 o->next = d->next;
    4409           3 :                         d->next = NULL;
    4410           3 :                         change->cleanup = &tc_gc_rollbacked_storage;
    4411             :                 } else
    4412        3978 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4413       85627 :         } else if (ok == LOG_OK && !tr->parent) {
    4414       85601 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4415       25957 :                         dbat->cs.ts = commit_ts;
    4416             : 
    4417       85601 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4418       85601 :                 if (ok == LOG_OK) {
    4419       85601 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4420       85601 :                         if (oldest == commit_ts)
    4421       43335 :                                 merge_storage(dbat);
    4422             :                 }
    4423       85601 :                 if (dbat)
    4424       85601 :                         dbat->cs.cleared = false;
    4425          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4426          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4427          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4428          26 :                 storage *s = change->data;
    4429          26 :                 if (s->cs.ts == tr->tid)
    4430           0 :                         s->cs.ts = commit_ts;
    4431             :         }
    4432       89608 :         unlock_table(tr->store, t->base.id);
    4433       89608 :         return ok;
    4434             : }
    4435             : 
    4436             : /* only rollback (content version) case for now */
    4437             : static int
    4438       19763 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4439             : {
    4440       19763 :         sqlstore *store = Store;
    4441       19763 :         sql_column *c = (sql_column*)change->obj;
    4442             : 
    4443       19763 :         if (!c) /* cleaned earlier */
    4444             :                 return 1;
    4445             : 
    4446         175 :         if (change->handled || isDeleted(c->t)) {
    4447           0 :                 column_destroy(store, c);
    4448           0 :                 return 1;
    4449             :         }
    4450             : 
    4451             :         /* savepoint commit (did it merge ?) */
    4452         175 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4453           0 :                 column_destroy(store, c);
    4454           0 :                 return 1;
    4455             :         }
    4456         175 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4457             :                 return 0;
    4458         174 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4459         174 :         if (d && d->next) {
    4460             : 
    4461          66 :                 if (d->cs.ts > oldest)
    4462             :                         return LOG_OK; /* cannot cleanup yet */
    4463             : 
    4464             :                 // d is oldest reachable delta
    4465          63 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4466          62 :                         destroy_delta(d->next, true);
    4467          62 :                         d->next = NULL;
    4468             :                 }
    4469          63 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4470          63 :                 id->nr_updates += merge_delta(d);
    4471          63 :                 unlock_column(store, c->base.id);
    4472             :         }
    4473         171 :         column_destroy(store, c);
    4474         171 :         return 1;
    4475             : }
    4476             : 
    4477             : static int
    4478      736304 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4479             : {
    4480      736304 :         sqlstore *store = Store;
    4481      736304 :         sql_column *c = (sql_column*)change->obj;
    4482             : 
    4483      736304 :         if (!c) /* cleaned earlier */
    4484             :                 return 1;
    4485             : 
    4486      736304 :         if (change->handled || isDeleted(c->t)) {
    4487           3 :                 table_destroy(store, c->t);
    4488           3 :                 return 1;
    4489             :         }
    4490             : 
    4491             :         /* savepoint commit (did it merge ?) */
    4492      736301 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4493       30897 :                 table_destroy(store, c->t);
    4494       30897 :                 return 1;
    4495             :         }
    4496      705404 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4497             :                 return 0;
    4498      705403 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4499      705403 :         if (d && d->next) {
    4500             : 
    4501      705403 :                 if (d->cs.ts > oldest)
    4502             :                         return LOG_OK; /* cannot cleanup yet */
    4503             : 
    4504             :                 // d is oldest reachable delta
    4505      101867 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4506        5242 :                         destroy_delta(d->next, true);
    4507        5242 :                         d->next = NULL;
    4508             :                 }
    4509      101867 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4510      101867 :                 id->nr_updates += merge_delta(d);
    4511      101867 :                 unlock_column(store, c->base.id);
    4512             :         }
    4513      101867 :         table_destroy(store, c->t);
    4514      101867 :         return 1;
    4515             : }
    4516             : 
    4517             : static int
    4518        2609 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4519             : {
    4520        2609 :         sqlstore *store = Store;
    4521        2609 :         sql_idx *i = (sql_idx*)change->obj;
    4522             : 
    4523        2609 :         if (!i) /* cleaned earlier */
    4524             :                 return 1;
    4525             : 
    4526         634 :         if (change->handled || isDeleted(i->t)) {
    4527           0 :                 idx_destroy(store, i);
    4528           0 :                 return 1;
    4529             :         }
    4530             : 
    4531             :         /* savepoint commit (did it merge ?) */
    4532         634 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4533           0 :                 idx_destroy(store, i);
    4534           0 :                 return 1;
    4535             :         }
    4536         634 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4537             :                 return 0;
    4538         634 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4539         634 :         if (d && d->next) {
    4540           0 :                 if (d->cs.ts > oldest)
    4541             :                         return LOG_OK; /* cannot cleanup yet */
    4542             : 
    4543             :                 // d is oldest reachable delta
    4544           0 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4545           0 :                         destroy_delta(d->next, true);
    4546           0 :                         d->next = NULL;
    4547             :                 }
    4548           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4549           0 :                 id->nr_updates += merge_delta(d);
    4550           0 :                 unlock_column(store, i->base.id);
    4551             :         }
    4552         634 :         idx_destroy(store, i);
    4553         634 :         return 1;
    4554             : }
    4555             : 
    4556             : static int
    4557          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4558             : {
    4559          26 :         sqlstore *store = Store;
    4560          26 :         sql_idx *i = (sql_idx*)change->obj;
    4561             : 
    4562          26 :         if (!i) /* cleaned earlier */
    4563             :                 return 1;
    4564             : 
    4565          26 :         if (change->handled || isDeleted(i->t)) {
    4566           0 :                 table_destroy(store, i->t);
    4567           0 :                 return 1;
    4568             :         }
    4569             : 
    4570             :         /* savepoint commit (did it merge ?) */
    4571          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4572           0 :                 table_destroy(store, i->t);
    4573           0 :                 return 1;
    4574             :         }
    4575          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4576             :                 return 0;
    4577          26 :         sql_delta *d = (sql_delta*)change->data, *id = d;
    4578          26 :         if (d && d->next) {
    4579          26 :                 if (d->cs.ts > oldest)
    4580             :                         return LOG_OK; /* cannot cleanup yet */
    4581             : 
    4582             :                 // d is oldest reachable delta
    4583          26 :                 if (d->cs.merged && d->next) { // Unreachable can immediately be destroyed.
    4584          26 :                         destroy_delta(d->next, true);
    4585          26 :                         d->next = NULL;
    4586             :                 }
    4587          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4588          26 :                 id->nr_updates += merge_delta(d);
    4589          26 :                 unlock_column(store, i->base.id);
    4590             :         }
    4591          26 :         table_destroy(store, i->t);
    4592          26 :         return 1;
    4593             : }
    4594             : 
    4595             : static int
    4596      221502 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4597             : {
    4598      221502 :         sqlstore *store = Store;
    4599      221502 :         sql_table *t = (sql_table*)change->obj;
    4600             : 
    4601      221502 :         if (change->handled || isDeleted(t)) {
    4602        3595 :                 table_destroy(store, t);
    4603        3595 :                 return 1;
    4604             :         }
    4605             :         /* savepoint commit (did it merge ?) */
    4606      217907 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4607        6239 :                 table_destroy(store, t);
    4608        6239 :                 return 1;
    4609             :         }
    4610      211668 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4611             :                 return 0;
    4612      211640 :         storage *d = (storage*)change->data;
    4613      211640 :         if (d->next) {
    4614      139269 :                 if (d->cs.ts > oldest)
    4615             :                         return LOG_OK; /* cannot cleanup yet */
    4616             : 
    4617       19265 :                 destroy_storage(d->next);
    4618       19265 :                 d->next = NULL;
    4619             :         }
    4620       91636 :         table_destroy(store, t);
    4621       91636 :         return 1;
    4622             : }
    4623             : 
    4624             : static int
    4625       30502 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4626             : {
    4627       30502 :         if (nr == 0)
    4628             :                 return LOG_OK;
    4629       30502 :         assert (nr > 0);
    4630       30502 :         if ((!offsets || !*offsets) && nr == total) {
    4631       30490 :                 *offset = slot;
    4632       30490 :                 return LOG_OK;
    4633             :         }
    4634          12 :         if (!*offsets) {
    4635           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4636           7 :                 if (!*offsets)
    4637             :                         return LOG_ERR;
    4638             :         }
    4639          12 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    4640       16151 :         for(size_t i = 0; i < nr; i++)
    4641       16139 :                 dst[i] = slot + i;
    4642          12 :         (*offsets)->batCount += nr;
    4643          12 :         (*offsets)->theap->dirty = true;
    4644          12 :         return LOG_OK;
    4645             : }
    4646             : 
    4647             : static int
    4648       30434 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4649             : {
    4650       30434 :         assert(s->segs);
    4651       30434 :         ulng oldest = store_oldest(tr->store, NULL);
    4652       30461 :         BUN slot = 0;
    4653       30461 :         size_t total = cnt;
    4654             : 
    4655       30461 :         if (!locked)
    4656       30488 :                 lock_table(tr->store, t->base.id);
    4657       30656 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4658             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4659       61392 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4660       30794 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* reuse old deleted or rolled back append */
    4661          35 :                         if ((seg->end - seg->start) >= cnt) {
    4662             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4663          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4664           2 :                                         slot = p->end;
    4665           2 :                                         p->end += cnt;
    4666           2 :                                         seg->start += cnt;
    4667           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4668             :                                                 ok = LOG_ERR;
    4669             :                                                 break;
    4670             :                                         }
    4671           2 :                                         s->segs->nr_reused += cnt;
    4672           2 :                                         cnt = 0;
    4673           2 :                                         break;
    4674             :                                 }
    4675             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4676          11 :                                 size_t rcnt = seg->end - seg->start;
    4677          11 :                                 if (rcnt > cnt)
    4678             :                                         rcnt = cnt;
    4679          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4680             :                                         ok = LOG_ERR;
    4681             :                                         break;
    4682             :                                 }
    4683             :                         }
    4684          33 :                         seg->ts = tr->tid;
    4685          33 :                         seg->deleted = false;
    4686          33 :                         slot = seg->start;
    4687          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4688             :                                 ok = LOG_ERR;
    4689             :                                 break;
    4690             :                         }
    4691           0 :                         s->segs->nr_reused += (seg->end - seg->start);
    4692           0 :                         cnt -= (seg->end - seg->start);
    4693             :                 }
    4694             :         }
    4695       30600 :         if (ok == LOG_OK && cnt) {
    4696       30587 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4697       29410 :                         slot = s->segs->t->end;
    4698       29410 :                         s->segs->t->end += cnt;
    4699             :                 } else {
    4700        1177 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4701             :                                 ok = LOG_ERR;
    4702             :                         } else {
    4703        1218 :                                 if (!s->segs->h)
    4704           0 :                                         s->segs->h = s->segs->t;
    4705        1218 :                                 slot = s->segs->t->start;
    4706             :                         }
    4707             :                 }
    4708       30628 :                 if (ok == LOG_OK)
    4709       30628 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4710             :         }
    4711       30503 :         if (!locked)
    4712       30509 :                 unlock_table(tr->store, t->base.id);
    4713             : 
    4714       30720 :         if (ok == LOG_OK) {
    4715             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4716       30732 :                 if (!in_transaction) {
    4717        1203 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4718        1203 :                         in_transaction = true;
    4719             :                 }
    4720       30732 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4721       30692 :                         tr->logchanges += (lng) total;
    4722       30715 :                 if (*offsets) {
    4723          17 :                         BAT *pos = *offsets;
    4724          17 :                         assert(BATcount(pos) == total);
    4725          17 :                         BATsetcount(pos, total); /* set other properties */
    4726           7 :                         pos->tnil = false;
    4727           7 :                         pos->tnonil = true;
    4728           7 :                         pos->tkey = true;
    4729           7 :                         pos->tsorted = true;
    4730           7 :                         pos->trevsorted = false;
    4731             :                 }
    4732             :         }
    4733       30693 :         return ok;
    4734             : }
    4735             : 
    4736             : static int
    4737     2100769 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4738             : {
    4739     2100769 :         if (cnt > 1 && offsets)
    4740       30434 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4741     2070335 :         assert(s->segs);
    4742     2070335 :         ulng oldest = store_oldest(tr->store, NULL);
    4743     2070344 :         BUN slot = 0;
    4744     2070344 :         int reused = 0;
    4745             : 
    4746     2070344 :         if (!locked)
    4747     1942236 :                 lock_table(tr->store, t->base.id);
    4748     2070361 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4749             :         /* naive vacuum approach, iterator through segments, check for large enough deleted segments
    4750             :          * or create new segment at the end */
    4751     9974508 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4752     7977204 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* reuse old deleted or rolled back append */
    4753             : 
    4754       73063 :                         if ((seg->end - seg->start) >= cnt) {
    4755             : 
    4756             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4757       73063 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4758       56780 :                                         slot = p->end;
    4759       56780 :                                         p->end += cnt;
    4760       56780 :                                         seg->start += cnt;
    4761       56780 :                                         s->segs->nr_reused += cnt;
    4762       56780 :                                         reused = 1;
    4763       56780 :                                         break;
    4764             :                                 }
    4765             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4766       16283 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4767             :                                         ok = LOG_ERR;
    4768             :                                         break;
    4769             :                                 }
    4770             :                         }
    4771       16283 :                         seg->ts = tr->tid;
    4772       16283 :                         seg->deleted = false;
    4773       16283 :                         slot = seg->start;
    4774       16283 :                         s->segs->nr_reused += cnt;
    4775       16283 :                         reused = 1;
    4776       16283 :                         break;
    4777             :                 }
    4778             :         }
    4779     2070367 :         if (ok == LOG_OK && !reused) {
    4780     1997304 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4781     1962200 :                         slot = s->segs->t->end;
    4782     1962200 :                         s->segs->t->end += cnt;
    4783             :                 } else {
    4784       35104 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4785             :                                 ok = LOG_ERR;
    4786             :                         } else {
    4787       35104 :                                 if (!s->segs->h)
    4788           0 :                                         s->segs->h = s->segs->t;
    4789       35104 :                                 slot = s->segs->t->start;
    4790             :                         }
    4791             :                 }
    4792             :         }
    4793     2070367 :         if (!locked)
    4794     1942253 :                 unlock_table(tr->store, t->base.id);
    4795             : 
    4796     2070367 :         if (ok == LOG_OK) {
    4797             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4798     2070367 :                 if (!in_transaction) {
    4799       48996 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4800       48996 :                         in_transaction = true;
    4801             :                 }
    4802     2070367 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4803     2069914 :                         tr->logchanges += (lng) cnt;
    4804     2070367 :                 *offset = slot;
    4805             :         }
    4806             :         return ok;
    4807             : }
    4808             : 
    4809             : /*
    4810             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4811             :  * of the global transaction and mark the newly added storage slots unused on the global
    4812             :  * level but used on the local transaction level. Besides this the local transaction needs
    4813             :  * to update (and mark unused) any slot in between the old end and new slots.
    4814             :  * */
    4815             : static int
    4816     1972694 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4817             : {
    4818     1972694 :         storage *s;
    4819             : 
    4820             :         /* we have a single segment structure for each persistent table
    4821             :          * for temporary tables each has its own */
    4822     1972694 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4823             :                 return LOG_ERR;
    4824             : 
    4825     1972787 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4826             : }
    4827             : 
    4828             : /* some tables cannot be updated concurrently (user/roles etc) */
    4829             : static int
    4830      128118 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4831             : {
    4832      128118 :         storage *s;
    4833      128118 :         int res = 0;
    4834             : 
    4835             :         /* we have a single segment structure for each persistent table
    4836             :          * for temporary tables each has its own */
    4837      128118 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4838             :                 /* TODO check for other inserts ! */
    4839             :                 return LOG_ERR;
    4840             : 
    4841      128118 :         lock_table(tr->store, t->base.id);
    4842      128118 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4843           4 :                 unlock_table(tr->store, t->base.id);
    4844           4 :                 return LOG_CONFLICT;
    4845             :         }
    4846      128114 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4847      128114 :         unlock_table(tr->store, t->base.id);
    4848      128114 :         return res;
    4849             : }
    4850             : 
    4851             : static int
    4852       20152 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4853             : {
    4854       20152 :         storage *s;
    4855       20152 :         int res = 0;
    4856             : 
    4857       20152 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4858             :                 return LOG_ERR;
    4859             : 
    4860       20152 :         lock_table(tr->store, t->base.id);
    4861       20152 :         res = segments_conflict(tr, s->segs, uncommitted);
    4862       20152 :         unlock_table(tr->store, t->base.id);
    4863       20152 :         return res ? LOG_CONFLICT : LOG_OK;
    4864             : }
    4865             : 
    4866             : static size_t
    4867     1617828 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4868             : {
    4869     1617828 :         size_t cnt = 0;
    4870             : 
    4871     1970567 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4872             :                 ;
    4873             : 
    4874     3998251 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4875     2380410 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4876      157041 :                         cnt += s->end - s->start;
    4877             :         }
    4878     1617841 :         return cnt;
    4879             : }
    4880             : 
    4881             : static BAT *
    4882     1617789 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4883             : {
    4884     1617789 :         lock_table(tr->store, t->base.id);
    4885     1617840 :         segment *s = S->segs->h;
    4886             :         /* step one no deletes -> dense range */
    4887     1617840 :         uint32_t cur = 0;
    4888     1617840 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4889     1617840 :         if (!dnr) {
    4890     1487639 :                 unlock_table(tr->store, t->base.id);
    4891     1487608 :                 return BATdense(start, start, end-start);
    4892             :         }
    4893             : 
    4894      130201 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4895      130201 :         if (!b) {
    4896           0 :                 unlock_table(tr->store, t->base.id);
    4897           0 :                 return NULL;
    4898             :         }
    4899             : 
    4900      130201 :         uint32_t *restrict dst = Tloc(b, 0);
    4901    62294660 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4902    62230392 :                 if (s->end < start)
    4903      262672 :                         continue;
    4904    61967720 :                 if (s->start >= end)
    4905             :                         break;
    4906    61901787 :                 msk m = (SEG_IS_VALID(s, tr));
    4907    61901787 :                 size_t lnr = s->end-s->start;
    4908    61901787 :                 if (s->start < start)
    4909       20052 :                         lnr -= (start - s->start);
    4910    61901787 :                 if (s->end > end)
    4911       15073 :                         lnr -= s->end - end;
    4912             : 
    4913    61901787 :                 if (m) {
    4914     1390169 :                         size_t used = pos&31, end = 32;
    4915     1390169 :                         if (used) {
    4916     1227882 :                                 if (lnr < (32-used))
    4917      750264 :                                         end = used + lnr;
    4918     1227882 :                                 assert(end > used);
    4919     1227882 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4920     1227882 :                                 lnr -= end - used;
    4921     1227882 :                                 pos += end - used;
    4922     1227882 :                                 if (end == 32) {
    4923      477657 :                                         *dst++ = cur;
    4924      477657 :                                         cur = 0;
    4925             :                                 }
    4926             :                         }
    4927     1390169 :                         size_t full = lnr/32;
    4928     1390169 :                         size_t rest = lnr%32;
    4929     1390169 :                         if (full > 0) {
    4930      354622 :                                 memset(dst, ~0, full * sizeof(*dst));
    4931      354622 :                                 dst += full;
    4932      354622 :                                 lnr -= full * 32;
    4933      354622 :                                 pos += full * 32;
    4934             :                         }
    4935     1390169 :                         if (rest > 0) {
    4936      608352 :                                 cur |= (1U << rest) - 1;
    4937      608352 :                                 lnr -= rest;
    4938      608352 :                                 pos += rest;
    4939             :                         }
    4940     1390169 :                         assert(lnr==0);
    4941             :                 } else {
    4942    60511618 :                         size_t used = pos&31, end = 32;
    4943    60511618 :                         if (used) {
    4944    58629885 :                                 if (lnr < (32-used))
    4945    56639298 :                                         end = used + lnr;
    4946             : 
    4947    58629885 :                                 pos+= (end-used);
    4948    58629885 :                                 lnr-= (end-used);
    4949    58629885 :                                 if (end == 32) {
    4950     1990621 :                                         *dst++ = cur;
    4951     1990621 :                                         cur = 0;
    4952             :                                 }
    4953             :                         }
    4954    60511618 :                         size_t full = lnr/32;
    4955    60511618 :                         size_t rest = lnr%32;
    4956    60511618 :                         memset(dst, 0, full * sizeof(*dst));
    4957    60511618 :                         dst += full;
    4958    60511618 :                         lnr -= full * 32;
    4959    60511618 :                         pos += full * 32;
    4960    60511618 :                         pos+= rest;
    4961    60511618 :                         lnr-= rest;
    4962    60511618 :                         assert(lnr==0);
    4963             :                 }
    4964             :         }
    4965             : 
    4966      130201 :         unlock_table(tr->store, t->base.id);
    4967      130201 :         if (pos%32)
    4968      127094 :                 *dst=cur;
    4969      130201 :         BATsetcount(b, nr);
    4970      130201 :         bn = BATmaskedcands(start, nr, b, true);
    4971      130199 :         BBPreclaim(b);
    4972      130197 :         (void)pos;
    4973      130197 :         assert (pos == nr);
    4974             :         return bn;
    4975             : }
    4976             : 
    4977             : static void *                                   /* BAT * */
    4978     1623756 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4979             : {
    4980             :         /* with nr_of_parts - part_nr we can adjust parts */
    4981     1623756 :         storage *s = tab_timestamp_storage(tr, t);
    4982             : 
    4983     1624030 :         if (!s)
    4984             :                 return NULL;
    4985     1624030 :         size_t nr = segs_end(s->segs, tr, t);
    4986             : 
    4987     1624835 :         if (!nr)
    4988        7019 :                 return BATdense(0, 0, 0);
    4989             : 
    4990             :         /* compute proper part */
    4991     1617816 :         size_t part_size = nr/nr_of_parts;
    4992     1617816 :         size_t start = part_size * part_nr;
    4993     1617816 :         size_t end = start + part_size;
    4994     1617816 :         if (part_nr == (nr_of_parts-1))
    4995     1325849 :                 end = nr;
    4996     1617816 :         assert(end <= nr);
    4997     1617816 :         return segments2cands(s, tr, t, start, end);
    4998             : }
    4999             : 
    5000             : static int
    5001           5 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    5002             : {
    5003           5 :         bool update_conflict = false;
    5004             : 
    5005           5 :         if (segments_in_transaction(tr, col->t))
    5006             :                 return LOG_CONFLICT;
    5007             : 
    5008           5 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5009             : 
    5010           5 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5011           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5012           5 :         assert(d && d->cs.ts == tr->tid);
    5013           5 :         if (odelta != d)
    5014           5 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    5015           5 :         if (d->cs.bid)
    5016           5 :                 temp_destroy(d->cs.bid);
    5017           5 :         if (d->cs.uibid)
    5018           5 :                 temp_destroy(d->cs.uibid);
    5019           5 :         if (d->cs.uvbid)
    5020           5 :                 temp_destroy(d->cs.uvbid);
    5021           5 :         bat_set_access(bn, BAT_READ);
    5022           5 :         d->cs.bid = temp_create(bn);
    5023           5 :         d->cs.uibid = 0;
    5024           5 :         d->cs.uvbid = 0;
    5025           5 :         d->cs.ucnt = 0;
    5026           5 :         d->cs.cleared = true;
    5027           5 :         d->cs.ts = tr->tid;
    5028           5 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5029           5 :         return LOG_OK;
    5030             : }
    5031             : 
    5032             : static int
    5033           5 : vacuum_col(sql_trans *tr, sql_column *c, bool force)
    5034             : {
    5035           5 :         if (segments_in_transaction(tr, c->t))
    5036             :                 return LOG_CONFLICT;
    5037             : 
    5038           5 :         sql_delta *d = NULL;
    5039             : 
    5040             :         /* do we have enough to clean */
    5041           5 :         if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5042             :                 return LOG_CONFLICT;
    5043             : 
    5044             :         /* do we have enough to clean */
    5045           5 :         if (!force && (d->nr_updates) < 1024)
    5046             :                 return LOG_OK;
    5047             : 
    5048           5 :         BAT *b = NULL, *bn = NULL;;
    5049           5 :         if ((b = bind_col(tr, c, 0)) == NULL)
    5050             :                 return LOG_ERR;
    5051           5 :         if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5052           0 :                 BBPreclaim(b);
    5053           0 :                 return LOG_ERR;
    5054             :         }
    5055           5 :         int res = swap_bats(tr, c, bn);
    5056           5 :         d->nr_updates = 0;
    5057           5 :         BBPreclaim(b);
    5058           5 :         BBPreclaim(bn);
    5059           5 :         return res;
    5060             : }
    5061             : 
    5062             : static int
    5063           0 : vacuum_tab(sql_trans *tr, sql_table *t, bool force)
    5064             : {
    5065           0 :         if (segments_in_transaction(tr, t))
    5066             :                 return LOG_CONFLICT;
    5067             : 
    5068           0 :         storage *s;
    5069           0 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    5070             :                 return LOG_ERR;
    5071             : 
    5072           0 :         for( node *n = ol_first_node(t->columns); n; n = n->next) {
    5073           0 :                 sql_column *c = n->data;
    5074           0 :                 int type = c->type.multiset?TYPE_int:c->type.type->localtype;
    5075             : 
    5076           0 :                 if (!ATOMvarsized(type))
    5077           0 :                         continue;
    5078           0 :                 sql_delta *d = NULL;
    5079             : 
    5080             :                 /* do we have enough to clean */
    5081           0 :                 if ((d = bind_col_data(tr, c, NULL)) == NULL)
    5082             :                         return LOG_CONFLICT;
    5083             : 
    5084             :                 /* do we have enough to clean */
    5085           0 :                 if (!force && (d->nr_updates + s->segs->nr_reused) < 1024)
    5086           0 :                         continue;
    5087             : 
    5088           0 :                 BAT *b = NULL, *bn = NULL;;
    5089           0 :                 if ((b = bind_col(tr, c, 0)) == NULL)
    5090             :                         return LOG_ERR;
    5091           0 :                 if ((bn = COLcopy(b, b->ttype, true, PERSISTENT)) == NULL) {
    5092           0 :                         BBPreclaim(b);
    5093           0 :                         return LOG_ERR;
    5094             :                 }
    5095           0 :                 int res = swap_bats(tr, c, bn);
    5096           0 :                 d->nr_updates = 0;
    5097           0 :                 BBPreclaim(b);
    5098           0 :                 BBPreclaim(bn);
    5099           0 :                 if (res != LOG_OK)
    5100           0 :                         return res;
    5101             :         }
    5102           0 :         s->segs->nr_reused = 0;
    5103           0 :         return LOG_OK;
    5104             : }
    5105             : 
    5106             : 
    5107             : static int
    5108          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5109             : {
    5110          58 :         bool update_conflict = false;
    5111             : 
    5112          58 :         if (segments_in_transaction(tr, col->t))
    5113             :                 return LOG_CONFLICT;
    5114             : 
    5115          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5116             : 
    5117          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5118           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5119          58 :         assert(d && d->cs.ts == tr->tid);
    5120          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5121          58 :         if (odelta != d)
    5122          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5123             : 
    5124          58 :         d->cs.st = st;
    5125          58 :         d->cs.cleared = true;
    5126          58 :         if (d->cs.bid)
    5127          58 :                 temp_destroy(d->cs.bid);
    5128          58 :         o = transfer_to_systrans(o);
    5129          58 :         if (o == NULL)
    5130             :                 return LOG_ERR;
    5131          58 :         bat_set_access(o, BAT_READ);
    5132          58 :         d->cs.bid = temp_create(o);
    5133          58 :         if (u) {
    5134          53 :                 if (d->cs.ebid)
    5135           0 :                         temp_destroy(d->cs.ebid);
    5136          53 :                 u = transfer_to_systrans(u);
    5137          53 :                 if (u == NULL)
    5138             :                         return LOG_ERR;
    5139          53 :                 d->cs.ebid = temp_create(u);
    5140             :         }
    5141             :         return LOG_OK;
    5142             : }
    5143             : 
    5144             : void
    5145         358 : bat_storage_init( store_functions *sf)
    5146             : {
    5147         358 :         sf->bind_col = &bind_col;
    5148         358 :         sf->bind_updates = &bind_updates;
    5149         358 :         sf->bind_updates_idx = &bind_updates_idx;
    5150         358 :         sf->bind_idx = &bind_idx;
    5151         358 :         sf->bind_cands = &bind_cands;
    5152             : 
    5153         358 :         sf->claim_tab = &claim_tab;
    5154         358 :         sf->key_claim_tab = &key_claim_tab;
    5155         358 :         sf->tab_validate = &tab_validate;
    5156             : 
    5157         358 :         sf->append_col = &append_col;
    5158         358 :         sf->append_idx = &append_idx;
    5159             : 
    5160         358 :         sf->update_col = &update_col;
    5161         358 :         sf->update_idx = &update_idx;
    5162             : 
    5163         358 :         sf->delete_tab = &delete_tab;
    5164             : 
    5165         358 :         sf->count_del = &count_del;
    5166         358 :         sf->count_col = &count_col;
    5167         358 :         sf->count_idx = &count_idx;
    5168         358 :         sf->dcount_col = &dcount_col;
    5169         358 :         sf->min_max_col = &min_max_col;
    5170         358 :         sf->set_stats_col = &set_stats_col;
    5171         358 :         sf->sorted_col = &sorted_col;
    5172         358 :         sf->unique_col = &unique_col;
    5173         358 :         sf->double_elim_col = &double_elim_col;
    5174         358 :         sf->col_stats = &col_stats;
    5175         358 :         sf->col_set_range = &col_set_range;
    5176         358 :         sf->col_not_null = &col_not_null;
    5177             : 
    5178         358 :         sf->col_dup = &col_dup;
    5179         358 :         sf->idx_dup = &idx_dup;
    5180         358 :         sf->del_dup = &del_dup;
    5181             : 
    5182         358 :         sf->create_col = &create_col;    /* create and add to change list */
    5183         358 :         sf->create_idx = &create_idx;
    5184         358 :         sf->create_del = &create_del;
    5185             : 
    5186         358 :         sf->destroy_col = &destroy_col;  /* free resources */
    5187         358 :         sf->destroy_idx = &destroy_idx;
    5188         358 :         sf->destroy_del = &destroy_del;
    5189             : 
    5190         358 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5191         358 :         sf->drop_idx = &drop_idx;
    5192         358 :         sf->drop_del = &drop_del;
    5193             : 
    5194         358 :         sf->clear_table = &clear_table;
    5195             : 
    5196         358 :         sf->vacuum_col = &vacuum_col;
    5197         358 :         sf->vacuum_tab = &vacuum_tab;
    5198         358 :         sf->col_compress = &col_compress;
    5199         358 : }

Generated by: LCOV version 1.14