Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "gdk.h"
15 : #include "gdk_private.h"
16 : #include "gdk_calc_private.h"
17 :
18 : #define VALUE(x) (vars ? vars + VarHeapVal(vals, (x), width) : vals + (x) * width)
19 : /* BATunique returns a bat that indicates the unique tail values of
20 : * the input bat. This is essentially the same output as the
21 : * "extents" output of BATgroup. The difference is that BATunique
22 : * does not return the grouping bat.
23 : *
24 : * The first input is the bat from which unique rows are selected, the
25 : * second input is an optional candidate list.
26 : *
27 : * The return value is a candidate list.
28 : */
29 : BAT *
30 1500 : BATunique(BAT *b, BAT *s)
31 : {
32 1500 : BAT *bn;
33 1500 : const void *v;
34 1500 : const char *vals;
35 1500 : const char *vars;
36 1500 : int width;
37 1500 : oid i, o, hseq;
38 1500 : const char *nme;
39 1500 : Hash *hs = NULL;
40 1500 : BUN hb;
41 1500 : int (*cmp)(const void *, const void *);
42 1500 : struct canditer ci;
43 1500 : const char *algomsg = "";
44 1500 : lng t0 = 0;
45 :
46 1500 : QryCtx *qry_ctx = MT_thread_get_qry_ctx();
47 :
48 1500 : TRC_DEBUG_IF(ALGO) t0 = GDKusec();
49 :
50 1500 : BATcheck(b, NULL);
51 1500 : canditer_init(&ci, b, s);
52 :
53 1500 : (void) BATordered(b);
54 1500 : (void) BATordered_rev(b);
55 1500 : BATiter bi = bat_iterator(b);
56 1500 : if (bi.key || ci.ncand <= 1 || BATtdensebi(&bi)) {
57 : /* trivial: already unique */
58 955 : bn = canditer_slice(&ci, 0, ci.ncand);
59 955 : bat_iterator_end(&bi);
60 955 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
61 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
62 : " (already unique, slice candidates -- "
63 : LLFMT "usec)\n",
64 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
65 : ALGOOPTBATPAR(bn), GDKusec() - t0);
66 955 : return bn;
67 : }
68 :
69 545 : if ((bi.sorted && bi.revsorted) ||
70 480 : (bi.type == TYPE_void && is_oid_nil(b->tseqbase))) {
71 : /* trivial: all values are the same */
72 65 : bn = BATdense(0, ci.seq, 1);
73 65 : bat_iterator_end(&bi);
74 65 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
75 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
76 : " (all equal -- "
77 : LLFMT "usec)\n",
78 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
79 : ALGOOPTBATPAR(bn), GDKusec() - t0);
80 65 : return bn;
81 : }
82 :
83 480 : assert(bi.type != TYPE_void);
84 :
85 480 : BUN initsize = BUN_NONE;
86 480 : if (s == NULL) {
87 271 : MT_rwlock_rdlock(&b->thashlock);
88 271 : if (b->thash != NULL && b->thash != (Hash *) 1)
89 5 : initsize = b->thash->nunique;
90 271 : MT_rwlock_rdunlock(&b->thashlock);
91 271 : if (initsize == BUN_NONE && bi.unique_est != 0)
92 144 : initsize = (BUN) bi.unique_est;
93 : }
94 266 : if (initsize == BUN_NONE)
95 : initsize = 1024;
96 480 : bn = COLnew(0, TYPE_oid, initsize, TRANSIENT);
97 480 : if (bn == NULL) {
98 0 : bat_iterator_end(&bi);
99 0 : return NULL;
100 : }
101 480 : vals = bi.base;
102 480 : if (bi.vh && bi.type)
103 95 : vars = bi.vh->base;
104 : else
105 : vars = NULL;
106 480 : width = bi.width;
107 480 : cmp = ATOMcompare(bi.type);
108 480 : hseq = b->hseqbase;
109 :
110 480 : if (ATOMbasetype(bi.type) == TYPE_bte ||
111 55 : (bi.width == 1 &&
112 55 : ATOMstorage(bi.type) == TYPE_str &&
113 55 : GDK_ELIMDOUBLES(bi.vh))) {
114 72 : uint8_t val;
115 :
116 72 : algomsg = "unique: byte-sized atoms";
117 72 : uint32_t seen[256 >> 5];
118 72 : memset(seen, 0, sizeof(seen));
119 304222 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
120 304066 : o = canditer_next(&ci);
121 304066 : val = ((const uint8_t *) vals)[o - hseq];
122 304066 : uint32_t m = UINT32_C(1) << (val & 0x1F);
123 304066 : if (!(seen[val >> 5] & m)) {
124 326 : seen[val >> 5] |= m;
125 326 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
126 0 : goto bunins_failed;
127 326 : if (bn->batCount == 256) {
128 : /* there cannot be more than
129 : * 256 distinct values */
130 : break;
131 : }
132 : }
133 : }
134 72 : TIMEOUT_CHECK(qry_ctx,
135 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
136 408 : } else if (ATOMbasetype(bi.type) == TYPE_sht ||
137 33 : (bi.width == 2 &&
138 33 : ATOMstorage(bi.type) == TYPE_str &&
139 33 : GDK_ELIMDOUBLES(bi.vh))) {
140 41 : uint16_t val;
141 :
142 41 : algomsg = "unique: short-sized atoms";
143 41 : uint32_t seen[65536 >> 5];
144 41 : memset(seen, 0, sizeof(seen));
145 47392 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
146 47310 : o = canditer_next(&ci);
147 47310 : val = ((const uint16_t *) vals)[o - hseq];
148 47310 : uint32_t m = UINT32_C(1) << (val & 0x1F);
149 47310 : if (!(seen[val >> 5] & m)) {
150 4585 : seen[val >> 5] |= m;
151 4585 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
152 0 : goto bunins_failed;
153 4585 : if (bn->batCount == 65536) {
154 : /* there cannot be more than
155 : * 65536 distinct values */
156 : break;
157 : }
158 : }
159 : }
160 41 : TIMEOUT_CHECK(qry_ctx,
161 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
162 367 : } else if (bi.sorted || bi.revsorted) {
163 44 : const void *prev = NULL;
164 44 : algomsg = "unique: sorted";
165 238279 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
166 238135 : o = canditer_next(&ci);
167 238135 : v = VALUE(o - hseq);
168 238135 : if (prev == NULL || (*cmp)(v, prev) != 0) {
169 23071 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
170 0 : goto bunins_failed;
171 : }
172 238135 : prev = v;
173 : }
174 44 : TIMEOUT_CHECK(qry_ctx,
175 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
176 323 : } else if (BATcheckhash(b) ||
177 318 : ((!bi.transient ||
178 97 : (b->batRole == PERSISTENT && GDKinmemory(0))) &&
179 334 : ci.ncand == bi.count &&
180 113 : BAThash(b) == GDK_SUCCEED)) {
181 : /* we already have a hash table on b, or b is
182 : * persistent and we could create a hash table */
183 118 : algomsg = "unique: existing hash";
184 118 : MT_rwlock_rdlock(&b->thashlock);
185 118 : hs = b->thash;
186 118 : if (hs == NULL) {
187 0 : MT_rwlock_rdunlock(&b->thashlock);
188 0 : goto lost_hash;
189 : }
190 85230 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
191 84994 : BUN p;
192 :
193 84994 : o = canditer_next(&ci);
194 84994 : p = o - hseq;
195 84994 : v = VALUE(p);
196 84994 : for (hb = HASHgetlink(hs, p);
197 100636 : hb != BUN_NONE;
198 15642 : hb = HASHgetlink(hs, hb)) {
199 62350 : assert(hb < p);
200 109058 : if (cmp(v, BUNtail(bi, hb)) == 0 &&
201 46708 : canditer_contains(&ci, hb + hseq)) {
202 : /* we've seen this value
203 : * before */
204 : break;
205 : }
206 : }
207 84994 : if (hb == BUN_NONE) {
208 38286 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED) {
209 0 : MT_rwlock_rdunlock(&b->thashlock);
210 0 : hs = NULL;
211 0 : goto bunins_failed;
212 : }
213 : }
214 : }
215 118 : MT_rwlock_rdunlock(&b->thashlock);
216 118 : TIMEOUT_CHECK(qry_ctx,
217 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
218 : } else {
219 205 : BUN prb;
220 205 : BUN p;
221 205 : BUN mask;
222 :
223 205 : lost_hash:
224 205 : GDKclrerr(); /* not interested in BAThash errors */
225 205 : algomsg = "unique: new partial hash";
226 205 : nme = BBP_physical(b->batCacheid);
227 205 : if (ATOMbasetype(bi.type) == TYPE_bte) {
228 : mask = (BUN) 1 << 8;
229 : cmp = NULL; /* no compare needed, "hash" is perfect */
230 205 : } else if (ATOMbasetype(bi.type) == TYPE_sht) {
231 : mask = (BUN) 1 << 16;
232 : cmp = NULL; /* no compare needed, "hash" is perfect */
233 : } else {
234 205 : mask = HASHmask(ci.ncand);
235 18 : if (mask < ((BUN) 1 << 16))
236 199 : mask = (BUN) 1 << 16;
237 : }
238 205 : if ((hs = GDKzalloc(sizeof(Hash))) == NULL) {
239 0 : GDKerror("cannot allocate hash table\n");
240 0 : goto bunins_failed;
241 : }
242 205 : hs->heapbckt.parentid = b->batCacheid;
243 205 : hs->heaplink.parentid = b->batCacheid;
244 205 : if ((hs->heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
245 205 : (hs->heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 ||
246 205 : snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshunil%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heaplink.filename) ||
247 410 : snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshunib%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heapbckt.filename) ||
248 205 : HASHnew(hs, bi.type, BATcount(b), mask, BUN_NONE, false) != GDK_SUCCEED) {
249 0 : GDKfree(hs);
250 0 : hs = NULL;
251 0 : GDKerror("cannot allocate hash table\n");
252 0 : goto bunins_failed;
253 : }
254 493341 : TIMEOUT_LOOP_IDX(i, ci.ncand, qry_ctx) {
255 492904 : o = canditer_next(&ci);
256 465198 : v = VALUE(o - hseq);
257 465198 : prb = HASHprobe(hs, v);
258 465135 : for (hb = HASHget(hs, prb);
259 492121 : hb != BUN_NONE;
260 26986 : hb = HASHgetlink(hs, hb)) {
261 439742 : if (cmp == NULL || cmp(v, BUNtail(bi, hb)) == 0)
262 : break;
263 : }
264 492047 : if (hb == BUN_NONE) {
265 52394 : p = o - hseq;
266 52394 : if (bunfastappTYPE(oid, bn, &o) != GDK_SUCCEED)
267 0 : goto bunins_failed;
268 : /* enter into hash table */
269 52394 : HASHputlink(hs, p, HASHget(hs, prb));
270 52629 : HASHput(hs, prb, p);
271 : }
272 : }
273 205 : TIMEOUT_CHECK(qry_ctx,
274 : GOTO_LABEL_TIMEOUT_HANDLER(bunins_failed, qry_ctx));
275 205 : HEAPfree(&hs->heaplink, true);
276 205 : HEAPfree(&hs->heapbckt, true);
277 205 : GDKfree(hs);
278 : }
279 480 : if (BATcount(bn) == bi.count) {
280 : /* it turns out all values are distinct */
281 77 : MT_lock_set(&b->theaplock);
282 77 : if (BATcount(b) == bi.count) {
283 : /* and the input hasn't changed in the mean
284 : * time--the only allowed change being appends;
285 : * updates not allowed since the candidate list
286 : * covers the complete bat */
287 77 : assert(b->tnokey[0] == 0);
288 77 : assert(b->tnokey[1] == 0);
289 77 : b->tkey = true;
290 : }
291 77 : MT_lock_unset(&b->theaplock);
292 : }
293 480 : bat_iterator_end(&bi);
294 :
295 480 : bn->theap->dirty = true;
296 480 : bn->tsorted = true;
297 480 : bn->trevsorted = BATcount(bn) <= 1;
298 480 : bn->tkey = true;
299 480 : bn->tnil = false;
300 480 : bn->tnonil = true;
301 480 : bn = virtualize(bn);
302 480 : MT_thread_setalgorithm(algomsg);
303 480 : TRC_DEBUG(ALGO, "b=" ALGOBATFMT
304 : ",s=" ALGOOPTBATFMT " -> " ALGOOPTBATFMT
305 : " (%s -- " LLFMT "usec)\n",
306 : ALGOBATPAR(b), ALGOOPTBATPAR(s),
307 : ALGOOPTBATPAR(bn), algomsg, GDKusec() - t0);
308 : return bn;
309 :
310 0 : bunins_failed:
311 0 : bat_iterator_end(&bi);
312 0 : if (hs != NULL) {
313 0 : HEAPfree(&hs->heaplink, true);
314 0 : HEAPfree(&hs->heapbckt, true);
315 0 : GDKfree(hs);
316 : }
317 0 : BBPreclaim(bn);
318 0 : return NULL;
319 : }
|