Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * The statements are all checked for being eligible for dataflow.
15 : */
16 : #include "monetdb_config.h"
17 : #include "opt_dataflow.h"
18 : #include "mal_instruction.h"
19 : #include "mal_interpreter.h"
20 : #include "manifold.h"
21 :
22 : /*
23 : * Dataflow processing incurs overhead and is only
24 : * relevant if multiple tasks can be handled at the same time.
25 : * Also, simple expressions don't have to be executed in parallel.
26 : *
27 : * The dataflow analysis centers around the read/write use patterns of
28 : * the variables and the occurrence of side-effect bearing functions.
29 : * Any such function should break the dataflow block as it may rely
30 : * on the sequential order in the plan.
31 : *
32 : * The following state properties can be distinguished for all variables:
33 : * VARWRITE - variable assigned a value in the dataflow block
34 : * VARREAD - variable is used in an argument
35 : * VAR2READ - variable is read more than once in the dataflow block (concurrent use)
36 : * VARBLOCK - the next use of the variable terminates the parallel block; set after encountering an update
37 : *
38 : * Only some combinations are allowed.
39 : */
40 :
/*
 * Per-variable state flags.  They are stored in a States array indexed by
 * variable number, combined with bitwise OR, and reset to VARFREE whenever a
 * new region starts.
 */
enum {
	VARFREE = 0,	/* not seen in the current region */
	VARWRITE = 1,	/* assigned a value in the current region */
	VARREAD = 2,	/* read in the current region after being assigned there */
	VARBLOCK = 4,	/* next use terminates the region (set on update targets) */
	VAR2READ = 8,	/* read more than once in the current region */
};

typedef char *States;

/*
 * setState/getState address the state of the variable bound to argument
 * position K of instruction P.  setState ORs flag F into that state.
 * NOTE: setState's bounds assertion refers to a variable named `vlimit`
 * (the number of entries in S) that must be in scope at the call site.
 */
#define setState(S,P,K,F) ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= (F))
#define getState(S,P,K) ((S)[getArg(P,K)])

/* Kind of the region currently being collected by the optimizer. */
typedef enum {
	no_region,
	singleton_region,	// always a single statement
	dataflow_region,	// statements without or with controlled side effects, in parallel
	existing_region,	// existing barrier..exit region, copied as-is
	sql_region,			// region of nonconflicting sql.append/sql.updates only
} region_type;

/* Region kind plus per-kind bookkeeping. */
typedef struct {
	region_type type;
	union {
		struct {
			int level;	// level of nesting
		} existing_region;
	} st;
} region_state;
68 :
69 : static bool
70 1661220 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
71 : {
72 1661220 : int i, j, k;
73 1661220 : bool simple = true;
74 1661220 : InstrPtr p = NULL, q;
75 :
76 : /* ignore trivial blocks */
77 1661220 : if (last - start == 1)
78 : return true;
79 172750 : if (state->type == existing_region) {
80 : /* don't add additional barriers and garbage collection around
81 : * existing region. */
82 : return true;
83 : }
84 : /* skip sequence of simple arithmetic first */
85 345148 : for (; simple && start < last; start++)
86 173212 : if (old[start]) {
87 173212 : p = old[start];
88 344435 : simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef
89 344435 : || getModuleId(p) == strRef || getModuleId(p) == mmathRef;
90 : }
91 294752 : for (i = start; i < last; i++)
92 276261 : if (old[i]) {
93 276261 : q = old[i];
94 264325 : simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef
95 540586 : || getModuleId(q) == strRef || getModuleId(q) == mmathRef;
96 276261 : if (!simple) {
97 : /* if not arithmetic than we should consume the previous result directly */
98 1536943 : for (j = q->retc; j < q->argc; j++)
99 3056964 : for (k = 0; k < p->retc; k++)
100 1784325 : if (getArg(p, k) == getArg(q, j))
101 111341 : simple = true;
102 264304 : if (!simple)
103 : return false;
104 : }
105 : p = q;
106 : }
107 : return simple;
108 : }
109 :
110 : /* Updates are permitted if it is a unique update on
111 : * a BAT created in the context of this block
112 : * As far as we know, no SQL nor MAL test re-uses the
113 : * target BAT to insert again and subsequently calls dataflow.
114 : * In MAL scripts, they still can occur.
115 : */
116 :
117 : /* a limited set of MAL instructions may appear in the dataflow block*/
118 : static int
119 9873614 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
120 : {
121 9873614 : int j;
122 :
123 9873614 : if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p)
124 9727470 : || (isMultiplex(p) && MANIFOLDtypecheck(cntxt, mb, p, 0) == NULL)) {
125 146962 : return TRUE;
126 : }
127 :
128 : /* flow blocks should be closed when we reach a point
129 : where a variable is assigned more then once or already
130 : being read.
131 : */
132 20700910 : for (j = 0; j < p->retc; j++)
133 10974253 : if (getState(states, p, j) & (VARWRITE | VARREAD | VARBLOCK)) {
134 : return 1;
135 : }
136 :
137 : /* update instructions can be updated if the target variable
138 : * has not been read in the block so far */
139 9726657 : if (isUpdateInstruction(p)) {
140 : /* the SQL update functions change BATs that are not
141 : * explicitly mentioned as arguments (and certainly not as the
142 : * first argument), but that can still be available to the MAL
143 : * program (see bugs.monetdb.org/6641) */
144 216979 : if (getModuleId(p) == sqlRef)
145 : return 1;
146 214092 : return getState(states, p, p->retc) & (VARREAD | VARBLOCK);
147 : }
148 :
149 50443968 : for (j = p->retc; j < p->argc; j++) {
150 40934325 : if (getState(states, p, j) & VARBLOCK) {
151 : return 1;
152 : }
153 : }
154 9509643 : return hasSideEffects(mb, p, FALSE);
155 : }
156 :
157 : static str
158 15922021 : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
159 : {
160 15922021 : int var = getArg(p, argno);
161 15922021 : return getVarConstant(mb, var).val.sval;
162 : }
163 :
164 : static str
165 455098 : get_sql_sname(MalBlkPtr mb, InstrPtr p)
166 : {
167 455098 : return get_str_arg(mb, p, 2);
168 : }
169 :
170 : static str
171 455098 : get_sql_tname(MalBlkPtr mb, InstrPtr p)
172 : {
173 455098 : return get_str_arg(mb, p, 3);
174 : }
175 :
176 : static str
177 15922041 : get_sql_cname(MalBlkPtr mb, InstrPtr p)
178 : {
179 15922041 : return get_str_arg(mb, p, 4);
180 : }
181 :
182 :
183 : static bool
184 1702314 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
185 : {
186 1702314 : if (p->modname != sqlRef)
187 : return false;
188 1195784 : if (p->fcnname != appendRef && p->fcnname != updateRef)
189 : return false;
190 :
191 : // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
192 : // arg("mvc",int),
193 : // arg("sname",str),
194 : // arg("tname",str),
195 : // arg("cname",str),
196 : // arg("offset",lng),
197 : // batarg("pos",oid),
198 : // argany("ins",0))),
199 :
200 : // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
201 : // arg("mvc",int),
202 : // arg("sname",str),
203 : // arg("tname",str),
204 : // arg("cname",str),
205 : // argany("rids",0),
206 : // argany("upd",0)))
207 :
208 533863 : if ((p->fcnname == appendRef && p->argc != 8)
209 533870 : || (p->fcnname == updateRef && p->argc != 7))
210 : return false;
211 :
212 533870 : int mvc_var = getArg(p, 1);
213 533870 : if (getVarType(mb, mvc_var) != TYPE_int)
214 : return false;
215 :
216 533870 : int sname_var = getArg(p, 2);
217 533870 : if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
218 : return false;
219 :
220 533886 : int tname_var = getArg(p, 3);
221 533886 : if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
222 : return false;
223 :
224 533885 : int cname_var = getArg(p, 4);
225 533885 : if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
226 : return false;
227 :
228 : return true;
229 : }
230 :
231 : static bool
232 533875 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
233 : {
234 533875 : InstrPtr instr = *p;
235 533875 : if (!isSqlAppendUpdate(mb, instr))
236 : return true;
237 :
238 455108 : str my_sname = get_sql_sname(mb, instr);
239 455108 : str my_tname = get_sql_tname(mb, instr);
240 455108 : str my_cname = get_sql_cname(mb, instr);
241 15922051 : for (InstrPtr *q = first; q < p; q++) {
242 15466933 : str cname = get_sql_cname(mb, *q);
243 15466933 : if (strcmp(my_cname, cname) != 0) {
244 : // different cname, no conflict
245 15466943 : continue;
246 : }
247 0 : str tname = get_sql_tname(mb, *q);
248 0 : if (strcmp(my_tname, tname) != 0) {
249 : // different tname, no conflict
250 0 : continue;
251 : }
252 0 : str sname = get_sql_sname(mb, *q);
253 0 : if (strcmp(my_sname, sname) != 0) {
254 : // different sname, no conflict
255 0 : continue;
256 : }
257 : // Found a statement in the region that works on the same column so this is a breakpoint
258 : return true;
259 : }
260 :
261 : // None of the statements in the region works on this column so no breakpoint necessary
262 : return false;
263 : }
264 :
265 : static bool
266 11811665 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p,
267 : States states, region_state *state)
268 : {
269 11811665 : InstrPtr instr = *p;
270 11811665 : switch (state->type) {
271 : case singleton_region:
272 : // by definition
273 : return true;
274 9873510 : case dataflow_region:
275 9873510 : return dataflowBreakpoint(cntxt, mb, instr, states);
276 6701 : case existing_region:
277 6701 : if (state->st.existing_region.level == 0) {
278 : // previous statement ended the region so we break here
279 : return true;
280 : }
281 5887 : if (blockStart(instr)) {
282 38 : state->st.existing_region.level += 1;
283 5849 : } else if (blockExit(instr)) {
284 852 : state->st.existing_region.level -= 1;
285 : }
286 : return false;
287 533872 : case sql_region:
288 533872 : return sqlBreakpoint(mb, first, p);
289 : default:
290 : // serious corruption has occurred.
291 0 : assert(0); /* corrupted region_type */
292 : abort();
293 : }
294 : assert(0); /* unreachable */
295 : return true;
296 : }
297 :
298 : static void
299 1169435 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states,
300 : region_state *state)
301 : {
302 1169435 : (void) cntxt;
303 :
304 1169435 : state->type = no_region;
305 1169435 : if (blockStart(p)) {
306 816 : state->type = existing_region;
307 816 : state->st.existing_region.level = 1;
308 1168619 : } else if (p->token == ENDsymbol) {
309 0 : state->type = existing_region;
310 1168619 : } else if (isSqlAppendUpdate(mb, p)) {
311 78755 : state->type = sql_region;
312 1089864 : } else if (p->barrier) {
313 638 : state->type = singleton_region;
314 1089226 : } else if (isUnsafeFunction(p)) {
315 396946 : state->type = singleton_region;
316 692403 : } else if (isUpdateInstruction(p)
317 36 : && getModuleId(p) != sqlRef
318 20 : && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0) {
319 : // Special case. Unless they're from the sql module, instructions with
320 : // names like 'append', 'update', 'delete', 'grow', etc., are expected
321 : // to express their side effects as data dependencies, for example,
322 : // X5 := bat.append(X_5, ...)
323 20 : state->type = dataflow_region;
324 692279 : } else if (hasSideEffects(mb, p, false)) {
325 504274 : state->type = singleton_region;
326 188025 : } else if (isMultiplex(p)) {
327 3938 : state->type = singleton_region;
328 : } else {
329 184042 : state->type = dataflow_region;
330 : }
331 1169429 : assert(state->type != no_region);
332 1169429 : }
333 :
334 :
335 : /* dataflow blocks are transparent, because they are always
336 : executed, either sequentially or in parallel */
337 :
338 : str
339 491600 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
340 : InstrPtr pci)
341 : {
342 491600 : int i, j, k, start, slimit, breakpoint, actions = 0;
343 491600 : bool simple = true;
344 491600 : int flowblock = 0;
345 491600 : InstrPtr p, *old = NULL, q;
346 491600 : int limit, vlimit;
347 491600 : States states = NULL;
348 491600 : region_state state = { singleton_region };
349 491600 : str msg = MAL_SUCCEED;
350 :
351 : /* don't use dataflow on single processor systems */
352 491600 : if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
353 0 : goto wrapup;
354 :
355 491600 : if (optimizerIsApplied(mb, dataflowRef))
356 0 : goto wrapup;
357 491648 : (void) stk;
358 : /* inlined functions will get their dataflow control later */
359 491648 : if (mb->inlineProp)
360 0 : goto wrapup;
361 :
362 491648 : vlimit = mb->vsize;
363 491648 : states = (States) GDKzalloc(vlimit * sizeof(char));
364 491662 : if (states == NULL) {
365 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
366 : }
367 :
368 491662 : setVariableScope(mb);
369 :
370 491530 : limit = mb->stop;
371 491530 : slimit = mb->ssize;
372 491530 : old = mb->stmt;
373 491530 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
374 0 : GDKfree(states);
375 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
376 : }
377 :
378 : /* inject new dataflow barriers using a single pass through the program */
379 491650 : start = 0;
380 491650 : state.type = singleton_region;
381 11810441 : for (i = 1; mb->errors == NULL && i < limit; i++) {
382 11810441 : p = old[i];
383 11810441 : assert(p);
384 11810441 : breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
385 11810438 : if (breakpoint) {
386 : /* close previous flow block */
387 1660941 : simple = simpleFlow(old, start, i, &state);
388 :
389 1660941 : if (!simple) {
390 153400 : if ((flowblock = newTmpVariable(mb, TYPE_bit)) < 0
391 153419 : || (q = newFcnCall(mb, languageRef, dataflowRef)) == NULL) {
392 0 : msg = createException(MAL, "optimizer.dataflow",
393 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
394 0 : break;
395 : }
396 153528 : q->barrier = BARRIERsymbol;
397 153528 : getArg(q, 0) = flowblock;
398 153528 : pushInstruction(mb, q);
399 153528 : actions++;
400 : }
401 : // copyblock the collected statements
402 13471378 : for (j = start; j < i; j++) {
403 11810345 : q = old[j];
404 11810345 : pushInstruction(mb, q);
405 11810254 : old[j] = NULL;
406 : // collect BAT variables garbage collected within the block
407 11810254 : if (!simple)
408 55509725 : for (k = q->retc; k < q->argc; k++) {
409 45251370 : if (getState(states, q, k) & VAR2READ
410 13290052 : && getEndScope(mb, getArg(q, k)) == j
411 2298004 : && isaBatType(getVarType(mb, getArg(q, k)))) {
412 2231651 : InstrPtr r;
413 2231651 : r = newInstruction(NULL, languageRef, passRef);
414 2231657 : if (r == NULL) {
415 0 : msg = createException(MAL, "optimizer.dataflow",
416 : SQLSTATE(HY013)
417 : MAL_MALLOC_FAIL);
418 0 : break;
419 : }
420 2231657 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
421 2231657 : if (getArg(r, 0) < 0) {
422 0 : freeInstruction(r);
423 0 : msg = createException(MAL, "optimizer.dataflow",
424 : SQLSTATE(HY013)
425 : MAL_MALLOC_FAIL);
426 0 : break;
427 : }
428 2231657 : r = pushArgument(mb, r, getArg(q, k));
429 2231654 : pushInstruction(mb, r);
430 : }
431 : }
432 11810309 : if (msg)
433 : break;
434 : }
435 1661033 : if (msg)
436 : break;
437 : /* exit parallel block */
438 1661033 : if (!simple) {
439 153523 : q = newAssignment(mb);
440 153526 : if (q == NULL) {
441 0 : msg = createException(MAL, "optimizer.dataflow",
442 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
443 0 : break;
444 : }
445 153526 : q->barrier = EXITsymbol;
446 153526 : getArg(q, 0) = flowblock;
447 153526 : pushInstruction(mb, q);
448 : }
449 1661035 : if (p->token == ENDsymbol) {
450 : break;
451 : }
452 : // Start a new region
453 1169505 : memset((char *) states, 0, vlimit * sizeof(char));
454 1169505 : start = i;
455 1169505 : decideRegionType(cntxt, mb, p, states, &state);
456 : }
457 : // remember you assigned/read variables
458 23980300 : for (k = 0; k < p->retc; k++)
459 12661535 : setState(states, p, k, VARWRITE);
460 11318765 : if (isUpdateInstruction(p)
461 915711 : && (getState(states, p, 1) == 0
462 739535 : || getState(states, p, 1) & VARWRITE))
463 467884 : setState(states, p, 1, VARBLOCK);
464 60595133 : for (k = p->retc; k < p->argc; k++)
465 49276342 : if (!isVarConstant(mb, getArg(p, k))) {
466 23776609 : if (getState(states, p, k) & VARREAD)
467 11016845 : setState(states, p, k, VAR2READ);
468 12759764 : else if (getState(states, p, k) & VARWRITE)
469 10172617 : setState(states, p, k, VARREAD);
470 : }
471 : }
472 :
473 : /* take the remainder as is */
474 119973630 : for (; i < slimit; i++)
475 119481974 : if (old[i])
476 14309786 : pushInstruction(mb, old[i]);
477 : /* Defense line against incorrect plans */
478 491656 : if (msg == MAL_SUCCEED && actions > 0) {
479 140404 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
480 140392 : if (msg == MAL_SUCCEED) {
481 140392 : msg = chkFlow(mb);
482 140351 : if (msg == MAL_SUCCEED)
483 140366 : msg = chkDeclarations(mb);
484 : }
485 : }
486 351237 : wrapup:
487 : /* keep actions taken as a fake argument */
488 491621 : (void) pushInt(mb, pci, actions);
489 :
490 491620 : if (states)
491 491620 : GDKfree(states);
492 491663 : if (old)
493 491663 : GDKfree(old);
494 : return msg;
495 : }
|