Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "opt_remoteQueries.h"
15 : #include "mal_interpreter.h" /* for showErrors() */
16 : #include "mal_builder.h"
17 :
18 : /*
19 : * The instruction sent is produced with a variation of call2str
20 : * from the debugger.
21 : */
22 : static str
23 0 : RQcall2str(MalBlkPtr mb, InstrPtr p)
24 : {
25 0 : int k;
26 0 : size_t len = 1;
27 0 : str msg;
28 0 : str s, cv = NULL;
29 :
30 0 : msg = (str) GDKmalloc(BUFSIZ);
31 0 : if (msg == NULL)
32 : return NULL;
33 0 : msg[0] = '#';
34 0 : msg[1] = 0;
35 0 : if (p->barrier)
36 0 : strcat(msg, operatorName(p->barrier));
37 :
38 0 : if (p->retc > 1)
39 0 : strcat(msg, "(");
40 0 : len = strlen(msg);
41 0 : for (k = 0; k < p->retc; k++) {
42 0 : getVarNameIntoBuffer(mb, getArg(p, k), msg + len);
43 0 : if (k < p->retc - 1)
44 0 : strcat(msg, ",");
45 0 : len = strlen(msg);
46 : }
47 0 : if (p->retc > 1)
48 0 : strcat(msg, ")");
49 0 : sprintf(msg + len, ":= %s.%s(", getModuleId(p), getFunctionId(p));
50 0 : s = strchr(msg, '(');
51 0 : if (s) {
52 0 : s++;
53 0 : *s = 0;
54 0 : len = strlen(msg);
55 0 : for (k = p->retc; k < p->argc; k++) {
56 0 : VarPtr v = getVar(mb, getArg(p, k));
57 0 : if (isVarConstant(mb, getArg(p, k))) {
58 0 : if (v->type == TYPE_void) {
59 0 : sprintf(msg + len, "nil");
60 : } else {
61 0 : if ((cv = VALformat(&v->value)) == NULL) {
62 0 : GDKfree(msg);
63 0 : return NULL;
64 : }
65 0 : sprintf(msg + len, "%s:%s", cv, ATOMname(v->type));
66 0 : GDKfree(cv);
67 : }
68 :
69 : } else
70 0 : getVarNameIntoBuffer(mb, getArg(p, k), msg + len);
71 0 : if (k < p->argc - 1)
72 0 : strcat(msg, ",");
73 0 : len = strlen(msg);
74 : }
75 0 : strcat(msg, ");");
76 : }
77 : /* printf("#RQcall:%s\n",msg);*/
78 : return msg;
79 : }
80 :
/*
 * The algorithm follows the common scheme used so far.
 * Instructions are taken out one-by-one and copied
 * to the new block.
 *
 * A local cache of connections is established, because
 * the statements related to a single remote database
 * should be executed in the same stack context.
 * A pitfall is to create multiple connections with
 * their isolated runtime environment.
 *
 * lookupServer(X): set j to the variable holding the connection handle
 * for the database named by argument X of the current instruction p.
 * The macro expands inside OPTremoteQueriesImplementation and uses its
 * locals (p, mb, location, dbalias, dbtop, db, k, r, j, msg) and its
 * bailout label.
 *
 * - If p's first result already has a location, that handle is reused.
 * - Otherwise the dbalias cache is searched by name.  On a miss, a
 *   mapi.lookup instruction is emitted and its result is cached.
 * - Only a constant string argument can match a cache entry; any other
 *   argument (db == NULL) always gets a fresh lookup.  This check avoids
 *   calling strcmp() on NULL.
 * - Once dbtop reaches 127 the last slot is overwritten on every new
 *   insertion.
 */
#define lookupServer(X) \
	do { \
		/* lookup the server connection */ \
		if (location[getArg(p, 0)] == 0) { \
			db = NULL; \
			if (isVarConstant(mb, getArg(p, (X))) && \
				getArgType(mb, p, (X)) == TYPE_str) \
				db = getVarConstant(mb, getArg(p, (X))).val.sval; \
			for (k = 0; k < dbtop; k++) \
				if (db != NULL && dbalias[k].dbname != NULL && \
					strcmp(db, dbalias[k].dbname) == 0) \
					break; \
			if (k == dbtop) { \
				r = newInstruction(mb, mapiRef, lookupRef); \
				if (r == NULL) { \
					msg = createException(MAL, "optimizer.remote", \
						SQLSTATE(HY013) MAL_MALLOC_FAIL); \
					goto bailout; \
				} \
				j = getArg(r, 0) = newTmpVariable(mb, TYPE_int); \
				r = pushArgument(mb, r, getArg(p, (X))); \
				pushInstruction(mb, r); \
				dbalias[dbtop].dbhdl = j; \
				dbalias[dbtop++].dbname = db; \
				if (dbtop == 127) \
					dbtop--; \
			} else \
				j = dbalias[k].dbhdl; \
			location[getArg(p, 0)] = j; \
		} else \
			j = location[getArg(p, 0)]; \
	} while (0)
123 :
/*
 * prepareRemote(X): start a mapi.rpc instruction in r.  Its result is a
 * fresh temporary of type X and its first argument is the connection
 * handle variable j.  The macro expands inside
 * OPTremoteQueriesImplementation.  On allocation failure it sets msg and
 * jumps to that function's bailout label.
 */
#define prepareRemote(X) \
	do { \
		if ((r = newInstruction(mb, mapiRef, rpcRef)) == NULL) { \
			msg = createException(MAL, "optimizer.remote", \
				SQLSTATE(HY013) MAL_MALLOC_FAIL); \
			goto bailout; \
		} \
		getArg(r, 0) = newTmpVariable(mb, (X)); \
		r = pushArgument(mb, r, j); \
	} while (0)
135 :
/*
 * putRemoteVariables(): for every non-constant argument of p that is not
 * yet remote, emit a mapi.put that ships it to the connection already
 * placed as argument 1 of the pending rpc instruction r by prepareRemote.
 *
 * The macro expands inside OPTremoteQueriesImplementation and clobbers j.
 * On allocation failure it frees r, sets msg and jumps to bailout;
 * merely breaking out of the loop would let the caller reuse the freed r.
 * isVarConstant() is tested first so that location[] is never indexed
 * with a variable created during this optimizer run.
 */
#define putRemoteVariables() \
	do { \
		for (j = p->retc; j < p->argc; j++) { \
			if (!isVarConstant(mb, getArg(p, j)) && \
				location[getArg(p, j)] == 0) { \
				q = newInstruction(0, mapiRef, putRef); \
				if (q == NULL) { \
					freeInstruction(r); \
					msg = createException(MAL, "optimizer.remote", \
						SQLSTATE(HY013) MAL_MALLOC_FAIL); \
					goto bailout; \
				} \
				getArg(q, 0) = newTmpVariable(mb, TYPE_void); \
				q = pushArgument(mb, q, getArg(r, 1)); \
				q = pushStr(mb, q, getVarNameIntoBuffer(mb, getArg(p, j), name)); \
				q = pushArgument(mb, q, getArg(p, j)); \
				pushInstruction(mb, q); \
			} \
		} \
	} while (0)
154 :
/*
 * remoteAction(): finish the pending rpc instruction r by attaching the
 * MAL text of p (without its leading '#'), emit r, and discard p.
 * The macro expands inside OPTremoteQueriesImplementation.  If the text
 * cannot be produced, r is freed, msg is set and control jumps to bailout,
 * which keeps p in the plan.
 */
#define remoteAction() \
	do { \
		s = RQcall2str(mb, p); \
		if (s == NULL) { \
			freeInstruction(r); \
			msg = createException(MAL, "optimizer.remote", \
				SQLSTATE(HY013) MAL_MALLOC_FAIL); \
			goto bailout; \
		} \
		r = pushStr(mb, r, s + 1); \
		GDKfree(s); \
		pushInstruction(mb, r); \
		freeInstruction(p); \
		actions++; \
	} while (0)
164 :
/*
 * One entry of the per-invocation connection cache filled by lookupServer:
 * the name of a remote database and the variable that receives the
 * mapi.lookup result for it.
 */
typedef struct {
	str dbname;	/* points at the constant's string value (not copied); NULL when the name argument was not a constant */
	int dbhdl;	/* index of the variable holding the connection handle */
} DBalias;
169 :
170 : str
171 1 : OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
172 : InstrPtr pci)
173 : {
174 1 : InstrPtr p, q, r, *old;
175 1 : int i, j, k, cnt, limit, slimit, actions = 0;
176 1 : int remoteSite;
177 1 : bool collectFirst;
178 1 : int *location;
179 1 : DBalias dbalias[128];
180 1 : int dbtop;
181 1 : char buf[BUFSIZ], *s, *db, name[IDLENGTH];
182 1 : ValRecord cst;
183 1 : str msg = MAL_SUCCEED;
184 :
185 1 : cst.vtype = TYPE_int;
186 1 : cst.val.ival = 0;
187 1 : cst.len = 0;
188 :
189 1 : (void) cntxt;
190 1 : (void) stk;
191 :
192 1 : limit = mb->stop;
193 1 : slimit = mb->ssize;
194 1 : old = mb->stmt;
195 :
196 1 : location = (int *) GDKzalloc(mb->vsize * sizeof(int));
197 1 : if (location == NULL)
198 0 : throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
199 1 : memset(dbalias, 0, sizeof(dbalias));
200 1 : dbtop = 0;
201 :
202 1 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
203 0 : GDKfree(location);
204 0 : throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
205 : }
206 :
207 4 : for (i = 0; i < limit; i++) {
208 3 : p = old[i];
209 :
210 : /* detect remote instructions */
211 3 : cnt = 0;
212 5 : for (j = 0; j < p->argc; j++)
213 2 : if (location[getArg(p, j)])
214 0 : cnt++;
215 :
216 : /* detect remote variable binding */
217 :
218 3 : if ((getModuleId(p) == mapiRef && getFunctionId(p) == bindRef)) {
219 0 : if (p->argc == 3 && getArgType(mb, p, 1) == TYPE_int) {
220 0 : int tpe;
221 0 : j = getArg(p, 1); /* lookupServer with key */
222 0 : tpe = getArgType(mb, p, 0);
223 : /* result is remote */
224 0 : location[getArg(p, 0)] = j;
225 :
226 : /* turn the instruction into a local one */
227 : /* one argument less */
228 0 : p->argc--;
229 : /* only use the second argument (string) */
230 0 : getArg(p, 1) = getArg(p, 2);
231 :
232 0 : getModuleId(p) = bbpRef;
233 :
234 0 : prepareRemote(tpe);
235 0 : putRemoteVariables();
236 0 : remoteAction();
237 : } else
238 0 : pushInstruction(mb, p);
239 3 : } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == evalRef)) {
240 0 : if (p->argc == 3) {
241 : /* a remote sql eval is needed */
242 0 : lookupServer(1);
243 : /* turn the instruction into a local one */
244 : /* one argument less */
245 0 : p->argc--;
246 : /* only use the second argument (string) */
247 0 : getArg(p, 1) = getArg(p, 2);
248 :
249 0 : prepareRemote(TYPE_void);
250 0 : s = RQcall2str(mb, p);
251 0 : r = pushStr(mb, r, s + 1);
252 0 : GDKfree(s);
253 0 : pushInstruction(mb, r);
254 0 : freeInstruction(p);
255 0 : actions++;
256 : }
257 3 : } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == bindRef)) {
258 :
259 0 : if (p->argc == 6 && getArgType(mb, p, 4) == TYPE_str) {
260 0 : int tpe;
261 0 : j = getArg(p, 1); /* lookupServer with key */
262 0 : tpe = getArgType(mb, p, 0);
263 :
264 0 : lookupServer(4);
265 : /* turn the instruction into a local one */
266 0 : k = defConstant(mb, TYPE_int, &cst);
267 0 : if (k >= 0) {
268 0 : getArg(p, 4) = k;
269 0 : prepareRemote(tpe);
270 0 : putRemoteVariables();
271 0 : remoteAction();
272 : }
273 : } else
274 0 : pushInstruction(mb, p);
275 3 : } else if (getModuleId(p) == sqlRef && getFunctionId(p) == binddbatRef) {
276 :
277 0 : if (p->argc == 5 && getArgType(mb, p, 3) == TYPE_str) {
278 0 : lookupServer(3);
279 : /* turn the instruction into a local one */
280 0 : k = defConstant(mb, TYPE_int, &cst);
281 0 : if (k >= 0) {
282 0 : getArg(p, 3) = defConstant(mb, TYPE_int, &cst);
283 0 : prepareRemote(TYPE_void);
284 0 : putRemoteVariables();
285 0 : remoteAction();
286 : }
287 : } else {
288 0 : pushInstruction(mb, p);
289 : }
290 3 : } else if (getModuleId(p) == optimizerRef || cnt == 0 || p->barrier) /* local only or flow control statement */
291 3 : pushInstruction(mb, p);
292 : else {
293 : /*
294 : * The hard part is to decide what to do with instructions that
295 : * contain a reference to a remote variable.
296 : * In the first implementation we use the following policy.
297 : * If there are multiple sites involved, all arguments are
298 : * moved local for processing. Moreover, all local arguments
299 : * to be shipped should be simple.
300 : */
301 : remoteSite = 0;
302 : collectFirst = false;
303 0 : for (j = 0; j < p->argc; j++)
304 0 : if (location[getArg(p, j)]) {
305 0 : if (remoteSite == 0)
306 : remoteSite = location[getArg(p, j)];
307 0 : else if (remoteSite != location[getArg(p, j)])
308 0 : collectFirst = true;
309 : }
310 0 : if (getModuleId(p) == ioRef
311 0 : || (getModuleId(p) == sqlRef
312 0 : && (getFunctionId(p) == resultSetRef
313 0 : || getFunctionId(p) == rsColumnRef)))
314 0 : collectFirst = true;
315 :
316 : /* local BATs are not shipped */
317 0 : if (remoteSite && !collectFirst)
318 0 : for (j = p->retc; j < p->argc; j++)
319 0 : if (location[getArg(p, j)] == 0
320 0 : && isaBatType(getVarType(mb, getArg(p, j))))
321 0 : collectFirst = true;
322 :
323 0 : if (collectFirst) {
324 : /* perform locally */
325 0 : for (j = p->retc; j < p->argc; j++)
326 0 : if (location[getArg(p, j)]) {
327 0 : q = newInstruction(0, mapiRef, rpcRef);
328 0 : if (q == NULL) {
329 0 : msg = createException(MAL, "optimizer.remote",
330 : SQLSTATE(HY013)
331 : MAL_MALLOC_FAIL);
332 0 : break;
333 : }
334 0 : getArg(q, 0) = getArg(p, j);
335 0 : q = pushArgument(mb, q, location[getArg(p, j)]);
336 0 : snprintf(buf, BUFSIZ, "io.print(%s);",
337 : getVarNameIntoBuffer(mb, getArg(p, j), name));
338 0 : q = pushStr(mb, q, buf);
339 0 : pushInstruction(mb, q);
340 : }
341 0 : if (msg)
342 : break;
343 0 : pushInstruction(mb, p);
344 : /* as of now all the targets are also local */
345 0 : for (j = 0; j < p->retc; j++)
346 0 : location[getArg(p, j)] = 0;
347 0 : actions++;
348 0 : } else if (remoteSite) {
349 : /* single remote site involved */
350 0 : r = newInstruction(mb, mapiRef, rpcRef);
351 0 : if (r == NULL) {
352 0 : msg = createException(MAL, "optimizer.remote",
353 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
354 0 : break;
355 : }
356 0 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
357 0 : r = pushArgument(mb, r, remoteSite);
358 :
359 0 : for (j = p->retc; j < p->argc; j++)
360 0 : if (location[getArg(p, j)] == 0
361 0 : && !isVarConstant(mb, getArg(p, j))) {
362 0 : q = newInstruction(0, mapiRef, putRef);
363 0 : if (q == NULL) {
364 0 : freeInstruction(r);
365 0 : msg = createException(MAL, "optimizer.remote",
366 : SQLSTATE(HY013)
367 : MAL_MALLOC_FAIL);
368 0 : break;
369 : }
370 0 : getArg(q, 0) = newTmpVariable(mb, TYPE_void);
371 0 : q = pushArgument(mb, q, remoteSite);
372 0 : q = pushStr(mb, q, getVarNameIntoBuffer(mb, getArg(p, j), name));
373 0 : q = pushArgument(mb, q, getArg(p, j));
374 0 : pushInstruction(mb, q);
375 : }
376 0 : s = RQcall2str(mb, p);
377 0 : pushInstruction(mb, r);
378 0 : (void) pushStr(mb, r, s + 1);
379 0 : GDKfree(s);
380 0 : for (j = 0; j < p->retc; j++)
381 0 : location[getArg(p, j)] = remoteSite;
382 0 : freeInstruction(p);
383 0 : actions++;
384 : } else
385 0 : pushInstruction(mb, p);
386 : }
387 : }
388 2 : bailout:
389 254 : for (; i < slimit; i++)
390 253 : if (old[i])
391 0 : pushInstruction(mb, old[i]);
392 1 : GDKfree(old);
393 1 : GDKfree(location);
394 :
395 : /* Defense line against incorrect plans */
396 1 : if (msg == MAL_SUCCEED && actions) {
397 0 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
398 0 : if (!msg)
399 0 : msg = chkFlow(mb);
400 0 : if (!msg)
401 0 : msg = chkDeclarations(mb);
402 : }
403 : /* keep actions taken as a fake argument */
404 1 : (void) pushInt(mb, pci, actions);
405 1 : return msg;
406 : }
|