Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Martin Kersten, Lefteris Sidirourgos
15 : * Implement a parallel sort-merge MAL program generator
16 : */
17 : #include "monetdb_config.h"
18 : #include "orderidx.h"
19 : #include "gdk.h"
20 : #include "mal_exception.h"
21 : #include "mal_function.h"
22 : #include "opt_prelude.h"
23 :
24 : #define MIN_PIECE ((BUN) 1000) /* TODO use realistic size in production */
25 :
26 : str
27 0 : OIDXdropImplementation(Client cntxt, BAT *b)
28 : {
29 0 : (void) cntxt;
30 0 : OIDXdestroy(b);
31 0 : return MAL_SUCCEED;
32 : }
33 :
34 : str
35 83 : OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
36 : {
37 83 : int i, loopvar, arg;
38 83 : BUN cnt, step = 0, o;
39 83 : MalBlkPtr smb;
40 83 : MalStkPtr newstk;
41 83 : Symbol snew = NULL;
42 83 : InstrPtr q, pack;
43 83 : char name[IDLENGTH];
44 83 : str msg = MAL_SUCCEED;
45 :
46 83 : if (BATcount(b) <= 1)
47 : return MAL_SUCCEED;
48 :
49 : /* check if b is sorted, then index does not make sense */
50 74 : if (b->tsorted || b->trevsorted)
51 : return MAL_SUCCEED;
52 :
53 : /* check if b already has index */
54 74 : if (BATcheckorderidx(b))
55 : return MAL_SUCCEED;
56 :
57 132 : switch (ATOMbasetype(b->ttype)) {
58 : case TYPE_void:
59 : /* trivially supported */
60 : return MAL_SUCCEED;
61 53 : case TYPE_bte:
62 : case TYPE_sht:
63 : case TYPE_int:
64 : case TYPE_lng:
65 : #ifdef HAVE_HGE
66 : case TYPE_hge:
67 : #endif
68 : case TYPE_flt:
69 : case TYPE_dbl:
70 53 : if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE
71 0 : && (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) == 0)
72 : break;
73 : /* fall through */
74 : default:
75 73 : if (BATorderidx(b, true) != GDK_SUCCEED)
76 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
77 : return MAL_SUCCEED;
78 : }
79 :
80 0 : if (pieces <= 0) {
81 0 : if (GDKnr_threads <= 1) {
82 : pieces = 1;
83 0 : } else if (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) {
84 : /* we want many pieces, even tiny ones */
85 0 : if (BATcount(b) < 4)
86 : pieces = 1;
87 0 : else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
88 0 : pieces = (int) (BATcount(b) / 2);
89 : else
90 : pieces = GDKnr_threads;
91 : } else {
92 0 : if (BATcount(b) < 2 * MIN_PIECE)
93 : pieces = 1;
94 0 : else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
95 0 : pieces = (int) (BATcount(b) / MIN_PIECE);
96 : else
97 : pieces = GDKnr_threads;
98 : }
99 0 : } else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
100 0 : pieces = 1;
101 : }
102 :
103 : /* create a temporary MAL function to sort the BAT in parallel */
104 0 : snprintf(name, IDLENGTH, "sort%d", rand() % 1000);
105 0 : snew = newFunction(putName("user"), putName(name), FUNCTIONsymbol);
106 0 : if (snew == NULL) {
107 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
108 : }
109 0 : smb = snew->def;
110 0 : q = getInstrPtr(smb, 0);
111 0 : if ((arg = newTmpVariable(smb, tpe)) < 0) {
112 0 : msg = createException(MAL, "bat.orderidx",
113 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
114 0 : goto bailout;
115 : }
116 0 : q = pushArgument(smb, q, arg);
117 0 : if ((getArg(q, 0) = newTmpVariable(smb, TYPE_void)) < 0) {
118 0 : msg = createException(MAL, "bat.orderidx",
119 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
120 0 : goto bailout;
121 : }
122 :
123 0 : if (resizeMalBlk(smb, 2 * pieces + 10) < 0)
124 0 : goto bailout; // large enough
125 : /* create the pack instruction first, as it will hold
126 : * intermediate variables */
127 0 : pack = newInstruction(0, putName("bat"), putName("orderidx"));
128 0 : if (pack == NULL || (pack->argv[0] = newTmpVariable(smb, TYPE_void)) < 0) {
129 0 : freeInstruction(pack);
130 0 : msg = createException(MAL, "bat.orderidx",
131 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
132 0 : goto bailout;
133 : }
134 0 : pack = pushArgument(smb, pack, arg);
135 0 : if (smb->errors) {
136 0 : freeInstruction(pack);
137 0 : msg = smb->errors;
138 0 : smb->errors = NULL;
139 0 : goto bailout;
140 : }
141 0 : setVarFixed(smb, getArg(pack, 0));
142 :
143 : /* the costly part executed as a parallel block */
144 0 : if ((loopvar = newTmpVariable(smb, TYPE_bit)) < 0) {
145 0 : freeInstruction(pack);
146 0 : msg = createException(MAL, "bat.orderidx",
147 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
148 0 : goto bailout;
149 : }
150 0 : q = newStmt(smb, putName("language"), putName("dataflow"));
151 0 : if (q == NULL) {
152 0 : freeInstruction(pack);
153 0 : msg = createException(MAL, "bat.orderidx",
154 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
155 0 : goto bailout;
156 : }
157 0 : q->barrier = BARRIERsymbol;
158 0 : q->argv[0] = loopvar;
159 0 : pushInstruction(smb, q);
160 :
161 0 : cnt = BATcount(b);
162 0 : step = cnt / pieces;
163 0 : o = 0;
164 0 : for (i = 0; smb->errors == NULL && i < pieces; i++) {
165 : /* add slice instruction */
166 0 : q = newInstruction(smb, algebraRef, putName("slice"));
167 0 : if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
168 0 : freeInstruction(q);
169 0 : freeInstruction(pack);
170 0 : msg = createException(MAL, "bat.orderidx",
171 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
172 0 : goto bailout;
173 : }
174 0 : setVarType(smb, getArg(q, 0), tpe);
175 0 : setVarFixed(smb, getArg(q, 0));
176 0 : q = pushArgument(smb, q, arg);
177 0 : pack = pushArgument(smb, pack, getArg(q, 0));
178 0 : q = pushOid(smb, q, o);
179 0 : if (i == pieces - 1) {
180 : o = cnt;
181 : } else {
182 0 : o += step;
183 : }
184 0 : q = pushOid(smb, q, o - 1);
185 0 : pushInstruction(smb, q);
186 : }
187 0 : for (i = 0; smb->errors == NULL && i < pieces; i++) {
188 : /* add sort instruction */
189 0 : q = newInstruction(smb, algebraRef, putName("orderidx"));
190 0 : if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
191 0 : freeInstruction(q);
192 0 : freeInstruction(pack);
193 0 : msg = createException(MAL, "bat.orderidx",
194 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
195 0 : goto bailout;
196 : }
197 0 : setVarType(smb, getArg(q, 0), tpe);
198 0 : setVarFixed(smb, getArg(q, 0));
199 0 : q = pushArgument(smb, q, pack->argv[2 + i]);
200 0 : q = pushBit(smb, q, 1);
201 0 : pack->argv[2 + i] = getArg(q, 0);
202 0 : pushInstruction(smb, q);
203 : }
204 : /* finalize OID packing, check, and evaluate */
205 0 : pushInstruction(smb, pack);
206 0 : q = newAssignment(smb);
207 0 : if (q == NULL) {
208 0 : msg = createException(MAL, "bat.orderidx",
209 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
210 0 : goto bailout;
211 : }
212 0 : q->barrier = EXITsymbol;
213 0 : q->argv[0] = loopvar;
214 0 : pushInstruction(smb, q);
215 0 : pushEndInstruction(smb);
216 0 : if (smb->errors)
217 0 : goto bailout;
218 0 : msg = chkProgram(cntxt->usermodule, smb);
219 0 : if (msg)
220 0 : goto bailout;
221 : /* evaluate MAL block and keep the ordered OID bat */
222 0 : newstk = prepareMALstack(smb, smb->vsize);
223 0 : if (newstk == NULL) {
224 0 : msg = createException(MAL, "bat.orderidx",
225 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
226 0 : goto bailout;
227 : }
228 0 : newstk->up = 0;
229 0 : newstk->stk[arg].vtype = b->ttype;
230 0 : newstk->stk[arg].bat = true;
231 0 : newstk->stk[arg].val.bval = b->batCacheid;
232 0 : BBPretain(newstk->stk[arg].val.bval);
233 0 : msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
234 0 : freeStack(newstk);
235 : /* get rid of temporary MAL block */
236 0 : bailout:
237 0 : if (msg == MAL_SUCCEED && smb->errors) {
238 0 : msg = smb->errors;
239 0 : smb->errors = NULL;
240 : }
241 0 : freeSymbol(snew);
242 0 : return msg;
243 : }
244 :
245 : static str
246 6 : OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
247 : {
248 6 : BAT *b;
249 6 : str msg = MAL_SUCCEED;
250 6 : int pieces = -1;
251 :
252 6 : if (pci->argc == 3) {
253 6 : pieces = stk->stk[pci->argv[2]].val.ival;
254 6 : if (pieces < 0)
255 0 : throw(MAL, "bat.orderidx", "Positive number expected");
256 : }
257 :
258 6 : b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
259 6 : if (b == NULL)
260 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
261 6 : msg = OIDXcreateImplementation(cntxt, getArgType(mb, pci, 1), b, pieces);
262 6 : BBPunfix(b->batCacheid);
263 6 : return msg;
264 : }
265 :
266 : static str
267 0 : OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
268 : {
269 0 : BAT *b;
270 0 : bit *ret = getArgReference_bit(stk, pci, 0);
271 0 : bat bid = *getArgReference_bat(stk, pci, 1);
272 :
273 0 : (void) cntxt;
274 0 : (void) mb;
275 :
276 0 : b = BATdescriptor(bid);
277 0 : if (b == NULL)
278 0 : throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
279 :
280 0 : *ret = b->torderidx != NULL;
281 :
282 0 : BBPunfix(b->batCacheid);
283 0 : return MAL_SUCCEED;
284 : }
285 :
286 : static str
287 6 : OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
288 : {
289 6 : BAT *b;
290 6 : BAT *bn;
291 6 : bat *ret = getArgReference_bat(stk, pci, 0);
292 6 : bat bid = *getArgReference_bat(stk, pci, 1);
293 :
294 6 : (void) cntxt;
295 6 : (void) mb;
296 :
297 6 : b = BATdescriptor(bid);
298 6 : if (b == NULL)
299 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
300 :
301 6 : if (!BATcheckorderidx(b)) {
302 0 : BBPunfix(b->batCacheid);
303 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
304 : }
305 :
306 6 : if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
307 0 : BBPunfix(b->batCacheid);
308 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
309 : }
310 6 : memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
311 6 : BATcount(b) * SIZEOF_OID);
312 6 : BATsetcount(bn, BATcount(b));
313 6 : bn->tkey = true;
314 6 : bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
315 6 : bn->tnil = false;
316 6 : bn->tnonil = true;
317 6 : *ret = bn->batCacheid;
318 6 : BBPkeepref(bn);
319 6 : BBPunfix(b->batCacheid);
320 6 : return MAL_SUCCEED;
321 : }
322 :
323 : static str
324 0 : OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
325 : {
326 0 : BAT *b;
327 :
328 0 : (void) ret;
329 0 : b = BATdescriptor(*bid);
330 0 : if (b == NULL)
331 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
332 :
333 0 : if (BATorderidx(b, *stable) != GDK_SUCCEED) {
334 0 : BBPunfix(*bid);
335 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
336 : }
337 0 : *ret = *bid;
338 0 : BBPkeepref(b);
339 0 : return MAL_SUCCEED;
340 : }
341 :
342 : /*
343 : * Merge the collection of sorted OID BATs into one
344 : */
345 :
346 : static str
347 0 : OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
348 : {
349 0 : bat bid;
350 0 : BAT *b;
351 0 : BAT **a;
352 0 : int i, j, n_ar;
353 0 : BUN m_sz;
354 0 : gdk_return rc;
355 :
356 0 : (void) cntxt;
357 0 : (void) mb;
358 :
359 0 : if (pci->retc != 1)
360 0 : throw(MAL, "bat.orderidx",
361 : SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
362 0 : if (pci->argc < 2)
363 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
364 :
365 0 : n_ar = pci->argc - 2;
366 :
367 0 : bid = *getArgReference_bat(stk, pci, 1);
368 0 : b = BATdescriptor(bid);
369 0 : if (b == NULL)
370 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
371 :
372 0 : if (b->torderidx) {
373 0 : BBPunfix(bid);
374 0 : throw(MAL, "bat.orderidx",
375 : SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
376 : }
377 :
378 0 : switch (ATOMbasetype(b->ttype)) {
379 : case TYPE_bte:
380 : case TYPE_sht:
381 : case TYPE_int:
382 : case TYPE_lng:
383 : #ifdef HAVE_HGE
384 : case TYPE_hge:
385 : #endif
386 : case TYPE_flt:
387 : case TYPE_dbl:
388 0 : break;
389 0 : case TYPE_str:
390 : /* TODO: support strings etc. */
391 : case TYPE_void:
392 : case TYPE_ptr:
393 : default:
394 0 : BBPunfix(bid);
395 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
396 : }
397 :
398 0 : if ((a = (BAT **) GDKmalloc(n_ar * sizeof(BAT *))) == NULL) {
399 0 : BBPunfix(bid);
400 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
401 : }
402 : m_sz = 0;
403 0 : for (i = 0; i < n_ar; i++) {
404 0 : a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i + 2));
405 0 : if (a[i] == NULL) {
406 0 : for (j = i - 1; j >= 0; j--) {
407 0 : BBPunfix(a[j]->batCacheid);
408 : }
409 0 : GDKfree(a);
410 0 : BBPunfix(bid);
411 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
412 : }
413 0 : m_sz += BATcount(a[i]);
414 0 : if (BATcount(a[i]) == 0) {
415 0 : BBPunfix(a[i]->batCacheid);
416 0 : a[i] = NULL;
417 : }
418 : }
419 0 : assert(m_sz == BATcount(b));
420 0 : for (i = 0; i < n_ar; i++) {
421 0 : if (a[i] == NULL) {
422 0 : n_ar--;
423 0 : if (i < n_ar)
424 0 : a[i] = a[n_ar];
425 0 : i--;
426 : }
427 : }
428 0 : if (m_sz != BATcount(b)) {
429 : BBPunfix(bid);
430 : for (i = 0; i < n_ar; i++)
431 : BBPunfix(a[i]->batCacheid);
432 : GDKfree(a);
433 : throw(MAL, "bat.orderidx", "count mismatch");
434 : }
435 :
436 0 : rc = GDKmergeidx(b, a, n_ar);
437 :
438 0 : for (i = 0; i < n_ar; i++)
439 0 : BBPunfix(a[i]->batCacheid);
440 0 : GDKfree(a);
441 0 : BBPunfix(bid);
442 :
443 0 : if (rc != GDK_SUCCEED)
444 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
445 :
446 : return MAL_SUCCEED;
447 : }
448 :
449 : #include "mel.h"
450 : mel_func orderidx_init_funcs[] = {
451 : pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,2, arg("",void),batargany("bv",1))),
452 : pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,3, arg("",void),batargany("bv",1),arg("pieces",int))),
453 : pattern("bat", "orderidx", OIDXmerge, false, "Consolidates the OID index arrangement", args(1,3, arg("",void),batargany("bv",1),batvarargany("l",1))),
454 : pattern("bat", "hasorderidx", OIDXhasorderidx, false, "Return true if order index exists", args(1,2, arg("",bit),batargany("bv",1))),
455 : pattern("bat", "getorderidx", OIDXgetorderidx, false, "Return the order index if it exists", args(1,2, batarg("",oid),batargany("bv",1))),
456 : command("algebra", "orderidx", OIDXorderidx, false, "Create an order index", args(1,3, batargany("",1),batargany("bv",1),arg("stable",bit))),
457 : { .imp=NULL }
458 : };
459 : #include "mal_import.h"
460 : #ifdef _MSC_VER
461 : #undef read
462 : #pragma section(".CRT$XCU",read)
463 : #endif
464 325 : LIB_STARTUP_FUNC(init_orderidx_mal)
465 325 : { mal_module("orderidx", NULL, orderidx_init_funcs); }
|