Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_calc_private.h"
17 :
18 : #define VALUE(x) (vars ? vars + VarHeapVal(vals, (x), width) : vals + (x) * width)
19 : /* BATunique returns a bat that indicates the unique tail values of
20 : * the input bat. This is essentially the same output as the
21 : * "extents" output of BATgroup. The difference is that BATunique
22 : * does not return the grouping bat.
23 : *
24 : * The first input is the bat from which unique rows are selected, the
25 : * second input is an optional candidate list.
26 : *
27 : * The return value is a candidate list.
28 : */
29 : BAT *
30 1455 : BATunique(BAT *b, BAT *s)
31 : {
32 1455 : BAT *bn;
33 1455 : const void *v;
34 1455 : const char *vals;
35 1455 : const char *vars;
36 1455 : int width;
37 1455 : oid i, o, hseq;
38 1455 : const char *nme;
39 1455 : Hash *hs = NULL;
40 1455 : BUN hb;
41 1455 : int (*cmp)(const void *, const void *);
42 1455 : struct canditer ci;
43 1455 : const char *algomsg = "";
44 1455 : lng t0 = 0;
45 :
46 1455 : QryCtx *qry_ctx = MT_thread_get_qry_ctx();
47 :
48 1455 : TRC_DEBUG_IF(ALGO) t0 = GDKusec();
49 :
50 1455 : BATcheck(b, NULL);
51 1455 : canditer_init(&ci, b, s);
52 :
53 1455 : (void) BATordered(b);
54 1455 : (void) BATordered_rev(b);
55 1455 : BATiter bi = bat_iterator(b);
56 1455 : if (bi.key || ci.ncand <= 1 || BATtdensebi(&bi)) {
57 : /* trivial: already unique */
58 955 : bn = canditer_slice(&ci, 0, ci.ncand);
59 955 : bat_iterator_end(&bi);
60 955 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
61 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
62 : " (already unique, slice candidates -- "
63 : LLFMT "usec)\n",
64 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
65 : ALGOOPTBATPAR(bn), GDKusec() - t0);
66 955 : return bn;
67 : }
68 :
69 500 : if ((bi.sorted && bi.revsorted) ||
70 435 : (bi.type == TYPE_void && is_oid_nil(b->tseqbase))) {
71 : /* trivial: all values are the same */
72 65 : bn = BATdense(0, ci.seq, 1);
73 65 : bat_iterator_end(&bi);
74 65 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
75 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
76 : " (all equal -- "
77 : LLFMT "usec)\n",
78 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
79 : ALGOOPTBATPAR(bn), GDKusec() - t0);
80 65 : return bn;
81 : }
82 :
83 435 : assert(bi.type != TYPE_void);
84 :
85 435 : BUN initsize = BUN_NONE;
86 435 : if (s == NULL) {
87 273 : MT_rwlock_rdlock(&b->thashlock);
88 273 : if (b->thash != NULL && b->thash != (Hash *) 1)
89 5 : initsize = b->thash->nunique;
90 273 : MT_rwlock_rdunlock(&b->thashlock);
91 273 : if (initsize == BUN_NONE && bi.unique_est != 0)
92 156 : initsize = (BUN) bi.unique_est;
93 : }
94 268 : if (initsize == BUN_NONE)
95 : initsize = 1024;
96 435 : bn = COLnew(0, TYPE_oid, initsize, TRANSIENT);
97 435 : if (bn == NULL) {
98 0 : bat_iterator_end(&bi);
99 0 : return NULL;
100 : }
101 435 : vals = bi.base;
102 435 : if (bi.vh && bi.type)
103 95 : vars = bi.vh->base;
104 : else
105 : vars = NULL;
106 435 : width = bi.width;
107 435 : cmp = ATOMcompare(bi.type);
108 435 : hseq = b->hseqbase;
109 :
110 435 : if (ATOMbasetype(bi.type) == TYPE_bte ||
111 55 : (bi.width == 1 &&
112 55 : ATOMstorage(bi.type) == TYPE_str &&
113 55 : GDK_ELIMDOUBLES(bi.vh))) {
114 72 : uint8_t val;
115 :
116 72 : algomsg = "unique: byte-sized atoms";
117 72 : uint32_t seen[256 >> 5];
118 72 : memset(seen, 0, sizeof(seen));
119 306783 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
120 306627 : o = canditer_next(&ci);
121 306627 : val = ((const uint8_t *) vals)[o - hseq];
122 306627 : uint32_t m = UINT32_C(1) << (val & 0x1F);
123 306627 : if (!(seen[val >> 5] & m)) {
124 328 : seen[val >> 5] |= m;
125 328 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
126 0 : goto bunins_failed;
127 328 : if (bn->batCount == 256) {
128 : /* there cannot be more than
129 : * 256 distinct values */
130 : break;
131 : }
132 : }
133 : }
134 72 : TIMEOUT_CHECK(qry_ctx,
135 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
136 363 : } else if (ATOMbasetype(bi.type) == TYPE_sht ||
137 34 : (bi.width == 2 &&
138 34 : ATOMstorage(bi.type) == TYPE_str &&
139 34 : GDK_ELIMDOUBLES(bi.vh))) {
140 40 : uint16_t val;
141 :
142 40 : algomsg = "unique: short-sized atoms";
143 40 : uint32_t seen[65536 >> 5];
144 40 : memset(seen, 0, sizeof(seen));
145 47444 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
146 47364 : o = canditer_next(&ci);
147 47364 : val = ((const uint16_t *) vals)[o - hseq];
148 47364 : uint32_t m = UINT32_C(1) << (val & 0x1F);
149 47364 : if (!(seen[val >> 5] & m)) {
150 4593 : seen[val >> 5] |= m;
151 4593 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
152 0 : goto bunins_failed;
153 4593 : if (bn->batCount == 65536) {
154 : /* there cannot be more than
155 : * 65536 distinct values */
156 : break;
157 : }
158 : }
159 : }
160 40 : TIMEOUT_CHECK(qry_ctx,
161 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
162 323 : } else if (bi.sorted || bi.revsorted) {
163 44 : const void *prev = NULL;
164 44 : algomsg = "unique: sorted";
165 238320 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
166 238176 : o = canditer_next(&ci);
167 238176 : v = VALUE(o - hseq);
168 238176 : if (prev == NULL || (*cmp)(v, prev) != 0) {
169 23089 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
170 0 : goto bunins_failed;
171 : }
172 238176 : prev = v;
173 : }
174 44 : TIMEOUT_CHECK(qry_ctx,
175 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
176 279 : } else if (BATcheckhash(b) ||
177 274 : ((!bi.transient ||
178 91 : (b->batRole == PERSISTENT && GDKinmemory(0))) &&
179 257 : ci.ncand == bi.count &&
180 74 : BAThash(b) == GDK_SUCCEED)) {
181 : /* we already have a hash table on b, or b is
182 : * persistent and we could create a hash table */
183 79 : algomsg = "unique: existing hash";
184 79 : MT_rwlock_rdlock(&b->thashlock);
185 79 : hs = b->thash;
186 79 : if (hs == NULL) {
187 0 : MT_rwlock_rdunlock(&b->thashlock);
188 0 : goto lost_hash;
189 : }
190 84994 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
191 84836 : BUN p;
192 :
193 84836 : o = canditer_next(&ci);
194 84836 : p = o - hseq;
195 84836 : v = VALUE(p);
196 84836 : for (hb = HASHgetlink(hs, p);
197 100327 : hb != BUN_NONE;
198 15491 : hb = HASHgetlink(hs, hb)) {
199 62124 : assert(hb < p);
200 62124 : if (cmp(v, BUNtail(bi, hb)) == 0 &&
201 46633 : canditer_contains(&ci, hb + hseq)) {
202 : /* we've seen this value
203 : * before */
204 : break;
205 : }
206 : }
207 84836 : if (hb == BUN_NONE) {
208 38203 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED) {
209 0 : MT_rwlock_rdunlock(&b->thashlock);
210 0 : hs = NULL;
211 0 : goto bunins_failed;
212 : }
213 : }
214 : }
215 79 : MT_rwlock_rdunlock(&b->thashlock);
216 79 : TIMEOUT_CHECK(qry_ctx,
217 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
218 : } else {
219 200 : BUN prb;
220 200 : BUN p;
221 200 : BUN mask;
222 :
223 200 : lost_hash:
224 200 : GDKclrerr(); /* not interested in BAThash errors */
225 200 : algomsg = "unique: new partial hash";
226 200 : nme = BBP_physical(b->batCacheid);
227 200 : if (ATOMbasetype(bi.type) == TYPE_bte) {
228 : mask = (BUN) 1 << 8;
229 : cmp = NULL; /* no compare needed, "hash" is perfect */
230 200 : } else if (ATOMbasetype(bi.type) == TYPE_sht) {
231 : mask = (BUN) 1 << 16;
232 : cmp = NULL; /* no compare needed, "hash" is perfect */
233 : } else {
234 200 : mask = HASHmask(ci.ncand);
235 17 : if (mask < ((BUN) 1 << 16))
236 194 : mask = (BUN) 1 << 16;
237 : }
238 200 : if ((hs = GDKzalloc(sizeof(Hash))) == NULL) {
239 0 : GDKerror("cannot allocate hash table\n");
240 0 : goto bunins_failed;
241 : }
242 200 : hs->heapbckt.parentid = b->batCacheid;
243 200 : hs->heaplink.parentid = b->batCacheid;
244 200 : if ((hs->heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
245 200 : (hs->heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
246 200 : snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshunil%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heaplink.filename) ||
247 400 : snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshunib%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heapbckt.filename) ||
248 200 : HASHnew(hs, bi.type, BATcount(b), mask, BUN_NONE, false) != GDK_SUCCEED) {
249 0 : GDKfree(hs);
250 0 : hs = NULL;
251 0 : GDKerror("cannot allocate hash table\n");
252 0 : goto bunins_failed;
253 : }
254 521227 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
255 520800 : o = canditer_next(&ci);
256 499003 : v = VALUE(o - hseq);
257 499003 : prb = HASHprobe(hs, v);
258 499116 : for (hb = HASHget(hs, prb);
259 520209 : hb != BUN_NONE;
260 21093 : hb = HASHgetlink(hs, hb)) {
261 464589 : if (cmp == NULL || cmp(v, BUNtail(bi, hb)) == 0)
262 : break;
263 : }
264 520312 : if (hb == BUN_NONE) {
265 55663 : p = o - hseq;
266 55663 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
267 0 : goto bunins_failed;
268 : /* enter into hash table */
269 55663 : HASHputlink(hs, p, HASHget(hs, prb));
270 55817 : HASHput(hs, prb, p);
271 : }
272 : }
273 200 : TIMEOUT_CHECK(qry_ctx,
274 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
275 200 : HEAPfree(&hs->heaplink, true);
276 200 : HEAPfree(&hs->heapbckt, true);
277 200 : GDKfree(hs);
278 : }
279 435 : if (BATcount(bn) == bi.count) {
280 : /* it turns out all values are distinct */
281 77 : MT_lock_set(&b->theaplock);
282 77 : if (BATcount(b) == bi.count) {
283 : /* and the input hasn't changed in the mean
284 : * time--the only allowed change being appends;
285 : * updates not allowed since the candidate list
286 : * covers the complete bat */
287 77 : assert(b->tnokey[0] == 0);
288 77 : assert(b->tnokey[1] == 0);
289 77 : b->tkey = true;
290 : }
291 77 : MT_lock_unset(&b->theaplock);
292 : }
293 435 : bat_iterator_end(&bi);
294 :
295 435 : bn->theap->dirty = true;
296 435 : bn->tsorted = true;
297 435 : bn->trevsorted = BATcount(bn) <= 1;
298 435 : bn->tkey = true;
299 435 : bn->tnil = false;
300 435 : bn->tnonil = true;
301 435 : bn = virtualize(bn);
302 435 : MT_thread_setalgorithm(algomsg);
303 435 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
304 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
305 : " (%s -- " LLFMT "usec)\n",
306 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
307 : ALGOOPTBATPAR(bn), algomsg, GDKusec() - t0);
308 : return bn;
309 :
310 0 : bunins_failed:
311 0 : bat_iterator_end(&bi);
312 0 : if (hs != NULL) {
313 0 : HEAPfree(&hs->heaplink, true);
314 0 : HEAPfree(&hs->heapbckt, true);
315 0 : GDKfree(hs);
316 : }
317 0 : BBPreclaim(bn);
318 0 : return NULL;
319 : }
|