Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "opt_remoteQueries.h"
15 : #include "mal_interpreter.h" /* for showErrors() */
16 : #include "mal_builder.h"
17 :
18 : /*
19 : * The instruction sent is produced with a variation of call2str
20 : * from the debugger.
21 : */
22 : static str
23 0 : RQcall2str(MalBlkPtr mb, InstrPtr p)
24 : {
25 0 : int k;
26 0 : size_t len = 1;
27 0 : str msg;
28 0 : str s, cv = NULL;
29 :
30 0 : msg = (str) GDKmalloc(BUFSIZ);
31 0 : if (msg == NULL)
32 : return NULL;
33 0 : msg[0] = '#';
34 0 : msg[1] = 0;
35 0 : if (p->barrier)
36 0 : strcat(msg, operatorName(p->barrier));
37 :
38 0 : if (p->retc > 1)
39 0 : strcat(msg, "(");
40 0 : len = strlen(msg);
41 0 : for (k = 0; k < p->retc; k++) {
42 0 : sprintf(msg + len, "%s", getVarName(mb, getArg(p, k)));
43 0 : if (k < p->retc - 1)
44 0 : strcat(msg, ",");
45 0 : len = strlen(msg);
46 : }
47 0 : if (p->retc > 1)
48 0 : strcat(msg, ")");
49 0 : sprintf(msg + len, ":= %s.%s(", getModuleId(p), getFunctionId(p));
50 0 : s = strchr(msg, '(');
51 0 : if (s) {
52 0 : s++;
53 0 : *s = 0;
54 0 : len = strlen(msg);
55 0 : for (k = p->retc; k < p->argc; k++) {
56 0 : VarPtr v = getVar(mb, getArg(p, k));
57 0 : if (isVarConstant(mb, getArg(p, k))) {
58 0 : if (v->type == TYPE_void) {
59 0 : sprintf(msg + len, "nil");
60 : } else {
61 0 : if ((cv = VALformat(&v->value)) == NULL) {
62 0 : GDKfree(msg);
63 0 : return NULL;
64 : }
65 0 : sprintf(msg + len, "%s:%s", cv, ATOMname(v->type));
66 0 : GDKfree(cv);
67 : }
68 :
69 : } else
70 0 : sprintf(msg + len, "%s", getVarName(mb, getArg(p, k)));
71 0 : if (k < p->argc - 1)
72 0 : strcat(msg, ",");
73 0 : len = strlen(msg);
74 : }
75 0 : strcat(msg, ");");
76 : }
77 : /* printf("#RQcall:%s\n",msg);*/
78 : return msg;
79 : }
80 :
81 : /*
82 : * The algorithm follows the common scheme used so far.
83 : * Instructions are taken out one-by-one and copied
84 : * to the new block.
85 : *
86 : * A local cache of connections is established, because
87 : * the statements related to a single remote database
88 : * should be executed in the same stack context.
89 : * A pitfall is to create multiple connections with
90 : * their isolated runtime environment.
91 : */
92 : #define lookupServer(X) \
93 : do { \
94 : /* lookup the server connection */ \
95 : if (location[getArg(p, 0)] == 0) { \
96 : db = 0; \
97 : if (isVarConstant(mb, getArg(p, X))) \
98 : db = getVarConstant(mb, getArg(p, X)).val.sval; \
99 : for (k = 0; k < dbtop; k++) \
100 : if (strcmp(db, dbalias[k].dbname) == 0) \
101 : break; \
102 : \
103 : if (k == dbtop) { \
104 : r = newInstruction(mb, mapiRef, lookupRef); \
105 : if (r == NULL) { \
106 : msg = createException(MAL, "optimizer.remote", \
107 : SQLSTATE(HY013) MAL_MALLOC_FAIL); \
108 : goto bailout; \
109 : } \
110 : j = getArg(r, 0) = newTmpVariable(mb, TYPE_int); \
111 : r = pushArgument(mb, r, getArg(p, X)); \
112 : pushInstruction(mb, r); \
113 : dbalias[dbtop].dbhdl = j; \
114 : dbalias[dbtop++].dbname = db; \
115 : if (dbtop == 127) \
116 : dbtop--; \
117 : } else \
118 : j = dbalias[k].dbhdl; \
119 : location[getArg(p, 0)] = j; \
120 : } else \
121 : j = location[getArg(p, 0)]; \
122 : } while (0)
123 :
124 : #define prepareRemote(X) \
125 : do { \
126 : r = newInstruction(mb, mapiRef, rpcRef); \
127 : if (r == NULL) { \
128 : msg = createException(MAL, "optimizer.remote", \
129 : SQLSTATE(HY013) MAL_MALLOC_FAIL); \
130 : goto bailout; \
131 : } \
132 : getArg(r, 0) = newTmpVariable(mb, X); \
133 : r = pushArgument(mb, r, j); \
134 : } while (0)
135 :
136 : #define putRemoteVariables() \
137 : do { \
138 : for (j = p->retc; j < p->argc; j++) { \
139 : if (location[getArg(p, j)] == 0 && !isVarConstant(mb, getArg(p, j))) { \
140 : q = newInstruction(0, mapiRef, putRef); \
141 : if (q == NULL) { \
142 : freeInstruction(r); \
143 : msg = createException(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL); \
144 : break; \
145 : } \
146 : getArg(q, 0) = newTmpVariable(mb, TYPE_void); \
147 : q = pushArgument(mb, q, location[getArg(p, j)]); \
148 : q = pushStr(mb, q, getVarName(mb, getArg(p, j))); \
149 : q = pushArgument(mb, q, getArg(p, j)); \
150 : pushInstruction(mb, q); \
151 : } \
152 : } \
153 : } while (0)
154 :
155 : #define remoteAction() \
156 : do { \
157 : s = RQcall2str(mb, p); \
158 : r = pushStr(mb, r, s + 1); \
159 : GDKfree(s); \
160 : pushInstruction(mb, r); \
161 : freeInstruction(p); \
162 : actions++; \
163 : } while (0)
164 :
165 : typedef struct {
166 : str dbname;
167 : int dbhdl;
168 : } DBalias;
169 :
170 : str
171 1 : OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
172 : InstrPtr pci)
173 : {
174 1 : InstrPtr p, q, r, *old;
175 1 : int i, j, k, cnt, limit, slimit, actions = 0;
176 1 : int remoteSite;
177 1 : bool collectFirst;
178 1 : int *location;
179 1 : DBalias *dbalias;
180 1 : int dbtop;
181 1 : char buf[BUFSIZ], *s, *db;
182 1 : ValRecord cst;
183 1 : str msg = MAL_SUCCEED;
184 :
185 1 : cst.vtype = TYPE_int;
186 1 : cst.val.ival = 0;
187 1 : cst.len = 0;
188 :
189 1 : (void) cntxt;
190 1 : (void) stk;
191 :
192 1 : limit = mb->stop;
193 1 : slimit = mb->ssize;
194 1 : old = mb->stmt;
195 :
196 1 : location = (int *) GDKzalloc(mb->vsize * sizeof(int));
197 1 : if (location == NULL)
198 0 : throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
199 1 : dbalias = (DBalias *) GDKzalloc(128 * sizeof(DBalias));
200 1 : if (dbalias == NULL) {
201 0 : GDKfree(location);
202 0 : throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
203 : }
204 1 : dbtop = 0;
205 :
206 1 : if (newMalBlkStmt(mb, mb->ssize) < 0) {
207 0 : GDKfree(dbalias);
208 0 : GDKfree(location);
209 0 : throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
210 : }
211 :
212 4 : for (i = 0; i < limit; i++) {
213 3 : p = old[i];
214 :
215 : /* detect remote instructions */
216 3 : cnt = 0;
217 5 : for (j = 0; j < p->argc; j++)
218 2 : if (location[getArg(p, j)])
219 0 : cnt++;
220 :
221 : /* detect remote variable binding */
222 :
223 3 : if ((getModuleId(p) == mapiRef && getFunctionId(p) == bindRef)) {
224 0 : if (p->argc == 3 && getArgType(mb, p, 1) == TYPE_int) {
225 0 : int tpe;
226 0 : j = getArg(p, 1); /* lookupServer with key */
227 0 : tpe = getArgType(mb, p, 0);
228 : /* result is remote */
229 0 : location[getArg(p, 0)] = j;
230 :
231 : /* turn the instruction into a local one */
232 : /* one argument less */
233 0 : p->argc--;
234 : /* only use the second argument (string) */
235 0 : getArg(p, 1) = getArg(p, 2);
236 :
237 0 : getModuleId(p) = bbpRef;
238 :
239 0 : prepareRemote(tpe);
240 0 : putRemoteVariables();
241 0 : remoteAction();
242 : } else
243 0 : pushInstruction(mb, p);
244 3 : } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == evalRef)) {
245 0 : if (p->argc == 3) {
246 : /* a remote sql eval is needed */
247 0 : lookupServer(1);
248 : /* turn the instruction into a local one */
249 : /* one argument less */
250 0 : p->argc--;
251 : /* only use the second argument (string) */
252 0 : getArg(p, 1) = getArg(p, 2);
253 :
254 0 : prepareRemote(TYPE_void);
255 0 : s = RQcall2str(mb, p);
256 0 : r = pushStr(mb, r, s + 1);
257 0 : GDKfree(s);
258 0 : pushInstruction(mb, r);
259 0 : freeInstruction(p);
260 0 : actions++;
261 : }
262 3 : } else if ((getModuleId(p) == sqlRef && getFunctionId(p) == bindRef)) {
263 :
264 0 : if (p->argc == 6 && getArgType(mb, p, 4) == TYPE_str) {
265 0 : int tpe;
266 0 : j = getArg(p, 1); /* lookupServer with key */
267 0 : tpe = getArgType(mb, p, 0);
268 :
269 0 : lookupServer(4);
270 : /* turn the instruction into a local one */
271 0 : k = defConstant(mb, TYPE_int, &cst);
272 0 : if (k >= 0) {
273 0 : getArg(p, 4) = k;
274 0 : prepareRemote(tpe);
275 0 : putRemoteVariables();
276 0 : remoteAction();
277 : }
278 : } else
279 0 : pushInstruction(mb, p);
280 3 : } else if (getModuleId(p) == sqlRef && getFunctionId(p) == binddbatRef) {
281 :
282 0 : if (p->argc == 5 && getArgType(mb, p, 3) == TYPE_str) {
283 0 : lookupServer(3);
284 : /* turn the instruction into a local one */
285 0 : k = defConstant(mb, TYPE_int, &cst);
286 0 : if (k >= 0) {
287 0 : getArg(p, 3) = defConstant(mb, TYPE_int, &cst);
288 0 : prepareRemote(TYPE_void);
289 0 : putRemoteVariables();
290 0 : remoteAction();
291 : }
292 : } else {
293 0 : pushInstruction(mb, p);
294 : }
295 3 : } else if (getModuleId(p) == optimizerRef || cnt == 0 || p->barrier) /* local only or flow control statement */
296 3 : pushInstruction(mb, p);
297 : else {
298 : /*
299 : * The hard part is to decide what to do with instructions that
300 : * contain a reference to a remote variable.
301 : * In the first implementation we use the following policy.
302 : * If there are multiple sites involved, all arguments are
303 : * moved local for processing. Moreover, all local arguments
304 : * to be shipped should be simple.
305 : */
306 : remoteSite = 0;
307 : collectFirst = false;
308 0 : for (j = 0; j < p->argc; j++)
309 0 : if (location[getArg(p, j)]) {
310 0 : if (remoteSite == 0)
311 : remoteSite = location[getArg(p, j)];
312 0 : else if (remoteSite != location[getArg(p, j)])
313 0 : collectFirst = true;
314 : }
315 0 : if (getModuleId(p) == ioRef
316 0 : || (getModuleId(p) == sqlRef
317 0 : && (getFunctionId(p) == resultSetRef
318 0 : || getFunctionId(p) == rsColumnRef)))
319 0 : collectFirst = true;
320 :
321 : /* local BATs are not shipped */
322 0 : if (remoteSite && !collectFirst)
323 0 : for (j = p->retc; j < p->argc; j++)
324 0 : if (location[getArg(p, j)] == 0
325 0 : && isaBatType(getVarType(mb, getArg(p, j))))
326 0 : collectFirst = true;
327 :
328 0 : if (collectFirst) {
329 : /* perform locally */
330 0 : for (j = p->retc; j < p->argc; j++)
331 0 : if (location[getArg(p, j)]) {
332 0 : q = newInstruction(0, mapiRef, rpcRef);
333 0 : if (q == NULL) {
334 0 : msg = createException(MAL, "optimizer.remote",
335 : SQLSTATE(HY013)
336 : MAL_MALLOC_FAIL);
337 0 : break;
338 : }
339 0 : getArg(q, 0) = getArg(p, j);
340 0 : q = pushArgument(mb, q, location[getArg(p, j)]);
341 0 : snprintf(buf, BUFSIZ, "io.print(%s);",
342 : getVarName(mb, getArg(p, j)));
343 0 : q = pushStr(mb, q, buf);
344 0 : pushInstruction(mb, q);
345 : }
346 0 : if (msg)
347 : break;
348 0 : pushInstruction(mb, p);
349 : /* as of now all the targets are also local */
350 0 : for (j = 0; j < p->retc; j++)
351 0 : location[getArg(p, j)] = 0;
352 0 : actions++;
353 0 : } else if (remoteSite) {
354 : /* single remote site involved */
355 0 : r = newInstruction(mb, mapiRef, rpcRef);
356 0 : if (r == NULL) {
357 0 : msg = createException(MAL, "optimizer.remote",
358 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
359 0 : break;
360 : }
361 0 : getArg(r, 0) = newTmpVariable(mb, TYPE_void);
362 0 : r = pushArgument(mb, r, remoteSite);
363 :
364 0 : for (j = p->retc; j < p->argc; j++)
365 0 : if (location[getArg(p, j)] == 0
366 0 : && !isVarConstant(mb, getArg(p, j))) {
367 0 : q = newInstruction(0, mapiRef, putRef);
368 0 : if (q == NULL) {
369 0 : freeInstruction(r);
370 0 : msg = createException(MAL, "optimizer.remote",
371 : SQLSTATE(HY013)
372 : MAL_MALLOC_FAIL);
373 0 : break;
374 : }
375 0 : getArg(q, 0) = newTmpVariable(mb, TYPE_void);
376 0 : q = pushArgument(mb, q, remoteSite);
377 0 : q = pushStr(mb, q, getVarName(mb, getArg(p, j)));
378 0 : q = pushArgument(mb, q, getArg(p, j));
379 0 : pushInstruction(mb, q);
380 : }
381 0 : s = RQcall2str(mb, p);
382 0 : pushInstruction(mb, r);
383 0 : (void) pushStr(mb, r, s + 1);
384 0 : GDKfree(s);
385 0 : for (j = 0; j < p->retc; j++)
386 0 : location[getArg(p, j)] = remoteSite;
387 0 : freeInstruction(p);
388 0 : actions++;
389 : } else
390 0 : pushInstruction(mb, p);
391 : }
392 : }
393 2 : bailout:
394 254 : for (; i < slimit; i++)
395 253 : if (old[i])
396 0 : pushInstruction(mb, old[i]);
397 1 : GDKfree(old);
398 1 : GDKfree(location);
399 1 : GDKfree(dbalias);
400 :
401 : /* Defense line against incorrect plans */
402 1 : if (msg == MAL_SUCCEED && actions) {
403 0 : msg = chkTypes(cntxt->usermodule, mb, FALSE);
404 0 : if (!msg)
405 0 : msg = chkFlow(mb);
406 0 : if (!msg)
407 0 : msg = chkDeclarations(mb);
408 : }
409 : /* keep actions taken as a fake argument */
410 1 : (void) pushInt(mb, pci, actions);
411 1 : return msg;
412 : }
|