Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Martin Kersten, Lefteris Sidirourgos
15 : * Implement a parallel sort-merge MAL program generator
16 : */
17 : #include "monetdb_config.h"
18 : #include "orderidx.h"
19 : #include "gdk.h"
20 : #include "mal_exception.h"
21 : #include "mal_function.h"
22 :
23 : #define MIN_PIECE ((BUN) 1000) /* TODO use realistic size in production */
24 :
25 : str
26 0 : OIDXdropImplementation(Client cntxt, BAT *b)
27 : {
28 0 : (void) cntxt;
29 0 : OIDXdestroy(b);
30 0 : return MAL_SUCCEED;
31 : }
32 :
33 : str
34 83 : OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
35 : {
36 83 : int i, loopvar, arg;
37 83 : BUN cnt, step = 0, o;
38 83 : MalBlkPtr smb;
39 83 : MalStkPtr newstk;
40 83 : Symbol snew = NULL;
41 83 : InstrPtr q, pack;
42 83 : char name[IDLENGTH];
43 83 : str msg = MAL_SUCCEED;
44 :
45 83 : if (BATcount(b) <= 1)
46 : return MAL_SUCCEED;
47 :
48 : /* check if b is sorted, then index does not make sense */
49 74 : if (b->tsorted || b->trevsorted)
50 : return MAL_SUCCEED;
51 :
52 : /* check if b already has index */
53 74 : if (BATcheckorderidx(b))
54 : return MAL_SUCCEED;
55 :
56 132 : switch (ATOMbasetype(b->ttype)) {
57 : case TYPE_void:
58 : /* trivially supported */
59 : return MAL_SUCCEED;
60 53 : case TYPE_bte:
61 : case TYPE_sht:
62 : case TYPE_int:
63 : case TYPE_lng:
64 : #ifdef HAVE_HGE
65 : case TYPE_hge:
66 : #endif
67 : case TYPE_flt:
68 : case TYPE_dbl:
69 53 : if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE
70 0 : && (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) == 0)
71 : break;
72 : /* fall through */
73 : default:
74 73 : if (BATorderidx(b, true) != GDK_SUCCEED)
75 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
76 : return MAL_SUCCEED;
77 : }
78 :
79 0 : if (pieces <= 0) {
80 0 : if (GDKnr_threads <= 1) {
81 : pieces = 1;
82 0 : } else if (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) {
83 : /* we want many pieces, even tiny ones */
84 0 : if (BATcount(b) < 4)
85 : pieces = 1;
86 0 : else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
87 0 : pieces = (int) (BATcount(b) / 2);
88 : else
89 : pieces = GDKnr_threads;
90 : } else {
91 0 : if (BATcount(b) < 2 * MIN_PIECE)
92 : pieces = 1;
93 0 : else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
94 0 : pieces = (int) (BATcount(b) / MIN_PIECE);
95 : else
96 : pieces = GDKnr_threads;
97 : }
98 0 : } else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
99 0 : pieces = 1;
100 : }
101 :
102 : /* create a temporary MAL function to sort the BAT in parallel */
103 0 : snprintf(name, IDLENGTH, "sort%d", rand() % 1000);
104 0 : snew = newFunction(userRef, putName(name), FUNCTIONsymbol);
105 0 : if (snew == NULL) {
106 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
107 : }
108 0 : smb = snew->def;
109 0 : q = getInstrPtr(smb, 0);
110 0 : if ((arg = newTmpVariable(smb, tpe)) < 0) {
111 0 : msg = createException(MAL, "bat.orderidx",
112 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
113 0 : goto bailout;
114 : }
115 0 : q = pushArgument(smb, q, arg);
116 0 : if ((getArg(q, 0) = newTmpVariable(smb, TYPE_void)) < 0) {
117 0 : msg = createException(MAL, "bat.orderidx",
118 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
119 0 : goto bailout;
120 : }
121 :
122 0 : if (resizeMalBlk(smb, 2 * pieces + 10) < 0)
123 0 : goto bailout; // large enough
124 : /* create the pack instruction first, as it will hold
125 : * intermediate variables */
126 0 : pack = newInstruction(0, batRef, putName("orderidx"));
127 0 : if (pack == NULL || (pack->argv[0] = newTmpVariable(smb, TYPE_void)) < 0) {
128 0 : freeInstruction(pack);
129 0 : msg = createException(MAL, "bat.orderidx",
130 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
131 0 : goto bailout;
132 : }
133 0 : pack = pushArgument(smb, pack, arg);
134 0 : if (smb->errors) {
135 0 : freeInstruction(pack);
136 0 : msg = smb->errors;
137 0 : smb->errors = NULL;
138 0 : goto bailout;
139 : }
140 0 : setVarFixed(smb, getArg(pack, 0));
141 :
142 : /* the costly part executed as a parallel block */
143 0 : if ((loopvar = newTmpVariable(smb, TYPE_bit)) < 0) {
144 0 : freeInstruction(pack);
145 0 : msg = createException(MAL, "bat.orderidx",
146 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
147 0 : goto bailout;
148 : }
149 0 : q = newStmt(smb, languageRef, dataflowRef);
150 0 : if (q == NULL) {
151 0 : freeInstruction(pack);
152 0 : msg = createException(MAL, "bat.orderidx",
153 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
154 0 : goto bailout;
155 : }
156 0 : q->barrier = BARRIERsymbol;
157 0 : q->argv[0] = loopvar;
158 0 : pushInstruction(smb, q);
159 :
160 0 : cnt = BATcount(b);
161 0 : step = cnt / pieces;
162 0 : o = 0;
163 0 : for (i = 0; smb->errors == NULL && i < pieces; i++) {
164 : /* add slice instruction */
165 0 : q = newInstruction(smb, algebraRef, sliceRef);
166 0 : if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
167 0 : freeInstruction(q);
168 0 : freeInstruction(pack);
169 0 : msg = createException(MAL, "bat.orderidx",
170 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
171 0 : goto bailout;
172 : }
173 0 : setVarType(smb, getArg(q, 0), tpe);
174 0 : setVarFixed(smb, getArg(q, 0));
175 0 : q = pushArgument(smb, q, arg);
176 0 : pack = pushArgument(smb, pack, getArg(q, 0));
177 0 : q = pushOid(smb, q, o);
178 0 : if (i == pieces - 1) {
179 : o = cnt;
180 : } else {
181 0 : o += step;
182 : }
183 0 : q = pushOid(smb, q, o - 1);
184 0 : pushInstruction(smb, q);
185 : }
186 0 : for (i = 0; smb->errors == NULL && i < pieces; i++) {
187 : /* add sort instruction */
188 0 : q = newInstruction(smb, algebraRef, putName("orderidx"));
189 0 : if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
190 0 : freeInstruction(q);
191 0 : freeInstruction(pack);
192 0 : msg = createException(MAL, "bat.orderidx",
193 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
194 0 : goto bailout;
195 : }
196 0 : setVarType(smb, getArg(q, 0), tpe);
197 0 : setVarFixed(smb, getArg(q, 0));
198 0 : q = pushArgument(smb, q, pack->argv[2 + i]);
199 0 : q = pushBit(smb, q, 1);
200 0 : pack->argv[2 + i] = getArg(q, 0);
201 0 : pushInstruction(smb, q);
202 : }
203 : /* finalize OID packing, check, and evaluate */
204 0 : pushInstruction(smb, pack);
205 0 : q = newAssignment(smb);
206 0 : if (q == NULL) {
207 0 : msg = createException(MAL, "bat.orderidx",
208 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
209 0 : goto bailout;
210 : }
211 0 : q->barrier = EXITsymbol;
212 0 : q->argv[0] = loopvar;
213 0 : pushInstruction(smb, q);
214 0 : pushEndInstruction(smb);
215 0 : if (smb->errors)
216 0 : goto bailout;
217 0 : msg = chkProgram(cntxt->usermodule, smb);
218 0 : if (msg)
219 0 : goto bailout;
220 : /* evaluate MAL block and keep the ordered OID bat */
221 0 : newstk = prepareMALstack(smb, smb->vsize);
222 0 : if (newstk == NULL) {
223 0 : msg = createException(MAL, "bat.orderidx",
224 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
225 0 : goto bailout;
226 : }
227 0 : newstk->up = 0;
228 0 : newstk->stk[arg].vtype = b->ttype;
229 0 : newstk->stk[arg].bat = true;
230 0 : newstk->stk[arg].val.bval = b->batCacheid;
231 0 : BBPretain(newstk->stk[arg].val.bval);
232 0 : msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
233 0 : freeStack(newstk);
234 : /* get rid of temporary MAL block */
235 0 : bailout:
236 0 : if (msg == MAL_SUCCEED && smb->errors) {
237 0 : msg = smb->errors;
238 0 : smb->errors = NULL;
239 : }
240 0 : freeSymbol(snew);
241 0 : return msg;
242 : }
243 :
244 : static str
245 6 : OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
246 : {
247 6 : BAT *b;
248 6 : str msg = MAL_SUCCEED;
249 6 : int pieces = -1;
250 :
251 6 : if (pci->argc == 3) {
252 6 : pieces = stk->stk[pci->argv[2]].val.ival;
253 6 : if (pieces < 0)
254 0 : throw(MAL, "bat.orderidx", "Positive number expected");
255 : }
256 :
257 6 : b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
258 6 : if (b == NULL)
259 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
260 6 : msg = OIDXcreateImplementation(cntxt, getArgType(mb, pci, 1), b, pieces);
261 6 : BBPunfix(b->batCacheid);
262 6 : return msg;
263 : }
264 :
265 : static str
266 0 : OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
267 : {
268 0 : BAT *b;
269 0 : bit *ret = getArgReference_bit(stk, pci, 0);
270 0 : bat bid = *getArgReference_bat(stk, pci, 1);
271 :
272 0 : (void) cntxt;
273 0 : (void) mb;
274 :
275 0 : b = BATdescriptor(bid);
276 0 : if (b == NULL)
277 0 : throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
278 :
279 0 : *ret = b->torderidx != NULL;
280 :
281 0 : BBPunfix(b->batCacheid);
282 0 : return MAL_SUCCEED;
283 : }
284 :
285 : static str
286 6 : OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
287 : {
288 6 : BAT *b;
289 6 : BAT *bn;
290 6 : bat *ret = getArgReference_bat(stk, pci, 0);
291 6 : bat bid = *getArgReference_bat(stk, pci, 1);
292 :
293 6 : (void) cntxt;
294 6 : (void) mb;
295 :
296 6 : b = BATdescriptor(bid);
297 6 : if (b == NULL)
298 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
299 :
300 6 : if (!BATcheckorderidx(b)) {
301 0 : BBPunfix(b->batCacheid);
302 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
303 : }
304 :
305 6 : if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
306 0 : BBPunfix(b->batCacheid);
307 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
308 : }
309 6 : memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
310 6 : BATcount(b) * SIZEOF_OID);
311 6 : BATsetcount(bn, BATcount(b));
312 6 : bn->tkey = true;
313 6 : bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
314 6 : bn->tnil = false;
315 6 : bn->tnonil = true;
316 6 : *ret = bn->batCacheid;
317 6 : BBPkeepref(bn);
318 6 : BBPunfix(b->batCacheid);
319 6 : return MAL_SUCCEED;
320 : }
321 :
322 : static str
323 0 : OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
324 : {
325 0 : BAT *b;
326 :
327 0 : (void) ret;
328 0 : b = BATdescriptor(*bid);
329 0 : if (b == NULL)
330 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
331 :
332 0 : if (BATorderidx(b, *stable) != GDK_SUCCEED) {
333 0 : BBPunfix(*bid);
334 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
335 : }
336 0 : *ret = *bid;
337 0 : BBPkeepref(b);
338 0 : return MAL_SUCCEED;
339 : }
340 :
341 : /*
342 : * Merge the collection of sorted OID BATs into one
343 : */
344 :
345 : static str
346 0 : OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
347 : {
348 0 : bat bid;
349 0 : BAT *b;
350 0 : BAT **a;
351 0 : int i, j, n_ar;
352 0 : BUN m_sz;
353 0 : gdk_return rc;
354 :
355 0 : (void) cntxt;
356 0 : (void) mb;
357 :
358 0 : if (pci->retc != 1)
359 0 : throw(MAL, "bat.orderidx",
360 : SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
361 0 : if (pci->argc < 2)
362 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
363 :
364 0 : n_ar = pci->argc - 2;
365 :
366 0 : bid = *getArgReference_bat(stk, pci, 1);
367 0 : b = BATdescriptor(bid);
368 0 : if (b == NULL)
369 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
370 :
371 0 : if (b->torderidx) {
372 0 : BBPunfix(bid);
373 0 : throw(MAL, "bat.orderidx",
374 : SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
375 : }
376 :
377 0 : switch (ATOMbasetype(b->ttype)) {
378 : case TYPE_bte:
379 : case TYPE_sht:
380 : case TYPE_int:
381 : case TYPE_lng:
382 : #ifdef HAVE_HGE
383 : case TYPE_hge:
384 : #endif
385 : case TYPE_flt:
386 : case TYPE_dbl:
387 0 : break;
388 0 : case TYPE_str:
389 : /* TODO: support strings etc. */
390 : case TYPE_void:
391 : case TYPE_ptr:
392 : default:
393 0 : BBPunfix(bid);
394 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
395 : }
396 :
397 0 : if ((a = (BAT **) GDKmalloc(n_ar * sizeof(BAT *))) == NULL) {
398 0 : BBPunfix(bid);
399 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
400 : }
401 : m_sz = 0;
402 0 : for (i = 0; i < n_ar; i++) {
403 0 : a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i + 2));
404 0 : if (a[i] == NULL) {
405 0 : for (j = i - 1; j >= 0; j--) {
406 0 : BBPunfix(a[j]->batCacheid);
407 : }
408 0 : GDKfree(a);
409 0 : BBPunfix(bid);
410 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
411 : }
412 0 : m_sz += BATcount(a[i]);
413 0 : if (BATcount(a[i]) == 0) {
414 0 : BBPunfix(a[i]->batCacheid);
415 0 : a[i] = NULL;
416 : }
417 : }
418 0 : assert(m_sz == BATcount(b));
419 0 : for (i = 0; i < n_ar; i++) {
420 0 : if (a[i] == NULL) {
421 0 : n_ar--;
422 0 : if (i < n_ar)
423 0 : a[i] = a[n_ar];
424 0 : i--;
425 : }
426 : }
427 0 : if (m_sz != BATcount(b)) {
428 : BBPunfix(bid);
429 : for (i = 0; i < n_ar; i++)
430 : BBPunfix(a[i]->batCacheid);
431 : GDKfree(a);
432 : throw(MAL, "bat.orderidx", "count mismatch");
433 : }
434 :
435 0 : rc = GDKmergeidx(b, a, n_ar);
436 :
437 0 : for (i = 0; i < n_ar; i++)
438 0 : BBPunfix(a[i]->batCacheid);
439 0 : GDKfree(a);
440 0 : BBPunfix(bid);
441 :
442 0 : if (rc != GDK_SUCCEED)
443 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
444 :
445 : return MAL_SUCCEED;
446 : }
447 :
448 : #include "mel.h"
449 : mel_func orderidx_init_funcs[] = {
450 : pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,2, arg("",void),batargany("bv",1))),
451 : pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,3, arg("",void),batargany("bv",1),arg("pieces",int))),
452 : pattern("bat", "orderidx", OIDXmerge, false, "Consolidates the OID index arrangement", args(1,3, arg("",void),batargany("bv",1),batvarargany("l",1))),
453 : pattern("bat", "hasorderidx", OIDXhasorderidx, false, "Return true if order index exists", args(1,2, arg("",bit),batargany("bv",1))),
454 : pattern("bat", "getorderidx", OIDXgetorderidx, false, "Return the order index if it exists", args(1,2, batarg("",oid),batargany("bv",1))),
455 : command("algebra", "orderidx", OIDXorderidx, false, "Create an order index", args(1,3, batargany("",1),batargany("bv",1),arg("stable",bit))),
456 : { .imp=NULL }
457 : };
458 : #include "mal_import.h"
459 : #ifdef _MSC_VER
460 : #undef read
461 : #pragma section(".CRT$XCU",read)
462 : #endif
463 345 : LIB_STARTUP_FUNC(init_orderidx_mal)
464 345 : { mal_module("orderidx", NULL, orderidx_init_funcs); }
|