Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_calc_private.h"
17 :
/* Address of the tail value in slot x.  Expands against the locals
 * `vars', `vals' and `width' of the function using it: when `vars' is
 * non-NULL the value is found at `vars' plus the offset that
 * VarHeapVal() yields for slot x; otherwise it is located directly in
 * `vals', `width' bytes per slot. */
#define VALUE(x)	(vars != NULL					\
			 ? vars + VarHeapVal(vals, (x), width)		\
			 : vals + (x) * width)
19 : /* BATunique returns a bat that indicates the unique tail values of
20 : * the input bat. This is essentially the same output as the
21 : * "extents" output of BATgroup. The difference is that BATunique
22 : * does not return the grouping bat.
23 : *
24 : * The first input is the bat from which unique rows are selected, the
25 : * second input is an optional candidate list.
26 : *
27 : * The return value is a candidate list.
28 : */
29 : BAT *
30 1453 : BATunique(BAT *b, BAT *s)
31 : {
32 1453 : BAT *bn;
33 1453 : const void *v;
34 1453 : const char *vals;
35 1453 : const char *vars;
36 1453 : int width;
37 1453 : oid i, o, hseq;
38 1453 : const char *nme;
39 1453 : Hash *hs = NULL;
40 1453 : BUN hb;
41 1453 : int (*cmp)(const void *, const void *);
42 1453 : struct canditer ci;
43 1453 : const char *algomsg = "";
44 1453 : lng t0 = 0;
45 :
46 1453 : QryCtx *qry_ctx = MT_thread_get_qry_ctx();
47 :
48 1453 : TRC_DEBUG_IF(ALGO) t0 = GDKusec();
49 :
50 1453 : BATcheck(b, NULL);
51 1453 : canditer_init(&ci, b, s);
52 :
53 1453 : (void) BATordered(b);
54 1453 : (void) BATordered_rev(b);
55 1453 : BATiter bi = bat_iterator(b);
56 1453 : if (bi.key || ci.ncand <= 1 || BATtdensebi(&bi)) {
57 : /* trivial: already unique */
58 955 : bn = canditer_slice(&ci, 0, ci.ncand);
59 955 : bat_iterator_end(&bi);
60 955 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
61 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
62 : " (already unique, slice candidates -- "
63 : LLFMT "usec)\n",
64 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
65 : ALGOOPTBATPAR(bn), GDKusec() - t0);
66 955 : return bn;
67 : }
68 :
69 498 : if ((bi.sorted && bi.revsorted) ||
70 433 : (bi.type == TYPE_void && is_oid_nil(b->tseqbase))) {
71 : /* trivial: all values are the same */
72 65 : bn = BATdense(0, ci.seq, 1);
73 65 : bat_iterator_end(&bi);
74 65 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
75 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
76 : " (all equal -- "
77 : LLFMT "usec)\n",
78 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
79 : ALGOOPTBATPAR(bn), GDKusec() - t0);
80 65 : return bn;
81 : }
82 :
83 433 : assert(bi.type != TYPE_void);
84 :
85 433 : BUN initsize = BUN_NONE;
86 433 : if (s == NULL) {
87 271 : MT_rwlock_rdlock(&b->thashlock);
88 271 : if (b->thash != NULL && b->thash != (Hash *) 1)
89 5 : initsize = b->thash->nunique;
90 271 : MT_rwlock_rdunlock(&b->thashlock);
91 271 : if (initsize == BUN_NONE && bi.unique_est != 0)
92 154 : initsize = (BUN) bi.unique_est;
93 : }
94 266 : if (initsize == BUN_NONE)
95 : initsize = 1024;
96 433 : bn = COLnew(0, TYPE_oid, initsize, TRANSIENT);
97 433 : if (bn == NULL) {
98 0 : bat_iterator_end(&bi);
99 0 : return NULL;
100 : }
101 433 : vals = bi.base;
102 433 : if (bi.vh && bi.type)
103 95 : vars = bi.vh->base;
104 : else
105 : vars = NULL;
106 433 : width = bi.width;
107 433 : cmp = ATOMcompare(bi.type);
108 433 : hseq = b->hseqbase;
109 :
110 433 : if (ATOMbasetype(bi.type) == TYPE_bte ||
111 55 : (bi.width == 1 &&
112 55 : ATOMstorage(bi.type) == TYPE_str &&
113 55 : GDK_ELIMDOUBLES(bi.vh))) {
114 70 : uint8_t val;
115 :
116 70 : algomsg = "unique: byte-sized atoms";
117 70 : uint32_t seen[256 >> 5];
118 70 : memset(seen, 0, sizeof(seen));
119 304270 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
120 304118 : o = canditer_next(&ci);
121 304118 : val = ((const uint8_t *) vals)[o - hseq];
122 304118 : uint32_t m = UINT32_C(1) << (val & 0x1F);
123 304118 : if (!(seen[val >> 5] & m)) {
124 322 : seen[val >> 5] |= m;
125 322 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
126 0 : goto bunins_failed;
127 322 : if (bn->batCount == 256) {
128 : /* there cannot be more than
129 : * 256 distinct values */
130 : break;
131 : }
132 : }
133 : }
134 70 : TIMEOUT_CHECK(qry_ctx,
135 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
136 363 : } else if (ATOMbasetype(bi.type) == TYPE_sht ||
137 33 : (bi.width == 2 &&
138 33 : ATOMstorage(bi.type) == TYPE_str &&
139 33 : GDK_ELIMDOUBLES(bi.vh))) {
140 40 : uint16_t val;
141 :
142 40 : algomsg = "unique: short-sized atoms";
143 40 : uint32_t seen[65536 >> 5];
144 40 : memset(seen, 0, sizeof(seen));
145 47470 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
146 47390 : o = canditer_next(&ci);
147 47390 : val = ((const uint16_t *) vals)[o - hseq];
148 47390 : uint32_t m = UINT32_C(1) << (val & 0x1F);
149 47390 : if (!(seen[val >> 5] & m)) {
150 4591 : seen[val >> 5] |= m;
151 4591 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
152 0 : goto bunins_failed;
153 4591 : if (bn->batCount == 65536) {
154 : /* there cannot be more than
155 : * 65536 distinct values */
156 : break;
157 : }
158 : }
159 : }
160 40 : TIMEOUT_CHECK(qry_ctx,
161 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
162 323 : } else if (bi.sorted || bi.revsorted) {
163 43 : const void *prev = NULL;
164 43 : algomsg = "unique: sorted";
165 237483 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
166 237342 : o = canditer_next(&ci);
167 237342 : v = VALUE(o - hseq);
168 237342 : if (prev == NULL || (*cmp)(v, prev) != 0) {
169 22853 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
170 0 : goto bunins_failed;
171 : }
172 237342 : prev = v;
173 : }
174 43 : TIMEOUT_CHECK(qry_ctx,
175 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
176 280 : } else if (BATcheckhash(b) ||
177 275 : ((!bi.transient ||
178 91 : (b->batRole == PERSISTENT && GDKinmemory(0))) &&
179 258 : ci.ncand == bi.count &&
180 74 : BAThash(b) == GDK_SUCCEED)) {
181 : /* we already have a hash table on b, or b is
182 : * persistent and we could create a hash table */
183 79 : algomsg = "unique: existing hash";
184 79 : MT_rwlock_rdlock(&b->thashlock);
185 79 : hs = b->thash;
186 79 : if (hs == NULL) {
187 0 : MT_rwlock_rdunlock(&b->thashlock);
188 0 : goto lost_hash;
189 : }
190 85042 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
191 84884 : BUN p;
192 :
193 84884 : o = canditer_next(&ci);
194 84884 : p = o - hseq;
195 84884 : v = VALUE(p);
196 84884 : for (hb = HASHgetlink(hs, p);
197 100299 : hb != BUN_NONE;
198 15415 : hb = HASHgetlink(hs, hb)) {
199 62100 : assert(hb < p);
200 62100 : if (cmp(v, BUNtail(bi, hb)) == 0 &&
201 46685 : canditer_contains(&ci, hb + hseq)) {
202 : /* we've seen this value
203 : * before */
204 : break;
205 : }
206 : }
207 84884 : if (hb == BUN_NONE) {
208 38199 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED) {
209 0 : MT_rwlock_rdunlock(&b->thashlock);
210 0 : hs = NULL;
211 0 : goto bunins_failed;
212 : }
213 : }
214 : }
215 79 : MT_rwlock_rdunlock(&b->thashlock);
216 79 : TIMEOUT_CHECK(qry_ctx,
217 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
218 : } else {
219 201 : BUN prb;
220 201 : BUN p;
221 201 : BUN mask;
222 :
223 201 : lost_hash:
224 201 : GDKclrerr(); /* not interested in BAThash errors */
225 201 : algomsg = "unique: new partial hash";
226 201 : nme = BBP_physical(b->batCacheid);
227 201 : if (ATOMbasetype(bi.type) == TYPE_bte) {
228 : mask = (BUN) 1 << 8;
229 : cmp = NULL; /* no compare needed, "hash" is perfect */
230 201 : } else if (ATOMbasetype(bi.type) == TYPE_sht) {
231 : mask = (BUN) 1 << 16;
232 : cmp = NULL; /* no compare needed, "hash" is perfect */
233 : } else {
234 201 : mask = HASHmask(ci.ncand);
235 18 : if (mask < ((BUN) 1 << 16))
236 195 : mask = (BUN) 1 << 16;
237 : }
238 201 : if ((hs = GDKzalloc(sizeof(Hash))) == NULL) {
239 0 : GDKerror("cannot allocate hash table\n");
240 0 : goto bunins_failed;
241 : }
242 201 : hs->heapbckt.parentid = b->batCacheid;
243 201 : hs->heaplink.parentid = b->batCacheid;
244 201 : if ((hs->heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
245 201 : (hs->heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
246 201 : snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshunil%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heaplink.filename) ||
247 402 : snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshunib%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heapbckt.filename) ||
248 201 : HASHnew(hs, bi.type, BATcount(b), mask, BUN_NONE, false) != GDK_SUCCEED) {
249 0 : GDKfree(hs);
250 0 : hs = NULL;
251 0 : GDKerror("cannot allocate hash table\n");
252 0 : goto bunins_failed;
253 : }
254 516758 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
255 516329 : o = canditer_next(&ci);
256 496537 : v = VALUE(o - hseq);
257 496537 : prb = HASHprobe(hs, v);
258 496567 : for (hb = HASHget(hs, prb);
259 515570 : hb != BUN_NONE;
260 19003 : hb = HASHgetlink(hs, hb)) {
261 462904 : if (cmp == NULL || cmp(v, BUNtail(bi, hb)) == 0)
262 : break;
263 : }
264 515703 : if (hb == BUN_NONE) {
265 52718 : p = o - hseq;
266 52718 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
267 0 : goto bunins_failed;
268 : /* enter into hash table */
269 52718 : HASHputlink(hs, p, HASHget(hs, prb));
270 52893 : HASHput(hs, prb, p);
271 : }
272 : }
273 201 : TIMEOUT_CHECK(qry_ctx,
274 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
275 201 : HEAPfree(&hs->heaplink, true);
276 201 : HEAPfree(&hs->heapbckt, true);
277 201 : GDKfree(hs);
278 : }
279 433 : if (BATcount(bn) == bi.count) {
280 : /* it turns out all values are distinct */
281 77 : MT_lock_set(&b->theaplock);
282 77 : if (BATcount(b) == bi.count) {
283 : /* and the input hasn't changed in the mean
284 : * time--the only allowed change being appends;
285 : * updates not allowed since the candidate list
286 : * covers the complete bat */
287 77 : assert(b->tnokey[0] == 0);
288 77 : assert(b->tnokey[1] == 0);
289 77 : b->tkey = true;
290 : }
291 77 : MT_lock_unset(&b->theaplock);
292 : }
293 433 : bat_iterator_end(&bi);
294 :
295 433 : bn->theap->dirty = true;
296 433 : bn->tsorted = true;
297 433 : bn->trevsorted = BATcount(bn) <= 1;
298 433 : bn->tkey = true;
299 433 : bn->tnil = false;
300 433 : bn->tnonil = true;
301 433 : bn = virtualize(bn);
302 433 : MT_thread_setalgorithm(algomsg);
303 433 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
304 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
305 : " (%s -- " LLFMT "usec)\n",
306 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
307 : ALGOOPTBATPAR(bn), algomsg, GDKusec() - t0);
308 : return bn;
309 :
310 0 : bunins_failed:
311 0 : bat_iterator_end(&bi);
312 0 : if (hs != NULL) {
313 0 : HEAPfree(&hs->heaplink, true);
314 0 : HEAPfree(&hs->heapbckt, true);
315 0 : GDKfree(hs);
316 : }
317 0 : BBPreclaim(bn);
318 0 : return NULL;
319 : }
|