Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * The statements are all checked for being eligible for dataflow.
15 : */
16 : #include "monetdb_config.h"
17 : #include "opt_dataflow.h"
18 : #include "mal_instruction.h"
19 : #include "mal_interpreter.h"
20 : #include "manifold.h"
21 :
22 : /*
23 : * Dataflow processing incurs overhead and is only
24 : * relevant if multiple tasks can be handled at the same time.
25 : * Also, simple expressions don't have to be executed in parallel.
26 : *
27 : * The dataflow analysis centers around the read/write use patterns of
28 : * the variables and the occurrence of side-effect bearing functions.
29 : * Any such function should break the dataflow block as it may rely
30 : * on the sequential order in the plan.
31 : *
32 : * The following state properties can be distinguished for all variables:
33 : * VARWRITE - variable assigned a value in the dataflow block
34 : * VARREAD - variable is used in an argument
35 : * VAR2READ - variable is read in concurrent mode
36 : * VARBLOCK - the variable's next use terminates the parallel (//) block; set after encountering an update
37 : *
38 : * Only some combinations are allowed.
39 : */
40 :
41 : #define VARFREE 0
42 : #define VARWRITE 1
43 : #define VARREAD 2
44 : #define VARBLOCK 4
45 : #define VAR2READ 8
46 :
47 : typedef char *States;
48 :
49 : #define setState(S,P,K,F) ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= F)
50 : #define getState(S,P,K) ((S)[getArg(P,K)])
51 :
52 : typedef enum {
53 : no_region,
54 : singleton_region, // always a single statement
55 : dataflow_region, // statements without or with controlled side effects, in parallel
56 : existing_region, // existing barrier..exit region, copied as-is
57 : sql_region, // region of nonconflicting sql.append/sql.updates only
58 : } region_type;
59 :
60 : typedef struct {
61 : region_type type;
62 : union {
63 : struct {
64 : int level; // level of nesting
65 : } existing_region;
66 : } st;
67 : } region_state;
68 :
69 : static bool
70 1602851 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
71 : {
72 1602851 : int i, j, k;
73 1602851 : bool simple = true;
74 1602851 : InstrPtr p = NULL, q;
75 :
76 : /* ignore trivial blocks */
77 1602851 : if (last - start == 1)
78 : return true;
79 166013 : if (state->type == existing_region) {
80 : /* don't add additional barriers and garbage collection around
81 : * existing region. */
82 : return true;
83 : }
84 : /* skip sequence of simple arithmetic first */
85 331842 : for (; simple && start < last; start++)
86 166532 : if (old[start]) {
87 166532 : p = old[start];
88 331191 : simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef
89 331191 : || getModuleId(p) == strRef || getModuleId(p) == mmathRef;
90 : }
91 282604 : for (i = start; i < last; i++)
92 264716 : if (old[i]) {
93 264716 : q = old[i];
94 253604 : simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef
95 518320 : || getModuleId(q) == strRef || getModuleId(q) == mmathRef;
96 264716 : if (!simple) {
97 : /* if not arithmetic than we should consume the previous result directly */
98 1489477 : for (j = q->retc; j < q->argc; j++)
99 2982321 : for (k = 0; k < p->retc; k++)
100 1746427 : if (getArg(p, k) == getArg(q, j))
101 106525 : simple = true;
102 253583 : if (!simple)
103 : return false;
104 : }
105 : p = q;
106 : }
107 : return simple;
108 : }
109 :
110 : /* Updates are permitted if it is a unique update on
111 : * a BAT created in the context of this block
112 : * As far as we know, no SQL nor MAL test re-uses the
113 : * target BAT to insert again and subsequently calls dataflow.
114 : * In MAL scripts, they still can occur.
115 : */
116 :
117 : /* a limited set of MAL instructions may appear in the dataflow block*/
118 : static int
119 9145531 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
120 : {
121 9145531 : int j;
122 :
123 9145531 : if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p)
124 9003886 : || (isMultiplex(p) && MANIFOLDtypecheck(cntxt, mb, p, 0) == NULL)) {
125 141923 : return TRUE;
126 : }
127 :
128 : /* flow blocks should be closed when we reach a point
129 : where a variable is assigned more then once or already
130 : being read.
131 : */
132 19183141 : for (j = 0; j < p->retc; j++)
133 10179654 : if (getState(states, p, j) & (VARWRITE | VARREAD | VARBLOCK)) {
134 : return 1;
135 : }
136 :
137 : /* update instructions can be updated if the target variable
138 : * has not been read in the block so far */
139 9003487 : if (isUpdateInstruction(p)) {
140 : /* the SQL update functions change BATs that are not
141 : * explicitly mentioned as arguments (and certainly not as the
142 : * first argument), but that can still be available to the MAL
143 : * program (see bugs.monetdb.org/6641) */
144 200182 : if (getModuleId(p) == sqlRef)
145 : return 1;
146 196990 : return getState(states, p, p->retc) & (VARREAD | VARBLOCK);
147 : }
148 :
149 47277562 : for (j = p->retc; j < p->argc; j++) {
150 38474197 : if (getState(states, p, j) & VARBLOCK) {
151 : return 1;
152 : }
153 : }
154 8803365 : return hasSideEffects(mb, p, FALSE);
155 : }
156 :
157 : static str
158 15896646 : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
159 : {
160 15896646 : int var = getArg(p, argno);
161 15896646 : return getVarConstant(mb, var).val.sval;
162 : }
163 :
164 : static str
165 450577 : get_sql_sname(MalBlkPtr mb, InstrPtr p)
166 : {
167 450577 : return get_str_arg(mb, p, 2);
168 : }
169 :
170 : static str
171 450577 : get_sql_tname(MalBlkPtr mb, InstrPtr p)
172 : {
173 450577 : return get_str_arg(mb, p, 3);
174 : }
175 :
176 : static str
177 15896650 : get_sql_cname(MalBlkPtr mb, InstrPtr p)
178 : {
179 15896650 : return get_str_arg(mb, p, 4);
180 : }
181 :
182 :
183 : static bool
184 1654214 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
185 : {
186 1654214 : if (p->modname != sqlRef)
187 : return false;
188 1170969 : if (p->fcnname != appendRef && p->fcnname != updateRef)
189 : return false;
190 :
191 : // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
192 : // arg("mvc",int),
193 : // arg("sname",str),
194 : // arg("tname",str),
195 : // arg("cname",str),
196 : // arg("offset",lng),
197 : // batarg("pos",oid),
198 : // argany("ins",0))),
199 :
200 : // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
201 : // arg("mvc",int),
202 : // arg("sname",str),
203 : // arg("tname",str),
204 : // arg("cname",str),
205 : // argany("rids",0),
206 : // argany("upd",0)))
207 :
208 528017 : if ((p->fcnname == appendRef && p->argc != 8)
209 528009 : || (p->fcnname == updateRef && p->argc != 7))
210 : return false;
211 :
212 528009 : int mvc_var = getArg(p, 1);
213 528009 : if (getVarType(mb, mvc_var) != TYPE_int)
214 : return false;
215 :
216 528009 : int sname_var = getArg(p, 2);
217 528009 : if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
218 : return false;
219 :
220 528020 : int tname_var = getArg(p, 3);
221 528020 : if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
222 : return false;
223 :
224 528008 : int cname_var = getArg(p, 4);
225 528008 : if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
226 : return false;
227 :
228 : return true;
229 : }
230 :
231 : static bool
232 528049 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
233 : {
234 528049 : InstrPtr instr = *p;
235 528049 : if (!isSqlAppendUpdate(mb, instr))
236 : return true;
237 :
238 450579 : str my_sname = get_sql_sname(mb, instr);
239 450579 : str my_tname = get_sql_tname(mb, instr);
240 450579 : str my_cname = get_sql_cname(mb, instr);
241 15896652 : for (InstrPtr *q = first; q < p; q++) {
242 15446071 : str cname = get_sql_cname(mb, *q);
243 15446071 : if (strcmp(my_cname, cname) != 0) {
244 : // different cname, no conflict
245 15446073 : continue;
246 : }
247 0 : str tname = get_sql_tname(mb, *q);
248 0 : if (strcmp(my_tname, tname) != 0) {
249 : // different tname, no conflict
250 0 : continue;
251 : }
252 0 : str sname = get_sql_sname(mb, *q);
253 0 : if (strcmp(my_sname, sname) != 0) {
254 : // different sname, no conflict
255 0 : continue;
256 : }
257 : // Found a statement in the region that works on the same column so this is a breakpoint
258 : return true;
259 : }
260 :
261 : // None of the statements in the region works on this column so no breakpoint necessary
262 : return false;
263 : }
264 :
265 : static bool
266 11028737 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p,
267 : States states, region_state *state)
268 : {
269 11028737 : InstrPtr instr = *p;
270 11028737 : switch (state->type) {
271 : case singleton_region:
272 : // by definition
273 : return true;
274 9145438 : case dataflow_region:
275 9145438 : return dataflowBreakpoint(cntxt, mb, instr, states);
276 7304 : case existing_region:
277 7304 : if (state->st.existing_region.level == 0) {
278 : // previous statement ended the region so we break here
279 : return true;
280 : }
281 6601 : if (blockStart(instr)) {
282 38 : state->st.existing_region.level += 1;
283 6563 : } else if (blockExit(instr)) {
284 741 : state->st.existing_region.level -= 1;
285 : }
286 : return false;
287 528015 : case sql_region:
288 528015 : return sqlBreakpoint(mb, first, p);
289 : default:
290 : // serious corruption has occurred.
291 0 : assert(0); /* corrupted region_type */
292 : abort();
293 : }
294 : assert(0); /* unreachable */
295 : return true;
296 : }
297 :
298 : static void
299 1127150 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states,
300 : region_state *state)
301 : {
302 1127150 : (void) cntxt;
303 :
304 1127150 : state->type = no_region;
305 1127150 : if (blockStart(p)) {
306 703 : state->type = existing_region;
307 703 : state->st.existing_region.level = 1;
308 1126447 : } else if (p->token == ENDsymbol) {
309 0 : state->type = existing_region;
310 1126447 : } else if (isSqlAppendUpdate(mb, p)) {
311 77456 : state->type = sql_region;
312 1048991 : } else if (p->barrier) {
313 611 : state->type = singleton_region;
314 1048380 : } else if (isUnsafeFunction(p)) {
315 386904 : state->type = singleton_region;
316 661757 : } else if (isUpdateInstruction(p)
317 36 : && getModuleId(p) != sqlRef
318 20 : && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0) {
319 : // Special case. Unless they're from the sql module, instructions with
320 : // names like 'append', 'update', 'delete', 'grow', etc., are expected
321 : // to express their side effects as data dependencies, for example,
322 : // X5 := bat.append(X_5, ...)
323 20 : state->type = dataflow_region;
324 661514 : } else if (hasSideEffects(mb, p, false)) {
325 484075 : state->type = singleton_region;
326 177579 : } else if (isMultiplex(p)) {
327 766 : state->type = singleton_region;
328 : } else {
329 176771 : state->type = dataflow_region;
330 : }
331 1127306 : assert(state->type != no_region);
332 1127306 : }
333 :
334 :
335 : /* dataflow blocks are transparent, because they are always
336 : executed, either sequentially or in parallel */
337 :
338 : str
339 475641 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
340 : InstrPtr pci)
341 : {
342 475641 : int i, j, k, start, slimit, breakpoint, actions = 0;
343 475641 : bool simple = true;
344 475641 : int flowblock = 0;
345 475641 : InstrPtr p, *old = NULL, q;
346 475641 : int limit, vlimit;
347 475641 : States states = NULL;
348 475641 : region_state state = { singleton_region };
349 475641 : str msg = MAL_SUCCEED;
350 :
351 : /* don't use dataflow on single processor systems */
352 475641 : if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
353 0 : goto wrapup;
354 :
355 475641 : if (optimizerIsApplied(mb, dataflowRef))
356 0 : goto wrapup;
357 475624 : (void) stk;
358 : /* inlined functions will get their dataflow control later */
359 475624 : if (mb->inlineProp)
360 0 : goto wrapup;
361 :
362 475624 : vlimit = mb->vsize;
363 475624 : states = (States) GDKzalloc(vlimit * sizeof(char));
364 475695 : if (states == NULL) {
365 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
366 : }
367 :
368 475695 : setVariableScope(mb);
369 :
370 475636 : limit = mb->stop;
371 475636 : slimit = mb->ssize;
372 475636 : old = mb->stmt;
373 475636 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
374 0 : GDKfree(states);
375 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
376 : }
377 :
378 : /* inject new dataflow barriers using a single pass through the program */
379 475695 : start = 0;
380 475695 : state.type = singleton_region;
381 11027470 : for (i = 1; mb->errors == NULL && i < limit; i++) {
382 11027470 : p = old[i];
383 11027470 : assert(p);
384 11027470 : breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
385 11027175 : if (breakpoint) {
386 : /* close previous flow block */
387 1602604 : simple = simpleFlow(old, start, i, &state);
388 :
389 1602604 : if (!simple) {
390 147401 : if ((flowblock = newTmpVariable(mb, TYPE_bit)) < 0
391 147381 : || (q = newFcnCall(mb, languageRef, dataflowRef)) == NULL) {
392 0 : msg = createException(MAL, "optimizer.dataflow",
393 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
394 0 : break;
395 : }
396 147454 : q->barrier = BARRIERsymbol;
397 147454 : getArg(q, 0) = flowblock;
398 147454 : pushInstruction(mb, q);
399 147454 : actions++;
400 : }
401 : // copyblock the collected statements
402 12629869 : for (j = start; j < i; j++) {
403 11027111 : q = old[j];
404 11027111 : pushInstruction(mb, q);
405 11027227 : old[j] = NULL;
406 : // collect BAT variables garbage collected within the block
407 11027227 : if (!simple)
408 52204693 : for (k = q->retc; k < q->argc; k++) {
409 42675802 : if (getState(states, q, k) & VAR2READ
410 12613848 : && getEndScope(mb, getArg(q, k)) == j
411 2120749 : && isaBatType(getVarType(mb, getArg(q, k)))) {
412 2056420 : InstrPtr r;
413 2056420 : r = newInstruction(NULL, languageRef, passRef);
414 2056429 : if (r == NULL) {
415 0 : msg = createException(MAL, "optimizer.dataflow",
416 : SQLSTATE(HY013)
417 : MAL_MALLOC_FAIL);
418 0 : break;
419 : }
420 2056429 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
421 2056427 : if (getArg(r, 0) < 0) {
422 0 : freeInstruction(r);
423 0 : msg = createException(MAL, "optimizer.dataflow",
424 : SQLSTATE(HY013)
425 : MAL_MALLOC_FAIL);
426 0 : break;
427 : }
428 2056427 : r = pushArgument(mb, r, getArg(q, k));
429 2056428 : pushInstruction(mb, r);
430 : }
431 : }
432 11027212 : if (msg)
433 : break;
434 : }
435 1602758 : if (msg)
436 : break;
437 : /* exit parallel block */
438 1602758 : if (!simple) {
439 147448 : q = newAssignment(mb);
440 147452 : if (q == NULL) {
441 0 : msg = createException(MAL, "optimizer.dataflow",
442 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
443 0 : break;
444 : }
445 147452 : q->barrier = EXITsymbol;
446 147452 : getArg(q, 0) = flowblock;
447 147452 : pushInstruction(mb, q);
448 : }
449 1602762 : if (p->token == ENDsymbol) {
450 : break;
451 : }
452 : // Start a new region
453 1127126 : memset((char *) states, 0, vlimit * sizeof(char));
454 1127126 : start = i;
455 1127126 : decideRegionType(cntxt, mb, p, states, &state);
456 : }
457 : // remember you assigned/read variables
458 22370764 : for (k = 0; k < p->retc; k++)
459 11819215 : setState(states, p, k, VARWRITE);
460 10551549 : if (isUpdateInstruction(p)
461 889285 : && (getState(states, p, 1) == 0
462 719576 : || getState(states, p, 1) & VARWRITE))
463 445947 : setState(states, p, 1, VARBLOCK);
464 57120693 : for (k = p->retc; k < p->argc; k++)
465 46568918 : if (!isVarConstant(mb, getArg(p, k))) {
466 22414479 : if (getState(states, p, k) & VARREAD)
467 10517139 : setState(states, p, k, VAR2READ);
468 11897340 : else if (getState(states, p, k) & VARWRITE)
469 9427247 : setState(states, p, k, VARREAD);
470 : }
471 : }
472 :
473 : /* take the remainder as is */
474 116208684 : for (; i < slimit; i++)
475 115732997 : if (old[i])
476 13813944 : pushInstruction(mb, old[i]);
477 : /* Defense line against incorrect plans */
478 475687 : if (msg == MAL_SUCCEED && actions > 0) {
479 135356 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
480 135341 : if (msg == MAL_SUCCEED) {
481 135341 : msg = chkFlow(mb);
482 135326 : if (msg == MAL_SUCCEED)
483 135334 : msg = chkDeclarations(mb);
484 : }
485 : }
486 340323 : wrapup:
487 : /* keep actions taken as a fake argument */
488 475673 : (void) pushInt(mb, pci, actions);
489 :
490 475634 : if (states)
491 475634 : GDKfree(states);
492 475697 : if (old)
493 475697 : GDKfree(old);
494 : return msg;
495 : }
|