Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 :
/* Format version stored in the first header word of an order index
 * heap (written by createOIDXheap and GDKmergeidx); BATcheckorderidx
 * only accepts a persisted file whose first word carries this value. */
17 : #define ORDERIDX_VERSION ((oid) 3)
18 :
#ifdef PERSISTENTIDX
/* Thread body (started through MT_create_thread) that writes the order
 * index of the BAT passed in `arg` to disk.
 *
 * The complete heap is saved first.  Only after that succeeded is the
 * "synced" bit (bit 24 of the first header word) set and the header
 * word written/synced once more, so a file without that bit is never
 * taken for a completely written index (BATcheckorderidx requires the
 * bit when PERSISTENTIDX is defined).  When that final header update
 * fails, the bit is cleared again in memory.
 *
 * The BAT reference that the starter took with BBPfix is released
 * here with BBPunfix before returning. */
static void
BATidxsync(void *arg)
{
	BAT *b = arg;
	Heap *hp;
	lng t0 = GDKusec();
	const char *failed = " failed";
	const oid synced = (oid) 1 << 24;

	MT_lock_set(&b->batIdxLock);
	/* (Heap *) 1 only marks "index exists on disk" (see OIDXfree);
	 * it is not a heap that can be dereferenced */
	if ((hp = b->torderidx) != NULL && hp != (Heap *) 1 &&
	    HEAPsave(hp, hp->filename, NULL, true, hp->free, NULL) == GDK_SUCCEED) {
		oid *hdr = (oid *) hp->base;

		if (hp->storage == STORE_MEM) {
			int fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL);

			if (fd >= 0) {
				hdr[0] |= synced;
				/* only a complete write of the header
				 * word counts as success */
				if (write(fd, hdr, SIZEOF_OID) == SIZEOF_OID) {
					failed = ""; /* not failed */
					if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)) {
#if defined(NATIVE_WIN32)
						_commit(fd);
#elif defined(HAVE_FDATASYNC)
						fdatasync(fd);
#elif defined(HAVE_FSYNC)
						fsync(fd);
#endif
					}
					hp->dirty = false;
				} else {
					perror("write orderidx");
					/* same as the mmap branch: do
					 * not claim a synced header */
					hdr[0] &= ~synced;
				}
				close(fd);
			}
		} else {
			hdr[0] |= synced;
			if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) &&
			    MT_msync(hp->base, SIZEOF_OID) < 0) {
				hdr[0] &= ~synced;
			} else {
				hp->dirty = false;
				failed = ""; /* not failed */
			}
		}
		TRC_DEBUG(ACCELERATOR, "BATidxsync(%s): orderidx persisted"
			  " (" LLFMT " usec)%s\n",
			  BATgetId(b), GDKusec() - t0, failed);
	}
	MT_lock_unset(&b->batIdxLock);
	BBPunfix(b->batCacheid);
}
#endif
72 :
73 : /* return TRUE if we have a orderidx on the tail, even if we need to read
74 : * one from disk */
75 : bool
76 1829985 : BATcheckorderidx(BAT *b)
77 : {
78 1829985 : bool ret;
79 1829985 : lng t = GDKusec();
80 :
81 1830211 : if (b == NULL)
82 : return false;
83 1830211 : MT_lock_set(&b->batIdxLock);
84 1830291 : if (b->torderidx == (Heap *) 1) {
85 0 : Heap *hp;
86 0 : const char *nme = BBP_physical(b->batCacheid);
87 0 : int fd;
88 :
89 0 : assert(!GDKinmemory(b->theap->farmid));
90 0 : b->torderidx = NULL;
91 0 : if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
92 0 : (hp->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) >= 0) {
93 0 : strconcat_len(hp->filename,
94 : sizeof(hp->filename),
95 : nme, ".torderidx", NULL);
96 0 : hp->storage = hp->newstorage = STORE_INVALID;
97 :
98 : /* check whether a persisted orderidx can be found */
99 0 : if ((fd = GDKfdlocate(hp->farmid, nme, "rb+", "torderidx")) >= 0) {
100 0 : struct stat st;
101 0 : oid hdata[ORDERIDXOFF];
102 :
103 0 : if (read(fd, hdata, sizeof(hdata)) == sizeof(hdata) &&
104 0 : hdata[0] == (
105 : #ifdef PERSISTENTIDX
106 : ((oid) 1 << 24) |
107 : #endif
108 0 : ORDERIDX_VERSION) &&
109 0 : hdata[1] == (oid) BATcount(b) &&
110 0 : (hdata[2] == 0 || hdata[2] == 1) &&
111 0 : fstat(fd, &st) == 0 &&
112 0 : st.st_size >= (off_t) (hp->size = hp->free = (ORDERIDXOFF + hdata[1]) * SIZEOF_OID) &&
113 0 : HEAPload(hp, nme, "torderidx", false) == GDK_SUCCEED) {
114 0 : close(fd);
115 0 : ATOMIC_INIT(&hp->refs, 1);
116 0 : b->torderidx = hp;
117 0 : hp->hasfile = true;
118 0 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): reusing persisted orderidx\n", ALGOBATPAR(b));
119 0 : MT_lock_unset(&b->batIdxLock);
120 0 : return true;
121 : }
122 0 : close(fd);
123 : /* unlink unusable file */
124 0 : GDKunlink(hp->farmid, BATDIR, nme, "torderidx");
125 0 : hp->hasfile = false;
126 : }
127 : }
128 0 : GDKfree(hp);
129 0 : GDKclrerr(); /* we're not currently interested in errors */
130 : }
131 1830291 : MT_lock_unset(&b->batIdxLock);
132 :
133 1830619 : ret = b->torderidx != NULL;
134 1830619 : if (ret)
135 115 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): already has orderidx, waited " LLFMT " usec\n", ALGOBATPAR(b), GDKusec() - t);
136 : return ret;
137 : }
138 :
139 : /* create the heap for an order index; returns NULL on failure */
140 : Heap *
141 4422 : createOIDXheap(BAT *b, bool stable)
142 : {
143 4422 : Heap *m;
144 4422 : oid *restrict mv;
145 :
146 4422 : if ((m = GDKmalloc(sizeof(Heap))) == NULL)
147 : return NULL;
148 8850 : *m = (Heap) {
149 4425 : .farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap),
150 4425 : .parentid = b->batCacheid,
151 : .dirty = true,
152 : .refs = ATOMIC_VAR_INIT(1),
153 : };
154 4425 : strconcat_len(m->filename, sizeof(m->filename),
155 4425 : BBP_physical(b->batCacheid), ".torderidx", NULL);
156 8849 : if (m->farmid < 0 ||
157 4424 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
158 0 : GDKfree(m);
159 0 : return NULL;
160 : }
161 4425 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
162 :
163 4425 : mv = (oid *) m->base;
164 4425 : *mv++ = ORDERIDX_VERSION;
165 4425 : *mv++ = (oid) BATcount(b);
166 4425 : *mv++ = (oid) stable;
167 4425 : return m;
168 : }
169 :
170 : /* maybe persist the order index heap */
171 : void
172 4423 : persistOIDX(BAT *b)
173 : {
174 : #ifdef PERSISTENTIDX
175 4423 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
176 8 : b->batInserted == b->batCount &&
177 12 : !b->theap->dirty &&
178 12 : !GDKinmemory(b->theap->farmid)) {
179 6 : MT_Id tid;
180 6 : BBPfix(b->batCacheid);
181 6 : char name[MT_NAME_LEN];
182 6 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
183 6 : if (MT_create_thread(&tid, BATidxsync, b,
184 : MT_THR_DETACHED, name) < 0)
185 0 : BBPunfix(b->batCacheid);
186 : } else
187 4417 : TRC_DEBUG(ACCELERATOR, "persistOIDX(" ALGOBATFMT "): NOT persisting order index\n", ALGOBATPAR(b));
188 : #else
189 : (void) b;
190 : #endif
191 4423 : }
192 :
193 : gdk_return
194 73 : BATorderidx(BAT *b, bool stable)
195 : {
196 73 : if (BATcheckorderidx(b))
197 : return GDK_SUCCEED;
198 73 : if (!BATtdense(b)) {
199 73 : BAT *on;
200 73 : MT_thread_setalgorithm("create order index");
201 73 : TRC_DEBUG(ACCELERATOR, "BATorderidx(" ALGOBATFMT ",%d) create index\n", ALGOBATPAR(b), stable);
202 73 : if (BATsort(NULL, &on, NULL, b, NULL, NULL, false, false, stable) != GDK_SUCCEED)
203 0 : return GDK_FAIL;
204 73 : assert(BATcount(b) == BATcount(on));
205 73 : if (BATtdense(on)) {
206 : /* if the order bat is dense, the input was
207 : * sorted and we don't need an order index */
208 0 : MT_lock_set(&b->theaplock);
209 0 : assert(!b->tnosorted);
210 0 : if (!b->tsorted) {
211 0 : b->tsorted = true;
212 0 : b->tnosorted = 0;
213 : }
214 0 : MT_lock_unset(&b->theaplock);
215 : } else {
216 : /* BATsort quite possibly already created the
217 : * order index, but just to be sure... */
218 73 : MT_lock_set(&b->batIdxLock);
219 73 : if (b->torderidx == NULL) {
220 0 : Heap *m;
221 0 : if ((m = createOIDXheap(b, stable)) == NULL) {
222 0 : MT_lock_unset(&b->batIdxLock);
223 0 : return GDK_FAIL;
224 : }
225 0 : memcpy((oid *) m->base + ORDERIDXOFF, Tloc(on, 0), BATcount(on) * sizeof(oid));
226 0 : b->torderidx = m;
227 0 : persistOIDX(b);
228 : }
229 73 : MT_lock_unset(&b->batIdxLock);
230 : }
231 73 : BBPunfix(on->batCacheid);
232 : }
233 : return GDK_SUCCEED;
234 : }
235 :
/* Two-way merge of the oid runs [p0,q0) and [p1,q1) into mv.
 *
 * Each oid is turned into a position in the TYPE array bi.base by
 * subtracting b->hseqbase; the oid whose value compares <= goes first,
 * so on equal values the oid from the first run wins.  Uses (and
 * advances) p0, p1 and mv from the enclosing scope, and reads q0, q1,
 * bi and b. */
#define BINARY_MERGE(TYPE)						\
	do {								\
		const TYPE *vals = (const TYPE *) bi.base;		\
		/* neither cursor may be past the end of its run */	\
		assert(p0 <= q0 && p1 <= q1);				\
		while (p0 < q0 && p1 < q1) {				\
			*mv++ = (vals[*p0 - b->hseqbase] <=		\
				 vals[*p1 - b->hseqbase])		\
				? *p0++					\
				: *p1++;				\
		}							\
		/* at most one of the runs still has oids left */	\
		for (; p0 < q0; p0++)					\
			*mv++ = *p0;					\
		for (; p1 < q1; p1++)					\
			*mv++ = *p1;					\
	} while(0)
269 :
/* Exchange X and Y, using TMP as scratch space.  Wrapped in
 * do { } while (0) so that it behaves as a single statement, also as
 * the body of an unbraced if/else. */
#define swap(X,Y,TMP)							\
	do {								\
		(TMP) = (X);						\
		(X) = (Y);						\
		(Y) = (TMP);						\
	} while (0)

/* positions of the children of node X in an array-based binary heap */
#define left_child(X)	(2*(X)+1)
#define right_child(X)	(2*(X)+2)

/* Sift the entry at position X of the min-heap down until neither
 * child is smaller.
 *
 * The heap consists of the n_ar entries of the parallel arrays minhp
 * (current value of each run), p (cursor into each run) and q (end of
 * each run), all taken from the enclosing scope together with the
 * scratch variables t and t_oid.  Entries are ordered by value and,
 * for equal values, by the oid the cursor points at, so that among
 * equal values the smallest oid ends up on top. */
#define HEAPIFY(X)							\
	do {								\
		int cur, min = (X), chld;				\
		do {							\
			cur = min;					\
			if ((chld = left_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if ((chld = right_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if (min != cur) {				\
				swap(minhp[cur], minhp[min], t);	\
				swap(p[cur], p[min], t_oid);		\
				swap(q[cur], q[min], t_oid);		\
			}						\
		} while (cur != min);					\
	} while (0)
299 :
/* N-way merge of the oid runs [p[i],q[i]), 0 <= i < n_ar, into mv by
 * means of a min-heap.
 *
 * minhp[i] caches the value (looked up in the TYPE array bi.base at
 * position oid - b->hseqbase) of the oid that p[i] points at; HEAPIFY
 * keeps minhp, p and q ordered as a heap.  Each round emits the oid on
 * top of the heap and then either refreshes the top value or, when
 * that run is exhausted, moves the last heap entry to the top and
 * shrinks the heap.  When a single run is left its remaining oids are
 * copied.
 *
 * Uses i, n_ar (which is decremented), p, q, t_oid, mv, bi and b from
 * the enclosing scope, and jumps to the label `bailout` when the heap
 * array cannot be allocated. */
#define NWAY_MERGE(TYPE)						\
	do {								\
		TYPE *minhp, t;						\
		const TYPE *vals = (const TYPE *) bi.base;		\
		minhp = GDKmalloc(sizeof(TYPE)*n_ar);			\
		if (minhp == NULL) {					\
			goto bailout;					\
		}							\
		/* fill and order the heap */				\
		for (i = 0; i < n_ar; i++) {				\
			minhp[i] = vals[*p[i] - b->hseqbase];		\
		}							\
		for (i = n_ar/2; i >= 0; i--) {				\
			HEAPIFY(i);					\
		}							\
		/* emit the top until one run remains */		\
		do {							\
			*mv++ = *(p[0])++;				\
			if (p[0] >= q[0]) {				\
				/* run exhausted: drop it */		\
				swap(minhp[0], minhp[n_ar-1], t);	\
				swap(p[0], p[n_ar-1], t_oid);		\
				swap(q[0], q[n_ar-1], t_oid);		\
				n_ar--;					\
			} else {					\
				minhp[0] = vals[*p[0] - b->hseqbase];	\
			}						\
			HEAPIFY(0);					\
		} while (n_ar > 1);					\
		/* copy what is left of the last run */			\
		while (p[0] < q[0]) {					\
			*mv++ = *(p[0])++;				\
		}							\
		GDKfree(minhp);						\
	} while (0)
344 :
345 : gdk_return
346 0 : GDKmergeidx(BAT *b, BAT**a, int n_ar)
347 : {
348 0 : Heap *m;
349 0 : int i;
350 0 : oid *restrict mv;
351 0 : const char *nme = BBP_physical(b->batCacheid);
352 :
353 0 : if (BATcheckorderidx(b))
354 : return GDK_SUCCEED;
355 0 : switch (ATOMbasetype(b->ttype)) {
356 : case TYPE_bte:
357 : case TYPE_sht:
358 : case TYPE_int:
359 : case TYPE_lng:
360 : #ifdef HAVE_HGE
361 : case TYPE_hge:
362 : #endif
363 : case TYPE_flt:
364 : case TYPE_dbl:
365 0 : break;
366 0 : default:
367 0 : GDKerror("type %s not supported.\n", ATOMname(b->ttype));
368 0 : return GDK_FAIL;
369 : }
370 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(" ALGOBATFMT ") create index\n", ALGOBATPAR(b));
371 0 : BATiter bi = bat_iterator(b);
372 0 : MT_lock_set(&b->batIdxLock);
373 0 : if (b->torderidx) {
374 0 : MT_lock_unset(&b->batIdxLock);
375 0 : bat_iterator_end(&bi);
376 0 : return GDK_SUCCEED;
377 : }
378 0 : if ((m = GDKmalloc(sizeof(Heap))) == NULL) {
379 0 : MT_lock_unset(&b->batIdxLock);
380 0 : bat_iterator_end(&bi);
381 0 : return GDK_FAIL;
382 : }
383 0 : *m = (Heap) {
384 0 : .farmid = BBPselectfarm(b->batRole, bi.type, orderidxheap),
385 0 : .parentid = b->batCacheid,
386 : .dirty = true,
387 : .refs = ATOMIC_VAR_INIT(1),
388 : };
389 0 : strconcat_len(m->filename, sizeof(m->filename),
390 : nme, ".torderidx", NULL);
391 0 : if (m->farmid < 0 ||
392 0 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
393 0 : GDKfree(m);
394 0 : MT_lock_unset(&b->batIdxLock);
395 0 : bat_iterator_end(&bi);
396 0 : return GDK_FAIL;
397 : }
398 0 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
399 :
400 0 : mv = (oid *) m->base;
401 0 : *mv++ = ORDERIDX_VERSION;
402 0 : *mv++ = (oid) BATcount(b);
403 : /* all participating indexes must be stable for the combined
404 : * index to be stable */
405 0 : *mv = 1;
406 0 : for (i = 0; i < n_ar; i++) {
407 0 : if ((*mv &= ((const oid *) a[i]->torderidx->base)[2]) == 0)
408 : break;
409 : }
410 0 : mv++;
411 :
412 0 : if (n_ar == 1) {
413 : /* One oid order bat, nothing to merge */
414 0 : assert(BATcount(a[0]) == BATcount(b));
415 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
416 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
417 : a[0]->torderidx);
418 0 : memcpy(mv, (const oid *) a[0]->torderidx->base + ORDERIDXOFF,
419 : BATcount(a[0]) * SIZEOF_OID);
420 0 : } else if (n_ar == 2) {
421 : /* sort merge with 1 comparison per BUN */
422 0 : const oid *restrict p0, *restrict p1, *q0, *q1;
423 0 : assert(BATcount(a[0]) + BATcount(a[1]) == BATcount(b));
424 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
425 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
426 : a[0]->torderidx);
427 0 : assert((VIEWtparent(a[1]) == b->batCacheid ||
428 : VIEWtparent(a[1]) == VIEWtparent(b)) &&
429 : a[1]->torderidx);
430 0 : p0 = (const oid *) a[0]->torderidx->base + ORDERIDXOFF;
431 0 : p1 = (const oid *) a[1]->torderidx->base + ORDERIDXOFF;
432 0 : q0 = p0 + BATcount(a[0]);
433 0 : q1 = p1 + BATcount(a[1]);
434 :
435 0 : switch (ATOMbasetype(bi.type)) {
436 0 : case TYPE_bte: BINARY_MERGE(bte); break;
437 0 : case TYPE_sht: BINARY_MERGE(sht); break;
438 0 : case TYPE_int: BINARY_MERGE(int); break;
439 0 : case TYPE_lng: BINARY_MERGE(lng); break;
440 : #ifdef HAVE_HGE
441 0 : case TYPE_hge: BINARY_MERGE(hge); break;
442 : #endif
443 0 : case TYPE_flt: BINARY_MERGE(flt); break;
444 0 : case TYPE_dbl: BINARY_MERGE(dbl); break;
445 : default:
446 : /* TODO: support strings, date, timestamps etc. */
447 0 : assert(0);
448 : HEAPfree(m, true);
449 : GDKfree(m);
450 : MT_lock_unset(&b->batIdxLock);
451 : bat_iterator_end(&bi);
452 : return GDK_FAIL;
453 : }
454 :
455 : } else {
456 : /* use min-heap */
457 0 : oid **p, **q, *t_oid;
458 :
459 0 : p = GDKmalloc(n_ar*sizeof(oid *));
460 0 : q = GDKmalloc(n_ar*sizeof(oid *));
461 0 : if (p == NULL || q == NULL) {
462 0 : bailout:
463 0 : GDKfree(p);
464 0 : GDKfree(q);
465 0 : HEAPfree(m, true);
466 0 : GDKfree(m);
467 0 : MT_lock_unset(&b->batIdxLock);
468 0 : bat_iterator_end(&bi);
469 0 : return GDK_FAIL;
470 : }
471 0 : for (i = 0; i < n_ar; i++) {
472 0 : assert((VIEWtparent(a[i]) == b->batCacheid ||
473 : VIEWtparent(a[i]) == VIEWtparent(b)) &&
474 : a[i]->torderidx);
475 0 : p[i] = (oid *) a[i]->torderidx->base + ORDERIDXOFF;
476 0 : q[i] = p[i] + BATcount(a[i]);
477 : }
478 :
479 0 : switch (ATOMbasetype(bi.type)) {
480 0 : case TYPE_bte: NWAY_MERGE(bte); break;
481 0 : case TYPE_sht: NWAY_MERGE(sht); break;
482 0 : case TYPE_int: NWAY_MERGE(int); break;
483 0 : case TYPE_lng: NWAY_MERGE(lng); break;
484 : #ifdef HAVE_HGE
485 0 : case TYPE_hge: NWAY_MERGE(hge); break;
486 : #endif
487 0 : case TYPE_flt: NWAY_MERGE(flt); break;
488 0 : case TYPE_dbl: NWAY_MERGE(dbl); break;
489 : case TYPE_void:
490 : case TYPE_str:
491 : case TYPE_ptr:
492 : default:
493 : /* TODO: support strings, date, timestamps etc. */
494 0 : assert(0);
495 : goto bailout;
496 : }
497 0 : GDKfree(p);
498 0 : GDKfree(q);
499 : }
500 :
501 0 : b->torderidx = m;
502 : #ifdef PERSISTENTIDX
503 0 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
504 0 : b->batInserted == b->batCount) {
505 0 : MT_Id tid;
506 0 : BBPfix(b->batCacheid);
507 0 : char name[MT_NAME_LEN];
508 0 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
509 0 : if (MT_create_thread(&tid, BATidxsync, b,
510 : MT_THR_DETACHED, name) < 0)
511 0 : BBPunfix(b->batCacheid);
512 : } else
513 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(%s): NOT persisting index\n", BATgetId(b));
514 : #endif
515 :
516 0 : MT_lock_unset(&b->batIdxLock);
517 0 : bat_iterator_end(&bi);
518 0 : return GDK_SUCCEED;
519 : }
520 :
521 : void
522 521397 : OIDXfree(BAT *b)
523 : {
524 521397 : if (b && b->torderidx) {
525 3 : Heap *hp;
526 :
527 3 : MT_lock_set(&b->batIdxLock);
528 3 : if ((hp = b->torderidx) != NULL && hp != (Heap *) 1) {
529 3 : if (GDKinmemory(b->theap->farmid)) {
530 0 : b->torderidx = NULL;
531 0 : HEAPdecref(hp, true);
532 : } else {
533 3 : b->torderidx = (Heap *) 1;
534 3 : HEAPdecref(hp, false);
535 : }
536 : }
537 3 : MT_lock_unset(&b->batIdxLock);
538 : }
539 521397 : }
540 :
541 : void
542 85366506 : OIDXdestroy(BAT *b)
543 : {
544 85366506 : if (b && b->torderidx) {
545 4412 : Heap *hp;
546 :
547 4412 : MT_lock_set(&b->batIdxLock);
548 4419 : hp = b->torderidx;
549 4419 : b->torderidx = NULL;
550 4419 : MT_lock_unset(&b->batIdxLock);
551 4421 : if (hp == (Heap *) 1) {
552 0 : GDKunlink(BBPselectfarm(b->batRole, b->ttype, orderidxheap),
553 : BATDIR,
554 0 : BBP_physical(b->batCacheid),
555 : "torderidx");
556 4421 : } else if (hp != NULL) {
557 4421 : HEAPdecref(hp, true);
558 : }
559 : }
560 85366512 : }
|