Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Fabian Groffen, Martin Kersten
15 : * Remote querying functionality
16 : * Communication with other mservers at the MAL level is a delicate task.
17 : * However, it is indispensable for any distributed functionality. This
18 : * module provides an abstract way to store and retrieve objects on a
19 : * remote site. Additionally, functions on a remote site can be executed
20 : * using objects available in the remote session context. This yields in
21 : * four primitive functions that form the basis for distribution methods:
22 : * get, put, register and exec.
23 : *
24 : * The get method simply retrieves a copy of a remote object. Objects can
25 : * be simple values, strings or Column. The same holds for the put method,
26 : * but the other way around. A local object can be stored on a remote
27 : * site. Upon a successful store, the put method returns the remote
28 : * identifier for the stored object. With this identifier the object can
29 : * be addressed, e.g. using the get method to retrieve the object that was
30 : * stored using put.
31 : *
32 : * The get and put methods are symmetric. Performing a get on an
33 : * identifier that was returned by put, results in an object with the same
34 : * value and type as the one that was put. The result of such an operation is
35 : * equivalent to making an (expensive) copy of the original object.
36 : *
37 : * The register function takes a local MAL function and makes it known at a
38 : * remote site. It ensures that it does not overload an already known
39 : * operation remotely, which could create a semantic conflict.
40 : * Deregistering a function is forbidden, because it would allow for taking
41 : * over the remote site completely.
42 : * C-implemented functions, such as io.print() cannot be remotely stored.
43 : * It would require even more complicated (byte) code shipping and remote
44 : * compilation to make it work.
45 : *
46 : * The choice to let exec only execute functions avoids problems
47 : * to decide what should be returned to the caller. With a function it is
48 : * clear and simple to return that what the function signature prescribes.
49 : * Any side effect (e.g. io.print calls) may cause havoc in the system,
50 : * but are currently ignored.
51 : *
52 : * This leads to the final contract of this module. The methods should be
53 : * used correctly, by obeying their contract. Failing to do so will result
54 : * in errors and possibly undefined behaviour.
55 : *
56 : * The resolve() function can be used to query Merovingian. It returns one
57 : * or more databases discovered in its vicinity matching the given pattern.
58 : *
59 : */
60 : #include "monetdb_config.h"
61 : #include "remote.h"
62 :
63 : /*
64 : * Technically, these methods need to be serialised per connection,
65 : * hence a scheduler that interleaves e.g. multiple get calls, simply
66 : * violates this constraint. If parallelism to the same site is
67 : * desired, a user could create a second connection. This is not always
68 : * easy to generate at the proper place, e.g. overloading the dataflow
69 : * optimizer to patch connections structures is not acceptable.
70 : *
71 : * Instead, we maintain a simple lock with each connection, which can be
72 : * used to issue a safe, but blocking get/put/exec/register request.
73 : */
74 : #include "mal_exception.h"
75 : #include "mal_interpreter.h"
76 : #include "mal_function.h" /* for printFunction */
77 : #include "mal_listing.h"
78 : #include "mal_instruction.h" /* for getmodule/func macros */
79 : #include "mal_authorize.h"
80 : #include "mapi.h"
81 : #include "mutils.h"
82 :
/*
 * Bit flags that together form the one-byte "binary profile" of a
 * server (see localtype and the type member of struct _connection).
 * Each property occupies its own bit; the zero-valued variants exist
 * only so both alternatives can be named symmetrically.
 */
#define RMTT_L_ENDIAN	(0 << 1)	/* bit 1 clear: little endian */
#define RMTT_B_ENDIAN	(1 << 1)	/* bit 1 set: big endian */
#define RMTT_32_BITS	(0 << 2)	/* bit 2 clear: 32-bit size_t */
#define RMTT_64_BITS	(1 << 2)	/* bit 2 set: 64-bit size_t */
#define RMTT_32_OIDS	(0 << 3)	/* bit 3 clear: 32-bit oids */
#define RMTT_64_OIDS	(1 << 3)	/* bit 3 set: 64-bit oids */
#define RMTT_HGE		(1 << 4)	/* bit 4 set: hge (128-bit int) available */
90 :
91 : typedef struct _connection {
92 : MT_Lock lock; /* lock to avoid interference */
93 : str name; /* the handle for this connection */
94 : Mapi mconn; /* the Mapi handle for the connection */
95 : unsigned char type; /* binary profile of the connection target */
96 : bool int128; /* has int128 support */
97 : size_t nextid; /* id counter */
98 : struct _connection *next; /* the next connection in the list */
99 : } *connection;
100 :
101 : #ifndef WIN32
102 : #include <sys/socket.h> /* socket */
103 : #include <sys/un.h> /* sockaddr_un */
104 : #endif
105 : #include <unistd.h> /* gethostname */
106 :
107 : static MT_Lock mal_remoteLock = MT_LOCK_INITIALIZER(mal_remoteLock);
108 :
109 : static connection conns = NULL;
110 : static unsigned char localtype = 0177;
111 : static bool int128 = false;
112 :
113 : static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn,
114 : const char *query);
115 :
116 : /**
117 : * Returns a BAT with valid redirects for the given pattern. If
118 : * merovingian is not running, this function throws an error.
119 : */
120 : static str
121 0 : RMTresolve(bat *ret, const char *const *pat)
122 : {
123 : #ifdef NATIVE_WIN32
124 : (void) ret;
125 : (void) pat;
126 : throw(MAL, "remote.resolve", "merovingian is not available on " "your platform, sorry"); /* please upgrade to Linux, etc. */
127 : #else
128 0 : BAT *list;
129 0 : const char *mero_uri;
130 0 : char *p;
131 0 : unsigned int port;
132 0 : char **redirs;
133 0 : char **or;
134 :
135 0 : if (pat == NULL || *pat == NULL || strcmp(*pat, (str) str_nil) == 0)
136 0 : throw(ILLARG, "remote.resolve",
137 : ILLEGAL_ARGUMENT ": pattern is NULL or nil");
138 :
139 0 : mero_uri = GDKgetenv("merovingian_uri");
140 0 : if (mero_uri == NULL)
141 0 : throw(MAL, "remote.resolve", "this function needs the mserver "
142 : "have been started by merovingian");
143 :
144 0 : list = COLnew(0, TYPE_str, 0, TRANSIENT);
145 0 : if (list == NULL)
146 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
147 :
148 : /* extract port from mero_uri, let mapi figure out the rest */
149 0 : mero_uri += strlen("mapi:monetdb://");
150 0 : if (*mero_uri == '[') {
151 0 : if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
152 0 : BBPreclaim(list);
153 0 : throw(MAL, "remote.resolve",
154 : "illegal IPv6 address on merovingian_uri: %s",
155 : GDKgetenv("merovingian_uri"));
156 : }
157 : }
158 0 : if ((p = strchr(mero_uri, ':')) == NULL) {
159 0 : BBPreclaim(list);
160 0 : throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
161 : GDKgetenv("merovingian_uri"));
162 : }
163 0 : port = (unsigned int) atoi(p + 1);
164 :
165 0 : or = redirs = mapi_resolve(NULL, port, *pat);
166 :
167 0 : if (redirs == NULL) {
168 0 : BBPreclaim(list);
169 0 : throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
170 : }
171 :
172 0 : while (*redirs != NULL) {
173 0 : if (BUNappend(list, (ptr) *redirs, false) != GDK_SUCCEED) {
174 0 : BBPreclaim(list);
175 0 : do
176 0 : free(*redirs);
177 0 : while (*++redirs);
178 0 : free(or);
179 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
180 : }
181 0 : free(*redirs);
182 0 : redirs++;
183 : }
184 0 : free(or);
185 :
186 0 : *ret = list->batCacheid;
187 0 : BBPkeepref(list);
188 0 : return (MAL_SUCCEED);
189 : #endif
190 : }
191 :
192 :
193 : /* for unique connection identifiers */
194 : static size_t connection_id = 0;
195 :
196 : /**
197 : * Returns a connection to the given uri. It always returns a newly
198 : * created connection.
199 : */
200 : static str
201 191 : RMTconnectScen(str *ret,
202 : const char *const *ouri, const char *const *user, const char *const *passwd, const char *const *scen, bit *columnar)
203 : {
204 191 : connection c;
205 191 : char conn[BUFSIZ];
206 191 : char *s;
207 191 : Mapi m;
208 191 : MapiHdl hdl;
209 191 : str msg;
210 :
211 : /* just make sure the return isn't garbage */
212 191 : *ret = 0;
213 :
214 191 : if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str) str_nil) == 0)
215 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
216 : "is NULL or nil");
217 191 : if (user == NULL || *user == NULL || strcmp(*user, (str) str_nil) == 0)
218 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
219 : "NULL or nil");
220 191 : if (passwd == NULL || *passwd == NULL
221 191 : || strcmp(*passwd, (str) str_nil) == 0)
222 0 : throw(ILLARG, "remote.connect",
223 : ILLEGAL_ARGUMENT ": password is " "NULL or nil");
224 191 : if (scen == NULL || *scen == NULL || strcmp(*scen, (str) str_nil) == 0)
225 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
226 : "NULL or nil");
227 191 : if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
228 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
229 : "is not supported", *scen);
230 :
231 191 : m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
232 191 : if (mapi_error(m)) {
233 0 : msg = createException(MAL, "remote.connect",
234 : "unable to connect to '%s': %s",
235 : *ouri, mapi_error_str(m));
236 0 : mapi_destroy(m);
237 0 : return msg;
238 : }
239 :
240 191 : MT_lock_set(&mal_remoteLock);
241 :
242 : /* generate an unique connection name, they are only known
243 : * within one mserver, id is primary key, the rest is super key */
244 191 : snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user,
245 : connection_id++);
246 : /* make sure we can construct MAL identifiers using conn */
247 5380 : for (s = conn; *s != '\0'; s++) {
248 5189 : if (!isalnum((unsigned char) *s)) {
249 743 : *s = '_';
250 : }
251 : }
252 :
253 191 : if (mapi_reconnect(m) != MOK) {
254 5 : MT_lock_unset(&mal_remoteLock);
255 5 : msg = createException(IO, "remote.connect",
256 : "unable to connect to '%s': %s",
257 : *ouri, mapi_error_str(m));
258 5 : mapi_destroy(m);
259 5 : return msg;
260 : }
261 :
262 186 : if (columnar && *columnar) {
263 1 : char set_protocol_query_buf[50];
264 1 : snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);",
265 : PROTOCOL_COLUMNAR);
266 1 : if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
267 0 : mapi_destroy(m);
268 0 : MT_lock_unset(&mal_remoteLock);
269 0 : return msg;
270 : }
271 : }
272 :
273 : /* connection established, add to list */
274 186 : c = GDKzalloc(sizeof(struct _connection));
275 186 : if (c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
276 0 : GDKfree(c);
277 0 : mapi_destroy(m);
278 0 : MT_lock_unset(&mal_remoteLock);
279 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
280 : }
281 186 : c->mconn = m;
282 186 : c->nextid = 0;
283 186 : MT_lock_init(&c->lock, c->name);
284 186 : c->next = conns;
285 186 : conns = c;
286 :
287 186 : msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
288 186 : if (msg) {
289 0 : MT_lock_unset(&mal_remoteLock);
290 0 : return msg;
291 : }
292 372 : if (hdl != NULL && mapi_fetch_row(hdl)) {
293 186 : char *val = mapi_fetch_field(hdl, 0);
294 186 : c->type = (unsigned char) atoi(val);
295 186 : mapi_close_handle(hdl);
296 : } else {
297 0 : c->type = 0;
298 : }
299 :
300 : #ifdef _DEBUG_MAPI_
301 : mapi_trace(c->mconn, true);
302 : #endif
303 186 : if (c->type != localtype && (c->type | RMTT_HGE) == localtype) {
304 : /* we support hge, and for remote, we don't know */
305 0 : msg = RMTquery(&hdl, "remote.connect", m, "x := 0:hge;");
306 0 : if (msg) {
307 0 : freeException(msg);
308 0 : c->int128 = false;
309 : } else {
310 0 : mapi_close_handle(hdl);
311 0 : c->int128 = true;
312 0 : c->type |= RMTT_HGE;
313 : }
314 186 : } else if (c->type == localtype) {
315 186 : c->int128 = int128;
316 : }
317 186 : MT_lock_unset(&mal_remoteLock);
318 :
319 186 : *ret = GDKstrdup(conn);
320 186 : if (*ret == NULL)
321 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
322 : return (MAL_SUCCEED);
323 : }
324 :
325 : static str
326 190 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
327 : {
328 190 : (void) cntxt;
329 190 : (void) mb;
330 190 : str *ret = getArgReference_str(stk, pci, 0);
331 190 : const char *uri = *getArgReference_str(stk, pci, 1);
332 190 : const char *user = *getArgReference_str(stk, pci, 2);
333 190 : const char *passwd = *getArgReference_str(stk, pci, 3);
334 :
335 190 : const char *scen = "msql";
336 :
337 190 : if (pci->argc >= 5)
338 190 : scen = *getArgReference_str(stk, pci, 4);
339 :
340 190 : return RMTconnectScen(ret, &uri, &user, &passwd, &scen, NULL);
341 : }
342 :
343 : /**
344 : * Disconnects a connection. The connection needs not to exist in the
345 : * system, it only needs to exist for the client (i.e. it was once
346 : * created).
347 : */
348 : str
349 186 : RMTdisconnect(void *ret, const char *const *conn)
350 : {
351 186 : connection c, t;
352 :
353 186 : if (conn == NULL || *conn == NULL || strcmp(*conn, (str) str_nil) == 0)
354 0 : throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
355 : "is NULL or nil");
356 :
357 :
358 186 : (void) ret;
359 :
360 : /* we need a lock because the same user can be handled by multiple
361 : * threads */
362 186 : MT_lock_set(&mal_remoteLock);
363 186 : c = conns;
364 186 : t = NULL; /* parent */
365 : /* walk through the list */
366 251 : while (c != NULL) {
367 251 : if (strcmp(c->name, *conn) == 0) {
368 : /* ok, delete it... */
369 186 : if (t == NULL) {
370 156 : conns = c->next;
371 : } else {
372 30 : t->next = c->next;
373 : }
374 :
375 186 : MT_lock_set(&c->lock); /* shared connection */
376 186 : mapi_disconnect(c->mconn);
377 186 : mapi_destroy(c->mconn);
378 186 : MT_lock_unset(&c->lock);
379 186 : MT_lock_destroy(&c->lock);
380 186 : GDKfree(c->name);
381 186 : GDKfree(c);
382 186 : MT_lock_unset(&mal_remoteLock);
383 186 : return MAL_SUCCEED;
384 : }
385 65 : t = c;
386 65 : c = c->next;
387 : }
388 :
389 0 : MT_lock_unset(&mal_remoteLock);
390 0 : throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
391 : }
392 :
393 : /**
394 : * Helper function to return a connection matching a given string, or an
395 : * error if it does not exist. Since this function is internal, it
396 : * doesn't check the argument conn, as it should have been checked
397 : * already.
398 : * NOTE: this function acquires the mal_remoteLock before accessing conns
399 : */
400 : static inline str
401 5403 : RMTfindconn(connection *ret, const char *conn)
402 : {
403 5403 : connection c;
404 :
405 : /* just make sure the return isn't garbage */
406 5403 : *ret = NULL;
407 5403 : MT_lock_set(&mal_remoteLock); /* protect c */
408 5403 : c = conns;
409 7141 : while (c != NULL) {
410 7141 : if (strcmp(c->name, conn) == 0) {
411 5403 : *ret = c;
412 5403 : MT_lock_unset(&mal_remoteLock);
413 5403 : return (MAL_SUCCEED);
414 : }
415 1738 : c = c->next;
416 : }
417 0 : MT_lock_unset(&mal_remoteLock);
418 0 : throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
419 : }
420 :
421 : /**
422 : * Little helper function that returns a GDKmalloced string containing a
423 : * valid identifier that is supposed to be unique in the connection's
424 : * remote context. The generated string depends on the module and
425 : * function the caller is in. But also the runtime context is important.
426 : * The format is rmt<id>_<retvar>_<type>. Every RMTgetId uses a fresh id,
427 : * to distinguish amongst different (parallel) execution context.
428 : * Reuse of this remote identifier should be done with care.
429 : * The encoding of the type allows for ease of type checking later on.
430 : */
431 : static inline str
432 3270 : RMTgetId(char *buf, size_t buflen, MalBlkPtr mb, InstrPtr p, int arg)
433 : {
434 3270 : InstrPtr f;
435 3270 : const char *mod;
436 3270 : char *var;
437 3270 : str rt;
438 3270 : char name[IDLENGTH] = { 0 };
439 3270 : static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
440 :
441 3270 : if (p->retc == 0)
442 0 : throw(MAL, "remote.getId",
443 : ILLEGAL_ARGUMENT "MAL instruction misses retc");
444 :
445 3270 : var = getArgNameIntoBuffer(mb, p, arg, name);
446 3270 : f = getInstrPtr(mb, 0); /* top level function */
447 3270 : mod = getModuleId(f);
448 3270 : if (mod == NULL)
449 3270 : mod = "user";
450 3270 : rt = getTypeIdentifier(getArgType(mb, p, arg));
451 3270 : if (rt == NULL)
452 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
453 :
454 3270 : snprintf(buf, buflen, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var,
455 : rt);
456 :
457 3270 : GDKfree(rt);
458 3270 : return (MAL_SUCCEED);
459 : }
460 :
461 : /**
462 : * Helper function to execute a query over the given connection,
463 : * returning the result handle. If communication fails in one way or
464 : * another, an error is returned. Since this function is internal, it
465 : * doesn't check the input arguments func, conn and query, as they
466 : * should have been checked already.
467 : * NOTE: this function assumes a lock for conn is set
468 : */
469 : static inline str
470 4190 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query)
471 : {
472 4190 : MapiHdl mhdl;
473 :
474 4190 : *ret = NULL;
475 4190 : mhdl = mapi_query(conn, query);
476 4190 : if (mhdl) {
477 4190 : if (mapi_result_error(mhdl) != NULL) {
478 6 : str err = createException(getExceptionType(mapi_result_error(mhdl)),
479 : func,
480 : "(mapi:monetdb://%s@%s/%s) %s",
481 : mapi_get_user(conn),
482 : mapi_get_host(conn),
483 : mapi_get_dbname(conn),
484 : getExceptionMessage(mapi_result_error(mhdl)));
485 6 : mapi_close_handle(mhdl);
486 6 : return (err);
487 : }
488 : } else {
489 0 : if (mapi_error(conn) != MOK) {
490 0 : throw(IO, func, "an error occurred on connection: %s",
491 : mapi_error_str(conn));
492 : } else {
493 0 : throw(MAL, func,
494 : "remote function invocation didn't return a result");
495 : }
496 : }
497 :
498 4184 : *ret = mhdl;
499 4184 : return (MAL_SUCCEED);
500 : }
501 :
502 : static str
503 332 : RMTprelude(void)
504 : {
505 332 : unsigned int type = 0;
506 :
507 : #ifdef WORDS_BIGENDIAN
508 : type |= RMTT_B_ENDIAN;
509 : #else
510 332 : type |= RMTT_L_ENDIAN;
511 : #endif
512 : #if SIZEOF_SIZE_T == SIZEOF_LNG
513 332 : type |= RMTT_64_BITS;
514 : #else
515 : type |= RMTT_32_BITS;
516 : #endif
517 : #if SIZEOF_OID == SIZEOF_LNG
518 332 : type |= RMTT_64_OIDS;
519 : #else
520 : type |= RMTT_32_OIDS;
521 : #endif
522 : #ifdef HAVE_HGE
523 332 : type |= RMTT_HGE;
524 332 : int128 = true;
525 : #endif
526 332 : localtype = (unsigned char) type;
527 :
528 332 : return (MAL_SUCCEED);
529 : }
530 :
531 : static str
532 330 : RMTepilogue(void *ret)
533 : {
534 330 : connection c, t;
535 :
536 330 : (void) ret;
537 :
538 330 : MT_lock_set(&mal_remoteLock); /* nobody allowed here */
539 : /* free connections list */
540 330 : c = conns;
541 330 : while (c != NULL) {
542 0 : t = c;
543 0 : c = c->next;
544 0 : MT_lock_set(&t->lock);
545 0 : mapi_destroy(t->mconn);
546 0 : MT_lock_unset(&t->lock);
547 0 : MT_lock_destroy(&t->lock);
548 0 : GDKfree(t->name);
549 0 : GDKfree(t);
550 : }
551 : /* not sure, but better be safe than sorry */
552 330 : conns = NULL;
553 330 : MT_lock_unset(&mal_remoteLock);
554 :
555 330 : return (MAL_SUCCEED);
556 : }
557 :
558 : static str
559 1402 : RMTreadbatheader(stream *sin, char *buf)
560 : {
561 1402 : ssize_t sz = 0, rd;
562 :
563 : /* read the JSON header */
564 227624 : while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
565 226222 : sz += rd;
566 : }
567 1402 : if (rd < 0) {
568 0 : throw(MAL, "remote.get", "could not read BAT JSON header");
569 : }
570 1402 : if (buf[0] == '!') {
571 0 : char *result;
572 0 : if ((result = GDKstrdup(buf)) == NULL)
573 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
574 : return result;
575 : }
576 :
577 1402 : buf[sz] = '\0';
578 :
579 1402 : return MAL_SUCCEED;
580 : }
581 :
582 : typedef struct _binbat_v1 {
583 : int Ttype;
584 : oid Hseqbase;
585 : oid Tseqbase;
586 : bool
587 : Tsorted:1, Trevsorted:1, Tkey:1, Tnonil:1, Tdense:1;
588 : BUN size;
589 : size_t tailsize;
590 : size_t theapsize;
591 : } binbat;
592 :
593 : static str
594 1402 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, bool cint128)
595 : {
596 1402 : binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
597 1402 : char *nme = NULL;
598 1402 : char *val = NULL;
599 1402 : char tmp;
600 1402 : size_t len;
601 1402 : lng lv, *lvp;
602 :
603 1402 : BAT *b;
604 :
605 1402 : (void) cint128;
606 : /* hdr is a JSON structure that looks like
607 : * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
608 : * we take the binary data directly from the stream */
609 :
610 : /* could skip whitespace, but we just don't allow that */
611 1402 : if (*hdr++ != '{')
612 0 : throw(MAL, "remote.bincopyfrom",
613 : "illegal input, not a JSON header (got '%s')", hdr - 1);
614 226206 : while (*hdr != '\0') {
615 224804 : switch (*hdr) {
616 33647 : case '"':
617 : /* we assume only numeric values, so all strings are
618 : * elems */
619 33647 : if (nme != NULL) {
620 16823 : *hdr = '\0';
621 : } else {
622 16824 : nme = hdr + 1;
623 : }
624 : break;
625 16823 : case ':':
626 16823 : val = hdr + 1;
627 16823 : break;
628 16824 : case ',':
629 : case '}':
630 16824 : if (val == NULL)
631 0 : throw(MAL, "remote.bincopyfrom",
632 : "illegal input, JSON value missing");
633 16824 : *hdr = '\0';
634 :
635 16824 : lvp = &lv;
636 16824 : len = sizeof(lv);
637 : /* tseqbase can be 1<<31/1<<63 which causes overflow
638 : * in lngFromStr, so we check separately */
639 16824 : if (strcmp(val,
640 : #if SIZEOF_OID == 8
641 : "9223372036854775808"
642 : #else
643 : "2147483648"
644 : #endif
645 1372 : ) == 0 && strcmp(nme, "tseqbase") == 0) {
646 1372 : bb.Tseqbase = oid_nil;
647 : } else {
648 : /* all values should be non-negative, so we check that
649 : * here as well */
650 15452 : if (lngFromStr(val, &len, &lvp, true) < 0 ||
651 15452 : lv < 0 /* includes lng_nil */ )
652 0 : throw(MAL, "remote.bincopyfrom",
653 : "bad %s value: %s", nme, val);
654 :
655 : /* deal with nme and val */
656 15452 : if (strcmp(nme, "version") == 0) {
657 1402 : if (lv != 1)
658 0 : throw(MAL, "remote.bincopyfrom",
659 : "unsupported version: %s", val);
660 14050 : } else if (strcmp(nme, "hseqbase") == 0) {
661 : #if SIZEOF_OID < SIZEOF_LNG
662 : if (lv > GDK_oid_max)
663 : throw(MAL, "remote.bincopyfrom",
664 : "bad %s value: %s", nme, val);
665 : #endif
666 1402 : bb.Hseqbase = (oid) lv;
667 12648 : } else if (strcmp(nme, "ttype") == 0) {
668 1402 : if (lv >= GDKatomcnt)
669 0 : throw(MAL, "remote.bincopyfrom",
670 : "bad %s value: GDK atom number %s doesn't exist",
671 : nme, val);
672 1402 : bb.Ttype = (int) lv;
673 11246 : } else if (strcmp(nme, "tseqbase") == 0) {
674 : #if SIZEOF_OID < SIZEOF_LNG
675 : if (lv > GDK_oid_max)
676 : throw(MAL, "remote.bincopyfrom",
677 : "bad %s value: %s", nme, val);
678 : #endif
679 30 : bb.Tseqbase = (oid) lv;
680 11216 : } else if (strcmp(nme, "tsorted") == 0) {
681 1402 : bb.Tsorted = lv != 0;
682 9814 : } else if (strcmp(nme, "trevsorted") == 0) {
683 1402 : bb.Trevsorted = lv != 0;
684 8412 : } else if (strcmp(nme, "tkey") == 0) {
685 1402 : bb.Tkey = lv != 0;
686 7010 : } else if (strcmp(nme, "tnonil") == 0) {
687 1402 : bb.Tnonil = lv != 0;
688 5608 : } else if (strcmp(nme, "tdense") == 0) {
689 1402 : bb.Tdense = lv != 0;
690 4206 : } else if (strcmp(nme, "size") == 0) {
691 1402 : if (lv > (lng) BUN_MAX)
692 0 : throw(MAL, "remote.bincopyfrom",
693 : "bad %s value: %s", nme, val);
694 1402 : bb.size = (BUN) lv;
695 2804 : } else if (strcmp(nme, "tailsize") == 0) {
696 1402 : bb.tailsize = (size_t) lv;
697 1402 : } else if (strcmp(nme, "theapsize") == 0) {
698 1402 : bb.theapsize = (size_t) lv;
699 : } else {
700 0 : throw(MAL, "remote.bincopyfrom",
701 : "unknown element: %s", nme);
702 : }
703 : }
704 : nme = val = NULL;
705 : break;
706 : }
707 224804 : hdr++;
708 : }
709 : #ifdef HAVE_HGE
710 1402 : if (int128 && !cint128 && bb.Ttype >= TYPE_hge)
711 0 : bb.Ttype++;
712 : #else
713 : (void) cint128;
714 : #endif
715 :
716 2712 : b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT,
717 1310 : bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);
718 1402 : if (b == NULL)
719 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
720 :
721 1402 : if (bb.tailsize > 0) {
722 2570 : if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
723 1285 : mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
724 0 : goto bailout;
725 1285 : b->theap->dirty = true;
726 : }
727 1402 : if (bb.theapsize > 0) {
728 152 : if ((b->tvheap->base == NULL &&
729 76 : (*BATatoms[b->ttype].atomHeap) (b->tvheap,
730 : b->batCapacity) != GDK_SUCCEED)
731 76 : || HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED
732 76 : || mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
733 0 : goto bailout;
734 76 : b->tvheap->free = bb.theapsize;
735 76 : b->tvheap->dirty = true;
736 : }
737 :
738 : /* set properties */
739 1402 : b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
740 1402 : b->tsorted = bb.Tsorted;
741 1402 : b->trevsorted = bb.Trevsorted;
742 1402 : b->tkey = bb.Tkey;
743 1402 : b->tnonil = bb.Tnonil;
744 1402 : if (bb.Ttype == TYPE_str && bb.size)
745 76 : BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
746 1402 : BATsetcount(b, bb.size);
747 :
748 : // read blockmode flush
749 1402 : while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
750 0 : TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
751 : }
752 :
753 1402 : BATsettrivprop(b);
754 :
755 1402 : *ret = b;
756 1402 : return (MAL_SUCCEED);
757 :
758 : bailout:
759 0 : BBPreclaim(b);
760 0 : throw(MAL, "remote.bincopyfrom", "reading failed");
761 : }
762 :
763 : /**
764 : * get fetches the object referenced by ident over connection conn.
765 : * We are only interested in retrieving void-headed BATs, i.e. single columns.
766 : */
767 : static str
768 1400 : RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
769 : {
770 1400 : str conn, ident, tmp, rt;
771 1400 : connection c;
772 1400 : char qbuf[BUFSIZ + 1];
773 1400 : MapiHdl mhdl = NULL;
774 1400 : int rtype;
775 1400 : ValPtr v;
776 :
777 1400 : (void) mb;
778 1400 : (void) cntxt;
779 :
780 1400 : conn = *getArgReference_str(stk, pci, 1);
781 1400 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
782 0 : throw(ILLARG, "remote.get",
783 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
784 1400 : ident = *getArgReference_str(stk, pci, 2);
785 1400 : if (ident == 0 || isIdentifier(ident) < 0)
786 0 : throw(ILLARG, "remote.get",
787 : ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
788 :
789 : /* lookup conn, set c if valid */
790 1400 : rethrow("remote.get", tmp, RMTfindconn(&c, conn));
791 :
792 1400 : rtype = getArgType(mb, pci, 0);
793 1400 : v = &stk->stk[pci->argv[0]];
794 :
795 1400 : if (rtype == TYPE_any || isAnyExpression(rtype)) {
796 0 : char *tpe, *msg;
797 0 : tpe = getTypeName(rtype);
798 0 : msg = createException(MAL, "remote.get",
799 : ILLEGAL_ARGUMENT ": unsupported any type: %s",
800 : tpe);
801 0 : GDKfree(tpe);
802 0 : return msg;
803 : }
804 : /* check if the remote type complies with what we expect.
805 : Since the put() encodes the type as known to the remote site
806 : we can simple compare it here */
807 1400 : rt = getTypeIdentifier(rtype);
808 1400 : if (rt == NULL)
809 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
810 1400 : if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
811 0 : tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
812 : ": remote object type %s does not match expected type %s",
813 : rt, ident);
814 0 : GDKfree(rt);
815 0 : return tmp;
816 : }
817 1400 : GDKfree(rt);
818 :
819 1400 : if (isaBatType(rtype) && (localtype == 0177 || (localtype != c->type && localtype != (c->type | RMTT_HGE)))) {
820 0 : int t;
821 0 : size_t s;
822 0 : ptr r;
823 0 : str var;
824 0 : BAT *b;
825 :
826 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
827 :
828 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
829 :
830 : /* this call should be a single transaction over the channel */
831 0 : MT_lock_set(&c->lock);
832 :
833 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
834 : != MAL_SUCCEED) {
835 0 : TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
836 0 : MT_lock_unset(&c->lock);
837 0 : var = createException(MAL, "remote.get", "%s", tmp);
838 0 : freeException(tmp);
839 0 : return var;
840 : }
841 0 : t = getBatType(rtype);
842 0 : b = COLnew(0, t, 0, TRANSIENT);
843 0 : if (b == NULL) {
844 0 : mapi_close_handle(mhdl);
845 0 : MT_lock_unset(&c->lock);
846 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
847 : }
848 :
849 0 : if (ATOMbasetype(t) == TYPE_str) {
850 0 : while (mapi_fetch_row(mhdl)) {
851 0 : var = mapi_fetch_field(mhdl, 1);
852 0 : if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
853 0 : BBPreclaim(b);
854 0 : mapi_close_handle(mhdl);
855 0 : MT_lock_unset(&c->lock);
856 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
857 : }
858 : }
859 : } else
860 0 : while (mapi_fetch_row(mhdl)) {
861 0 : var = mapi_fetch_field(mhdl, 1);
862 0 : if (var == NULL)
863 0 : var = "nil";
864 0 : s = 0;
865 0 : r = NULL;
866 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
867 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
868 0 : BBPreclaim(b);
869 0 : GDKfree(r);
870 0 : mapi_close_handle(mhdl);
871 0 : MT_lock_unset(&c->lock);
872 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
873 : }
874 0 : GDKfree(r);
875 : }
876 :
877 0 : *v = (ValRecord) {
878 0 : .val.bval = b->batCacheid,
879 : .bat = true,
880 0 : .vtype = b->ttype,
881 : };
882 0 : BBPkeepref(b);
883 :
884 0 : mapi_close_handle(mhdl);
885 0 : MT_lock_unset(&c->lock);
886 1400 : } else if (isaBatType(rtype)) {
887 : /* binary compatible remote host, transfer BAT in binary form */
888 1400 : stream *sout;
889 1400 : stream *sin;
890 1400 : char buf[256];
891 1400 : BAT *b = NULL;
892 :
893 : /* this call should be a single transaction over the channel */
894 1400 : MT_lock_set(&c->lock);
895 :
896 : /* bypass Mapi from this point to efficiently write all data to
897 : * the server */
898 1400 : sout = mapi_get_to(c->mconn);
899 1400 : sin = mapi_get_from(c->mconn);
900 1400 : if (sin == NULL || sout == NULL) {
901 0 : MT_lock_unset(&c->lock);
902 0 : throw(MAL, "remote.get", "Connection lost");
903 : }
904 :
905 : /* call our remote helper to do this more efficiently */
906 1400 : mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
907 1400 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
908 :
909 1400 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
910 0 : MT_lock_unset(&c->lock);
911 0 : return tmp;
912 : }
913 :
914 1400 : if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->int128)) != MAL_SUCCEED) {
915 0 : MT_lock_unset(&c->lock);
916 0 : return (tmp);
917 : }
918 :
919 1400 : *v = (ValRecord) {
920 1400 : .val.bval = b->batCacheid,
921 : .bat = true,
922 1400 : .vtype = b->ttype,
923 : };
924 1400 : BBPkeepref(b);
925 :
926 1400 : MT_lock_unset(&c->lock);
927 : } else {
928 0 : ptr p = NULL;
929 0 : str val;
930 0 : size_t len = 0;
931 :
932 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
933 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
934 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED) {
935 0 : return tmp;
936 : }
937 0 : (void) mapi_fetch_row(mhdl); /* should succeed */
938 0 : val = mapi_fetch_field(mhdl, 0);
939 :
940 0 : if (ATOMbasetype(rtype) == TYPE_str) {
941 0 : if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
942 0 : mapi_close_handle(mhdl);
943 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
944 : }
945 0 : } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true)
946 : < 0) {
947 0 : char *msg;
948 0 : msg = createException(MAL, "remote.get",
949 : "unable to parse value: %s",
950 : val == NULL ? "nil" : val);
951 0 : mapi_close_handle(mhdl);
952 0 : GDKfree(p);
953 0 : return msg;
954 : } else {
955 0 : VALset(v, rtype, p);
956 0 : if (ATOMextern(rtype) == 0)
957 0 : GDKfree(p);
958 : }
959 :
960 0 : mapi_close_handle(mhdl);
961 : }
962 :
963 : return (MAL_SUCCEED);
964 : }
965 :
966 : /**
967 : * stores the given object on the remote host. The identifier of the
968 : * object on the remote host is returned for later use.
969 : */
970 : static str
971 3275 : RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
972 : {
973 3275 : str conn, tmp;
974 3275 : char ident[512];
975 3275 : connection c;
976 3275 : ValPtr v;
977 3275 : int type;
978 3275 : ptr value;
979 3275 : MapiHdl mhdl = NULL;
980 :
981 3275 : (void) cntxt;
982 :
983 3275 : conn = *getArgReference_str(stk, pci, 1);
984 3275 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
985 5 : throw(ILLARG, "remote.put",
986 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
987 :
988 : /* lookup conn */
989 3270 : rethrow("remote.put", tmp, RMTfindconn(&c, conn));
990 :
991 : /* put the thing */
992 3270 : type = getArgType(mb, pci, 2);
993 3270 : value = getArgReference(stk, pci, 2);
994 :
995 : /* this call should be a single transaction over the channel */
996 3270 : MT_lock_set(&c->lock);
997 :
998 : /* get a free, typed identifier for the remote host */
999 3270 : tmp = RMTgetId(ident, sizeof(ident), mb, pci, 2);
1000 3270 : if (tmp != MAL_SUCCEED) {
1001 0 : MT_lock_unset(&c->lock);
1002 0 : return tmp;
1003 : }
1004 :
1005 : /* depending on the input object generate actions to store the
1006 : * object remotely*/
1007 3270 : if (type == TYPE_any || isAnyExpression(type)) {
1008 0 : char *tpe, *msg;
1009 0 : MT_lock_unset(&c->lock);
1010 0 : tpe = getTypeName(type);
1011 0 : msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
1012 0 : GDKfree(tpe);
1013 0 : return msg;
1014 4672 : } else if (isaBatType(type) && !is_bat_nil(*(bat *) value)) {
1015 1402 : BATiter bi;
1016 : /* naive approach using bat.new() and bat.insert() calls */
1017 1402 : char *tail;
1018 1402 : bat bid;
1019 1402 : BAT *b = NULL;
1020 1402 : BUN p, q;
1021 1402 : str tailv;
1022 1402 : stream *sout;
1023 :
1024 1402 : tail = getTypeIdentifier(getBatType(type));
1025 1402 : if (tail == NULL) {
1026 0 : MT_lock_unset(&c->lock);
1027 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1028 : }
1029 :
1030 1402 : bid = *(bat *) value;
1031 1402 : if (bid != 0) {
1032 1402 : if ((b = BATdescriptor(bid)) == NULL) {
1033 0 : MT_lock_unset(&c->lock);
1034 0 : GDKfree(tail);
1035 0 : throw(MAL, "remote.put",
1036 : SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1037 : }
1038 : }
1039 :
1040 : /* bypass Mapi from this point to efficiently write all data to
1041 : * the server */
1042 1402 : sout = mapi_get_to(c->mconn);
1043 :
1044 : /* call our remote helper to do this more efficiently */
1045 1402 : mnstr_printf(sout,
1046 : "%s := remote.batload(nil:%s, " BUNFMT ");\n",
1047 : ident, tail, (bid == 0 ? 0 : BATcount(b)));
1048 1402 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1049 1402 : GDKfree(tail);
1050 :
1051 : /* b can be NULL if bid == 0 (only type given, ugh) */
1052 1402 : if (b) {
1053 2804 : int tpe = getBatType(type), trivial = tpe < TYPE_date
1054 1402 : || ATOMbasetype(tpe) == TYPE_str;
1055 1402 : const void *nil = ATOMnilptr(tpe);
1056 1402 : int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
1057 :
1058 1402 : bi = bat_iterator(b);
1059 1402 : BATloop(b, p, q) {
1060 0 : const void *v = BUNtail(bi, p);
1061 0 : tailv = ATOMformat(tpe, v);
1062 0 : if (tailv == NULL) {
1063 0 : bat_iterator_end(&bi);
1064 0 : BBPunfix(b->batCacheid);
1065 0 : MT_lock_unset(&c->lock);
1066 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1067 : }
1068 0 : if (trivial || atomcmp(v, nil) == 0)
1069 0 : mnstr_printf(sout, "%s\n", tailv);
1070 : else
1071 0 : mnstr_printf(sout, "\"%s\"\n", tailv);
1072 0 : GDKfree(tailv);
1073 : }
1074 1402 : bat_iterator_end(&bi);
1075 1402 : BBPunfix(b->batCacheid);
1076 : }
1077 :
1078 : /* write the empty line the server is waiting for, handles
1079 : * all errors at the same time, if any */
1080 1402 : if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
1081 : != MAL_SUCCEED) {
1082 0 : MT_lock_unset(&c->lock);
1083 0 : return tmp;
1084 : }
1085 1402 : mapi_close_handle(mhdl);
1086 0 : } else if (isaBatType(type) && is_bat_nil(*(bat *) value)) {
1087 0 : stream *sout;
1088 0 : str typename = getTypeName(type);
1089 0 : sout = mapi_get_to(c->mconn);
1090 0 : mnstr_printf(sout, "%s := nil:%s;\n", ident, typename);
1091 0 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1092 0 : GDKfree(typename);
1093 : } else {
1094 1868 : size_t l;
1095 1868 : str val;
1096 1868 : char *tpe;
1097 1868 : char qbuf[512], *nbuf = qbuf;
1098 1868 : const void *nil = ATOMnilptr(type), *p = value;
1099 1868 : int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
1100 :
1101 1868 : if (ATOMextern(type))
1102 1297 : p = *(ptr *) value;
1103 :
1104 1868 : val = ATOMformat(type, p);
1105 1868 : if (val == NULL) {
1106 0 : MT_lock_unset(&c->lock);
1107 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1108 : }
1109 1868 : tpe = getTypeIdentifier(type);
1110 1868 : if (tpe == NULL) {
1111 0 : MT_lock_unset(&c->lock);
1112 0 : GDKfree(val);
1113 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1114 : }
1115 1868 : l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
1116 1868 : if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
1117 0 : MT_lock_unset(&c->lock);
1118 0 : GDKfree(val);
1119 0 : GDKfree(tpe);
1120 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1121 : }
1122 1868 : if (type < TYPE_date || ATOMbasetype(type) == TYPE_str
1123 8 : || atomcmp(p, nil) == 0)
1124 1863 : snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
1125 : else
1126 5 : snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
1127 1868 : GDKfree(tpe);
1128 1868 : GDKfree(val);
1129 1868 : TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
1130 1868 : tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
1131 1867 : if (nbuf != qbuf)
1132 33 : GDKfree(nbuf);
1133 1868 : if (tmp != MAL_SUCCEED) {
1134 0 : MT_lock_unset(&c->lock);
1135 0 : return tmp;
1136 : }
1137 1868 : mapi_close_handle(mhdl);
1138 : }
1139 3270 : MT_lock_unset(&c->lock);
1140 :
1141 : /* return the identifier */
1142 3270 : v = &stk->stk[pci->argv[0]];
1143 3270 : if (VALinit(v, TYPE_str, ident) == NULL)
1144 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1145 : return (MAL_SUCCEED);
1146 : }
1147 :
1148 : /**
1149 : * stores the given <mod>.<fcn> on the remote host.
1150 : * An error is returned if the function is already known at the remote site.
1151 : * The implementation is based on serialisation of the block into a string
1152 : * followed by remote parsing.
1153 : */
1154 : static str
1155 0 : RMTregisterInternal(Client cntxt, char **fcn_id, const char *conn,
1156 : const char *mod, const char *fcn)
1157 : {
1158 0 : str tmp, qry, msg;
1159 0 : connection c;
1160 0 : char buf[BUFSIZ];
1161 0 : MapiHdl mhdl = NULL;
1162 0 : Symbol sym;
1163 :
1164 0 : if (strNil(conn))
1165 0 : throw(ILLARG, "remote.register",
1166 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1167 :
1168 : /* find local definition */
1169 0 : sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
1170 0 : if (sym == NULL)
1171 0 : throw(MAL, "remote.register",
1172 : ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
1173 :
1174 : /* lookup conn */
1175 0 : rethrow("remote.register", tmp, RMTfindconn(&c, conn));
1176 :
1177 : /* this call should be a single transaction over the channel */
1178 0 : MT_lock_set(&c->lock);
1179 :
1180 : /* get a free, typed identifier for the remote host */
1181 0 : char ident[512];
1182 0 : tmp = RMTgetId(ident, sizeof(ident), sym->def, getInstrPtr(sym->def, 0), 0);
1183 0 : if (tmp != MAL_SUCCEED) {
1184 0 : MT_lock_unset(&c->lock);
1185 0 : return tmp;
1186 : }
1187 :
1188 : /* check remote definition */
1189 0 : snprintf(buf, BUFSIZ,
1190 : "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod,
1191 : ident);
1192 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
1193 0 : if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED) {
1194 0 : MT_lock_unset(&c->lock);
1195 0 : return msg;
1196 : }
1197 :
1198 0 : char *result;
1199 0 : if (mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl)
1200 0 : && (result = mapi_fetch_field(mhdl, 0))) {
1201 0 : if (strcmp(result, "false") != 0)
1202 0 : msg = createException(MAL, "remote.register",
1203 : "function already exists at the remote site: %s.%s",
1204 : mod, fcn);
1205 : } else
1206 0 : msg = createException(MAL, "remote.register", OPERATION_FAILED);
1207 :
1208 0 : mapi_close_handle(mhdl);
1209 :
1210 0 : if (msg) {
1211 0 : MT_lock_unset(&c->lock);
1212 0 : return msg;
1213 : }
1214 :
1215 0 : *fcn_id = GDKstrdup(ident);
1216 0 : if (*fcn_id == NULL) {
1217 0 : MT_lock_unset(&c->lock);
1218 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1219 : }
1220 :
1221 0 : Symbol prg;
1222 0 : if ((prg = newFunctionArgs(putName(mod), putName(*fcn_id), FUNCTIONsymbol, -1)) == NULL) {
1223 0 : MT_lock_unset(&c->lock);
1224 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1225 : }
1226 :
1227 : // We only need the Symbol not the inner program stub. So we clear it.
1228 0 : freeMalBlk(prg->def);
1229 0 : prg->def = NULL;
1230 :
1231 0 : if ((prg->def = copyMalBlk(sym->def)) == NULL) {
1232 0 : MT_lock_unset(&c->lock);
1233 0 : freeSymbol(prg);
1234 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1235 : }
1236 0 : setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
1237 :
1238 : /* make sure the program is error free */
1239 0 : msg = chkProgram(cntxt->usermodule, prg->def);
1240 0 : if (msg != MAL_SUCCEED || prg->def->errors) {
1241 0 : MT_lock_unset(&c->lock);
1242 0 : if (msg)
1243 : return msg;
1244 0 : throw(MAL, "remote.register",
1245 : "function '%s.%s' contains syntax or type errors", mod, *fcn_id);
1246 : }
1247 :
1248 0 : qry = mal2str(prg->def, 0, prg->def->stop);
1249 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
1250 0 : msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
1251 0 : GDKfree(qry);
1252 0 : if (mhdl)
1253 0 : mapi_close_handle(mhdl);
1254 :
1255 0 : freeSymbol(prg);
1256 :
1257 0 : MT_lock_unset(&c->lock);
1258 0 : return msg;
1259 : }
1260 :
1261 : static str
1262 0 : RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1263 : {
1264 0 : char **fcn_id = getArgReference_str(stk, pci, 0);
1265 0 : const char *conn = *getArgReference_str(stk, pci, 1);
1266 0 : const char *mod = *getArgReference_str(stk, pci, 2);
1267 0 : const char *fcn = *getArgReference_str(stk, pci, 3);
1268 0 : (void) mb;
1269 0 : return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
1270 : }
1271 :
1272 : /**
1273 : * exec executes the function with its given arguments on the remote
1274 : * host, returning the function's return value. exec is purposely kept
1275 : * very spartan. All arguments need to be handles to previously put()
1276 : * values. It calls the function with the given arguments at the remote
1277 : * site, and returns the handle which stores the return value of the
1278 : * remotely executed function. This return value can be retrieved using
1279 : * a get call. It handles multiple return arguments.
1280 : */
1281 : static str
1282 733 : RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1283 : {
1284 733 : str conn, mod, func, tmp;
1285 733 : int i;
1286 733 : size_t len, buflen;
1287 733 : connection c = NULL;
1288 733 : char *qbuf;
1289 733 : MapiHdl mhdl;
1290 :
1291 733 : (void) cntxt;
1292 733 : (void) mb;
1293 733 : bool no_return_arguments = 0;
1294 :
1295 733 : columnar_result_callback *rcb = NULL;
1296 733 : ValRecord *v = &(stk)->stk[(pci)->argv[4]];
1297 733 : if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr)) {
1298 1 : rcb = (columnar_result_callback *) v->val.pval;
1299 : }
1300 :
1301 2687 : for (i = 0; i < pci->retc; i++) {
1302 1954 : if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
1303 1953 : tmp = *getArgReference_str(stk, pci, i);
1304 1953 : if (tmp == NULL || strcmp(tmp, (str) str_nil) == 0)
1305 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
1306 : ": return value %d is NULL or nil", i);
1307 : } else
1308 : no_return_arguments = 1;
1309 : }
1310 :
1311 733 : conn = *getArgReference_str(stk, pci, i++);
1312 733 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
1313 0 : throw(ILLARG, "remote.exec",
1314 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1315 733 : mod = *getArgReference_str(stk, pci, i++);
1316 733 : if (mod == NULL || strcmp(mod, (str) str_nil) == 0)
1317 0 : throw(ILLARG, "remote.exec",
1318 : ILLEGAL_ARGUMENT ": module name is NULL or nil");
1319 733 : func = *getArgReference_str(stk, pci, i++);
1320 733 : if (func == NULL || strcmp(func, (str) str_nil) == 0)
1321 0 : throw(ILLARG, "remote.exec",
1322 : ILLEGAL_ARGUMENT ": function name is NULL or nil");
1323 :
1324 : /* lookup conn */
1325 733 : rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
1326 :
1327 : /* this call should be a single transaction over the channel */
1328 733 : MT_lock_set(&c->lock);
1329 :
1330 733 : if (!no_return_arguments && pci->argc - pci->retc < 3) { /* conn, mod, func, ... */
1331 0 : MT_lock_unset(&c->lock);
1332 0 : throw(MAL, "remote.exec",
1333 : ILLEGAL_ARGUMENT " MAL instruction misses arguments");
1334 : }
1335 :
1336 733 : len = 0;
1337 : /* count how big a buffer we need */
1338 733 : len += 2 * (pci->retc > 1);
1339 733 : if (!no_return_arguments)
1340 2685 : for (i = 0; i < pci->retc; i++) {
1341 1953 : len += 2 * (i > 0);
1342 1953 : len += strlen(*getArgReference_str(stk, pci, i));
1343 : }
1344 :
1345 733 : const int arg_index = rcb ? 4 : 3;
1346 :
1347 733 : len += strlen(mod) + strlen(func) + 6;
1348 2050 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1349 1317 : len += 2 * (i > arg_index);
1350 1317 : len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
1351 : }
1352 733 : len += 2;
1353 733 : buflen = len + 1;
1354 733 : if ((qbuf = GDKmalloc(buflen)) == NULL) {
1355 0 : MT_lock_unset(&c->lock);
1356 0 : throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1357 : }
1358 :
1359 733 : len = 0;
1360 :
1361 733 : if (pci->retc > 1)
1362 70 : qbuf[len++] = '(';
1363 733 : if (!no_return_arguments)
1364 2685 : for (i = 0; i < pci->retc; i++)
1365 1953 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1366 1953 : (i > 0 ? ", " : ""), *getArgReference_str(stk, pci,
1367 : i));
1368 :
1369 733 : if (pci->retc > 1)
1370 70 : qbuf[len++] = ')';
1371 :
1372 : /* build the function invocation string in qbuf */
1373 733 : if (!no_return_arguments && pci->retc > 0) {
1374 732 : len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
1375 : } else {
1376 1 : len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
1377 : }
1378 :
1379 : /* handle the arguments to the function */
1380 :
1381 : /* put the arguments one by one, and dynamically build the
1382 : * invocation string */
1383 2050 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1384 1317 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1385 : (i > arg_index ? ", " : ""),
1386 1317 : *(getArgReference_str(stk, pci, pci->retc + i)));
1387 : }
1388 :
1389 : /* finish end execute the invocation string */
1390 733 : len += snprintf(&qbuf[len], buflen - len, ");");
1391 733 : TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
1392 733 : tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
1393 733 : GDKfree(qbuf);
1394 :
1395 : /* Temporary hack:
1396 : * use a callback to immediately handle columnar results before hdl is destroyed. */
1397 733 : if (tmp == MAL_SUCCEED && rcb && mhdl
1398 1 : && (mapi_get_querytype(mhdl) == Q_TABLE
1399 0 : || mapi_get_querytype(mhdl) == Q_PREPARE)) {
1400 1 : int fields = mapi_get_field_count(mhdl);
1401 1 : columnar_result *results = GDKzalloc(sizeof(columnar_result) * fields);
1402 :
1403 1 : if (!results) {
1404 0 : tmp = createException(MAL, "remote.exec",
1405 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1406 : } else {
1407 1 : int i = 0;
1408 1 : char buf[256] = { 0 };
1409 1 : stream *sin = mapi_get_from(c->mconn);
1410 :
1411 4 : for (; i < fields; i++) {
1412 2 : BAT *b = NULL;
1413 :
1414 2 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
1415 2 : (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1, c->int128)) != MAL_SUCCEED) {
1416 : break;
1417 : }
1418 :
1419 2 : results[i].id = b->batCacheid;
1420 2 : BBPkeepref(b);
1421 2 : results[i].colname = mapi_get_name(mhdl, i);
1422 2 : results[i].tpename = mapi_get_type(mhdl, i);
1423 2 : results[i].digits = mapi_get_digits(mhdl, i);
1424 2 : results[i].scale = mapi_get_scale(mhdl, i);
1425 : }
1426 :
1427 1 : if (tmp != MAL_SUCCEED) {
1428 0 : for (int j = 0; j < i; j++)
1429 0 : BBPrelease(results[j].id);
1430 : } else {
1431 1 : assert(rcb->context);
1432 1 : tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results,
1433 : fields);
1434 3 : for (int j = 0; j < i; j++)
1435 2 : BBPrelease(results[j].id);
1436 : }
1437 1 : GDKfree(results);
1438 : }
1439 : }
1440 :
1441 733 : if (rcb) {
1442 1 : GDKfree(rcb->context);
1443 1 : GDKfree(rcb);
1444 : }
1445 :
1446 733 : if (mhdl)
1447 727 : mapi_close_handle(mhdl);
1448 733 : MT_lock_unset(&c->lock);
1449 733 : return tmp;
1450 : }
1451 :
1452 : /**
1453 : * batload is a helper function to make transferring a BAT with RMTput
1454 : * more efficient. It works by creating a BAT, and loading it with the
1455 : * data as comma separated values from the input stream, until an empty
1456 : * line is read. The given size argument is taken as a hint only, and
1457 : * is not enforced to match the number of rows read.
1458 : */
1459 : static str
1460 1402 : RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1461 : {
1462 1402 : ValPtr v;
1463 1402 : int t;
1464 1402 : int size;
1465 1402 : ptr r;
1466 1402 : size_t s;
1467 1402 : BAT *b;
1468 1402 : size_t len;
1469 1402 : char *var;
1470 1402 : str msg = MAL_SUCCEED;
1471 1402 : bstream *fdin = cntxt->fdin;
1472 :
1473 1402 : v = &stk->stk[pci->argv[0]]; /* return */
1474 1402 : t = getArgType(mb, pci, 1); /* tail type */
1475 1402 : size = *getArgReference_int(stk, pci, 2); /* size */
1476 :
1477 1402 : b = COLnew(0, t, size, TRANSIENT);
1478 1402 : if (b == NULL)
1479 0 : throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1480 :
1481 : /* grab the input stream and start reading */
1482 1402 : fdin->eof = false;
1483 1402 : len = fdin->pos;
1484 1402 : while (len < fdin->len || bstream_next(fdin) > 0) {
1485 : /* newline hunting (how spartan) */
1486 1402 : for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n';
1487 0 : len++) ;
1488 : /* unterminated line, request more */
1489 1402 : if (fdin->buf[len] != '\n')
1490 0 : continue;
1491 : /* empty line, end of input */
1492 1402 : if (fdin->pos == len) {
1493 1402 : if (isa_block_stream(fdin->s)) {
1494 1402 : ssize_t n = bstream_next(fdin);
1495 1402 : if (n)
1496 0 : msg = createException(MAL, "remote.load",
1497 : SQLSTATE(HY013)
1498 : "Unexpected return from remote");
1499 : }
1500 : break;
1501 : }
1502 0 : fdin->buf[len] = '\0'; /* kill \n */
1503 0 : var = &fdin->buf[fdin->pos];
1504 : /* skip over this line */
1505 0 : fdin->pos = ++len;
1506 :
1507 0 : s = 0;
1508 0 : r = NULL;
1509 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
1510 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
1511 0 : BBPreclaim(b);
1512 0 : GDKfree(r);
1513 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
1514 : }
1515 0 : GDKfree(r);
1516 : }
1517 :
1518 1402 : *v = (ValRecord) {
1519 1402 : .val.bval = b->batCacheid,
1520 : .bat = true,
1521 1402 : .vtype = b->ttype,
1522 : };
1523 1402 : BBPkeepref(b);
1524 :
1525 1402 : return msg;
1526 : }
1527 :
1528 : /**
1529 : * dump given BAT to stream
1530 : */
1531 : static str
1532 1400 : RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1533 : {
1534 1400 : bat bid = *getArgReference_bat(stk, pci, 1);
1535 1400 : BAT *b = BBPquickdesc(bid), *v = b;
1536 1400 : char sendtheap = 0, sendtvheap = 0;
1537 :
1538 1400 : (void) mb;
1539 1400 : (void) stk;
1540 1400 : (void) pci;
1541 :
1542 1400 : if (b == NULL)
1543 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
1544 :
1545 1400 : if (BBPfix(bid) <= 0)
1546 0 : throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
1547 :
1548 1400 : sendtheap = b->ttype != TYPE_void;
1549 1400 : sendtvheap = sendtheap && b->tvheap;
1550 90 : if (sendtvheap && VIEWvtparent(b)
1551 41 : && BATcount(b) < BATcount(BBP_desc(VIEWvtparent(b)))) {
1552 17 : if ((b = BATdescriptor(bid)) == NULL) {
1553 0 : BBPunfix(bid);
1554 0 : throw(MAL, "remote.bincopyto", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1555 : }
1556 17 : v = COLcopy(b, b->ttype, true, TRANSIENT);
1557 17 : BBPunfix(b->batCacheid);
1558 17 : if (v == NULL) {
1559 0 : BBPunfix(bid);
1560 0 : throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
1561 : }
1562 : }
1563 :
1564 1400 : BATiter vi = bat_iterator(v);
1565 1490 : mnstr_printf(cntxt->fdout, /*JSON*/ "{"
1566 : "\"version\":1,"
1567 : "\"ttype\":%d,"
1568 : "\"hseqbase\":" OIDFMT ","
1569 : "\"tseqbase\":" OIDFMT ","
1570 : "\"tsorted\":%d,"
1571 : "\"trevsorted\":%d,"
1572 : "\"tkey\":%d,"
1573 : "\"tnonil\":%d,"
1574 : "\"tdense\":%d,"
1575 : "\"size\":" BUNFMT ","
1576 : "\"tailsize\":%zu,"
1577 : "\"theapsize\":%zu"
1578 : "}\n",
1579 1400 : vi.type,
1580 : v->hseqbase, v->tseqbase,
1581 1400 : vi.sorted, vi.revsorted,
1582 1400 : vi.key,
1583 1400 : vi.nonil,
1584 1400 : BATtdensebi(&vi),
1585 : vi.count,
1586 1370 : sendtheap ? (size_t) vi.count << vi.shift : 0,
1587 90 : sendtvheap && vi.count > 0 ? vi.vhfree : 0);
1588 :
1589 1400 : if (sendtheap && vi.count > 0) {
1590 1283 : mnstr_write(cntxt->fdout, /* tail */
1591 1283 : vi.base, vi.count * vi.width, 1);
1592 1283 : if (sendtvheap)
1593 75 : mnstr_write(cntxt->fdout, /* theap */
1594 75 : vi.vh->base, vi.vhfree, 1);
1595 : }
1596 1400 : bat_iterator_end(&vi);
1597 : /* flush is done by the calling environment (MAL) */
1598 :
1599 1400 : if (b != v)
1600 17 : BBPreclaim(v);
1601 :
1602 1400 : BBPunfix(bid);
1603 :
1604 1400 : return (MAL_SUCCEED);
1605 : }
1606 :
1607 : /**
1608 : * read from the input stream and give the BAT handle back to the caller
1609 : */
1610 : static str
1611 0 : RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1612 : {
1613 0 : BAT *b = NULL;
1614 0 : ValPtr v;
1615 0 : str err;
1616 :
1617 0 : (void) mb;
1618 :
1619 : /* We receive a normal line, which contains the JSON header, the
1620 : * rest is binary data directly on the stream. We get the first
1621 : * line from the buffered stream we have here, and pass it on
1622 : * together with the raw stream we have. */
1623 0 : cntxt->fdin->eof = false; /* in case it was before */
1624 0 : if (bstream_next(cntxt->fdin) <= 0)
1625 0 : throw(MAL, "remote.bincopyfrom", "expected JSON header");
1626 :
1627 0 : cntxt->fdin->buf[cntxt->fdin->len] = '\0';
1628 0 : err = RMTinternalcopyfrom(&b,
1629 0 : &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true, int128 /* library should be compatible */);
1630 : /* skip the JSON line */
1631 0 : cntxt->fdin->pos = ++cntxt->fdin->len;
1632 0 : if (err !=MAL_SUCCEED)
1633 : return (err);
1634 :
1635 0 : v = &stk->stk[pci->argv[0]];
1636 0 : *v = (ValRecord) {
1637 0 : .val.bval = b->batCacheid,
1638 : .bat = true,
1639 0 : .vtype = b->ttype,
1640 : };
1641 0 : BBPkeepref(b);
1642 :
1643 0 : return (MAL_SUCCEED);
1644 : }
1645 :
1646 : /**
1647 : * bintype identifies the system on its binary profile. This is mainly
1648 : * used to determine if BATs can be sent binary across.
1649 : */
1650 : static str
1651 186 : RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1652 : {
1653 186 : (void)mb;
1654 186 : (void)stk;
1655 186 : (void)pci;
1656 :
1657 : /* TODO bintype should include the (bin) protocol version */
1658 186 : mnstr_printf(cntxt->fdout, "[ %d ]\n", localtype);
1659 186 : return(MAL_SUCCEED);
1660 : }
1661 :
1662 : /**
1663 : * Returns whether the underlying connection is still connected or not.
1664 : * Best effort implementation on top of mapi using a ping.
1665 : */
1666 : static str
1667 0 : RMTisalive(int *ret, const char *const *conn)
1668 : {
1669 0 : str tmp;
1670 0 : connection c;
1671 :
1672 0 : if (*conn == NULL || strcmp(*conn, (str) str_nil) == 0)
1673 0 : throw(ILLARG, "remote.get",
1674 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1675 :
1676 : /* lookup conn, set c if valid */
1677 0 : rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
1678 :
1679 0 : *ret = 0;
1680 0 : if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
1681 0 : *ret = 1;
1682 :
1683 : return MAL_SUCCEED;
1684 : }
1685 :
1686 : // This is basically a no op
1687 : static str
1688 362 : RMTregisterSupervisor(int *ret, const char *const *sup_uuid, const char *const *query_uuid)
1689 : {
1690 362 : (void) sup_uuid;
1691 362 : (void) query_uuid;
1692 :
1693 362 : *ret = 0;
1694 362 : return MAL_SUCCEED;
1695 : }
1696 :
1697 : #include "mel.h"
1698 : mel_func remote_init_funcs[] = {
1699 : command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
1700 : command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
1701 : pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
1702 : command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
1703 : command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
1704 : pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
1705 : pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
1706 : pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
1707 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
1708 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
1709 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
1710 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
1711 : command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
1712 : pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
1713 : pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
1714 : pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
1715 : pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
1716 : command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
1717 : { .imp=NULL }
1718 : };
1719 : #include "mal_import.h"
1720 : #ifdef _MSC_VER
1721 : #undef read
1722 : #pragma section(".CRT$XCU",read)
1723 : #endif
1724 325 : LIB_STARTUP_FUNC(init_remote_mal)
1725 325 : { mal_module2("remote", NULL, remote_init_funcs, RMTprelude, NULL); }
|