Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Implementation for the column imprints index.
15 : * See paper:
16 : * Column Imprints: A Secondary Index Structure,
17 : * L.Sidirourgos and M.Kersten.
18 : */
19 :
20 : #include "monetdb_config.h"
21 : #include "gdk.h"
22 : #include "gdk_private.h"
23 : #include "gdk_imprints.h"
24 :
25 : /*
26 : * The imprints heap consists of five parts:
27 : * - header
28 : * - bins
29 : * - stats
30 : * - imps
31 : * - dict
32 : *
33 : * The header consists of four size_t values `size_t hdata[4]':
34 : * - hdata[0] = (1 << 16) | (IMPRINTS_VERSION << 8) | (uint8_t) imprints->bits
35 : * - hdata[1] = imprints->impcnt
36 : * - hdata[2] = imprints->dictcnt
37 : * - hdata[3] = BATcount(b)
38 : * The first word of the header includes a version number in case the
39 : * format changes, and a bit to indicate that the data was synced to disk.
40 : * This bit is the last thing written, so if it is set when reading back
41 : * the imprints heap, the data is complete. The fourth word is the size of
42 : * the BAT for which the imprints were created. If this size differs from
43 : * the size of the BAT at the time of reading the imprints back from disk,
44 : * the imprints are not usable.
45 : *
46 : * The bins area starts immediately after the header. It consists of 64
47 : * values in the domain of the BAT `TYPE bins[64]'.
48 : *
49 : * The stats area starts immediately after the bins area. It consists of
50 : * three arrays of 64 BUN-sized (64-bit or 32-bit) integers, `BUN stats[3][64]'.
51 : * The three arrays hold, respectively, min, max, and cnt for each of the 64
52 : * bins, so stats can be seen as `BUN min_bins[64]; BUN max_bins[64]; BUN
53 : * cnt_bins[64];'. The min and max values are the positions of the smallest
54 : * and largest non-nil value in the corresponding bin.
55 : *
56 : * The imps area starts immediately after the stats area. It consists of
57 : * one mask per "page" of the input BAT indicating in which bins the values
58 : * in that page fall. The size of the mask is given by imprints->bits.
59 : * The list of masks may be run-length compressed, see the dict area. A
60 : * "page" is 64 bytes worth of values, so the number of values depends on
61 : * the type of the value.
62 : *
63 : * The dict area starts immediately after the imps area. It consists of
64 : * one cchdc_t value per "page" of the input. The precise use is described
65 : * below.
66 : *
67 : * There are up to 64 bins into which values are sorted. The number of
68 : * bins depends on the number of unique values in the input BAT (actually
69 : * on the number of unique values in a random sample of 2048 values of the
70 : * input BAT) and is 8, 16, 32, or 64. The number of bits in the mask is
71 : * stored in imprints->bits. The boundaries of the bins are dynamically
72 : * determined when the imprints are created and stored in the bins array.
73 : * In fact, bins[n] contains the lower boundary of the n-th bin (0 <= n <
74 : * N, N the number of bins). The value of bins[0] is not actually used:
75 : * all values smaller than bins[1] are sorted into this bin, including NIL.
76 : * The boundaries are simply computed by stepping with large steps through
77 : * the sorted sample and taking 63 (or 31, 15, 7) equally spaced values
78 : * from there.
79 : *
80 : * Once the appropriate bin n is determined for a particular value v
81 : * (bins[n] <= v < bins[n+1]), a bitmask can be constructed for the value
82 : * as ((uintN_t)1 << n) where N is the number of bits that are used for the
83 : * bitmasks and n is the number of the bin (0 <= n < N).
84 : *
85 : * The input BAT is divided into "pages" where a page is 64 bytes. This
86 : * means the number of rows in a page depends on the size of the values: 64
87 : * for byte-sized values down to 4 for hugeint (128 bit) values. For each
88 : * page, a bitmask is created which is the imprint for that page. The
89 : * bitmask has a bit set for each bin into which a value inside the page
90 : * falls. These bitmasks (imprints) are stored in the imprints->imps
91 : * array, but with a twist, see below.
92 : *
93 : * The imprints->dict array is an array of cchdc_t values. A cchdc_t value
94 : * consists of a bit .repeat and a 24-bit value .cnt. The sum of the .cnt
95 : * values is equal to the total number of pages in the input BAT. If the
96 : * .repeat value is 0, there are .cnt consecutive imprint bitmasks in the
97 : * imprints->imps array, each for one page. If the .repeat value is 1,
98 : * there is a single imprint bitmask in the imprints->imps array which is
99 : * valid for the next .cnt pages. In this way a run-length encoding
100 : * compression scheme is implemented for imprints.
101 : *
102 : * Imprints are used for range selects, i.e. finding all rows in a BAT
103 : * whose value is inside some given range, or alternatively, all rows in a
104 : * BAT whose value is outside some given range (anti select).
105 : *
106 : * A range necessarily covers one or more consecutive bins. A bit mask is
107 : * created for all bins that fall fully inside the range being selected (in
108 : * gdk_select.c called "innermask"), and a bit mask is created for all bins
109 : * that fall fully or partially inside the range (called "mask" in
110 : * gdk_select.c). Note that for an "anti" select, i.e. a select which
111 : * matches everything except a given range, the bits in the bit masks are
112 : * not consecutive.
113 : *
114 : * We then go through the imps table. All pages where the only set bits
115 : * are also set in "innermask" can be blindly added to the result: all
116 : * values fall inside the range. All pages where none of the set bits are
117 : * also set in "mask" can be blindly skipped: no value falls inside the
118 : * range. For the remaining pages, we scan the page and check each
119 : * individual value to see whether it is selected.
120 : *
121 : * Extra speed up is achieved by the run-length encoding of the imps table.
122 : * If a mask is in the category of fully inside the range or fully outside,
123 : * the complete set of pages can be added/skipped in one go.
124 : */
125 :
126 : #define IMPRINTS_VERSION 2
127 : #define IMPRINTS_HEADER_SIZE 4 /* nr of size_t fields in header */
128 :
/*
 * Dispatch on the imprint mask width B (8, 16, 32 or 64 bits) and
 * expand FUNC(T, <width>) with the width as a literal token, so FUNC
 * can paste it into type names such as uint##B##_t.  B is evaluated
 * exactly once; any other width is a programming error.
 */
#define BINSIZE(B, FUNC, T)						\
	do {								\
		const int binsize_width_ = (B);				\
		if (binsize_width_ == 8) {				\
			FUNC(T,8);					\
		} else if (binsize_width_ == 16) {			\
			FUNC(T,16);					\
		} else if (binsize_width_ == 32) {			\
			FUNC(T,32);					\
		} else if (binsize_width_ == 64) {			\
			FUNC(T,64);					\
		} else {						\
			MT_UNREACHABLE();				\
		}							\
	} while (0)
139 :
140 :
/*
 * Store in Z the number of the bin (0 <= Z < B) that value X falls in.
 *
 * The macro reads an array named `bins' that must be in scope at the
 * expansion site; bins[n] is the lower boundary of bin n and bins[0]
 * is never inspected.  Z is computed as the number of boundaries
 * bins[1] .. bins[B-1] that are <= X, without branching on the data.
 * A value that compares false against every boundary (anything below
 * bins[1], or a floating point NaN) ends up in bin 0.
 *
 * X is evaluated once per boundary, so it must be free of side
 * effects.  All parameters are parenthesized so that expressions such
 * as `wide ? 64 : 32' can be passed for B.
 */
#define GETBIN(Z,X,B)							\
	do {								\
		(Z) = 0;						\
		for (int getbin_i_ = 1; getbin_i_ < (B); getbin_i_++)	\
			(Z) += ((X) >= bins[getbin_i_]);		\
	} while (0)
147 :
148 :
/*
 * Body of imprints_create for column type TYPE and mask width B.
 *
 * Expects the following names in scope at the expansion site: bi,
 * imps, inbins, dict, i, bin, dcnt, icnt, min_bins, max_bins and
 * cnt_bins.  The column is processed one page (IMPS_PAGE bytes worth
 * of values) at a time:
 *  - for every value the bin is looked up and its bit is set in the
 *    page mask; for non-nil values the per-bin count and the
 *    positions of the smallest and largest value are maintained;
 *  - the finished page mask is then either merged into the current
 *    run in dict (same mask as the previous page and the last dict
 *    entry still has room) or appended to the imps array with a new
 *    or extended non-repeat dict entry.
 */
#define IMPS_CREATE(TYPE,B)						\
do {									\
	uint##B##_t *restrict masks = (uint##B##_t *) imps;		\
	const TYPE *restrict values = (TYPE *) bi->base;		\
	const TYPE *restrict bins = (TYPE *) inbins;			\
	const BUN per_page = IMPS_PAGE / sizeof(TYPE);			\
	uint##B##_t prev_mask = 0;					\
	i = 0;								\
	while (i < bi->count) {						\
		const BUN page_end = MIN(i + per_page, bi->count);	\
		uint##B##_t page_mask = 0;				\
		/* gather the bin bits and statistics of one page */	\
		while (i < page_end) {					\
			const TYPE val = values[i];			\
			GETBIN(bin,val,B);				\
			page_mask = IMPSsetBit(B,page_mask,bin);	\
			if (!is_##TYPE##_nil(val)) { /* nils are not counted */ \
				if (cnt_bins[bin]++ == 0) {		\
					/* first value seen in this bin */ \
					min_bins[bin] = i;		\
					max_bins[bin] = i;		\
				} else {				\
					if (val < values[min_bins[bin]]) \
						min_bins[bin] = i;	\
					if (val > values[max_bins[bin]]) \
						max_bins[bin] = i;	\
				}					\
			}						\
			i++;						\
		}							\
		/* can the last dict entry still count one more page? */ \
		const bool tail_has_room = (dcnt > 0) &&		\
			(dict[dcnt-1].cnt < (IMPS_MAX_CNT-1));		\
		if (tail_has_room && (prev_mask == page_mask)) {	\
			/* same mask as the previous page: extend a run */ \
			if (!dict[dcnt-1].repeat) {			\
				if (dict[dcnt-1].cnt > 1) {		\
					/* split the previous page off	\
					 * into its own entry */	\
					dict[dcnt-1].cnt--;		\
					dict[dcnt].cnt = 1;		\
					dict[dcnt].flags = 0;		\
					dcnt++;				\
				}					\
				/* turn the entry into a repeat entry */ \
				dict[dcnt-1].repeat = 1;		\
			}						\
			dict[dcnt-1].cnt++;				\
		} else {						\
			/* new mask, or the last entry is full */	\
			prev_mask = page_mask;				\
			masks[icnt] = page_mask;			\
			icnt++;						\
			if (tail_has_room && !(dict[dcnt-1].repeat)) {	\
				dict[dcnt-1].cnt++;			\
			} else {					\
				dict[dcnt].cnt = 1;			\
				dict[dcnt].repeat = 0;			\
				dict[dcnt].flags = 0;			\
				dcnt++;					\
			}						\
		}							\
	}								\
} while (0)
213 :
214 : static void
215 52 : imprints_create(BAT *b, BATiter *bi, void *inbins, BUN *stats, bte bits,
216 : void *imps, BUN *impcnt, cchdc_t *dict, BUN *dictcnt)
217 : {
218 52 : BUN i;
219 52 : BUN dcnt, icnt;
220 52 : BUN *restrict min_bins = stats;
221 52 : BUN *restrict max_bins = min_bins + 64;
222 52 : BUN *restrict cnt_bins = max_bins + 64;
223 52 : int bin = 0;
224 52 : dcnt = icnt = 0;
225 : #ifndef NDEBUG
226 52 : memset(min_bins, 0, 64 * SIZEOF_BUN);
227 52 : memset(max_bins, 0, 64 * SIZEOF_BUN);
228 : #endif
229 52 : memset(cnt_bins, 0, 64 * SIZEOF_BUN);
230 :
231 93 : switch (ATOMbasetype(b->ttype)) {
232 3 : case TYPE_bte:
233 69 : BINSIZE(bits, IMPS_CREATE, bte);
234 : break;
235 1 : case TYPE_sht:
236 34 : BINSIZE(bits, IMPS_CREATE, sht);
237 : break;
238 13 : case TYPE_int:
239 367 : BINSIZE(bits, IMPS_CREATE, int);
240 : break;
241 25 : case TYPE_lng:
242 64125979 : BINSIZE(bits, IMPS_CREATE, lng);
243 : break;
244 : #ifdef HAVE_HGE
245 5 : case TYPE_hge:
246 750 : BINSIZE(bits, IMPS_CREATE, hge);
247 : break;
248 : #endif
249 3 : case TYPE_flt:
250 102 : BINSIZE(bits, IMPS_CREATE, flt);
251 : break;
252 2 : case TYPE_dbl:
253 68 : BINSIZE(bits, IMPS_CREATE, dbl);
254 : break;
255 : default:
256 : /* should never reach here */
257 0 : MT_UNREACHABLE();
258 : }
259 :
260 52 : *dictcnt = dcnt;
261 52 : *impcnt = icnt;
262 52 : }
263 :
/*
 * CLRMEM zeroes the unused tail h[k..63] of the bins array.  It is only
 * active in debug builds and relies on `h' and `k' of FILL_HISTOGRAM.
 */
#ifdef NDEBUG
#define CLRMEM()	((void) 0)
#else
#define CLRMEM()	for (; k < 64; k++) h[k] = 0
#endif

/*
 * Fill imprints->bins with bin boundaries taken from s4, the sorted
 * BAT of unique sample values.  Expects s4, imprints and cnt (the
 * number of values in s4) in scope at the expansion site.
 *
 * With fewer than 63 sample values every value becomes a boundary and
 * the remaining boundaries, up to imprints->bits, are set to the
 * maximum of TYPE.  Otherwise the sample is walked in 63 equal
 * (fractional) steps and the value at each step becomes a boundary;
 * if only 63 boundaries were produced, the last one is the largest
 * sample value.
 */
#define FILL_HISTOGRAM(TYPE)						\
do {									\
	TYPE *restrict sample = (TYPE *) Tloc(s4, 0);			\
	TYPE *restrict h = imprints->bins;				\
	BUN k = 0;							\
	if (cnt >= 64-1) {						\
		const double ystep = (double) cnt / (64 - 1);		\
		double y = 0;						\
		while ((BUN) y < cnt) {					\
			h[k] = sample[(BUN) y];				\
			y += ystep;					\
			k++;						\
		}							\
		if (k == 64 - 1) /* there is one left */		\
			h[k] = sample[cnt - 1];				\
	} else {							\
		const TYPE max = GDK_##TYPE##_max;			\
		for (; k < cnt; k++)					\
			h[k] = sample[k];				\
		for (; k < (BUN) imprints->bits; k++)			\
			h[k] = max;					\
		CLRMEM();						\
	}								\
} while (0)
290 :
291 : /* Check whether we have imprints on b (and return true if we do). It
292 : * may be that the imprints were made persistent, but we hadn't seen
293 : * that yet, so check the file system. This also returns true if b is
294 : * a view and there are imprints on b's parent.
295 : *
296 : * Note that the b->timprints pointer can be NULL, meaning there are
297 : * no imprints; (Imprints *) 1, meaning there are no imprints loaded,
298 : * but they may exist on disk; or a valid pointer to loaded imprints.
299 : * These values are maintained here, in the IMPSdestroy and IMPSfree
300 : * functions, and in BBPdiskscan during initialization. */
301 : bool
302 176 : BATcheckimprints(BAT *b)
303 : {
304 176 : bool ret;
305 176 : BATiter bi = bat_iterator(b);
306 :
307 176 : if (VIEWtparent(b)) {
308 25 : assert(b->timprints == NULL);
309 25 : b = BATdescriptor(VIEWtparent(b));
310 25 : if (b == NULL) {
311 0 : bat_iterator_end(&bi);
312 0 : return false;
313 : }
314 : }
315 :
316 176 : MT_lock_set(&b->batIdxLock);
317 176 : if (b->timprints == (Imprints *) 1) {
318 0 : Imprints *imprints;
319 0 : const char *nme = BBP_physical(b->batCacheid);
320 :
321 0 : assert(!GDKinmemory(bi.h->farmid));
322 0 : b->timprints = NULL;
323 0 : if ((imprints = GDKzalloc(sizeof(Imprints))) != NULL &&
324 0 : (imprints->imprints.farmid = BBPselectfarm(b->batRole, bi.type, imprintsheap)) >= 0) {
325 0 : int fd;
326 :
327 0 : strconcat_len(imprints->imprints.filename,
328 : sizeof(imprints->imprints.filename),
329 : nme, ".timprints", NULL);
330 0 : imprints->imprints.storage = imprints->imprints.newstorage = STORE_INVALID;
331 : /* check whether a persisted imprints index
332 : * can be found */
333 0 : if ((fd = GDKfdlocate(imprints->imprints.farmid, nme, "rb", "timprints")) >= 0) {
334 0 : size_t hdata[4];
335 0 : struct stat st;
336 0 : size_t pages;
337 :
338 0 : pages = (((size_t) bi.count * bi.width) + IMPS_PAGE - 1) / IMPS_PAGE;
339 0 : if (read(fd, hdata, sizeof(hdata)) == sizeof(hdata) &&
340 0 : hdata[0] & ((size_t) 1 << 16) &&
341 0 : ((hdata[0] & 0xFF00) >> 8) == IMPRINTS_VERSION &&
342 0 : hdata[3] == (size_t) bi.count &&
343 0 : fstat(fd, &st) == 0 &&
344 0 : st.st_size >= (off_t) (imprints->imprints.size =
345 0 : imprints->imprints.free =
346 0 : 64 * bi.width +
347 : 64 * 3 * SIZEOF_BUN +
348 0 : pages * ((bte) hdata[0] / 8) +
349 0 : hdata[2] * sizeof(cchdc_t) +
350 : sizeof(uint64_t) /* padding for alignment */
351 0 : + 4 * SIZEOF_SIZE_T) &&
352 0 : HEAPload(&imprints->imprints, nme, "timprints", false) == GDK_SUCCEED) {
353 : /* usable */
354 0 : imprints->bits = (bte) (hdata[0] & 0xFF);
355 0 : imprints->impcnt = (BUN) hdata[1];
356 0 : imprints->dictcnt = (BUN) hdata[2];
357 0 : imprints->bins = imprints->imprints.base + 4 * SIZEOF_SIZE_T;
358 0 : imprints->stats = (BUN *) ((char *) imprints->bins + 64 * bi.width);
359 0 : imprints->imps = (void *) (imprints->stats + 64 * 3);
360 0 : imprints->dict = (void *) ((uintptr_t) ((char *) imprints->imps + pages * (imprints->bits / 8) + sizeof(uint64_t)) & ~(sizeof(uint64_t) - 1));
361 0 : close(fd);
362 0 : imprints->imprints.parentid = b->batCacheid;
363 0 : imprints->imprints.hasfile = true;
364 0 : ATOMIC_INIT(&imprints->imprints.refs, 1);
365 0 : b->timprints = imprints;
366 0 : TRC_DEBUG(ACCELERATOR, ALGOBATFMT " reusing persisted imprints\n", ALGOBATPAR(b));
367 0 : MT_lock_unset(&b->batIdxLock);
368 0 : if (bi.b != b)
369 0 : BBPunfix(b->batCacheid);
370 0 : bat_iterator_end(&bi);
371 0 : return true;
372 : }
373 0 : close(fd);
374 : /* unlink unusable file */
375 0 : GDKunlink(imprints->imprints.farmid, BATDIR, nme, "timprints");
376 0 : imprints->imprints.hasfile = false;
377 : }
378 : }
379 0 : GDKfree(imprints);
380 0 : GDKclrerr(); /* we're not currently interested in errors */
381 : }
382 176 : MT_lock_unset(&b->batIdxLock);
383 176 : if (bi.b != b)
384 25 : BBPunfix(b->batCacheid);
385 176 : bat_iterator_end(&bi);
386 176 : ret = b->timprints != NULL;
387 176 : if( ret)
388 0 : TRC_DEBUG(ACCELERATOR, ALGOBATFMT " already has imprints\n", ALGOBATPAR(b));
389 : return ret;
390 : }
391 :
392 : static void
393 41 : BATimpsync(void *arg)
394 : {
395 41 : BAT *b = arg;
396 41 : Imprints *imprints;
397 41 : int fd;
398 41 : lng t0 = GDKusec();
399 41 : const char *failed = " failed";
400 :
401 41 : MT_lock_set(&b->batIdxLock);
402 41 : if ((imprints = b->timprints) != NULL) {
403 41 : Heap *hp = &imprints->imprints;
404 41 : if (HEAPsave(hp, hp->filename, NULL, true, hp->free, NULL) == GDK_SUCCEED) {
405 41 : if (hp->storage == STORE_MEM) {
406 41 : if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
407 : /* add version number */
408 41 : ((size_t *) hp->base)[0] |= (size_t) IMPRINTS_VERSION << 8;
409 : /* sync-on-disk checked bit */
410 41 : ((size_t *) hp->base)[0] |= (size_t) 1 << 16;
411 41 : if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) {
412 41 : failed = ""; /* not failed */
413 41 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)) {
414 : #if defined(NATIVE_WIN32)
415 : _commit(fd);
416 : #elif defined(HAVE_FDATASYNC)
417 0 : fdatasync(fd);
418 : #elif defined(HAVE_FSYNC)
419 : fsync(fd);
420 : #endif
421 : }
422 41 : hp->dirty = false;
423 : } else {
424 0 : failed = " write failed";
425 0 : perror("write hash");
426 : }
427 41 : close(fd);
428 : }
429 : } else {
430 : /* add version number */
431 0 : ((size_t *) hp->base)[0] |= (size_t) IMPRINTS_VERSION << 8;
432 : /* sync-on-disk checked bit */
433 0 : ((size_t *) hp->base)[0] |= (size_t) 1 << 16;
434 0 : if (!(ATOMIC_GET(&GDKdebug) & NOSYNCMASK) &&
435 0 : MT_msync(hp->base, SIZEOF_SIZE_T) < 0) {
436 0 : failed = " sync failed";
437 0 : ((size_t *) hp->base)[0] &= ~((size_t) IMPRINTS_VERSION << 8);
438 : } else {
439 0 : hp->dirty = false;
440 0 : failed = ""; /* not failed */
441 : }
442 : }
443 41 : TRC_DEBUG(ACCELERATOR, ALGOBATFMT " imprints persisted "
444 : "(" LLFMT " usec)%s\n", ALGOBATPAR(b),
445 : GDKusec() - t0, failed);
446 : }
447 : }
448 41 : MT_lock_unset(&b->batIdxLock);
449 41 : BBPunfix(b->batCacheid);
450 41 : }
451 :
452 : gdk_return
453 61 : BATimprints(BAT *b)
454 : {
455 61 : BAT *s1 = NULL, *s2 = NULL, *s3 = NULL, *s4 = NULL;
456 61 : bat unfix = 0;
457 61 : Imprints *imprints;
458 61 : BATiter bi;
459 61 : lng t0 = GDKusec();
460 :
461 61 : BATcheck(b, GDK_FAIL);
462 :
463 : /* we only create imprints for types that look like types we know */
464 61 : if (!imprintable(b->ttype)) {
465 : /* doesn't look enough like base type: do nothing */
466 9 : GDKerror("unsupported type\n");
467 9 : return GDK_FAIL;
468 : }
469 :
470 52 : if (BATcheckimprints(b))
471 : return GDK_SUCCEED;
472 :
473 52 : if (VIEWtparent(b)) {
474 : /* views always keep null pointer and need to obtain
475 : * the latest imprint from the parent at query time */
476 0 : s2 = b; /* remember for ACCELDEBUG print */
477 0 : b = BATdescriptor(VIEWtparent(b));
478 0 : if (b == NULL)
479 : return GDK_FAIL;
480 0 : unfix = b->batCacheid; /* bat to be unfixed */
481 0 : if (BATcheckimprints(b)) {
482 0 : BBPunfix(unfix);
483 0 : return GDK_SUCCEED;
484 : }
485 : }
486 52 : bi = bat_iterator(b);
487 52 : MT_lock_set(&b->batIdxLock);
488 :
489 52 : if (b->timprints == NULL) {
490 52 : BUN cnt;
491 52 : const char *nme = GDKinmemory(bi.h->farmid) ? ":memory:" : BBP_physical(b->batCacheid);
492 52 : size_t pages;
493 :
494 52 : MT_lock_unset(&b->batIdxLock);
495 52 : MT_thread_setalgorithm("create imprints");
496 :
497 52 : if (s2)
498 0 : TRC_DEBUG(ACCELERATOR, ALGOBATFMT
499 : " creating imprints on parent "
500 : ALGOBATFMT "\n",
501 : ALGOBATPAR(s2), ALGOBATPAR(b));
502 : else
503 52 : TRC_DEBUG(ACCELERATOR, ALGOBATFMT
504 : " creating imprints\n",
505 : ALGOBATPAR(b));
506 :
507 52 : s2 = NULL;
508 :
509 52 : imprints = GDKzalloc(sizeof(Imprints));
510 52 : if (imprints == NULL) {
511 0 : bat_iterator_end(&bi);
512 0 : if (unfix)
513 0 : BBPunfix(unfix);
514 0 : return GDK_FAIL;
515 : }
516 52 : strconcat_len(imprints->imprints.filename,
517 : sizeof(imprints->imprints.filename),
518 : nme, ".timprints", NULL);
519 52 : pages = (((size_t) bi.count * bi.width) + IMPS_PAGE - 1) / IMPS_PAGE;
520 52 : imprints->imprints.farmid = BBPselectfarm(b->batRole, bi.type,
521 : imprintsheap);
522 52 : imprints->imprints.parentid = b->batCacheid;
523 :
524 : #define SMP_SIZE 2048
525 52 : s1 = BATsample(b, SMP_SIZE);
526 52 : if (s1 == NULL) {
527 0 : GDKfree(imprints);
528 0 : bat_iterator_end(&bi);
529 0 : if (unfix)
530 0 : BBPunfix(unfix);
531 0 : return GDK_FAIL;
532 : }
533 52 : s2 = BATunique(b, s1);
534 52 : if (s2 == NULL) {
535 0 : BBPunfix(s1->batCacheid);
536 0 : GDKfree(imprints);
537 0 : bat_iterator_end(&bi);
538 0 : if (unfix)
539 0 : BBPunfix(unfix);
540 0 : return GDK_FAIL;
541 : }
542 52 : s3 = BATproject(s2, b);
543 52 : if (s3 == NULL) {
544 0 : BBPunfix(s1->batCacheid);
545 0 : BBPunfix(s2->batCacheid);
546 0 : GDKfree(imprints);
547 0 : bat_iterator_end(&bi);
548 0 : if (unfix)
549 0 : BBPunfix(unfix);
550 0 : return GDK_FAIL;
551 : }
552 52 : s3->tkey = true; /* we know is unique on tail now */
553 52 : if (BATsort(&s4, NULL, NULL, s3, NULL, NULL, false, false, false) != GDK_SUCCEED) {
554 0 : BBPunfix(s1->batCacheid);
555 0 : BBPunfix(s2->batCacheid);
556 0 : BBPunfix(s3->batCacheid);
557 0 : GDKfree(imprints);
558 0 : bat_iterator_end(&bi);
559 0 : if (unfix)
560 0 : BBPunfix(unfix);
561 0 : return GDK_FAIL;
562 : }
563 : /* s4 now is ordered and unique on tail */
564 52 : assert(s4->tkey && s4->tsorted);
565 52 : cnt = BATcount(s4);
566 52 : imprints->bits = 64;
567 52 : if (cnt <= 32)
568 51 : imprints->bits = 32;
569 51 : if (cnt <= 16)
570 51 : imprints->bits = 16;
571 51 : if (cnt <= 8)
572 51 : imprints->bits = 8;
573 :
574 : /* The heap we create here consists of four parts:
575 : * bins, max 64 entries with bin boundaries, domain of b;
576 : * stats, min/max/count for each bin, min/max are oid, and count BUN;
577 : * imps, max one entry per "page", entry is "bits" wide;
578 : * dict, max two entries per three "pages".
579 : * In addition, we add some housekeeping entries at
580 : * the start so that we can determine whether we can
581 : * trust the imprints when encountered on startup (including
582 : * a version number -- CURRENT VERSION is 2). */
583 52 : MT_lock_set(&b->batIdxLock);
584 104 : if (b->timprints != NULL ||
585 52 : HEAPalloc(&imprints->imprints,
586 : IMPRINTS_HEADER_SIZE * SIZEOF_SIZE_T + /* extra info */
587 52 : 64 * bi.width + /* bins */
588 : 64 * 3 * SIZEOF_BUN + /* {min,max,cnt}_bins */
589 52 : pages * (imprints->bits / 8) + /* imps */
590 52 : sizeof(uint64_t) + /* padding for alignment */
591 : pages * sizeof(cchdc_t), /* dict */
592 : 1) != GDK_SUCCEED) {
593 0 : MT_lock_unset(&b->batIdxLock);
594 0 : bat_iterator_end(&bi);
595 0 : GDKfree(imprints);
596 0 : BBPunfix(s1->batCacheid);
597 0 : BBPunfix(s2->batCacheid);
598 0 : BBPunfix(s3->batCacheid);
599 0 : BBPunfix(s4->batCacheid);
600 0 : if (b->timprints != NULL) {
601 0 : if (unfix)
602 0 : BBPunfix(unfix);
603 0 : return GDK_SUCCEED; /* we were beaten to it */
604 : }
605 0 : if (unfix)
606 0 : BBPunfix(unfix);
607 0 : return GDK_FAIL;
608 : }
609 52 : imprints->bins = imprints->imprints.base + IMPRINTS_HEADER_SIZE * SIZEOF_SIZE_T;
610 52 : imprints->stats = (BUN *) ((char *) imprints->bins + 64 * bi.width);
611 52 : imprints->imps = (void *) (imprints->stats + 64 * 3);
612 52 : imprints->dict = (void *) ((uintptr_t) ((char *) imprints->imps + pages * (imprints->bits / 8) + sizeof(uint64_t)) & ~(sizeof(uint64_t) - 1));
613 :
614 93 : switch (ATOMbasetype(bi.type)) {
615 3 : case TYPE_bte:
616 195 : FILL_HISTOGRAM(bte);
617 : break;
618 1 : case TYPE_sht:
619 65 : FILL_HISTOGRAM(sht);
620 : break;
621 13 : case TYPE_int:
622 845 : FILL_HISTOGRAM(int);
623 : break;
624 25 : case TYPE_lng:
625 1625 : FILL_HISTOGRAM(lng);
626 : break;
627 : #ifdef HAVE_HGE
628 5 : case TYPE_hge:
629 325 : FILL_HISTOGRAM(hge);
630 : break;
631 : #endif
632 3 : case TYPE_flt:
633 195 : FILL_HISTOGRAM(flt);
634 : break;
635 2 : case TYPE_dbl:
636 130 : FILL_HISTOGRAM(dbl);
637 : break;
638 : default:
639 : /* should never reach here */
640 0 : MT_UNREACHABLE();
641 : }
642 :
643 52 : imprints_create(b, &bi,
644 : imprints->bins,
645 : imprints->stats,
646 52 : imprints->bits,
647 : imprints->imps,
648 : &imprints->impcnt,
649 52 : imprints->dict,
650 : &imprints->dictcnt);
651 52 : assert(imprints->impcnt <= pages);
652 52 : assert(imprints->dictcnt <= pages);
653 : #ifndef NDEBUG
654 52 : memset((char *) imprints->imps + imprints->impcnt * (imprints->bits / 8), 0, (char *) imprints->dict - ((char *) imprints->imps + imprints->impcnt * (imprints->bits / 8)));
655 : #endif
656 52 : imprints->imprints.free = (size_t) ((char *) ((cchdc_t *) imprints->dict + imprints->dictcnt) - imprints->imprints.base);
657 : /* add info to heap for when they become persistent */
658 52 : ((size_t *) imprints->imprints.base)[0] = (size_t) (imprints->bits);
659 52 : ((size_t *) imprints->imprints.base)[1] = (size_t) imprints->impcnt;
660 52 : ((size_t *) imprints->imprints.base)[2] = (size_t) imprints->dictcnt;
661 52 : ((size_t *) imprints->imprints.base)[3] = (size_t) bi.count;
662 52 : imprints->imprints.dirty = true;
663 52 : MT_lock_set(&b->theaplock);
664 52 : if (b->batCount != bi.count) {
665 : /* bat changed under our feet, can't use imprints */
666 0 : MT_lock_unset(&b->theaplock);
667 0 : MT_lock_unset(&b->batIdxLock);
668 0 : bat_iterator_end(&bi);
669 0 : HEAPfree(&imprints->imprints, true);
670 0 : GDKfree(imprints);
671 0 : BBPunfix(s1->batCacheid);
672 0 : BBPunfix(s2->batCacheid);
673 0 : BBPunfix(s3->batCacheid);
674 0 : BBPunfix(s4->batCacheid);
675 0 : GDKerror("Imprints creation aborted due to concurrent change to bat\n");
676 0 : TRC_DEBUG(ACCELERATOR, "failed imprints construction: bat %s changed, " LLFMT " usec\n", BATgetId(b), GDKusec() - t0);
677 0 : if (unfix)
678 0 : BBPunfix(unfix);
679 0 : return GDK_FAIL;
680 : }
681 52 : ATOMIC_INIT(&imprints->imprints.refs, 1);
682 52 : b->timprints = imprints;
683 52 : MT_lock_unset(&b->theaplock);
684 52 : if (BBP_status(b->batCacheid) & BBPEXISTING &&
685 83 : !b->theap->dirty &&
686 82 : !GDKinmemory(bi.h->farmid) &&
687 41 : !GDKinmemory(imprints->imprints.farmid)) {
688 41 : MT_Id tid;
689 41 : BBPfix(b->batCacheid);
690 41 : char name[MT_NAME_LEN];
691 41 : snprintf(name, sizeof(name), "impssync%d", b->batCacheid);
692 41 : if (MT_create_thread(&tid, BATimpsync, b,
693 : MT_THR_DETACHED, name) < 0)
694 0 : BBPunfix(b->batCacheid);
695 : }
696 : }
697 :
698 52 : TRC_DEBUG(ACCELERATOR, "%s: imprints construction " LLFMT " usec\n",
699 : BATgetId(b), GDKusec() - t0);
700 52 : MT_lock_unset(&b->batIdxLock);
701 52 : bat_iterator_end(&bi);
702 :
703 : /* BBPUnfix tries to get the imprints lock which might lead to
704 : * a deadlock if those were unfixed earlier */
705 52 : if (s1) {
706 52 : BBPunfix(s1->batCacheid);
707 52 : BBPunfix(s2->batCacheid);
708 52 : BBPunfix(s3->batCacheid);
709 52 : BBPunfix(s4->batCacheid);
710 : }
711 52 : if (unfix)
712 0 : BBPunfix(unfix);
713 : return GDK_SUCCEED;
714 : }
715 :
716 : #define getbin(TYPE,B) \
717 : do { \
718 : const TYPE val = * (TYPE *) v; \
719 : GETBIN(ret,val,B); \
720 : } while (0)
721 :
722 : int
723 0 : IMPSgetbin(int tpe, bte bits, const char *restrict inbins, const void *restrict v)
724 : {
725 0 : int ret = -1;
726 :
727 0 : switch (tpe) {
728 0 : case TYPE_bte: {
729 0 : const bte *restrict bins = (bte *) inbins;
730 0 : BINSIZE(bits, getbin, bte);
731 : break;
732 : }
733 0 : case TYPE_sht: {
734 0 : const sht *restrict bins = (sht *) inbins;
735 0 : BINSIZE(bits, getbin, sht);
736 : break;
737 : }
738 0 : case TYPE_int: {
739 0 : const int *restrict bins = (int *) inbins;
740 0 : BINSIZE(bits, getbin, int);
741 : break;
742 : }
743 0 : case TYPE_lng: {
744 0 : const lng *restrict bins = (lng *) inbins;
745 0 : BINSIZE(bits, getbin, lng);
746 : break;
747 : }
748 : #ifdef HAVE_HGE
749 0 : case TYPE_hge: {
750 0 : const hge *restrict bins = (hge *) inbins;
751 0 : BINSIZE(bits, getbin, hge);
752 : break;
753 : }
754 : #endif
755 0 : case TYPE_flt: {
756 0 : const flt *restrict bins = (flt *) inbins;
757 0 : BINSIZE(bits, getbin, flt);
758 : break;
759 : }
760 0 : case TYPE_dbl: {
761 0 : const dbl *restrict bins = (dbl *) inbins;
762 0 : BINSIZE(bits, getbin, dbl);
763 : break;
764 : }
765 : default:
766 0 : MT_UNREACHABLE();
767 : }
768 0 : return ret;
769 : }
770 :
771 : lng
772 7235387 : IMPSimprintsize(BAT *b)
773 : {
774 7235387 : lng sz = 0;
775 7235387 : MT_lock_set(&b->batIdxLock);
776 7231298 : if (b->timprints && b->timprints != (Imprints *) 1) {
777 0 : sz = (lng) b->timprints->imprints.free;
778 : }
779 7231298 : MT_lock_unset(&b->batIdxLock);
780 7234998 : return sz;
781 : }
782 :
783 : void
784 76055121 : IMPSdestroy(BAT *b)
785 : {
786 76055121 : MT_lock_set(&b->batIdxLock);
787 76020481 : if (b->timprints == (Imprints *) 1) {
788 0 : b->timprints = NULL;
789 0 : GDKunlink(BBPselectfarm(b->batRole, b->ttype, imprintsheap),
790 : BATDIR,
791 0 : BBP_physical(b->batCacheid),
792 : "timprints");
793 76020481 : } else if (b->timprints != NULL) {
794 48 : IMPSdecref(b->timprints, b->timprints->imprints.parentid == b->batCacheid);
795 48 : b->timprints = NULL;
796 : }
797 76020481 : MT_lock_unset(&b->batIdxLock);
798 76067431 : }
799 :
800 : /* free the memory associated with the imprints, do not remove the
801 : * heap files; indicate that imprints are available on disk by setting
802 : * the imprints pointer to 1 */
803 : void
804 357046 : IMPSfree(BAT *b)
805 : {
806 357046 : Imprints *imprints;
807 :
808 357046 : MT_lock_set(&b->batIdxLock);
809 357046 : imprints = b->timprints;
810 357046 : if (imprints != NULL && imprints != (Imprints *) 1) {
811 4 : if (GDKinmemory(imprints->imprints.farmid)) {
812 0 : b->timprints = NULL;
813 0 : IMPSdecref(imprints, imprints->imprints.parentid == b->batCacheid);
814 : } else {
815 4 : if (imprints->imprints.parentid == b->batCacheid)
816 4 : b->timprints = (Imprints *) 1;
817 : else
818 0 : b->timprints = NULL;
819 4 : IMPSdecref(imprints, false);
820 : }
821 : }
822 357046 : MT_lock_unset(&b->batIdxLock);
823 357046 : }
824 :
825 : void
826 52 : IMPSdecref(Imprints *imprints, bool remove)
827 : {
828 52 : TRC_DEBUG(ACCELERATOR, "Decrement ref count of %s\n", imprints->imprints.filename);
829 52 : if (remove)
830 48 : ATOMIC_OR(&imprints->imprints.refs, HEAPREMOVE);
831 52 : ATOMIC_BASE_TYPE refs = ATOMIC_DEC(&imprints->imprints.refs);
832 52 : if ((refs & HEAPREFS) == 0) {
833 52 : HEAPfree(&imprints->imprints, (bool) (refs & HEAPREMOVE));
834 52 : GDKfree(imprints);
835 : }
836 52 : }
837 :
838 : void
839 0 : IMPSincref(Imprints *imprints)
840 : {
841 0 : TRC_DEBUG(ACCELERATOR, "Increment ref count of %s\n", imprints->imprints.filename);
842 0 : ATOMIC_INC(&imprints->imprints.refs);
843 0 : }
844 :
845 : #ifndef NDEBUG
846 : /* never called, useful for debugging */
847 :
848 : #define IMPSPRNTMASK(T, B) \
849 : do { \
850 : uint##B##_t *restrict im = (uint##B##_t *) imprints->imps; \
851 : for (j = 0; j < imprints->bits; j++) \
852 : s[j] = IMPSisSet(B, im[icnt], j) ? 'x' : '.'; \
853 : s[j] = '\0'; \
854 : } while (0)
855 :
856 : /* function used for debugging */
857 : void
858 0 : IMPSprint(BAT *b)
859 : {
860 0 : Imprints *imprints;
861 0 : cchdc_t *restrict d;
862 0 : char s[65]; /* max number of bits + 1 */
863 0 : BUN icnt, dcnt, l, pages;
864 0 : BUN *restrict min_bins, *restrict max_bins;
865 0 : BUN *restrict cnt_bins;
866 0 : bte j;
867 0 : int i;
868 :
869 0 : if (!BATcheckimprints(b)) {
870 0 : printf("No imprint\n");
871 0 : return;
872 : }
873 0 : imprints = b->timprints;
874 0 : d = (cchdc_t *) imprints->dict;
875 0 : min_bins = imprints->stats;
876 0 : max_bins = min_bins + 64;
877 0 : cnt_bins = max_bins + 64;
878 :
879 0 : printf("bits = %d, impcnt = " BUNFMT ", dictcnt = " BUNFMT "\n",
880 0 : imprints->bits, imprints->impcnt, imprints->dictcnt);
881 0 : printf("MIN\n");
882 0 : for (i = 0; i < imprints->bits; i++) {
883 0 : printf("[ " BUNFMT " ]\n", min_bins[i]);
884 : }
885 :
886 0 : printf("MAX\n");
887 0 : for (i = 0; i < imprints->bits; i++) {
888 0 : printf("[ " BUNFMT " ]\n", max_bins[i]);
889 : }
890 0 : printf("COUNT\n");
891 0 : for (i = 0; i < imprints->bits; i++) {
892 0 : printf("[ " BUNFMT " ]\n", cnt_bins[i]);
893 : }
894 0 : for (dcnt = 0, icnt = 0, pages = 1; dcnt < imprints->dictcnt; dcnt++) {
895 0 : if (d[dcnt].repeat) {
896 0 : BINSIZE(imprints->bits, IMPSPRNTMASK, " ");
897 0 : pages += d[dcnt].cnt;
898 0 : printf("[ " BUNFMT " ]r %s\n", pages, s);
899 0 : icnt++;
900 : } else {
901 0 : l = icnt + d[dcnt].cnt;
902 0 : for (; icnt < l; icnt++) {
903 0 : BINSIZE(imprints->bits, IMPSPRNTMASK, " ");
904 0 : printf("[ " BUNFMT " ] %s\n", pages++, s);
905 : }
906 : }
907 : }
908 0 : fflush(stdout);
909 : }
910 : #endif
|