Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 :
/* Format version of an order index heap.  createOIDXheap stores it in
 * the first oid of the heap header, and BATcheckorderidx only reuses a
 * persisted file whose first oid equals this value with bit 24 set. */
#define ORDERIDX_VERSION	((oid) 3)
18 :
19 : static void
20 6 : BATidxsync(void *arg)
21 : {
22 6 : BAT *b = arg;
23 6 : Heap *hp;
24 6 : int fd;
25 6 : lng t0 = GDKusec();
26 6 : const char *failed = " failed";
27 :
28 :
29 6 : MT_lock_set(&b->batIdxLock);
30 6 : if ((hp = b->torderidx) != NULL) {
31 6 : if (HEAPsave(hp, hp->filename, NULL, true, hp->free, NULL) == GDK_SUCCEED) {
32 6 : if (hp->storage == STORE_MEM) {
33 6 : if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
34 6 : ((oid *) hp->base)[0] |= (oid) 1 << 24;
35 6 : if (write(fd, hp->base, SIZEOF_OID) >= 0) {
36 6 : failed = ""; /* not failed */
37 6 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)) {
38 : #if defined(NATIVE_WIN32)
39 : _commit(fd);
40 : #elif defined(HAVE_FDATASYNC)
41 0 : fdatasync(fd);
42 : #elif defined(HAVE_FSYNC)
43 : fsync(fd);
44 : #endif
45 : }
46 : } else {
47 0 : hp->dirty = true;
48 0 : perror("write hash");
49 : }
50 6 : close(fd);
51 : }
52 : } else {
53 0 : ((oid *) hp->base)[0] |= (oid) 1 << 24;
54 0 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) &&
55 0 : MT_msync(hp->base, SIZEOF_OID) < 0) {
56 0 : hp->dirty = true;
57 0 : failed = " sync failed";
58 0 : ((oid *) hp->base)[0] &= ~((oid) 1 << 24);
59 : } else {
60 : failed = ""; /* not failed */
61 : }
62 : }
63 6 : TRC_DEBUG(ACCELERATOR, "BATidxsync(%s): orderidx persisted"
64 : " (" LLFMT " usec)%s\n",
65 : BATgetId(b), GDKusec() - t0, failed);
66 : }
67 : }
68 6 : MT_lock_unset(&b->batIdxLock);
69 6 : BBPunfix(b->batCacheid);
70 6 : }
71 :
72 : /* return TRUE if we have a orderidx on the tail, even if we need to read
73 : * one from disk */
74 : bool
75 1930109 : BATcheckorderidx(BAT *b)
76 : {
77 1930109 : bool ret;
78 1930109 : lng t = GDKusec();
79 :
80 1930359 : if (b == NULL)
81 : return false;
82 1930359 : MT_lock_set(&b->batIdxLock);
83 1930324 : if (b->torderidx == (Heap *) 1) {
84 0 : Heap *hp;
85 0 : const char *nme = BBP_physical(b->batCacheid);
86 0 : int fd;
87 :
88 0 : assert(!GDKinmemory(b->theap->farmid));
89 0 : b->torderidx = NULL;
90 0 : if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
91 0 : (hp->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) >= 0) {
92 0 : strconcat_len(hp->filename,
93 : sizeof(hp->filename),
94 : nme, ".torderidx", NULL);
95 0 : hp->storage = hp->newstorage = STORE_INVALID;
96 :
97 : /* check whether a persisted orderidx can be found */
98 0 : if ((fd = GDKfdlocate(hp->farmid, nme, "rb+", "torderidx")) >= 0) {
99 0 : struct stat st;
100 0 : oid hdata[ORDERIDXOFF];
101 :
102 0 : if (read(fd, hdata, sizeof(hdata)) == sizeof(hdata) &&
103 0 : hdata[0] == (((oid) 1 << 24) | ORDERIDX_VERSION) &&
104 0 : hdata[1] == (oid) BATcount(b) &&
105 0 : (hdata[2] == 0 || hdata[2] == 1) &&
106 0 : fstat(fd, &st) == 0 &&
107 0 : st.st_size >= (off_t) (hp->size = hp->free = (ORDERIDXOFF + hdata[1]) * SIZEOF_OID) &&
108 0 : HEAPload(hp, nme, "torderidx", false) == GDK_SUCCEED) {
109 0 : close(fd);
110 0 : ATOMIC_INIT(&hp->refs, 1);
111 0 : b->torderidx = hp;
112 0 : hp->hasfile = true;
113 0 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): reusing persisted orderidx\n", ALGOBATPAR(b));
114 0 : MT_lock_unset(&b->batIdxLock);
115 0 : return true;
116 : }
117 0 : close(fd);
118 : /* unlink unusable file */
119 0 : GDKunlink(hp->farmid, BATDIR, nme, "torderidx");
120 0 : hp->hasfile = false;
121 : }
122 : }
123 0 : GDKfree(hp);
124 0 : GDKclrerr(); /* we're not currently interested in errors */
125 : }
126 1930324 : MT_lock_unset(&b->batIdxLock);
127 :
128 1930699 : ret = b->torderidx != NULL;
129 1930699 : if (ret)
130 121 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): already has orderidx, waited " LLFMT " usec\n", ALGOBATPAR(b), GDKusec() - t);
131 : return ret;
132 : }
133 :
134 : /* create the heap for an order index; returns NULL on failure */
135 : Heap *
136 4635 : createOIDXheap(BAT *b, bool stable)
137 : {
138 4635 : Heap *m;
139 4635 : oid *restrict mv;
140 :
141 4635 : if ((m = GDKmalloc(sizeof(Heap))) == NULL)
142 : return NULL;
143 9274 : *m = (Heap) {
144 4637 : .farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap),
145 4637 : .parentid = b->batCacheid,
146 : .dirty = true,
147 : .refs = ATOMIC_VAR_INIT(1),
148 : };
149 4637 : strconcat_len(m->filename, sizeof(m->filename),
150 4637 : BBP_physical(b->batCacheid), ".torderidx", NULL);
151 9276 : if (m->farmid < 0 ||
152 4638 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
153 0 : GDKfree(m);
154 0 : return NULL;
155 : }
156 4638 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
157 :
158 4638 : mv = (oid *) m->base;
159 4638 : *mv++ = ORDERIDX_VERSION;
160 4638 : *mv++ = (oid) BATcount(b);
161 4638 : *mv++ = (oid) stable;
162 4638 : return m;
163 : }
164 :
165 : /* maybe persist the order index heap */
166 : void
167 4637 : persistOIDX(BAT *b)
168 : {
169 4637 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
170 8 : b->batInserted == b->batCount &&
171 12 : !b->theap->dirty &&
172 12 : !GDKinmemory(b->theap->farmid)) {
173 6 : MT_Id tid;
174 6 : BBPfix(b->batCacheid);
175 6 : char name[MT_NAME_LEN];
176 6 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
177 6 : if (MT_create_thread(&tid, BATidxsync, b,
178 : MT_THR_DETACHED, name) < 0)
179 0 : BBPunfix(b->batCacheid);
180 : } else
181 4631 : TRC_DEBUG(ACCELERATOR, "persistOIDX(" ALGOBATFMT "): NOT persisting order index\n", ALGOBATPAR(b));
182 4637 : }
183 :
184 : gdk_return
185 73 : BATorderidx(BAT *b, bool stable)
186 : {
187 73 : if (b->ttype == TYPE_void) {
188 0 : GDKerror("No order index on void type bats\n");
189 0 : return GDK_FAIL;
190 : }
191 73 : if (BATcheckorderidx(b))
192 : return GDK_SUCCEED;
193 73 : if (!BATtdense(b)) {
194 73 : BAT *on;
195 73 : MT_thread_setalgorithm("create order index");
196 73 : TRC_DEBUG(ACCELERATOR, "BATorderidx(" ALGOBATFMT ",%d) create index\n", ALGOBATPAR(b), stable);
197 73 : if (BATsort(NULL, &on, NULL, b, NULL, NULL, false, false, stable) != GDK_SUCCEED)
198 0 : return GDK_FAIL;
199 73 : assert(BATcount(b) == BATcount(on));
200 73 : if (BATtdense(on)) {
201 : /* if the order bat is dense, the input was
202 : * sorted and we don't need an order index */
203 0 : MT_lock_set(&b->theaplock);
204 0 : assert(!b->tnosorted);
205 0 : if (!b->tsorted) {
206 0 : b->tsorted = true;
207 0 : b->tnosorted = 0;
208 : }
209 0 : MT_lock_unset(&b->theaplock);
210 : } else {
211 : /* BATsort quite possibly already created the
212 : * order index, but just to be sure... */
213 73 : MT_lock_set(&b->batIdxLock);
214 73 : if (b->torderidx == NULL) {
215 0 : Heap *m;
216 0 : if ((m = createOIDXheap(b, stable)) == NULL) {
217 0 : MT_lock_unset(&b->batIdxLock);
218 0 : return GDK_FAIL;
219 : }
220 0 : memcpy((oid *) m->base + ORDERIDXOFF, Tloc(on, 0), BATcount(on) * sizeof(oid));
221 0 : b->torderidx = m;
222 0 : persistOIDX(b);
223 : }
224 73 : MT_lock_unset(&b->batIdxLock);
225 : }
226 73 : BBPunfix(on->batCacheid);
227 : }
228 : return GDK_SUCCEED;
229 : }
230 :
/* Merge the two oid runs [p0,q0) and [p1,q1) into mv, ordering the
 * oids by the TYPE values they refer to in bi.base (indexed by the oid
 * minus b->hseqbase).  On equal values the entry from the first run is
 * emitted first.  Expects bi, b, p0, q0, p1, q1 and mv to be in scope;
 * on completion both runs are exhausted and mv points just past the
 * last oid written. */
#define BINARY_MERGE(TYPE)						\
	do {								\
		const TYPE *vals = (const TYPE *) bi.base;		\
		/* interleave while both runs have entries left */	\
		while (p0 < q0 && p1 < q1) {				\
			*mv++ = vals[*p0 - b->hseqbase] <= vals[*p1 - b->hseqbase] \
				? *p0++					\
				: *p1++;				\
		}							\
		/* at most one of the runs still has entries */	\
		for (; p0 < q0; p0++)					\
			*mv++ = *p0;					\
		for (; p1 < q1; p1++)					\
			*mv++ = *p1;					\
	} while (0)
264 :
/* exchange X and Y, using TMP as scratch; wrapped so that it behaves
 * as a single statement */
#define swap(X,Y,TMP)	do { (TMP)=(X); (X)=(Y); (Y)=(TMP); } while (0)

/* positions of the children of node X in an array-based binary heap */
#define left_child(X)  (2*(X)+1)
#define right_child(X) (2*(X)+2)

/* Sift the entry at position X down the min-heap formed by the first
 * n_ar entries of minhp, keeping the parallel arrays p and q in step.
 *
 * Entries are ordered by their value in minhp; entries with equal
 * values are ordered by the oid that p[] currently points at, so that
 * among equal values the smallest oid reaches the top first.  The
 * tie-break must look at the child (*p[chld]): comparing *p[cur] with
 * *p[min] is always false for the left child, where cur == min, and
 * compares the wrong entry for the right child.
 *
 * Expects n_ar, minhp, p, q, t and t_oid to be in scope. */
#define HEAPIFY(X)							\
	do {								\
		int cur, min = (X), chld;				\
		do {							\
			cur = min;					\
			if ((chld = left_child(cur)) < n_ar &&		\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if ((chld = right_child(cur)) < n_ar &&	\
			    (minhp[chld] < minhp[min] ||		\
			     (minhp[chld] == minhp[min] &&		\
			      *p[chld] < *p[min]))) {			\
				min = chld;				\
			}						\
			if (min != cur) {				\
				swap(minhp[cur], minhp[min], t);	\
				swap(p[cur], p[min], t_oid);		\
				swap(q[cur], q[min], t_oid);		\
			}						\
		} while (cur != min);					\
	} while (0)
294 :
/* Merge the n_ar oid runs [p[i],q[i]) into mv using a min-heap keyed
 * on the TYPE values the oids refer to in bi.base (indexed by the oid
 * minus b->hseqbase).
 *
 * minhp[i] caches the value of the current head of run i.  Each step
 * emits the head of the run at the top of the heap; the top is then
 * either refreshed with that run's next value, or, when the run is
 * exhausted, swapped with the last live entry and dropped from the
 * heap by decrementing n_ar.  Once a single run remains, its tail is
 * copied directly.
 *
 * Expects bi, b, i, n_ar, p, q, t_oid and mv to be in scope, as well as
 * a label named bailout, which is jumped to when the heap array cannot
 * be allocated.  n_ar is modified. */
#define NWAY_MERGE(TYPE)						\
	do {								\
		TYPE *minhp, t;						\
		TYPE *vals = (TYPE *) bi.base;				\
		minhp = GDKmalloc(sizeof(TYPE)*n_ar);			\
		if (minhp == NULL)					\
			goto bailout;					\
		/* load the head of every run, then build the heap */	\
		for (i = 0; i < n_ar; i++)				\
			minhp[i] = vals[*p[i] - b->hseqbase];		\
		for (i = n_ar/2; i >= 0; i--)				\
			HEAPIFY(i);					\
		/* merge: the first step is taken unconditionally */	\
		do {							\
			*mv++ = *(p[0])++;				\
			if (p[0] < q[0]) {				\
				minhp[0] = vals[*p[0] - b->hseqbase];	\
			} else {					\
				swap(minhp[0], minhp[n_ar-1], t);	\
				swap(p[0], p[n_ar-1], t_oid);		\
				swap(q[0], q[n_ar-1], t_oid);		\
				n_ar--;					\
			}						\
			HEAPIFY(0);					\
		} while (n_ar > 1);					\
		/* copy what is left of the last remaining run */	\
		for (; p[0] < q[0]; (p[0])++)				\
			*mv++ = *p[0];					\
		GDKfree(minhp);						\
	} while (0)
339 :
340 : gdk_return
341 0 : GDKmergeidx(BAT *b, BAT**a, int n_ar)
342 : {
343 0 : Heap *m;
344 0 : int i;
345 0 : oid *restrict mv;
346 0 : const char *nme = BBP_physical(b->batCacheid);
347 :
348 0 : if (BATcheckorderidx(b))
349 : return GDK_SUCCEED;
350 0 : switch (ATOMbasetype(b->ttype)) {
351 : case TYPE_bte:
352 : case TYPE_sht:
353 : case TYPE_int:
354 : case TYPE_lng:
355 : #ifdef HAVE_HGE
356 : case TYPE_hge:
357 : #endif
358 : case TYPE_flt:
359 : case TYPE_dbl:
360 0 : break;
361 0 : default:
362 0 : GDKerror("type %s not supported.\n", ATOMname(b->ttype));
363 0 : return GDK_FAIL;
364 : }
365 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(" ALGOBATFMT ") create index\n", ALGOBATPAR(b));
366 0 : BATiter bi = bat_iterator(b);
367 0 : MT_lock_set(&b->batIdxLock);
368 0 : if (b->torderidx) {
369 0 : MT_lock_unset(&b->batIdxLock);
370 0 : bat_iterator_end(&bi);
371 0 : return GDK_SUCCEED;
372 : }
373 0 : if ((m = GDKmalloc(sizeof(Heap))) == NULL) {
374 0 : MT_lock_unset(&b->batIdxLock);
375 0 : bat_iterator_end(&bi);
376 0 : return GDK_FAIL;
377 : }
378 0 : *m = (Heap) {
379 0 : .farmid = BBPselectfarm(b->batRole, bi.type, orderidxheap),
380 0 : .parentid = b->batCacheid,
381 : .dirty = true,
382 : .refs = ATOMIC_VAR_INIT(1),
383 : };
384 0 : strconcat_len(m->filename, sizeof(m->filename),
385 : nme, ".torderidx", NULL);
386 0 : if (m->farmid < 0 ||
387 0 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
388 0 : GDKfree(m);
389 0 : MT_lock_unset(&b->batIdxLock);
390 0 : bat_iterator_end(&bi);
391 0 : return GDK_FAIL;
392 : }
393 0 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
394 :
395 0 : mv = (oid *) m->base;
396 0 : *mv++ = ORDERIDX_VERSION;
397 0 : *mv++ = (oid) BATcount(b);
398 : /* all participating indexes must be stable for the combined
399 : * index to be stable */
400 0 : *mv = 1;
401 0 : for (i = 0; i < n_ar; i++) {
402 0 : if ((*mv &= ((const oid *) a[i]->torderidx->base)[2]) == 0)
403 : break;
404 : }
405 0 : mv++;
406 :
407 0 : if (n_ar == 1) {
408 : /* One oid order bat, nothing to merge */
409 0 : assert(BATcount(a[0]) == BATcount(b));
410 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
411 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
412 : a[0]->torderidx);
413 0 : memcpy(mv, (const oid *) a[0]->torderidx->base + ORDERIDXOFF,
414 : BATcount(a[0]) * SIZEOF_OID);
415 0 : } else if (n_ar == 2) {
416 : /* sort merge with 1 comparison per BUN */
417 0 : const oid *restrict p0, *restrict p1, *q0, *q1;
418 0 : assert(BATcount(a[0]) + BATcount(a[1]) == BATcount(b));
419 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
420 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
421 : a[0]->torderidx);
422 0 : assert((VIEWtparent(a[1]) == b->batCacheid ||
423 : VIEWtparent(a[1]) == VIEWtparent(b)) &&
424 : a[1]->torderidx);
425 0 : p0 = (const oid *) a[0]->torderidx->base + ORDERIDXOFF;
426 0 : p1 = (const oid *) a[1]->torderidx->base + ORDERIDXOFF;
427 0 : q0 = p0 + BATcount(a[0]);
428 0 : q1 = p1 + BATcount(a[1]);
429 :
430 0 : switch (ATOMbasetype(bi.type)) {
431 0 : case TYPE_bte: BINARY_MERGE(bte); break;
432 0 : case TYPE_sht: BINARY_MERGE(sht); break;
433 0 : case TYPE_int: BINARY_MERGE(int); break;
434 0 : case TYPE_lng: BINARY_MERGE(lng); break;
435 : #ifdef HAVE_HGE
436 0 : case TYPE_hge: BINARY_MERGE(hge); break;
437 : #endif
438 0 : case TYPE_flt: BINARY_MERGE(flt); break;
439 0 : case TYPE_dbl: BINARY_MERGE(dbl); break;
440 : default:
441 : /* TODO: support strings, date, timestamps etc. */
442 0 : assert(0);
443 : HEAPfree(m, true);
444 : GDKfree(m);
445 : MT_lock_unset(&b->batIdxLock);
446 : bat_iterator_end(&bi);
447 : return GDK_FAIL;
448 : }
449 :
450 : } else {
451 : /* use min-heap */
452 0 : oid **p, **q, *t_oid;
453 :
454 0 : p = GDKmalloc(n_ar*sizeof(oid *));
455 0 : q = GDKmalloc(n_ar*sizeof(oid *));
456 0 : if (p == NULL || q == NULL) {
457 0 : bailout:
458 0 : GDKfree(p);
459 0 : GDKfree(q);
460 0 : HEAPfree(m, true);
461 0 : GDKfree(m);
462 0 : MT_lock_unset(&b->batIdxLock);
463 0 : bat_iterator_end(&bi);
464 0 : return GDK_FAIL;
465 : }
466 0 : for (i = 0; i < n_ar; i++) {
467 0 : assert((VIEWtparent(a[i]) == b->batCacheid ||
468 : VIEWtparent(a[i]) == VIEWtparent(b)) &&
469 : a[i]->torderidx);
470 0 : p[i] = (oid *) a[i]->torderidx->base + ORDERIDXOFF;
471 0 : q[i] = p[i] + BATcount(a[i]);
472 : }
473 :
474 0 : switch (ATOMbasetype(bi.type)) {
475 0 : case TYPE_bte: NWAY_MERGE(bte); break;
476 0 : case TYPE_sht: NWAY_MERGE(sht); break;
477 0 : case TYPE_int: NWAY_MERGE(int); break;
478 0 : case TYPE_lng: NWAY_MERGE(lng); break;
479 : #ifdef HAVE_HGE
480 0 : case TYPE_hge: NWAY_MERGE(hge); break;
481 : #endif
482 0 : case TYPE_flt: NWAY_MERGE(flt); break;
483 0 : case TYPE_dbl: NWAY_MERGE(dbl); break;
484 : case TYPE_void:
485 : case TYPE_str:
486 : case TYPE_ptr:
487 : default:
488 : /* TODO: support strings, date, timestamps etc. */
489 0 : assert(0);
490 : goto bailout;
491 : }
492 0 : GDKfree(p);
493 0 : GDKfree(q);
494 : }
495 :
496 0 : b->torderidx = m;
497 0 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
498 0 : b->batInserted == b->batCount) {
499 0 : MT_Id tid;
500 0 : BBPfix(b->batCacheid);
501 0 : char name[MT_NAME_LEN];
502 0 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
503 0 : if (MT_create_thread(&tid, BATidxsync, b,
504 : MT_THR_DETACHED, name) < 0)
505 0 : BBPunfix(b->batCacheid);
506 : } else
507 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(%s): NOT persisting index\n", BATgetId(b));
508 :
509 0 : MT_lock_unset(&b->batIdxLock);
510 0 : bat_iterator_end(&bi);
511 0 : return GDK_SUCCEED;
512 : }
513 :
514 : void
515 582418 : OIDXfree(BAT *b)
516 : {
517 582418 : if (b) {
518 582418 : Heap *hp;
519 :
520 582418 : MT_lock_set(&b->batIdxLock);
521 582418 : if ((hp = b->torderidx) != NULL && hp != (Heap *) 1) {
522 3 : if (GDKinmemory(b->theap->farmid)) {
523 0 : b->torderidx = NULL;
524 0 : HEAPdecref(hp, true);
525 : } else {
526 3 : b->torderidx = (Heap *) 1;
527 3 : HEAPdecref(hp, false);
528 : }
529 : }
530 582418 : MT_lock_unset(&b->batIdxLock);
531 : }
532 582418 : }
533 :
534 : void
535 83400622 : OIDXdestroy(BAT *b)
536 : {
537 83400622 : if (b) {
538 83400622 : Heap *hp;
539 :
540 83400622 : MT_lock_set(&b->batIdxLock);
541 83502583 : hp = b->torderidx;
542 83502583 : b->torderidx = NULL;
543 83502583 : MT_lock_unset(&b->batIdxLock);
544 83612715 : if (hp == (Heap *) 1) {
545 0 : GDKunlink(BBPselectfarm(b->batRole, b->ttype, orderidxheap),
546 : BATDIR,
547 0 : BBP_physical(b->batCacheid),
548 : "torderidx");
549 83612715 : } else if (hp != NULL) {
550 4636 : HEAPdecref(hp, true);
551 : }
552 : }
553 83612714 : }
|