Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 :
17 : #define ORDERIDX_VERSION ((oid) 3)
18 :
19 : static void
20 6 : BATidxsync(void *arg)
21 : {
22 6 : BAT *b = arg;
23 6 : Heap *hp;
24 6 : int fd;
25 6 : lng t0 = GDKusec();
26 6 : const char *failed = " failed";
27 :
28 :
29 6 : MT_lock_set(&b->batIdxLock);
30 6 : if ((hp = b->torderidx) != NULL) {
31 6 : if (HEAPsave(hp, hp->filename, NULL, true, hp->free, NULL) == GDK_SUCCEED) {
32 6 : if (hp->storage == STORE_MEM) {
33 6 : if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
34 6 : ((oid *) hp->base)[0] |= (oid) 1 << 24;
35 6 : if (write(fd, hp->base, SIZEOF_OID) >= 0) {
36 6 : failed = ""; /* not failed */
37 6 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)) {
38 : #if defined(NATIVE_WIN32)
39 : _commit(fd);
40 : #elif defined(HAVE_FDATASYNC)
41 0 : fdatasync(fd);
42 : #elif defined(HAVE_FSYNC)
43 : fsync(fd);
44 : #endif
45 : }
46 6 : hp->dirty = false;
47 : } else {
48 0 : perror("write hash");
49 : }
50 6 : close(fd);
51 : }
52 : } else {
53 0 : ((oid *) hp->base)[0] |= (oid) 1 << 24;
54 0 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) &&
55 0 : MT_msync(hp->base, SIZEOF_OID) < 0) {
56 0 : ((oid *) hp->base)[0] &= ~((oid) 1 << 24);
57 : } else {
58 0 : hp->dirty = false;
59 0 : failed = ""; /* not failed */
60 : }
61 : }
62 6 : TRC_DEBUG(ACCELERATOR, "BATidxsync(%s): orderidx persisted"
63 : " (" LLFMT " usec)%s\n",
64 : BATgetId(b), GDKusec() - t0, failed);
65 : }
66 : }
67 6 : MT_lock_unset(&b->batIdxLock);
68 6 : BBPunfix(b->batCacheid);
69 6 : }
70 :
71 : /* return TRUE if we have a orderidx on the tail, even if we need to read
72 : * one from disk */
73 : bool
74 1826405 : BATcheckorderidx(BAT *b)
75 : {
76 1826405 : bool ret;
77 1826405 : lng t = GDKusec();
78 :
79 1826393 : if (b == NULL)
80 : return false;
81 1826393 : MT_lock_set(&b->batIdxLock);
82 1826338 : if (b->torderidx == (Heap *) 1) {
83 0 : Heap *hp;
84 0 : const char *nme = BBP_physical(b->batCacheid);
85 0 : int fd;
86 :
87 0 : assert(!GDKinmemory(b->theap->farmid));
88 0 : b->torderidx = NULL;
89 0 : if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
90 0 : (hp->farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap)) >= 0) {
91 0 : strconcat_len(hp->filename,
92 : sizeof(hp->filename),
93 : nme, ".torderidx", NULL);
94 0 : hp->storage = hp->newstorage = STORE_INVALID;
95 :
96 : /* check whether a persisted orderidx can be found */
97 0 : if ((fd = GDKfdlocate(hp->farmid, nme, "rb+", "torderidx")) >= 0) {
98 0 : struct stat st;
99 0 : oid hdata[ORDERIDXOFF];
100 :
101 0 : if (read(fd, hdata, sizeof(hdata)) == sizeof(hdata) &&
102 0 : hdata[0] == (((oid) 1 << 24) | ORDERIDX_VERSION) &&
103 0 : hdata[1] == (oid) BATcount(b) &&
104 0 : (hdata[2] == 0 || hdata[2] == 1) &&
105 0 : fstat(fd, &st) == 0 &&
106 0 : st.st_size >= (off_t) (hp->size = hp->free = (ORDERIDXOFF + hdata[1]) * SIZEOF_OID) &&
107 0 : HEAPload(hp, nme, "torderidx", false) == GDK_SUCCEED) {
108 0 : close(fd);
109 0 : ATOMIC_INIT(&hp->refs, 1);
110 0 : b->torderidx = hp;
111 0 : hp->hasfile = true;
112 0 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): reusing persisted orderidx\n", ALGOBATPAR(b));
113 0 : MT_lock_unset(&b->batIdxLock);
114 0 : return true;
115 : }
116 0 : close(fd);
117 : /* unlink unusable file */
118 0 : GDKunlink(hp->farmid, BATDIR, nme, "torderidx");
119 0 : hp->hasfile = false;
120 : }
121 : }
122 0 : GDKfree(hp);
123 0 : GDKclrerr(); /* we're not currently interested in errors */
124 : }
125 1826338 : MT_lock_unset(&b->batIdxLock);
126 :
127 1826398 : ret = b->torderidx != NULL;
128 1826398 : if (ret)
129 141 : TRC_DEBUG(ACCELERATOR, "BATcheckorderidx(" ALGOBATFMT "): already has orderidx, waited " LLFMT " usec\n", ALGOBATPAR(b), GDKusec() - t);
130 : return ret;
131 : }
132 :
133 : /* create the heap for an order index; returns NULL on failure */
134 : Heap *
135 3875 : createOIDXheap(BAT *b, bool stable)
136 : {
137 3875 : Heap *m;
138 3875 : oid *restrict mv;
139 :
140 3875 : if ((m = GDKmalloc(sizeof(Heap))) == NULL)
141 : return NULL;
142 7750 : *m = (Heap) {
143 3875 : .farmid = BBPselectfarm(b->batRole, b->ttype, orderidxheap),
144 3875 : .parentid = b->batCacheid,
145 : .dirty = true,
146 : .refs = ATOMIC_VAR_INIT(1),
147 : };
148 3875 : strconcat_len(m->filename, sizeof(m->filename),
149 3875 : BBP_physical(b->batCacheid), ".torderidx", NULL);
150 7750 : if (m->farmid < 0 ||
151 3875 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
152 0 : GDKfree(m);
153 0 : return NULL;
154 : }
155 3875 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
156 :
157 3875 : mv = (oid *) m->base;
158 3875 : *mv++ = ORDERIDX_VERSION;
159 3875 : *mv++ = (oid) BATcount(b);
160 3875 : *mv++ = (oid) stable;
161 3875 : return m;
162 : }
163 :
164 : /* maybe persist the order index heap */
165 : void
166 3875 : persistOIDX(BAT *b)
167 : {
168 3875 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
169 8 : b->batInserted == b->batCount &&
170 12 : !b->theap->dirty &&
171 12 : !GDKinmemory(b->theap->farmid)) {
172 6 : MT_Id tid;
173 6 : BBPfix(b->batCacheid);
174 6 : char name[MT_NAME_LEN];
175 6 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
176 6 : if (MT_create_thread(&tid, BATidxsync, b,
177 : MT_THR_DETACHED, name) < 0)
178 0 : BBPunfix(b->batCacheid);
179 : } else
180 3869 : TRC_DEBUG(ACCELERATOR, "persistOIDX(" ALGOBATFMT "): NOT persisting order index\n", ALGOBATPAR(b));
181 3875 : }
182 :
183 : gdk_return
184 73 : BATorderidx(BAT *b, bool stable)
185 : {
186 73 : if (b->ttype == TYPE_void) {
187 0 : GDKerror("No order index on void type bats\n");
188 0 : return GDK_FAIL;
189 : }
190 73 : if (BATcheckorderidx(b))
191 : return GDK_SUCCEED;
192 73 : if (!BATtdense(b)) {
193 73 : BAT *on;
194 73 : MT_thread_setalgorithm("create order index");
195 73 : TRC_DEBUG(ACCELERATOR, "BATorderidx(" ALGOBATFMT ",%d) create index\n", ALGOBATPAR(b), stable);
196 73 : if (BATsort(NULL, &on, NULL, b, NULL, NULL, false, false, stable) != GDK_SUCCEED)
197 0 : return GDK_FAIL;
198 73 : assert(BATcount(b) == BATcount(on));
199 73 : if (BATtdense(on)) {
200 : /* if the order bat is dense, the input was
201 : * sorted and we don't need an order index */
202 0 : MT_lock_set(&b->theaplock);
203 0 : assert(!b->tnosorted);
204 0 : if (!b->tsorted) {
205 0 : b->tsorted = true;
206 0 : b->tnosorted = 0;
207 : }
208 0 : MT_lock_unset(&b->theaplock);
209 : } else {
210 : /* BATsort quite possibly already created the
211 : * order index, but just to be sure... */
212 73 : MT_lock_set(&b->batIdxLock);
213 73 : if (b->torderidx == NULL) {
214 0 : Heap *m;
215 0 : if ((m = createOIDXheap(b, stable)) == NULL) {
216 0 : MT_lock_unset(&b->batIdxLock);
217 0 : return GDK_FAIL;
218 : }
219 0 : memcpy((oid *) m->base + ORDERIDXOFF, Tloc(on, 0), BATcount(on) * sizeof(oid));
220 0 : b->torderidx = m;
221 0 : persistOIDX(b);
222 : }
223 73 : MT_lock_unset(&b->batIdxLock);
224 : }
225 73 : BBPunfix(on->batCacheid);
226 : }
227 : return GDK_SUCCEED;
228 : }
229 :
230 : #define BINARY_MERGE(TYPE) \
231 : do { \
232 : TYPE *v = (TYPE *) bi.base; \
233 : if (p0 < q0 && p1 < q1) { \
234 : if (v[*p0 - b->hseqbase] <= v[*p1 - b->hseqbase]) { \
235 : *mv++ = *p0++; \
236 : } else { \
237 : *mv++ = *p1++; \
238 : } \
239 : } else if (p0 < q0) { \
240 : assert(p1 == q1); \
241 : *mv++ = *p0++; \
242 : } else if (p1 < q1) { \
243 : assert(p0 == q0); \
244 : *mv++ = *p1++; \
245 : } else { \
246 : assert(p0 == q0 && p1 == q1); \
247 : break; \
248 : } \
249 : while (p0 < q0 && p1 < q1) { \
250 : if (v[*p0 - b->hseqbase] <= v[*p1 - b->hseqbase]) { \
251 : *mv++ = *p0++; \
252 : } else { \
253 : *mv++ = *p1++; \
254 : } \
255 : } \
256 : while (p0 < q0) { \
257 : *mv++ = *p0++; \
258 : } \
259 : while (p1 < q1) { \
260 : *mv++ = *p1++; \
261 : } \
262 : } while(0)
263 :
264 : #define swap(X,Y,TMP) (TMP)=(X);(X)=(Y);(Y)=(TMP)
265 :
266 : #define left_child(X) (2*(X)+1)
267 : #define right_child(X) (2*(X)+2)
268 :
269 : #define HEAPIFY(X) \
270 : do { \
271 : int cur, min = X, chld; \
272 : do { \
273 : cur = min; \
274 : if ((chld = left_child(cur)) < n_ar && \
275 : (minhp[chld] < minhp[min] || \
276 : (minhp[chld] == minhp[min] && \
277 : *p[cur] < *p[min]))) { \
278 : min = chld; \
279 : } \
280 : if ((chld = right_child(cur)) < n_ar && \
281 : (minhp[chld] < minhp[min] || \
282 : (minhp[chld] == minhp[min] && \
283 : *p[cur] < *p[min]))) { \
284 : min = chld; \
285 : } \
286 : if (min != cur) { \
287 : swap(minhp[cur], minhp[min], t); \
288 : swap(p[cur], p[min], t_oid); \
289 : swap(q[cur], q[min], t_oid); \
290 : } \
291 : } while (cur != min); \
292 : } while (0)
293 :
294 : #define NWAY_MERGE(TYPE) \
295 : do { \
296 : TYPE *minhp, t; \
297 : TYPE *v = (TYPE *) bi.base; \
298 : if ((minhp = GDKmalloc(sizeof(TYPE)*n_ar)) == NULL) { \
299 : goto bailout; \
300 : } \
301 : /* init min heap */ \
302 : for (i = 0; i < n_ar; i++) { \
303 : minhp[i] = v[*p[i] - b->hseqbase]; \
304 : } \
305 : for (i = n_ar/2; i >=0 ; i--) { \
306 : HEAPIFY(i); \
307 : } \
308 : /* merge */ \
309 : *mv++ = *(p[0])++; \
310 : if (p[0] < q[0]) { \
311 : minhp[0] = v[*p[0] - b->hseqbase]; \
312 : HEAPIFY(0); \
313 : } else { \
314 : swap(minhp[0], minhp[n_ar-1], t); \
315 : swap(p[0], p[n_ar-1], t_oid); \
316 : swap(q[0], q[n_ar-1], t_oid); \
317 : n_ar--; \
318 : HEAPIFY(0); \
319 : } \
320 : while (n_ar > 1) { \
321 : *mv++ = *(p[0])++; \
322 : if (p[0] < q[0]) { \
323 : minhp[0] = v[*p[0] - b->hseqbase]; \
324 : HEAPIFY(0); \
325 : } else { \
326 : swap(minhp[0], minhp[n_ar-1], t); \
327 : swap(p[0], p[n_ar-1], t_oid); \
328 : swap(q[0], q[n_ar-1], t_oid); \
329 : n_ar--; \
330 : HEAPIFY(0); \
331 : } \
332 : } \
333 : while (p[0] < q[0]) { \
334 : *mv++ = *(p[0])++; \
335 : } \
336 : GDKfree(minhp); \
337 : } while (0)
338 :
339 : gdk_return
340 0 : GDKmergeidx(BAT *b, BAT**a, int n_ar)
341 : {
342 0 : Heap *m;
343 0 : int i;
344 0 : oid *restrict mv;
345 0 : const char *nme = BBP_physical(b->batCacheid);
346 :
347 0 : if (BATcheckorderidx(b))
348 : return GDK_SUCCEED;
349 0 : switch (ATOMbasetype(b->ttype)) {
350 : case TYPE_bte:
351 : case TYPE_sht:
352 : case TYPE_int:
353 : case TYPE_lng:
354 : #ifdef HAVE_HGE
355 : case TYPE_hge:
356 : #endif
357 : case TYPE_flt:
358 : case TYPE_dbl:
359 0 : break;
360 0 : default:
361 0 : GDKerror("type %s not supported.\n", ATOMname(b->ttype));
362 0 : return GDK_FAIL;
363 : }
364 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(" ALGOBATFMT ") create index\n", ALGOBATPAR(b));
365 0 : BATiter bi = bat_iterator(b);
366 0 : MT_lock_set(&b->batIdxLock);
367 0 : if (b->torderidx) {
368 0 : MT_lock_unset(&b->batIdxLock);
369 0 : bat_iterator_end(&bi);
370 0 : return GDK_SUCCEED;
371 : }
372 0 : if ((m = GDKmalloc(sizeof(Heap))) == NULL) {
373 0 : MT_lock_unset(&b->batIdxLock);
374 0 : bat_iterator_end(&bi);
375 0 : return GDK_FAIL;
376 : }
377 0 : *m = (Heap) {
378 0 : .farmid = BBPselectfarm(b->batRole, bi.type, orderidxheap),
379 0 : .parentid = b->batCacheid,
380 : .dirty = true,
381 : .refs = ATOMIC_VAR_INIT(1),
382 : };
383 0 : strconcat_len(m->filename, sizeof(m->filename),
384 : nme, ".torderidx", NULL);
385 0 : if (m->farmid < 0 ||
386 0 : HEAPalloc(m, BATcount(b) + ORDERIDXOFF, SIZEOF_OID) != GDK_SUCCEED) {
387 0 : GDKfree(m);
388 0 : MT_lock_unset(&b->batIdxLock);
389 0 : bat_iterator_end(&bi);
390 0 : return GDK_FAIL;
391 : }
392 0 : m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
393 :
394 0 : mv = (oid *) m->base;
395 0 : *mv++ = ORDERIDX_VERSION;
396 0 : *mv++ = (oid) BATcount(b);
397 : /* all participating indexes must be stable for the combined
398 : * index to be stable */
399 0 : *mv = 1;
400 0 : for (i = 0; i < n_ar; i++) {
401 0 : if ((*mv &= ((const oid *) a[i]->torderidx->base)[2]) == 0)
402 : break;
403 : }
404 0 : mv++;
405 :
406 0 : if (n_ar == 1) {
407 : /* One oid order bat, nothing to merge */
408 0 : assert(BATcount(a[0]) == BATcount(b));
409 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
410 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
411 : a[0]->torderidx);
412 0 : memcpy(mv, (const oid *) a[0]->torderidx->base + ORDERIDXOFF,
413 : BATcount(a[0]) * SIZEOF_OID);
414 0 : } else if (n_ar == 2) {
415 : /* sort merge with 1 comparison per BUN */
416 0 : const oid *restrict p0, *restrict p1, *q0, *q1;
417 0 : assert(BATcount(a[0]) + BATcount(a[1]) == BATcount(b));
418 0 : assert((VIEWtparent(a[0]) == b->batCacheid ||
419 : VIEWtparent(a[0]) == VIEWtparent(b)) &&
420 : a[0]->torderidx);
421 0 : assert((VIEWtparent(a[1]) == b->batCacheid ||
422 : VIEWtparent(a[1]) == VIEWtparent(b)) &&
423 : a[1]->torderidx);
424 0 : p0 = (const oid *) a[0]->torderidx->base + ORDERIDXOFF;
425 0 : p1 = (const oid *) a[1]->torderidx->base + ORDERIDXOFF;
426 0 : q0 = p0 + BATcount(a[0]);
427 0 : q1 = p1 + BATcount(a[1]);
428 :
429 0 : switch (ATOMbasetype(bi.type)) {
430 0 : case TYPE_bte: BINARY_MERGE(bte); break;
431 0 : case TYPE_sht: BINARY_MERGE(sht); break;
432 0 : case TYPE_int: BINARY_MERGE(int); break;
433 0 : case TYPE_lng: BINARY_MERGE(lng); break;
434 : #ifdef HAVE_HGE
435 0 : case TYPE_hge: BINARY_MERGE(hge); break;
436 : #endif
437 0 : case TYPE_flt: BINARY_MERGE(flt); break;
438 0 : case TYPE_dbl: BINARY_MERGE(dbl); break;
439 : default:
440 : /* TODO: support strings, date, timestamps etc. */
441 0 : assert(0);
442 : HEAPfree(m, true);
443 : GDKfree(m);
444 : MT_lock_unset(&b->batIdxLock);
445 : bat_iterator_end(&bi);
446 : return GDK_FAIL;
447 : }
448 :
449 : } else {
450 : /* use min-heap */
451 0 : oid **p, **q, *t_oid;
452 :
453 0 : p = GDKmalloc(n_ar*sizeof(oid *));
454 0 : q = GDKmalloc(n_ar*sizeof(oid *));
455 0 : if (p == NULL || q == NULL) {
456 0 : bailout:
457 0 : GDKfree(p);
458 0 : GDKfree(q);
459 0 : HEAPfree(m, true);
460 0 : GDKfree(m);
461 0 : MT_lock_unset(&b->batIdxLock);
462 0 : bat_iterator_end(&bi);
463 0 : return GDK_FAIL;
464 : }
465 0 : for (i = 0; i < n_ar; i++) {
466 0 : assert((VIEWtparent(a[i]) == b->batCacheid ||
467 : VIEWtparent(a[i]) == VIEWtparent(b)) &&
468 : a[i]->torderidx);
469 0 : p[i] = (oid *) a[i]->torderidx->base + ORDERIDXOFF;
470 0 : q[i] = p[i] + BATcount(a[i]);
471 : }
472 :
473 0 : switch (ATOMbasetype(bi.type)) {
474 0 : case TYPE_bte: NWAY_MERGE(bte); break;
475 0 : case TYPE_sht: NWAY_MERGE(sht); break;
476 0 : case TYPE_int: NWAY_MERGE(int); break;
477 0 : case TYPE_lng: NWAY_MERGE(lng); break;
478 : #ifdef HAVE_HGE
479 0 : case TYPE_hge: NWAY_MERGE(hge); break;
480 : #endif
481 0 : case TYPE_flt: NWAY_MERGE(flt); break;
482 0 : case TYPE_dbl: NWAY_MERGE(dbl); break;
483 : case TYPE_void:
484 : case TYPE_str:
485 : case TYPE_ptr:
486 : default:
487 : /* TODO: support strings, date, timestamps etc. */
488 0 : assert(0);
489 : goto bailout;
490 : }
491 0 : GDKfree(p);
492 0 : GDKfree(q);
493 : }
494 :
495 0 : b->torderidx = m;
496 0 : if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
497 0 : b->batInserted == b->batCount) {
498 0 : MT_Id tid;
499 0 : BBPfix(b->batCacheid);
500 0 : char name[MT_NAME_LEN];
501 0 : snprintf(name, sizeof(name), "oidxsync%d", b->batCacheid);
502 0 : if (MT_create_thread(&tid, BATidxsync, b,
503 : MT_THR_DETACHED, name) < 0)
504 0 : BBPunfix(b->batCacheid);
505 : } else
506 0 : TRC_DEBUG(ACCELERATOR, "GDKmergeidx(%s): NOT persisting index\n", BATgetId(b));
507 :
508 0 : MT_lock_unset(&b->batIdxLock);
509 0 : bat_iterator_end(&bi);
510 0 : return GDK_SUCCEED;
511 : }
512 :
513 : void
514 291481 : OIDXfree(BAT *b)
515 : {
516 291481 : if (b) {
517 291481 : Heap *hp;
518 :
519 291481 : MT_lock_set(&b->batIdxLock);
520 291481 : if ((hp = b->torderidx) != NULL && hp != (Heap *) 1) {
521 3 : if (GDKinmemory(b->theap->farmid)) {
522 0 : b->torderidx = NULL;
523 0 : HEAPdecref(hp, true);
524 : } else {
525 3 : b->torderidx = (Heap *) 1;
526 3 : HEAPdecref(hp, false);
527 : }
528 : }
529 291481 : MT_lock_unset(&b->batIdxLock);
530 : }
531 291481 : }
532 :
533 : void
534 71766594 : OIDXdestroy(BAT *b)
535 : {
536 71766594 : if (b) {
537 71766594 : Heap *hp;
538 :
539 71766594 : MT_lock_set(&b->batIdxLock);
540 71745011 : hp = b->torderidx;
541 71745011 : b->torderidx = NULL;
542 71745011 : MT_lock_unset(&b->batIdxLock);
543 71758304 : if (hp == (Heap *) 1) {
544 0 : GDKunlink(BBPselectfarm(b->batRole, b->ttype, orderidxheap),
545 : BATDIR,
546 0 : BBP_physical(b->batCacheid),
547 : "torderidx");
548 71758304 : } else if (hp != NULL) {
549 3872 : HEAPdecref(hp, true);
550 : }
551 : }
552 71758304 : }
|