Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "opt_mitosis.h"
15 : #include "mal_interpreter.h"
16 : #include "gdk_utils.h"
17 :
/* Smallest number of records worth putting in one partition. */
#define MIN_PART_SIZE 100000
/* Cap on how many partitions may be created per available thread:
 * there should be at most this many times more partitions than threads. */
#define MAX_PARTS2THREADS_RATIO 4
20 :
21 :
22 : str
23 409490 : OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
24 : InstrPtr pci)
25 : {
26 409490 : int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0,
27 409490 : mito_size = 0, row_size = 0, mt = -1, nr_cols = 0, nr_aggrs = 0,
28 409490 : nr_maps = 0;
29 409490 : str schema = 0, table = 0;
30 409490 : BUN r = 0, rowcnt = 0; /* table should be sizeable to consider parallel execution */
31 409490 : InstrPtr p, q, *old, target = 0;
32 409490 : size_t argsize = 6 * sizeof(lng), m = 0;
33 : /* estimate size per operator estimate: 4 args + 2 res */
34 409490 : int threads = GDKnr_threads ? GDKnr_threads : 1, maxparts = MAXSLICES;
35 409490 : str msg = MAL_SUCCEED;
36 :
37 : /* if the user has associated limitation on the number of threads, respect it in the
38 : * generation of the number of partitions. Beware, they may lead to larger pieces, it only
39 : * limits the CPU power */
40 409490 : if (cntxt->workerlimit)
41 0 : threads = cntxt->workerlimit;
42 409490 : (void) cntxt;
43 409490 : (void) stk;
44 :
45 409490 : old = mb->stmt;
46 16444361 : for (i = 1; i < mb->stop; i++) {
47 16037529 : InstrPtr p = old[i];
48 :
49 16037529 : if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef
50 9220 : && p->argc > 2 && getArgType(mb, p, 2) == TYPE_str
51 9220 : && isVarConstant(mb, getArg(p, 2))
52 9220 : && getVarConstant(mb, getArg(p, 2)).val.sval != NULL
53 9220 : &&
54 9220 : (strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
55 : "PRIMARY KEY constraint")
56 8009 : || strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
57 : "UNIQUE constraint"))) {
58 1279 : pieces = 0;
59 1279 : goto bailout;
60 : }
61 :
62 : /* mitosis/mergetable bailout conditions */
63 : /* Crude protection against self join explosion */
64 16036250 : if (p->retc == 2 && isMatJoinOp(p))
65 16036232 : maxparts = threads;
66 :
67 16036232 : nr_aggrs += (p->argc > 2 && getModuleId(p) == aggrRef);
68 16036232 : nr_maps += (isMapOp(p));
69 :
70 16036227 : if ((getModuleId(p) == algebraRef &&
71 1820840 : getFunctionId(p) == groupedfirstnRef) ||
72 16036222 : (p->argc > 2 && getModuleId(p) == aggrRef
73 6978 : && getFunctionId(p) != subcountRef && getFunctionId(p) != subminRef
74 2976 : && getFunctionId(p) != submaxRef && getFunctionId(p) != subavgRef
75 2466 : && getFunctionId(p) != subsumRef && getFunctionId(p) != subprodRef
76 943 : && getFunctionId(p) != countRef && getFunctionId(p) != minRef
77 325 : && getFunctionId(p) != maxRef && getFunctionId(p) != avgRef
78 273 : && getFunctionId(p) != sumRef && getFunctionId(p) != prodRef)) {
79 278 : pieces = 0;
80 278 : goto bailout;
81 : }
82 :
83 : /* rtree functions should not be optimized by mitosis
84 : * (single-threaded execution) */
85 16035949 : if (getModuleId(p) == rtreeRef) {
86 0 : pieces = 0;
87 0 : goto bailout;
88 : }
89 :
90 : /* do not split up floating point bat that is being summed */
91 16035949 : if (p->retc == 1 &&
92 15337954 : (((p->argc == 5 || p->argc == 6)
93 1001875 : && getModuleId(p) == aggrRef
94 5939 : && getFunctionId(p) == subsumRef)
95 15336435 : || (p->argc == 4
96 655860 : && getModuleId(p) == aggrRef
97 0 : && getFunctionId(p) == sumRef))
98 1519 : && isaBatType(getArgType(mb, p, p->retc))
99 1519 : && (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt
100 : || getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) {
101 14 : pieces = 0;
102 14 : goto bailout;
103 : }
104 :
105 16035935 : if (p->argc > 2
106 8504715 : && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef
107 8504684 : || getModuleId(p) == pyapi3Ref)
108 143 : && getFunctionId(p) == subeval_aggrRef) {
109 26 : pieces = 0;
110 26 : goto bailout;
111 : }
112 :
113 : /* Mergetable cannot handle intersect/except's for now */
114 16035909 : if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) {
115 1038 : pieces = 0;
116 1038 : goto bailout;
117 : }
118 :
119 : /* locate the largest non-partitioned table */
120 16034871 : if (getModuleId(p) != sqlRef
121 1206787 : || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef
122 678220 : && getFunctionId(p) != tidRef))
123 15374110 : continue;
124 : /* don't split insert BATs */
125 660761 : if (p->argc > 5 && getVarConstant(mb, getArg(p, 5)).val.ival == 1)
126 0 : continue;
127 660761 : if (p->argc > 6)
128 146849 : continue; /* already partitioned */
129 : /*
130 : * The SQL optimizer already collects the counts of the base
131 : * table and passes them on as a row property. All pieces for a
132 : * single subplan should ideally fit together.
133 : */
134 513912 : r = getRowCnt(mb, getArg(p, 0));
135 513912 : if (r == rowcnt)
136 258712 : nr_cols++;
137 513912 : if (r > rowcnt) {
138 : /* the rowsize depends on the column types, assume void-headed */
139 51121 : row_size = ATOMsize(getBatType(getArgType(mb, p, 0)));
140 51121 : rowcnt = r;
141 51121 : nr_cols = 1;
142 51121 : target = p;
143 51121 : estimate++;
144 51121 : r = 0;
145 : }
146 : }
147 406832 : if (target == 0) {
148 358874 : pieces = 0;
149 358874 : goto bailout;
150 : }
151 : /*
152 : * The number of pieces should be based on the footprint of the
153 : * query plan, such that preferably it can be handled without
154 : * swapping intermediates. For the time being we just go for pieces
155 : * that fit into memory in isolation. A fictive rowcount is derived
156 : * based on argument types, such that all pieces would fit into
157 : * memory conveniently for processing. We attempt to use not more
158 : * threads than strictly needed.
159 : * Experience shows that the pieces should not be too small.
160 : * If we should limit to |threads| is still an open issue.
161 : *
162 : * Take into account the number of client connections,
163 : * because all user together are responsible for resource contentions
164 : */
165 47958 : MT_lock_set(&mal_contextLock);
166 47971 : cntxt->idle = 0; // this one is definitely not idle
167 47971 : MT_lock_unset(&mal_contextLock);
168 :
169 : /* improve memory usage estimation */
170 47971 : if (nr_cols > 1 || nr_aggrs > 1 || nr_maps > 1)
171 47870 : argsize = (nr_cols + nr_aggrs + nr_maps) * sizeof(lng);
172 : /* We haven't assigned the number of pieces.
173 : * Determine the memory available for this client
174 : */
175 :
176 : /* respect the memory limit size set for the user
177 : * and determine the column part size
178 : */
179 47971 : m = GDK_mem_maxsize / MCactiveClients(); /* use temporarily */
180 47970 : if (cntxt->memorylimit > 0 && (size_t) cntxt->memorylimit << 20 < m)
181 0 : m = ((size_t) cntxt->memorylimit << 20) / argsize;
182 47970 : else if (cntxt->maxmem > 0 && cntxt->maxmem < (lng) m)
183 0 : m = (size_t) (cntxt->maxmem / argsize);
184 : else
185 47970 : m = m / argsize;
186 :
187 : /* if data exceeds memory size,
188 : * i.e., (rowcnt*argsize > GDK_mem_maxsize),
189 : * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
190 47970 : if (rowcnt > m && m / threads > 0) {
191 : /* create |pieces| > |threads| partitions such that
192 : * |threads| partitions at a time fit in memory,
193 : * i.e., (threads*(rowcnt/pieces) <= m),
194 : * i.e., (rowcnt/pieces <= m/threads),
195 : * i.e., (pieces => rowcnt/(m/threads))
196 : * (assuming that (m > threads*MIN_PART_SIZE)) */
197 : /* the number of pieces affects SF-100, going beyond 8x increases
198 : * the optimizer costs beyond the execution time
199 : */
200 0 : pieces = ((int) ceil((double) rowcnt / (m / threads)));
201 0 : if (pieces <= threads)
202 : pieces = threads;
203 47970 : } else if (rowcnt > MIN_PART_SIZE) {
204 : /* exploit parallelism, but ensure minimal partition size to
205 : * limit overhead */
206 189 : pieces = MIN((int) ceil((double) rowcnt / MIN_PART_SIZE),
207 : MAX_PARTS2THREADS_RATIO * threads);
208 : }
209 :
210 : /* when testing, always aim for full parallelism, but avoid
211 : * empty pieces */
212 47970 : FORCEMITODEBUG if (pieces < threads)
213 47712 : pieces = (int) MIN((BUN) threads, rowcnt);
214 : /* prevent plan explosion */
215 47970 : if (pieces > maxparts)
216 : pieces = maxparts;
217 : /* to enable experimentation we introduce the option to set
218 : * the number of partitions required and/or the size of each chunk (in K)
219 : */
220 47970 : mito_parts = GDKgetenv_int("mito_parts", 0);
221 47971 : if (mito_parts > 0)
222 0 : pieces = mito_parts;
223 47971 : mito_size = GDKgetenv_int("mito_size", 0);
224 47971 : if (mito_size > 0)
225 0 : pieces = (int) ((rowcnt * row_size) / (mito_size * 1024));
226 :
227 47971 : if (pieces <= 1) {
228 398 : pieces = 0;
229 398 : goto bailout;
230 : }
231 :
232 : /* at this stage we have identified the #chunks to be used for the largest table */
233 47573 : limit = mb->stop;
234 47573 : slimit = mb->ssize;
235 47573 : if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
236 0 : throw(MAL, "optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
237 47573 : estimate = 0;
238 :
239 47573 : schema = getVarConstant(mb, getArg(target, 2)).val.sval;
240 47573 : table = getVarConstant(mb, getArg(target, 3)).val.sval;
241 4465232 : for (i = 0; mb->errors == NULL && i < limit; i++) {
242 4417664 : int upd = 0, qtpe, rtpe = 0, qv, rv;
243 4417664 : InstrPtr matq, matr = NULL;
244 4417664 : p = old[i];
245 :
246 4417664 : if (getModuleId(p) != sqlRef
247 930959 : || !(getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef
248 : || getFunctionId(p) == tidRef)) {
249 3801909 : pushInstruction(mb, p);
250 3801893 : continue;
251 : }
252 : /* don't split insert BATs */
253 615755 : if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
254 0 : pushInstruction(mb, p);
255 0 : continue;
256 : }
257 615755 : r = getRowCnt(mb, getArg(p, 0));
258 615755 : if (r < rowcnt) {
259 316247 : pushInstruction(mb, p);
260 316247 : continue;
261 : }
262 : /* Don't split the (index) bat if we already have identified a range */
263 : /* This will happen if we inline separately optimized routines */
264 299508 : if (p->argc > 7) {
265 0 : pushInstruction(mb, p);
266 0 : continue;
267 : }
268 299508 : if (p->retc == 2)
269 64467 : upd = 1;
270 299996 : if (mt < 0
271 299508 : && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval)
272 299508 : || strcmp(table,
273 299508 : getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) {
274 482 : pushInstruction(mb, p);
275 488 : continue;
276 : }
277 : /* we keep the original bind operation, because it allows for
278 : * easy undo when the mergtable can not do something */
279 : // pushInstruction(mb, p);
280 :
281 299026 : qtpe = getVarType(mb, getArg(p, 0));
282 :
283 299026 : matq = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
284 299021 : if (matq == NULL) {
285 0 : msg = createException(MAL, "optimizer.mitosis",
286 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
287 0 : break;
288 : }
289 299021 : getArg(matq, 0) = getArg(p, 0);
290 :
291 299021 : if (upd) {
292 64467 : matr = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
293 64467 : if (matr == NULL) {
294 0 : freeInstruction(matq);
295 0 : msg = createException(MAL, "optimizer.mitosis",
296 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
297 0 : break;
298 : }
299 64467 : getArg(matr, 0) = getArg(p, 1);
300 64467 : rtpe = getVarType(mb, getArg(p, 1));
301 : }
302 :
303 2549838 : for (j = 0; j < pieces; j++) {
304 2250826 : q = copyInstruction(p);
305 2251265 : if (q == NULL) {
306 0 : freeInstruction(matr);
307 0 : freeInstruction(matq);
308 0 : for (; i < limit; i++)
309 0 : if (old[i])
310 0 : pushInstruction(mb, old[i]);
311 0 : GDKfree(old);
312 0 : throw(MAL, "optimizer.mitosis",
313 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
314 : }
315 2251265 : q = pushInt(mb, q, j);
316 2250098 : q = pushInt(mb, q, pieces);
317 :
318 2250366 : qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
319 2250798 : if (upd) {
320 474099 : rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
321 : }
322 2250798 : pushInstruction(mb, q);
323 2250812 : matq = pushArgument(mb, matq, qv);
324 2250817 : if (upd)
325 474099 : matr = pushArgument(mb, matr, rv);
326 : }
327 299012 : pushInstruction(mb, matq);
328 299016 : if (upd)
329 64467 : pushInstruction(mb, matr);
330 299016 : freeInstruction(p);
331 : }
332 9464118 : for (; i < slimit; i++)
333 9416546 : if (old[i])
334 0 : pushInstruction(mb, old[i]);
335 47572 : GDKfree(old);
336 :
337 : /* Defense line against incorrect plans */
338 47572 : if (msg == MAL_SUCCEED) {
339 47572 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
340 47570 : if (msg == MAL_SUCCEED) {
341 47570 : msg = chkFlow(mb);
342 47570 : if (msg == MAL_SUCCEED)
343 47570 : msg = chkDeclarations(mb);
344 : }
345 : }
346 0 : bailout:
347 : /* keep actions taken as a fake argument */
348 409480 : (void) pushInt(mb, pci, pieces);
349 409480 : return msg;
350 : }
|