LCOV - code coverage report
Current view: top level - sql/storage - objectset.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 585 624 93.8 %
Date: 2025-03-25 21:27:32 Functions: 42 44 95.5 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024, 2025 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "sql_catalog.h"
      15             : #include "sql_storage.h"
      16             : 
      17             : #include "gdk_atoms.h"
      18             : 
      19             : struct versionhead ;
      20             : 
       /* Flag values stored in objectversion.state.
        * active is the value with no flag set; every other flag is a distinct
        * single bit, so the code below combines them with | and tests them with &.
        * Bit 0 (1<<0) is not assigned to any flag in this list. */
       21             : #define active                                                  (0)
       22             : #define under_destruction                               (1<<1)
       23             : #define block_destruction                               (1<<2)
       24             : #define deleted                                                 (1<<3)
       25             : #define rollbacked                                              (1<<4)
       26             : 
       27             : /* This objectversion owns its associated versionhead.
       28             :  * When this objectversion gets destroyed,
       29             :  * the cleanup procedure should also destroy the associated (name|id) based versionhead.*/
       30             : #define name_based_versionhead_owner    (1<<5)
       31             : #define id_based_versionhead_owner              (1<<6)
      32             : 
       /* One version of an sql object tracked by an objectset.
        * A version is linked into two independent older/newer chains, one keyed
        * by name and one keyed by id, and keeps a pointer to the versionhead of
        * each chain.
        *   ts    - timestamp; compared against TRANSACTION_ID_BASE and against the
        *           oldest bound passed to os_cleanup in this file.
        *   state - combination of the state flag macros, accessed atomically.
        *   os    - the objectset this version belongs to. */
       33             : typedef struct objectversion {
       34             :         ulng ts;
       35             :         ATOMIC_TYPE state;
       36             :         sql_base *b; // base of underlying sql object
       37             :         struct objectset* os;
       38             :         struct objectversion    *name_based_older;
       39             :         struct objectversion    *name_based_newer;
       40             :         struct versionhead              *name_based_head;
       41             : 
       42             :         struct objectversion    *id_based_older;
       43             :         struct objectversion    *id_based_newer;
       44             :         struct versionhead              *id_based_head;
       45             : } objectversion;
      46             : 
       /* Node of a doubly linked list (prev/next) owned by an objectset.
        * ov points at one objectversion of the chain this node heads; _os_rollback
        * redirects it to an older version. The same node pointers are stored as
        * values in the name_map / id_map hashes of the objectset. */
       47             : typedef struct versionhead  {
       48             :     struct versionhead * prev;
       49             :     struct versionhead * next;
       50             :     objectversion* ov;
       51             : } versionhead ;
      52             : 
       /* A set of versioned sql objects, indexed twice: by name and by id.
        * Each index is a linked list of versionheads (head pointer *_h, tail
        * pointer *_t, element count *_cnt) plus an optional hash map that is
        * built once the count exceeds HASH_MIN_SIZE (see os_append_node_name and
        * os_append_node_id).
        *   sa      - allocator handed to node_create and hash_new; when it is NULL
        *             the code in this file frees nodes with _DELETE.
        *   destroy - callback invoked on the sql_base of a version being destroyed. */
       53             : typedef struct objectset {
       54             :         ATOMIC_TYPE refcnt;
       55             :         allocator *sa;
       56             :         destroy_fptr destroy;
       57             :         MT_RWLock rw_lock;      /*readers-writer lock to protect the links (chains) in the objectversion chain.*/
       58             :         MT_Lock lock;   /* global objectset lock for os_add/del */
       59             :         versionhead  *name_based_h;
       60             :         versionhead  *name_based_t;
       61             :         versionhead  *id_based_h;
       62             :         versionhead  *id_based_t;
       63             :         int name_based_cnt;
       64             :         int id_based_cnt;
       65             :         struct sql_hash *name_map;
       66             :         struct sql_hash *id_map;
       67             :         bool
       68             :                 temporary:1,
       69             :                 unique:1, /* names are unique */
       70             :                 concurrent:1,   /* concurrent inserts are allowed */
       71             :             nested:1;
       72             :         sql_store store;
       73             : } objectset;
      74             : 
       /* Hash key callback for the id map: hashes the id of the sql object
        * behind versionhead n with the TYPE_int atom hash function.
        * Installed as the key function of id_map in os_append_id_map. */
       75             : static int
       76      359251 : os_id_key(versionhead  *n)
       77             : {
       78      277775 :         return (int) BATatoms[TYPE_int].atomHash(&n->ov->b->id);
       79             : }
      80             : 
       /* Take the read side of os->rw_lock, which guards the version chains. */
       81             : static inline void
       82    26623045 : lock_reader(objectset* os)
       83             : {
       84    26623045 :         MT_rwlock_rdlock(&os->rw_lock);
       85     1044605 : }
      86             : 
       /* Release the read side of os->rw_lock taken by lock_reader. */
       87             : static inline void
       88    26738581 : unlock_reader(objectset* os)
       89             : {
       90    26738581 :         MT_rwlock_rdunlock(&os->rw_lock);
       91     4359468 : }
      92             : 
       /* Take the write side of os->rw_lock before the lists, counts or hash
        * maps of the objectset are modified. */
       93             : static inline void
       94      655479 : lock_writer(objectset* os)
       95             : {
       96      655479 :         MT_rwlock_wrlock(&os->rw_lock);
       97             : }
      98             : 
       /* Release the write side of os->rw_lock taken by lock_writer. */
       99             : static inline void
      100      655479 : unlock_writer(objectset* os)
      101             : {
      102      655479 :         MT_rwlock_wrunlock(&os->rw_lock);
      103         424 : }
     104             : 
      /* Atomically read ov->state and return it narrowed to bte.
       * The largest flag defined in this file is 1<<6. */
      105    23816404 : static bte os_atmc_get_state(objectversion *ov) {
      106    23816404 :         bte state = (bte) ATOMIC_GET(&ov->state);
      107    23816404 :         return state;
      108             : }
     109             : 
      /* Atomically overwrite ov->state with state.
       * NOTE(review): callers pair this with os_atmc_get_state as a plain
       * read-modify-write, which is not one atomic step - confirm that the
       * callers are serialized by other means. */
      110       69487 : static void os_atmc_set_state(objectversion *ov, bte state) {
      111       69487 :         ATOMIC_SET(&ov->state, state);
      112           0 : }
     113             : 
      /* Find the versionhead whose current objectversion has the given id.
       * Runs under the reader lock. Uses id_map when it exists, otherwise walks
       * the id based list from its head.
       * Returns the versionhead, or NULL when os is NULL or no entry matches.
       * The reader lock is released before returning, so the returned node is
       * used by the caller without this lock held. */
      114             : static versionhead  *
      115      473590 : find_id(objectset *os, sqlid id)
      116             : {
      117      473590 :         if (os) {
      118      473590 :                 lock_reader(os);
      119      473590 :                 if (os->id_map) {
      120      439178 :                         int key = (int) BATatoms[TYPE_int].atomHash(&id);
                                                /* bucket index by masking; presumably size is a power of two - TODO confirm in hash_new */
      121      439178 :                         sql_hash_e *he = os->id_map->buckets[key&(os->id_map->size-1)];
      122             : 
      123     1511464 :                         for (; he; he = he->chain) {
      124     1252872 :                                 versionhead  *n = he->value;
      125             : 
      126     1252872 :                                 if (n && n->ov->b->id == id) {
      127      180586 :                                         unlock_reader(os);
      128      180586 :                                         return n;
      129             :                                 }
      130             :                         }
                                                /* a map exists but has no match: do not fall back to the list scan */
      131      258592 :                         unlock_reader(os);
      132      258592 :                         return NULL;
      133             :                 }
      134             : 
                                /* no hash map available: linear scan of the id based list */
      135      104590 :                 for (versionhead  *n = os->id_based_h; n; n = n->next) {
      136       72822 :                         objectversion *ov = n->ov;
      137             : 
      138             :                         /* check if ids match */
      139       72822 :                         if (id == ov->b->id) {
      140        2644 :                                 unlock_reader(os);
      141        2644 :                                 return n;
      142             :                         }
      143             :                 }
      144       31768 :                 unlock_reader(os);
      145             :         }
      146             : 
      147             :         return NULL;
      148             : }
     149             : 
      /* Remove from hash h the entry whose value pointer equals data.
       * The bucket is selected with the key function of h; p walks the bucket
       * chain while e trails one entry behind it, so p == e means the match is
       * the first entry of the bucket. The entry itself is freed only when h
       * has no allocator.
       * NOTE(review): h->entries is decremented even when no matching entry
       * was found and nothing was unlinked. */
      150             : // TODO copy of static function from sql_list.c. Needs to be made external
      151             : static void
      152       30565 : hash_delete(sql_hash *h, void *data)
      153             : {
      154       30565 :         int key = h->key(data);
      155       30565 :         sql_hash_e *e, *p = h->buckets[key&(h->size-1)];
      156             : 
      157       30565 :         e = p;
      158       47358 :         for (;  p && p->value != data ; p = p->chain)
      159       16793 :                 e = p;
      160       30565 :         if (p && p->value == data) {
      161       30565 :                 if (p == e)
      162       26010 :                         h->buckets[key&(h->size-1)] = p->chain;
      163             :                 else
      164        4555 :                         e->chain = p->chain;
      165       30565 :                 if (!h->sa)
      166       30565 :                         _DELETE(p);
      167             :         }
      168       30565 :         h->entries--;
      169       30565 : }
     170             : 
      /* Free versionhead n with _DELETE, but only when the objectset has no
       * allocator (os->sa is NULL). The store argument is unused. */
      171             : static void
      172      578111 : node_destroy(objectset *os, sqlstore *store, versionhead  *n)
      173             : {
      174      578111 :         if (!os->sa)
      175      578111 :                 _DELETE(n);
      176      578111 :         (void)store;
      177      578111 : }
     178             : 
      /* Unlink the name based versionhead of ov from the objectset.
       * Under the writer lock: finds the predecessor of the node by a linear
       * walk from the list head, repairs next/prev and the head/tail pointers,
       * removes the node from name_map and decrements name_based_cnt.
       * After unlocking, ov is flagged name_based_versionhead_owner so that
       * objectversion_destroy frees the unlinked node.
       * Returns the predecessor node, or NULL when the node was the list head.
       * NOTE(review): n is dereferenced (n->next) before the n check near the
       * hash_delete call, so ov->name_based_head must be non-NULL on entry. */
      179             : static versionhead  *
      180       15934 : os_remove_name_based_chain(objectset *os, objectversion* ov)
      181             : {
      182       15934 :         lock_writer(os);
      183       15934 :         versionhead  *n = ov->name_based_head;
      184       15934 :         versionhead  *p = os->name_based_h;
      185       15934 :         if (p != n)
      186     1202795 :                 while (p && p->next != n)
      187             :                         p = p->next;
      188       15934 :         assert(p==n||(p && p->next == n));
      189       15934 :         if (p == n) {
      190        1383 :                 os->name_based_h = n->next;
      191        1383 :                 if (os->name_based_h) // i.e. non-empty os
      192         832 :                         os->name_based_h->prev = NULL;
      193             :                 p = NULL;
      194       14551 :         } else if ( p != NULL)  {
      195       14551 :                 p->next = n->next;
      196       14551 :                 if (p->next) // node in the middle
      197        5612 :                         p->next->prev = p;
      198             :         }
      199       15934 :         if (n == os->name_based_t)
      200        9490 :                 os->name_based_t = p;
      201             : 
      202       15934 :         if (os->name_map && n)
      203       14840 :                 hash_delete(os->name_map, n);
      204             : 
      205       15934 :         os->name_based_cnt--;
      206       15934 :         unlock_writer(os);
      207             : 
                        /* hand ownership of the unlinked node to ov */
      208       15934 :         bte state = os_atmc_get_state(ov);
      209       15934 :         state |= name_based_versionhead_owner;
      210       15934 :         os_atmc_set_state(ov, state);
      211       15934 :         return p;
      212             : }
     213             : 
     214             : static versionhead  *
     215       16299 : os_remove_id_based_chain(objectset *os, objectversion* ov)
     216             : {
     217       16299 :         lock_writer(os);
     218       16299 :         versionhead  *n = ov->id_based_head;
     219       16299 :         versionhead  *p = os->id_based_h;
     220             : 
     221       16299 :         if (p != n)
     222     1231369 :                 while (p && p->next != n)
     223             :                         p = p->next;
     224       16299 :         assert(p==n||(p && p->next == n));
     225       16299 :         if (p == n) {
     226        1383 :                 os->id_based_h = n->next;
     227        1383 :                 if (os->id_based_h) // i.e. non-empty os
     228         832 :                         os->id_based_h->prev = NULL;
     229             :                 p = NULL;
     230       14916 :         } else if ( p != NULL)  {
     231       14916 :                 p->next = n->next;
     232       14916 :                 if (p->next) // node in the middle
     233        5978 :                         p->next->prev = p;
     234             :         }
     235       16299 :         if (n == os->id_based_t)
     236        9489 :                 os->id_based_t = p;
     237             : 
     238       16299 :         if (os->id_map && n)
     239       15725 :                 hash_delete(os->id_map, n);
     240             : 
     241       16299 :         os->name_based_cnt--;
     242       16299 :         unlock_writer(os);
     243             : 
     244       16299 :         bte state = os_atmc_get_state(ov);
     245       16299 :         state |= id_based_versionhead_owner;
     246       16299 :         os_atmc_set_state(ov, state);
     247       16299 :         return p;
     248             : }
     249             : 
      /* Allocate a versionhead through SA_NEW with allocator sa and initialise
       * it to point at ov; prev and next start out as NULL because the compound
       * literal zero-initialises the members that are not named.
       * Returns NULL when the allocation fails. */
      250             : static versionhead  *
      251      579275 : node_create(allocator *sa, objectversion *ov)
      252             : {
      253      579275 :         versionhead  *n = SA_NEW(sa, versionhead );
      254             : 
      255      579275 :         if (n == NULL)
      256             :                 return NULL;
      257      579275 :         *n = (versionhead ) {
      258             :                 .ov = ov,
      259             :         };
      260      579275 :         return n;
      261             : }
     262             : 
      /* Hash key callback for the name map: hash_key of the name of the sql
       * object behind versionhead n. Installed as the key function of name_map
       * in os_append_node_name. */
      263             : static inline int
      264      347352 : os_name_key(versionhead  *n)
      265             : {
      266      347352 :         return hash_key(n->ov->b->name);
      267             : }
     268             : 
      /* Append versionhead n to the tail of the name based list, under the
       * writer lock.
       * When there is no name_map, or the map holds fewer than 1/16 buckets per
       * element, and the list has more than HASH_MIN_SIZE elements, the map is
       * rebuilt from the whole list before n is added to it.
       * Returns os on success, or NULL when creating the map or adding to it
       * fails; the writer lock is released on every path.
       * NOTE(review): when hash_add fails during the rebuild loop, the
       * partially filled map stays installed in os->name_map, whereas
       * os_append_id_map destroys its map and resets the pointer in that case.
       * NOTE(review): hash_destroy is called here without a NULL check on
       * name_map, while os_append_id_map guards the call - confirm that
       * hash_destroy accepts NULL. */
      269             : static objectset *
      270      289455 : os_append_node_name(objectset *os, versionhead  *n)
      271             : {
      272      289455 :         lock_writer(os);
      273      289455 :         if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && os->name_based_cnt > HASH_MIN_SIZE) {
      274        4148 :                 hash_destroy(os->name_map);
      275        4148 :                 os->name_map = hash_new(os->sa, os->name_based_cnt, (fkeyvalue)& os_name_key);
      276        4148 :                 if (os->name_map == NULL) {
      277           0 :                         unlock_writer(os);
      278           0 :                         return NULL;
      279             :                 }
      280             : 
                                /* this loop variable n shadows the parameter n */
      281       75489 :                 for (versionhead  *n = os->name_based_h; n; n = n->next ) {
      282       71341 :                         int key = os_name_key(n);
      283             : 
      284       71341 :                         if (hash_add(os->name_map, key, n) == NULL) {
      285           0 :                                 unlock_writer(os);
      286           0 :                                 return NULL;
      287             :                         }
      288             :                 }
      289             :         }
      290             : 
      291      289455 :         if (os->name_map) {
      292      261171 :                 int key = os->name_map->key(n);
      293             : 
      294      261171 :                 if (hash_add(os->name_map, key, n) == NULL) {
      295           0 :                         unlock_writer(os);
      296           0 :                         return NULL;
      297             :                 }
      298             :         }
      299             : 
      300      289455 :         if (os->name_based_t) {
      301      280700 :                 os->name_based_t->next = n;
      302             :         } else {
      303        8755 :                 os->name_based_h = n;
      304             :         }
      305      289455 :         n->prev = os->name_based_t; // aka the double linked list.
      306      289455 :         os->name_based_t = n;
      307      289455 :         os->name_based_cnt++;
      308      289455 :         unlock_writer(os);
      309      289455 :         return os;
      310             : }
     311             : 
      /* Create a versionhead for ov, record it in ov->name_based_head and
       * append it to the name based list of os.
       * Returns os on success and NULL when the node cannot be allocated or
       * appended.
       * NOTE(review): on append failure the node is freed with _DELETE without
       * checking os->sa (node_destroy does check it), and ov->name_based_head
       * keeps pointing at the freed node. */
      312             : static objectset *
      313      289455 : os_append_name(objectset *os, objectversion *ov)
      314             : {
      315      289455 :         versionhead  *n = node_create(os->sa, ov);
      316             : 
      317      289455 :         if (n == NULL)
      318             :                 return NULL;
      319             : 
      320      289455 :         ov->name_based_head = n;
      321      289455 :         if (!(os = os_append_node_name(os, n))){
      322           0 :                 _DELETE(n);
      323           0 :                 return NULL;
      324             :         }
      325             : 
      326             :         return os;
      327             : }
     328             : 
      /* (Re)build os->id_map from the id based list.
       * An existing map is destroyed first. When creating the new map or adding
       * an element fails, os->id_map ends up NULL, which makes lookups fall
       * back to the linear list scan in find_id.
       * Called from os_append_node_id with the writer lock held. */
      329             : static void
      330        4263 : os_append_id_map(objectset *os)
      331             : {
      332        4263 :         if (os->id_map)
      333         495 :                 hash_destroy(os->id_map);
      334        4263 :         os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key);
      335        4263 :         if (os->id_map == NULL)
      336             :                 return ;
      337       85739 :         for (versionhead  *n = os->id_based_h; n; n = n->next ) {
      338       81476 :                 int key = os_id_key(n);
      339             : 
      340       81476 :                 if (hash_add(os->id_map, key, n) == NULL) {
      341           0 :                         hash_destroy(os->id_map);
      342           0 :                         os->id_map = NULL;
      343           0 :                         return ;
      344             :                 }
      345             :         }
      346             : }
     347             : 
      /* Append versionhead n to the tail of the id based list, under the writer
       * lock.
       * The id map is rebuilt when it is missing or holds fewer than 1/16
       * buckets per element and the list has more than HASH_MIN_SIZE elements.
       * Hash failures are not reported: the map is dropped and lookups fall
       * back to the list scan. As written, this function always returns os. */
      348             : static objectset *
      349      289820 : os_append_node_id(objectset *os, versionhead  *n)
      350             : {
      351      289820 :         lock_writer(os);
      352      289820 :         if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && os->id_based_cnt > HASH_MIN_SIZE)
      353        4263 :                 os_append_id_map(os); /* on failure just fall back to slow method */
      354             : 
      355      289820 :         if (os->id_map) {
      356      262050 :                 int key = os->id_map->key(n);
      357      262050 :                 if (hash_add(os->id_map, key, n) == NULL) {
      358           0 :                         hash_destroy(os->id_map);
      359           0 :                         os->id_map = NULL;
      360             :                         /* fall back to slow search */
      361             :                 }
      362             :         }
      363             : 
      364      289820 :         if (os->id_based_t) {
      365      281065 :                 os->id_based_t->next = n;
      366             :         } else {
      367        8755 :                 os->id_based_h = n;
      368             :         }
      369      289820 :         n->prev = os->id_based_t; // aka the double linked list.
      370      289820 :         os->id_based_t = n;
      371      289820 :         os->id_based_cnt++;
      372      289820 :         unlock_writer(os);
      373      289820 :         return os;
      374             : }
     375             : 
      /* Create a versionhead for ov, record it in ov->id_based_head and append
       * it to the id based list of os.
       * Returns os on success and NULL when the node cannot be allocated.
       * NOTE(review): os_append_node_id never returns NULL as written, so the
       * _DELETE branch below cannot be reached; if it ever becomes reachable it
       * has the same allocator and dangling-pointer concerns as os_append_name. */
      376             : static objectset *
      377      289820 : os_append_id(objectset *os, objectversion *ov)
      378             : {
      379      289820 :         versionhead  *n = node_create(os->sa, ov);
      380             : 
      381      289820 :         if (n == NULL)
      382             :                 return NULL;
      383      289820 :         ov->id_based_head = n;
      384      289820 :         if (!(os = os_append_node_id(os, n))){
      385           0 :                 _DELETE(n);
      386           0 :                 return NULL;
      387             :         }
      388             : 
      389             :         return os;
      390             : }
     391             : 
     392             : static versionhead * find_name(objectset *os, const char *name);
     393             : 
      /* Destroy a single objectversion.
       * Steps, in order:
       *  - free the name based and/or id based versionhead when the state says
       *    this version owns them (set by the os_remove_*_based_chain functions);
       *  - call the destroy callback of os on ov->b when both are set;
       *  - for a temporary objectset, call os_destroy when the version is
       *    deleted, under destruction or rollbacked;
       *  - free ov itself.
       * The versionheads are released through ov->os while the callback uses the
       * os parameter; the callers visible in this file pass ov->os for os. */
      394             : static void
      395      310832 : objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov)
      396             : {
      397             : 
      398      310832 :         bte state = os_atmc_get_state(ov);
      399             : 
      400      310832 :         if (state & name_based_versionhead_owner) {
      401       15934 :                 node_destroy(ov->os, store, ov->name_based_head);
      402             :         }
      403             : 
      404      310832 :         if (state & id_based_versionhead_owner) {
      405       16299 :                 node_destroy(ov->os, store, ov->id_based_head);
      406             :         }
      407             : 
      408      310832 :         if (os->destroy && ov->b)
      409      284697 :                 os->destroy(store, ov->b);
      410             : 
      411      310832 :         if (os->temporary && (state & deleted || state & under_destruction || state & rollbacked))
      412         205 :                 os_destroy(os, store); // TODO transaction_layer_revamp: embed into refcounting subproject : reference is already dropped by os_cleanup
      413      310832 :         _DELETE(ov);
      414      310832 : }
     415             : 
      /* Roll back objectversion ov and every version reachable from it that
       * belongs to the same transaction.
       * ov must carry a transaction timestamp (asserted against
       * TRANSACTION_ID_BASE). A version already flagged rollbacked is skipped,
       * which also terminates the recursion.
       * The same procedure is applied to the name based chain and then to the
       * id based chain:
       *  - an older version with a different ts and not rollbacked: point the
       *    versionhead back at it and clear its newer link, provided it is
       *    active, or deleted and the CAS to block_destruction succeeds;
       *  - an older version with the same ts and not rollbacked: roll it back
       *    recursively;
       *  - no older version at all: unlink the versionhead from the objectset.
       * An older version that is already rollbacked is left untouched.
       * Finally any newer version that is not yet rollbacked is rolled back too.
       * NOTE(review): the name based branch checks ov->name_based_head for NULL
       * before unlinking, the id based branch does not; os_remove_id_based_chain
       * dereferences the head unconditionally - confirm id_based_head cannot be
       * NULL here. */
      416             : static void
      417        6127 : _os_rollback(objectversion *ov, sqlstore *store)
      418             : {
      419        6127 :         assert(ov->ts >= TRANSACTION_ID_BASE);
      420             : 
      421        6127 :         bte state = os_atmc_get_state(ov);
      422        6127 :         if (state & rollbacked) {
      423             :                 return;
      424             :         }
      425             : 
      426        5627 :         state |= rollbacked;
      427        5627 :         os_atmc_set_state(ov, state);
      428             : 
      429        5627 :         bte state_older;
      430             : 
      431             :         /*
      432             :          * We have to use the readers-writer lock here,
      433             :          * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state .
      434             :          */
      435        5627 :         lock_reader(ov->os);
      436        5627 :         objectversion* name_based_older = ov->name_based_older;
      437        5627 :         unlock_reader(ov->os);
      438             : 
      439        5627 :         if (name_based_older && !((state_older= os_atmc_get_state(name_based_older)) & rollbacked)) {
      440         759 :                 if (ov->ts != name_based_older->ts) {
      441             :                         // older is last committed state or belongs to parent transaction.
      442             :                         // In any case, we restore versionhead pointer to that.
      443             : 
      444         264 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
                                        /* the CAS only runs when the older state is exactly deleted */
      445         264 :                         if (state_older == active || (state_older == deleted && ATOMIC_CAS(&name_based_older->state, &expected_deleted, block_destruction))) {
      446         264 :                                 ov->name_based_head->ov = name_based_older;
      447         264 :                                 name_based_older->name_based_newer=NULL;
                                                /* presumably expected_deleted is unchanged after a successful CAS - TODO confirm ATOMIC_CAS semantics */
      448         264 :                                 if (state_older != active && expected_deleted == deleted)
      449           0 :                                         os_atmc_set_state(name_based_older, deleted); //Restore the deleted older back to its deleted state.
      450             :                         }
      451             :                 }
      452             :                 else {
      453         495 :                         _os_rollback(name_based_older, store);
      454             :                 }
      455             :         }
      456        4868 :         else if (!name_based_older) {
      457             :                 // this is a terminal node. i.e. this objectversion does not have name based committed history
      458        4868 :                 if (ov->name_based_head) // The opposite can happen during an early conflict in os_add or os_del.
      459        4862 :                         os_remove_name_based_chain(ov->os, ov);
      460             :         }
      461             : 
      462             :         /*
      463             :          * We have to use the readers-writer lock here,
      464             :          * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state .
      465             :          */
      466        5627 :         lock_reader(ov->os);
      467        5627 :         objectversion* id_based_older = ov->id_based_older;
      468        5627 :         unlock_reader(ov->os);
      469        5627 :         if (id_based_older && !((state_older= os_atmc_get_state(id_based_older)) & rollbacked)) {
      470         269 :                 if (ov->ts != id_based_older->ts) {
      471             :                         // older is last committed state or belongs to parent transaction.
      472             :                         // In any case, we restore versionhead pointer to that.
      473             : 
      474         264 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
      475         264 :                         if (state_older == active || (state_older == deleted && ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) {
      476         264 :                                 ov->id_based_head->ov = id_based_older;
      477         264 :                                 id_based_older->id_based_newer=NULL;
      478         264 :                                 if (state_older != active && expected_deleted == deleted)
      479           0 :                                         os_atmc_set_state(id_based_older, deleted); //Restore the deleted older back to its deleted state.
      480             :                         }
      481             :                 }
                                /* same older version on both chains was already handed to the name based step above */
      482           5 :                 else if (id_based_older != name_based_older)
      483           5 :                         _os_rollback(id_based_older, store);
      484             :         }
      485        4905 :         else if (!id_based_older) {
      486             :                 // this is a terminal node. i.e. this objectversion does not have id based committed history
      487        4905 :                 os_remove_id_based_chain(ov->os, ov);
      488             :         }
      489             : 
                        /* propagate the rollback to newer versions; the newer pointers are read without taking the lock */
      490        5627 :         if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
      491           0 :                 _os_rollback(ov->name_based_newer, store);
      492             :         }
      493             : 
      494        5627 :         if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && !(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
      495           0 :                 _os_rollback(ov->id_based_newer, store);
      496             :         }
      497             : }
     498             : 
      /* Roll back ov through _os_rollback. Always returns LOG_OK, because
       * _os_rollback reports no errors. */
      499             : static int
      500        5627 : os_rollback(objectversion *ov, sqlstore *store)
      501             : {
      502        5627 :         _os_rollback(ov, store);
      503             : 
      504        5627 :         return LOG_OK;
      505             : }
     506             : 
      /* Destroy the sql objects (ov->b) of ov and of its older versions, oldest
       * first. The walk only follows an older version that is the same on the
       * id based and name based chains.
       * ov->b is reset to NULL after the destroy callback ran, so the later
       * objectversion_destroy, which tests ov->b, does not call the callback a
       * second time. The objectversion structures themselves are not freed. */
      507             : static void
      508       26135 : ov_destroy_obj_recursive(sqlstore* store, objectversion *ov)
      509             : {
      510       26135 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
      511       14886 :                 ov_destroy_obj_recursive(store, ov->id_based_older);
      512             :         }
      513       26135 :         if (ov->os->destroy && ov->b) {
      514       26135 :                 ov->os->destroy(store, ov->b);
      515       26135 :                 ov->b = NULL;
      516             :         }
      517       26135 : }
     518             : 
      /* Move ov from the deleted state to under_destruction with a single CAS;
       * nothing happens when the state is not exactly deleted.
       * On success, for the name based and the id based chain in turn:
       *  - when there is no newer version, or the newer one is rollbacked, the
       *    versionhead is unlinked from the objectset;
       *  - otherwise the older link of the newer version is cleared under the
       *    writer lock.
       * ov->ts is then set to the current store timestamp plus one, and for a
       * non nested objectset the sql objects of ov and its older versions are
       * destroyed right away. */
      519             : static inline void
      520       11445 : try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
      521             : {
      522       11445 :         ATOMIC_BASE_TYPE expected_deleted = deleted;
      523       11445 :         if (ATOMIC_CAS(&ov->state, &expected_deleted, under_destruction)) {
      524             : 
      525       11445 :                 if (!ov->name_based_newer || (os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
      526       11072 :                         os_remove_name_based_chain(ov->os, ov);
      527             :                 }
      528             :                 else {
      529         373 :                         lock_writer(ov->os);
      530         373 :                         ov->name_based_newer->name_based_older = NULL;
      531         373 :                         unlock_writer(ov->os);
      532             :                 }
      533             : 
      534       11445 :                 if (!ov->id_based_newer || (os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
      535       11394 :                         os_remove_id_based_chain(ov->os, ov);
      536             :                 }
      537             :                 else {
      538          51 :                         lock_writer(ov->os);
      539          51 :                         ov->id_based_newer->id_based_older = NULL;
      540          51 :                         unlock_writer(ov->os);
      541             :                 }
      542             : 
                                /* os_cleanup compares this ts with the oldest bound before freeing the version */
      543       11445 :                 ov->ts = store_get_timestamp(store)+1;
      544       11445 :                 if (!ov->os->nested)
      545       11249 :                         ov_destroy_obj_recursive(store, ov);
      546             :         }
      547       11445 : }
     548             : 
      /* Free ov and its older versions with objectversion_destroy, oldest
       * first. The walk only follows an older version that is the same on the
       * id based and name based chains. The older pointer is read before ov
       * itself is freed. */
      549             : static void
      550       31770 : objectversion_destroy_recursive(sqlstore* store, objectversion *ov)
      551             : {
      552       31770 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
      553       15082 :                 objectversion_destroy_recursive(store, ov->id_based_older);
      554             :         }
      555       31770 :         objectversion_destroy(store, ov->os, ov);
      556       31770 : }
     557             : 
     558             : static int
     559      339395 : os_cleanup(sqlstore* store, objectversion *ov, ulng oldest)
     560             : {
     561      339395 :         if (os_atmc_get_state(ov) & under_destruction) {
     562       15770 :                 if (ov->ts < oldest) {
     563             :                         // This one is ready to be freed
     564       11445 :                         objectversion_destroy_recursive(store, ov);
     565       11445 :                         return LOG_ERR;
     566             :                 }
     567             : 
     568             :                 // not yet old enough to be safely removed. Try later.
     569             :                 return LOG_OK;
     570             :         }
     571             : 
     572      323625 :         if (os_atmc_get_state(ov) & rollbacked) {
     573       17756 :                 if (ov->ts < oldest) {
     574             :                         // This one is ready to be freed
     575        5627 :                         if (ov->name_based_older && ov->name_based_older->name_based_newer == ov)
     576         495 :                                 ov->name_based_older->name_based_newer=NULL;
     577        5627 :                         if (ov->id_based_older && ov->id_based_older->id_based_newer == ov)
     578         458 :                                 ov->id_based_older->id_based_newer=NULL;
     579        5627 :                         objectversion_destroy(store, ov->os, ov);
     580        5627 :                         return LOG_ERR;
     581             :                 }
     582             : 
     583       12129 :                 if (ov->ts > TRANSACTION_ID_BASE) {
     584             :                         /* We mark it with the latest possible starttime and reinsert it into the cleanup list.
     585             :                          * This will cause a safe eventual destruction of this rollbacked ov.
     586             :                          */
     587        5627 :                         ov->ts = store_get_timestamp(store)+1;
     588             :                 }
     589             : 
     590             :                 // not yet old enough to be safely removed. Try later.
     591       12129 :                 return LOG_OK;
     592             :         }
     593             : 
     594      305869 :         if (os_atmc_get_state(ov) == deleted) {
     595       11527 :                 if (ov->ts <= oldest) {
     596             :                         // the oldest relevant state is deleted so lets try to mark it as destroyed
     597       11445 :                         try_to_mark_deleted_for_destruction(store, ov);
     598       11445 :                         return LOG_OK+1;
     599             :                 }
     600             : 
     601             :                 // Keep it inplace on the cleanup list, either because it is now marked for destruction or
     602             :                 // we want to retry marking it for destruction later.
     603             :                 return LOG_OK;
     604             :         }
     605             : 
     606      294342 :         assert(os_atmc_get_state(ov) != deleted && os_atmc_get_state(ov) != under_destruction && os_atmc_get_state(ov) != rollbacked);
     607      294342 :         if (ov->os->temporary) os_destroy(ov->os, store); // TODO transaction_layer_revamp: embed into refcounting subproject: (old) live versions should drop their reference to the os
     608             : 
     609      303718 :         while (ov->id_based_older && ov->id_based_older == ov->name_based_older && ov->ts >= oldest) {
     610             :                 ov = ov->id_based_older;
     611             :         }
     612             : 
     613      294342 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
     614             :                 // Destroy everything older then the oldest possibly relevant objectversion.
     615        5243 :                 objectversion_destroy_recursive(store, ov->id_based_older);
     616        5243 :                 ov->id_based_older = NULL;
     617             :         }
     618             : 
     619             :         return LOG_ERR;
     620             : }
     621             : 
     622             : static int
     623      339402 : tc_gc_objectversion(sql_store store, sql_change *change, ulng oldest)
     624             : {
     625             : //      assert(!change->handled);
     626      339402 :         objectversion *ov = (objectversion*)change->data;
     627             : 
     628      339402 :         if (oldest >= TRANSACTION_ID_BASE)
     629             :                 return 0;
     630      339395 :         int res = os_cleanup( (sqlstore*) store, ov, oldest);
     631      339395 :         change->handled = (res)?true:false;
     632      339395 :         return res>=0?LOG_OK:LOG_ERR;
     633             : }
     634             : 
     635             : static int
     636      311421 : tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     637             : {
     638      311421 :         objectversion *ov = (objectversion*)change->data;
     639      311421 :         if (commit_ts) {
     640      305794 :                 assert(ov->ts == tr->tid);
     641      305794 :                 ov->ts = commit_ts;
     642      305794 :                 change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false;
     643      305794 :                 (void)oldest;
     644      305794 :                 if (!tr->parent)
     645      305787 :                         change->obj->new = 0;
     646      305794 :                 if (!ov->os->temporary)
     647      301145 :                         ATOMIC_INC(&tr->cat->schema_version);
     648             :         } else {
     649        5627 :                 os_rollback(ov, tr->store);
     650             :         }
     651             : 
     652      311421 :         return LOG_OK;
     653             : }
     654             : 
     655             : objectset *
     656       61296 : os_new(allocator *sa, destroy_fptr destroy, bool temporary, bool unique, bool concurrent, bool nested, sql_store store)
     657             : {
     658       61296 :         assert(!sa);
     659       61296 :         objectset *os = SA_NEW(sa, objectset);
     660       61296 :         if (os) {
     661       61296 :                 *os = (objectset) {
     662             :                         .refcnt = ATOMIC_VAR_INIT(1),
     663             :                         .sa = sa,
     664             :                         .destroy = destroy,
     665             :                         .temporary = temporary,
     666             :                         .unique = unique,
     667             :                         .concurrent = concurrent,
     668             :                         .nested = nested,
     669             :                         .store = store
     670             :                 };
     671       61296 :                 os->destroy = destroy;
     672       61296 :                 MT_rwlock_init(&os->rw_lock, "sa_readers_lock");
     673       61296 :                 MT_lock_init(&os->lock, "single_writer_lock");
     674             :         }
     675             : 
     676       61296 :         return os;
     677             : }
     678             : 
     679             : objectset *
     680        4920 : os_dup(objectset *os)
     681             : {
     682        4920 :         ATOMIC_INC(&os->refcnt);
     683        4920 :         return os;
     684             : }
     685             : 
     686             : void
     687       66158 : os_destroy(objectset *os, sql_store store)
     688             : {
     689       66158 :         if (ATOMIC_DEC(&os->refcnt) > 0)
     690             :                 return;
     691       61238 :         MT_lock_destroy(&os->lock);
     692       61238 :         MT_rwlock_destroy(&os->rw_lock);
     693       61238 :         versionhead* n=os->id_based_h;
     694      334177 :         while(n) {
     695      272939 :                 objectversion *ov = n->ov;
     696      546374 :                 while(ov) {
     697      273435 :                         objectversion *older = ov->id_based_older;
     698      273435 :                         objectversion_destroy(store, os, ov);
     699      273435 :                         ov = older;
     700             :                 }
     701      272939 :                 versionhead* hn =n->next;
     702      272939 :                 node_destroy(os, store, n);
     703      272939 :                 n = hn;
     704             :         }
     705             : 
     706       61238 :         n=os->name_based_h;
     707      334177 :         while(n) {
     708      272939 :                 versionhead* hn =n->next;
     709      272939 :                 node_destroy(os, store, n);
     710      272939 :                 n = hn;
     711             :         }
     712             : 
     713       61238 :         if (os->id_map)
     714        3758 :                 hash_destroy(os->id_map);
     715             : 
     716       61238 :         if (os->name_map)
     717        3731 :                 hash_destroy(os->name_map);
     718             : 
     719       61238 :         if (!os->sa)
     720       61238 :                 _DELETE(os);
     721             : }
     722             : 
     723             : static versionhead  *
     724    19583098 : find_name(objectset *os, const char *name)
     725             : {
     726    19583098 :         lock_reader(os);
     727    19697462 :         if (os->name_map) {
     728    16051828 :                 int key = hash_key(name);
     729    16051828 :                 sql_hash_e *he = os->name_map->buckets[key&(os->name_map->size-1)];
     730             : 
     731    88948054 :                 for (; he; he = he->chain) {
     732    88573382 :                         versionhead  *n = he->value;
     733             : 
     734    88573382 :                         if (n && n->ov->b->name && strcmp(n->ov->b->name, name) == 0) {
     735    15677156 :                                 unlock_reader(os);
     736    15677156 :                                 return n;
     737             :                         }
     738             :                 }
     739      374672 :                 unlock_reader(os);
     740      374672 :                 return NULL;
     741             :         }
     742             : 
     743     4352151 :         for (versionhead  *n = os->name_based_h; n; n = n->next) {
     744     4046947 :                 objectversion *ov = n->ov;
     745             : 
     746             :                 /* check if names match */
     747     4046947 :                 if (name[0] == ov->b->name[0] && strcmp(name, ov->b->name) == 0) {
     748     3340430 :                         unlock_reader(os);
     749     3340430 :                         return n;
     750             :                 }
     751             :         }
     752             : 
     753      305204 :         unlock_reader(os);
     754      305204 :         return NULL;
     755             : }
     756             : 
     757             : static objectversion*
     758    20850276 : get_valid_object_name(sql_trans *tr, objectversion *ov, bool lock)
     759             : {
     760    20850443 :         while(ov) {
     761    20850412 :                 if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
     762    20828929 :                         return ov;
     763             :                 else {
     764         167 :                         if (lock)
     765         153 :                                 lock_reader(ov->os);
     766         167 :                         objectversion* name_based_older = ov->name_based_older;
     767          14 :                         if (lock)
     768         153 :                                 unlock_reader(ov->os);
     769             :                         ov = name_based_older;
     770             :                 }
     771             :         }
     772             :         return ov;
     773             : }
     774             : 
     775             : static objectversion*
     776      590921 : get_valid_object_id(sql_trans *tr, objectversion *ov, bool lock)
     777             : {
     778      591083 :         while(ov) {
     779      590974 :                 if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts))  || ov->ts < tr->ts)
     780      590812 :                         return ov;
     781             :                 else {
     782         162 :                         if (lock)
     783          40 :                                 lock_reader(ov->os);
     784         162 :                         objectversion* id_based_older = ov->id_based_older;
     785         122 :                         if (lock)
     786          40 :                                 unlock_reader(ov->os);
     787             :                         ov = id_based_older;
     788             :                 }
     789             :         }
     790             :         return ov;
     791             : }
     792             : 
     793             : static int
     794      299492 : os_add_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
     795      299492 :         versionhead  *name_based_node = NULL;
     796      299492 :         if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
     797        9648 :                 name_based_node = ov->id_based_older->name_based_head;
     798      289844 :         else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
     799       87869 :                 name_based_node = find_name(os, name);
     800             :         // else names are not unique and each id based version head maps to its own name based version head.
     801             : 
     802       97517 :         if (name_based_node) {
     803       10037 :                 objectversion *co = name_based_node->ov;
     804       10037 :                 objectversion *oo = get_valid_object_name(tr, co, true);
     805       10037 :                 if (co != oo) { /* conflict ? */
     806           6 :                         TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
     807           6 :                         return -3;
     808             :                 }
     809             : 
     810       10031 :                 assert(ov != oo); // Time loops are not allowed
     811             : 
     812       10031 :                 bte state = os_atmc_get_state(oo);
     813       10031 :                 if (state != active) {
     814             :                         // This can only happen if the parent oo was a committed deleted at some point.
     815         416 :                         assert(state == deleted || state == under_destruction || state == block_destruction);
     816             :                         /* Since our parent oo is committed deleted objectversion, we might have a conflict with
     817             :                         * another transaction that tries to clean up oo or also wants to add a new objectversion.
     818             :                         */
     819         416 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     820         416 :                         if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
     821           0 :                                 TRC_WARNING(SQL_STORE, "%s: " "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/ ", __func__);
     822           0 :                                 return -3; /*conflict with cleaner or write-write conflict*/
     823             :                         }
     824             :                 }
     825             : 
     826             :                 /* new object with same name within transaction, should have a delete in between */
     827        9615 :                 assert(!(state == active && oo->ts == ov->ts && !(os_atmc_get_state(ov) & deleted)));
     828             : 
     829       10031 :                 lock_writer(os);
     830       10031 :                 ov->name_based_head = oo->name_based_head;
     831       10031 :                 ov->name_based_older = oo;
     832             : 
     833       10031 :                 name_based_node->ov = ov;
     834       10031 :                 if (oo) {
     835       10031 :                         oo->name_based_newer = ov;
     836             :                         // if the parent was originally deleted, we restore it to that state.
     837       10031 :                         os_atmc_set_state(oo, state);
     838             :                 }
     839       10031 :                 unlock_writer(os);
     840       10031 :                 return 0;
     841             :         } else { /* new */
     842      289455 :                 if (os_append_name(os, ov) == NULL)
     843             :                         return -1; // MALLOC_FAIL
     844             :                 return 0;
     845             :         }
     846             : }
     847             : 
     848             : static int
     849      299502 : os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
     850      299502 :         versionhead  *id_based_node;
     851             : 
     852      299502 :         id_based_node = find_id(os, id);
     853             : 
     854      299502 :         if (id_based_node) {
     855        9682 :                 objectversion *co = id_based_node->ov;
     856        9682 :                 objectversion *oo = get_valid_object_id(tr, co, true);
     857        9682 :                 if (co != oo) { /* conflict ? */
     858          10 :                         TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
     859          10 :                         return -3;
     860             :                 }
     861             : 
     862        9672 :                 assert(ov != oo); // Time loops are not allowed
     863             : 
     864        9672 :                 bte state = os_atmc_get_state(oo);
     865        9672 :                 if (state != active) {
     866             :                         // This can only happen if the parent oo was a committed deleted at some point.
     867          57 :                         assert(state == deleted || state == under_destruction || state == block_destruction);
     868             :                         /* Since our parent oo is committed deleted objectversion, we might have a conflict with
     869             :                         * another transaction that tries to clean up oo or also wants to add a new objectversion.
     870             :                         */
     871          57 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     872          57 :                         if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
     873           0 :                                 TRC_WARNING(SQL_STORE, "%s" "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/", __func__);
     874           0 :                                 return -3; /*conflict with cleaner or write-write conflict*/
     875             :                         }
     876             :                 }
     877             : 
     878        9672 :                 lock_writer(os);
     879        9672 :                 ov->id_based_head = oo->id_based_head;
     880        9672 :                 ov->id_based_older = oo;
     881             : 
     882        9672 :                 id_based_node->ov = ov;
     883        9672 :                 if (oo) {
     884        9672 :                         oo->id_based_newer = ov;
     885             :                         // if the parent was originally deleted, we restore it to that state.
     886        9672 :                         os_atmc_set_state(oo, state);
     887             :                 }
     888        9672 :                 unlock_writer(os);
     889        9672 :                 return 0;
     890             :         } else { /* new */
     891      289820 :                 if (os_append_id(os, ov) == NULL)
     892             :                         return -1; // MALLOC_FAIL
     893             : 
     894             :                 return 0;
     895             :         }
     896             : }
     897             : 
     898             : static int /*ok, error (name existed) and conflict (added before) */
     899      299503 : os_add_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     900             : {
     901      299503 :         int res = 0;
     902      299503 :         objectversion *ov = SA_NEW(os->sa, objectversion);
     903             : 
     904      299503 :         *ov = (objectversion) {
     905      299503 :                 .ts = tr->tid,
     906             :                 .b = b,
     907             :                 .os = os,
     908             :         };
     909             : 
     910      299503 :         if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */
     911           1 :                 if (os->destroy)
     912           1 :                         os->destroy(os->store, ov->b);
     913           1 :                 _DELETE(ov);
     914           1 :                 TRC_WARNING(SQL_STORE, "%s" "if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */", __func__);
     915           1 :                 return -3; /* conflict */
     916             :         }
     917             : 
     918      299502 :         if ((res = os_add_id_based(os, tr, b->id, ov))) {
     919          10 :                 if (os->destroy)
     920          10 :                         os->destroy(os->store, ov->b);
     921          10 :                 _DELETE(ov);
     922          10 :                 return res;
     923             :         }
     924             : 
     925      299492 :         if ((res = os_add_name_based(os, tr, name, ov))) {
     926           6 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     927           6 :                 return res;
     928             :         }
     929             : 
     930      299486 :         if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
     931      299486 :         trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     932      299486 :         return res;
     933             : }
     934             : 
     935             : int
     936      299503 : os_add(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     937             : {
     938      299503 :         MT_lock_set(&os->lock);
     939      299503 :         int res = os_add_(os, tr, name, b);
     940      299503 :         MT_lock_unset(&os->lock);
     941      299503 :         return res;
     942             : }
     943             : 
     944             : static int
     945       11922 : os_del_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
     946       11922 :         versionhead  *name_based_node = NULL;
     947       11922 :         if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
     948       11922 :                 name_based_node = ov->id_based_older->name_based_head;
     949           0 :         else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
     950           0 :                 name_based_node = find_name(os, name);
     951             : 
     952       11922 :         if (name_based_node) {
     953       11922 :                 objectversion *co = name_based_node->ov;
     954       11922 :                 objectversion *oo = get_valid_object_name(tr, co, true);
     955       11922 :                 ov->name_based_head = oo->name_based_head;
     956       11922 :                 if (co != oo) { /* conflict ? */
     957           0 :                         TRC_WARNING(SQL_STORE, "%s: " "if (co != oo) { /* conflict ? */", __func__);
     958           0 :                         return -3;
     959             :                 }
     960       11922 :                 ov->name_based_older = oo;
     961             : 
     962       11922 :                 lock_writer(os);
     963       11922 :                 if (oo) {
     964       11922 :                         oo->name_based_newer = ov;
     965       11922 :                         assert(os_atmc_get_state(oo) == active);
     966             :                 }
     967       11922 :                 name_based_node->ov = ov;
     968       11922 :                 unlock_writer(os);
     969       11922 :                 return 0;
     970             :         } else {
     971             :                 /* missing */
     972           0 :                 return -1;
     973             :         }
     974             : }
     975             : 
     976             : static int
     977       11924 : os_del_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
     978             : 
     979       11924 :         versionhead  *id_based_node;
     980             : 
     981       11924 :         if (ov->name_based_older && ov->name_based_older->b->id == id)
     982           0 :                 id_based_node = ov->name_based_older->id_based_head;
     983             :         else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up
     984       11924 :                 id_based_node = find_id(os, id);
     985             : 
     986       11924 :         if (id_based_node) {
     987       11924 :                 objectversion *co = id_based_node->ov;
     988       11924 :                 objectversion *oo = get_valid_object_id(tr, co, true);
     989       11924 :                 ov->id_based_head = oo->id_based_head;
     990       11924 :                 if (co != oo) { /* conflict ? */
     991           2 :                         TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
     992           2 :                         return -3;
     993             :                 }
     994       11922 :                 ov->id_based_older = oo;
     995             : 
     996       11922 :                 lock_writer(os);
     997       11922 :                 if (oo) {
     998       11922 :                         oo->id_based_newer = ov;
     999       11922 :                         assert(os_atmc_get_state(oo) == active);
    1000             :                 }
    1001       11922 :                 id_based_node->ov = ov;
    1002       11922 :                 unlock_writer(os);
    1003       11922 :                 return 0;
    1004             :         } else {
    1005             :                 /* missing */
    1006             :                 return -1;
    1007             :         }
    1008             : }
    1009             : 
    1010             : static int
    1011       11924 : os_del_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
    1012             : {
    1013       11924 :         int res = 0;
    1014       11924 :         objectversion *ov = SA_NEW(os->sa, objectversion);
    1015             : 
    1016       11924 :         *ov = (objectversion) {
    1017       11924 :                 .ts = tr->tid,
    1018             :                 .b = b,
    1019             :                 .os = os,
    1020             :         };
    1021       11924 :         os_atmc_set_state(ov, deleted);
    1022             : 
    1023       11924 :         if ((res = os_del_id_based(os, tr, b->id, ov))) {
    1024           2 :                 if (os->destroy)
    1025           2 :                         os->destroy(os->store, ov->b);
    1026           2 :                 _DELETE(ov);
    1027           2 :                 return res;
    1028             :         }
    1029             : 
    1030       11922 :         if ((res = os_del_name_based(os, tr, name, ov))) {
    1031           0 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
    1032           0 :                 return res;
    1033             :         }
    1034             : 
    1035       11922 :         if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
    1036       11922 :         trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
    1037       11922 :         return res;
    1038             : }
    1039             : 
    1040             : int
    1041       11924 : os_del(objectset *os, sql_trans *tr, const char *name, sql_base *b)
    1042             : {
    1043       11924 :         MT_lock_set(&os->lock);
    1044       11924 :         int res = os_del_(os, tr, name, b);
    1045       11924 :         MT_lock_unset(&os->lock);
    1046       11924 :         return res;
    1047             : }
    1048             : 
    1049             : int
    1050         323 : os_size(objectset *os, struct sql_trans *tr)
    1051             : {
    1052         323 :         int cnt = 0;
    1053         323 :         if (os) {
    1054         323 :                 lock_reader(os);
    1055         332 :                 for(versionhead  *n = os->name_based_h; n; n=n->next) {
    1056           9 :                         objectversion *ov = n->ov;
    1057           9 :                         assert(os == ov->os);
    1058           9 :                         if ((ov=get_valid_object_name(tr, ov, false)) && os_atmc_get_state(ov) == active)
    1059           6 :                                 cnt++;
    1060             :                 }
    1061         323 :                 unlock_reader(os);
    1062             :         }
    1063         323 :         return cnt;
    1064             : }
    1065             : 
    1066             : int
    1067           0 : os_empty(objectset *os, struct sql_trans *tr)
    1068             : {
    1069           0 :         return os_size(os, tr)==0;
    1070             : }
    1071             : 
    1072             : int
    1073           0 : os_remove(objectset *os, sql_trans *tr, const char *name)
    1074             : {
    1075           0 :         (void) os;
    1076           0 :         (void) tr;
    1077           0 :         (void) name;
    1078             :         // TODO remove entire versionhead  corresponding to this name.
    1079             : 
    1080             :         // TODO assert os->unique?s
    1081           0 :         return LOG_OK;
    1082             : }
    1083             : 
    1084             : sql_base *
    1085    19507677 : os_find_name(objectset *os, struct sql_trans *tr, const char *name)
    1086             : {
    1087    19507677 :         if (!os)
    1088             :                 return NULL;
    1089             : 
    1090    19507675 :         assert(os->unique);
    1091    19507675 :         versionhead  *n = find_name(os, name);
    1092             : 
    1093    19675580 :         if (n) {
    1094    19083172 :                 objectversion *ov = get_valid_object_name(tr, n->ov, true);
    1095    19028869 :                 if (ov && os_atmc_get_state(ov) == active)
    1096    19028209 :                         return ov->b;
    1097             :         }
    1098             :         return NULL;
    1099             : }
    1100             : 
    /*
     * os_find_id: look up an object by id on behalf of transaction tr.
     *
     * Returns NULL when os is NULL, when find_id() yields no versionhead,
     * when get_valid_object_id() yields no version for tr, or when the
     * version it yields is not in the `active` state. Otherwise returns the
     * sql_base attached to that version.
     */
    1101             : sql_base *
    1102      158609 : os_find_id(objectset *os, struct sql_trans *tr, sqlid id)
    1103             : {
    1104      158609 :         if (!os)
    1105             :                 return NULL;
    1106      158609 :         versionhead  *n = find_id(os, id);
    1107             : 
    1108      158609 :         if (n) {
    1109      158069 :                 objectversion *ov = get_valid_object_id(tr, n->ov, true);
                               /* Only a version in the `active` state is handed out. */
    1110      158069 :                 if (ov && os_atmc_get_state(ov) == active)
    1111      158065 :                         return ov->b;
    1112             :         }
    1113             :         return NULL;
    1114             : }
    1115             : 
    /*
     * os_iterator: initialise *oi for iterating over object set os on behalf
     * of transaction tr, optionally restricted to objects called `name`.
     *
     * *oi is overwritten as a whole; members other than os, tr and name are
     * zeroed by the compound literal before the cursor oi->n is set.
     * The cursor is chosen while holding the reader lock:
     *  - name given and os->name_map present: the head of the hash bucket
     *    selected by hash_key(name);
     *  - otherwise: os->name_based_h.
     *
     * NOTE(review): unlike os_find_name()/os_find_id() there is no NULL guard
     * on os, and os->name_map is dereferenced unconditionally; confirm
     * callers never pass a NULL object set.
     */
    1116             : void
    1117     2228019 : os_iterator(struct os_iter *oi, struct objectset *os, struct sql_trans *tr, const char *name /*optional*/)
    1118             : {
    1119     2228019 :         *oi = (struct os_iter) {
    1120             :                 .os = os,
    1121             :                 .tr = tr,
    1122             :                 .name = name,
    1123             :         };
    1124             : 
    1125     2228019 :         lock_reader(os);
    1126     2229128 :         if (name && os->name_map) {
    1127     1143983 :                 int key = hash_key(name);
                               /* Masking with size-1 presumably relies on the bucket count
                                * being a power of two; TODO confirm against the hash
                                * implementation. */
    1128     1143983 :                 oi->n = (void*)os->name_map->buckets[key&(os->name_map->size-1)];
    1129             :         } else
    1130     1085145 :                 oi->n =      os->name_based_h;
    1131     2229128 :         unlock_reader(os);
    1132     2229376 : }
    1133             : 
    /*
     * oi_next: return the next object of the iteration described by *oi, or
     * NULL once the iteration is exhausted.
     *
     * An entry is returned only if get_valid_object_name()/_id() yields a
     * version for oi->tr and that version is in the `active` state.
     *
     * Three traversal modes, each run under the object set's reader lock:
     *  - name given and os->name_map present: oi->n is treated as a hash
     *    chain entry (sql_hash_e) and the chain is followed via ->chain;
     *  - name given, no name_map: oi->n is a versionhead and the list is
     *    followed via ->next, skipping entries whose name differs;
     *  - no name: every versionhead on the ->next list is considered and
     *    validated with get_valid_object_id().
     * In every mode oi->n ends up pointing past the entry that produced the
     * result, so the following call resumes from there.
     *
     * NOTE(review): the mode is re-selected from os->name_map on each call,
     * whereas the cursor was typed when os_iterator() ran. If name_map can
     * appear or disappear in between, oi->n would be misinterpreted; confirm
     * this cannot happen.
     */
    1134             : sql_base *
    1135     4326568 : oi_next(struct os_iter *oi)
    1136             : {
    1137     4326568 :         sql_base *b = NULL;
    1138             : 
    1139     4326568 :         if (oi->name) {
    1140     3282035 :                 lock_reader(oi->os); /* intentionally outside of while loop */
    1141     3282060 :                 if (oi->os->name_map) {
    1142     2896188 :                         sql_hash_e  *he = (void*)oi->n;
    1143             : 
                                       /* The loop increment still runs after a match, so `he`
                                        * already points at the successor when the loop exits. */
    1144     6981966 :                         for (; he && !b; he = he->chain) {
    1145     4085779 :                                 versionhead  *n = he->value;
    1146             : 
                                               /* A bucket can hold other names too; compare the
                                                * actual name before accepting the entry. */
    1147     4085779 :                                 if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
    1148     1763753 :                                         objectversion *ov = n->ov;
    1149             : 
    1150     1763753 :                                         assert(oi->os == ov->os);
    1151     1763752 :                                         ov = get_valid_object_name(oi->tr, ov, false);
    1152     1763752 :                                         if (ov && os_atmc_get_state(ov) == active)
    1153     1761985 :                                                 b = ov->b;
    1154             :                                 }
    1155             :                         }
                                       /* Remember where to resume on the next call. */
    1156     2896187 :                         oi->n = (void*)he;
    1157             :                 } else {
    1158      385872 :                         versionhead  *n = oi->n;
    1159             : 
    1160      432854 :                         while (n && !b) {
    1161             : 
    1162       46982 :                                 if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
    1163       12225 :                                         objectversion *ov = n->ov;
    1164             : 
                                                       /* Advance the cursor before validating, so it
                                                        * is correct whether or not this entry is
                                                        * returned. */
    1165       12225 :                                         n = oi->n = n->next;
    1166       12225 :                                         assert(oi->os == ov->os);
    1167       12225 :                                         ov = get_valid_object_name(oi->tr, ov, false);
    1168       12225 :                                         if (ov && os_atmc_get_state(ov) == active)
    1169       12225 :                                                 b = ov->b;
    1170             :                                 } else {
    1171       34757 :                                         n = oi->n = n->next;
    1172             :                                 }
    1173             :                         }
    1174             :                 }
    1175     3282059 :                 unlock_reader(oi->os);
    1176             :         } else {
    1177     1044533 :                 versionhead  *n = oi->n;
    1178             : 
    1179     1044533 :                 lock_reader(oi->os); /* intentionally outside of while loop */
    1180     2500384 :                 while (n && !b) {
    1181      411279 :                         objectversion *ov = n->ov;
                                       /* Advance the cursor first; the entry itself stays
                                        * reachable through ov. */
    1182      411279 :                         n = oi->n = n->next;
    1183             : 
    1184      411279 :                         assert(oi->os == ov->os);
    1185      411279 :                         ov = get_valid_object_id(oi->tr, ov, false);
    1186      411246 :                         if (ov && os_atmc_get_state(ov) == active)
    1187      389743 :                                 b = ov->b;
    1188             :                 }
    1189     1044572 :                 unlock_reader(oi->os);
    1190             :         }
    1191     4327184 :         return b;
    1192             : }
    1193             : 
    /*
     * os_obj_intransaction: report whether the head version of object b in
     * os is stamped with transaction tr.
     *
     * Looks up b->id with find_id() and returns true only when the head
     * version (n->ov) is in the `active` state and its timestamp equals
     * tr->tid. Returns false when the id is unknown or either test fails.
     *
     * NOTE(review): os and b are dereferenced without a NULL guard; confirm
     * callers guarantee both.
     */
    1194             : bool
    1195        3555 : os_obj_intransaction(objectset *os, struct sql_trans *tr, sql_base *b)
    1196             : {
    1197        3555 :         versionhead  *n = find_id(os, b->id);
    1198             : 
    1199        3555 :         if (n) {
    1200        3555 :                 objectversion *ov = n->ov;
    1201             : 
    1202        3555 :                 if (ov && os_atmc_get_state(ov) == active && ov->ts == tr->tid)
    1203             :                         return true;
    1204             :         }
    1205             :         return false;
    1206             : }
    1207             : 
    1208             : /* return true if this object set has changes pending for another transaction */
                       /* Only the head version of os->id_based_t is examined: it must be in
                        * the `active` state, carry a timestamp different from tr->tid, and
                        * that timestamp must be above TRANSACTION_ID_BASE.
                        * NOTE(review): id_based_t is presumably the tail of the id-based
                        * list; confirm that looking at this one entry is sufficient. */
    1209             : bool
    1210      196340 : os_has_changes(objectset *os, struct sql_trans *tr)
    1211             : {
    1212      196340 :         versionhead  *n = os->id_based_t;
    1213             : 
    1214      196340 :         if (n) {
    1215      191754 :                 objectversion *ov = n->ov;
    1216             : 
    1217      191754 :                 if (ov && os_atmc_get_state(ov) == active && ov->ts != tr->tid && ov->ts > TRANSACTION_ID_BASE)
    1218             :                         return true;
    1219             :         }
    1220             :         return false;
    1221             : }

Generated by: LCOV version 1.14