Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * The statements are all checked for being eligible for dataflow.
15 : */
16 : #include "monetdb_config.h"
17 : #include "opt_dataflow.h"
18 : #include "mal_instruction.h"
19 : #include "mal_interpreter.h"
20 : #include "manifold.h"
21 :
22 : /*
23 : * Dataflow processing incurs overhead and is only
24 : * relevant if multiple tasks can be handled at the same time.
25 : * Also, simple expressions don't have to be executed in parallel.
26 : *
27 : * The dataflow analysis centers around the read/write use patterns of
28 : * the variables and the occurrence of side-effect bearing functions.
29 : * Any such function should break the dataflow block as it may rely
30 : * on the sequential order in the plan.
31 : *
32 : * The following state properties can be distinguished for all variables:
33 : * VARWRITE - variable assigned a value in the dataflow block
34 : * VARREAD - variable is used in an argument
35 : * VAR2READ - variable is read in concurrent mode
36 : * VARBLOCK - the variable's next use terminates the parallel (//) block; set after encountering an update
37 : *
38 : * Only some combinations are allowed.
39 : */
40 :
/* Per-variable state flags; they are OR-ed together into one char per variable. */
41 : #define VARFREE 0 /* no flag set: the value of every entry after the array is zeroed */
42 : #define VARWRITE 1 /* set for every return variable of an instruction in the region */
43 : #define VARREAD 2 /* set when a variable already marked VARWRITE is used as an argument */
44 : #define VARBLOCK 4 /* set on argument 1 of an update instruction; later uses break the region */
45 : #define VAR2READ 8 /* set when a variable already marked VARREAD is used as an argument again */
46 :
/* Array of flag bytes, indexed by MAL variable number (the value of getArg). */
47 : typedef char *States;
48 :
/* setState ORs flag F into the entry of argument K of instruction P.
 * Its assert refers to `vlimit`, which is not a macro parameter: a variable
 * of that name must be in scope wherever the macro is expanded.
 * getState yields the flag byte of argument K of instruction P. */
49 : #define setState(S,P,K,F) ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= F)
50 : #define getState(S,P,K) ((S)[getArg(P,K)])
51 :
/* Kind of region the optimizer is currently collecting statements into. */
52 : typedef enum {
53 : no_region, // placeholder; decideRegionType asserts that it never leaves this selected
54 : singleton_region, // always a single statement
55 : dataflow_region, // statements without or with controlled side effects, in parallel
56 : existing_region, // existing barrier..exit region, copied as-is
57 : sql_region, // region of nonconflicting sql.append/sql.updates only
58 : } region_type;
59 :
/* Region kind plus the per-kind bookkeeping that checkBreakpoint carries
 * from one instruction to the next. */
60 : typedef struct {
61 : region_type type;
62 : union {
63 : struct {
64 : int level; // level of nesting: set to 1 at the opening block, the region ends once it is 0 again
65 : } existing_region;
66 : } st;
67 : } region_state;
68 :
69 : static bool
70 1676000 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
71 : {
72 1676000 : int i, j, k;
73 1676000 : bool simple = true;
74 1676000 : InstrPtr p = NULL, q;
75 :
76 : /* ignore trivial blocks */
77 1676000 : if (last - start == 1)
78 : return true;
79 174798 : if (state->type == existing_region) {
80 : /* don't add additional barriers and garbage collection around
81 : * existing region. */
82 : return true;
83 : }
84 : /* skip sequence of simple arithmetic first */
85 349291 : for (; simple && start < last; start++)
86 175312 : if (old[start]) {
87 175312 : p = old[start];
88 348627 : simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef
89 348627 : || getModuleId(p) == strRef || getModuleId(p) == mmathRef;
90 : }
91 298726 : for (i = start; i < last; i++)
92 280196 : if (old[i]) {
93 280196 : q = old[i];
94 268192 : simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef
95 548388 : || getModuleId(q) == strRef || getModuleId(q) == mmathRef;
96 280196 : if (!simple) {
97 : /* if not arithmetic than we should consume the previous result directly */
98 1570041 : for (j = q->retc; j < q->argc; j++)
99 3114207 : for (k = 0; k < p->retc; k++)
100 1812335 : if (getArg(p, k) == getArg(q, j))
101 113153 : simple = true;
102 268169 : if (!simple)
103 : return false;
104 : }
105 : p = q;
106 : }
107 : return simple;
108 : }
109 :
110 : /* Updates are permitted if it is a unique update on
111 : * a BAT created in the context of this block
112 : * As far as we know, no SQL nor MAL test re-uses the
113 : * target BAT to insert again and subsequently calls dataflow.
114 : * In MAL scripts, they still can occur.
115 : */
116 :
117 : /* a limited set of MAL instructions may appear in the dataflow block*/
118 : static int
119 10886161 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
120 : {
121 10886161 : int j;
122 :
123 10886161 : if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p)
124 10738203 : || (isMultiplex(p) && MANIFOLDtypecheck(cntxt, mb, p, 0) == NULL)) {
125 148835 : return TRUE;
126 : }
127 :
128 : /* flow blocks should be closed when we reach a point
129 : where a variable is assigned more then once or already
130 : being read.
131 : */
132 22820460 : for (j = 0; j < p->retc; j++)
133 12083057 : if (getState(states, p, j) & (VARWRITE | VARREAD | VARBLOCK)) {
134 : return 1;
135 : }
136 :
137 : /* update instructions can be updated if the target variable
138 : * has not been read in the block so far */
139 10737403 : if (isUpdateInstruction(p)) {
140 : /* the SQL update functions change BATs that are not
141 : * explicitly mentioned as arguments (and certainly not as the
142 : * first argument), but that can still be available to the MAL
143 : * program (see bugs.monetdb.org/6641) */
144 313736 : if (getModuleId(p) == sqlRef)
145 : return 1;
146 310678 : return getState(states, p, p->retc) & (VARREAD | VARBLOCK);
147 : }
148 :
149 54989595 : for (j = p->retc; j < p->argc; j++) {
150 44566044 : if (getState(states, p, j) & VARBLOCK) {
151 : return 1;
152 : }
153 : }
154 10423551 : return hasSideEffects(mb, p, FALSE);
155 : }
156 :
157 : static str
158 16071738 : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
159 : {
160 16071738 : int var = getArg(p, argno);
161 16071738 : return getVarConstant(mb, var).val.sval;
162 : }
163 :
164 : static str
165 466873 : get_sql_sname(MalBlkPtr mb, InstrPtr p)
166 : {
167 466873 : return get_str_arg(mb, p, 2);
168 : }
169 :
170 : static str
171 466873 : get_sql_tname(MalBlkPtr mb, InstrPtr p)
172 : {
173 466873 : return get_str_arg(mb, p, 3);
174 : }
175 :
176 : static str
177 16071738 : get_sql_cname(MalBlkPtr mb, InstrPtr p)
178 : {
179 16071738 : return get_str_arg(mb, p, 4);
180 : }
181 :
182 :
183 : static bool
184 1724542 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
185 : {
186 1724542 : if (p->modname != sqlRef)
187 : return false;
188 1212582 : if (p->fcnname != appendRef && p->fcnname != updateRef)
189 : return false;
190 :
191 : // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
192 : // arg("mvc",int),
193 : // arg("sname",str),
194 : // arg("tname",str),
195 : // arg("cname",str),
196 : // arg("offset",lng),
197 : // batarg("pos",oid),
198 : // argany("ins",0))),
199 :
200 : // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
201 : // arg("mvc",int),
202 : // arg("sname",str),
203 : // arg("tname",str),
204 : // arg("cname",str),
205 : // argany("rids",0),
206 : // argany("upd",0)))
207 :
208 545939 : if ((p->fcnname == appendRef && p->argc != 8)
209 545946 : || (p->fcnname == updateRef && p->argc != 7))
210 : return false;
211 :
212 545946 : int mvc_var = getArg(p, 1);
213 545946 : if (getVarType(mb, mvc_var) != TYPE_int)
214 : return false;
215 :
216 545946 : int sname_var = getArg(p, 2);
217 545946 : if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
218 : return false;
219 :
220 545959 : int tname_var = getArg(p, 3);
221 545959 : if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
222 : return false;
223 :
224 545962 : int cname_var = getArg(p, 4);
225 545962 : if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
226 : return false;
227 :
228 : return true;
229 : }
230 :
231 : static bool
232 545959 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
233 : {
234 545959 : InstrPtr instr = *p;
235 545959 : if (!isSqlAppendUpdate(mb, instr))
236 : return true;
237 :
238 466873 : str my_sname = get_sql_sname(mb, instr);
239 466873 : str my_tname = get_sql_tname(mb, instr);
240 466873 : str my_cname = get_sql_cname(mb, instr);
241 16071738 : for (InstrPtr *q = first; q < p; q++) {
242 15604865 : str cname = get_sql_cname(mb, *q);
243 15604865 : if (strcmp(my_cname, cname) != 0) {
244 : // different cname, no conflict
245 15604865 : continue;
246 : }
247 0 : str tname = get_sql_tname(mb, *q);
248 0 : if (strcmp(my_tname, tname) != 0) {
249 : // different tname, no conflict
250 0 : continue;
251 : }
252 0 : str sname = get_sql_sname(mb, *q);
253 0 : if (strcmp(my_sname, sname) != 0) {
254 : // different sname, no conflict
255 0 : continue;
256 : }
257 : // Found a statement in the region that works on the same column so this is a breakpoint
258 : return true;
259 : }
260 :
261 : // None of the statements in the region works on this column so no breakpoint necessary
262 : return false;
263 : }
264 :
265 : static bool
266 12848421 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p,
267 : States states, region_state *state)
268 : {
269 12848421 : InstrPtr instr = *p;
270 12848421 : switch (state->type) {
271 : case singleton_region:
272 : // by definition
273 : return true;
274 10885994 : case dataflow_region:
275 10885994 : return dataflowBreakpoint(cntxt, mb, instr, states);
276 6744 : case existing_region:
277 6744 : if (state->st.existing_region.level == 0) {
278 : // previous statement ended the region so we break here
279 : return true;
280 : }
281 5925 : if (blockStart(instr)) {
282 38 : state->st.existing_region.level += 1;
283 5887 : } else if (blockExit(instr)) {
284 857 : state->st.existing_region.level -= 1;
285 : }
286 : return false;
287 545944 : case sql_region:
288 545944 : return sqlBreakpoint(mb, first, p);
289 : default:
290 : // serious corruption has occurred.
291 0 : assert(0); /* corrupted region_type */
292 : abort();
293 : }
294 : assert(0); /* unreachable */
295 : return true;
296 : }
297 :
298 : static void
299 1179557 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states,
300 : region_state *state)
301 : {
302 1179557 : (void) cntxt;
303 :
304 1179557 : state->type = no_region;
305 1179557 : if (blockStart(p)) {
306 819 : state->type = existing_region;
307 819 : state->st.existing_region.level = 1;
308 1178738 : } else if (p->token == ENDsymbol) {
309 0 : state->type = existing_region;
310 1178738 : } else if (isSqlAppendUpdate(mb, p)) {
311 79080 : state->type = sql_region;
312 1099658 : } else if (p->barrier) {
313 644 : state->type = singleton_region;
314 1099014 : } else if (isUnsafeFunction(p)) {
315 399798 : state->type = singleton_region;
316 699393 : } else if (isUpdateInstruction(p)
317 36 : && getModuleId(p) != sqlRef
318 20 : && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0) {
319 : // Special case. Unless they're from the sql module, instructions with
320 : // names like 'append', 'update', 'delete', 'grow', etc., are expected
321 : // to express their side effects as data dependencies, for example,
322 : // X5 := bat.append(X_5, ...)
323 20 : state->type = dataflow_region;
324 699208 : } else if (hasSideEffects(mb, p, false)) {
325 508677 : state->type = singleton_region;
326 190553 : } else if (isMultiplex(p)) {
327 4180 : state->type = singleton_region;
328 : } else {
329 186321 : state->type = dataflow_region;
330 : }
331 1179539 : assert(state->type != no_region);
332 1179539 : }
333 :
334 :
335 : /* dataflow blocks are transparent, because they are always
336 : executed, either sequentially or in parallel */
337 :
338 : str
339 496316 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
340 : InstrPtr pci)
341 : {
342 496316 : int i, j, k, start, slimit, breakpoint, actions = 0;
343 496316 : bool simple = true;
344 496316 : int flowblock = 0;
345 496316 : InstrPtr p, *old = NULL, q;
346 496316 : int limit, vlimit;
347 496316 : States states = NULL;
348 496316 : region_state state = { singleton_region };
349 496316 : str msg = MAL_SUCCEED;
350 :
351 : /* don't use dataflow on single processor systems */
352 496316 : if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
353 0 : goto wrapup;
354 :
355 496316 : if (optimizerIsApplied(mb, dataflowRef))
356 0 : goto wrapup;
357 496356 : (void) stk;
358 : /* inlined functions will get their dataflow control later */
359 496356 : if (mb->inlineProp)
360 0 : goto wrapup;
361 :
362 496356 : vlimit = mb->vsize;
363 496356 : states = (States) GDKzalloc(vlimit * sizeof(char));
364 496364 : if (states == NULL) {
365 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
366 : }
367 :
368 496364 : setVariableScope(mb);
369 :
370 496253 : limit = mb->stop;
371 496253 : slimit = mb->ssize;
372 496253 : old = mb->stmt;
373 496253 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
374 0 : GDKfree(states);
375 0 : throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
376 : }
377 :
378 : /* inject new dataflow barriers using a single pass through the program */
379 496366 : start = 0;
380 496366 : state.type = singleton_region;
381 12847235 : for (i = 1; mb->errors == NULL && i < limit; i++) {
382 12847235 : p = old[i];
383 12847235 : assert(p);
384 12847235 : breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
385 12847292 : if (breakpoint) {
386 : /* close previous flow block */
387 1675803 : simple = simpleFlow(old, start, i, &state);
388 :
389 1675803 : if (!simple) {
390 155427 : if ((flowblock = newTmpVariable(mb, TYPE_bit)) < 0
391 155437 : || (q = newFcnCall(mb, languageRef, dataflowRef)) == NULL) {
392 0 : msg = createException(MAL, "optimizer.dataflow",
393 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
394 0 : break;
395 : }
396 155512 : q->barrier = BARRIERsymbol;
397 155512 : getArg(q, 0) = flowblock;
398 155512 : pushInstruction(mb, q);
399 155512 : actions++;
400 : }
401 : // copyblock the collected statements
402 14523239 : for (j = start; j < i; j++) {
403 12847331 : q = old[j];
404 12847331 : pushInstruction(mb, q);
405 12847294 : old[j] = NULL;
406 : // collect BAT variables garbage collected within the block
407 12847294 : if (!simple)
408 60634792 : for (k = q->retc; k < q->argc; k++) {
409 49352322 : if (getState(states, q, k) & VAR2READ
410 14703958 : && getEndScope(mb, getArg(q, k)) == j
411 2709062 : && isaBatType(getVarType(mb, getArg(q, k)))) {
412 2641267 : InstrPtr r;
413 2641267 : r = newInstruction(NULL, languageRef, passRef);
414 2641277 : if (r == NULL) {
415 0 : msg = createException(MAL, "optimizer.dataflow",
416 : SQLSTATE(HY013)
417 : MAL_MALLOC_FAIL);
418 0 : break;
419 : }
420 2641277 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
421 2641273 : if (getArg(r, 0) < 0) {
422 0 : freeInstruction(r);
423 0 : msg = createException(MAL, "optimizer.dataflow",
424 : SQLSTATE(HY013)
425 : MAL_MALLOC_FAIL);
426 0 : break;
427 : }
428 2641273 : r = pushArgument(mb, r, getArg(q, k));
429 2641272 : pushInstruction(mb, r);
430 : }
431 : }
432 12847351 : if (msg)
433 : break;
434 : }
435 1675908 : if (msg)
436 : break;
437 : /* exit parallel block */
438 1675908 : if (!simple) {
439 155507 : q = newAssignment(mb);
440 155511 : if (q == NULL) {
441 0 : msg = createException(MAL, "optimizer.dataflow",
442 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
443 0 : break;
444 : }
445 155511 : q->barrier = EXITsymbol;
446 155511 : getArg(q, 0) = flowblock;
447 155511 : pushInstruction(mb, q);
448 : }
449 1675912 : if (p->token == ENDsymbol) {
450 : break;
451 : }
452 : // Start a new region
453 1179632 : memset((char *) states, 0, vlimit * sizeof(char));
454 1179632 : start = i;
455 1179632 : decideRegionType(cntxt, mb, p, states, &state);
456 : }
457 : // remember you assigned/read variables
458 26141961 : for (k = 0; k < p->retc; k++)
459 13791059 : setState(states, p, k, VARWRITE);
460 12350902 : if (isUpdateInstruction(p)
461 1024920 : && (getState(states, p, 1) == 0
462 847832 : || getState(states, p, 1) & VARWRITE))
463 565328 : setState(states, p, 1, VARBLOCK);
464 65763450 : for (k = p->retc; k < p->argc; k++)
465 53412581 : if (!isVarConstant(mb, getArg(p, k))) {
466 25918354 : if (getState(states, p, k) & VARREAD)
467 12019320 : setState(states, p, k, VAR2READ);
468 13899034 : else if (getState(states, p, k) & VARWRITE)
469 11257316 : setState(states, p, k, VARREAD);
470 : }
471 : }
472 :
473 : /* take the remainder as is */
474 121099731 : for (; i < slimit; i++)
475 120603371 : if (old[i])
476 14455928 : pushInstruction(mb, old[i]);
477 : /* Defense line against incorrect plans */
478 496360 : if (msg == MAL_SUCCEED && actions > 0) {
479 142250 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
480 142240 : if (msg == MAL_SUCCEED) {
481 142240 : msg = chkFlow(mb);
482 142209 : if (msg == MAL_SUCCEED)
483 142208 : msg = chkDeclarations(mb);
484 : }
485 : }
486 354111 : wrapup:
487 : /* keep actions taken as a fake argument */
488 496344 : (void) pushInt(mb, pci, actions);
489 :
490 496325 : if (states)
491 496325 : GDKfree(states);
492 496367 : if (old)
493 496367 : GDKfree(old);
494 : return msg;
495 : }
|