LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 195 218 89.4 %
Date: 2024-12-20 21:24:02 Functions: 7 7 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * The statements are all checked for being eligible for dataflow.
      15             :  */
      16             : #include "monetdb_config.h"
      17             : #include "opt_dataflow.h"
      18             : #include "mal_instruction.h"
      19             : #include "mal_interpreter.h"
      20             : #include "manifold.h"
      21             : 
      22             : /*
      23             :  * Dataflow processing incurs overhead and is only
      24             :  * relevant if multiple tasks kan be handled at the same time.
      25             :  * Also simple expressions dont have to be executed in parallel.
      26             :  *
      27             :  * The dataflow analysis centers around the read/write use patterns of
      28             :  * the variables and the occurrence of side-effect bearing functions.
      29             :  * Any such function should break the dataflow block as it may rely
      30             :  * on the sequential order in the plan.
      31             :  *
      32             :  * The following state properties can be distinguished for all variables:
      33             :  * VARWRITE  - variable assigned a value in the dataflow block
      34             :  * VARREAD   - variable is used in an argument
      35             :  * VAR2READ  - variable is read in concurrent mode
      36             :  * VARBLOCK  - variable next use terminate the // block, set after encountering an update
      37             :  *
      38             :  * Only some combinations are allowed.
      39             :  */
      40             : 
      41             : #define VARFREE  0
      42             : #define VARWRITE 1
      43             : #define VARREAD  2
      44             : #define VARBLOCK 4
      45             : #define VAR2READ 8
      46             : 
      47             : typedef char *States;
      48             : 
      49             : #define setState(S,P,K,F)  ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= F)
      50             : #define getState(S,P,K)  ((S)[getArg(P,K)])
      51             : 
      52             : typedef enum {
      53             :         no_region,
      54             :         singleton_region,                       // always a single statement
      55             :         dataflow_region,                        // statements without or with controlled side effects, in parallel
      56             :         existing_region,                        // existing barrier..exit region, copied as-is
      57             :         sql_region,                                     // region of nonconflicting sql.append/sql.updates only
      58             : } region_type;
      59             : 
      60             : typedef struct {
      61             :         region_type type;
      62             :         union {
      63             :                 struct {
      64             :                         int level;                      // level of nesting
      65             :                 } existing_region;
      66             :         } st;
      67             : } region_state;
      68             : 
      69             : static bool
      70     1676675 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
      71             : {
      72     1676675 :         int i, j, k;
      73     1676675 :         bool simple = true;
      74     1676675 :         InstrPtr p = NULL, q;
      75             : 
      76             :         /* ignore trivial blocks */
      77     1676675 :         if (last - start == 1)
      78             :                 return true;
      79      174779 :         if (state->type == existing_region) {
      80             :                 /* don't add additional barriers and garbage collection around
      81             :                  * existing region. */
      82             :                 return true;
      83             :         }
      84             :         /* skip sequence of simple arithmetic first */
      85      349241 :         for (; simple && start < last; start++)
      86      175281 :                 if (old[start]) {
      87      175281 :                         p = old[start];
      88      348559 :                         simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef
      89      348559 :                                         || getModuleId(p) == strRef || getModuleId(p) == mmathRef;
      90             :                 }
      91      298573 :         for (i = start; i < last; i++)
      92      279909 :                 if (old[i]) {
      93      279909 :                         q = old[i];
      94      268028 :                         simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef
      95      547937 :                                         || getModuleId(q) == strRef || getModuleId(q) == mmathRef;
      96      279909 :                         if (!simple) {
      97             :                                 /* if not arithmetic than we should consume the previous result directly */
      98     1569054 :                                 for (j = q->retc; j < q->argc; j++)
      99     3112506 :                                         for (k = 0; k < p->retc; k++)
     100     1811457 :                                                 if (getArg(p, k) == getArg(q, j))
     101      113130 :                                                         simple = true;
     102      268005 :                                 if (!simple)
     103             :                                         return false;
     104             :                         }
     105             :                         p = q;
     106             :                 }
     107             :         return simple;
     108             : }
     109             : 
     110             : /* Updates are permitted if it is a unique update on
     111             :  * a BAT created in the context of this block
     112             :  * As far as we know, no SQL nor MAL test re-uses the
     113             :  * target BAT to insert again and subsequently calls dataflow.
     114             :  * In MAL scripts, they still can occur.
     115             : */
     116             : 
     117             : /* a limited set of MAL instructions may appear in the dataflow block*/
     118             : static int
     119    10908106 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
     120             : {
     121    10908106 :         int j;
     122             : 
     123    10908106 :         if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p)
     124    10760354 :                 || (isMultiplex(p) && MANIFOLDtypecheck(cntxt, mb, p, 0) == NULL)) {
     125      148745 :                 return TRUE;
     126             :         }
     127             : 
     128             :         /* flow blocks should be closed when we reach a point
     129             :            where a variable is assigned  more then once or already
     130             :            being read.
     131             :          */
     132    22864811 :         for (j = 0; j < p->retc; j++)
     133    12105244 :                 if (getState(states, p, j) & (VARWRITE | VARREAD | VARBLOCK)) {
     134             :                         return 1;
     135             :                 }
     136             : 
     137             :         /* update instructions can be updated if the target variable
     138             :          * has not been read in the block so far */
     139    10759567 :         if (isUpdateInstruction(p)) {
     140             :                 /* the SQL update functions change BATs that are not
     141             :                  * explicitly mentioned as arguments (and certainly not as the
     142             :                  * first argument), but that can still be available to the MAL
     143             :                  * program (see bugs.monetdb.org/6641) */
     144      313698 :                 if (getModuleId(p) == sqlRef)
     145             :                         return 1;
     146      310626 :                 return getState(states, p, p->retc) & (VARREAD | VARBLOCK);
     147             :         }
     148             : 
     149    55150547 :         for (j = p->retc; j < p->argc; j++) {
     150    44704848 :                 if (getState(states, p, j) & VARBLOCK) {
     151             :                         return 1;
     152             :                 }
     153             :         }
     154    10445699 :         return hasSideEffects(mb, p, FALSE);
     155             : }
     156             : 
     157             : static str
     158    16066851 : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
     159             : {
     160    16066851 :         int var = getArg(p, argno);
     161    16066851 :         return getVarConstant(mb, var).val.sval;
     162             : }
     163             : 
     164             : static str
     165      465496 : get_sql_sname(MalBlkPtr mb, InstrPtr p)
     166             : {
     167      465496 :         return get_str_arg(mb, p, 2);
     168             : }
     169             : 
     170             : static str
     171      465496 : get_sql_tname(MalBlkPtr mb, InstrPtr p)
     172             : {
     173      465496 :         return get_str_arg(mb, p, 3);
     174             : }
     175             : 
     176             : static str
     177    16066851 : get_sql_cname(MalBlkPtr mb, InstrPtr p)
     178             : {
     179    16066851 :         return get_str_arg(mb, p, 4);
     180             : }
     181             : 
     182             : 
     183             : static bool
     184     1722532 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
     185             : {
     186     1722532 :         if (p->modname != sqlRef)
     187             :                 return false;
     188     1209767 :         if (p->fcnname != appendRef && p->fcnname != updateRef)
     189             :                 return false;
     190             : 
     191             :         // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
     192             :         //                        arg("mvc",int),
     193             :         //                        arg("sname",str),
     194             :         //                        arg("tname",str),
     195             :         //                        arg("cname",str),
     196             :         //                        arg("offset",lng),
     197             :         //                        batarg("pos",oid),
     198             :         //                        argany("ins",0))),
     199             : 
     200             :         // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
     201             :         //                        arg("mvc",int),
     202             :         //                        arg("sname",str),
     203             :         //                        arg("tname",str),
     204             :         //                        arg("cname",str),
     205             :         //                        argany("rids",0),
     206             :         //                        argany("upd",0)))
     207             : 
     208      544169 :         if ((p->fcnname == appendRef && p->argc != 8)
     209      544178 :                 || (p->fcnname == updateRef && p->argc != 7))
     210             :                 return false;
     211             : 
     212      544178 :         int mvc_var = getArg(p, 1);
     213      544178 :         if (getVarType(mb, mvc_var) != TYPE_int)
     214             :                 return false;
     215             : 
     216      544178 :         int sname_var = getArg(p, 2);
     217      544178 :         if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
     218             :                 return false;
     219             : 
     220      544242 :         int tname_var = getArg(p, 3);
     221      544242 :         if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
     222             :                 return false;
     223             : 
     224      544245 :         int cname_var = getArg(p, 4);
     225      544245 :         if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
     226             :                 return false;
     227             : 
     228             :         return true;
     229             : }
     230             : 
     231             : static bool
     232      544243 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
     233             : {
     234      544243 :         InstrPtr instr = *p;
     235      544243 :         if (!isSqlAppendUpdate(mb, instr))
     236             :                 return true;
     237             : 
     238      465496 :         str my_sname = get_sql_sname(mb, instr);
     239      465496 :         str my_tname = get_sql_tname(mb, instr);
     240      465496 :         str my_cname = get_sql_cname(mb, instr);
     241    16066851 :         for (InstrPtr *q = first; q < p; q++) {
     242    15601355 :                 str cname = get_sql_cname(mb, *q);
     243    15601355 :                 if (strcmp(my_cname, cname) != 0) {
     244             :                         // different cname, no conflict
     245    15601355 :                         continue;
     246             :                 }
     247           0 :                 str tname = get_sql_tname(mb, *q);
     248           0 :                 if (strcmp(my_tname, tname) != 0) {
     249             :                         // different tname, no conflict
     250           0 :                         continue;
     251             :                 }
     252           0 :                 str sname = get_sql_sname(mb, *q);
     253           0 :                 if (strcmp(my_sname, sname) != 0) {
     254             :                         // different sname, no conflict
     255           0 :                         continue;
     256             :                 }
     257             :                 // Found a statement in the region that works on the same column so this is a breakpoint
     258             :                 return true;
     259             :         }
     260             : 
     261             :         // None of the statements in the region works on this column so no breakpoint necessary
     262             :         return false;
     263             : }
     264             : 
     265             : static bool
     266    12869725 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p,
     267             :                                 States states, region_state *state)
     268             : {
     269    12869725 :         InstrPtr instr = *p;
     270    12869725 :         switch (state->type) {
     271             :         case singleton_region:
     272             :                 // by definition
     273             :                 return true;
     274    10907952 :         case dataflow_region:
     275    10907952 :                 return dataflowBreakpoint(cntxt, mb, instr, states);
     276        6744 :         case existing_region:
     277        6744 :                 if (state->st.existing_region.level == 0) {
     278             :                         // previous statement ended the region so we break here
     279             :                         return true;
     280             :                 }
     281        5925 :                 if (blockStart(instr)) {
     282          38 :                         state->st.existing_region.level += 1;
     283        5887 :                 } else if (blockExit(instr)) {
     284         857 :                         state->st.existing_region.level -= 1;
     285             :                 }
     286             :                 return false;
     287      544212 :         case sql_region:
     288      544212 :                 return sqlBreakpoint(mb, first, p);
     289             :         default:
     290             :                 // serious corruption has occurred.
     291           0 :                 assert(0);                              /* corrupted region_type */
     292             :                 abort();
     293             :         }
     294             :         assert(0);                                      /* unreachable */
     295             :         return true;
     296             : }
     297             : 
     298             : static void
     299     1179295 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states,
     300             :                                  region_state *state)
     301             : {
     302     1179295 :         (void) cntxt;
     303             : 
     304     1179295 :         state->type = no_region;
     305     1179295 :         if (blockStart(p)) {
     306         819 :                 state->type = existing_region;
     307         819 :                 state->st.existing_region.level = 1;
     308     1178476 :         } else if (p->token == ENDsymbol) {
     309           0 :                 state->type = existing_region;
     310     1178476 :         } else if (isSqlAppendUpdate(mb, p)) {
     311       78732 :                 state->type = sql_region;
     312     1099744 :         } else if (p->barrier) {
     313         644 :                 state->type = singleton_region;
     314     1099100 :         } else if (isUnsafeFunction(p)) {
     315      399111 :                 state->type = singleton_region;
     316      700297 :         } else if (isUpdateInstruction(p)
     317          36 :                            && getModuleId(p) != sqlRef
     318          20 :                            && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0) {
     319             :                 // Special case. Unless they're from the sql module, instructions with
     320             :                 // names like 'append', 'update', 'delete', 'grow', etc., are expected
     321             :                 // to express their side effects as data dependencies, for example,
     322             :                 //       X5 := bat.append(X_5, ...)
     323          20 :                 state->type = dataflow_region;
     324      699981 :         } else if (hasSideEffects(mb, p, false)) {
     325      509436 :                 state->type = singleton_region;
     326      190426 :         } else if (isMultiplex(p)) {
     327        4180 :                 state->type = singleton_region;
     328             :         } else {
     329      186193 :                 state->type = dataflow_region;
     330             :         }
     331     1179135 :         assert(state->type != no_region);
     332     1179135 : }
     333             : 
     334             : 
     335             : /* dataflow blocks are transparent, because they are always
     336             :    executed, either sequentially or in parallel */
     337             : 
     338             : str
     339      497220 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
     340             :                                                   InstrPtr pci)
     341             : {
     342      497220 :         int i, j, k, start, slimit, breakpoint, actions = 0;
     343      497220 :         bool simple = true;
     344      497220 :         int flowblock = 0;
     345      497220 :         InstrPtr p, *old = NULL, q;
     346      497220 :         int limit, vlimit;
     347      497220 :         States states = NULL;
     348      497220 :         region_state state = { singleton_region };
     349      497220 :         str msg = MAL_SUCCEED;
     350             : 
     351             :         /* don't use dataflow on single processor systems */
     352      497220 :         if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
     353           0 :                 goto wrapup;
     354             : 
     355      497220 :         if (optimizerIsApplied(mb, dataflowRef))
     356           0 :                 goto wrapup;
     357      497314 :         (void) stk;
     358             :         /* inlined functions will get their dataflow control later */
     359      497314 :         if (mb->inlineProp)
     360           0 :                 goto wrapup;
     361             : 
     362      497314 :         vlimit = mb->vsize;
     363      497314 :         states = (States) GDKzalloc(vlimit * sizeof(char));
     364      497323 :         if (states == NULL) {
     365           0 :                 throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     366             :         }
     367             : 
     368      497323 :         setVariableScope(mb);
     369             : 
     370      497178 :         limit = mb->stop;
     371      497178 :         slimit = mb->ssize;
     372      497178 :         old = mb->stmt;
     373      497178 :         if (newMalBlkStmt(mb, mb->ssize) < 0) {
     374           0 :                 GDKfree(states);
     375           0 :                 throw(MAL, "optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     376             :         }
     377             : 
     378             :         /* inject new dataflow barriers using a single pass through the program */
     379      497322 :         start = 0;
     380      497322 :         state.type = singleton_region;
     381    12868435 :         for (i = 1; mb->errors == NULL && i < limit; i++) {
     382    12868435 :                 p = old[i];
     383    12868435 :                 assert(p);
     384    12868435 :                 breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
     385    12868772 :                 if (breakpoint) {
     386             :                         /* close previous flow block */
     387     1676231 :                         simple = simpleFlow(old, start, i, &state);
     388             : 
     389     1676231 :                         if (!simple) {
     390      155285 :                                 if ((flowblock = newTmpVariable(mb, TYPE_bit)) < 0
     391      155393 :                                         || (q = newFcnCall(mb, languageRef, dataflowRef)) == NULL) {
     392           0 :                                         msg = createException(MAL, "optimizer.dataflow",
     393             :                                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     394           0 :                                         break;
     395             :                                 }
     396      155519 :                                 q->barrier = BARRIERsymbol;
     397      155519 :                                 getArg(q, 0) = flowblock;
     398      155519 :                                 pushInstruction(mb, q);
     399      155519 :                                 actions++;
     400             :                         }
     401             :                         // copyblock the collected statements
     402    14545214 :                         for (j = start; j < i; j++) {
     403    12868523 :                                 q = old[j];
     404    12868523 :                                 pushInstruction(mb, q);
     405    12868713 :                                 old[j] = NULL;
     406             :                                 // collect BAT variables garbage collected within the block
     407    12868713 :                                 if (!simple)
     408    60781753 :                                         for (k = q->retc; k < q->argc; k++) {
     409    49478480 :                                                 if (getState(states, q, k) & VAR2READ
     410    14723947 :                                                         && getEndScope(mb, getArg(q, k)) == j
     411     2710319 :                                                         && isaBatType(getVarType(mb, getArg(q, k)))) {
     412     2642195 :                                                         InstrPtr r;
     413     2642195 :                                                         r = newInstruction(NULL, languageRef, passRef);
     414     2642202 :                                                         if (r == NULL) {
     415           0 :                                                                 msg = createException(MAL, "optimizer.dataflow",
     416             :                                                                                                           SQLSTATE(HY013)
     417             :                                                                                                           MAL_MALLOC_FAIL);
     418           0 :                                                                 break;
     419             :                                                         }
     420     2642202 :                                                         getArg(r, 0) = newTmpVariable(mb, TYPE_void);
     421     2642199 :                                                         if (getArg(r, 0) < 0) {
     422           0 :                                                                 freeInstruction(r);
     423           0 :                                                                 msg = createException(MAL, "optimizer.dataflow",
     424             :                                                                                                           SQLSTATE(HY013)
     425             :                                                                                                           MAL_MALLOC_FAIL);
     426           0 :                                                                 break;
     427             :                                                         }
     428     2642199 :                                                         r = pushArgument(mb, r, getArg(q, k));
     429     2642199 :                                                         pushInstruction(mb, r);
     430             :                                                 }
     431             :                                         }
     432    12868749 :                                 if (msg)
     433             :                                         break;
     434             :                         }
     435     1676691 :                         if (msg)
     436             :                                 break;
     437             :                         /* exit parallel block */
     438     1676691 :                         if (!simple) {
     439      155513 :                                 q = newAssignment(mb);
     440      155517 :                                 if (q == NULL) {
     441           0 :                                         msg = createException(MAL, "optimizer.dataflow",
     442             :                                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     443           0 :                                         break;
     444             :                                 }
     445      155517 :                                 q->barrier = EXITsymbol;
     446      155517 :                                 getArg(q, 0) = flowblock;
     447      155517 :                                 pushInstruction(mb, q);
     448             :                         }
     449     1676695 :                         if (p->token == ENDsymbol) {
     450             :                                 break;
     451             :                         }
     452             :                         // Start a new region
     453     1179411 :                         memset((char *) states, 0, vlimit * sizeof(char));
     454     1179411 :                         start = i;
     455     1179411 :                         decideRegionType(cntxt, mb, p, states, &state);
     456             :                 }
     457             :                 // remember you assigned/read variables
     458    26182190 :                 for (k = 0; k < p->retc; k++)
     459    13810620 :                         setState(states, p, k, VARWRITE);
     460    12371570 :                 if (isUpdateInstruction(p)
     461     1022474 :                         && (getState(states, p, 1) == 0
     462      846045 :                                 || getState(states, p, 1) & VARWRITE))
     463      564257 :                         setState(states, p, 1, VARBLOCK);
     464    65907724 :                 for (k = p->retc; k < p->argc; k++)
     465    53536611 :                         if (!isVarConstant(mb, getArg(p, k))) {
     466    25951510 :                                 if (getState(states, p, k) & VARREAD)
     467    12037965 :                                         setState(states, p, k, VAR2READ);
     468    13913545 :                                 else if (getState(states, p, k) & VARWRITE)
     469    11275862 :                                         setState(states, p, k, VARREAD);
     470             :                         }
     471             :         }
     472             : 
     473             :         /* take the remainder as is */
     474   121311598 :         for (; i < slimit; i++)
     475   120814280 :                 if (old[i])
     476    14485000 :                         pushInstruction(mb, old[i]);
     477             :         /* Defense line against incorrect plans */
     478      497318 :         if (msg == MAL_SUCCEED && actions > 0) {
     479      142257 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     480      142245 :                 if (msg == MAL_SUCCEED) {
     481      142245 :                         msg = chkFlow(mb);
     482      142233 :                         if (msg == MAL_SUCCEED)
     483      142237 :                                 msg = chkDeclarations(mb);
     484             :                 }
     485             :         }
     486      355057 :   wrapup:
     487             :         /* keep actions taken as a fake argument */
     488      497300 :         (void) pushInt(mb, pci, actions);
     489             : 
     490      497277 :         if (states)
     491      497277 :                 GDKfree(states);
     492      497320 :         if (old)
     493      497320 :                 GDKfree(old);
     494             :         return msg;
     495             : }

Generated by: LCOV version 1.14