LCOV - code coverage report
Current view: top level - sql/storage - objectset.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 575 614 93.6 %
Date: 2024-04-26 00:35:57 Functions: 42 44 95.5 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "sql_catalog.h"
      15             : #include "sql_storage.h"
      16             : 
      17             : #include "gdk_atoms.h"
      18             : 
      19             : struct versionhead ;
      20             : 
      21             : #define active                                                  (0)
      22             : #define under_destruction                               (1<<1)
      23             : #define block_destruction                               (1<<2)
      24             : #define deleted                                                 (1<<3)
      25             : #define rollbacked                                              (1<<4)
      26             : 
      27             : /* This objectversion owns its associated versionhead.
      28             :  * When this objectversion gets destroyed,
      29             :  * the cleanup procedure should also destroy the associated (name|id) based versionhead.*/
      30             : #define name_based_versionhead_owner    (1<<5)
      31             : #define id_based_versionhead_owner              (1<<6)
      32             : 
      33             : typedef struct objectversion {
      34             :         ulng ts;
      35             :         ATOMIC_TYPE state;
      36             :         sql_base *b; // base of underlying sql object
      37             :         struct objectset* os;
      38             :         struct objectversion    *name_based_older;
      39             :         struct objectversion    *name_based_newer;
      40             :         struct versionhead              *name_based_head;
      41             : 
      42             :         struct objectversion    *id_based_older;
      43             :         struct objectversion    *id_based_newer;
      44             :         struct versionhead              *id_based_head;
      45             : } objectversion;
      46             : 
      47             : typedef struct versionhead  {
      48             :     struct versionhead * prev;
      49             :     struct versionhead * next;
      50             :     objectversion* ov;
      51             : } versionhead ;
      52             : 
      53             : typedef struct objectset {
      54             :         ATOMIC_TYPE refcnt;
      55             :         allocator *sa;
      56             :         destroy_fptr destroy;
      57             :         MT_RWLock rw_lock;      /*readers-writer lock to protect the links (chains) in the objectversion chain.*/
      58             :         versionhead  *name_based_h;
      59             :         versionhead  *name_based_t;
      60             :         versionhead  *id_based_h;
      61             :         versionhead  *id_based_t;
      62             :         int name_based_cnt;
      63             :         int id_based_cnt;
      64             :         struct sql_hash *name_map;
      65             :         struct sql_hash *id_map;
      66             :         bool
      67             :                 temporary:1,
      68             :                 unique:1, /* names are unique */
      69             :                 concurrent:1,   /* concurrent inserts are allowed */
      70             :             nested:1;
      71             :         sql_store store;
      72             : } objectset;
      73             : 
      74             : static int
      75      335512 : os_id_key(versionhead  *n)
      76             : {
      77      259048 :         return (int) BATatoms[TYPE_int].atomHash(&n->ov->b->id);
      78             : }
      79             : 
      80             : static inline void
      81    24325080 : lock_reader(objectset* os)
      82             : {
      83    24325080 :         MT_rwlock_rdlock(&os->rw_lock);
      84     1113571 : }
      85             : 
      86             : static inline void
      87    24426303 : unlock_reader(objectset* os)
      88             : {
      89    24426303 :         MT_rwlock_rdunlock(&os->rw_lock);
      90     3998888 : }
      91             : 
      92             : static inline void
      93      609806 : lock_writer(objectset* os)
      94             : {
      95      609806 :         MT_rwlock_wrlock(&os->rw_lock);
      96             : }
      97             : 
      98             : static inline void
      99      609806 : unlock_writer(objectset* os)
     100             : {
     101      609806 :         MT_rwlock_wrunlock(&os->rw_lock);
     102         260 : }
     103             : 
     104    21508300 : static bte os_atmc_get_state(objectversion *ov) {
     105    21508300 :         bte state = (bte) ATOMIC_GET(&ov->state);
     106    21508300 :         return state;
     107             : }
     108             : 
     109       61754 : static void os_atmc_set_state(objectversion *ov, bte state) {
     110       61754 :         ATOMIC_SET(&ov->state, state);
     111           0 : }
     112             : 
     113             : static versionhead  *
     114      442357 : find_id(objectset *os, sqlid id)
     115             : {
     116      442357 :         if (os) {
     117      442357 :                 lock_reader(os);
     118      442357 :                 if (os->id_map) {
     119      409879 :                         int key = (int) BATatoms[TYPE_int].atomHash(&id);
     120      409879 :                         sql_hash_e *he = os->id_map->buckets[key&(os->id_map->size-1)];
     121             : 
     122     1506861 :                         for (; he; he = he->chain) {
     123     1265427 :                                 versionhead  *n = he->value;
     124             : 
     125     1265427 :                                 if (n && n->ov->b->id == id) {
     126      168445 :                                         unlock_reader(os);
     127      168445 :                                         return n;
     128             :                                 }
     129             :                         }
     130      241434 :                         unlock_reader(os);
     131      241434 :                         return NULL;
     132             :                 }
     133             : 
     134       99254 :                 for (versionhead  *n = os->id_based_h; n; n = n->next) {
     135       69266 :                         objectversion *ov = n->ov;
     136             : 
     137             :                         /* check if ids match */
     138       69266 :                         if (id == ov->b->id) {
     139        2490 :                                 unlock_reader(os);
     140        2490 :                                 return n;
     141             :                         }
     142             :                 }
     143       29988 :                 unlock_reader(os);
     144             :         }
     145             : 
     146             :         return NULL;
     147             : }
     148             : 
     149             : // TODO copy of static function from sql_list.c. Needs to be made external
     150             : static void
     151       27938 : hash_delete(sql_hash *h, void *data)
     152             : {
     153       27938 :         int key = h->key(data);
     154       27938 :         sql_hash_e *e, *p = h->buckets[key&(h->size-1)];
     155             : 
     156       27938 :         e = p;
     157       45320 :         for (;  p && p->value != data ; p = p->chain)
     158       17382 :                 e = p;
     159       27938 :         if (p && p->value == data) {
     160       27938 :                 if (p == e)
     161       23334 :                         h->buckets[key&(h->size-1)] = p->chain;
     162             :                 else
     163        4604 :                         e->chain = p->chain;
     164       27938 :                 if (!h->sa)
     165       27938 :                         _DELETE(p);
     166             :         }
     167       27938 :         h->entries--;
     168       27938 : }
     169             : 
     170             : static void
     171      540450 : node_destroy(objectset *os, sqlstore *store, versionhead  *n)
     172             : {
     173      540450 :         if (!os->sa)
     174      540450 :                 _DELETE(n);
     175      540450 :         (void)store;
     176      540450 : }
     177             : 
     178             : static versionhead  *
     179       14587 : os_remove_name_based_chain(objectset *os, objectversion* ov)
     180             : {
     181       14587 :         lock_writer(os);
     182       14587 :         versionhead  *n = ov->name_based_head;
     183       14587 :         versionhead  *p = os->name_based_h;
     184       14587 :         if (p != n)
     185     1117078 :                 while (p && p->next != n)
     186             :                         p = p->next;
     187       14587 :         assert(p==n||(p && p->next == n));
     188       14587 :         if (p == n) {
     189        1277 :                 os->name_based_h = n->next;
     190        1277 :                 if (os->name_based_h) // i.e. non-empty os
     191         786 :                         os->name_based_h->prev = NULL;
     192             :                 p = NULL;
     193       13310 :         } else if ( p != NULL)  {
     194       13310 :                 p->next = n->next;
     195       13310 :                 if (p->next) // node in the middle
     196        5505 :                         p->next->prev = p;
     197             :         }
     198       14587 :         if (n == os->name_based_t)
     199        8296 :                 os->name_based_t = p;
     200             : 
     201       14587 :         if (os->name_map && n)
     202       13623 :                 hash_delete(os->name_map, n);
     203             : 
     204       14587 :         os->name_based_cnt--;
     205       14587 :         unlock_writer(os);
     206             : 
     207       14587 :         bte state = os_atmc_get_state(ov);
     208       14587 :         state |= name_based_versionhead_owner;
     209       14587 :         os_atmc_set_state(ov, state);
     210       14587 :         return p;
     211             : }
     212             : 
     213             : static versionhead  *
     214       14825 : os_remove_id_based_chain(objectset *os, objectversion* ov)
     215             : {
     216       14825 :         lock_writer(os);
     217       14825 :         versionhead  *n = ov->id_based_head;
     218       14825 :         versionhead  *p = os->id_based_h;
     219             : 
     220       14825 :         if (p != n)
     221     1139954 :                 while (p && p->next != n)
     222             :                         p = p->next;
     223       14825 :         assert(p==n||(p && p->next == n));
     224       14825 :         if (p == n) {
     225        1277 :                 os->id_based_h = n->next;
     226        1277 :                 if (os->id_based_h) // i.e. non-empty os
     227         786 :                         os->id_based_h->prev = NULL;
     228             :                 p = NULL;
     229       13548 :         } else if ( p != NULL)  {
     230       13548 :                 p->next = n->next;
     231       13548 :                 if (p->next) // node in the middle
     232        5743 :                         p->next->prev = p;
     233             :         }
     234       14825 :         if (n == os->id_based_t)
     235        8296 :                 os->id_based_t = p;
     236             : 
     237       14825 :         if (os->id_map && n)
     238       14315 :                 hash_delete(os->id_map, n);
     239             : 
     240       14825 :         os->name_based_cnt--;
     241       14825 :         unlock_writer(os);
     242             : 
     243       14825 :         bte state = os_atmc_get_state(ov);
     244       14825 :         state |= id_based_versionhead_owner;
     245       14825 :         os_atmc_set_state(ov, state);
     246       14825 :         return p;
     247             : }
     248             : 
     249             : static versionhead  *
     250      541594 : node_create(allocator *sa, objectversion *ov)
     251             : {
     252      541594 :         versionhead  *n = SA_NEW(sa, versionhead );
     253             : 
     254      541594 :         if (n == NULL)
     255             :                 return NULL;
     256      541594 :         *n = (versionhead ) {
     257             :                 .ov = ov,
     258             :         };
     259      541594 :         return n;
     260             : }
     261             : 
     262             : static inline int
     263      324868 : os_name_key(versionhead  *n)
     264             : {
     265      324868 :         return hash_key(n->ov->b->name);
     266             : }
     267             : 
     268             : static objectset *
     269      270678 : os_append_node_name(objectset *os, versionhead  *n)
     270             : {
     271      270678 :         lock_writer(os);
     272      270678 :         if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && os->name_based_cnt > HASH_MIN_SIZE) {
     273        3947 :                 hash_destroy(os->name_map);
     274        3947 :                 os->name_map = hash_new(os->sa, os->name_based_cnt, (fkeyvalue)& os_name_key);
     275        3947 :                 if (os->name_map == NULL) {
     276           0 :                         unlock_writer(os);
     277           0 :                         return NULL;
     278             :                 }
     279             : 
     280       71143 :                 for (versionhead  *n = os->name_based_h; n; n = n->next ) {
     281       67196 :                         int key = os_name_key(n);
     282             : 
     283       67196 :                         if (hash_add(os->name_map, key, n) == NULL) {
     284           0 :                                 unlock_writer(os);
     285           0 :                                 return NULL;
     286             :                         }
     287             :                 }
     288             :         }
     289             : 
     290      270678 :         if (os->name_map) {
     291      244049 :                 int key = os->name_map->key(n);
     292             : 
     293      244049 :                 if (hash_add(os->name_map, key, n) == NULL) {
     294           0 :                         unlock_writer(os);
     295           0 :                         return NULL;
     296             :                 }
     297             :         }
     298             : 
     299      270678 :         if (os->name_based_t) {
     300      262580 :                 os->name_based_t->next = n;
     301             :         } else {
     302        8098 :                 os->name_based_h = n;
     303             :         }
     304      270678 :         n->prev = os->name_based_t; // aka the double linked list.
     305      270678 :         os->name_based_t = n;
     306      270678 :         os->name_based_cnt++;
     307      270678 :         unlock_writer(os);
     308      270678 :         return os;
     309             : }
     310             : 
     311             : static objectset *
     312      270678 : os_append_name(objectset *os, objectversion *ov)
     313             : {
     314      270678 :         versionhead  *n = node_create(os->sa, ov);
     315             : 
     316      270678 :         if (n == NULL)
     317             :                 return NULL;
     318             : 
     319      270678 :         ov->name_based_head = n;
     320      270678 :         if (!(os = os_append_node_name(os, n))){
     321           0 :                 _DELETE(n);
     322           0 :                 return NULL;
     323             :         }
     324             : 
     325             :         return os;
     326             : }
     327             : 
     328             : static void
     329        4053 : os_append_id_map(objectset *os)
     330             : {
     331        4053 :         if (os->id_map)
     332         462 :                 hash_destroy(os->id_map);
     333        4053 :         os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key);
     334        4053 :         if (os->id_map == NULL)
     335             :                 return ;
     336       80517 :         for (versionhead  *n = os->id_based_h; n; n = n->next ) {
     337       76464 :                 int key = os_id_key(n);
     338             : 
     339       76464 :                 if (hash_add(os->id_map, key, n) == NULL) {
     340           0 :                         hash_destroy(os->id_map);
     341           0 :                         os->id_map = NULL;
     342           0 :                         return ;
     343             :                 }
     344             :         }
     345             : }
     346             : 
     347             : static objectset *
     348      270916 : os_append_node_id(objectset *os, versionhead  *n)
     349             : {
     350      270916 :         lock_writer(os);
     351      270916 :         if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && os->id_based_cnt > HASH_MIN_SIZE)
     352        4053 :                 os_append_id_map(os); /* on failure just fall back to slow method */
     353             : 
     354      270916 :         if (os->id_map) {
     355      244733 :                 int key = os->id_map->key(n);
     356      244733 :                 if (hash_add(os->id_map, key, n) == NULL) {
     357           0 :                         hash_destroy(os->id_map);
     358           0 :                         os->id_map = NULL;
     359             :                         /* fall back to slow search */
     360             :                 }
     361             :         }
     362             : 
     363      270916 :         if (os->id_based_t) {
     364      262818 :                 os->id_based_t->next = n;
     365             :         } else {
     366        8098 :                 os->id_based_h = n;
     367             :         }
     368      270916 :         n->prev = os->id_based_t; // aka the double linked list.
     369      270916 :         os->id_based_t = n;
     370      270916 :         os->id_based_cnt++;
     371      270916 :         unlock_writer(os);
     372      270916 :         return os;
     373             : }
     374             : 
     375             : static objectset *
     376      270916 : os_append_id(objectset *os, objectversion *ov)
     377             : {
     378      270916 :         versionhead  *n = node_create(os->sa, ov);
     379             : 
     380      270916 :         if (n == NULL)
     381             :                 return NULL;
     382      270916 :         ov->id_based_head = n;
     383      270916 :         if (!(os = os_append_node_id(os, n))){
     384           0 :                 _DELETE(n);
     385           0 :                 return NULL;
     386             :         }
     387             : 
     388             :         return os;
     389             : }
     390             : 
     391             : static versionhead * find_name(objectset *os, const char *name);
     392             : 
     393             : static void
     394      289498 : objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov)
     395             : {
     396             : 
     397      289498 :         bte state = os_atmc_get_state(ov);
     398             : 
     399      289498 :         if (state & name_based_versionhead_owner) {
     400       14587 :                 node_destroy(ov->os, store, ov->name_based_head);
     401             :         }
     402             : 
     403      289498 :         if (state & id_based_versionhead_owner) {
     404       14825 :                 node_destroy(ov->os, store, ov->id_based_head);
     405             :         }
     406             : 
     407      289498 :         if (os->destroy && ov->b)
     408      265028 :                 os->destroy(store, ov->b);
     409             : 
     410      289498 :         if (os->temporary && (state & deleted || state & under_destruction || state & rollbacked))
     411         197 :                 os_destroy(os, store); // TODO transaction_layer_revamp: embed into refcounting subproject : reference is already dropped by os_cleanup
     412      289498 :         _DELETE(ov);
     413      289498 : }
     414             : 
     415             : static void
     416        5397 : _os_rollback(objectversion *ov, sqlstore *store)
     417             : {
     418        5397 :         assert(ov->ts >= TRANSACTION_ID_BASE);
     419             : 
     420        5397 :         bte state = os_atmc_get_state(ov);
     421        5397 :         if (state & rollbacked) {
     422             :                 return;
     423             :         }
     424             : 
     425        4906 :         state |= rollbacked;
     426        4906 :         os_atmc_set_state(ov, state);
     427             : 
     428        4906 :         bte state_older;
     429             : 
     430             :         /*
     431             :          * We have to use the readers-writer lock here,
     432             :          * since the pointer containing the adress of the older objectversion might be concurrently overwritten if the older itself hass just been put in the under_destruction state .
     433             :          */
     434        4906 :         lock_reader(ov->os);
     435        4906 :         objectversion* name_based_older = ov->name_based_older;
     436        4906 :         unlock_reader(ov->os);
     437             : 
     438        4906 :         if (name_based_older && !((state_older= os_atmc_get_state(name_based_older)) & rollbacked)) {
     439         723 :                 if (ov->ts != name_based_older->ts) {
     440             :                         // older is last committed state or belongs to parent transaction.
     441             :                         // In any case, we restore versionhead pointer to that.
     442             : 
     443         237 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     444         237 :                         if (state_older == active || (state_older == deleted && ATOMIC_CAS(&name_based_older->state, &expected_deleted, block_destruction))) {
     445         237 :                                 ov->name_based_head->ov = name_based_older;
     446         237 :                                 name_based_older->name_based_newer=NULL;
     447         237 :                                 if (state_older != active && expected_deleted == deleted)
     448           0 :                                         os_atmc_set_state(name_based_older, deleted); //Restore the deleted older back to its deleted state.
     449             :                         }
     450             :                 }
     451             :                 else {
     452         486 :                         _os_rollback(name_based_older, store);
     453             :                 }
     454             :         }
     455        4183 :         else if (!name_based_older) {
     456             :                 // this is a terminal node. i.e. this objectversion does not have name based committed history
     457        4183 :                 if (ov->name_based_head) // The oposite can happen during an early conflict in os_add or os_del.
     458        4177 :                         os_remove_name_based_chain(ov->os, ov);
     459             :         }
     460             : 
     461             :         /*
     462             :          * We have to use the readers-writer lock here,
     463             :          * since the pointer containing the adress of the older objectversion might be concurrently overwritten if the older itself hass just been put in the under_destruction state .
     464             :          */
     465        4906 :         lock_reader(ov->os);
     466        4906 :         objectversion* id_based_older = ov->id_based_older;
     467        4906 :         unlock_reader(ov->os);
     468        4906 :         if (id_based_older && !((state_older= os_atmc_get_state(id_based_older)) & rollbacked)) {
     469         242 :                 if (ov->ts != id_based_older->ts) {
     470             :                         // older is last committed state or belongs to parent transaction.
     471             :                         // In any case, we restore versionhead pointer to that.
     472             : 
     473         237 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     474         237 :                         if (state_older == active || (state_older == deleted && ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) {
     475         237 :                                 ov->id_based_head->ov = id_based_older;
     476         237 :                                 id_based_older->id_based_newer=NULL;
     477         237 :                                 if (state_older != active && expected_deleted == deleted)
     478           0 :                                         os_atmc_set_state(id_based_older, deleted); //Restore the deleted older back to its deleted state.
     479             :                         }
     480             :                 }
     481           5 :                 else if (id_based_older != name_based_older)
     482           5 :                         _os_rollback(id_based_older, store);
     483             :         }
     484        4219 :         else if (!id_based_older) {
     485             :                 // this is a terminal node. i.e. this objectversion does not have id based committed history
     486        4219 :                 os_remove_id_based_chain(ov->os, ov);
     487             :         }
     488             : 
     489        4906 :         if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
     490           0 :                 _os_rollback(ov->name_based_newer, store);
     491             :         }
     492             : 
     493        4906 :         if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && !(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
     494           0 :                 _os_rollback(ov->id_based_newer, store);
     495             :         }
     496             : }
     497             : 
     498             : static int
     499        4906 : os_rollback(objectversion *ov, sqlstore *store)
     500             : {
     501        4906 :         _os_rollback(ov, store);
     502             : 
     503        4906 :         return LOG_OK;
     504             : }
     505             : 
     506             : static void
     507       24470 : ov_destroy_obj_recursive(sqlstore* store, objectversion *ov)
     508             : {
     509       24470 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
     510       13989 :                 ov_destroy_obj_recursive(store, ov->id_based_older);
     511             :         }
     512       24470 :         if (ov->os->destroy && ov->b) {
     513       24470 :                 ov->os->destroy(store, ov->b);
     514       24470 :                 ov->b = NULL;
     515             :         }
     516       24470 : }
     517             : 
     518             : static inline void
     519       10638 : try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
     520             : {
     521       10638 :         ATOMIC_BASE_TYPE expected_deleted = deleted;
     522       10638 :         if (ATOMIC_CAS(&ov->state, &expected_deleted, under_destruction)) {
     523             : 
     524       10638 :                 if (!ov->name_based_newer || (os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
     525       10410 :                         os_remove_name_based_chain(ov->os, ov);
     526             :                 }
     527             :                 else {
     528         228 :                         lock_writer(ov->os);
     529         228 :                         ov->name_based_newer->name_based_older = NULL;
     530         228 :                         unlock_writer(ov->os);
     531             :                 }
     532             : 
     533       10638 :                 if (!ov->id_based_newer || (os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
     534       10606 :                         os_remove_id_based_chain(ov->os, ov);
     535             :                 }
     536             :                 else {
     537          32 :                         lock_writer(ov->os);
     538          32 :                         ov->id_based_newer->id_based_older = NULL;
     539          32 :                         unlock_writer(ov->os);
     540             :                 }
     541             : 
     542       10638 :                 ov->ts = store_get_timestamp(store)+1;
     543       10638 :                 if (!ov->os->nested)
     544       10481 :                         ov_destroy_obj_recursive(store, ov);
     545             :         }
     546       10638 : }
     547             : 
     548             : static void
     549       28524 : objectversion_destroy_recursive(sqlstore* store, objectversion *ov)
     550             : {
     551       28524 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
     552       14146 :                 objectversion_destroy_recursive(store, ov->id_based_older);
     553             :         }
     554       28524 :         objectversion_destroy(store, ov->os, ov);
     555       28524 : }
     556             : 
     557             : static int
     558      316123 : os_cleanup(sqlstore* store, objectversion *ov, ulng oldest)
     559             : {
     560      316123 :         if (os_atmc_get_state(ov) & under_destruction) {
     561       15181 :                 if (ov->ts < oldest) {
     562             :                         // This one is ready to be freed
     563       10638 :                         objectversion_destroy_recursive(store, ov);
     564       10638 :                         return LOG_ERR;
     565             :                 }
     566             : 
     567             :                 // not yet old enough to be safely removed. Try later.
     568             :                 return LOG_OK;
     569             :         }
     570             : 
     571      300942 :         if (os_atmc_get_state(ov) & rollbacked) {
     572       15705 :                 if (ov->ts < oldest) {
     573             :                         // This one is ready to be freed
     574        4906 :                         if (ov->name_based_older && ov->name_based_older->name_based_newer == ov)
     575         486 :                                 ov->name_based_older->name_based_newer=NULL;
     576        4906 :                         if (ov->id_based_older && ov->id_based_older->id_based_newer == ov)
     577         450 :                                 ov->id_based_older->id_based_newer=NULL;
     578        4906 :                         objectversion_destroy(store, ov->os, ov);
     579        4906 :                         return LOG_ERR;
     580             :                 }
     581             : 
     582       10799 :                 if (ov->ts > TRANSACTION_ID_BASE) {
     583             :                         /* We mark it with the latest possible starttime and reinsert it into the cleanup list.
     584             :                          * This will cause a safe eventual destruction of this rollbacked ov.
     585             :                          */
     586        4906 :                         ov->ts = store_get_timestamp(store)+1;
     587             :                 }
     588             : 
     589             :                 // not yet old enough to be safely removed. Try later.
     590       10799 :                 return LOG_OK;
     591             :         }
     592             : 
     593      285237 :         if (os_atmc_get_state(ov) == deleted) {
     594       10711 :                 if (ov->ts <= oldest) {
     595             :                         // the oldest relevant state is deleted so lets try to mark it as destroyed
     596       10638 :                         try_to_mark_deleted_for_destruction(store, ov);
     597       10638 :                         return LOG_OK+1;
     598             :                 }
     599             : 
     600             :                 // Keep it inplace on the cleanup list, either because it is now marked for destruction or
     601             :                 // we want to retry marking it for destruction later.
     602             :                 return LOG_OK;
     603             :         }
     604             : 
     605      274526 :         assert(os_atmc_get_state(ov) != deleted && os_atmc_get_state(ov) != under_destruction && os_atmc_get_state(ov) != rollbacked);
     606      274526 :         if (ov->os->temporary) os_destroy(ov->os, store); // TODO transaction_layer_revamp: embed into refcounting subproject: (old) live versions should drop their reference to the os
     607             : 
     608      282323 :         while (ov->id_based_older && ov->id_based_older == ov->name_based_older && ov->ts >= oldest) {
     609             :                 ov = ov->id_based_older;
     610             :         }
     611             : 
     612      274526 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
     613             :                 // Destroy everything older then the oldest possibly relevant objectversion.
     614        3740 :                 objectversion_destroy_recursive(store, ov->id_based_older);
     615        3740 :                 ov->id_based_older = NULL;
     616             :         }
     617             : 
     618             :         return LOG_ERR;
     619             : }
     620             : 
     621             : static int
     622      316130 : tc_gc_objectversion(sql_store store, sql_change *change, ulng oldest)
     623             : {
     624             : //      assert(!change->handled);
     625      316130 :         objectversion *ov = (objectversion*)change->data;
     626             : 
     627      316130 :         if (oldest >= TRANSACTION_ID_BASE)
     628             :                 return 0;
     629      316123 :         int res = os_cleanup( (sqlstore*) store, ov, oldest);
     630      316123 :         change->handled = (res)?true:false;
     631      316123 :         return res>=0?LOG_OK:LOG_ERR;
     632             : }
     633             : 
     634             : static int
     635      290077 : tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     636             : {
     637      290077 :         objectversion *ov = (objectversion*)change->data;
     638      290077 :         if (commit_ts) {
     639      285171 :                 assert(ov->ts == tr->tid);
     640      285171 :                 ov->ts = commit_ts;
     641      285171 :                 change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false;
     642      285171 :                 (void)oldest;
     643      285171 :                 if (!tr->parent)
     644      285164 :                         change->obj->new = 0;
     645             :         }
     646             :         else {
     647        4906 :                 os_rollback(ov, tr->store);
     648             :         }
     649             : 
     650      290077 :         return LOG_OK;
     651             : }
     652             : 
     653             : objectset *
     654       60236 : os_new(allocator *sa, destroy_fptr destroy, bool temporary, bool unique, bool concurrent, bool nested, sql_store store)
     655             : {
     656       60236 :         assert(!sa);
     657       60236 :         objectset *os = SA_NEW(sa, objectset);
     658       60236 :         if (os) {
     659       60236 :                 *os = (objectset) {
     660             :                         .refcnt = ATOMIC_VAR_INIT(1),
     661             :                         .sa = sa,
     662             :                         .destroy = destroy,
     663             :                         .temporary = temporary,
     664             :                         .unique = unique,
     665             :                         .concurrent = concurrent,
     666             :                         .nested = nested,
     667             :                         .store = store
     668             :                 };
     669       60236 :                 os->destroy = destroy;
     670       60236 :                 MT_rwlock_init(&os->rw_lock, "sa_readers_lock");
     671             :         }
     672             : 
     673       60236 :         return os;
     674             : }
     675             : 
     676             : objectset *
     677        4399 : os_dup(objectset *os)
     678             : {
     679        4399 :         ATOMIC_INC(&os->refcnt);
     680        4399 :         return os;
     681             : }
     682             : 
     683             : void
     684       64577 : os_destroy(objectset *os, sql_store store)
     685             : {
     686       64577 :         if (ATOMIC_DEC(&os->refcnt) > 0)
     687             :                 return;
     688       60178 :         MT_rwlock_destroy(&os->rw_lock);
     689       60178 :         versionhead* n=os->id_based_h;
     690      315697 :         while(n) {
     691      255519 :                 objectversion *ov = n->ov;
     692      511587 :                 while(ov) {
     693      256068 :                         objectversion *older = ov->id_based_older;
     694      256068 :                         objectversion_destroy(store, os, ov);
     695      256068 :                         ov = older;
     696             :                 }
     697      255519 :                 versionhead* hn =n->next;
     698      255519 :                 node_destroy(os, store, n);
     699      255519 :                 n = hn;
     700             :         }
     701             : 
     702       60178 :         n=os->name_based_h;
     703      315697 :         while(n) {
     704      255519 :                 versionhead* hn =n->next;
     705      255519 :                 node_destroy(os, store, n);
     706      255519 :                 n = hn;
     707             :         }
     708             : 
     709       60178 :         if (os->id_map)
     710        3581 :                 hash_destroy(os->id_map);
     711             : 
     712       60178 :         if (os->name_map)
     713        3555 :                 hash_destroy(os->name_map);
     714             : 
     715       60178 :         if (!os->sa)
     716       60178 :                 _DELETE(os);
     717             : }
     718             : 
     719             : static versionhead  *
     720    17719801 : find_name(objectset *os, const char *name)
     721             : {
     722    17719801 :         lock_reader(os);
     723    17825976 :         if (os->name_map) {
     724    14357134 :                 int key = hash_key(name);
     725    14357134 :                 sql_hash_e *he = os->name_map->buckets[key&(os->name_map->size-1)];
     726             : 
     727    80034287 :                 for (; he; he = he->chain) {
     728    79675544 :                         versionhead  *n = he->value;
     729             : 
     730    79675544 :                         if (n && n->ov->b->name && strcmp(n->ov->b->name, name) == 0) {
     731    13998391 :                                 unlock_reader(os);
     732    13998391 :                                 return n;
     733             :                         }
     734             :                 }
     735      358743 :                 unlock_reader(os);
     736      358743 :                 return NULL;
     737             :         }
     738             : 
     739     4140268 :         for (versionhead  *n = os->name_based_h; n; n = n->next) {
     740     3849530 :                 objectversion *ov = n->ov;
     741             : 
     742             :                 /* check if names match */
     743     3849530 :                 if (name[0] == ov->b->name[0] && strcmp(name, ov->b->name) == 0) {
     744     3178104 :                         unlock_reader(os);
     745     3178104 :                         return n;
     746             :                 }
     747             :         }
     748             : 
     749      290738 :         unlock_reader(os);
     750      290738 :         return NULL;
     751             : }
     752             : 
     753             : static objectversion*
     754    18724018 : get_valid_object_name(sql_trans *tr, objectversion *ov)
     755             : {
     756    18730819 :         while(ov) {
     757    18730789 :                 if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
     758    18723836 :                         return ov;
     759             :                 else {
     760        6946 :                         lock_reader(ov->os);
     761         168 :                         objectversion* name_based_older = ov->name_based_older;
     762         168 :                         unlock_reader(ov->os);
     763         168 :                         ov = name_based_older;
     764             :                 }
     765             :         }
     766             :         return ov;
     767             : }
     768             : 
     769             : static objectversion*
     770      547748 : get_valid_object_id(sql_trans *tr, objectversion *ov)
     771             : {
     772      547910 :         while(ov) {
     773      547800 :                 if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts))  || ov->ts < tr->ts)
     774      547637 :                         return ov;
     775             :                 else {
     776         163 :                         lock_reader(ov->os);
     777         163 :                         objectversion* id_based_older = ov->id_based_older;
     778         163 :                         unlock_reader(ov->os);
     779         163 :                         ov = id_based_older;
     780             :                 }
     781             :         }
     782             :         return ov;
     783             : }
     784             : 
     785             : static int
     786      278964 : os_add_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
     787      278964 :         versionhead  *name_based_node = NULL;
     788      278964 :         if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
     789        8024 :                 name_based_node = ov->id_based_older->name_based_head;
     790      270940 :         else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
     791       82370 :                 name_based_node = find_name(os, name);
     792             :         // else names are not unique and each id based version head maps to its own name based version head.
     793             : 
     794       90394 :         if (name_based_node) {
     795        8286 :                 objectversion *co = name_based_node->ov;
     796        8286 :                 objectversion *oo = get_valid_object_name(tr, co);
     797        8286 :                 if (co != oo) { /* conflict ? */
     798           6 :                         TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
     799           6 :                         return -3;
     800             :                 }
     801             : 
     802        8280 :                 assert(ov != oo); // Time loops are not allowed
     803             : 
     804        8280 :                 bte state = os_atmc_get_state(oo);
     805        8280 :                 if (state != active) {
     806             :                         // This can only happen if the parent oo was a comitted deleted at some point.
     807         270 :                         assert(state == deleted || state == under_destruction || state == block_destruction);
     808             :                         /* Since our parent oo is comitted deleted objectversion, we might have a conflict with
     809             :                         * another transaction that tries to clean up oo or also wants to add a new objectversion.
     810             :                         */
     811         270 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     812         270 :                         if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
     813           0 :                                 TRC_WARNING(SQL_STORE, "%s: " "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/ ", __func__);
     814           0 :                                 return -3; /*conflict with cleaner or write-write conflict*/
     815             :                         }
     816             :                 }
     817             : 
     818             :                 /* new object with same name within transaction, should have a delete in between */
     819        8010 :                 assert(!(state == active && oo->ts == ov->ts && !(os_atmc_get_state(ov) & deleted)));
     820             : 
     821        8280 :                 lock_writer(os);
     822        8280 :                 ov->name_based_head = oo->name_based_head;
     823        8280 :                 ov->name_based_older = oo;
     824             : 
     825        8280 :                 name_based_node->ov = ov;
     826        8280 :                 if (oo) {
     827        8280 :                         oo->name_based_newer = ov;
     828             :                         // if the parent was originally deleted, we restore it to that state.
     829        8280 :                         os_atmc_set_state(oo, state);
     830             :                 }
     831        8280 :                 unlock_writer(os);
     832        8280 :                 return 0;
     833             :         } else { /* new */
     834      270678 :                 if (os_append_name(os, ov) == NULL)
     835             :                         return -1; // MALLOC_FAIL
     836             :                 return 0;
     837             :         }
     838             : }
     839             : 
     840             : static int
     841      278974 : os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
     842      278974 :         versionhead  *id_based_node;
     843             : 
     844      278974 :         id_based_node = find_id(os, id);
     845             : 
     846      278974 :         if (id_based_node) {
     847        8058 :                 objectversion *co = id_based_node->ov;
     848        8058 :                 objectversion *oo = get_valid_object_id(tr, co);
     849        8058 :                 if (co != oo) { /* conflict ? */
     850          10 :                         TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
     851          10 :                         return -3;
     852             :                 }
     853             : 
     854        8048 :                 assert(ov != oo); // Time loops are not allowed
     855             : 
     856        8048 :                 bte state = os_atmc_get_state(oo);
     857        8048 :                 if (state != active) {
     858             :                         // This can only happen if the parent oo was a comitted deleted at some point.
     859          38 :                         assert(state == deleted || state == under_destruction || state == block_destruction);
     860             :                         /* Since our parent oo is comitted deleted objectversion, we might have a conflict with
     861             :                         * another transaction that tries to clean up oo or also wants to add a new objectversion.
     862             :                         */
     863          38 :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     864          38 :                         if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
     865           0 :                                 TRC_WARNING(SQL_STORE, "%s" "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/", __func__);
     866           0 :                                 return -3; /*conflict with cleaner or write-write conflict*/
     867             :                         }
     868             :                 }
     869             : 
     870        8048 :                 lock_writer(os);
     871        8048 :                 ov->id_based_head = oo->id_based_head;
     872        8048 :                 ov->id_based_older = oo;
     873             : 
     874        8048 :                 id_based_node->ov = ov;
     875        8048 :                 if (oo) {
     876        8048 :                         oo->id_based_newer = ov;
     877             :                         // if the parent was originally deleted, we restore it to that state.
     878        8048 :                         os_atmc_set_state(oo, state);
     879             :                 }
     880        8048 :                 unlock_writer(os);
     881        8048 :                 return 0;
     882             :         } else { /* new */
     883      270916 :                 if (os_append_id(os, ov) == NULL)
     884             :                         return -1; // MALLOC_FAIL
     885             : 
     886             :                 return 0;
     887             :         }
     888             : }
     889             : 
     890             : static int /*ok, error (name existed) and conflict (added before) */
     891      278975 : os_add_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     892             : {
     893      278975 :         int res = 0;
     894      278975 :         objectversion *ov = SA_NEW(os->sa, objectversion);
     895             : 
     896      278975 :         *ov = (objectversion) {
     897      278975 :                 .ts = tr->tid,
     898             :                 .b = b,
     899             :                 .os = os,
     900             :         };
     901             : 
     902      278975 :         if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */
     903           1 :                 if (os->destroy)
     904           1 :                         os->destroy(os->store, ov->b);
     905           1 :                 _DELETE(ov);
     906           1 :                 TRC_WARNING(SQL_STORE, "%s" "if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */", __func__);
     907           1 :                 return -3; /* conflict */
     908             :         }
     909             : 
     910      278974 :         if ((res = os_add_id_based(os, tr, b->id, ov))) {
     911          10 :                 if (os->destroy)
     912          10 :                         os->destroy(os->store, ov->b);
     913          10 :                 _DELETE(ov);
     914          10 :                 return res;
     915             :         }
     916             : 
     917      278964 :         if ((res = os_add_name_based(os, tr, name, ov))) {
     918           6 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     919           6 :                 return res;
     920             :         }
     921             : 
     922      278958 :         if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
     923      278958 :         trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     924      278958 :         return res;
     925             : }
     926             : 
     927             : int
     928      278975 : os_add(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     929             : {
     930      278975 :         store_lock(tr->store);
     931      278975 :         int res = os_add_(os, tr, name, b);
     932      278975 :         store_unlock(tr->store);
     933      278975 :         return res;
     934             : }
     935             : 
     936             : static int
     937       11106 : os_del_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
     938       11106 :         versionhead  *name_based_node = NULL;
     939       11106 :         if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
     940       11106 :                 name_based_node = ov->id_based_older->name_based_head;
     941           0 :         else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
     942           0 :                 name_based_node = find_name(os, name);
     943             : 
     944       11106 :         if (name_based_node) {
     945       11106 :                 objectversion *co = name_based_node->ov;
     946       11106 :                 objectversion *oo = get_valid_object_name(tr, co);
     947       11106 :                 ov->name_based_head = oo->name_based_head;
     948       11106 :                 if (co != oo) { /* conflict ? */
     949           0 :                         TRC_WARNING(SQL_STORE, "%s: " "if (co != oo) { /* conflict ? */", __func__);
     950           0 :                         return -3;
     951             :                 }
     952       11106 :                 ov->name_based_older = oo;
     953             : 
     954       11106 :                 lock_writer(os);
     955       11106 :                 if (oo) {
     956       11106 :                         oo->name_based_newer = ov;
     957       11106 :                         assert(os_atmc_get_state(oo) == active);
     958             :                 }
     959       11106 :                 name_based_node->ov = ov;
     960       11106 :                 unlock_writer(os);
     961       11106 :                 return 0;
     962             :         } else {
     963             :                 /* missing */
     964           0 :                 return -1;
     965             :         }
     966             : }
     967             : 
     968             : static int
     969       11108 : os_del_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
     970             : 
     971       11108 :         versionhead  *id_based_node;
     972             : 
     973       11108 :         if (ov->name_based_older && ov->name_based_older->b->id == id)
     974           0 :                 id_based_node = ov->name_based_older->id_based_head;
     975             :         else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up
     976       11108 :                 id_based_node = find_id(os, id);
     977             : 
     978       11108 :         if (id_based_node) {
     979       11108 :                 objectversion *co = id_based_node->ov;
     980       11108 :                 objectversion *oo = get_valid_object_id(tr, co);
     981       11108 :                 ov->id_based_head = oo->id_based_head;
     982       11108 :                 if (co != oo) { /* conflict ? */
     983           2 :                         TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
     984           2 :                         return -3;
     985             :                 }
     986       11106 :                 ov->id_based_older = oo;
     987             : 
     988       11106 :                 lock_writer(os);
     989       11106 :                 if (oo) {
     990       11106 :                         oo->id_based_newer = ov;
     991       11106 :                         assert(os_atmc_get_state(oo) == active);
     992             :                 }
     993       11106 :                 id_based_node->ov = ov;
     994       11106 :                 unlock_writer(os);
     995       11106 :                 return 0;
     996             :         } else {
     997             :                 /* missing */
     998             :                 return -1;
     999             :         }
    1000             : }
    1001             : 
    1002             : static int
    1003       11108 : os_del_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
    1004             : {
    1005       11108 :         int res = 0;
    1006       11108 :         objectversion *ov = SA_NEW(os->sa, objectversion);
    1007             : 
    1008       11108 :         *ov = (objectversion) {
    1009       11108 :                 .ts = tr->tid,
    1010             :                 .b = b,
    1011             :                 .os = os,
    1012             :         };
    1013       11108 :         os_atmc_set_state(ov, deleted);
    1014             : 
    1015       11108 :         if ((res = os_del_id_based(os, tr, b->id, ov))) {
    1016           2 :                 if (os->destroy)
    1017           2 :                         os->destroy(os->store, ov->b);
    1018           2 :                 _DELETE(ov);
    1019           2 :                 return res;
    1020             :         }
    1021             : 
    1022       11106 :         if ((res = os_del_name_based(os, tr, name, ov))) {
    1023           0 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
    1024           0 :                 return res;
    1025             :         }
    1026             : 
    1027       11106 :         if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
    1028       11106 :         trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
    1029       11106 :         return res;
    1030             : }
    1031             : 
    1032             : int
    1033       11108 : os_del(objectset *os, sql_trans *tr, const char *name, sql_base *b)
    1034             : {
    1035       11108 :         store_lock(tr->store);
    1036       11108 :         int res = os_del_(os, tr, name, b);
    1037       11108 :         store_unlock(tr->store);
    1038       11108 :         return res;
    1039             : }
    1040             : 
    1041             : int
    1042         291 : os_size(objectset *os, struct sql_trans *tr)
    1043             : {
    1044         291 :         int cnt = 0;
    1045         291 :         if (os) {
    1046         291 :                 lock_reader(os);
    1047         300 :                 for(versionhead  *n = os->name_based_h; n; n=n->next) {
    1048           9 :                         objectversion *ov = n->ov;
    1049           9 :                         if ((ov=get_valid_object_name(tr, ov)) && os_atmc_get_state(ov) == active)
    1050           6 :                                 cnt++;
    1051             :                 }
    1052         291 :                 unlock_reader(os);
    1053             :         }
    1054         291 :         return cnt;
    1055             : }
    1056             : 
    1057             : int
    1058           0 : os_empty(objectset *os, struct sql_trans *tr)
    1059             : {
    1060           0 :         return os_size(os, tr)==0;
    1061             : }
    1062             : 
    1063             : int
    1064           0 : os_remove(objectset *os, sql_trans *tr, const char *name)
    1065             : {
    1066           0 :         (void) os;
    1067           0 :         (void) tr;
    1068           0 :         (void) name;
    1069             :         // TODO remove entire versionhead  corresponding to this name.
    1070             : 
    1071             :         // TODO assert os->unique?s
    1072           0 :         return LOG_OK;
    1073             : }
    1074             : 
    1075             : sql_base *
    1076    17647958 : os_find_name(objectset *os, struct sql_trans *tr, const char *name)
    1077             : {
    1078    17647958 :         if (!os)
    1079             :                 return NULL;
    1080             : 
    1081    17647956 :         assert(os->unique);
    1082    17647956 :         versionhead  *n = find_name(os, name);
    1083             : 
    1084    17794112 :         if (n) {
    1085    17226695 :                  objectversion *ov = get_valid_object_name(tr, n->ov);
    1086    17211653 :                  if (ov && os_atmc_get_state(ov) == active)
    1087    17211217 :                          return ov->b;
    1088             :         }
    1089             :         return NULL;
    1090             : }
    1091             : 
    1092             : sql_base *
    1093      148867 : os_find_id(objectset *os, struct sql_trans *tr, sqlid id)
    1094             : {
    1095      148867 :         if (!os)
    1096             :                 return NULL;
    1097      148867 :         versionhead  *n = find_id(os, id);
    1098             : 
    1099      148867 :         if (n) {
    1100      148361 :                  objectversion *ov = get_valid_object_id(tr, n->ov);
    1101      148361 :                  if (ov && os_atmc_get_state(ov) == active)
    1102      148357 :                          return ov->b;
    1103             :         }
    1104             :         return NULL;
    1105             : }
    1106             : 
    1107             : void
    1108     2177871 : os_iterator(struct os_iter *oi, struct objectset *os, struct sql_trans *tr, const char *name /*optional*/)
    1109             : {
    1110     2177871 :         *oi = (struct os_iter) {
    1111             :                 .os = os,
    1112             :                 .tr = tr,
    1113             :                 .name = name,
    1114             :         };
    1115             : 
    1116     2177871 :         lock_reader(os);
    1117     2179154 :         if (name && os->name_map) {
    1118     1015463 :                 int key = hash_key(name);
    1119     1015463 :                 oi->n = (void*)os->name_map->buckets[key&(os->name_map->size-1)];
    1120             :         } else
    1121     1163691 :                 oi->n =      os->name_based_h;
    1122     2179154 :         unlock_reader(os);
    1123     2179173 : }
    1124             : 
    1125             : sql_base *
    1126     3967839 : oi_next(struct os_iter *oi)
    1127             : {
    1128     3967839 :         sql_base *b = NULL;
    1129             : 
    1130     3967839 :         if (oi->name) {
    1131     2854635 :                 lock_reader(oi->os); /* intentionally outside of while loop */
    1132     2854636 :                 if (oi->os->name_map) {
    1133     2489119 :                         sql_hash_e  *he = (void*)oi->n;
    1134             : 
    1135     6216417 :                         for (; he && !b; he = he->chain) {
    1136     3727298 :                                 versionhead  *n = he->value;
    1137             : 
    1138     3727298 :                                 if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
    1139     1481809 :                                         objectversion *ov = n->ov;
    1140             : 
    1141     1481809 :                                         ov = get_valid_object_name(oi->tr, ov);
    1142     1481809 :                                         if (ov && os_atmc_get_state(ov) == active)
    1143     1478771 :                                                 b = ov->b;
    1144             :                                 }
    1145             :                         }
    1146     2489119 :                         oi->n = (void*)he;
    1147             :                 } else {
    1148      365517 :                         versionhead  *n = oi->n;
    1149             : 
    1150      409552 :                         while (n && !b) {
    1151             : 
    1152       44035 :                                 if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
    1153       13084 :                                         objectversion *ov = n->ov;
    1154             : 
    1155       13084 :                                         n = oi->n = n->next;
    1156       13084 :                                         ov = get_valid_object_name(oi->tr, ov);
    1157       13084 :                                         if (ov && os_atmc_get_state(ov) == active)
    1158       13084 :                                                 b = ov->b;
    1159             :                                 } else {
    1160       30951 :                                         n = oi->n = n->next;
    1161             :                                 }
    1162             :                         }
    1163             :                 }
    1164     2854636 :                 unlock_reader(oi->os);
    1165             :         } else {
    1166     1113204 :                 versionhead  *n = oi->n;
    1167             : 
    1168     1113204 :                 lock_reader(oi->os); /* intentionally outside of while loop */
    1169     2606995 :                 while (n && !b) {
    1170      380045 :                         objectversion *ov = n->ov;
    1171      380045 :                         n = oi->n = n->next;
    1172             : 
    1173      380045 :                         ov = get_valid_object_id(oi->tr, ov);
    1174      380220 :                         if (ov && os_atmc_get_state(ov) == active)
    1175      355844 :                                 b = ov->b;
    1176             :                 }
    1177     1113746 :                 unlock_reader(oi->os);
    1178             :         }
    1179     3968609 :         return b;
    1180             : }
    1181             : 
    1182             : bool
    1183        3408 : os_obj_intransaction(objectset *os, struct sql_trans *tr, sql_base *b)
    1184             : {
    1185        3408 :         versionhead  *n = find_id(os, b->id);
    1186             : 
    1187        3408 :         if (n) {
    1188        3408 :                 objectversion *ov = n->ov;
    1189             : 
    1190        3408 :                 if (ov && os_atmc_get_state(ov) == active && ov->ts == tr->tid)
    1191             :                         return true;
    1192             :         }
    1193             :         return false;
    1194             : }
    1195             : 
    1196             : /* return true if this object set has changes pending for an other transaction */
    1197             : bool
    1198      183161 : os_has_changes(objectset *os, struct sql_trans *tr)
    1199             : {
    1200      183161 :         versionhead  *n = os->id_based_t;
    1201             : 
    1202      183161 :         if (n) {
    1203      179002 :                 objectversion *ov = n->ov;
    1204             : 
    1205      179002 :                 if (ov && os_atmc_get_state(ov) == active && ov->ts != tr->tid && ov->ts > TRANSACTION_ID_BASE)
    1206             :                         return true;
    1207             :         }
    1208             :         return false;
    1209             : }

Generated by: LCOV version 1.14