Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * The statements are all checked for being eligible for dataflow.
15 : */
16 : #include "monetdb_config.h"
17 : #include "opt_dataflow.h"
18 : #include "mal_instruction.h"
19 : #include "mal_interpreter.h"
20 : #include "manifold.h"
21 :
22 : /*
23 : * Dataflow processing incurs overhead and is only
24 : * relevant if multiple tasks kan be handled at the same time.
25 : * Also simple expressions dont have to be executed in parallel.
26 : *
27 : * The dataflow analysis centers around the read/write use patterns of
28 : * the variables and the occurrence of side-effect bearing functions.
29 : * Any such function should break the dataflow block as it may rely
30 : * on the sequential order in the plan.
31 : *
32 : * The following state properties can be distinguished for all variables:
33 : * VARWRITE - variable assigned a value in the dataflow block
34 : * VARREAD - variable is used in an argument
35 : * VAR2READ - variable is read in concurrent mode
36 : * VARBLOCK - variable next use terminate the // block, set after encountering an update
37 : *
38 : * Only some combinations are allowed.
39 : */
40 :
41 : #define VARFREE 0
42 : #define VARWRITE 1
43 : #define VARREAD 2
44 : #define VARBLOCK 4
45 : #define VAR2READ 8
46 :
47 : typedef char *States;
48 :
49 : #define setState(S,P,K,F) ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= F)
50 : #define getState(S,P,K) ((S)[getArg(P,K)])
51 :
52 : typedef enum {
53 : no_region,
54 : singleton_region, // always a single statement
55 : dataflow_region, // statements without or with controlled side effects, in parallel
56 : existing_region, // existing barrier..exit region, copied as-is
57 : sql_region, // region of nonconflicting sql.append/sql.updates only
58 : } region_type;
59 :
60 : typedef struct {
61 : region_type type;
62 : union {
63 : struct {
64 : int level; // level of nesting
65 : } existing_region;
66 : } st;
67 : } region_state;
68 :
69 : static bool
70 1600684 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
71 : {
72 1600684 : int i, j, k;
73 1600684 : bool simple = true;
74 1600684 : InstrPtr p = NULL, q;
75 :
76 : /* ignore trivial blocks */
77 1600684 : if (last - start == 1)
78 : return true;
79 163115 : if (state->type == existing_region) {
80 : /* don't add additional barriers and garbage collection around
81 : * existing region. */
82 : return true;
83 : }
84 : /* skip sequence of simple arithmetic first */
85 325956 : for (; simple && start < last; start++)
86 163660 : if (old[start]) {
87 163660 : p = old[start];
88 325323 : simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef
89 325323 : || getModuleId(p) == strRef || getModuleId(p) == mmathRef;
90 : }
91 286206 : for (i = start; i < last; i++)
92 267753 : if (old[i]) {
93 267753 : q = old[i];
94 255747 : simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef
95 523500 : || getModuleId(q) == strRef || getModuleId(q) == mmathRef;
96 267753 : if (!simple) {
97 : /* if not arithmetic than we should consume the previous result directly */
98 1480633 : for (j = q->retc; j < q->argc; j++)
99 2960307 : for (k = 0; k < p->retc; k++)
100 1735398 : if (getArg(p, k) == getArg(q, j))
101 112233 : simple = true;
102 255724 : if (!simple)
103 : return false;
104 : }
105 : p = q;
106 : }
107 : return simple;
108 : }
109 :
110 : /* Updates are permitted if it is a unique update on
111 : * a BAT created in the context of this block
112 : * As far as we know, no SQL nor MAL test re-uses the
113 : * target BAT to insert again and subsequently calls dataflow.
114 : * In MAL scripts, they still can occur.
115 : */
116 :
117 : /* a limited set of MAL instructions may appear in the dataflow block*/
118 : static int
119 6714161 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
120 : {
121 6714161 : int j;
122 :
123 6714161 : if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p)
124 6577431 : || (isMultiplex(p) && MANIFOLDtypecheck(cntxt, mb, p, 0) == NULL)) {
125 137485 : return TRUE;
126 : }
127 :
128 : /* flow blocks should be closed when we reach a point
129 : where a variable is assigned more then once or already
130 : being read.
131 : */
132 13946686 : for (j = 0; j < p->retc; j++)
133 7370236 : if (getState(states, p, j) & (VARWRITE | VARREAD | VARBLOCK)) {
134 : return 1;
135 : }
136 :
137 : /* update instructions can be updated if the target variable
138 : * has not been read in the block so far */
139 6576450 : if (isUpdateInstruction(p)) {
140 : /* the SQL update functions change BATs that are not
141 : * explicitly mentioned as arguments (and certainly not as the
142 : * first argument), but that can still be available to the MAL
143 : * program (see bugs.monetdb.org/6641) */
144 220282 : if (getModuleId(p) == sqlRef)
145 : return 1;
146 217419 : return getState(states, p, p->retc) & (VARREAD | VARBLOCK);
147 : }
148 :
149 32586043 : for (j = p->retc; j < p->argc; j++) {
150 26229869 : if (getState(states, p, j) & VARBLOCK) {
151 : return 1;
152 : }
153 : }
154 6356174 : return hasSideEffects(mb, p, FALSE);
155 : }
156 :
157 : static str
158 15426171 : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
159 : {
160 15426171 : int var = getArg(p, argno);
161 15426171 : return getVarConstant(mb, var).val.sval;
162 : }
163 :
164 : static str
165 368575 : get_sql_sname(MalBlkPtr mb, InstrPtr p)
166 : {
167 368575 : return get_str_arg(mb, p, 2);
168 : }
169 :
170 : static str
171 368575 : get_sql_tname(MalBlkPtr mb, InstrPtr p)
172 : {
173 368575 : return get_str_arg(mb, p, 3);
174 : }
175 :
176 : static str
177 15426179 : get_sql_cname(MalBlkPtr mb, InstrPtr p)
178 : {
179 15426179 : return get_str_arg(mb, p, 4);
180 : }
181 :
182 :
183 : static bool
184 1554273 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
185 : {
186 1554273 : if (p->modname != sqlRef)
187 : return false;
188 1047199 : if (p->fcnname != appendRef && p->fcnname != updateRef)
189 : return false;
190 :
191 : // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
192 : // arg("mvc",int),
193 : // arg("sname",str),
194 : // arg("tname",str),
195 : // arg("cname",str),
196 : // arg("offset",lng),
197 : // batarg("pos",oid),
198 : // argany("ins",0))),
199 :
200 : // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
201 : // arg("mvc",int),
202 : // arg("sname",str),
203 : // arg("tname",str),
204 : // arg("cname",str),
205 : // argany("rids",0),
206 : // argany("upd",0)))
207 :
208 436630 : if ((p->fcnname == appendRef && p->argc != 8)
209 436630 : || (p->fcnname == updateRef && p->argc != 7))
210 : return false;
211 :
212 436630 : int mvc_var = getArg(p, 1);
213 436630 : if (getVarType(mb, mvc_var) != TYPE_int)
214 : return false;
215 :
216 436630 : int sname_var = getArg(p, 2);
217 436630 : if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
218 : return false;
219 :
220 436630 : int tname_var = getArg(p, 3);
221 436630 : if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
222 : return false;
223 :
224 436630 : int cname_var = getArg(p, 4);
225 436630 : if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
226 : return false;
227 :
228 : return true;
229 : }
230 :
231 : static bool
232 436633 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
233 : {
234 436633 : InstrPtr instr = *p;
235 436633 : if (!isSqlAppendUpdate(mb, instr))
236 : return true;
237 :
238 368579 : str my_sname = get_sql_sname(mb, instr);
239 368579 : str my_tname = get_sql_tname(mb, instr);
240 368579 : str my_cname = get_sql_cname(mb, instr);
241 15426183 : for (InstrPtr *q = first; q < p; q++) {
242 15057600 : str cname = get_sql_cname(mb, *q);
243 15057600 : if (strcmp(my_cname, cname) != 0) {
244 : // different cname, no conflict
245 15057604 : continue;
246 : }
247 0 : str tname = get_sql_tname(mb, *q);
248 0 : if (strcmp(my_tname, tname) != 0) {
249 : // different tname, no conflict
250 0 : continue;
251 : }
252 0 : str sname = get_sql_sname(mb, *q);
253 0 : if (strcmp(my_sname, sname) != 0) {
254 : // different sname, no conflict
255 0 : continue;
256 : }
257 : // Found a statement in the region that works on the same column so this is a breakpoint
258 : return true;
259 : }
260 :
261 : // None of the statements in the region works on this column so no breakpoint necessary
262 : return false;
263 : }
264 :
265 : static bool
266 8514836 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p,
267 : States states, region_state *state)
268 : {
269 8514836 : InstrPtr instr = *p;
270 8514836 : switch (state->type) {
271 : case singleton_region:
272 : // by definition
273 : return true;
274 6714155 : case dataflow_region:
275 6714155 : return dataflowBreakpoint(cntxt, mb, instr, states);
276 6744 : case existing_region:
277 6744 : if (state->st.existing_region.level == 0) {
278 : // previous statement ended the region so we break here
279 : return true;
280 : }
281 5925 : if (blockStart(instr)) {
282 38 : state->st.existing_region.level += 1;
283 5887 : } else if (blockExit(instr)) {
284 857 : state->st.existing_region.level -= 1;
285 : }
286 : return false;
287 436630 : case sql_region:
288 436630 : return sqlBreakpoint(mb, first, p);
289 : default:
290 : // serious corruption has occurred.
291 0 : assert(0); /* corrupted region_type */
292 : abort();
293 : }
294 : assert(0); /* unreachable */
295 : return true;
296 : }
297 :
298 : static void
299 1118489 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states,
300 : region_state *state)
301 : {
302 1118489 : (void) cntxt;
303 :
304 1118489 : state->type = no_region;
305 1118489 : if (blockStart(p)) {
306 819 : state->type = existing_region;
307 819 : state->st.existing_region.level = 1;
308 1117670 : } else if (p->token == ENDsymbol) {
309 0 : state->type = existing_region;
310 1117670 : } else if (isSqlAppendUpdate(mb, p)) {
311 68054 : state->type = sql_region;
312 1049616 : } else if (p->barrier) {
313 644 : state->type = singleton_region;
314 1048972 : } else if (isUnsafeFunction(p)) {
315 366119 : state->type = singleton_region;
316 682872 : } else if (isUpdateInstruction(p)
317 36 : && getModuleId(p) != sqlRef
318 20 : && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0) {
319 : // Special case. Unless they're from the sql module, instructions with
320 : // names like 'append', 'update', 'delete', 'grow', etc., are expected
321 : // to express their side effects as data dependencies, for example,
322 : // X5 := bat.append(X_5, ...)
323 20 : state->type = dataflow_region;
324 682851 : } else if (hasSideEffects(mb, p, false)) {
325 506020 : state->type = singleton_region;
326 176839 : } else if (isMultiplex(p)) {
327 2343 : state->type = singleton_region;
328 : } else {
329 174500 : state->type = dataflow_region;
330 : }
331 1118519 : assert(state->type != no_region);
332 1118519 : }
333 :
334 :
335 : /* dataflow blocks are transparent, because they are always
336 : executed, either sequentially or in parallel */
337 :
338 : str
339 482209 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
340 : InstrPtr pci)
341 : {
342 482209 : int i, j, k, start, slimit, breakpoint, actions = 0;
343 482209 : bool simple = true;
344 482209 : int flowblock = 0;
345 482209 : InstrPtr p, *old = NULL, q;
346 482209 : int limit, vlimit;
347 482209 : States states = NULL;
348 482209 : region_state state = { singleton_region };
349 482209 : str msg = MAL_SUCCEED;
350 :
351 : /* don't use dataflow on single processor systems */
352 482209 : if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
353 0 : goto wrapup;
354 :
355 482209 : if (optimizerIsApplied(mb, dataflowRef))
356 0 : goto wrapup;
357 482210 : (void) stk;
358 : /* inlined functions will get their dataflow control later */
359 482210 : if (mb->inlineProp)
360 0 : goto wrapup;
361 :
362 482210 : vlimit = mb->vsize;
363 482210 : states = (States) GDKzalloc(vlimit * sizeof(char));
364 482208 : if (states == NULL) {
365 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
366 : }
367 :
368 482208 : setVariableScope(mb);
369 :
370 482207 : limit = mb->stop;
371 482207 : slimit = mb->ssize;
372 482207 : old = mb->stmt;
373 482207 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
374 0 : GDKfree(states);
375 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
376 : }
377 :
378 : /* inject new dataflow barriers using a single pass through the program */
379 482210 : start = 0;
380 482210 : state.type = singleton_region;
381 8514362 : for (i = 1; mb->errors == NULL && i < limit; i++) {
382 8514362 : p = old[i];
383 8514362 : assert(p);
384 8514362 : breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
385 8514640 : if (breakpoint) {
386 : /* close previous flow block */
387 1600649 : simple = simpleFlow(old, start, i, &state);
388 :
389 1600649 : if (!simple) {
390 143848 : if ((flowblock = newTmpVariable(mb, TYPE_bit)) < 0
391 143849 : || (q = newFcnCall(mb, languageRef, dataflowRef)) == NULL) {
392 0 : msg = createException(MAL, "optimizer.dataflow",
393 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
394 0 : break;
395 : }
396 143850 : q->barrier = BARRIERsymbol;
397 143850 : getArg(q, 0) = flowblock;
398 143850 : pushInstruction(mb, q);
399 143850 : actions++;
400 : }
401 : // copyblock the collected statements
402 10116356 : for (j = start; j < i; j++) {
403 8515684 : q = old[j];
404 8515684 : pushInstruction(mb, q);
405 8515727 : old[j] = NULL;
406 : // collect BAT variables garbage collected within the block
407 8515727 : if (!simple)
408 36909159 : for (k = q->retc; k < q->argc; k++) {
409 29895012 : if (getState(states, q, k) & VAR2READ
410 8229188 : && getEndScope(mb, getArg(q, k)) == j
411 1653609 : && isaBatType(getVarType(mb, getArg(q, k)))) {
412 1586338 : InstrPtr r;
413 1586338 : r = newInstruction(NULL, languageRef, passRef);
414 1586339 : if (r == NULL) {
415 0 : msg = createException(MAL, "optimizer.dataflow",
416 : SQLSTATE(HY013)
417 : MAL_MALLOC_FAIL);
418 0 : break;
419 : }
420 1586339 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
421 1586339 : if (getArg(r, 0) < 0) {
422 0 : freeInstruction(r);
423 0 : msg = createException(MAL, "optimizer.dataflow",
424 : SQLSTATE(HY013)
425 : MAL_MALLOC_FAIL);
426 0 : break;
427 : }
428 1586339 : r = pushArgument(mb, r, getArg(q, k));
429 1586339 : pushInstruction(mb, r);
430 : }
431 : }
432 8515705 : if (msg)
433 : break;
434 : }
435 1600672 : if (msg)
436 : break;
437 : /* exit parallel block */
438 1600672 : if (!simple) {
439 143850 : q = newAssignment(mb);
440 143848 : if (q == NULL) {
441 0 : msg = createException(MAL, "optimizer.dataflow",
442 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
443 0 : break;
444 : }
445 143848 : q->barrier = EXITsymbol;
446 143848 : getArg(q, 0) = flowblock;
447 143848 : pushInstruction(mb, q);
448 : }
449 1600670 : if (p->token == ENDsymbol) {
450 : break;
451 : }
452 : // Start a new region
453 1118462 : memset((char *) states, 0, vlimit * sizeof(char));
454 1118462 : start = i;
455 1118462 : decideRegionType(cntxt, mb, p, states, &state);
456 : }
457 : // remember you assigned/read variables
458 16942364 : for (k = 0; k < p->retc; k++)
459 8909904 : setState(states, p, k, VARWRITE);
460 8032460 : if (isUpdateInstruction(p)
461 800379 : && (getState(states, p, 1) == 0
462 647057 : || getState(states, p, 1) & VARWRITE))
463 439076 : setState(states, p, 1, VARBLOCK);
464 41887739 : for (k = p->retc; k < p->argc; k++)
465 33855587 : if (!isVarConstant(mb, getArg(p, k))) {
466 15481701 : if (getState(states, p, k) & VARREAD)
467 6585668 : setState(states, p, k, VAR2READ);
468 8896033 : else if (getState(states, p, k) & VARWRITE)
469 6648971 : setState(states, p, k, VARREAD);
470 : }
471 : }
472 :
473 : /* take the remainder as is */
474 118711262 : for (; i < slimit; i++)
475 118229052 : if (old[i])
476 14019380 : pushInstruction(mb, old[i]);
477 : /* Defense line against incorrect plans */
478 482210 : if (msg == MAL_SUCCEED && actions > 0) {
479 130664 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
480 130663 : if (msg == MAL_SUCCEED) {
481 130663 : msg = chkFlow(mb);
482 130662 : if (msg == MAL_SUCCEED)
483 130662 : msg = chkDeclarations(mb);
484 : }
485 : }
486 351546 : wrapup:
487 : /* keep actions taken as a fake argument */
488 482209 : (void) pushInt(mb, pci, actions);
489 :
490 482208 : if (states)
491 482208 : GDKfree(states);
492 482210 : if (old)
493 482210 : GDKfree(old);
494 : return msg;
495 : }
|