Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_calc_private.h"
17 :
18 : #define VALUE(x) (vars ? vars + VarHeapVal(vals, (x), width) : vals + (x) * width)
19 : /* BATunique returns a bat that indicates the unique tail values of
20 : * the input bat. This is essentially the same output as the
21 : * "extents" output of BATgroup. The difference is that BATunique
22 : * does not return the grouping bat.
23 : *
24 : * The first input is the bat from which unique rows are selected, the
25 : * second input is an optional candidate list.
26 : *
27 : * The return value is a candidate list.
28 : */
29 : BAT *
30 1394 : BATunique(BAT *b, BAT *s)
31 : {
32 1394 : BAT *bn;
33 1394 : const void *v;
34 1394 : const char *vals;
35 1394 : const char *vars;
36 1394 : int width;
37 1394 : oid i, o, hseq;
38 1394 : const char *nme;
39 1394 : Hash *hs = NULL;
40 1394 : BUN hb;
41 1394 : int (*cmp)(const void *, const void *);
42 1394 : struct canditer ci;
43 1394 : const char *algomsg = "";
44 1394 : lng t0 = 0;
45 :
46 1394 : QryCtx *qry_ctx = MT_thread_get_qry_ctx();
47 :
48 1394 : TRC_DEBUG_IF(ALGO) t0 = GDKusec();
49 :
50 1394 : BATcheck(b, NULL);
51 1394 : canditer_init(&ci, b, s);
52 :
53 1394 : (void) BATordered(b);
54 1394 : (void) BATordered_rev(b);
55 1394 : BATiter bi = bat_iterator(b);
56 1394 : if (bi.key || ci.ncand <= 1 || BATtdensebi(&bi)) {
57 : /* trivial: already unique */
58 879 : bn = canditer_slice(&ci, 0, ci.ncand);
59 879 : bat_iterator_end(&bi);
60 879 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
61 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
62 : " (already unique, slice candidates -- "
63 : LLFMT "usec)\n",
64 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
65 : ALGOOPTBATPAR(bn), GDKusec() - t0);
66 879 : return bn;
67 : }
68 :
69 515 : if ((bi.sorted && bi.revsorted) ||
70 455 : (bi.type == TYPE_void && is_oid_nil(b->tseqbase))) {
71 : /* trivial: all values are the same */
72 60 : bn = BATdense(0, ci.seq, 1);
73 60 : bat_iterator_end(&bi);
74 60 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
75 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
76 : " (all equal -- "
77 : LLFMT "usec)\n",
78 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
79 : ALGOOPTBATPAR(bn), GDKusec() - t0);
80 60 : return bn;
81 : }
82 :
83 455 : assert(bi.type != TYPE_void);
84 :
85 455 : BUN initsize = BUN_NONE;
86 455 : if (s == NULL) {
87 252 : MT_rwlock_rdlock(&b->thashlock);
88 252 : if (b->thash != NULL && b->thash != (Hash *) 1)
89 5 : initsize = b->thash->nunique;
90 252 : MT_rwlock_rdunlock(&b->thashlock);
91 252 : if (initsize == BUN_NONE && bi.unique_est != 0)
92 139 : initsize = (BUN) bi.unique_est;
93 : }
94 247 : if (initsize == BUN_NONE)
95 : initsize = 1024;
96 455 : bn = COLnew(0, TYPE_oid, initsize, TRANSIENT);
97 455 : if (bn == NULL) {
98 0 : bat_iterator_end(&bi);
99 0 : return NULL;
100 : }
101 455 : vals = bi.base;
102 455 : if (bi.vh && bi.type)
103 83 : vars = bi.vh->base;
104 : else
105 : vars = NULL;
106 455 : width = bi.width;
107 455 : cmp = ATOMcompare(bi.type);
108 455 : hseq = b->hseqbase;
109 :
110 455 : if (ATOMbasetype(bi.type) == TYPE_bte ||
111 43 : (bi.width == 1 &&
112 43 : ATOMstorage(bi.type) == TYPE_str &&
113 43 : GDK_ELIMDOUBLES(bi.vh))) {
114 60 : uint8_t val;
115 :
116 60 : algomsg = "unique: byte-sized atoms";
117 60 : uint32_t seen[256 >> 5];
118 60 : memset(seen, 0, sizeof(seen));
119 302423 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
120 302291 : o = canditer_next(&ci);
121 302291 : val = ((const uint8_t *) vals)[o - hseq];
122 302291 : uint32_t m = UINT32_C(1) << (val & 0x1F);
123 302291 : if (!(seen[val >> 5] & m)) {
124 270 : seen[val >> 5] |= m;
125 270 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
126 0 : goto bunins_failed;
127 270 : if (bn->batCount == 256) {
128 : /* there cannot be more than
129 : * 256 distinct values */
130 : break;
131 : }
132 : }
133 : }
134 60 : TIMEOUT_CHECK(qry_ctx,
135 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
136 395 : } else if (ATOMbasetype(bi.type) == TYPE_sht ||
137 34 : (bi.width == 2 &&
138 34 : ATOMstorage(bi.type) == TYPE_str &&
139 34 : GDK_ELIMDOUBLES(bi.vh))) {
140 41 : uint16_t val;
141 :
142 41 : algomsg = "unique: short-sized atoms";
143 41 : uint32_t seen[65536 >> 5];
144 41 : memset(seen, 0, sizeof(seen));
145 47234 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
146 47152 : o = canditer_next(&ci);
147 47152 : val = ((const uint16_t *) vals)[o - hseq];
148 47152 : uint32_t m = UINT32_C(1) << (val & 0x1F);
149 47152 : if (!(seen[val >> 5] & m)) {
150 4553 : seen[val >> 5] |= m;
151 4553 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
152 0 : goto bunins_failed;
153 4553 : if (bn->batCount == 65536) {
154 : /* there cannot be more than
155 : * 65536 distinct values */
156 : break;
157 : }
158 : }
159 : }
160 41 : TIMEOUT_CHECK(qry_ctx,
161 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
162 354 : } else if (bi.sorted || bi.revsorted) {
163 42 : const void *prev = NULL;
164 42 : algomsg = "unique: sorted";
165 238122 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
166 237984 : o = canditer_next(&ci);
167 237984 : v = VALUE(o - hseq);
168 237984 : if (prev == NULL || (*cmp)(v, prev) != 0) {
169 23054 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
170 0 : goto bunins_failed;
171 : }
172 237984 : prev = v;
173 : }
174 42 : TIMEOUT_CHECK(qry_ctx,
175 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
176 312 : } else if (BATcheckhash(b) ||
177 299 : ((!bi.transient ||
178 79 : (b->batRole == PERSISTENT && GDKinmemory(0))) &&
179 331 : ci.ncand == bi.count &&
180 111 : BAThash(b) == GDK_SUCCEED)) {
181 124 : BUN lo = 0;
182 :
183 : /* we already have a hash table on b, or b is
184 : * persistent and we could create a hash table, or b
185 : * is a view on a bat that already has a hash table */
186 124 : algomsg = "unique: existing hash";
187 124 : MT_rwlock_rdlock(&b->thashlock);
188 124 : hs = b->thash;
189 124 : if (hs == NULL) {
190 0 : MT_rwlock_rdunlock(&b->thashlock);
191 0 : goto lost_hash;
192 : }
193 83567 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
194 83319 : BUN p;
195 :
196 83319 : o = canditer_next(&ci);
197 83319 : p = o - hseq;
198 83319 : v = VALUE(p);
199 83319 : for (hb = HASHgetlink(hs, p + lo);
200 99207 : hb != BUN_NONE && hb >= lo;
201 15888 : hb = HASHgetlink(hs, hb)) {
202 62235 : assert(hb < p + lo);
203 108966 : if (cmp(v, BUNtail(bi, hb)) == 0 &&
204 46731 : canditer_contains(&ci, hb - lo + hseq)) {
205 : /* we've seen this value
206 : * before */
207 : break;
208 : }
209 : }
210 83319 : if (hb == BUN_NONE || hb < lo) {
211 36972 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED) {
212 0 : MT_rwlock_rdunlock(&b->thashlock);
213 0 : hs = NULL;
214 0 : goto bunins_failed;
215 : }
216 : }
217 : }
218 124 : MT_rwlock_rdunlock(&b->thashlock);
219 124 : TIMEOUT_CHECK(qry_ctx,
220 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
221 : } else {
222 188 : BUN prb;
223 188 : BUN p;
224 188 : BUN mask;
225 :
226 188 : lost_hash:
227 188 : GDKclrerr(); /* not interested in BAThash errors */
228 188 : algomsg = "unique: new partial hash";
229 188 : nme = BBP_physical(b->batCacheid);
230 188 : if (ATOMbasetype(bi.type) == TYPE_bte) {
231 : mask = (BUN) 1 << 8;
232 : cmp = NULL; /* no compare needed, "hash" is perfect */
233 188 : } else if (ATOMbasetype(bi.type) == TYPE_sht) {
234 : mask = (BUN) 1 << 16;
235 : cmp = NULL; /* no compare needed, "hash" is perfect */
236 : } else {
237 188 : mask = HASHmask(ci.ncand);
238 18 : if (mask < ((BUN) 1 << 16))
239 182 : mask = (BUN) 1 << 16;
240 : }
241 188 : if ((hs = GDKzalloc(sizeof(Hash))) == NULL) {
242 0 : GDKerror("cannot allocate hash table\n");
243 0 : goto bunins_failed;
244 : }
245 188 : hs->heapbckt.parentid = b->batCacheid;
246 188 : hs->heaplink.parentid = b->batCacheid;
247 188 : if ((hs->heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
248 188 : (hs->heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
249 188 : snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshunil%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heaplink.filename) ||
250 376 : snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshunib%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heapbckt.filename) ||
251 188 : HASHnew(hs, bi.type, BATcount(b), mask, BUN_NONE, false) != GDK_SUCCEED) {
252 0 : GDKfree(hs);
253 0 : hs = NULL;
254 0 : GDKerror("cannot allocate hash table\n");
255 0 : goto bunins_failed;
256 : }
257 483802 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
258 483399 : o = canditer_next(&ci);
259 470627 : v = VALUE(o - hseq);
260 470627 : prb = HASHprobe(hs, v);
261 470953 : for (hb = HASHget(hs, prb);
262 482633 : hb != BUN_NONE;
263 11680 : hb = HASHgetlink(hs, hb)) {
264 435508 : if (cmp == NULL || cmp(v, BUNtail(bi, hb)) == 0)
265 : break;
266 : }
267 482544 : if (hb == BUN_NONE) {
268 47152 : p = o - hseq;
269 47152 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
270 0 : goto bunins_failed;
271 : /* enter into hash table */
272 47152 : HASHputlink(hs, p, HASHget(hs, prb));
273 47385 : HASHput(hs, prb, p);
274 : }
275 : }
276 188 : TIMEOUT_CHECK(qry_ctx,
277 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
278 188 : HEAPfree(&hs->heaplink, true);
279 188 : HEAPfree(&hs->heapbckt, true);
280 188 : GDKfree(hs);
281 : }
282 455 : if (BATcount(bn) == bi.count) {
283 : /* it turns out all values are distinct */
284 71 : MT_lock_set(&b->theaplock);
285 71 : if (BATcount(b) == bi.count) {
286 : /* and the input hasn't changed in the mean
287 : * time--the only allowed change being appends;
288 : * updates not allowed since the candidate list
289 : * covers the complete bat */
290 71 : assert(b->tnokey[0] == 0);
291 71 : assert(b->tnokey[1] == 0);
292 71 : b->tkey = true;
293 : }
294 71 : MT_lock_unset(&b->theaplock);
295 : }
296 455 : bat_iterator_end(&bi);
297 :
298 455 : bn->theap->dirty = true;
299 455 : bn->tsorted = true;
300 455 : bn->trevsorted = BATcount(bn) <= 1;
301 455 : bn->tkey = true;
302 455 : bn->tnil = false;
303 455 : bn->tnonil = true;
304 455 : bn = virtualize(bn);
305 455 : MT_thread_setalgorithm(algomsg);
306 455 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
307 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
308 : " (%s -- " LLFMT "usec)\n",
309 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
310 : ALGOOPTBATPAR(bn), algomsg, GDKusec() - t0);
311 : return bn;
312 :
313 0 : bunins_failed:
314 0 : bat_iterator_end(&bi);
315 0 : if (hs != NULL) {
316 0 : HEAPfree(&hs->heaplink, true);
317 0 : HEAPfree(&hs->heapbckt, true);
318 0 : GDKfree(hs);
319 : }
320 0 : BBPreclaim(bn);
321 0 : return NULL;
322 : }
|