Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "opt_mitosis.h"
15 : #include "mal_interpreter.h"
16 : #include "gdk_utils.h"
17 :
/* Minimal record count per partition: splitting below this size only adds
 * per-piece overhead. */
#define MIN_PART_SIZE 100000
/* Upper bound on partitions per thread: there should be at most this many
 * times more partitions than threads. */
#define MAX_PARTS2THREADS_RATIO 4
20 :
21 :
22 : str
23 344267 : OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
24 : InstrPtr pci)
25 : {
26 344267 : int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0,
27 344267 : mito_size = 0, row_size = 0, mt = -1, nr_cols = 0, nr_aggrs = 0,
28 344267 : nr_maps = 0;
29 344267 : str schema = 0, table = 0;
30 344267 : BUN r = 0, rowcnt = 0; /* table should be sizeable to consider parallel execution */
31 344267 : InstrPtr p, q, *old, target = 0;
32 344267 : size_t argsize = 6 * sizeof(lng), m = 0;
33 : /* estimate size per operator estimate: 4 args + 2 res */
34 344267 : int threads = GDKnr_threads ? GDKnr_threads : 1, maxparts = MAXSLICES;
35 344267 : str msg = MAL_SUCCEED;
36 :
37 : /* if the user has associated limitation on the number of threads, respect it in the
38 : * generation of the number of partitions. Beware, they may lead to larger pieces, it only
39 : * limits the CPU power */
40 344267 : if (cntxt->workerlimit)
41 0 : threads = cntxt->workerlimit;
42 344267 : (void) cntxt;
43 344267 : (void) stk;
44 :
45 344267 : old = mb->stmt;
46 13957543 : for (i = 1; i < mb->stop; i++) {
47 13615862 : InstrPtr p = old[i];
48 :
49 13615862 : if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef
50 9132 : && p->argc > 2 && getArgType(mb, p, 2) == TYPE_str
51 9132 : && isVarConstant(mb, getArg(p, 2))
52 9132 : && getVarConstant(mb, getArg(p, 2)).val.sval != NULL
53 9132 : &&
54 9132 : (strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
55 : "PRIMARY KEY constraint")
56 7944 : || strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
57 : "UNIQUE constraint"))) {
58 1256 : pieces = 0;
59 1256 : goto bailout;
60 : }
61 :
62 : /* mitosis/mergetable bailout conditions */
63 : /* Crude protection against self join explosion */
64 13614606 : if (p->retc == 2 && isMatJoinOp(p))
65 13614606 : maxparts = threads;
66 :
67 13614606 : nr_aggrs += (p->argc > 2 && getModuleId(p) == aggrRef);
68 13614606 : nr_maps += (isMapOp(p));
69 :
70 13614602 : if (p->argc > 2 && getModuleId(p) == aggrRef
71 6738 : && getFunctionId(p) != subcountRef && getFunctionId(p) != subminRef
72 3050 : && getFunctionId(p) != submaxRef && getFunctionId(p) != subavgRef
73 2552 : && getFunctionId(p) != subsumRef && getFunctionId(p) != subprodRef
74 1000 : && getFunctionId(p) != countRef && getFunctionId(p) != minRef
75 300 : && getFunctionId(p) != maxRef && getFunctionId(p) != avgRef
76 248 : && getFunctionId(p) != sumRef && getFunctionId(p) != prodRef) {
77 248 : pieces = 0;
78 248 : goto bailout;
79 : }
80 :
81 : /* rtree functions should not be optimized by mitosis
82 : * (single-threaded execution) */
83 13614354 : if (getModuleId(p) == rtreeRef) {
84 0 : pieces = 0;
85 0 : goto bailout;
86 : }
87 :
88 : /* do not split up floating point bat that is being summed */
89 13614354 : if (p->retc == 1 &&
90 13015827 : (((p->argc == 5 || p->argc == 6)
91 905653 : && getModuleId(p) == aggrRef
92 5642 : && getFunctionId(p) == subsumRef)
93 13014280 : || (p->argc == 4
94 573339 : && getModuleId(p) == aggrRef
95 0 : && getFunctionId(p) == sumRef))
96 1547 : && isaBatType(getArgType(mb, p, p->retc))
97 1547 : && (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt
98 1547 : || getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) {
99 16 : pieces = 0;
100 16 : goto bailout;
101 : }
102 :
103 13614338 : if (p->argc > 2
104 7395827 : && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef
105 7395797 : || getModuleId(p) == pyapi3Ref)
106 140 : && getFunctionId(p) == subeval_aggrRef) {
107 25 : pieces = 0;
108 25 : goto bailout;
109 : }
110 :
111 : /* Mergetable cannot handle intersect/except's for now */
112 13614313 : if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) {
113 1037 : pieces = 0;
114 1037 : goto bailout;
115 : }
116 :
117 : /* locate the largest non-partitioned table */
118 13613276 : if (getModuleId(p) != sqlRef
119 1106679 : || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef
120 626523 : && getFunctionId(p) != tidRef))
121 13008520 : continue;
122 : /* don't split insert BATs */
123 604756 : if (p->argc > 5 && getVarConstant(mb, getArg(p, 5)).val.ival == 1)
124 0 : continue;
125 604756 : if (p->argc > 6)
126 123055 : continue; /* already partitioned */
127 : /*
128 : * The SQL optimizer already collects the counts of the base
129 : * table and passes them on as a row property. All pieces for a
130 : * single subplan should ideally fit together.
131 : */
132 481701 : r = getRowCnt(mb, getArg(p, 0));
133 481701 : if (r == rowcnt)
134 226171 : nr_cols++;
135 481701 : if (r > rowcnt) {
136 : /* the rowsize depends on the column types, assume void-headed */
137 69439 : row_size = ATOMsize(getBatType(getArgType(mb, p, 0)));
138 69439 : rowcnt = r;
139 69439 : nr_cols = 1;
140 69439 : target = p;
141 69439 : estimate++;
142 69439 : r = 0;
143 : }
144 : }
145 341681 : if (target == 0) {
146 295865 : pieces = 0;
147 295865 : goto bailout;
148 : }
149 : /*
150 : * The number of pieces should be based on the footprint of the
151 : * queryplan, such that preferrably it can be handled without
152 : * swapping intermediates. For the time being we just go for pieces
153 : * that fit into memory in isolation. A fictive rowcount is derived
154 : * based on argument types, such that all pieces would fit into
155 : * memory conveniently for processing. We attempt to use not more
156 : * threads than strictly needed.
157 : * Experience shows that the pieces should not be too small.
158 : * If we should limit to |threads| is still an open issue.
159 : *
160 : * Take into account the number of client connections,
161 : * because all user together are responsible for resource contentions
162 : */
163 45816 : MT_lock_set(&mal_contextLock);
164 45818 : cntxt->idle = 0; // this one is definitely not idle
165 45818 : MT_lock_unset(&mal_contextLock);
166 :
167 : /* improve memory usage estimation */
168 45818 : if (nr_cols > 1 || nr_aggrs > 1 || nr_maps > 1)
169 44539 : argsize = (nr_cols + nr_aggrs + nr_maps) * sizeof(lng);
170 : /* We haven't assigned the number of pieces.
171 : * Determine the memory available for this client
172 : */
173 :
174 : /* respect the memory limit size set for the user
175 : * and determine the column part size
176 : */
177 45818 : m = GDK_mem_maxsize / MCactiveClients(); /* use temporarily */
178 45818 : if (cntxt->memorylimit > 0 && (size_t) cntxt->memorylimit << 20 < m)
179 0 : m = ((size_t) cntxt->memorylimit << 20) / argsize;
180 45818 : else if (cntxt->maxmem > 0 && cntxt->maxmem < (lng) m)
181 0 : m = (size_t) (cntxt->maxmem / argsize);
182 : else
183 45818 : m = m / argsize;
184 :
185 : /* if data exceeds memory size,
186 : * i.e., (rowcnt*argsize > GDK_mem_maxsize),
187 : * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
188 45818 : if (rowcnt > m && m / threads > 0) {
189 : /* create |pieces| > |threads| partitions such that
190 : * |threads| partitions at a time fit in memory,
191 : * i.e., (threads*(rowcnt/pieces) <= m),
192 : * i.e., (rowcnt/pieces <= m/threads),
193 : * i.e., (pieces => rowcnt/(m/threads))
194 : * (assuming that (m > threads*MIN_PART_SIZE)) */
195 : /* the number of pieces affects SF-100, going beyond 8x increases
196 : * the optimizer costs beyond the execution time
197 : */
198 0 : pieces = ((int) ceil((double) rowcnt / (m / threads)));
199 0 : if (pieces <= threads)
200 : pieces = threads;
201 45818 : } else if (rowcnt > MIN_PART_SIZE) {
202 : /* exploit parallelism, but ensure minimal partition size to
203 : * limit overhead */
204 195 : pieces = MIN((int) ceil((double) rowcnt / MIN_PART_SIZE),
205 : MAX_PARTS2THREADS_RATIO * threads);
206 : }
207 :
208 : /* when testing, always aim for full parallelism, but avoid
209 : * empty pieces */
210 45818 : FORCEMITODEBUG if (pieces < threads)
211 45543 : pieces = (int) MIN((BUN) threads, rowcnt);
212 : /* prevent plan explosion */
213 45818 : if (pieces > maxparts)
214 : pieces = maxparts;
215 : /* to enable experimentation we introduce the option to set
216 : * the number of partitions required and/or the size of each chunk (in K)
217 : */
218 45818 : mito_parts = GDKgetenv_int("mito_parts", 0);
219 45818 : if (mito_parts > 0)
220 0 : pieces = mito_parts;
221 45818 : mito_size = GDKgetenv_int("mito_size", 0);
222 45818 : if (mito_size > 0)
223 0 : pieces = (int) ((rowcnt * row_size) / (mito_size * 1024));
224 :
225 45818 : if (pieces <= 1) {
226 372 : pieces = 0;
227 372 : goto bailout;
228 : }
229 :
230 : /* at this stage we have identified the #chunks to be used for the largest table */
231 45446 : limit = mb->stop;
232 45446 : slimit = mb->ssize;
233 45446 : if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
234 0 : throw(MAL, "optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
235 45444 : estimate = 0;
236 :
237 45444 : schema = getVarConstant(mb, getArg(target, 2)).val.sval;
238 45444 : table = getVarConstant(mb, getArg(target, 3)).val.sval;
239 4187404 : for (i = 0; mb->errors == NULL && i < limit; i++) {
240 4141963 : int upd = 0, qtpe, rtpe = 0, qv, rv;
241 4141963 : InstrPtr matq, matr = NULL;
242 4141963 : p = old[i];
243 :
244 4141963 : if (getModuleId(p) != sqlRef
245 854461 : || !(getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef
246 399558 : || getFunctionId(p) == tidRef)) {
247 3573848 : pushInstruction(mb, p);
248 3573840 : continue;
249 : }
250 : /* don't split insert BATs */
251 568115 : if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
252 0 : pushInstruction(mb, p);
253 0 : continue;
254 : }
255 568115 : r = getRowCnt(mb, getArg(p, 0));
256 568115 : if (r < rowcnt) {
257 307903 : pushInstruction(mb, p);
258 307903 : continue;
259 : }
260 : /* Don't split the (index) bat if we already have identified a range */
261 : /* This will happen if we inline separately optimized routines */
262 260212 : if (p->argc > 7) {
263 0 : pushInstruction(mb, p);
264 0 : continue;
265 : }
266 260212 : if (p->retc == 2)
267 55834 : upd = 1;
268 260598 : if (mt < 0
269 260212 : && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval)
270 260212 : || strcmp(table,
271 260212 : getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) {
272 387 : pushInstruction(mb, p);
273 386 : continue;
274 : }
275 : /* we keep the original bind operation, because it allows for
276 : * easy undo when the mergtable can not do something */
277 : // pushInstruction(mb, p);
278 :
279 259825 : qtpe = getVarType(mb, getArg(p, 0));
280 :
281 259825 : matq = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
282 259828 : if (matq == NULL) {
283 0 : msg = createException(MAL, "optimizer.mitosis",
284 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
285 0 : break;
286 : }
287 259828 : getArg(matq, 0) = getArg(p, 0);
288 :
289 259828 : if (upd) {
290 55834 : matr = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
291 55834 : if (matr == NULL) {
292 0 : freeInstruction(matq);
293 0 : msg = createException(MAL, "optimizer.mitosis",
294 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
295 0 : break;
296 : }
297 55834 : getArg(matr, 0) = getArg(p, 1);
298 55834 : rtpe = getVarType(mb, getArg(p, 1));
299 : }
300 :
301 1276197 : for (j = 0; j < pieces; j++) {
302 1016385 : q = copyInstruction(p);
303 1016567 : if (q == NULL) {
304 0 : freeInstruction(matr);
305 0 : freeInstruction(matq);
306 0 : for (; i < limit; i++)
307 0 : if (old[i])
308 0 : pushInstruction(mb, old[i]);
309 0 : GDKfree(old);
310 0 : throw(MAL, "optimizer.mitosis",
311 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
312 : }
313 1016567 : q = pushInt(mb, q, j);
314 1016459 : q = pushInt(mb, q, pieces);
315 :
316 1016396 : qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
317 1016373 : if (upd) {
318 215328 : rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
319 : }
320 1016373 : pushInstruction(mb, q);
321 1016367 : matq = pushArgument(mb, matq, qv);
322 1016369 : if (upd)
323 215328 : matr = pushArgument(mb, matr, rv);
324 : }
325 259812 : pushInstruction(mb, matq);
326 259811 : if (upd)
327 55834 : pushInstruction(mb, matr);
328 259811 : freeInstruction(p);
329 : }
330 9159880 : for (; i < slimit; i++)
331 9114434 : if (old[i])
332 0 : pushInstruction(mb, old[i]);
333 45446 : GDKfree(old);
334 :
335 : /* Defense line against incorrect plans */
336 45446 : if (msg == MAL_SUCCEED) {
337 45446 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
338 45446 : if (msg == MAL_SUCCEED) {
339 45446 : msg = chkFlow(mb);
340 45446 : if (msg == MAL_SUCCEED)
341 45446 : msg = chkDeclarations(mb);
342 : }
343 : }
344 0 : bailout:
345 : /* keep actions taken as a fake argument */
346 344264 : (void) pushInt(mb, pci, pieces);
347 344264 : return msg;
348 : }
|