Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Martin Kersten, Lefteris Sidirourgos
15 : * Implement a parallel sort-merge MAL program generator
16 : */
17 : #include "monetdb_config.h"
18 : #include "orderidx.h"
19 : #include "gdk.h"
20 : #include "mal_exception.h"
21 : #include "mal_function.h"
22 : #include "opt_prelude.h"
23 :
24 : #define MIN_PIECE ((BUN) 1000) /* TODO use realistic size in production */
25 :
26 : str
27 0 : OIDXdropImplementation(Client cntxt, BAT *b)
28 : {
29 0 : (void) cntxt;
30 0 : OIDXdestroy(b);
31 0 : return MAL_SUCCEED;
32 : }
33 :
/*
 * Create the order index ("the OID index arrangement of ordered values",
 * see the bat.orderidx signature) for BAT b.
 *
 * Nothing is done for BATs with at most one value, sorted BATs, BATs that
 * already have an order index, and void BATs.  Other types, and numeric
 * BATs that are too small or cannot be processed in parallel, are indexed
 * with a single BATorderidx() call.  Otherwise a temporary MAL function is
 * generated and run which
 *   1. slices b into `pieces` consecutive parts (algebra.slice),
 *   2. computes an order index per part (algebra.orderidx), all inside a
 *      language.dataflow barrier block,
 *   3. merges the partial indexes into b's order index with the
 *      bat.orderidx overload that takes the partial results as extra
 *      arguments (OIDXmerge).
 *
 * cntxt:  client; its usermodule is used to type-check the generated code
 * tpe:    MAL type of the BAT argument, used for the generated variables
 * b:      BAT to index; the caller keeps (and releases) its own reference
 * pieces: number of parts; <= 0 means "choose automatically"
 *
 * Returns MAL_SUCCEED or an exception string.
 */
str
OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
{
	int i, loopvar, arg;
	BUN cnt, step = 0, o;
	MalBlkPtr smb;
	MalStkPtr newstk;
	Symbol snew = NULL;
	InstrPtr q, pack;
	char name[IDLENGTH];
	str msg = MAL_SUCCEED;

	/* nothing to order with zero or one value */
	if (BATcount(b) <= 1)
		return MAL_SUCCEED;

	/* check if b is sorted, then index does not make sense */
	if (b->tsorted || b->trevsorted)
		return MAL_SUCCEED;

	/* check if b already has index */
	if (BATcheckorderidx(b))
		return MAL_SUCCEED;

	switch (ATOMbasetype(b->ttype)) {
	case TYPE_void:
		/* trivially supported */
		return MAL_SUCCEED;
	case TYPE_bte:
	case TYPE_sht:
	case TYPE_int:
	case TYPE_lng:
#ifdef HAVE_HGE
	case TYPE_hge:
#endif
	case TYPE_flt:
	case TYPE_dbl:
		/* numeric types take the parallel path only with more than one
		 * thread, at least two MIN_PIECE-sized parts, and FORCEMITOMASK
		 * not set in GDKdebug */
		if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE
			&& (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) == 0)
			break;
		/* fall through */
	default:
		/* build the whole index in one call */
		if (BATorderidx(b, true) != GDK_SUCCEED)
			throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
		return MAL_SUCCEED;
	}

	/* Decide on the number of pieces.
	 * NOTE(review): the switch above only lets us get here with
	 * GDKnr_threads > 1, BATcount(b) >= 2 * MIN_PIECE and FORCEMITOMASK
	 * clear, so several branches below look unreachable (unless GDKdebug
	 * changes concurrently). */
	if (pieces <= 0) {
		if (GDKnr_threads <= 1) {
			pieces = 1;
		} else if (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) {
			/* we want many pieces, even tiny ones */
			if (BATcount(b) < 4)
				pieces = 1;
			else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
				pieces = (int) (BATcount(b) / 2);
			else
				pieces = GDKnr_threads;
		} else {
			/* at most one piece per thread, each at least MIN_PIECE */
			if (BATcount(b) < 2 * MIN_PIECE)
				pieces = 1;
			else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
				pieces = (int) (BATcount(b) / MIN_PIECE);
			else
				pieces = GDKnr_threads;
		}
	} else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
		/* explicit piece count larger than the number of values */
		pieces = 1;
	}

	/* create a temporary MAL function to sort the BAT in parallel */
	snprintf(name, IDLENGTH, "sort%d", rand() % 1000);
	snew = newFunction(putName("user"), putName(name), FUNCTIONsymbol);
	if (snew == NULL) {
		throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
	}
	smb = snew->def;
	/* instruction 0 is the function signature */
	q = getInstrPtr(smb, 0);
	/* single argument: the BAT to be indexed */
	if ((arg = newTmpVariable(smb, tpe)) < 0) {
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}
	q = pushArgument(smb, q, arg);
	/* void result */
	if ((getArg(q, 0) = newTmpVariable(smb, TYPE_void)) < 0) {
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}

	/* room for 2 * pieces slice/orderidx statements plus the fixed ones.
	 * NOTE(review): msg is still MAL_SUCCEED here, so this relies on
	 * resizeMalBlk() setting smb->errors on failure -- confirm */
	if (resizeMalBlk(smb, 2 * pieces + 10) < 0)
		goto bailout; // large enough
	/* create the pack instruction first, as it will hold
	 * intermediate variables */
	pack = newInstruction(0, putName("bat"), putName("orderidx"));
	if (pack == NULL || (pack->argv[0] = newTmpVariable(smb, TYPE_void)) < 0) {
		freeInstruction(pack);
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}
	/* pack->argv[1] is the BAT; the per-piece results follow at 2 + i */
	pack = pushArgument(smb, pack, arg);
	if (smb->errors) {
		freeInstruction(pack);
		msg = smb->errors;
		smb->errors = NULL;
		goto bailout;
	}
	setVarFixed(smb, getArg(pack, 0));

	/* the costly part executed as a parallel block */
	if ((loopvar = newTmpVariable(smb, TYPE_bit)) < 0) {
		freeInstruction(pack);
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}
	/* barrier loopvar := language.dataflow(); closed by the EXIT below */
	q = newStmt(smb, putName("language"), putName("dataflow"));
	if (q == NULL) {
		freeInstruction(pack);
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}
	q->barrier = BARRIERsymbol;
	q->argv[0] = loopvar;
	pushInstruction(smb, q);

	cnt = BATcount(b);
	step = cnt / pieces;
	o = 0;
	/* one slice per piece, bounds o and o + step - 1; the last piece
	 * extends to cnt - 1 to absorb the remainder of the division */
	for (i = 0; smb->errors == NULL && i < pieces; i++) {
		/* add slice instruction */
		q = newInstruction(smb, algebraRef, putName("slice"));
		if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
			freeInstruction(q);
			freeInstruction(pack);
			msg = createException(MAL, "bat.orderidx",
								  SQLSTATE(HY013) MAL_MALLOC_FAIL);
			goto bailout;
		}
		setVarType(smb, getArg(q, 0), tpe);
		setVarFixed(smb, getArg(q, 0));
		q = pushArgument(smb, q, arg);
		/* remember the slice in pack; the next loop swaps it for the
		 * order index computed over this slice */
		pack = pushArgument(smb, pack, getArg(q, 0));
		q = pushOid(smb, q, o);
		if (i == pieces - 1) {
			o = cnt;
		} else {
			o += step;
		}
		q = pushOid(smb, q, o - 1);
		pushInstruction(smb, q);
	}
	for (i = 0; smb->errors == NULL && i < pieces; i++) {
		/* add sort instruction */
		q = newInstruction(smb, algebraRef, putName("orderidx"));
		if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
			freeInstruction(q);
			freeInstruction(pack);
			msg = createException(MAL, "bat.orderidx",
								  SQLSTATE(HY013) MAL_MALLOC_FAIL);
			goto bailout;
		}
		setVarType(smb, getArg(q, 0), tpe);
		setVarFixed(smb, getArg(q, 0));
		/* input: slice i; second argument is the "stable" bit */
		q = pushArgument(smb, q, pack->argv[2 + i]);
		q = pushBit(smb, q, 1);
		/* pack now refers to the per-piece order index */
		pack->argv[2 + i] = getArg(q, 0);
		pushInstruction(smb, q);
	}
	/* finalize OID packing, check, and evaluate */
	/* from here on pack belongs to smb; the error paths no longer free it */
	pushInstruction(smb, pack);
	q = newAssignment(smb);
	if (q == NULL) {
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}
	/* exit loopvar; -- closes the dataflow block */
	q->barrier = EXITsymbol;
	q->argv[0] = loopvar;
	pushInstruction(smb, q);
	pushEndInstruction(smb);
	if (smb->errors)
		goto bailout;
	msg = chkProgram(cntxt->usermodule, smb);
	if (msg)
		goto bailout;
	/* evaluate MAL block and keep the ordered OID bat */
	newstk = prepareMALstack(smb, smb->vsize);
	if (newstk == NULL) {
		msg = createException(MAL, "bat.orderidx",
							  SQLSTATE(HY013) MAL_MALLOC_FAIL);
		goto bailout;
	}
	newstk->up = 0;
	/* bind b to the function argument; the stack takes its own reference
	 * (presumably released again by freeStack -- confirm) */
	newstk->stk[arg].vtype = TYPE_bat;
	newstk->stk[arg].val.bval = b->batCacheid;
	BBPretain(newstk->stk[arg].val.bval);
	/* execution starts at pc 1, just after the signature */
	msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
	freeStack(newstk);
	/* get rid of temporary MAL block */
  bailout:
	/* surface errors recorded in the block if nothing else was reported */
	if (msg == MAL_SUCCEED && smb->errors) {
		msg = smb->errors;
		smb->errors = NULL;
	}
	freeSymbol(snew);
	return msg;
}
243 :
244 : static str
245 6 : OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
246 : {
247 6 : BAT *b;
248 6 : str msg = MAL_SUCCEED;
249 6 : int pieces = -1;
250 :
251 6 : if (pci->argc == 3) {
252 6 : pieces = stk->stk[pci->argv[2]].val.ival;
253 6 : if (pieces < 0)
254 0 : throw(MAL, "bat.orderidx", "Positive number expected");
255 : }
256 :
257 6 : b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
258 6 : if (b == NULL)
259 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
260 6 : msg = OIDXcreateImplementation(cntxt, getArgType(mb, pci, 1), b, pieces);
261 6 : BBPunfix(b->batCacheid);
262 6 : return msg;
263 : }
264 :
265 : static str
266 0 : OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
267 : {
268 0 : BAT *b;
269 0 : bit *ret = getArgReference_bit(stk, pci, 0);
270 0 : bat bid = *getArgReference_bat(stk, pci, 1);
271 :
272 0 : (void) cntxt;
273 0 : (void) mb;
274 :
275 0 : b = BATdescriptor(bid);
276 0 : if (b == NULL)
277 0 : throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
278 :
279 0 : *ret = b->torderidx != NULL;
280 :
281 0 : BBPunfix(b->batCacheid);
282 0 : return MAL_SUCCEED;
283 : }
284 :
285 : static str
286 6 : OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
287 : {
288 6 : BAT *b;
289 6 : BAT *bn;
290 6 : bat *ret = getArgReference_bat(stk, pci, 0);
291 6 : bat bid = *getArgReference_bat(stk, pci, 1);
292 :
293 6 : (void) cntxt;
294 6 : (void) mb;
295 :
296 6 : b = BATdescriptor(bid);
297 6 : if (b == NULL)
298 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
299 :
300 6 : if (!BATcheckorderidx(b)) {
301 0 : BBPunfix(b->batCacheid);
302 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
303 : }
304 :
305 6 : if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
306 0 : BBPunfix(b->batCacheid);
307 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
308 : }
309 6 : memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
310 6 : BATcount(b) * SIZEOF_OID);
311 6 : BATsetcount(bn, BATcount(b));
312 6 : bn->tkey = true;
313 6 : bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
314 6 : bn->tnil = false;
315 6 : bn->tnonil = true;
316 6 : *ret = bn->batCacheid;
317 6 : BBPkeepref(bn);
318 6 : BBPunfix(b->batCacheid);
319 6 : return MAL_SUCCEED;
320 : }
321 :
322 : static str
323 0 : OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
324 : {
325 0 : BAT *b;
326 :
327 0 : (void) ret;
328 0 : b = BATdescriptor(*bid);
329 0 : if (b == NULL)
330 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
331 :
332 0 : if (BATorderidx(b, *stable) != GDK_SUCCEED) {
333 0 : BBPunfix(*bid);
334 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
335 : }
336 0 : *ret = *bid;
337 0 : BBPkeepref(b);
338 0 : return MAL_SUCCEED;
339 : }
340 :
341 : /*
342 : * Merge the collection of sorted OID BATs into one
343 : */
344 :
345 : static str
346 0 : OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
347 : {
348 0 : bat bid;
349 0 : BAT *b;
350 0 : BAT **a;
351 0 : int i, j, n_ar;
352 0 : BUN m_sz;
353 0 : gdk_return rc;
354 :
355 0 : (void) cntxt;
356 0 : (void) mb;
357 :
358 0 : if (pci->retc != 1)
359 0 : throw(MAL, "bat.orderidx",
360 : SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
361 0 : if (pci->argc < 2)
362 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
363 :
364 0 : n_ar = pci->argc - 2;
365 :
366 0 : bid = *getArgReference_bat(stk, pci, 1);
367 0 : b = BATdescriptor(bid);
368 0 : if (b == NULL)
369 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
370 :
371 0 : if (b->torderidx) {
372 0 : BBPunfix(bid);
373 0 : throw(MAL, "bat.orderidx",
374 : SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
375 : }
376 :
377 0 : switch (ATOMbasetype(b->ttype)) {
378 : case TYPE_bte:
379 : case TYPE_sht:
380 : case TYPE_int:
381 : case TYPE_lng:
382 : #ifdef HAVE_HGE
383 : case TYPE_hge:
384 : #endif
385 : case TYPE_flt:
386 : case TYPE_dbl:
387 0 : break;
388 0 : case TYPE_str:
389 : /* TODO: support strings etc. */
390 : case TYPE_void:
391 : case TYPE_ptr:
392 : default:
393 0 : BBPunfix(bid);
394 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
395 : }
396 :
397 0 : if ((a = (BAT **) GDKmalloc(n_ar * sizeof(BAT *))) == NULL) {
398 0 : BBPunfix(bid);
399 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
400 : }
401 : m_sz = 0;
402 0 : for (i = 0; i < n_ar; i++) {
403 0 : a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i + 2));
404 0 : if (a[i] == NULL) {
405 0 : for (j = i - 1; j >= 0; j--) {
406 0 : BBPunfix(a[j]->batCacheid);
407 : }
408 0 : GDKfree(a);
409 0 : BBPunfix(bid);
410 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
411 : }
412 0 : m_sz += BATcount(a[i]);
413 0 : if (BATcount(a[i]) == 0) {
414 0 : BBPunfix(a[i]->batCacheid);
415 0 : a[i] = NULL;
416 : }
417 : }
418 0 : assert(m_sz == BATcount(b));
419 0 : for (i = 0; i < n_ar; i++) {
420 0 : if (a[i] == NULL) {
421 0 : n_ar--;
422 0 : if (i < n_ar)
423 0 : a[i] = a[n_ar];
424 0 : i--;
425 : }
426 : }
427 0 : if (m_sz != BATcount(b)) {
428 : BBPunfix(bid);
429 : for (i = 0; i < n_ar; i++)
430 : BBPunfix(a[i]->batCacheid);
431 : GDKfree(a);
432 : throw(MAL, "bat.orderidx", "count mismatch");
433 : }
434 :
435 0 : rc = GDKmergeidx(b, a, n_ar);
436 :
437 0 : for (i = 0; i < n_ar; i++)
438 0 : BBPunfix(a[i]->batCacheid);
439 0 : GDKfree(a);
440 0 : BBPunfix(bid);
441 :
442 0 : if (rc != GDK_SUCCEED)
443 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
444 :
445 : return MAL_SUCCEED;
446 : }
447 :
448 : #include "mel.h"
/*
 * MAL signatures provided by this module.  bat.orderidx has three
 * overloads: with a BAT alone, or a BAT plus an int piece count, it creates
 * the order index (OIDXcreate); with a BAT plus further BATs it merges
 * partial order indexes (OIDXmerge).  The generated MAL code in
 * OIDXcreateImplementation uses the merge form.
 */
mel_func orderidx_init_funcs[] = {
 pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,2, arg("",void),batargany("bv",1))),
 pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,3, arg("",void),batargany("bv",1),arg("pieces",int))),
 pattern("bat", "orderidx", OIDXmerge, false, "Consolidates the OID index arrangement", args(1,3, arg("",void),batargany("bv",1),batvarargany("l",1))),
 pattern("bat", "hasorderidx", OIDXhasorderidx, false, "Return true if order index exists", args(1,2, arg("",bit),batargany("bv",1))),
 pattern("bat", "getorderidx", OIDXgetorderidx, false, "Return the order index if it exists", args(1,2, batarg("",oid),batargany("bv",1))),
 command("algebra", "orderidx", OIDXorderidx, false, "Create an order index", args(1,3, batargany("",1),batargany("bv",1),arg("stable",bit))),
 { .imp=NULL }	/* end of table */
};
458 : #include "mal_import.h"
/* MSVC only: declare the ".CRT$XCU" section used for startup functions.
 * NOTE(review): `read` is undefined first, presumably because an earlier
 * header may define it as a macro, which would break the pragma -- confirm */
#ifdef _MSC_VER
#undef read
#pragma section(".CRT$XCU",read)
#endif
/* register module "orderidx" with the functions in orderidx_init_funcs;
 * presumably run automatically at library startup via LIB_STARTUP_FUNC */
LIB_STARTUP_FUNC(init_orderidx_mal)
{ mal_module("orderidx", NULL, orderidx_init_funcs); }
|