LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_remoteQueries.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 43 200 21.5 %
Date: 2024-12-19 23:10:26 Functions: 1 2 50.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "opt_remoteQueries.h"
      15             : #include "mal_interpreter.h"  /* for showErrors() */
      16             : #include "mal_builder.h"
      17             : 
      18             : /*
      19             :  * The instruction sent is produced with a variation of call2str
      20             :  * from the debugger.
      21             :  */
      22             : static str
      23           0 : RQcall2str(MalBlkPtr mb, InstrPtr p)
      24             : {
      25           0 :         int k;
      26           0 :         size_t len = 1;
      27           0 :         str msg;
      28           0 :         str s, cv = NULL;
      29             : 
      30           0 :         msg = (str) GDKmalloc(BUFSIZ);
      31           0 :         if (msg == NULL)
      32             :                 return NULL;
      33           0 :         msg[0] = '#';
      34           0 :         msg[1] = 0;
      35           0 :         if (p->barrier)
      36           0 :                 strcat(msg, operatorName(p->barrier));
      37             : 
      38           0 :         if (p->retc > 1)
      39           0 :                 strcat(msg, "(");
      40           0 :         len = strlen(msg);
      41           0 :         for (k = 0; k < p->retc; k++) {
      42           0 :                 getVarNameIntoBuffer(mb, getArg(p, k), msg + len);
      43           0 :                 if (k < p->retc - 1)
      44           0 :                         strcat(msg, ",");
      45           0 :                 len = strlen(msg);
      46             :         }
      47           0 :         if (p->retc > 1)
      48           0 :                 strcat(msg, ")");
      49           0 :         sprintf(msg + len, ":= %s.%s(", getModuleId(p), getFunctionId(p));
      50           0 :         s = strchr(msg, '(');
      51           0 :         if (s) {
      52           0 :                 s++;
      53           0 :                 *s = 0;
      54           0 :                 len = strlen(msg);
      55           0 :                 for (k = p->retc; k < p->argc; k++) {
      56           0 :                         VarPtr v = getVar(mb, getArg(p, k));
      57           0 :                         if (isVarConstant(mb, getArg(p, k))) {
      58           0 :                                 if (v->type == TYPE_void) {
      59           0 :                                         sprintf(msg + len, "nil");
      60             :                                 } else {
      61           0 :                                         if ((cv = VALformat(&v->value)) == NULL) {
      62           0 :                                                 GDKfree(msg);
      63           0 :                                                 return NULL;
      64             :                                         }
      65           0 :                                         sprintf(msg + len, "%s:%s", cv, ATOMname(v->type));
      66           0 :                                         GDKfree(cv);
      67             :                                 }
      68             : 
      69             :                         } else
      70           0 :                                 getVarNameIntoBuffer(mb, getArg(p, k), msg + len);
      71           0 :                         if (k < p->argc - 1)
      72           0 :                                 strcat(msg, ",");
      73           0 :                         len = strlen(msg);
      74             :                 }
      75           0 :                 strcat(msg, ");");
      76             :         }
      77             : /* printf("#RQcall:%s\n",msg);*/
      78             :         return msg;
      79             : }
      80             : 
      81             : /*
      82             :  * The algorithm follows the common scheme used so far.
      83             :  * Instructions are taken out one-by-one and copied
      84             :  * to the new block.
      85             :  *
      86             :  * A local cache of connections is established, because
      87             :  * the statements related to a single remote database
      88             :  * should be executed in the same stack context.
      89             :  * A pitfall is to create multiple connections with
      90             :  * their isolated runtime environment.
      91             :  */
      92             : #define lookupServer(X)                                                                                                 \
      93             :         do {                                                                                                                            \
      94             :                 /* lookup the server connection */                                                              \
      95             :                 if (location[getArg(p, 0)] == 0) {                                                              \
      96             :                         db = 0;                                                                                                         \
      97             :                         if (isVarConstant(mb, getArg(p, X)))                                            \
      98             :                                 db = getVarConstant(mb, getArg(p, X)).val.sval;                 \
      99             :                         for (k = 0; k < dbtop; k++)                                                                  \
     100             :                                 if (strcmp(db, dbalias[k].dbname) == 0)                                 \
     101             :                                         break;                                                                                          \
     102             :                                                                                                                                                 \
     103             :                         if (k == dbtop) {                                                                                       \
     104             :                                 r = newInstruction(mb, mapiRef, lookupRef);                             \
     105             :                                 if (r == NULL) {                                                                                \
     106             :                                         msg = createException(MAL, "optimizer.remote",                \
     107             :                                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL); \
     108             :                                         goto bailout;                                                                           \
     109             :                                 }                                                                                                               \
     110             :                                 j = getArg(r, 0) = newTmpVariable(mb, TYPE_int);                \
     111             :                                 r = pushArgument(mb, r, getArg(p, X));                                  \
     112             :                                 pushInstruction(mb, r);                                                                 \
     113             :                                 dbalias[dbtop].dbhdl = j;                                                               \
     114             :                                 dbalias[dbtop++].dbname = db;                                                   \
     115             :                                 if (dbtop == 127)                                                                               \
     116             :                                         dbtop--;                                                                                        \
     117             :                         } else                                                                                                          \
     118             :                                 j = dbalias[k].dbhdl;                                                                   \
     119             :                         location[getArg(p, 0)] = j;                                                                     \
     120             :                 } else                                                                                                                  \
     121             :                         j = location[getArg(p, 0)];                                                                     \
     122             :         } while (0)
     123             : 
     124             : #define prepareRemote(X)                                                                                        \
     125             :         do {                                                                                                                    \
     126             :                 r = newInstruction(mb, mapiRef, rpcRef);                                        \
     127             :                 if (r == NULL) {                                                                                        \
     128             :                         msg = createException(MAL, "optimizer.remote",                        \
     129             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);     \
     130             :                         goto bailout;                                                                                   \
     131             :                 }                                                                                                                       \
     132             :                 getArg(r, 0) = newTmpVariable(mb, X);                                           \
     133             :                 r = pushArgument(mb, r, j);                                                                     \
     134             :         } while (0)
     135             : 
     136             : #define putRemoteVariables()                                                                                    \
     137             :         do {                                                                                                                            \
     138             :                 for (j = p->retc; j < p->argc; j++) {                                                  \
     139             :                         if (location[getArg(p, j)] == 0 && !isVarConstant(mb, getArg(p, j))) { \
     140             :                                 q = newInstruction(0, mapiRef, putRef);                                 \
     141             :                                 if (q == NULL) {                                                                                \
     142             :                                         freeInstruction(r);                                                                     \
     143             :                                         msg = createException(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL); \
     144             :                                         break;                                                                                          \
     145             :                                 }                                                                                                               \
     146             :                                 getArg(q, 0) = newTmpVariable(mb, TYPE_void);                   \
     147             :                                 q = pushArgument(mb, q, location[getArg(p, j)]);                \
     148             :                                 q = pushStr(mb, q, getVarNameIntoBuffer(mb, getArg(p, j), name));               \
     149             :                                 q = pushArgument(mb, q, getArg(p, j));                                  \
     150             :                                 pushInstruction(mb, q);                                                                 \
     151             :                         }                                                                                                                       \
     152             :                 }                                                                                                                               \
     153             :         } while (0)
     154             : 
     155             : #define remoteAction()                                                  \
     156             :         do {                                                                            \
     157             :                 s = RQcall2str(mb, p);                                  \
     158             :                 r = pushStr(mb, r, s + 1);                              \
     159             :                 GDKfree(s);                                                             \
     160             :                 pushInstruction(mb, r);                                 \
     161             :                 freeInstruction(p);                                             \
     162             :                 actions++;                                                              \
     163             :         } while (0)
     164             : 
     165             : typedef struct {
     166             :         str dbname;
     167             :         int dbhdl;
     168             : } DBalias;
     169             : 
     170             : str
     171           1 : OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
     172             :                                                            InstrPtr pci)
     173             : {
     174           1 :         InstrPtr p, q, r, *old;
     175           1 :         int i, j, k, cnt, limit, slimit, actions = 0;
     176           1 :         int remoteSite;
     177           1 :         bool collectFirst;
     178           1 :         int *location;
     179           1 :         DBalias dbalias[128];
     180           1 :         int dbtop;
     181           1 :         char buf[BUFSIZ], *s, *db, name[IDLENGTH];
     182           1 :         ValRecord cst;
     183           1 :         str msg = MAL_SUCCEED;
     184             : 
     185           1 :         cst.vtype = TYPE_int;
     186           1 :         cst.val.ival = 0;
     187           1 :         cst.len = 0;
     188             : 
     189           1 :         (void) cntxt;
     190           1 :         (void) stk;
     191             : 
     192           1 :         limit = mb->stop;
     193           1 :         slimit = mb->ssize;
     194           1 :         old = mb->stmt;
     195             : 
     196           1 :         location = (int *) GDKzalloc(mb->vsize * sizeof(int));
     197           1 :         if (location == NULL)
     198           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     199           1 :         memset(dbalias, 0, sizeof(dbalias));
     200           1 :         dbtop = 0;
     201             : 
     202           1 :         if (newMalBlkStmt(mb, mb->ssize) < 0) {
     203           0 :                 GDKfree(location);
     204           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     205             :         }
     206             : 
     207           4 :         for (i = 0; i < limit; i++) {
     208           3 :                 p = old[i];
     209             : 
     210             :                 /* detect remote instructions */
     211           3 :                 cnt = 0;
     212           5 :                 for (j = 0; j < p->argc; j++)
     213           2 :                         if (location[getArg(p, j)])
     214           0 :                                 cnt++;
     215             : 
     216             :                 /* detect remote variable binding */
     217             : 
     218           3 :                 if ((getModuleId(p) == mapiRef && getFunctionId(p) == bindRef)) {
     219           0 :                         if (p->argc == 3 && getArgType(mb, p, 1) == TYPE_int) {
     220           0 :                                 int tpe;
     221           0 :                                 j = getArg(p, 1);       /* lookupServer with key */
     222           0 :                                 tpe = getArgType(mb, p, 0);
     223             :                                 /* result is remote */
     224           0 :                                 location[getArg(p, 0)] = j;
     225             : 
     226             :                                 /* turn the instruction into a local one */
     227             :                                 /* one argument less */
     228           0 :                                 p->argc--;
     229             :                                 /* only use the second argument (string) */
     230           0 :                                 getArg(p, 1) = getArg(p, 2);
     231             : 
     232           0 :                                 getModuleId(p) = bbpRef;
     233             : 
     234           0 :                                 prepareRemote(tpe);
     235           0 :                                 putRemoteVariables();
     236           0 :                                 remoteAction();
     237             :                         } else
     238           0 :                                 pushInstruction(mb, p);
     239           3 :                 } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == evalRef)) {
     240           0 :                         if (p->argc == 3) {
     241             :                                 /* a remote sql eval is needed */
     242           0 :                                 lookupServer(1);
     243             :                                 /* turn the instruction into a local one */
     244             :                                 /* one argument less */
     245           0 :                                 p->argc--;
     246             :                                 /* only use the second argument (string) */
     247           0 :                                 getArg(p, 1) = getArg(p, 2);
     248             : 
     249           0 :                                 prepareRemote(TYPE_void);
     250           0 :                                 s = RQcall2str(mb, p);
     251           0 :                                 r = pushStr(mb, r, s + 1);
     252           0 :                                 GDKfree(s);
     253           0 :                                 pushInstruction(mb, r);
     254           0 :                                 freeInstruction(p);
     255           0 :                                 actions++;
     256             :                         }
     257           3 :                 } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == bindRef)) {
     258             : 
     259           0 :                         if (p->argc == 6 && getArgType(mb, p, 4) == TYPE_str) {
     260           0 :                                 int tpe;
     261           0 :                                 j = getArg(p, 1);       /* lookupServer with key */
     262           0 :                                 tpe = getArgType(mb, p, 0);
     263             : 
     264           0 :                                 lookupServer(4);
     265             :                                 /* turn the instruction into a local one */
     266           0 :                                 k = defConstant(mb, TYPE_int, &cst);
     267           0 :                                 if (k >= 0) {
     268           0 :                                         getArg(p, 4) = k;
     269           0 :                                         prepareRemote(tpe);
     270           0 :                                         putRemoteVariables();
     271           0 :                                         remoteAction();
     272             :                                 }
     273             :                         } else
     274           0 :                                 pushInstruction(mb, p);
     275           3 :                 } else if (getModuleId(p) == sqlRef && getFunctionId(p) == binddbatRef) {
     276             : 
     277           0 :                         if (p->argc == 5 && getArgType(mb, p, 3) == TYPE_str) {
     278           0 :                                 lookupServer(3);
     279             :                                 /* turn the instruction into a local one */
     280           0 :                                 k = defConstant(mb, TYPE_int, &cst);
     281           0 :                                 if (k >= 0) {
     282           0 :                                         getArg(p, 3) = defConstant(mb, TYPE_int, &cst);
     283           0 :                                         prepareRemote(TYPE_void);
     284           0 :                                         putRemoteVariables();
     285           0 :                                         remoteAction();
     286             :                                 }
     287             :                         } else {
     288           0 :                                 pushInstruction(mb, p);
     289             :                         }
     290           3 :                 } else if (getModuleId(p) == optimizerRef || cnt == 0 || p->barrier) /* local only or flow control statement */
     291           3 :                         pushInstruction(mb, p);
     292             :                 else {
     293             :                         /*
     294             :                          * The hard part is to decide what to do with instructions that
     295             :                          * contain a reference to a remote variable.
     296             :                          * In the first implementation we use the following policy.
     297             :                          * If there are multiple sites involved, all arguments are
     298             :                          * moved local for processing. Moreover, all local arguments
     299             :                          * to be shipped should be simple.
     300             :                          */
     301             :                         remoteSite = 0;
     302             :                         collectFirst = false;
     303           0 :                         for (j = 0; j < p->argc; j++)
     304           0 :                                 if (location[getArg(p, j)]) {
     305           0 :                                         if (remoteSite == 0)
     306             :                                                 remoteSite = location[getArg(p, j)];
     307           0 :                                         else if (remoteSite != location[getArg(p, j)])
     308           0 :                                                 collectFirst = true;
     309             :                                 }
     310           0 :                         if (getModuleId(p) == ioRef
     311           0 :                                 || (getModuleId(p) == sqlRef
     312           0 :                                         && (getFunctionId(p) == resultSetRef
     313           0 :                                                 || getFunctionId(p) == rsColumnRef)))
     314           0 :                                 collectFirst = true;
     315             : 
     316             :                         /* local BATs are not shipped */
     317           0 :                         if (remoteSite && !collectFirst)
     318           0 :                                 for (j = p->retc; j < p->argc; j++)
     319           0 :                                         if (location[getArg(p, j)] == 0
     320           0 :                                                 && isaBatType(getVarType(mb, getArg(p, j))))
     321           0 :                                                 collectFirst = true;
     322             : 
     323           0 :                         if (collectFirst) {
     324             :                                 /* perform locally */
     325           0 :                                 for (j = p->retc; j < p->argc; j++)
     326           0 :                                         if (location[getArg(p, j)]) {
     327           0 :                                                 q = newInstruction(0, mapiRef, rpcRef);
     328           0 :                                                 if (q == NULL) {
     329           0 :                                                         msg = createException(MAL, "optimizer.remote",
     330             :                                                                                                   SQLSTATE(HY013)
     331             :                                                                                                   MAL_MALLOC_FAIL);
     332           0 :                                                         break;
     333             :                                                 }
     334           0 :                                                 getArg(q, 0) = getArg(p, j);
     335           0 :                                                 q = pushArgument(mb, q, location[getArg(p, j)]);
     336           0 :                                                 snprintf(buf, BUFSIZ, "io.print(%s);",
     337             :                                                                  getVarNameIntoBuffer(mb, getArg(p, j), name));
     338           0 :                                                 q = pushStr(mb, q, buf);
     339           0 :                                                 pushInstruction(mb, q);
     340             :                                         }
     341           0 :                                 if (msg)
     342             :                                         break;
     343           0 :                                 pushInstruction(mb, p);
     344             :                                 /* as of now all the targets are also local */
     345           0 :                                 for (j = 0; j < p->retc; j++)
     346           0 :                                         location[getArg(p, j)] = 0;
     347           0 :                                 actions++;
     348           0 :                         } else if (remoteSite) {
     349             :                                 /* single remote site involved */
     350           0 :                                 r = newInstruction(mb, mapiRef, rpcRef);
     351           0 :                                 if (r == NULL) {
     352           0 :                                         msg = createException(MAL, "optimizer.remote",
     353             :                                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     354           0 :                                         break;
     355             :                                 }
     356           0 :                                 getArg(r, 0) = newTmpVariable(mb, TYPE_void);
     357           0 :                                 r = pushArgument(mb, r, remoteSite);
     358             : 
     359           0 :                                 for (j = p->retc; j < p->argc; j++)
     360           0 :                                         if (location[getArg(p, j)] == 0
     361           0 :                                                 && !isVarConstant(mb, getArg(p, j))) {
     362           0 :                                                 q = newInstruction(0, mapiRef, putRef);
     363           0 :                                                 if (q == NULL) {
     364           0 :                                                         freeInstruction(r);
     365           0 :                                                         msg = createException(MAL, "optimizer.remote",
     366             :                                                                                                   SQLSTATE(HY013)
     367             :                                                                                                   MAL_MALLOC_FAIL);
     368           0 :                                                         break;
     369             :                                                 }
     370           0 :                                                 getArg(q, 0) = newTmpVariable(mb, TYPE_void);
     371           0 :                                                 q = pushArgument(mb, q, remoteSite);
     372           0 :                                                 q = pushStr(mb, q, getVarNameIntoBuffer(mb, getArg(p, j), name));
     373           0 :                                                 q = pushArgument(mb, q, getArg(p, j));
     374           0 :                                                 pushInstruction(mb, q);
     375             :                                         }
     376           0 :                                 s = RQcall2str(mb, p);
     377           0 :                                 pushInstruction(mb, r);
     378           0 :                                 (void) pushStr(mb, r, s + 1);
     379           0 :                                 GDKfree(s);
     380           0 :                                 for (j = 0; j < p->retc; j++)
     381           0 :                                         location[getArg(p, j)] = remoteSite;
     382           0 :                                 freeInstruction(p);
     383           0 :                                 actions++;
     384             :                         } else
     385           0 :                                 pushInstruction(mb, p);
     386             :                 }
     387             :         }
     388           2 :   bailout:
     389         254 :         for (; i < slimit; i++)
     390         253 :                 if (old[i])
     391           0 :                         pushInstruction(mb, old[i]);
     392           1 :         GDKfree(old);
     393           1 :         GDKfree(location);
     394             : 
     395             :         /* Defense line against incorrect plans */
     396           1 :         if (msg == MAL_SUCCEED && actions) {
     397           0 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     398           0 :                 if (!msg)
     399           0 :                         msg = chkFlow(mb);
     400           0 :                 if (!msg)
     401           0 :                         msg = chkDeclarations(mb);
     402             :         }
     403             :         /* keep actions taken as a fake argument */
     404           1 :         (void) pushInt(mb, pci, actions);
     405           1 :         return msg;
     406             : }

Generated by: LCOV version 1.14