Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 :
17 : #define ORDERIDX_VERSION ((oid) 3)
18 :
19 : #ifdef PERSISTENTIDX
20 : static void
21 6 : BATidxsync(void *arg)
22 : {
23 6 : BAT *b = arg;
24 6 : Heap *hp;
25 6 : int fd;
26 6 : lng t0 = GDKusec();
27 6 : const char *failed = " failed";
28 :
29 :
30 6 : MT_lock_set(&b->batIdxLock);
31 6 : if ((hp = b->torderidx) != NULL) {
32 6 : if (HEAPsave(hp, hp->filename, NULL, true, hp->free, NULL) == GDK_SUCCEED) {
33 6 : if (hp->storage == STORE_MEM) {
34 6 : if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
35 6 : ((oid *) hp->base)[0] |= (oid) 1 << 24;
36 6 : if (write(fd, hp->base, SIZEOF_OID) >= 0) {
37 6 : failed = ""; /* not failed */
38 6 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)) {
39 : #if defined(NATIVE_WIN32)
40 : _commit(fd);
41 : #elif defined(HAVE_FDATASYNC)
42 0 : fdatasync(fd);
43 : #elif defined(HAVE_FSYNC)
44 : fsync(fd);
45 : #endif
46 : }
47 6 : hp->dirty = false;
48 : } else {
49 0 : perror("write hash");
50 : }
51 6 : close(fd);
52 : }
53 : } else {
54 0 : ((oid *) hp->base)[0] |= (oid) 1 << 24;
55 0 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) &&
56 0 : MT_msync(hp->base, SIZEOF_OID) < 0) {
57 0 : ((oid *) hp->base)[0] &= ~((oid) 1 << 24);
58 : } else {
59 0 : hp->dirty = false;
60 0 : failed = ""; /* not failed */
61 : }
62 : }
63 6 : TRC_DEBUG(ACCELERATOR, "BATidxsync(%s): orderidx persisted"
64 : " (" LLFMT " usec)%s\n",
65 : BATgetId(b), GDKusec() - t0, failed);
66 : }
67 : }
68 6 : MT_lock_unset(&b->batIdxLock);
69 6 : BBPunfix(b->batCacheid);
70 6 : }
71 : #endif
72 :
73 : /* return TRUE if we have a orderidx on the tail, even if we need to read
74 : * one from disk */
75 : bool
76 1786579 : BATcheckorderidx(BAT *b)
77 : {
78 1786579 : bool ret;
79 1786579 : lng t = GDKusec();
80 :
81 1786559 : if (b == NULL)
82 : return false;
83 1786559 : MT_lock_set(&b->batIdxLock);
84 1786534 : if (b->torderidx == (Heap *) 1) {
85 0 : Heap *hp;
86 0 : const char *nme = BBP_physical(b->batCacheid);
87 0 : int fd;
88 :
89 0 : assert(!GDKinmemory(b->theap->farmid));
90 0 : b->torderidx = NULL;
91 0 : if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
92 0 : (hp->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) >= 0) {
93 0 : strconcat_len(hp->filename,
94 : sizeof(hp->filename),
95 : nme, ".torderidx", NULL);
96 0 : hp->storage = hp->newstorage = STORE_INVALID;
97 :
98 : /* check whether a persisted orderidx can be found */
99 0 : if ((fd = GDKfdlocate(hp->farmid, nme, "rb+", "torderidx")) >= 0) {
100 0 : struct stat st;
101 0 : oid hdata[ORDERIDXOFF];
102 :
103 0 : if (read(fd, hdata, sizeof(hdata)) == sizeof(hdata) &&
104 0 : hdata[0] == (
105 : #ifdef PERSISTENTIDX
106 : ((oid) 1 << 24) |
107 : #endif
108 0 : ORDERIDX_VERSION) &&
109 0 : hdata[1] == (oid) BATcount(b) &&
110 0 : (hdata[2] == 0 || hdata[2] == 1) &&
111 0 : fstat(fd, &st) == 0 &&
112 0 : st.st_size >= (off_t) (hp->size = hp->free = (ORDERIDXOFF + hdata[1]) * SIZEOF_OID) &&
113 0 : HEAPload(hp, nme, "torderidx", false) == GDK_SUCCEED) {
114 0 : close(fd);
115 0 : ATOMIC_INIT(&hp->refs, 1);
116 0 : b->torderidx = hp;
117 0 : hp->hasfile = true;
118 0 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): reusing persisted orderidx\n", ALGOBATPAR(b));
119 0 : MT_lock_unset(&b->batIdxLock);
120 0 : return true;
121 : }
122 0 : close(fd);
123 : /* unlink unusable file */
124 0 : GDKunlink(hp->farmid, BATDIR, nme, "torderidx");
125 0 : hp->hasfile = false;
126 : }
127 : }
128 0 : GDKfree(hp);
129 0 : GDKclrerr(); /* we're not currently interested in errors */
130 : }
131 1786534 : MT_lock_unset(&b->batIdxLock);
132 :
133 1786574 : ret = b->torderidx != NULL;
134 1786574 : if (ret)
135 142 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): already has orderidx, waited " LLFMT " usec\n", ALGOBATPAR(b), GDKusec() - t);
136 : return ret;
137 : }
138 :
139 : /* create the heap for an order index; returns NULL on failure */
140 : Heap *
141 4441 : createOIDXheap(BAT *b, bool stable)
142 : {
143 4441 : Heap *m;
144 4441 : oid *restrict mv;
145 :
146 4441 : if ((m = GDKmalloc(sizeof(Heap))) == NULL)
147 : return NULL;
148 8882 : *m = (Heap) {
149 4441 : .farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap),
150 4441 : .parentid = b->batCacheid,
151 : .dirty = true,
152 : };
153 4441 : strconcat_len(m->filename, sizeof(m->filename),
154 4441 : BBP_physical(b->batCacheid), ".torderidx", NULL);
155 8882 : if (m->farmid < 0 ||
156 4441 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
157 0 : GDKfree(m);
158 0 : return NULL;
159 : }
160 4441 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
161 :
162 4441 : mv = (oid *) m->base;
163 4441 : *mv++ = ORDERIDX_VERSION;
164 4441 : *mv++ = (oid) BATcount(b);
165 4441 : *mv++ = (oid) stable;
166 4441 : return m;
167 : }
168 :
169 : /* maybe persist the order index heap */
170 : void
171 4441 : persistOIDX(BAT *b)
172 : {
173 : #ifdef PERSISTENTIDX
174 4441 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
175 8 : b->batInserted == b->batCount &&
176 12 : !b->theap->dirty &&
177 12 : !GDKinmemory(b->theap->farmid)) {
178 6 : MT_Id tid;
179 6 : BBPfix(b->batCacheid);
180 6 : char name[MT_NAME_LEN];
181 6 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
182 6 : if (MT_create_thread(&tid, BATidxsync, b,
183 : MT_THR_DETACHED, name) < 0)
184 0 : BBPunfix(b->batCacheid);
185 : } else
186 4435 : TRC_DEBUG(ACCELERATOR, "persistOIDX(" ALGOBATFMT "): NOT persisting order index\n", ALGOBATPAR(b));
187 : #else
188 : (void) b;
189 : #endif
190 4441 : }
191 :
192 : gdk_return
193 72 : BATorderidx(BAT *b, bool stable)
194 : {
195 72 : if (BATcheckorderidx(b))
196 : return GDK_SUCCEED;
197 72 : if (!BATtdense(b)) {
198 72 : BAT *on;
199 72 : MT_thread_setalgorithm("create order index");
200 72 : TRC_DEBUG(ACCELERATOR, "BATorderidx(" ALGOBATFMT ",%d) create index\n", ALGOBATPAR(b), stable);
201 72 : if (BATsort(NULL, &on, NULL, b, NULL, NULL, false, false, stable) != GDK_SUCCEED)
202 0 : return GDK_FAIL;
203 72 : assert(BATcount(b) == BATcount(on));
204 72 : if (BATtdense(on)) {
205 : /* if the order bat is dense, the input was
206 : * sorted and we don't need an order index */
207 0 : MT_lock_set(&b->theaplock);
208 0 : assert(!b->tnosorted);
209 0 : if (!b->tsorted) {
210 0 : b->tsorted = true;
211 0 : b->tnosorted = 0;
212 : }
213 0 : MT_lock_unset(&b->theaplock);
214 : } else {
215 : /* BATsort quite possibly already created the
216 : * order index, but just to be sure... */
217 72 : MT_lock_set(&b->batIdxLock);
218 72 : if (b->torderidx == NULL) {
219 0 : Heap *m;
220 0 : if ((m = createOIDXheap(b, stable)) == NULL) {
221 0 : MT_lock_unset(&b->batIdxLock);
222 0 : return GDK_FAIL;
223 : }
224 0 : memcpy((oid *) m->base + ORDERIDXOFF, Tloc(on, 0), BATcount(on) * sizeof(oid));
225 0 : ATOMIC_INIT(&m->refs, 1);
226 0 : b->torderidx = m;
227 0 : persistOIDX(b);
228 : }
229 72 : MT_lock_unset(&b->batIdxLock);
230 : }
231 72 : BBPunfix(on->batCacheid);
232 : }
233 : return GDK_SUCCEED;
234 : }
235 :
/* Merge two runs of oids, [p0,q0) and [p1,q1), into the output
 * cursor mv, ordering them by the TYPE values they refer to in the
 * column at bi.base (indexed by oid minus b->hseqbase).  On equal
 * values the element of the first run is emitted first.
 *
 * Uses from the expansion site: bi, b, p0, q0, p1, q1 and mv; all
 * three cursors are advanced. */
#define BINARY_MERGE(TYPE)						\
	do {								\
		const TYPE *vals = (const TYPE *) bi.base;		\
		/* neither cursor may have run past its end */		\
		assert(p0 <= q0 && p1 <= q1);				\
		/* both runs still have elements: take the smaller */	\
		while (p0 < q0 && p1 < q1) {				\
			*mv++ = vals[*p0 - b->hseqbase] <= vals[*p1 - b->hseqbase] \
				? *p0++					\
				: *p1++;				\
		}							\
		/* at most one of the runs has a remainder */		\
		while (p0 < q0) {					\
			*mv++ = *p0++;					\
		}							\
		while (p1 < q1) {					\
			*mv++ = *p1++;					\
		}							\
	} while (0)
269 :
/* exchange X and Y, using TMP as scratch; safe as a single statement */
#define swap(X,Y,TMP)	do { (TMP) = (X); (X) = (Y); (Y) = (TMP); } while (0)

/* positions of the children of node X in an array-embedded binary heap */
#define left_child(X)	(2*(X)+1)
#define right_child(X)	(2*(X)+2)

/* Sift the entry at position X down the min-heap of n_ar entries.
 *
 * The heap is kept in three parallel arrays taken from the expansion
 * site: minhp[] (current value of each run), p[] (cursor of each
 * run) and q[] (end of each run); t and t_oid are scratch variables
 * for swapping.  Entries are ordered by value and, for equal values,
 * by the oid the run's cursor points at, so that the entry with the
 * smallest oid among equal values surfaces first. */
#define HEAPIFY(X)							\
	do {								\
		int cur, min = (X), chld;				\
		do {							\
			cur = min;					\
			/* compare each child with the best so far */	\
			if ((chld = left_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if ((chld = right_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if (min != cur) {				\
				swap(minhp[cur], minhp[min], t);	\
				swap(p[cur], p[min], t_oid);		\
				swap(q[cur], q[min], t_oid);		\
			}						\
		} while (cur != min);					\
	} while (0)
299 :
/* Merge n_ar runs of oids into the output cursor mv using a min-heap
 * keyed on the TYPE values the oids refer to in the column at
 * bi.base (indexed by oid minus b->hseqbase).
 *
 * Uses from the expansion site: bi, b, i, n_ar, mv, the run cursors
 * p[] with their ends q[], the scratch pointer t_oid, and the label
 * bailout, which is jumped to when the heap cannot be allocated.
 * n_ar is decremented each time a run is exhausted. */
#define NWAY_MERGE(TYPE)						\
	do {								\
		TYPE *minhp, t;						\
		const TYPE *vals = (const TYPE *) bi.base;		\
		minhp = GDKmalloc(sizeof(TYPE)*n_ar);			\
		if (minhp == NULL) {					\
			goto bailout;					\
		}							\
		/* init min heap with the head of every run */		\
		for (i = 0; i < n_ar; i++) {				\
			minhp[i] = vals[*p[i] - b->hseqbase];		\
		}							\
		for (i = n_ar/2; i >= 0; i--) {				\
			HEAPIFY(i);					\
		}							\
		/* merge: emit the root, then refill or retire it */	\
		do {							\
			*mv++ = *(p[0])++;				\
			if (p[0] >= q[0]) {				\
				/* run exhausted: move it out of	\
				 * the active part of the heap */	\
				swap(minhp[0], minhp[n_ar-1], t);	\
				swap(p[0], p[n_ar-1], t_oid);		\
				swap(q[0], q[n_ar-1], t_oid);		\
				n_ar--;					\
			} else {					\
				minhp[0] = vals[*p[0] - b->hseqbase];	\
			}						\
			HEAPIFY(0);					\
		} while (n_ar > 1);					\
		/* a single run is left: copy its remainder */		\
		while (p[0] < q[0]) {					\
			*mv++ = *(p[0])++;				\
		}							\
		GDKfree(minhp);						\
	} while (0)
344 :
345 : gdk_return
346 0 : GDKmergeidx(BAT *b, BAT**a, int n_ar)
347 : {
348 0 : Heap *m;
349 0 : int i;
350 0 : oid *restrict mv;
351 0 : const char *nme = BBP_physical(b->batCacheid);
352 :
353 0 : if (BATcheckorderidx(b))
354 : return GDK_SUCCEED;
355 0 : switch (ATOMbasetype(b->ttype)) {
356 : case TYPE_bte:
357 : case TYPE_sht:
358 : case TYPE_int:
359 : case TYPE_lng:
360 : #ifdef HAVE_HGE
361 : case TYPE_hge:
362 : #endif
363 : case TYPE_flt:
364 : case TYPE_dbl:
365 0 : break;
366 0 : default:
367 0 : GDKerror("type %s not supported.\n", ATOMname(b->ttype));
368 0 : return GDK_FAIL;
369 : }
370 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(" ALGOBATFMT ") create index\n", ALGOBATPAR(b));
371 0 : BATiter bi = bat_iterator(b);
372 0 : MT_lock_set(&b->batIdxLock);
373 0 : if (b->torderidx) {
374 0 : MT_lock_unset(&b->batIdxLock);
375 0 : bat_iterator_end(&bi);
376 0 : return GDK_SUCCEED;
377 : }
378 0 : if ((m = GDKmalloc(sizeof(Heap))) == NULL) {
379 0 : MT_lock_unset(&b->batIdxLock);
380 0 : bat_iterator_end(&bi);
381 0 : return GDK_FAIL;
382 : }
383 0 : *m = (Heap) {
384 0 : .farmid = BBPselectfarm(b->batRole, bi.type, orderidxheap),
385 0 : .parentid = b->batCacheid,
386 : .dirty = true,
387 : };
388 0 : strconcat_len(m->filename, sizeof(m->filename),
389 : nme, ".torderidx", NULL);
390 0 : if (m->farmid < 0 ||
391 0 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
392 0 : GDKfree(m);
393 0 : MT_lock_unset(&b->batIdxLock);
394 0 : bat_iterator_end(&bi);
395 0 : return GDK_FAIL;
396 : }
397 0 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
398 :
399 0 : mv = (oid *) m->base;
400 0 : *mv++ = ORDERIDX_VERSION;
401 0 : *mv++ = (oid) BATcount(b);
402 : /* all participating indexes must be stable for the combined
403 : * index to be stable */
404 0 : *mv = 1;
405 0 : for (i = 0; i < n_ar; i++) {
406 0 : if ((*mv &= ((const oid *) a[i]->torderidx->base)[2]) == 0)
407 : break;
408 : }
409 0 : mv++;
410 :
411 0 : if (n_ar == 1) {
412 : /* One oid order bat, nothing to merge */
413 0 : assert(BATcount(a[0]) == BATcount(b));
414 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
415 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
416 : a[0]->torderidx);
417 0 : memcpy(mv, (const oid *) a[0]->torderidx->base + ORDERIDXOFF,
418 : BATcount(a[0]) * SIZEOF_OID);
419 0 : } else if (n_ar == 2) {
420 : /* sort merge with 1 comparison per BUN */
421 0 : const oid *restrict p0, *restrict p1, *q0, *q1;
422 0 : assert(BATcount(a[0]) + BATcount(a[1]) == BATcount(b));
423 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
424 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
425 : a[0]->torderidx);
426 0 : assert((VIEWtparent(a[1]) == b->batCacheid ||
427 : VIEWtparent(a[1]) == VIEWtparent(b)) &&
428 : a[1]->torderidx);
429 0 : p0 = (const oid *) a[0]->torderidx->base + ORDERIDXOFF;
430 0 : p1 = (const oid *) a[1]->torderidx->base + ORDERIDXOFF;
431 0 : q0 = p0 + BATcount(a[0]);
432 0 : q1 = p1 + BATcount(a[1]);
433 :
434 0 : switch (ATOMbasetype(bi.type)) {
435 0 : case TYPE_bte: BINARY_MERGE(bte); break;
436 0 : case TYPE_sht: BINARY_MERGE(sht); break;
437 0 : case TYPE_int: BINARY_MERGE(int); break;
438 0 : case TYPE_lng: BINARY_MERGE(lng); break;
439 : #ifdef HAVE_HGE
440 0 : case TYPE_hge: BINARY_MERGE(hge); break;
441 : #endif
442 0 : case TYPE_flt: BINARY_MERGE(flt); break;
443 0 : case TYPE_dbl: BINARY_MERGE(dbl); break;
444 : default:
445 : /* TODO: support strings, date, timestamps etc. */
446 0 : assert(0);
447 : HEAPfree(m, true);
448 : GDKfree(m);
449 : MT_lock_unset(&b->batIdxLock);
450 : bat_iterator_end(&bi);
451 : return GDK_FAIL;
452 : }
453 :
454 : } else {
455 : /* use min-heap */
456 0 : oid **p, **q, *t_oid;
457 :
458 0 : p = GDKmalloc(n_ar*sizeof(oid *));
459 0 : q = GDKmalloc(n_ar*sizeof(oid *));
460 0 : if (p == NULL || q == NULL) {
461 0 : bailout:
462 0 : GDKfree(p);
463 0 : GDKfree(q);
464 0 : HEAPfree(m, true);
465 0 : GDKfree(m);
466 0 : MT_lock_unset(&b->batIdxLock);
467 0 : bat_iterator_end(&bi);
468 0 : return GDK_FAIL;
469 : }
470 0 : for (i = 0; i < n_ar; i++) {
471 0 : assert((VIEWtparent(a[i]) == b->batCacheid ||
472 : VIEWtparent(a[i]) == VIEWtparent(b)) &&
473 : a[i]->torderidx);
474 0 : p[i] = (oid *) a[i]->torderidx->base + ORDERIDXOFF;
475 0 : q[i] = p[i] + BATcount(a[i]);
476 : }
477 :
478 0 : switch (ATOMbasetype(bi.type)) {
479 0 : case TYPE_bte: NWAY_MERGE(bte); break;
480 0 : case TYPE_sht: NWAY_MERGE(sht); break;
481 0 : case TYPE_int: NWAY_MERGE(int); break;
482 0 : case TYPE_lng: NWAY_MERGE(lng); break;
483 : #ifdef HAVE_HGE
484 0 : case TYPE_hge: NWAY_MERGE(hge); break;
485 : #endif
486 0 : case TYPE_flt: NWAY_MERGE(flt); break;
487 0 : case TYPE_dbl: NWAY_MERGE(dbl); break;
488 : case TYPE_void:
489 : case TYPE_str:
490 : case TYPE_ptr:
491 : default:
492 : /* TODO: support strings, date, timestamps etc. */
493 0 : assert(0);
494 : goto bailout;
495 : }
496 0 : GDKfree(p);
497 0 : GDKfree(q);
498 : }
499 :
500 0 : ATOMIC_INIT(&m->refs, 1);
501 0 : b->torderidx = m;
502 : #ifdef PERSISTENTIDX
503 0 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
504 0 : b->batInserted == b->batCount) {
505 0 : MT_Id tid;
506 0 : BBPfix(b->batCacheid);
507 0 : char name[MT_NAME_LEN];
508 0 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
509 0 : if (MT_create_thread(&tid, BATidxsync, b,
510 : MT_THR_DETACHED, name) < 0)
511 0 : BBPunfix(b->batCacheid);
512 : } else
513 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(%s): NOT persisting index\n", BATgetId(b));
514 : #endif
515 :
516 0 : MT_lock_unset(&b->batIdxLock);
517 0 : bat_iterator_end(&bi);
518 0 : return GDK_SUCCEED;
519 : }
520 :
521 : void
522 450631 : OIDXfree(BAT *b)
523 : {
524 450631 : if (b && b->torderidx) {
525 2 : Heap *hp;
526 :
527 2 : MT_lock_set(&b->batIdxLock);
528 2 : if ((hp = b->torderidx) != NULL && hp != (Heap *) 1) {
529 2 : if (GDKinmemory(b->theap->farmid)) {
530 0 : b->torderidx = NULL;
531 0 : HEAPdecref(hp, true);
532 : } else {
533 2 : b->torderidx = (Heap *) 1;
534 2 : HEAPdecref(hp, false);
535 : }
536 : }
537 2 : MT_lock_unset(&b->batIdxLock);
538 : }
539 450631 : }
540 :
541 : void
542 80747032 : OIDXdestroy(BAT *b)
543 : {
544 80747032 : if (b && b->torderidx) {
545 4439 : Heap *hp;
546 :
547 4439 : MT_lock_set(&b->batIdxLock);
548 4438 : hp = b->torderidx;
549 4438 : b->torderidx = NULL;
550 4438 : MT_lock_unset(&b->batIdxLock);
551 4439 : if (hp == (Heap *) 1) {
552 0 : GDKunlink(BBPselectfarm(b->batRole, b->ttype, orderidxheap),
553 : BATDIR,
554 0 : BBP_physical(b->batCacheid),
555 : "torderidx");
556 4439 : } else if (hp != NULL) {
557 4439 : HEAPdecref(hp, true);
558 : }
559 : }
560 80747032 : }
|