LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_remoteQueries.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 45 205 22.0 %
Date: 2024-11-15 19:37:45 Functions: 1 2 50.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "opt_remoteQueries.h"
      15             : #include "mal_interpreter.h"  /* for showErrors() */
      16             : #include "mal_builder.h"
      17             : 
      18             : /*
      19             :  * The instruction sent is produced with a variation of call2str
      20             :  * from the debugger.
      21             :  */
      22             : static str
      23           0 : RQcall2str(MalBlkPtr mb, InstrPtr p)
      24             : {
      25           0 :         int k;
      26           0 :         size_t len = 1;
      27           0 :         str msg;
      28           0 :         str s, cv = NULL;
      29             : 
      30           0 :         msg = (str) GDKmalloc(BUFSIZ);
      31           0 :         if (msg == NULL)
      32             :                 return NULL;
      33           0 :         msg[0] = '#';
      34           0 :         msg[1] = 0;
      35           0 :         if (p->barrier)
      36           0 :                 strcat(msg, operatorName(p->barrier));
      37             : 
      38           0 :         if (p->retc > 1)
      39           0 :                 strcat(msg, "(");
      40           0 :         len = strlen(msg);
      41           0 :         for (k = 0; k < p->retc; k++) {
      42           0 :                 getVarNameIntoBuffer(mb, getArg(p, k), msg + len);
      43           0 :                 if (k < p->retc - 1)
      44           0 :                         strcat(msg, ",");
      45           0 :                 len = strlen(msg);
      46             :         }
      47           0 :         if (p->retc > 1)
      48           0 :                 strcat(msg, ")");
      49           0 :         sprintf(msg + len, ":= %s.%s(", getModuleId(p), getFunctionId(p));
      50           0 :         s = strchr(msg, '(');
      51           0 :         if (s) {
      52           0 :                 s++;
      53           0 :                 *s = 0;
      54           0 :                 len = strlen(msg);
      55           0 :                 for (k = p->retc; k < p->argc; k++) {
      56           0 :                         VarPtr v = getVar(mb, getArg(p, k));
      57           0 :                         if (isVarConstant(mb, getArg(p, k))) {
      58           0 :                                 if (v->type == TYPE_void) {
      59           0 :                                         sprintf(msg + len, "nil");
      60             :                                 } else {
      61           0 :                                         if ((cv = VALformat(&v->value)) == NULL) {
      62           0 :                                                 GDKfree(msg);
      63           0 :                                                 return NULL;
      64             :                                         }
      65           0 :                                         sprintf(msg + len, "%s:%s", cv, ATOMname(v->type));
      66           0 :                                         GDKfree(cv);
      67             :                                 }
      68             : 
      69             :                         } else
      70           0 :                                 getVarNameIntoBuffer(mb, getArg(p, k), msg + len);
      71           0 :                         if (k < p->argc - 1)
      72           0 :                                 strcat(msg, ",");
      73           0 :                         len = strlen(msg);
      74             :                 }
      75           0 :                 strcat(msg, ");");
      76             :         }
      77             : /* printf("#RQcall:%s\n",msg);*/
      78             :         return msg;
      79             : }
      80             : 
      81             : /*
      82             :  * The algorithm follows the common scheme used so far.
      83             :  * Instructions are taken out one-by-one and copied
      84             :  * to the new block.
      85             :  *
      86             :  * A local cache of connections is established, because
      87             :  * the statements related to a single remote database
      88             :  * should be executed in the same stack context.
      89             :  * A pitfall is to create multiple connections with
      90             :  * their isolated runtime environment.
      91             :  */
      92             : #define lookupServer(X)                                                                                                 \
      93             :         do {                                                                                                                            \
      94             :                 /* lookup the server connection */                                                              \
      95             :                 if (location[getArg(p, 0)] == 0) {                                                              \
      96             :                         db = 0;                                                                                                         \
      97             :                         if (isVarConstant(mb, getArg(p, X)))                                            \
      98             :                                 db = getVarConstant(mb, getArg(p, X)).val.sval;                 \
      99             :                         for (k = 0; k < dbtop; k++)                                                                  \
     100             :                                 if (strcmp(db, dbalias[k].dbname) == 0)                                 \
     101             :                                         break;                                                                                          \
     102             :                                                                                                                                                 \
     103             :                         if (k == dbtop) {                                                                                       \
     104             :                                 r = newInstruction(mb, mapiRef, lookupRef);                             \
     105             :                                 if (r == NULL) {                                                                                \
     106             :                                         msg = createException(MAL, "optimizer.remote",                \
     107             :                                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL); \
     108             :                                         goto bailout;                                                                           \
     109             :                                 }                                                                                                               \
     110             :                                 j = getArg(r, 0) = newTmpVariable(mb, TYPE_int);                \
     111             :                                 r = pushArgument(mb, r, getArg(p, X));                                  \
     112             :                                 pushInstruction(mb, r);                                                                 \
     113             :                                 dbalias[dbtop].dbhdl = j;                                                               \
     114             :                                 dbalias[dbtop++].dbname = db;                                                   \
     115             :                                 if (dbtop == 127)                                                                               \
     116             :                                         dbtop--;                                                                                        \
     117             :                         } else                                                                                                          \
     118             :                                 j = dbalias[k].dbhdl;                                                                   \
     119             :                         location[getArg(p, 0)] = j;                                                                     \
     120             :                 } else                                                                                                                  \
     121             :                         j = location[getArg(p, 0)];                                                                     \
     122             :         } while (0)
     123             : 
     124             : #define prepareRemote(X)                                                                                        \
     125             :         do {                                                                                                                    \
     126             :                 r = newInstruction(mb, mapiRef, rpcRef);                                        \
     127             :                 if (r == NULL) {                                                                                        \
     128             :                         msg = createException(MAL, "optimizer.remote",                        \
     129             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);     \
     130             :                         goto bailout;                                                                                   \
     131             :                 }                                                                                                                       \
     132             :                 getArg(r, 0) = newTmpVariable(mb, X);                                           \
     133             :                 r = pushArgument(mb, r, j);                                                                     \
     134             :         } while (0)
     135             : 
     136             : #define putRemoteVariables()                                                                                    \
     137             :         do {                                                                                                                            \
     138             :                 for (j = p->retc; j < p->argc; j++) {                                                  \
     139             :                         if (location[getArg(p, j)] == 0 && !isVarConstant(mb, getArg(p, j))) { \
     140             :                                 q = newInstruction(0, mapiRef, putRef);                                 \
     141             :                                 if (q == NULL) {                                                                                \
     142             :                                         freeInstruction(r);                                                                     \
     143             :                                         msg = createException(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL); \
     144             :                                         break;                                                                                          \
     145             :                                 }                                                                                                               \
     146             :                                 getArg(q, 0) = newTmpVariable(mb, TYPE_void);                   \
     147             :                                 q = pushArgument(mb, q, location[getArg(p, j)]);                \
     148             :                                 q = pushStr(mb, q, getVarNameIntoBuffer(mb, getArg(p, j), name));               \
     149             :                                 q = pushArgument(mb, q, getArg(p, j));                                  \
     150             :                                 pushInstruction(mb, q);                                                                 \
     151             :                         }                                                                                                                       \
     152             :                 }                                                                                                                               \
     153             :         } while (0)
     154             : 
     155             : #define remoteAction()                                                  \
     156             :         do {                                                                            \
     157             :                 s = RQcall2str(mb, p);                                  \
     158             :                 r = pushStr(mb, r, s + 1);                              \
     159             :                 GDKfree(s);                                                             \
     160             :                 pushInstruction(mb, r);                                 \
     161             :                 freeInstruction(p);                                             \
     162             :                 actions++;                                                              \
     163             :         } while (0)
     164             : 
     165             : typedef struct {
     166             :         str dbname;
     167             :         int dbhdl;
     168             : } DBalias;
     169             : 
     170             : str
     171           1 : OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
     172             :                                                            InstrPtr pci)
     173             : {
     174           1 :         InstrPtr p, q, r, *old;
     175           1 :         int i, j, k, cnt, limit, slimit, actions = 0;
     176           1 :         int remoteSite;
     177           1 :         bool collectFirst;
     178           1 :         int *location;
     179           1 :         DBalias *dbalias;
     180           1 :         int dbtop;
     181           1 :         char buf[BUFSIZ], *s, *db, name[IDLENGTH];
     182           1 :         ValRecord cst;
     183           1 :         str msg = MAL_SUCCEED;
     184             : 
     185           1 :         cst.vtype = TYPE_int;
     186           1 :         cst.val.ival = 0;
     187           1 :         cst.len = 0;
     188             : 
     189           1 :         (void) cntxt;
     190           1 :         (void) stk;
     191             : 
     192           1 :         limit = mb->stop;
     193           1 :         slimit = mb->ssize;
     194           1 :         old = mb->stmt;
     195             : 
     196           1 :         location = (int *) GDKzalloc(mb->vsize * sizeof(int));
     197           1 :         if (location == NULL)
     198           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     199           1 :         dbalias = (DBalias *) GDKzalloc(128 * sizeof(DBalias));
     200           1 :         if (dbalias == NULL) {
     201           0 :                 GDKfree(location);
     202           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     203             :         }
     204           1 :         dbtop = 0;
     205             : 
     206           1 :         if (newMalBlkStmt(mb, mb->ssize) < 0) {
     207           0 :                 GDKfree(dbalias);
     208           0 :                 GDKfree(location);
     209           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     210             :         }
     211             : 
     212           4 :         for (i = 0; i < limit; i++) {
     213           3 :                 p = old[i];
     214             : 
     215             :                 /* detect remote instructions */
     216           3 :                 cnt = 0;
     217           5 :                 for (j = 0; j < p->argc; j++)
     218           2 :                         if (location[getArg(p, j)])
     219           0 :                                 cnt++;
     220             : 
     221             :                 /* detect remote variable binding */
     222             : 
     223           3 :                 if ((getModuleId(p) == mapiRef && getFunctionId(p) == bindRef)) {
     224           0 :                         if (p->argc == 3 && getArgType(mb, p, 1) == TYPE_int) {
     225           0 :                                 int tpe;
     226           0 :                                 j = getArg(p, 1);       /* lookupServer with key */
     227           0 :                                 tpe = getArgType(mb, p, 0);
     228             :                                 /* result is remote */
     229           0 :                                 location[getArg(p, 0)] = j;
     230             : 
     231             :                                 /* turn the instruction into a local one */
     232             :                                 /* one argument less */
     233           0 :                                 p->argc--;
     234             :                                 /* only use the second argument (string) */
     235           0 :                                 getArg(p, 1) = getArg(p, 2);
     236             : 
     237           0 :                                 getModuleId(p) = bbpRef;
     238             : 
     239           0 :                                 prepareRemote(tpe);
     240           0 :                                 putRemoteVariables();
     241           0 :                                 remoteAction();
     242             :                         } else
     243           0 :                                 pushInstruction(mb, p);
     244           3 :                 } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == evalRef)) {
     245           0 :                         if (p->argc == 3) {
     246             :                                 /* a remote sql eval is needed */
     247           0 :                                 lookupServer(1);
     248             :                                 /* turn the instruction into a local one */
     249             :                                 /* one argument less */
     250           0 :                                 p->argc--;
     251             :                                 /* only use the second argument (string) */
     252           0 :                                 getArg(p, 1) = getArg(p, 2);
     253             : 
     254           0 :                                 prepareRemote(TYPE_void);
     255           0 :                                 s = RQcall2str(mb, p);
     256           0 :                                 r = pushStr(mb, r, s + 1);
     257           0 :                                 GDKfree(s);
     258           0 :                                 pushInstruction(mb, r);
     259           0 :                                 freeInstruction(p);
     260           0 :                                 actions++;
     261             :                         }
     262           3 :                 } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == bindRef)) {
     263             : 
     264           0 :                         if (p->argc == 6 && getArgType(mb, p, 4) == TYPE_str) {
     265           0 :                                 int tpe;
     266           0 :                                 j = getArg(p, 1);       /* lookupServer with key */
     267           0 :                                 tpe = getArgType(mb, p, 0);
     268             : 
     269           0 :                                 lookupServer(4);
     270             :                                 /* turn the instruction into a local one */
     271           0 :                                 k = defConstant(mb, TYPE_int, &cst);
     272           0 :                                 if (k >= 0) {
     273           0 :                                         getArg(p, 4) = k;
     274           0 :                                         prepareRemote(tpe);
     275           0 :                                         putRemoteVariables();
     276           0 :                                         remoteAction();
     277             :                                 }
     278             :                         } else
     279           0 :                                 pushInstruction(mb, p);
     280           3 :                 } else if (getModuleId(p) == sqlRef && getFunctionId(p) == binddbatRef) {
     281             : 
     282           0 :                         if (p->argc == 5 && getArgType(mb, p, 3) == TYPE_str) {
     283           0 :                                 lookupServer(3);
     284             :                                 /* turn the instruction into a local one */
     285           0 :                                 k = defConstant(mb, TYPE_int, &cst);
     286           0 :                                 if (k >= 0) {
     287           0 :                                         getArg(p, 3) = defConstant(mb, TYPE_int, &cst);
     288           0 :                                         prepareRemote(TYPE_void);
     289           0 :                                         putRemoteVariables();
     290           0 :                                         remoteAction();
     291             :                                 }
     292             :                         } else {
     293           0 :                                 pushInstruction(mb, p);
     294             :                         }
     295           3 :                 } else if (getModuleId(p) == optimizerRef || cnt == 0 || p->barrier) /* local only or flow control statement */
     296           3 :                         pushInstruction(mb, p);
     297             :                 else {
     298             :                         /*
     299             :                          * The hard part is to decide what to do with instructions that
     300             :                          * contain a reference to a remote variable.
     301             :                          * In the first implementation we use the following policy.
     302             :                          * If there are multiple sites involved, all arguments are
     303             :                          * moved local for processing. Moreover, all local arguments
     304             :                          * to be shipped should be simple.
     305             :                          */
     306             :                         remoteSite = 0;
     307             :                         collectFirst = false;
     308           0 :                         for (j = 0; j < p->argc; j++)
     309           0 :                                 if (location[getArg(p, j)]) {
     310           0 :                                         if (remoteSite == 0)
     311             :                                                 remoteSite = location[getArg(p, j)];
     312           0 :                                         else if (remoteSite != location[getArg(p, j)])
     313           0 :                                                 collectFirst = true;
     314             :                                 }
     315           0 :                         if (getModuleId(p) == ioRef
     316           0 :                                 || (getModuleId(p) == sqlRef
     317           0 :                                         && (getFunctionId(p) == resultSetRef
     318           0 :                                                 || getFunctionId(p) == rsColumnRef)))
     319           0 :                                 collectFirst = true;
     320             : 
     321             :                         /* local BATs are not shipped */
     322           0 :                         if (remoteSite && !collectFirst)
     323           0 :                                 for (j = p->retc; j < p->argc; j++)
     324           0 :                                         if (location[getArg(p, j)] == 0
     325           0 :                                                 && isaBatType(getVarType(mb, getArg(p, j))))
     326           0 :                                                 collectFirst = true;
     327             : 
     328           0 :                         if (collectFirst) {
     329             :                                 /* perform locally */
     330           0 :                                 for (j = p->retc; j < p->argc; j++)
     331           0 :                                         if (location[getArg(p, j)]) {
     332           0 :                                                 q = newInstruction(0, mapiRef, rpcRef);
     333           0 :                                                 if (q == NULL) {
     334           0 :                                                         msg = createException(MAL, "optimizer.remote",
     335             :                                                                                                   SQLSTATE(HY013)
     336             :                                                                                                   MAL_MALLOC_FAIL);
     337           0 :                                                         break;
     338             :                                                 }
     339           0 :                                                 getArg(q, 0) = getArg(p, j);
     340           0 :                                                 q = pushArgument(mb, q, location[getArg(p, j)]);
     341           0 :                                                 snprintf(buf, BUFSIZ, "io.print(%s);",
     342             :                                                                  getVarNameIntoBuffer(mb, getArg(p, j), name));
     343           0 :                                                 q = pushStr(mb, q, buf);
     344           0 :                                                 pushInstruction(mb, q);
     345             :                                         }
     346           0 :                                 if (msg)
     347             :                                         break;
     348           0 :                                 pushInstruction(mb, p);
     349             :                                 /* as of now all the targets are also local */
     350           0 :                                 for (j = 0; j < p->retc; j++)
     351           0 :                                         location[getArg(p, j)] = 0;
     352           0 :                                 actions++;
     353           0 :                         } else if (remoteSite) {
     354             :                                 /* single remote site involved */
     355           0 :                                 r = newInstruction(mb, mapiRef, rpcRef);
     356           0 :                                 if (r == NULL) {
     357           0 :                                         msg = createException(MAL, "optimizer.remote",
     358             :                                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     359           0 :                                         break;
     360             :                                 }
     361           0 :                                 getArg(r, 0) = newTmpVariable(mb, TYPE_void);
     362           0 :                                 r = pushArgument(mb, r, remoteSite);
     363             : 
     364           0 :                                 for (j = p->retc; j < p->argc; j++)
     365           0 :                                         if (location[getArg(p, j)] == 0
     366           0 :                                                 && !isVarConstant(mb, getArg(p, j))) {
     367           0 :                                                 q = newInstruction(0, mapiRef, putRef);
     368           0 :                                                 if (q == NULL) {
     369           0 :                                                         freeInstruction(r);
     370           0 :                                                         msg = createException(MAL, "optimizer.remote",
     371             :                                                                                                   SQLSTATE(HY013)
     372             :                                                                                                   MAL_MALLOC_FAIL);
     373           0 :                                                         break;
     374             :                                                 }
     375           0 :                                                 getArg(q, 0) = newTmpVariable(mb, TYPE_void);
     376           0 :                                                 q = pushArgument(mb, q, remoteSite);
     377           0 :                                                 q = pushStr(mb, q, getVarNameIntoBuffer(mb, getArg(p, j), name));
     378           0 :                                                 q = pushArgument(mb, q, getArg(p, j));
     379           0 :                                                 pushInstruction(mb, q);
     380             :                                         }
     381           0 :                                 s = RQcall2str(mb, p);
     382           0 :                                 pushInstruction(mb, r);
     383           0 :                                 (void) pushStr(mb, r, s + 1);
     384           0 :                                 GDKfree(s);
     385           0 :                                 for (j = 0; j < p->retc; j++)
     386           0 :                                         location[getArg(p, j)] = remoteSite;
     387           0 :                                 freeInstruction(p);
     388           0 :                                 actions++;
     389             :                         } else
     390           0 :                                 pushInstruction(mb, p);
     391             :                 }
     392             :         }
     393           2 :   bailout:
     394         254 :         for (; i < slimit; i++)
     395         253 :                 if (old[i])
     396           0 :                         pushInstruction(mb, old[i]);
     397           1 :         GDKfree(old);
     398           1 :         GDKfree(location);
     399           1 :         GDKfree(dbalias);
     400             : 
     401             :         /* Defense line against incorrect plans */
     402           1 :         if (msg == MAL_SUCCEED && actions) {
     403           0 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     404           0 :                 if (!msg)
     405           0 :                         msg = chkFlow(mb);
     406           0 :                 if (!msg)
     407           0 :                         msg = chkDeclarations(mb);
     408             :         }
     409             :         /* keep actions taken as a fake argument */
     410           1 :         (void) pushInt(mb, pci, actions);
     411           1 :         return msg;
     412             : }

Generated by: LCOV version 1.14