Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "sql_catalog.h"
15 : #include "sql_storage.h"
16 :
17 : #include "gdk_atoms.h"
18 :
struct versionhead ;

/*
 * Values stored in objectversion.state.
 *
 * active            - plain live version (no flag set).
 * under_destruction - set by the cleaner (CAS from 'deleted') once a deleted
 *                     version has been unlinked and is waiting to be freed.
 * block_destruction - transient value swapped in (CAS from 'deleted') by
 *                     writers/rollback to keep the cleaner away while they
 *                     relink a deleted version.
 * deleted           - the version represents a delete of the object.
 * rollbacked        - the version belongs to a rolled back transaction.
 */
#define active							(0)
#define under_destruction				(1<<1)
#define block_destruction				(1<<2)
#define deleted							(1<<3)
#define rollbacked						(1<<4)

/* This objectversion owns its associated versionhead.
 * When this objectversion gets destroyed,
 * the cleanup procedure should also destroy the associated (name|id) based versionhead.*/
#define name_based_versionhead_owner	(1<<5)
#define id_based_versionhead_owner		(1<<6)
32 :
/*
 * One version of an sql object. Every version is linked into two
 * independent version chains: one keyed by object name and one keyed by
 * object id. Each chain hangs off a versionhead.
 */
typedef struct objectversion {
	ulng ts;			// transaction id while uncommitted, commit timestamp afterwards
	ATOMIC_TYPE state;	// combination of the state flags defined above
	sql_base *b; // base of underlying sql object
	struct objectset* os;	// set this version belongs to
	struct objectversion	*name_based_older;	// previous version in the name chain
	struct objectversion	*name_based_newer;	// next version in the name chain
	struct versionhead		*name_based_head;	// head of the name chain this version is on

	struct objectversion	*id_based_older;	// previous version in the id chain
	struct objectversion	*id_based_newer;	// next version in the id chain
	struct versionhead		*id_based_head;		// head of the id chain this version is on
} objectversion;
46 :
/*
 * Node of the doubly linked list of objects kept by an objectset.
 * 'ov' points to the newest objectversion of the object; older versions
 * are reached through the (name|id)_based_older links of that version.
 */
typedef struct versionhead  {
	struct versionhead * prev;
	struct versionhead * next;
	objectversion* ov;
} versionhead ;
52 :
/*
 * A versioned set of sql objects, searchable by name and by id.
 * Both views are doubly linked lists of versionheads (h = head, t = tail),
 * optionally accelerated by a hash map once a list grows beyond
 * HASH_MIN_SIZE entries.
 */
typedef struct objectset {
	ATOMIC_TYPE refcnt;		/* incremented by os_dup, decremented by os_destroy */
	allocator *sa;			/* allocator used for nodes/versions; os_new asserts it is NULL */
	destroy_fptr destroy;	/* optional callback used to destroy the sql_base of a version */
	MT_RWLock rw_lock;	/*readers-writer lock to protect the links (chains) in the objectversion chain.*/
	versionhead  *name_based_h;
	versionhead  *name_based_t;
	versionhead  *id_based_h;
	versionhead  *id_based_t;
	int name_based_cnt;		/* number of versionheads in the name based list */
	int id_based_cnt;		/* number of versionheads in the id based list */
	struct sql_hash *name_map;	/* name -> versionhead, NULL while the list is small */
	struct sql_hash *id_map;	/* id -> versionhead, NULL while the list is small */
	bool
		temporary:1,	/* every added version takes an extra reference on the set */
		unique:1,	/* names are unique */
		concurrent:1,	/* concurrent inserts are allowed */
		nested:1;	/* when set, the cleaner does not destroy the sql objects early */
	sql_store store;
} objectset;
73 :
74 : static int
75 335512 : os_id_key(versionhead *n)
76 : {
77 259048 : return (int) BATatoms[TYPE_int].atomHash(&n->ov->b->id);
78 : }
79 :
80 : static inline void
81 24325080 : lock_reader(objectset* os)
82 : {
83 24325080 : MT_rwlock_rdlock(&os->rw_lock);
84 1113571 : }
85 :
86 : static inline void
87 24426303 : unlock_reader(objectset* os)
88 : {
89 24426303 : MT_rwlock_rdunlock(&os->rw_lock);
90 3998888 : }
91 :
92 : static inline void
93 609806 : lock_writer(objectset* os)
94 : {
95 609806 : MT_rwlock_wrlock(&os->rw_lock);
96 : }
97 :
98 : static inline void
99 609806 : unlock_writer(objectset* os)
100 : {
101 609806 : MT_rwlock_wrunlock(&os->rw_lock);
102 260 : }
103 :
104 21508300 : static bte os_atmc_get_state(objectversion *ov) {
105 21508300 : bte state = (bte) ATOMIC_GET(&ov->state);
106 21508300 : return state;
107 : }
108 :
109 61754 : static void os_atmc_set_state(objectversion *ov, bte state) {
110 61754 : ATOMIC_SET(&ov->state, state);
111 0 : }
112 :
113 : static versionhead *
114 442357 : find_id(objectset *os, sqlid id)
115 : {
116 442357 : if (os) {
117 442357 : lock_reader(os);
118 442357 : if (os->id_map) {
119 409879 : int key = (int) BATatoms[TYPE_int].atomHash(&id);
120 409879 : sql_hash_e *he = os->id_map->buckets[key&(os->id_map->size-1)];
121 :
122 1506861 : for (; he; he = he->chain) {
123 1265427 : versionhead *n = he->value;
124 :
125 1265427 : if (n && n->ov->b->id == id) {
126 168445 : unlock_reader(os);
127 168445 : return n;
128 : }
129 : }
130 241434 : unlock_reader(os);
131 241434 : return NULL;
132 : }
133 :
134 99254 : for (versionhead *n = os->id_based_h; n; n = n->next) {
135 69266 : objectversion *ov = n->ov;
136 :
137 : /* check if ids match */
138 69266 : if (id == ov->b->id) {
139 2490 : unlock_reader(os);
140 2490 : return n;
141 : }
142 : }
143 29988 : unlock_reader(os);
144 : }
145 :
146 : return NULL;
147 : }
148 :
149 : // TODO copy of static function from sql_list.c. Needs to be made external
150 : static void
151 27938 : hash_delete(sql_hash *h, void *data)
152 : {
153 27938 : int key = h->key(data);
154 27938 : sql_hash_e *e, *p = h->buckets[key&(h->size-1)];
155 :
156 27938 : e = p;
157 45320 : for (; p && p->value != data ; p = p->chain)
158 17382 : e = p;
159 27938 : if (p && p->value == data) {
160 27938 : if (p == e)
161 23334 : h->buckets[key&(h->size-1)] = p->chain;
162 : else
163 4604 : e->chain = p->chain;
164 27938 : if (!h->sa)
165 27938 : _DELETE(p);
166 : }
167 27938 : h->entries--;
168 27938 : }
169 :
170 : static void
171 540450 : node_destroy(objectset *os, sqlstore *store, versionhead *n)
172 : {
173 540450 : if (!os->sa)
174 540450 : _DELETE(n);
175 540450 : (void)store;
176 540450 : }
177 :
/*
 * Unlink the name based versionhead of 'ov' from the set's name based list
 * (and from the name hash map, if any) under the writer lock, and mark 'ov'
 * as the owner of that versionhead so objectversion_destroy frees it.
 * Returns the predecessor of the removed node, or NULL when the removed
 * node was the list head.
 */
static versionhead  *
os_remove_name_based_chain(objectset *os, objectversion* ov)
{
	lock_writer(os);
	versionhead  *n = ov->name_based_head;
	versionhead  *p = os->name_based_h;
	/* find the predecessor p of n (p == n when n is the list head) */
	if (p != n)
		while (p && p->next != n)
			p = p->next;
	assert(p==n||(p && p->next == n));
	if (p == n) {
		os->name_based_h = n->next;
		if (os->name_based_h) // i.e. non-empty os
			os->name_based_h->prev = NULL;
		p = NULL;
	} else if ( p != NULL)  {
		p->next = n->next;
		if (p->next) // node in the middle
			p->next->prev = p;
	}
	/* n was the tail: the predecessor (possibly NULL) becomes the new tail */
	if (n == os->name_based_t)
		os->name_based_t = p;

	if (os->name_map && n)
		hash_delete(os->name_map, n);

	os->name_based_cnt--;
	unlock_writer(os);

	/* from now on ov is responsible for freeing the unlinked versionhead */
	bte state = os_atmc_get_state(ov);
	state |= name_based_versionhead_owner;
	os_atmc_set_state(ov, state);
	return p;
}
212 :
213 : static versionhead *
214 14825 : os_remove_id_based_chain(objectset *os, objectversion* ov)
215 : {
216 14825 : lock_writer(os);
217 14825 : versionhead *n = ov->id_based_head;
218 14825 : versionhead *p = os->id_based_h;
219 :
220 14825 : if (p != n)
221 1139954 : while (p && p->next != n)
222 : p = p->next;
223 14825 : assert(p==n||(p && p->next == n));
224 14825 : if (p == n) {
225 1277 : os->id_based_h = n->next;
226 1277 : if (os->id_based_h) // i.e. non-empty os
227 786 : os->id_based_h->prev = NULL;
228 : p = NULL;
229 13548 : } else if ( p != NULL) {
230 13548 : p->next = n->next;
231 13548 : if (p->next) // node in the middle
232 5743 : p->next->prev = p;
233 : }
234 14825 : if (n == os->id_based_t)
235 8296 : os->id_based_t = p;
236 :
237 14825 : if (os->id_map && n)
238 14315 : hash_delete(os->id_map, n);
239 :
240 14825 : os->name_based_cnt--;
241 14825 : unlock_writer(os);
242 :
243 14825 : bte state = os_atmc_get_state(ov);
244 14825 : state |= id_based_versionhead_owner;
245 14825 : os_atmc_set_state(ov, state);
246 14825 : return p;
247 : }
248 :
249 : static versionhead *
250 541594 : node_create(allocator *sa, objectversion *ov)
251 : {
252 541594 : versionhead *n = SA_NEW(sa, versionhead );
253 :
254 541594 : if (n == NULL)
255 : return NULL;
256 541594 : *n = (versionhead ) {
257 : .ov = ov,
258 : };
259 541594 : return n;
260 : }
261 :
262 : static inline int
263 324868 : os_name_key(versionhead *n)
264 : {
265 324868 : return hash_key(n->ov->b->name);
266 : }
267 :
268 : static objectset *
269 270678 : os_append_node_name(objectset *os, versionhead *n)
270 : {
271 270678 : lock_writer(os);
272 270678 : if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && os->name_based_cnt > HASH_MIN_SIZE) {
273 3947 : hash_destroy(os->name_map);
274 3947 : os->name_map = hash_new(os->sa, os->name_based_cnt, (fkeyvalue)& os_name_key);
275 3947 : if (os->name_map == NULL) {
276 0 : unlock_writer(os);
277 0 : return NULL;
278 : }
279 :
280 71143 : for (versionhead *n = os->name_based_h; n; n = n->next ) {
281 67196 : int key = os_name_key(n);
282 :
283 67196 : if (hash_add(os->name_map, key, n) == NULL) {
284 0 : unlock_writer(os);
285 0 : return NULL;
286 : }
287 : }
288 : }
289 :
290 270678 : if (os->name_map) {
291 244049 : int key = os->name_map->key(n);
292 :
293 244049 : if (hash_add(os->name_map, key, n) == NULL) {
294 0 : unlock_writer(os);
295 0 : return NULL;
296 : }
297 : }
298 :
299 270678 : if (os->name_based_t) {
300 262580 : os->name_based_t->next = n;
301 : } else {
302 8098 : os->name_based_h = n;
303 : }
304 270678 : n->prev = os->name_based_t; // aka the double linked list.
305 270678 : os->name_based_t = n;
306 270678 : os->name_based_cnt++;
307 270678 : unlock_writer(os);
308 270678 : return os;
309 : }
310 :
311 : static objectset *
312 270678 : os_append_name(objectset *os, objectversion *ov)
313 : {
314 270678 : versionhead *n = node_create(os->sa, ov);
315 :
316 270678 : if (n == NULL)
317 : return NULL;
318 :
319 270678 : ov->name_based_head = n;
320 270678 : if (!(os = os_append_node_name(os, n))){
321 0 : _DELETE(n);
322 0 : return NULL;
323 : }
324 :
325 : return os;
326 : }
327 :
328 : static void
329 4053 : os_append_id_map(objectset *os)
330 : {
331 4053 : if (os->id_map)
332 462 : hash_destroy(os->id_map);
333 4053 : os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key);
334 4053 : if (os->id_map == NULL)
335 : return ;
336 80517 : for (versionhead *n = os->id_based_h; n; n = n->next ) {
337 76464 : int key = os_id_key(n);
338 :
339 76464 : if (hash_add(os->id_map, key, n) == NULL) {
340 0 : hash_destroy(os->id_map);
341 0 : os->id_map = NULL;
342 0 : return ;
343 : }
344 : }
345 : }
346 :
347 : static objectset *
348 270916 : os_append_node_id(objectset *os, versionhead *n)
349 : {
350 270916 : lock_writer(os);
351 270916 : if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && os->id_based_cnt > HASH_MIN_SIZE)
352 4053 : os_append_id_map(os); /* on failure just fall back to slow method */
353 :
354 270916 : if (os->id_map) {
355 244733 : int key = os->id_map->key(n);
356 244733 : if (hash_add(os->id_map, key, n) == NULL) {
357 0 : hash_destroy(os->id_map);
358 0 : os->id_map = NULL;
359 : /* fall back to slow search */
360 : }
361 : }
362 :
363 270916 : if (os->id_based_t) {
364 262818 : os->id_based_t->next = n;
365 : } else {
366 8098 : os->id_based_h = n;
367 : }
368 270916 : n->prev = os->id_based_t; // aka the double linked list.
369 270916 : os->id_based_t = n;
370 270916 : os->id_based_cnt++;
371 270916 : unlock_writer(os);
372 270916 : return os;
373 : }
374 :
375 : static objectset *
376 270916 : os_append_id(objectset *os, objectversion *ov)
377 : {
378 270916 : versionhead *n = node_create(os->sa, ov);
379 :
380 270916 : if (n == NULL)
381 : return NULL;
382 270916 : ov->id_based_head = n;
383 270916 : if (!(os = os_append_node_id(os, n))){
384 0 : _DELETE(n);
385 0 : return NULL;
386 : }
387 :
388 : return os;
389 : }
390 :
/* Forward declaration: find_name is defined further down in this file. */
static versionhead  * find_name(objectset *os, const char *name);
392 :
/*
 * Free a single objectversion:
 *  - free the name/id based versionheads it owns (ownership flags in state),
 *  - destroy the underlying sql object through the set's destroy callback,
 *  - for temporary sets, drop one reference on the set when the version is
 *    deleted, under destruction or rolled back,
 *  - free the objectversion itself.
 * Older/newer links are not followed; callers handle the chain.
 */
static void
objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov)
{

	bte state = os_atmc_get_state(ov);

	if (state & name_based_versionhead_owner) {
		node_destroy(ov->os, store, ov->name_based_head);
	}

	if (state & id_based_versionhead_owner) {
		node_destroy(ov->os, store, ov->id_based_head);
	}

	if (os->destroy && ov->b)
		os->destroy(store, ov->b);

	if (os->temporary && (state & deleted || state & under_destruction || state & rollbacked))
		os_destroy(os, store); // TODO transaction_layer_revamp: embed into refcounting subproject : reference is already dropped by os_cleanup
	_DELETE(ov);
}
414 :
/*
 * Roll back an uncommitted objectversion.
 *
 * The version is flagged 'rollbacked' (the call is a no-op if it already
 * is). Then, separately for the name based and the id based chain:
 *  - if there is an older version from another transaction (different ts),
 *    the versionhead is pointed back at that older version;
 *  - if the older version belongs to the same transaction (same ts), it is
 *    rolled back recursively;
 *  - if there is no older version, the versionhead is removed from the set.
 * Finally any newer versions that are not yet rolled back are rolled back
 * as well.
 */
static void
_os_rollback(objectversion *ov, sqlstore *store)
{
	assert(ov->ts >= TRANSACTION_ID_BASE);

	bte state = os_atmc_get_state(ov);
	if (state & rollbacked) {
		return;
	}

	state |= rollbacked;
	os_atmc_set_state(ov, state);

	bte state_older;

	/*
	 * We have to use the readers-writer lock here,
	 * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state.
	 */
	lock_reader(ov->os);
	objectversion* name_based_older = ov->name_based_older;
	unlock_reader(ov->os);

	if (name_based_older && !((state_older= os_atmc_get_state(name_based_older)) & rollbacked)) {
		if (ov->ts != name_based_older->ts) {
			// older is last committed state or belongs to parent transaction.
			// In any case, we restore versionhead pointer to that.

			// A deleted older is temporarily switched to block_destruction so the cleaner cannot claim it while we relink.
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (state_older == active || (state_older == deleted && ATOMIC_CAS(&name_based_older->state, &expected_deleted, block_destruction))) {
				ov->name_based_head->ov = name_based_older;
				name_based_older->name_based_newer=NULL;
				if (state_older != active && expected_deleted == deleted)
					os_atmc_set_state(name_based_older, deleted); //Restore the deleted older back to its deleted state.
			}
		}
		else {
			_os_rollback(name_based_older, store);
		}
	}
	else if (!name_based_older) {
		// this is a terminal node. i.e. this objectversion does not have name based committed history
		if (ov->name_based_head) // The opposite can happen during an early conflict in os_add or os_del.
			os_remove_name_based_chain(ov->os, ov);
	}

	/*
	 * We have to use the readers-writer lock here,
	 * since the pointer containing the address of the older objectversion might be concurrently overwritten if the older itself has just been put in the under_destruction state.
	 */
	lock_reader(ov->os);
	objectversion* id_based_older = ov->id_based_older;
	unlock_reader(ov->os);
	if (id_based_older && !((state_older= os_atmc_get_state(id_based_older)) & rollbacked)) {
		if (ov->ts != id_based_older->ts) {
			// older is last committed state or belongs to parent transaction.
			// In any case, we restore versionhead pointer to that.

			// A deleted older is temporarily switched to block_destruction so the cleaner cannot claim it while we relink.
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (state_older == active || (state_older == deleted && ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) {
				ov->id_based_head->ov = id_based_older;
				id_based_older->id_based_newer=NULL;
				if (state_older != active && expected_deleted == deleted)
					os_atmc_set_state(id_based_older, deleted); //Restore the deleted older back to its deleted state.
			}
		}
		else if (id_based_older != name_based_older)
			// the shared older version was already handled through the name based chain above
			_os_rollback(id_based_older, store);
	}
	else if (!id_based_older) {
		// this is a terminal node. i.e. this objectversion does not have id based committed history
		os_remove_id_based_chain(ov->os, ov);
	}

	if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
		_os_rollback(ov->name_based_newer, store);
	}

	if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && !(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
		_os_rollback(ov->id_based_newer, store);
	}
}
497 :
498 : static int
499 4906 : os_rollback(objectversion *ov, sqlstore *store)
500 : {
501 4906 : _os_rollback(ov, store);
502 :
503 4906 : return LOG_OK;
504 : }
505 :
506 : static void
507 24470 : ov_destroy_obj_recursive(sqlstore* store, objectversion *ov)
508 : {
509 24470 : if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
510 13989 : ov_destroy_obj_recursive(store, ov->id_based_older);
511 : }
512 24470 : if (ov->os->destroy && ov->b) {
513 24470 : ov->os->destroy(store, ov->b);
514 24470 : ov->b = NULL;
515 : }
516 24470 : }
517 :
/*
 * Try to move a 'deleted' objectversion to 'under_destruction'.
 * The state switch is a CAS, so nothing happens when the version is no
 * longer exactly 'deleted' (e.g. a writer set block_destruction).
 * On success, per chain: if there is no live newer version the versionhead
 * is removed from the set, otherwise the newer version's link to this one
 * is cut. The version then gets a fresh timestamp so it is only freed once
 * no running transaction can still see it.
 */
static inline void
try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
{
	ATOMIC_BASE_TYPE expected_deleted = deleted;
	if (ATOMIC_CAS(&ov->state, &expected_deleted, under_destruction)) {

		if (!ov->name_based_newer || (os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
			os_remove_name_based_chain(ov->os, ov);
		}
		else {
			lock_writer(ov->os);
			ov->name_based_newer->name_based_older = NULL;
			unlock_writer(ov->os);
		}

		if (!ov->id_based_newer || (os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
			os_remove_id_based_chain(ov->os, ov);
		}
		else {
			lock_writer(ov->os);
			ov->id_based_newer->id_based_older = NULL;
			unlock_writer(ov->os);
		}

		// os_cleanup frees the version once this timestamp is older than the oldest active transaction
		ov->ts = store_get_timestamp(store)+1;
		if (!ov->os->nested)
			ov_destroy_obj_recursive(store, ov);
	}
}
547 :
548 : static void
549 28524 : objectversion_destroy_recursive(sqlstore* store, objectversion *ov)
550 : {
551 28524 : if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
552 14146 : objectversion_destroy_recursive(store, ov->id_based_older);
553 : }
554 28524 : objectversion_destroy(store, ov->os, ov);
555 28524 : }
556 :
/*
 * Garbage collection step for one objectversion; 'oldest' is the limit
 * below which versions are no longer needed.
 *
 * Return value (interpreted by tc_gc_objectversion, which marks the change
 * as handled for every non-zero result):
 *   LOG_OK     - nothing final happened, keep the change and retry later
 *   LOG_OK+1   - a deleted version was offered for destruction
 *   LOG_ERR    - the change is done with (version freed, or live version
 *                trimmed of obsolete history)
 */
static int
os_cleanup(sqlstore* store, objectversion *ov, ulng oldest)
{
	if (os_atmc_get_state(ov) & under_destruction) {
		if (ov->ts < oldest) {
			// This one is ready to be freed
			objectversion_destroy_recursive(store, ov);
			return LOG_ERR;
		}

		// not yet old enough to be safely removed. Try later.
		return LOG_OK;
	}

	if (os_atmc_get_state(ov) & rollbacked) {
		if (ov->ts < oldest) {
			// This one is ready to be freed
			// detach from the older versions first so they do not keep a pointer to freed memory
			if (ov->name_based_older && ov->name_based_older->name_based_newer == ov)
				ov->name_based_older->name_based_newer=NULL;
			if (ov->id_based_older && ov->id_based_older->id_based_newer == ov)
				ov->id_based_older->id_based_newer=NULL;
			objectversion_destroy(store, ov->os, ov);
			return LOG_ERR;
		}

		if (ov->ts > TRANSACTION_ID_BASE) {
			/* We mark it with the latest possible starttime and reinsert it into the cleanup list.
			 * This will cause a safe eventual destruction of this rollbacked ov.
			 */
			ov->ts = store_get_timestamp(store)+1;
		}

		// not yet old enough to be safely removed. Try later.
		return LOG_OK;
	}

	if (os_atmc_get_state(ov) == deleted) {
		if (ov->ts <= oldest) {
			// the oldest relevant state is deleted so lets try to mark it as destroyed
			try_to_mark_deleted_for_destruction(store, ov);
			return LOG_OK+1;
		}

		// Keep it inplace on the cleanup list, either because it is now marked for destruction or
		// we want to retry marking it for destruction later.
		return LOG_OK;
	}

	assert(os_atmc_get_state(ov) != deleted && os_atmc_get_state(ov) != under_destruction && os_atmc_get_state(ov) != rollbacked);
	if (ov->os->temporary) os_destroy(ov->os, store); // TODO transaction_layer_revamp: embed into refcounting subproject: (old) live versions should drop their reference to the os

	// skip the versions that may still be visible to a running transaction
	while (ov->id_based_older && ov->id_based_older == ov->name_based_older && ov->ts >= oldest) {
		ov = ov->id_based_older;
	}

	if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
		// Destroy everything older than the oldest possibly relevant objectversion.
		objectversion_destroy_recursive(store, ov->id_based_older);
		// NOTE(review): only id_based_older is cleared here; name_based_older keeps
		// pointing at the version that was just freed - confirm nothing follows it afterwards.
		ov->id_based_older = NULL;
	}

	return LOG_ERR;
}
620 :
621 : static int
622 316130 : tc_gc_objectversion(sql_store store, sql_change *change, ulng oldest)
623 : {
624 : // assert(!change->handled);
625 316130 : objectversion *ov = (objectversion*)change->data;
626 :
627 316130 : if (oldest >= TRANSACTION_ID_BASE)
628 : return 0;
629 316123 : int res = os_cleanup( (sqlstore*) store, ov, oldest);
630 316123 : change->handled = (res)?true:false;
631 316123 : return res>=0?LOG_OK:LOG_ERR;
632 : }
633 :
634 : static int
635 290077 : tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
636 : {
637 290077 : objectversion *ov = (objectversion*)change->data;
638 290077 : if (commit_ts) {
639 285171 : assert(ov->ts == tr->tid);
640 285171 : ov->ts = commit_ts;
641 285171 : change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false;
642 285171 : (void)oldest;
643 285171 : if (!tr->parent)
644 285164 : change->obj->new = 0;
645 : }
646 : else {
647 4906 : os_rollback(ov, tr->store);
648 : }
649 :
650 290077 : return LOG_OK;
651 : }
652 :
653 : objectset *
654 60236 : os_new(allocator *sa, destroy_fptr destroy, bool temporary, bool unique, bool concurrent, bool nested, sql_store store)
655 : {
656 60236 : assert(!sa);
657 60236 : objectset *os = SA_NEW(sa, objectset);
658 60236 : if (os) {
659 60236 : *os = (objectset) {
660 : .refcnt = ATOMIC_VAR_INIT(1),
661 : .sa = sa,
662 : .destroy = destroy,
663 : .temporary = temporary,
664 : .unique = unique,
665 : .concurrent = concurrent,
666 : .nested = nested,
667 : .store = store
668 : };
669 60236 : os->destroy = destroy;
670 60236 : MT_rwlock_init(&os->rw_lock, "sa_readers_lock");
671 : }
672 :
673 60236 : return os;
674 : }
675 :
676 : objectset *
677 4399 : os_dup(objectset *os)
678 : {
679 4399 : ATOMIC_INC(&os->refcnt);
680 4399 : return os;
681 : }
682 :
683 : void
684 64577 : os_destroy(objectset *os, sql_store store)
685 : {
686 64577 : if (ATOMIC_DEC(&os->refcnt) > 0)
687 : return;
688 60178 : MT_rwlock_destroy(&os->rw_lock);
689 60178 : versionhead* n=os->id_based_h;
690 315697 : while(n) {
691 255519 : objectversion *ov = n->ov;
692 511587 : while(ov) {
693 256068 : objectversion *older = ov->id_based_older;
694 256068 : objectversion_destroy(store, os, ov);
695 256068 : ov = older;
696 : }
697 255519 : versionhead* hn =n->next;
698 255519 : node_destroy(os, store, n);
699 255519 : n = hn;
700 : }
701 :
702 60178 : n=os->name_based_h;
703 315697 : while(n) {
704 255519 : versionhead* hn =n->next;
705 255519 : node_destroy(os, store, n);
706 255519 : n = hn;
707 : }
708 :
709 60178 : if (os->id_map)
710 3581 : hash_destroy(os->id_map);
711 :
712 60178 : if (os->name_map)
713 3555 : hash_destroy(os->name_map);
714 :
715 60178 : if (!os->sa)
716 60178 : _DELETE(os);
717 : }
718 :
719 : static versionhead *
720 17719801 : find_name(objectset *os, const char *name)
721 : {
722 17719801 : lock_reader(os);
723 17825976 : if (os->name_map) {
724 14357134 : int key = hash_key(name);
725 14357134 : sql_hash_e *he = os->name_map->buckets[key&(os->name_map->size-1)];
726 :
727 80034287 : for (; he; he = he->chain) {
728 79675544 : versionhead *n = he->value;
729 :
730 79675544 : if (n && n->ov->b->name && strcmp(n->ov->b->name, name) == 0) {
731 13998391 : unlock_reader(os);
732 13998391 : return n;
733 : }
734 : }
735 358743 : unlock_reader(os);
736 358743 : return NULL;
737 : }
738 :
739 4140268 : for (versionhead *n = os->name_based_h; n; n = n->next) {
740 3849530 : objectversion *ov = n->ov;
741 :
742 : /* check if names match */
743 3849530 : if (name[0] == ov->b->name[0] && strcmp(name, ov->b->name) == 0) {
744 3178104 : unlock_reader(os);
745 3178104 : return n;
746 : }
747 : }
748 :
749 290738 : unlock_reader(os);
750 290738 : return NULL;
751 : }
752 :
753 : static objectversion*
754 18724018 : get_valid_object_name(sql_trans *tr, objectversion *ov)
755 : {
756 18730819 : while(ov) {
757 18730789 : if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
758 18723836 : return ov;
759 : else {
760 6946 : lock_reader(ov->os);
761 168 : objectversion* name_based_older = ov->name_based_older;
762 168 : unlock_reader(ov->os);
763 168 : ov = name_based_older;
764 : }
765 : }
766 : return ov;
767 : }
768 :
769 : static objectversion*
770 547748 : get_valid_object_id(sql_trans *tr, objectversion *ov)
771 : {
772 547910 : while(ov) {
773 547800 : if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
774 547637 : return ov;
775 : else {
776 163 : lock_reader(ov->os);
777 163 : objectversion* id_based_older = ov->id_based_older;
778 163 : unlock_reader(ov->os);
779 163 : ov = id_based_older;
780 : }
781 : }
782 : return ov;
783 : }
784 :
/*
 * Link the new objectversion 'ov' into the name based structures of 'os'.
 *
 * If an object with this name already has a versionhead, ov becomes the
 * newest version on that chain; otherwise a new versionhead is appended.
 *
 * Returns 0 on success, -1 on allocation failure and -3 on a conflict
 * (the newest version on the chain is not visible to 'tr', or a deleted
 * predecessor could not be claimed from the cleaner).
 */
static int
os_add_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
	versionhead  *name_based_node = NULL;
	if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
		name_based_node = ov->id_based_older->name_based_head;
	else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
		name_based_node = find_name(os, name);
	// else names are not unique and each id based version head maps to its own name based version head.

	if (name_based_node) {
		objectversion *co = name_based_node->ov;
		objectversion *oo = get_valid_object_name(tr, co);
		if (co != oo) { /* conflict ? */
			TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
			return -3;
		}

		assert(ov != oo); // Time loops are not allowed

		bte state = os_atmc_get_state(oo);
		if (state != active) {
			// This can only happen if the parent oo was a committed deleted at some point.
			assert(state == deleted || state == under_destruction || state == block_destruction);
			/* Since our parent oo is a committed deleted objectversion, we might have a conflict with
			* another transaction that tries to clean up oo or also wants to add a new objectversion.
			*/
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
				TRC_WARNING(SQL_STORE, "%s: " "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/ ", __func__);
				return -3; /*conflict with cleaner or write-write conflict*/
			}
		}

		/* new object with same name within transaction, should have a delete in between */
		assert(!(state == active && oo->ts == ov->ts && !(os_atmc_get_state(ov) & deleted)));

		/* publish ov as the newest version on the chain */
		lock_writer(os);
		ov->name_based_head = oo->name_based_head;
		ov->name_based_older = oo;

		name_based_node->ov = ov;
		if (oo) {
			oo->name_based_newer = ov;
			// if the parent was originally deleted, we restore it to that state.
			os_atmc_set_state(oo, state);
		}
		unlock_writer(os);
		return 0;
	} else { /* new */
		if (os_append_name(os, ov) == NULL)
			return -1; // MALLOC_FAIL
		return 0;
	}
}
839 :
/*
 * Link the new objectversion 'ov' into the id based structures of 'os'.
 *
 * If an object with this id already has a versionhead, ov becomes the
 * newest version on that chain; otherwise a new versionhead is appended.
 *
 * Returns 0 on success, -1 on allocation failure and -3 on a conflict
 * (the newest version on the chain is not visible to 'tr', or a deleted
 * predecessor could not be claimed from the cleaner).
 */
static int
os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
	versionhead  *id_based_node;

	id_based_node = find_id(os, id);

	if (id_based_node) {
		objectversion *co = id_based_node->ov;
		objectversion *oo = get_valid_object_id(tr, co);
		if (co != oo) { /* conflict ? */
			TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
			return -3;
		}

		assert(ov != oo); // Time loops are not allowed

		bte state = os_atmc_get_state(oo);
		if (state != active) {
			// This can only happen if the parent oo was a committed deleted at some point.
			assert(state == deleted || state == under_destruction || state == block_destruction);
			/* Since our parent oo is a committed deleted objectversion, we might have a conflict with
			* another transaction that tries to clean up oo or also wants to add a new objectversion.
			*/
			ATOMIC_BASE_TYPE expected_deleted = deleted;
			if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
				TRC_WARNING(SQL_STORE, "%s" "if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) { /*conflict with cleaner or write-write conflict*/", __func__);
				return -3; /*conflict with cleaner or write-write conflict*/
			}
		}

		/* publish ov as the newest version on the chain */
		lock_writer(os);
		ov->id_based_head = oo->id_based_head;
		ov->id_based_older = oo;

		id_based_node->ov = ov;
		if (oo) {
			oo->id_based_newer = ov;
			// if the parent was originally deleted, we restore it to that state.
			os_atmc_set_state(oo, state);
		}
		unlock_writer(os);
		return 0;
	} else { /* new */
		if (os_append_id(os, ov) == NULL)
			return -1; // MALLOC_FAIL

		return 0;
	}
}
889 :
890 : static int /*ok, error (name existed) and conflict (added before) */
891 278975 : os_add_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
892 : {
893 278975 : int res = 0;
894 278975 : objectversion *ov = SA_NEW(os->sa, objectversion);
895 :
896 278975 : *ov = (objectversion) {
897 278975 : .ts = tr->tid,
898 : .b = b,
899 : .os = os,
900 : };
901 :
902 278975 : if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */
903 1 : if (os->destroy)
904 1 : os->destroy(os->store, ov->b);
905 1 : _DELETE(ov);
906 1 : TRC_WARNING(SQL_STORE, "%s" "if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */", __func__);
907 1 : return -3; /* conflict */
908 : }
909 :
910 278974 : if ((res = os_add_id_based(os, tr, b->id, ov))) {
911 10 : if (os->destroy)
912 10 : os->destroy(os->store, ov->b);
913 10 : _DELETE(ov);
914 10 : return res;
915 : }
916 :
917 278964 : if ((res = os_add_name_based(os, tr, name, ov))) {
918 6 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
919 6 : return res;
920 : }
921 :
922 278958 : if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
923 278958 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
924 278958 : return res;
925 : }
926 :
927 : int
928 278975 : os_add(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
929 : {
930 278975 : store_lock(tr->store);
931 278975 : int res = os_add_(os, tr, name, b);
932 278975 : store_unlock(tr->store);
933 278975 : return res;
934 : }
935 :
936 : static int
937 11106 : os_del_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
938 11106 : versionhead *name_based_node = NULL;
939 11106 : if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
940 11106 : name_based_node = ov->id_based_older->name_based_head;
941 0 : else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
942 0 : name_based_node = find_name(os, name);
943 :
944 11106 : if (name_based_node) {
945 11106 : objectversion *co = name_based_node->ov;
946 11106 : objectversion *oo = get_valid_object_name(tr, co);
947 11106 : ov->name_based_head = oo->name_based_head;
948 11106 : if (co != oo) { /* conflict ? */
949 0 : TRC_WARNING(SQL_STORE, "%s: " "if (co != oo) { /* conflict ? */", __func__);
950 0 : return -3;
951 : }
952 11106 : ov->name_based_older = oo;
953 :
954 11106 : lock_writer(os);
955 11106 : if (oo) {
956 11106 : oo->name_based_newer = ov;
957 11106 : assert(os_atmc_get_state(oo) == active);
958 : }
959 11106 : name_based_node->ov = ov;
960 11106 : unlock_writer(os);
961 11106 : return 0;
962 : } else {
963 : /* missing */
964 0 : return -1;
965 : }
966 : }
967 :
968 : static int
969 11108 : os_del_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
970 :
971 11108 : versionhead *id_based_node;
972 :
973 11108 : if (ov->name_based_older && ov->name_based_older->b->id == id)
974 0 : id_based_node = ov->name_based_older->id_based_head;
975 : else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up
976 11108 : id_based_node = find_id(os, id);
977 :
978 11108 : if (id_based_node) {
979 11108 : objectversion *co = id_based_node->ov;
980 11108 : objectversion *oo = get_valid_object_id(tr, co);
981 11108 : ov->id_based_head = oo->id_based_head;
982 11108 : if (co != oo) { /* conflict ? */
983 2 : TRC_WARNING(SQL_STORE, "%s" "if (co != oo) { /* conflict ? */", __func__);
984 2 : return -3;
985 : }
986 11106 : ov->id_based_older = oo;
987 :
988 11106 : lock_writer(os);
989 11106 : if (oo) {
990 11106 : oo->id_based_newer = ov;
991 11106 : assert(os_atmc_get_state(oo) == active);
992 : }
993 11106 : id_based_node->ov = ov;
994 11106 : unlock_writer(os);
995 11106 : return 0;
996 : } else {
997 : /* missing */
998 : return -1;
999 : }
1000 : }
1001 :
1002 : static int
1003 11108 : os_del_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
1004 : {
1005 11108 : int res = 0;
1006 11108 : objectversion *ov = SA_NEW(os->sa, objectversion);
1007 :
1008 11108 : *ov = (objectversion) {
1009 11108 : .ts = tr->tid,
1010 : .b = b,
1011 : .os = os,
1012 : };
1013 11108 : os_atmc_set_state(ov, deleted);
1014 :
1015 11108 : if ((res = os_del_id_based(os, tr, b->id, ov))) {
1016 2 : if (os->destroy)
1017 2 : os->destroy(os->store, ov->b);
1018 2 : _DELETE(ov);
1019 2 : return res;
1020 : }
1021 :
1022 11106 : if ((res = os_del_name_based(os, tr, name, ov))) {
1023 0 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
1024 0 : return res;
1025 : }
1026 :
1027 11106 : if (os->temporary) (void) os_dup(os); // TODO transaction_layer_revamp: embed into refcounting subproject
1028 11106 : trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
1029 11106 : return res;
1030 : }
1031 :
1032 : int
1033 11108 : os_del(objectset *os, sql_trans *tr, const char *name, sql_base *b)
1034 : {
1035 11108 : store_lock(tr->store);
1036 11108 : int res = os_del_(os, tr, name, b);
1037 11108 : store_unlock(tr->store);
1038 11108 : return res;
1039 : }
1040 :
1041 : int
1042 291 : os_size(objectset *os, struct sql_trans *tr)
1043 : {
1044 291 : int cnt = 0;
1045 291 : if (os) {
1046 291 : lock_reader(os);
1047 300 : for(versionhead *n = os->name_based_h; n; n=n->next) {
1048 9 : objectversion *ov = n->ov;
1049 9 : if ((ov=get_valid_object_name(tr, ov)) && os_atmc_get_state(ov) == active)
1050 6 : cnt++;
1051 : }
1052 291 : unlock_reader(os);
1053 : }
1054 291 : return cnt;
1055 : }
1056 :
1057 : int
1058 0 : os_empty(objectset *os, struct sql_trans *tr)
1059 : {
1060 0 : return os_size(os, tr)==0;
1061 : }
1062 :
1063 : int
1064 0 : os_remove(objectset *os, sql_trans *tr, const char *name)
1065 : {
1066 0 : (void) os;
1067 0 : (void) tr;
1068 0 : (void) name;
1069 : // TODO remove entire versionhead corresponding to this name.
1070 :
1071 : // TODO assert os->unique?s
1072 0 : return LOG_OK;
1073 : }
1074 :
1075 : sql_base *
1076 17647958 : os_find_name(objectset *os, struct sql_trans *tr, const char *name)
1077 : {
1078 17647958 : if (!os)
1079 : return NULL;
1080 :
1081 17647956 : assert(os->unique);
1082 17647956 : versionhead *n = find_name(os, name);
1083 :
1084 17794112 : if (n) {
1085 17226695 : objectversion *ov = get_valid_object_name(tr, n->ov);
1086 17211653 : if (ov && os_atmc_get_state(ov) == active)
1087 17211217 : return ov->b;
1088 : }
1089 : return NULL;
1090 : }
1091 :
1092 : sql_base *
1093 148867 : os_find_id(objectset *os, struct sql_trans *tr, sqlid id)
1094 : {
1095 148867 : if (!os)
1096 : return NULL;
1097 148867 : versionhead *n = find_id(os, id);
1098 :
1099 148867 : if (n) {
1100 148361 : objectversion *ov = get_valid_object_id(tr, n->ov);
1101 148361 : if (ov && os_atmc_get_state(ov) == active)
1102 148357 : return ov->b;
1103 : }
1104 : return NULL;
1105 : }
1106 :
1107 : void
1108 2177871 : os_iterator(struct os_iter *oi, struct objectset *os, struct sql_trans *tr, const char *name /*optional*/)
1109 : {
1110 2177871 : *oi = (struct os_iter) {
1111 : .os = os,
1112 : .tr = tr,
1113 : .name = name,
1114 : };
1115 :
1116 2177871 : lock_reader(os);
1117 2179154 : if (name && os->name_map) {
1118 1015463 : int key = hash_key(name);
1119 1015463 : oi->n = (void*)os->name_map->buckets[key&(os->name_map->size-1)];
1120 : } else
1121 1163691 : oi->n = os->name_based_h;
1122 2179154 : unlock_reader(os);
1123 2179173 : }
1124 :
1125 : sql_base *
1126 3967839 : oi_next(struct os_iter *oi)
1127 : {
1128 3967839 : sql_base *b = NULL;
1129 :
1130 3967839 : if (oi->name) {
1131 2854635 : lock_reader(oi->os); /* intentionally outside of while loop */
1132 2854636 : if (oi->os->name_map) {
1133 2489119 : sql_hash_e *he = (void*)oi->n;
1134 :
1135 6216417 : for (; he && !b; he = he->chain) {
1136 3727298 : versionhead *n = he->value;
1137 :
1138 3727298 : if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
1139 1481809 : objectversion *ov = n->ov;
1140 :
1141 1481809 : ov = get_valid_object_name(oi->tr, ov);
1142 1481809 : if (ov && os_atmc_get_state(ov) == active)
1143 1478771 : b = ov->b;
1144 : }
1145 : }
1146 2489119 : oi->n = (void*)he;
1147 : } else {
1148 365517 : versionhead *n = oi->n;
1149 :
1150 409552 : while (n && !b) {
1151 :
1152 44035 : if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
1153 13084 : objectversion *ov = n->ov;
1154 :
1155 13084 : n = oi->n = n->next;
1156 13084 : ov = get_valid_object_name(oi->tr, ov);
1157 13084 : if (ov && os_atmc_get_state(ov) == active)
1158 13084 : b = ov->b;
1159 : } else {
1160 30951 : n = oi->n = n->next;
1161 : }
1162 : }
1163 : }
1164 2854636 : unlock_reader(oi->os);
1165 : } else {
1166 1113204 : versionhead *n = oi->n;
1167 :
1168 1113204 : lock_reader(oi->os); /* intentionally outside of while loop */
1169 2606995 : while (n && !b) {
1170 380045 : objectversion *ov = n->ov;
1171 380045 : n = oi->n = n->next;
1172 :
1173 380045 : ov = get_valid_object_id(oi->tr, ov);
1174 380220 : if (ov && os_atmc_get_state(ov) == active)
1175 355844 : b = ov->b;
1176 : }
1177 1113746 : unlock_reader(oi->os);
1178 : }
1179 3968609 : return b;
1180 : }
1181 :
1182 : bool
1183 3408 : os_obj_intransaction(objectset *os, struct sql_trans *tr, sql_base *b)
1184 : {
1185 3408 : versionhead *n = find_id(os, b->id);
1186 :
1187 3408 : if (n) {
1188 3408 : objectversion *ov = n->ov;
1189 :
1190 3408 : if (ov && os_atmc_get_state(ov) == active && ov->ts == tr->tid)
1191 : return true;
1192 : }
1193 : return false;
1194 : }
1195 :
1196 : /* return true if this object set has changes pending for an other transaction */
1197 : bool
1198 183161 : os_has_changes(objectset *os, struct sql_trans *tr)
1199 : {
1200 183161 : versionhead *n = os->id_based_t;
1201 :
1202 183161 : if (n) {
1203 179002 : objectversion *ov = n->ov;
1204 :
1205 179002 : if (ov && os_atmc_get_state(ov) == active && ov->ts != tr->tid && ov->ts > TRANSACTION_ID_BASE)
1206 : return true;
1207 : }
1208 : return false;
1209 : }
|