Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "sql_catalog.h"
15 : #include "sql_storage.h"
16 :
17 : #include "gdk_atoms.h"
18 :
struct versionhead;

/* State flags kept in objectversion.state.  They are individual bits and may
 * be OR-ed together; `active` is simply "no flag set".  Bit 0 is unused. */
#define active (0)
#define under_destruction (0x02)
#define block_destruction (0x04)
#define deleted (0x08)
#define rollbacked (0x10)

/* Ownership flags: an objectversion carrying one of these bits owns the
 * (name- resp. id-based) versionhead it points to, so destroying that
 * objectversion must free the versionhead as well. */
#define name_based_versionhead_owner (0x20)
#define id_based_versionhead_owner (0x40)
32 :
33 : typedef struct objectversion {
34 : ulng ts;
35 : ATOMIC_TYPE state;
36 : sql_base *b; // base of underlying sql object
37 : struct objectset* os;
38 : struct objectversion *name_based_older;
39 : struct objectversion *name_based_newer;
40 : struct versionhead *name_based_head;
41 :
42 : struct objectversion *id_based_older;
43 : struct objectversion *id_based_newer;
44 : struct versionhead *id_based_head;
45 : } objectversion;
46 :
47 : typedef struct versionhead {
48 : struct versionhead * prev;
49 : struct versionhead * next;
50 : objectversion* ov;
51 : } versionhead ;
52 :
53 : typedef struct objectset {
54 : ATOMIC_TYPE refcnt;
55 : allocator *sa;
56 : destroy_fptr destroy;
57 : MT_RWLock rw_lock; /*readers-writer lock to protect the links (chains) in the objectversion chain.*/
58 : MT_Lock lock; /* global objectset lock for os_add/del */
59 : versionhead *name_based_h;
60 : versionhead *name_based_t;
61 : versionhead *id_based_h;
62 : versionhead *id_based_t;
63 : int name_based_cnt;
64 : int id_based_cnt;
65 : struct sql_hash *name_map;
66 : struct sql_hash *id_map;
67 : bool
68 : temporary:1,
69 : unique:1, /* names are unique */
70 : concurrent:1, /* concurrent inserts are allowed */
71 : nested:1;
72 : sql_store store;
73 : } objectset;
74 :
75 : static int
76 359251 : os_id_key(versionhead *n)
77 : {
78 277775 : return (int) BATatoms[TYPE_int].atomHash(&n->ov->b->id);
79 : }
80 :
81 : static inline void
82 26623045 : lock_reader(objectset* os)
83 : {
84 26623045 : MT_rwlock_rdlock(&os->rw_lock);
85 1044605 : }
86 :
87 : static inline void
88 26738581 : unlock_reader(objectset* os)
89 : {
90 26738581 : MT_rwlock_rdunlock(&os->rw_lock);
91 4359468 : }
92 :
93 : static inline void
94 655479 : lock_writer(objectset* os)
95 : {
96 655479 : MT_rwlock_wrlock(&os->rw_lock);
97 : }
98 :
99 : static inline void
100 655479 : unlock_writer(objectset* os)
101 : {
102 655479 : MT_rwlock_wrunlock(&os->rw_lock);
103 424 : }
104 :
105 23816404 : static bte os_atmc_get_state(objectversion *ov) {
106 23816404 : bte state = (bte) ATOMIC_GET(&ov->state);
107 23816404 : return state;
108 : }
109 :
110 69487 : static void os_atmc_set_state(objectversion *ov, bte state) {
111 69487 : ATOMIC_SET(&ov->state, state);
112 0 : }
113 :
114 : static versionhead *
115 473590 : find_id(objectset *os, sqlid id)
116 : {
117 473590 : if (os) {
118 473590 : lock_reader(os);
119 473590 : if (os->id_map) {
120 439178 : int key = (int) BATatoms[TYPE_int].atomHash(&id);
121 439178 : sql_hash_e *he = os->id_map->buckets[key&(os->id_map->size-1)];
122 :
123 1511464 : for (; he; he = he->chain) {
124 1252872 : versionhead *n = he->value;
125 :
126 1252872 : if (n && n->ov->b->id == id) {
127 180586 : unlock_reader(os);
128 180586 : return n;
129 : }
130 : }
131 258592 : unlock_reader(os);
132 258592 : return NULL;
133 : }
134 :
135 104590 : for (versionhead *n = os->id_based_h; n; n = n->next) {
136 72822 : objectversion *ov = n->ov;
137 :
138 : /* check if ids match */
139 72822 : if (id == ov->b->id) {
140 2644 : unlock_reader(os);
141 2644 : return n;
142 : }
143 : }
144 31768 : unlock_reader(os);
145 : }
146 :
147 : return NULL;
148 : }
149 :
150 : // TODO copy of static function from sql_list.c. Needs to be made external
151 : static void
152 30565 : hash_delete(sql_hash *h, void *data)
153 : {
154 30565 : int key = h->key(data);
155 30565 : sql_hash_e *e, *p = h->buckets[key&(h->size-1)];
156 :
157 30565 : e = p;
158 47358 : for (; p && p->value != data ; p = p->chain)
159 16793 : e = p;
160 30565 : if (p && p->value == data) {
161 30565 : if (p == e)
162 26010 : h->buckets[key&(h->size-1)] = p->chain;
163 : else
164 4555 : e->chain = p->chain;
165 30565 : if (!h->sa)
166 30565 : _DELETE(p);
167 : }
168 30565 : h->entries--;
169 30565 : }
170 :
171 : static void
172 578111 : node_destroy(objectset *os, sqlstore *store, versionhead *n)
173 : {
174 578111 : if (!os->sa)
175 578111 : _DELETE(n);
176 578111 : (void)store;
177 578111 : }
178 :
179 : static versionhead *
180 15934 : os_remove_name_based_chain(objectset *os, objectversion* ov)
181 : {
182 15934 : lock_writer(os);
183 15934 : versionhead *n = ov->name_based_head;
184 15934 : versionhead *p = os->name_based_h;
185 15934 : if (p != n)
186 1202795 : while (p && p->next != n)
187 : p = p->next;
188 15934 : assert(p==n||(p && p->next == n));
189 15934 : if (p == n) {
190 1383 : os->name_based_h = n->next;
191 1383 : if (os->name_based_h) // i.e. non-empty os
192 832 : os->name_based_h->prev = NULL;
193 : p = NULL;
194 14551 : } else if ( p != NULL) {
195 14551 : p->next = n->next;
196 14551 : if (p->next) // node in the middle
197 5612 : p->next->prev = p;
198 : }
199 15934 : if (n == os->name_based_t)
200 9490 : os->name_based_t = p;
201 :
202 15934 : if (os->name_map && n)
203 14840 : hash_delete(os->name_map, n);
204 :
205 15934 : os->name_based_cnt--;
206 15934 : unlock_writer(os);
207 :
208 15934 : bte state = os_atmc_get_state(ov);
209 15934 : state |= name_based_versionhead_owner;
210 15934 : os_atmc_set_state(ov, state);
211 15934 : return p;
212 : }
213 :
214 : static versionhead *
215 16299 : os_remove_id_based_chain(objectset *os, objectversion* ov)
216 : {
217 16299 : lock_writer(os);
218 16299 : versionhead *n = ov->id_based_head;
219 16299 : versionhead *p = os->id_based_h;
220 :
221 16299 : if (p != n)
222 1231369 : while (p && p->next != n)
223 : p = p->next;
224 16299 : assert(p==n||(p && p->next == n));
225 16299 : if (p == n) {
226 1383 : os->id_based_h = n->next;
227 1383 : if (os->id_based_h) // i.e. non-empty os
228 832 : os->id_based_h->prev = NULL;
229 : p = NULL;
230 14916 : } else if ( p != NULL) {
231 14916 : p->next = n->next;
232 14916 : if (p->next) // node in the middle
233 5978 : p->next->prev = p;
234 : }
235 16299 : if (n == os->id_based_t)
236 9489 : os->id_based_t = p;
237 :
238 16299 : if (os->id_map && n)
239 15725 : hash_delete(os->id_map, n);
240 :
241 16299 : os->name_based_cnt--;
242 16299 : unlock_writer(os);
243 :
244 16299 : bte state = os_atmc_get_state(ov);
245 16299 : state |= id_based_versionhead_owner;
246 16299 : os_atmc_set_state(ov, state);
247 16299 : return p;
248 : }
249 :
250 : static versionhead *
251 579275 : node_create(allocator *sa, objectversion *ov)
252 : {
253 579275 : versionhead *n = SA_NEW(sa, versionhead );
254 :
255 579275 : if (n == NULL)
256 : return NULL;
257 579275 : *n = (versionhead ) {
258 : .ov = ov,
259 : };
260 579275 : return n;
261 : }
262 :
263 : static inline int
264 347352 : os_name_key(versionhead *n)
265 : {
266 347352 : return hash_key(n->ov->b->name);
267 : }
268 :
269 : static objectset *
270 289455 : os_append_node_name(objectset *os, versionhead *n)
271 : {
272 289455 : lock_writer(os);
273 289455 : if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && os->name_based_cnt > HASH_MIN_SIZE) {
274 4148 : hash_destroy(os->name_map);
275 4148 : os->name_map = hash_new(os->sa, os->name_based_cnt, (fkeyvalue)& os_name_key);
276 4148 : if (os->name_map == NULL) {
277 0 : unlock_writer(os);
278 0 : return NULL;
279 : }
280 :
281 75489 : for (versionhead *n = os->name_based_h; n; n = n->next ) {
282 71341 : int key = os_name_key(n);
283 :
284 71341 : if (hash_add(os->name_map, key, n) == NULL) {
285 0 : unlock_writer(os);
286 0 : return NULL;
287 : }
288 : }
289 : }
290 :
291 289455 : if (os->name_map) {
292 261171 : int key = os->name_map->key(n);
293 :
294 261171 : if (hash_add(os->name_map, key, n) == NULL) {
295 0 : unlock_writer(os);
296 0 : return NULL;
297 : }
298 : }
299 :
300 289455 : if (os->name_based_t) {
301 280700 : os->name_based_t->next = n;
302 : } else {
303 8755 : os->name_based_h = n;
304 : }
305 289455 : n->prev = os->name_based_t; // aka the double linked list.
306 289455 : os->name_based_t = n;
307 289455 : os->name_based_cnt++;
308 289455 : unlock_writer(os);
309 289455 : return os;
310 : }
311 :
312 : static objectset *
313 289455 : os_append_name(objectset *os, objectversion *ov)
314 : {
315 289455 : versionhead *n = node_create(os->sa, ov);
316 :
317 289455 : if (n == NULL)
318 : return NULL;
319 :
320 289455 : ov->name_based_head = n;
321 289455 : if (!(os = os_append_node_name(os, n))){
322 0 : _DELETE(n);
323 0 : return NULL;
324 : }
325 :
326 : return os;
327 : }
328 :
329 : static void
330 4263 : os_append_id_map(objectset *os)
331 : {
332 4263 : if (os->id_map)
333 495 : hash_destroy(os->id_map);
334 4263 : os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key);
335 4263 : if (os->id_map == NULL)
336 : return ;
337 85739 : for (versionhead *n = os->id_based_h; n; n = n->next ) {
338 81476 : int key = os_id_key(n);
339 :
340 81476 : if (hash_add(os->id_map, key, n) == NULL) {
341 0 : hash_destroy(os->id_map);
342 0 : os->id_map = NULL;
343 0 : return ;
344 : }
345 : }
346 : }
347 :
348 : static objectset *
349 289820 : os_append_node_id(objectset *os, versionhead *n)
350 : {
351 289820 : lock_writer(os);
352 289820 : if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && os->id_based_cnt > HASH_MIN_SIZE)
353 4263 : os_append_id_map(os); /* on failure just fall back to slow method */
354 :
355 289820 : if (os->id_map) {
356 262050 : int key = os->id_map->key(n);
357 262050 : if (hash_add(os->id_map, key, n) == NULL) {
358 0 : hash_destroy(os->id_map);
359 0 : os->id_map = NULL;
360 : /* fall back to slow search */
361 : }
362 : }
363 :
364 289820 : if (os->id_based_t) {
365 281065 : os->id_based_t->next = n;
366 : } else {
367 8755 : os->id_based_h = n;
368 : }
369 289820 : n->prev = os->id_based_t; // aka the double linked list.
370 289820 : os->id_based_t = n;
371 289820 : os->id_based_cnt++;
372 289820 : unlock_writer(os);
373 289820 : return os;
374 : }
375 :
376 : static objectset *
377 289820 : os_append_id(objectset *os, objectversion *ov)
378 : {
379 289820 : versionhead *n = node_create(os->sa, ov);
380 :
381 289820 : if (n == NULL)
382 : return NULL;
383 289820 : ov->id_based_head = n;
384 289820 : if (!(os = os_append_node_id(os, n))){
385 0 : _DELETE(n);
386 0 : return NULL;
387 : }
388 :
389 : return os;
390 : }
391 :
392 : static versionhead * find_name(objectset *os, const char *name);
393 :
394 : static void
395 310832 : objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov)
396 : {
397 :
398 310832 : bte state = os_atmc_get_state(ov);
399 :
400 310832 : if (state & name_based_versionhead_owner) {
401 15934 : node_destroy(ov->os, store, ov->name_based_head);
402 : }
403 :
404 310832 : if (state & id_based_versionhead_owner) {
405 16299 : node_destroy(ov->os, store, ov->id_based_head);
406 : }
407 :
408 310832 : if (os->destroy && ov->b)
409 284697 : os->destroy(store, ov->b);
410 :
411 310832 : if (os->temporary && (state & deleted || state & under_destruction || state & rollbacked))
412 205 : os_destroy(os, store); // TODO transaction_layer_revamp: embed into refcounting subproject : reference is already dropped by os_cleanup
413 310832 : _DELETE(ov);
414 310832 : }
415 :
/* Undo an uncommitted objectversion (its ts still holds a transaction id, see
 * the assert).  The version is flagged rollbacked first, so re-entering for the
 * same ov returns immediately.  Then, separately for the name-based and the
 * id-based chain:
 *  - if an older version exists that belongs to a different ts (last committed
 *    state or a parent transaction), the shared versionhead is pointed back at
 *    that older version;
 *  - if the older version has the same ts (same transaction), it is rolled
 *    back recursively;
 *  - if there is no older version, the whole chain node is unlinked from os.
 * Finally any newer versions that are not yet rolled back are rolled back too. */
static void
_os_rollback(objectversion *ov, sqlstore *store)
{
	assert(ov->ts >= TRANSACTION_ID_BASE);

	bte state = os_atmc_get_state(ov);
	if (state & rollbacked) {
		return;
	}

	state |= rollbacked;
	os_atmc_set_state(ov, state);

	bte state_older;

	/*
	 * We have to use the readers-writer lock here,
	 * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state .
	 */
	lock_reader(ov->os);
	objectversion* name_based_older = ov->name_based_older;
	unlock_reader(ov->os);

	if (name_based_older && !((state_older= os_atmc_get_state(name_based_older)) & rollbacked)) {
		if (ov->ts != name_based_older->ts) {
			// older is last committed state or belongs to parent transaction.
			// In any case, we restore versionhead pointer to that.

			/* A deleted older is moved to block_destruction for the duration of
			 * the relink, so try_to_mark_deleted_for_destruction() (which CASes
			 * deleted -> under_destruction) cannot claim it concurrently.  If the
			 * older is in any other non-active state nothing is relinked. */
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (state_older == active || (state_older == deleted && ATOMIC_CAS(&name_based_older->state, &expected_deleted, block_destruction))) {
				ov->name_based_head->ov = name_based_older;
				name_based_older->name_based_newer=NULL;
				if (state_older != active && expected_deleted == deleted)
					os_atmc_set_state(name_based_older, deleted); //Restore the deleted older back to its deleted state.
			}
		}
		else {
			/* same ts: the older version belongs to this transaction as well */
			_os_rollback(name_based_older, store);
		}
	}
	else if (!name_based_older) {
		// this is a terminal node. i.e. this objectversion does not have name based committed history
		if (ov->name_based_head) // The opposite can happen during an early conflict in os_add or os_del.
			os_remove_name_based_chain(ov->os, ov);
	}

	/*
	 * We have to use the readers-writer lock here,
	 * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state .
	 */
	lock_reader(ov->os);
	objectversion* id_based_older = ov->id_based_older;
	unlock_reader(ov->os);
	if (id_based_older && !((state_older= os_atmc_get_state(id_based_older)) & rollbacked)) {
		if (ov->ts != id_based_older->ts) {
			// older is last committed state or belongs to parent transaction.
			// In any case, we restore versionhead pointer to that.

			/* same block_destruction protection as for the name-based chain */
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (state_older == active || (state_older == deleted && ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) {
				ov->id_based_head->ov = id_based_older;
				id_based_older->id_based_newer=NULL;
				if (state_older != active && expected_deleted == deleted)
					os_atmc_set_state(id_based_older, deleted); //Restore the deleted older back to its deleted state.
			}
		}
		/* skip when it is the same object already handled via the name-based chain */
		else if (id_based_older != name_based_older)
			_os_rollback(id_based_older, store);
	}
	else if (!id_based_older) {
		// this is a terminal node. i.e. this objectversion does not have id based committed history
		os_remove_id_based_chain(ov->os, ov);
	}

	/* newer versions built on top of ov cannot survive its rollback */
	if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
		_os_rollback(ov->name_based_newer, store);
	}

	if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && !(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
		_os_rollback(ov->id_based_newer, store);
	}
}
498 :
499 : static int
500 5627 : os_rollback(objectversion *ov, sqlstore *store)
501 : {
502 5627 : _os_rollback(ov, store);
503 :
504 5627 : return LOG_OK;
505 : }
506 :
507 : static void
508 26135 : ov_destroy_obj_recursive(sqlstore* store, objectversion *ov)
509 : {
510 26135 : if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
511 14886 : ov_destroy_obj_recursive(store, ov->id_based_older);
512 : }
513 26135 : if (ov->os->destroy && ov->b) {
514 26135 : ov->os->destroy(store, ov->b);
515 26135 : ov->b = NULL;
516 : }
517 26135 : }
518 :
/* Try to move a deleted objectversion into the under_destruction state.
 * The CAS only succeeds while the state is exactly `deleted`; if another
 * transaction has set block_destruction (see os_add_*_based / _os_rollback)
 * nothing happens and os_cleanup() will retry later.
 * On success, for each chain: when there is no live (non-rolled-back) newer
 * version the chain node is unlinked from os, otherwise the newer version's
 * back-pointer to ov is cleared.  ov->ts is then stamped one past the current
 * store timestamp so that os_cleanup() frees it only once `oldest` has moved
 * beyond that point.  For non-nested sets the sql_base objects are released
 * right away. */
static inline void
try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
{
	ATOMIC_BASE_TYPE expected_deleted = deleted;
	if (ATOMIC_CAS(&ov->state, &expected_deleted, under_destruction)) {

		if (!ov->name_based_newer || (os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
			os_remove_name_based_chain(ov->os, ov);
		}
		else {
			/* detach the newer version from ov under the writer lock */
			lock_writer(ov->os);
			ov->name_based_newer->name_based_older = NULL;
			unlock_writer(ov->os);
		}

		if (!ov->id_based_newer || (os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
			os_remove_id_based_chain(ov->os, ov);
		}
		else {
			lock_writer(ov->os);
			ov->id_based_newer->id_based_older = NULL;
			unlock_writer(ov->os);
		}

		ov->ts = store_get_timestamp(store)+1;
		if (!ov->os->nested)
			ov_destroy_obj_recursive(store, ov);
	}
}
548 :
549 : static void
550 31770 : objectversion_destroy_recursive(sqlstore* store, objectversion *ov)
551 : {
552 31770 : if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
553 15082 : objectversion_destroy_recursive(store, ov->id_based_older);
554 : }
555 31770 : objectversion_destroy(store, ov->os, ov);
556 31770 : }
557 :
/* Garbage-collection step for one objectversion, driven by `oldest`.
 * Return values as consumed by tc_gc_objectversion() (any non-zero result
 * marks the change as handled there):
 *  - LOG_ERR  : the version was freed, or (for a live version) its shared
 *               history older than `oldest` was freed;
 *  - LOG_OK   : not yet safe to act, retry later;
 *  - LOG_OK+1 : a deleted version was offered to
 *               try_to_mark_deleted_for_destruction().
 * NOTE(review): the meaning the caller assigns to LOG_ERR vs. LOG_OK+1 beyond
 * the handled flag is not visible here; confirm against the change list code. */
static int
os_cleanup(sqlstore* store, objectversion *ov, ulng oldest)
{
	if (os_atmc_get_state(ov) & under_destruction) {
		if (ov->ts < oldest) {
			// This one is ready to be freed
			objectversion_destroy_recursive(store, ov);
			return LOG_ERR;
		}

		// not yet old enough to be safely removed. Try later.
		return LOG_OK;
	}

	if (os_atmc_get_state(ov) & rollbacked) {
		if (ov->ts < oldest) {
			// This one is ready to be freed
			/* drop back-pointers from older versions that still reference ov */
			if (ov->name_based_older && ov->name_based_older->name_based_newer == ov)
				ov->name_based_older->name_based_newer=NULL;
			if (ov->id_based_older && ov->id_based_older->id_based_newer == ov)
				ov->id_based_older->id_based_newer=NULL;
			objectversion_destroy(store, ov->os, ov);
			return LOG_ERR;
		}

		if (ov->ts > TRANSACTION_ID_BASE) {
			/* We mark it with the latest possible starttime and reinsert it into the cleanup list.
			 * This will cause a safe eventual destruction of this rollbacked ov.
			 */
			ov->ts = store_get_timestamp(store)+1;
		}

		// not yet old enough to be safely removed. Try later.
		return LOG_OK;
	}

	if (os_atmc_get_state(ov) == deleted) {
		if (ov->ts <= oldest) {
			// the oldest relevant state is deleted so lets try to mark it as destroyed
			try_to_mark_deleted_for_destruction(store, ov);
			return LOG_OK+1;
		}

		// Keep it inplace on the cleanup list, either because it is now marked for destruction or
		// we want to retry marking it for destruction later.
		return LOG_OK;
	}

	/* From here on ov is a live (committed) version. */
	assert(os_atmc_get_state(ov) != deleted && os_atmc_get_state(ov) != under_destruction && os_atmc_get_state(ov) != rollbacked);
	if (ov->os->temporary) os_destroy(ov->os, store); // TODO transaction_layer_revamp: embed into refcounting subproject: (old) live versions should drop their reference to the os

	/* walk down the shared history to the newest version older than `oldest` */
	while (ov->id_based_older && ov->id_based_older == ov->name_based_older && ov->ts >= oldest) {
		ov = ov->id_based_older;
	}

	if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
		// Destroy everything older than the oldest possibly relevant objectversion.
		objectversion_destroy_recursive(store, ov->id_based_older);
		ov->id_based_older = NULL;
	}

	return LOG_ERR;
}
621 :
622 : static int
623 339402 : tc_gc_objectversion(sql_store store, sql_change *change, ulng oldest)
624 : {
625 : // assert(!change->handled);
626 339402 : objectversion *ov = (objectversion*)change->data;
627 :
628 339402 : if (oldest >= TRANSACTION_ID_BASE)
629 : return 0;
630 339395 : int res = os_cleanup( (sqlstore*) store, ov, oldest);
631 339395 : change->handled = (res)?true:false;
632 339395 : return res>=0?LOG_OK:LOG_ERR;
633 : }
634 :
635 : static int
636 311421 : tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
637 : {
638 311421 : objectversion *ov = (objectversion*)change->data;
639 311421 : if (commit_ts) {
640 305794 : assert(ov->ts == tr->tid);
641 305794 : ov->ts = commit_ts;
642 305794 : change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false;
643 305794 : (void)oldest;
644 305794 : if (!tr->parent)
645 305787 : change->obj->new = 0;
646 305794 : if (!ov->os->temporary)
647 301145 : ATOMIC_INC(&tr->cat->schema_version);
648 : } else {
649 5627 : os_rollback(ov, tr->store);
650 : }
651 :
652 311421 : return LOG_OK;
653 : }
654 :
655 : objectset *
656 61296 : os_new(allocator *sa, destroy_fptr destroy, bool temporary, bool unique, bool concurrent, bool nested, sql_store store)
657 : {
658 61296 : assert(!sa);
659 61296 : objectset *os = SA_NEW(sa, objectset);
660 61296 : if (os) {
661 61296 : *os = (objectset) {
662 : .refcnt = ATOMIC_VAR_INIT(1),
663 : .sa = sa,
664 : .destroy = destroy,
665 : .temporary = temporary,
666 : .unique = unique,
667 : .concurrent = concurrent,
668 : .nested = nested,
669 : .store = store
670 : };
671 61296 : os->destroy = destroy;
672 61296 : MT_rwlock_init(&os->rw_lock, "sa_readers_lock");
673 61296 : MT_lock_init(&os->lock, "single_writer_lock");
674 : }
675 :
676 61296 : return os;
677 : }
678 :
679 : objectset *
680 4920 : os_dup(objectset *os)
681 : {
682 4920 : ATOMIC_INC(&os->refcnt);
683 4920 : return os;
684 : }
685 :
686 : void
687 66158 : os_destroy(objectset *os, sql_store store)
688 : {
689 66158 : if (ATOMIC_DEC(&os->refcnt) > 0)
690 : return;
691 61238 : MT_lock_destroy(&os->lock);
692 61238 : MT_rwlock_destroy(&os->rw_lock);
693 61238 : versionhead* n=os->id_based_h;
694 334177 : while(n) {
695 272939 : objectversion *ov = n->ov;
696 546374 : while(ov) {
697 273435 : objectversion *older = ov->id_based_older;
698 273435 : objectversion_destroy(store, os, ov);
699 273435 : ov = older;
700 : }
701 272939 : versionhead* hn =n->next;
702 272939 : node_destroy(os, store, n);
703 272939 : n = hn;
704 : }
705 :
706 61238 : n=os->name_based_h;
707 334177 : while(n) {
708 272939 : versionhead* hn =n->next;
709 272939 : node_destroy(os, store, n);
710 272939 : n = hn;
711 : }
712 :
713 61238 : if (os->id_map)
714 3758 : hash_destroy(os->id_map);
715 :
716 61238 : if (os->name_map)
717 3731 : hash_destroy(os->name_map);
718 :
719 61238 : if (!os->sa)
720 61238 : _DELETE(os);
721 : }
722 :
723 : static versionhead *
724 19583098 : find_name(objectset *os, const char *name)
725 : {
726 19583098 : lock_reader(os);
727 19697462 : if (os->name_map) {
728 16051828 : int key = hash_key(name);
729 16051828 : sql_hash_e *he = os->name_map->buckets[key&(os->name_map->size-1)];
730 :
731 88948054 : for (; he; he = he->chain) {
732 88573382 : versionhead *n = he->value;
733 :
734 88573382 : if (n && n->ov->b->name && strcmp(n->ov->b->name, name) == 0) {
735 15677156 : unlock_reader(os);
736 15677156 : return n;
737 : }
738 : }
739 374672 : unlock_reader(os);
740 374672 : return NULL;
741 : }
742 :
743 4352151 : for (versionhead *n = os->name_based_h; n; n = n->next) {
744 4046947 : objectversion *ov = n->ov;
745 :
746 : /* check if names match */
747 4046947 : if (name[0] == ov->b->name[0] && strcmp(name, ov->b->name) == 0) {
748 3340430 : unlock_reader(os);
749 3340430 : return n;
750 : }
751 : }
752 :
753 305204 : unlock_reader(os);
754 305204 : return NULL;
755 : }
756 :
757 : static objectversion*
758 20850276 : get_valid_object_name(sql_trans *tr, objectversion *ov, bool lock)
759 : {
760 20850443 : while(ov) {
761 20850412 : if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
762 20828929 : return ov;
763 : else {
764 167 : if (lock)
765 153 : lock_reader(ov->os);
766 167 : objectversion* name_based_older = ov->name_based_older;
767 14 : if (lock)
768 153 : unlock_reader(ov->os);
769 : ov = name_based_older;
770 : }
771 : }
772 : return ov;
773 : }
774 :
775 : static objectversion*
776 590921 : get_valid_object_id(sql_trans *tr, objectversion *ov, bool lock)
777 : {
778 591083 : while(ov) {
779 590974 : if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
780 590812 : return ov;
781 : else {
782 162 : if (lock)
783 40 : lock_reader(ov->os);
784 162 : objectversion* id_based_older = ov->id_based_older;
785 122 : if (lock)
786 40 : unlock_reader(ov->os);
787 : ov = id_based_older;
788 : }
789 : }
790 : return ov;
791 : }
792 :
/* Link the new objectversion ov into the name-based structure of os.
 * The existing chain for `name` is taken from ov's id-based predecessor when
 * that has the same name, or looked up with find_name() for unique sets;
 * otherwise a new versionhead is appended.
 * Returns 0 on success, -3 on conflict (the chain head is not the version
 * visible to tr, or a deleted predecessor is concurrently being cleaned up or
 * claimed), -1 on allocation failure. */
static int
os_add_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
	versionhead *name_based_node = NULL;
	if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
		name_based_node = ov->id_based_older->name_based_head;
	else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
		name_based_node = find_name(os, name);
	// else names are not unique and each id based version head maps to its own name based version head.

	if (name_based_node) {
		objectversion *co = name_based_node->ov;
		objectversion *oo = get_valid_object_name(tr, co, true);
		if (co != oo) { /* conflict ? */
			TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
			return -3;
		}

		assert(ov != oo); // Time loops are not allowed

		bte state = os_atmc_get_state(oo);
		if (state != active) {
			// This can only happen if the parent oo was a committed deleted at some point.
			assert(state == deleted || state == under_destruction || state == block_destruction);
			/* Since our parent oo is committed deleted objectversion, we might have a conflict with
			* another transaction that tries to clean up oo or also wants to add a new objectversion.
			*/
			/* Claim oo by moving it deleted -> block_destruction; this fails if
			 * the cleaner already moved it to under_destruction or another
			 * transaction holds block_destruction. */
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
				TRC_WARNING(SQL_STORE, "%s: " "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/ ", __func__);
				return -3; /*conflict with cleaner or write-write conflict*/
			}
		}

		/* new object with same name within transaction, should have a delete in between */
		assert(!(state == active && oo->ts == ov->ts && !(os_atmc_get_state(ov) & deleted)));

		/* make ov the new head of the chain, with oo as its predecessor */
		lock_writer(os);
		ov->name_based_head = oo->name_based_head;
		ov->name_based_older = oo;

		name_based_node->ov = ov;
		if (oo) {
			oo->name_based_newer = ov;
			// if the parent was originally deleted, we restore it to that state.
			os_atmc_set_state(oo, state);
		}
		unlock_writer(os);
		return 0;
	} else { /* new */
		if (os_append_name(os, ov) == NULL)
			return -1; // MALLOC_FAIL
		return 0;
	}
}
847 :
/* Link the new objectversion ov into the id-based structure of os.
 * If a chain for `id` exists (find_id), ov becomes its new head on top of the
 * version visible to tr; otherwise a new versionhead is appended.
 * Returns 0 on success, -3 on conflict (the chain head is not the version
 * visible to tr, or a deleted predecessor is concurrently being cleaned up or
 * claimed), -1 on allocation failure. */
static int
os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
	versionhead *id_based_node;

	id_based_node = find_id(os, id);

	if (id_based_node) {
		objectversion *co = id_based_node->ov;
		objectversion *oo = get_valid_object_id(tr, co, true);
		if (co != oo) { /* conflict ? */
			TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
			return -3;
		}

		assert(ov != oo); // Time loops are not allowed

		bte state = os_atmc_get_state(oo);
		if (state != active) {
			// This can only happen if the parent oo was a committed deleted at some point.
			assert(state == deleted || state == under_destruction || state == block_destruction);
			/* Since our parent oo is committed deleted objectversion, we might have a conflict with
			* another transaction that tries to clean up oo or also wants to add a new objectversion.
			*/
			/* Claim oo by moving it deleted -> block_destruction (see os_add_name_based). */
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
				TRC_WARNING(SQL_STORE, "%s" "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/", __func__);
				return -3; /*conflict with cleaner or write-write conflict*/
			}
		}

		/* make ov the new head of the chain, with oo as its predecessor */
		lock_writer(os);
		ov->id_based_head = oo->id_based_head;
		ov->id_based_older = oo;

		id_based_node->ov = ov;
		if (oo) {
			oo->id_based_newer = ov;
			// if the parent was originally deleted, we restore it to that state.
			os_atmc_set_state(oo, state);
		}
		unlock_writer(os);
		return 0;
	} else { /* new */
		if (os_append_id(os, ov) == NULL)
			return -1; // MALLOC_FAIL

		return 0;
	}
}
897 :
898 : static int /*ok, error (name existed) and conflict (added before) */
899 299503 : os_add_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
900 : {
901 299503 : int res = 0;
902 299503 : objectversion *ov = SA_NEW(os->sa, objectversion);
903 :
904 299503 : *ov = (objectversion) {
905 299503 : .ts = tr->tid,
906 : .b = b,
907 : .os = os,
908 : };
909 :
910 299503 : if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */
911 1 : if (os->destroy)
912 1 : os->destroy(os->store, ov->b);
913 1 : _DELETE(ov);
914 1 : TRC_WARNING(SQL_STORE, "%s" "if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */", __func__);
915 1 : return -3; /* conflict */
916 : }
917 :
918 299502 : if ((res = os_add_id_based(os, tr, b->id, ov))) {
919 10 : if (os->destroy)
920 10 : os->destroy(os->store, ov->b);
921 10 : _DELETE(ov);
922 10 : return res;
923 : }
924 :
925 299492 : if ((res = os_add_name_based(os, tr, name, ov))) {
926 6 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
927 6 : return res;
928 : }
929 :
930 299486 : if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
931 299486 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
932 299486 : return res;
933 : }
934 :
935 : int
936 299503 : os_add(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
937 : {
938 299503 : MT_lock_set(&os->lock);
939 299503 : int res = os_add_(os, tr, name, b);
940 299503 : MT_lock_unset(&os->lock);
941 299503 : return res;
942 : }
943 :
944 : static int
945 11922 : os_del_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
946 11922 : versionhead *name_based_node = NULL;
947 11922 : if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
948 11922 : name_based_node = ov->id_based_older->name_based_head;
949 0 : else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
950 0 : name_based_node = find_name(os, name);
951 :
952 11922 : if (name_based_node) {
953 11922 : objectversion *co = name_based_node->ov;
954 11922 : objectversion *oo = get_valid_object_name(tr, co, true);
955 11922 : ov->name_based_head = oo->name_based_head;
956 11922 : if (co != oo) { /* conflict ? */
957 0 : TRC_WARNING(SQL_STORE, "%s: " "if (co != oo) { /* conflict ? */", __func__);
958 0 : return -3;
959 : }
960 11922 : ov->name_based_older = oo;
961 :
962 11922 : lock_writer(os);
963 11922 : if (oo) {
964 11922 : oo->name_based_newer = ov;
965 11922 : assert(os_atmc_get_state(oo) == active);
966 : }
967 11922 : name_based_node->ov = ov;
968 11922 : unlock_writer(os);
969 11922 : return 0;
970 : } else {
971 : /* missing */
972 0 : return -1;
973 : }
974 : }
975 :
976 : static int
977 11924 : os_del_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
978 :
979 11924 : versionhead *id_based_node;
980 :
981 11924 : if (ov->name_based_older && ov->name_based_older->b->id == id)
982 0 : id_based_node = ov->name_based_older->id_based_head;
983 : else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up
984 11924 : id_based_node = find_id(os, id);
985 :
986 11924 : if (id_based_node) {
987 11924 : objectversion *co = id_based_node->ov;
988 11924 : objectversion *oo = get_valid_object_id(tr, co, true);
989 11924 : ov->id_based_head = oo->id_based_head;
990 11924 : if (co != oo) { /* conflict ? */
991 2 : TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
992 2 : return -3;
993 : }
994 11922 : ov->id_based_older = oo;
995 :
996 11922 : lock_writer(os);
997 11922 : if (oo) {
998 11922 : oo->id_based_newer = ov;
999 11922 : assert(os_atmc_get_state(oo) == active);
1000 : }
1001 11922 : id_based_node->ov = ov;
1002 11922 : unlock_writer(os);
1003 11922 : return 0;
1004 : } else {
1005 : /* missing */
1006 : return -1;
1007 : }
1008 : }
1009 :
1010 : static int
1011 11924 : os_del_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
1012 : {
1013 11924 : int res = 0;
1014 11924 : objectversion *ov = SA_NEW(os->sa, objectversion);
1015 :
1016 11924 : *ov = (objectversion) {
1017 11924 : .ts = tr->tid,
1018 : .b = b,
1019 : .os = os,
1020 : };
1021 11924 : os_atmc_set_state(ov, deleted);
1022 :
1023 11924 : if ((res = os_del_id_based(os, tr, b->id, ov))) {
1024 2 : if (os->destroy)
1025 2 : os->destroy(os->store, ov->b);
1026 2 : _DELETE(ov);
1027 2 : return res;
1028 : }
1029 :
1030 11922 : if ((res = os_del_name_based(os, tr, name, ov))) {
1031 0 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
1032 0 : return res;
1033 : }
1034 :
1035 11922 : if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
1036 11922 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
1037 11922 : return res;
1038 : }
1039 :
1040 : int
1041 11924 : os_del(objectset *os, sql_trans *tr, const char *name, sql_base *b)
1042 : {
1043 11924 : MT_lock_set(&os->lock);
1044 11924 : int res = os_del_(os, tr, name, b);
1045 11924 : MT_lock_unset(&os->lock);
1046 11924 : return res;
1047 : }
1048 :
1049 : int
1050 323 : os_size(objectset *os, struct sql_trans *tr)
1051 : {
1052 323 : int cnt = 0;
1053 323 : if (os) {
1054 323 : lock_reader(os);
1055 332 : for(versionhead *n = os->name_based_h; n; n=n->next) {
1056 9 : objectversion *ov = n->ov;
1057 9 : assert(os == ov->os);
1058 9 : if ((ov=get_valid_object_name(tr, ov, false)) && os_atmc_get_state(ov) == active)
1059 6 : cnt++;
1060 : }
1061 323 : unlock_reader(os);
1062 : }
1063 323 : return cnt;
1064 : }
1065 :
1066 : int
1067 0 : os_empty(objectset *os, struct sql_trans *tr)
1068 : {
1069 0 : return os_size(os, tr)==0;
1070 : }
1071 :
1072 : int
1073 0 : os_remove(objectset *os, sql_trans *tr, const char *name)
1074 : {
1075 0 : (void) os;
1076 0 : (void) tr;
1077 0 : (void) name;
1078 : // TODO remove entire versionhead corresponding to this name.
1079 :
1080 : // TODO assert os->unique?s
1081 0 : return LOG_OK;
1082 : }
1083 :
1084 : sql_base *
1085 19507677 : os_find_name(objectset *os, struct sql_trans *tr, const char *name)
1086 : {
1087 19507677 : if (!os)
1088 : return NULL;
1089 :
1090 19507675 : assert(os->unique);
1091 19507675 : versionhead *n = find_name(os, name);
1092 :
1093 19675580 : if (n) {
1094 19083172 : objectversion *ov = get_valid_object_name(tr, n->ov, true);
1095 19028869 : if (ov && os_atmc_get_state(ov) == active)
1096 19028209 : return ov->b;
1097 : }
1098 : return NULL;
1099 : }
1100 :
1101 : sql_base *
1102 158609 : os_find_id(objectset *os, struct sql_trans *tr, sqlid id)
1103 : {
1104 158609 : if (!os)
1105 : return NULL;
1106 158609 : versionhead *n = find_id(os, id);
1107 :
1108 158609 : if (n) {
1109 158069 : objectversion *ov = get_valid_object_id(tr, n->ov, true);
1110 158069 : if (ov && os_atmc_get_state(ov) == active)
1111 158065 : return ov->b;
1112 : }
1113 : return NULL;
1114 : }
1115 :
1116 : void
1117 2228019 : os_iterator(struct os_iter *oi, struct objectset *os, struct sql_trans *tr, const char *name /*optional*/)
1118 : {
1119 2228019 : *oi = (struct os_iter) {
1120 : .os = os,
1121 : .tr = tr,
1122 : .name = name,
1123 : };
1124 :
1125 2228019 : lock_reader(os);
1126 2229128 : if (name && os->name_map) {
1127 1143983 : int key = hash_key(name);
1128 1143983 : oi->n = (void*)os->name_map->buckets[key&(os->name_map->size-1)];
1129 : } else
1130 1085145 : oi->n = os->name_based_h;
1131 2229128 : unlock_reader(os);
1132 2229376 : }
1133 :
1134 : sql_base *
1135 4326568 : oi_next(struct os_iter *oi)
1136 : {
1137 4326568 : sql_base *b = NULL;
1138 :
1139 4326568 : if (oi->name) {
1140 3282035 : lock_reader(oi->os); /* intentionally outside of while loop */
1141 3282060 : if (oi->os->name_map) {
1142 2896188 : sql_hash_e *he = (void*)oi->n;
1143 :
1144 6981966 : for (; he && !b; he = he->chain) {
1145 4085779 : versionhead *n = he->value;
1146 :
1147 4085779 : if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
1148 1763753 : objectversion *ov = n->ov;
1149 :
1150 1763753 : assert(oi->os == ov->os);
1151 1763753 : ov = get_valid_object_name(oi->tr, ov, false);
1152 1763752 : if (ov && os_atmc_get_state(ov) == active)
1153 1761985 : b = ov->b;
1154 : }
1155 : }
1156 2896187 : oi->n = (void*)he;
1157 : } else {
1158 385872 : versionhead *n = oi->n;
1159 :
1160 432854 : while (n && !b) {
1161 :
1162 46982 : if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
1163 12225 : objectversion *ov = n->ov;
1164 :
1165 12225 : n = oi->n = n->next;
1166 12225 : assert(oi->os == ov->os);
1167 12225 : ov = get_valid_object_name(oi->tr, ov, false);
1168 12225 : if (ov && os_atmc_get_state(ov) == active)
1169 12225 : b = ov->b;
1170 : } else {
1171 34757 : n = oi->n = n->next;
1172 : }
1173 : }
1174 : }
1175 3282059 : unlock_reader(oi->os);
1176 : } else {
1177 1044533 : versionhead *n = oi->n;
1178 :
1179 1044533 : lock_reader(oi->os); /* intentionally outside of while loop */
1180 2500384 : while (n && !b) {
1181 411279 : objectversion *ov = n->ov;
1182 411279 : n = oi->n = n->next;
1183 :
1184 411279 : assert(oi->os == ov->os);
1185 411279 : ov = get_valid_object_id(oi->tr, ov, false);
1186 411246 : if (ov && os_atmc_get_state(ov) == active)
1187 389743 : b = ov->b;
1188 : }
1189 1044572 : unlock_reader(oi->os);
1190 : }
1191 4327184 : return b;
1192 : }
1193 :
1194 : bool
1195 3555 : os_obj_intransaction(objectset *os, struct sql_trans *tr, sql_base *b)
1196 : {
1197 3555 : versionhead *n = find_id(os, b->id);
1198 :
1199 3555 : if (n) {
1200 3555 : objectversion *ov = n->ov;
1201 :
1202 3555 : if (ov && os_atmc_get_state(ov) == active && ov->ts == tr->tid)
1203 : return true;
1204 : }
1205 : return false;
1206 : }
1207 :
1208 : /* return true if this object set has changes pending for an other transaction */
1209 : bool
1210 196340 : os_has_changes(objectset *os, struct sql_trans *tr)
1211 : {
1212 196340 : versionhead *n = os->id_based_t;
1213 :
1214 196340 : if (n) {
1215 191754 : objectversion *ov = n->ov;
1216 :
1217 191754 : if (ov && os_atmc_get_state(ov) == active && ov->ts != tr->tid && ov->ts > TRANSACTION_ID_BASE)
1218 : return true;
1219 : }
1220 : return false;
1221 : }
|