Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * The statements are all checked for being eligible for dataflow.
15 : */
16 : #include "monetdb_config.h"
17 : #include "opt_dataflow.h"
18 : #include "mal_instruction.h"
19 : #include "mal_interpreter.h"
20 : #include "manifold.h"
21 :
/*
 * Dataflow processing incurs overhead and is only
 * relevant if multiple tasks can be handled at the same time.
 * Also, simple expressions don't have to be executed in parallel.
 *
 * The dataflow analysis centers around the read/write use patterns of
 * the variables and the occurrence of side-effect bearing functions.
 * Any such function should break the dataflow block as it may rely
 * on the sequential order in the plan.
 *
 * The following state properties can be distinguished for all variables:
 * VARWRITE - variable assigned a value in the dataflow block
 * VARREAD - variable is used in an argument
 * VAR2READ - variable is read in concurrent mode
 * VARBLOCK - the next use of the variable terminates the parallel block;
 *            set after encountering an update
 *
 * Only some combinations are allowed.
 */
40 :
/* Per-variable state flags; several may be OR-ed together in one byte. */
enum {
	VARFREE = 0,	/* not touched in the current region */
	VARWRITE = 1,	/* assigned a value in the current region */
	VARREAD = 2,	/* used as an argument after being assigned */
	VARBLOCK = 4,	/* its next use must terminate the region */
	VAR2READ = 8,	/* read again while already marked VARREAD */
};

/* One state byte per MAL variable, indexed by variable number. */
typedef char *States;

/*
 * setState: OR flag F into the state of the variable bound to argument K
 * of instruction P.
 * NOTE: the expansion refers to a local named 'vlimit' (the number of
 * entries in S) for its bounds assertion; it must be in scope at every use.
 */
#define setState(S, P, K, F) \
	(assert(getArg(P, K) < vlimit), (S)[getArg(P, K)] |= (F))

/* getState: the current flags of the variable bound to argument K of P. */
#define getState(S, P, K) ((S)[getArg(P, K)])
51 :
/* The kind of region the optimizer is currently collecting statements for. */
typedef enum region_type {
	no_region,			/* not yet decided */
	singleton_region,	/* always exactly one statement */
	dataflow_region,	/* statements without, or with controlled, side
						 * effects; candidates for parallel execution */
	existing_region,	/* a pre-existing barrier..exit region, copied as-is */
	sql_region,			/* run of non-conflicting sql.append/sql.update only */
} region_type;

/* Region kind plus the bookkeeping that only some kinds need. */
typedef struct region_state {
	region_type type;
	union {
		struct {
			int level;	/* current barrier nesting depth */
		} existing_region;
	} st;
} region_state;
68 :
69 : static bool
70 1600489 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
71 : {
72 1600489 : int i, j, k;
73 1600489 : bool simple = true;
74 1600489 : InstrPtr p = NULL, q;
75 :
76 : /* ignore trivial blocks */
77 1600489 : if (last - start == 1)
78 : return true;
79 162789 : if (state->type == existing_region) {
80 : /* don't add additional barriers and garbage collection around
81 : * existing region. */
82 : return true;
83 : }
84 : /* skip sequence of simple arithmetic first */
85 325250 : for (; simple && start < last; start++)
86 163280 : if (old[start]) {
87 163280 : p = old[start];
88 324563 : simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef
89 324563 : || getModuleId(p) == strRef || getModuleId(p) == mmathRef;
90 : }
91 285320 : for (i = start; i < last; i++)
92 266794 : if (old[i]) {
93 266794 : q = old[i];
94 254867 : simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef
95 521661 : || getModuleId(q) == strRef || getModuleId(q) == mmathRef;
96 266794 : if (!simple) {
97 : /* if not arithmetic than we should consume the previous result directly */
98 1475486 : for (j = q->retc; j < q->argc; j++)
99 2951769 : for (k = 0; k < p->retc; k++)
100 1731131 : if (getArg(p, k) == getArg(q, j))
101 111837 : simple = true;
102 254848 : if (!simple)
103 : return false;
104 : }
105 : p = q;
106 : }
107 : return simple;
108 : }
109 :
110 : /* Updates are permitted if it is a unique update on
111 : * a BAT created in the context of this block
112 : * As far as we know, no SQL nor MAL test re-uses the
113 : * target BAT to insert again and subsequently calls dataflow.
114 : * In MAL scripts, they still can occur.
115 : */
116 :
117 : /* a limited set of MAL instructions may appear in the dataflow block*/
118 : static int
119 10764312 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
120 : {
121 10764312 : int j;
122 :
123 10764312 : if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p)
124 10628239 : || (isMultiplex(p) && MANIFOLDtypecheck(cntxt, mb, p, 0) == NULL)) {
125 136873 : return TRUE;
126 : }
127 :
128 : /* flow blocks should be closed when we reach a point
129 : where a variable is assigned more then once or already
130 : being read.
131 : */
132 22583116 : for (j = 0; j < p->retc; j++)
133 11955704 : if (getState(states, p, j) & (VARWRITE | VARREAD | VARBLOCK)) {
134 : return 1;
135 : }
136 :
137 : /* update instructions can be updated if the target variable
138 : * has not been read in the block so far */
139 10627412 : if (isUpdateInstruction(p)) {
140 : /* the SQL update functions change BATs that are not
141 : * explicitly mentioned as arguments (and certainly not as the
142 : * first argument), but that can still be available to the MAL
143 : * program (see bugs.monetdb.org/6641) */
144 312195 : if (getModuleId(p) == sqlRef)
145 : return 1;
146 309334 : return getState(states, p, p->retc) & (VARREAD | VARBLOCK);
147 : }
148 :
149 54550914 : for (j = p->retc; j < p->argc; j++) {
150 44235789 : if (getState(states, p, j) & VARBLOCK) {
151 : return 1;
152 : }
153 : }
154 10315125 : return hasSideEffects(mb, p, FALSE);
155 : }
156 :
157 : static str
158 15426351 : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
159 : {
160 15426351 : int var = getArg(p, argno);
161 15426351 : return getVarConstant(mb, var).val.sval;
162 : }
163 :
164 : static str
165 368636 : get_sql_sname(MalBlkPtr mb, InstrPtr p)
166 : {
167 368636 : return get_str_arg(mb, p, 2);
168 : }
169 :
170 : static str
171 368636 : get_sql_tname(MalBlkPtr mb, InstrPtr p)
172 : {
173 368636 : return get_str_arg(mb, p, 3);
174 : }
175 :
176 : static str
177 15426355 : get_sql_cname(MalBlkPtr mb, InstrPtr p)
178 : {
179 15426355 : return get_str_arg(mb, p, 4);
180 : }
181 :
182 :
183 : static bool
184 1554319 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
185 : {
186 1554319 : if (p->modname != sqlRef)
187 : return false;
188 1046413 : if (p->fcnname != appendRef && p->fcnname != updateRef)
189 : return false;
190 :
191 : // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
192 : // arg("mvc",int),
193 : // arg("sname",str),
194 : // arg("tname",str),
195 : // arg("cname",str),
196 : // arg("offset",lng),
197 : // batarg("pos",oid),
198 : // argany("ins",0))),
199 :
200 : // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
201 : // arg("mvc",int),
202 : // arg("sname",str),
203 : // arg("tname",str),
204 : // arg("cname",str),
205 : // argany("rids",0),
206 : // argany("upd",0)))
207 :
208 436644 : if ((p->fcnname == appendRef && p->argc != 8)
209 436654 : || (p->fcnname == updateRef && p->argc != 7))
210 : return false;
211 :
212 436654 : int mvc_var = getArg(p, 1);
213 436654 : if (getVarType(mb, mvc_var) != TYPE_int)
214 : return false;
215 :
216 436654 : int sname_var = getArg(p, 2);
217 436654 : if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
218 : return false;
219 :
220 436682 : int tname_var = getArg(p, 3);
221 436682 : if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
222 : return false;
223 :
224 436682 : int cname_var = getArg(p, 4);
225 436682 : if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
226 : return false;
227 :
228 : return true;
229 : }
230 :
231 : static bool
232 436687 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
233 : {
234 436687 : InstrPtr instr = *p;
235 436687 : if (!isSqlAppendUpdate(mb, instr))
236 : return true;
237 :
238 368638 : str my_sname = get_sql_sname(mb, instr);
239 368638 : str my_tname = get_sql_tname(mb, instr);
240 368638 : str my_cname = get_sql_cname(mb, instr);
241 15426357 : for (InstrPtr *q = first; q < p; q++) {
242 15057717 : str cname = get_sql_cname(mb, *q);
243 15057717 : if (strcmp(my_cname, cname) != 0) {
244 : // different cname, no conflict
245 15057719 : continue;
246 : }
247 0 : str tname = get_sql_tname(mb, *q);
248 0 : if (strcmp(my_tname, tname) != 0) {
249 : // different tname, no conflict
250 0 : continue;
251 : }
252 0 : str sname = get_sql_sname(mb, *q);
253 0 : if (strcmp(my_sname, sname) != 0) {
254 : // different sname, no conflict
255 0 : continue;
256 : }
257 : // Found a statement in the region that works on the same column so this is a breakpoint
258 : return true;
259 : }
260 :
261 : // None of the statements in the region works on this column so no breakpoint necessary
262 : return false;
263 : }
264 :
265 : static bool
266 12565032 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p,
267 : States states, region_state *state)
268 : {
269 12565032 : InstrPtr instr = *p;
270 12565032 : switch (state->type) {
271 : case singleton_region:
272 : // by definition
273 : return true;
274 10764158 : case dataflow_region:
275 10764158 : return dataflowBreakpoint(cntxt, mb, instr, states);
276 6744 : case existing_region:
277 6744 : if (state->st.existing_region.level == 0) {
278 : // previous statement ended the region so we break here
279 : return true;
280 : }
281 5925 : if (blockStart(instr)) {
282 38 : state->st.existing_region.level += 1;
283 5887 : } else if (blockExit(instr)) {
284 857 : state->st.existing_region.level -= 1;
285 : }
286 : return false;
287 436647 : case sql_region:
288 436647 : return sqlBreakpoint(mb, first, p);
289 : default:
290 : // serious corruption has occurred.
291 0 : assert(0); /* corrupted region_type */
292 : abort();
293 : }
294 : assert(0); /* unreachable */
295 : return true;
296 : }
297 :
298 : static void
299 1118648 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states,
300 : region_state *state)
301 : {
302 1118648 : (void) cntxt;
303 :
304 1118648 : state->type = no_region;
305 1118648 : if (blockStart(p)) {
306 819 : state->type = existing_region;
307 819 : state->st.existing_region.level = 1;
308 1117829 : } else if (p->token == ENDsymbol) {
309 0 : state->type = existing_region;
310 1117829 : } else if (isSqlAppendUpdate(mb, p)) {
311 68034 : state->type = sql_region;
312 1049795 : } else if (p->barrier) {
313 644 : state->type = singleton_region;
314 1049151 : } else if (isUnsafeFunction(p)) {
315 365605 : state->type = singleton_region;
316 683674 : } else if (isUpdateInstruction(p)
317 36 : && getModuleId(p) != sqlRef
318 20 : && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0) {
319 : // Special case. Unless they're from the sql module, instructions with
320 : // names like 'append', 'update', 'delete', 'grow', etc., are expected
321 : // to express their side effects as data dependencies, for example,
322 : // X5 := bat.append(X_5, ...)
323 20 : state->type = dataflow_region;
324 683557 : } else if (hasSideEffects(mb, p, false)) {
325 505286 : state->type = singleton_region;
326 178209 : } else if (isMultiplex(p)) {
327 4166 : state->type = singleton_region;
328 : } else {
329 174071 : state->type = dataflow_region;
330 : }
331 1118645 : assert(state->type != no_region);
332 1118645 : }
333 :
334 :
335 : /* dataflow blocks are transparent, because they are always
336 : executed, either sequentially or in parallel */
337 :
338 : str
339 481633 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
340 : InstrPtr pci)
341 : {
342 481633 : int i, j, k, start, slimit, breakpoint, actions = 0;
343 481633 : bool simple = true;
344 481633 : int flowblock = 0;
345 481633 : InstrPtr p, *old = NULL, q;
346 481633 : int limit, vlimit;
347 481633 : States states = NULL;
348 481633 : region_state state = { singleton_region };
349 481633 : str msg = MAL_SUCCEED;
350 :
351 : /* don't use dataflow on single processor systems */
352 481633 : if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
353 0 : goto wrapup;
354 :
355 481633 : if (optimizerIsApplied(mb, dataflowRef))
356 0 : goto wrapup;
357 481697 : (void) stk;
358 : /* inlined functions will get their dataflow control later */
359 481697 : if (mb->inlineProp)
360 0 : goto wrapup;
361 :
362 481697 : vlimit = mb->vsize;
363 481697 : states = (States) GDKzalloc(vlimit * sizeof(char));
364 481714 : if (states == NULL) {
365 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
366 : }
367 :
368 481714 : setVariableScope(mb);
369 :
370 481592 : limit = mb->stop;
371 481592 : slimit = mb->ssize;
372 481592 : old = mb->stmt;
373 481592 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
374 0 : GDKfree(states);
375 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
376 : }
377 :
378 : /* inject new dataflow barriers using a single pass through the program */
379 481709 : start = 0;
380 481709 : state.type = singleton_region;
381 12563857 : for (i = 1; mb->errors == NULL && i < limit; i++) {
382 12563857 : p = old[i];
383 12563857 : assert(p);
384 12563857 : breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
385 12563978 : if (breakpoint) {
386 : /* close previous flow block */
387 1600191 : simple = simpleFlow(old, start, i, &state);
388 :
389 1600191 : if (!simple) {
390 143436 : if ((flowblock = newTmpVariable(mb, TYPE_bit)) < 0
391 143487 : || (q = newFcnCall(mb, languageRef, dataflowRef)) == NULL) {
392 0 : msg = createException(MAL, "optimizer.dataflow",
393 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
394 0 : break;
395 : }
396 143552 : q->barrier = BARRIERsymbol;
397 143552 : getArg(q, 0) = flowblock;
398 143552 : pushInstruction(mb, q);
399 143552 : actions++;
400 : }
401 : // copyblock the collected statements
402 14163964 : for (j = start; j < i; j++) {
403 12563553 : q = old[j];
404 12563553 : pushInstruction(mb, q);
405 12563585 : old[j] = NULL;
406 : // collect BAT variables garbage collected within the block
407 12563585 : if (!simple)
408 59314582 : for (k = q->retc; k < q->argc; k++) {
409 48252033 : if (getState(states, q, k) & VAR2READ
410 14596677 : && getEndScope(mb, getArg(q, k)) == j
411 2680979 : && isaBatType(getVarType(mb, getArg(q, k)))) {
412 2614004 : InstrPtr r;
413 2614004 : r = newInstruction(NULL, languageRef, passRef);
414 2614006 : if (r == NULL) {
415 0 : msg = createException(MAL, "optimizer.dataflow",
416 : SQLSTATE(HY013)
417 : MAL_MALLOC_FAIL);
418 0 : break;
419 : }
420 2614006 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
421 2613998 : if (getArg(r, 0) < 0) {
422 0 : freeInstruction(r);
423 0 : msg = createException(MAL, "optimizer.dataflow",
424 : SQLSTATE(HY013)
425 : MAL_MALLOC_FAIL);
426 0 : break;
427 : }
428 2613998 : r = pushArgument(mb, r, getArg(q, k));
429 2613996 : pushInstruction(mb, r);
430 : }
431 : }
432 12563657 : if (msg)
433 : break;
434 : }
435 1600411 : if (msg)
436 : break;
437 : /* exit parallel block */
438 1600411 : if (!simple) {
439 143548 : q = newAssignment(mb);
440 143548 : if (q == NULL) {
441 0 : msg = createException(MAL, "optimizer.dataflow",
442 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
443 0 : break;
444 : }
445 143548 : q->barrier = EXITsymbol;
446 143548 : getArg(q, 0) = flowblock;
447 143548 : pushInstruction(mb, q);
448 : }
449 1600411 : if (p->token == ENDsymbol) {
450 : break;
451 : }
452 : // Start a new region
453 1118729 : memset((char *) states, 0, vlimit * sizeof(char));
454 1118729 : start = i;
455 1118729 : decideRegionType(cntxt, mb, p, states, &state);
456 : }
457 : // remember you assigned/read variables
458 25575990 : for (k = 0; k < p->retc; k++)
459 13493732 : setState(states, p, k, VARWRITE);
460 12082258 : if (isUpdateInstruction(p)
461 892279 : && (getState(states, p, 1) == 0
462 737381 : || getState(states, p, 1) & VARWRITE))
463 530939 : setState(states, p, 1, VARBLOCK);
464 64289521 : for (k = p->retc; k < p->argc; k++)
465 52207373 : if (!isVarConstant(mb, getArg(p, k))) {
466 25361443 : if (getState(states, p, k) & VARREAD)
467 11941078 : setState(states, p, k, VAR2READ);
468 13420365 : else if (getState(states, p, k) & VARWRITE)
469 11137799 : setState(states, p, k, VARREAD);
470 : }
471 : }
472 :
473 : /* take the remainder as is */
474 117550553 : for (; i < slimit; i++)
475 117068842 : if (old[i])
476 14002157 : pushInstruction(mb, old[i]);
477 : /* Defense line against incorrect plans */
478 481711 : if (msg == MAL_SUCCEED && actions > 0) {
479 130367 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
480 130348 : if (msg == MAL_SUCCEED) {
481 130348 : msg = chkFlow(mb);
482 130332 : if (msg == MAL_SUCCEED)
483 130344 : msg = chkDeclarations(mb);
484 : }
485 : }
486 351332 : wrapup:
487 : /* keep actions taken as a fake argument */
488 481695 : (void) pushInt(mb, pci, actions);
489 :
490 481671 : if (states)
491 481671 : GDKfree(states);
492 481706 : if (old)
493 481706 : GDKfree(old);
494 : return msg;
495 : }
|