Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Fabian Groffen, Martin Kersten
15 : * Remote querying functionality
16 : * Communication with other mservers at the MAL level is a delicate task.
17 : * However, it is indispensable for any distributed functionality. This
18 : * module provides an abstract way to store and retrieve objects on a
19 : * remote site. Additionally, functions on a remote site can be executed
20 : * using objects available in the remote session context. This yields in
21 : * four primitive functions that form the basis for distribution methods:
22 : * get, put, register and exec.
23 : *
24 : * The get method simply retrieves a copy of a remote object. Objects can
25 : * be simple values, strings or Column. The same holds for the put method,
26 : * but the other way around. A local object can be stored on a remote
27 : * site. Upon a successful store, the put method returns the remote
28 : * identifier for the stored object. With this identifier the object can
29 : * be addressed, e.g. using the get method to retrieve the object that was
30 : * stored using put.
31 : *
32 : * The get and put methods are symmetric. Performing a get on an
33 : * identifier that was returned by put, results in an object with the same
34 : * value and type as the one that was put. The result of such an operation is
35 : * equivalent to making an (expensive) copy of the original object.
36 : *
37 : * The register function takes a local MAL function and makes it known at a
38 : * remote site. It ensures that it does not overload an already known
39 : * operation remotely, which could create a semantic conflict.
40 : * Deregistering a function is forbidden, because it would allow for taking
41 : * over the remote site completely.
42 : * C-implemented functions, such as io.print() cannot be remotely stored.
43 : * It would require even more complicated (byte) code shipping and remote
44 : * compilation to make it work.
45 : *
46 : * The choice to let exec only execute functions avoids problems
47 : * to decide what should be returned to the caller. With a function it is
48 : * clear and simple to return that what the function signature prescribes.
49 : * Any side effect (e.g. io.print calls) may cause havoc in the system,
50 : * but are currently ignored.
51 : *
52 : * This leads to the final contract of this module. The methods should be
53 : * used correctly, by obeying their contract. Failing to do so will result
54 : * in errors and possibly undefined behaviour.
55 : *
56 : * The resolve() function can be used to query Merovingian. It returns one
57 : * or more databases discovered in its vicinity matching the given pattern.
58 : *
59 : */
60 : #include "monetdb_config.h"
61 : #include "remote.h"
62 :
63 : /*
64 : * Technically, these methods need to be serialised per connection,
65 : * hence a scheduler that interleaves e.g. multiple get calls, simply
66 : * violates this constraint. If parallelism to the same site is
67 : * desired, a user could create a second connection. This is not always
68 : * easy to generate at the proper place, e.g. overloading the dataflow
69 : * optimizer to patch connections structures is not acceptable.
70 : *
71 : * Instead, we maintain a simple lock with each connection, which can be
72 : * used to issue a safe, but blocking get/put/exec/register request.
73 : */
74 : #include "mal_exception.h"
75 : #include "mal_interpreter.h"
76 : #include "mal_function.h" /* for printFunction */
77 : #include "mal_listing.h"
78 : #include "mal_instruction.h" /* for getmodule/func macros */
79 : #include "mal_authorize.h"
80 : #include "mapi.h"
81 : #include "mutils.h"
82 :
83 : #define RMTT_L_ENDIAN (0<<1)
84 : #define RMTT_B_ENDIAN (1<<1)
85 : #define RMTT_32_BITS (0<<2)
86 : #define RMTT_64_BITS (1<<2)
87 : #define RMTT_32_OIDS (0<<3)
88 : #define RMTT_64_OIDS (1<<3)
89 : #define RMTT_HGE (1<<4)
90 :
91 : typedef struct _connection {
92 : MT_Lock lock; /* lock to avoid interference */
93 : str name; /* the handle for this connection */
94 : Mapi mconn; /* the Mapi handle for the connection */
95 : unsigned char type; /* binary profile of the connection target */
96 : bool int128; /* has int128 support */
97 : size_t nextid; /* id counter */
98 : struct _connection *next; /* the next connection in the list */
99 : } *connection;
100 :
101 : #ifndef WIN32
102 : #include <sys/socket.h> /* socket */
103 : #include <sys/un.h> /* sockaddr_un */
104 : #endif
105 : #include <unistd.h> /* gethostname */
106 :
107 : static MT_Lock mal_remoteLock = MT_LOCK_INITIALIZER(mal_remoteLock);
108 :
109 : static connection conns = NULL;
110 : static unsigned char localtype = 0177;
111 : static bool int128 = false;
112 :
113 : static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn,
114 : const char *query);
115 :
116 : /**
117 : * Returns a BAT with valid redirects for the given pattern. If
118 : * merovingian is not running, this function throws an error.
119 : */
120 : static str
121 0 : RMTresolve(bat *ret, const char *const *pat)
122 : {
123 : #ifdef NATIVE_WIN32
124 : (void) ret;
125 : (void) pat;
126 : throw(MAL, "remote.resolve", "merovingian is not available on " "your platform, sorry"); /* please upgrade to Linux, etc. */
127 : #else
128 0 : BAT *list;
129 0 : const char *mero_uri;
130 0 : char *p;
131 0 : unsigned int port;
132 0 : char **redirs;
133 0 : char **or;
134 :
135 0 : if (pat == NULL || *pat == NULL || strcmp(*pat, (str) str_nil) == 0)
136 0 : throw(ILLARG, "remote.resolve",
137 : ILLEGAL_ARGUMENT ": pattern is NULL or nil");
138 :
139 0 : mero_uri = GDKgetenv("merovingian_uri");
140 0 : if (mero_uri == NULL)
141 0 : throw(MAL, "remote.resolve", "this function needs the mserver "
142 : "have been started by merovingian");
143 :
144 0 : list = COLnew(0, TYPE_str, 0, TRANSIENT);
145 0 : if (list == NULL)
146 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
147 :
148 : /* extract port from mero_uri, let mapi figure out the rest */
149 0 : mero_uri += strlen("mapi:monetdb://");
150 0 : if (*mero_uri == '[') {
151 0 : if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
152 0 : BBPreclaim(list);
153 0 : throw(MAL, "remote.resolve",
154 : "illegal IPv6 address on merovingian_uri: %s",
155 : GDKgetenv("merovingian_uri"));
156 : }
157 : }
158 0 : if ((p = strchr(mero_uri, ':')) == NULL) {
159 0 : BBPreclaim(list);
160 0 : throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
161 : GDKgetenv("merovingian_uri"));
162 : }
163 0 : port = (unsigned int) atoi(p + 1);
164 :
165 0 : or = redirs = mapi_resolve(NULL, port, *pat);
166 :
167 0 : if (redirs == NULL) {
168 0 : BBPreclaim(list);
169 0 : throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
170 : }
171 :
172 0 : while (*redirs != NULL) {
173 0 : if (BUNappend(list, (ptr) *redirs, false) != GDK_SUCCEED) {
174 0 : BBPreclaim(list);
175 0 : do
176 0 : free(*redirs);
177 0 : while (*++redirs);
178 0 : free(or);
179 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
180 : }
181 0 : free(*redirs);
182 0 : redirs++;
183 : }
184 0 : free(or);
185 :
186 0 : *ret = list->batCacheid;
187 0 : BBPkeepref(list);
188 0 : return (MAL_SUCCEED);
189 : #endif
190 : }
191 :
192 :
193 : /* for unique connection identifiers */
194 : static size_t connection_id = 0;
195 :
196 : /**
197 : * Returns a connection to the given uri. It always returns a newly
198 : * created connection.
199 : */
200 : static str
201 187 : RMTconnectScen(str *ret,
202 : const char *const *ouri, const char *const *user, const char *const *passwd, const char *const *scen, bit *columnar)
203 : {
204 187 : connection c;
205 187 : char conn[BUFSIZ];
206 187 : char *s;
207 187 : Mapi m;
208 187 : MapiHdl hdl;
209 187 : str msg;
210 :
211 : /* just make sure the return isn't garbage */
212 187 : *ret = 0;
213 :
214 187 : if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str) str_nil) == 0)
215 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
216 : "is NULL or nil");
217 187 : if (user == NULL || *user == NULL || strcmp(*user, (str) str_nil) == 0)
218 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
219 : "NULL or nil");
220 187 : if (passwd == NULL || *passwd == NULL
221 187 : || strcmp(*passwd, (str) str_nil) == 0)
222 0 : throw(ILLARG, "remote.connect",
223 : ILLEGAL_ARGUMENT ": password is " "NULL or nil");
224 187 : if (scen == NULL || *scen == NULL || strcmp(*scen, (str) str_nil) == 0)
225 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
226 : "NULL or nil");
227 187 : if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
228 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
229 : "is not supported", *scen);
230 :
231 187 : m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
232 188 : if (mapi_error(m)) {
233 0 : msg = createException(MAL, "remote.connect",
234 : "unable to connect to '%s': %s",
235 : *ouri, mapi_error_str(m));
236 0 : mapi_destroy(m);
237 0 : return msg;
238 : }
239 :
240 189 : MT_lock_set(&mal_remoteLock);
241 :
242 : /* generate an unique connection name, they are only known
243 : * within one mserver, id is primary key, the rest is super key */
244 191 : snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user,
245 : connection_id++);
246 : /* make sure we can construct MAL identifiers using conn */
247 5380 : for (s = conn; *s != '\0'; s++) {
248 5189 : if (!isalnum((unsigned char) *s)) {
249 743 : *s = '_';
250 : }
251 : }
252 :
253 191 : if (mapi_reconnect(m) != MOK) {
254 5 : MT_lock_unset(&mal_remoteLock);
255 5 : msg = createException(IO, "remote.connect",
256 : "unable to connect to '%s': %s",
257 : *ouri, mapi_error_str(m));
258 5 : mapi_destroy(m);
259 5 : return msg;
260 : }
261 :
262 186 : if (columnar && *columnar) {
263 1 : char set_protocol_query_buf[50];
264 1 : snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);",
265 : PROTOCOL_COLUMNAR);
266 1 : if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
267 0 : mapi_destroy(m);
268 0 : MT_lock_unset(&mal_remoteLock);
269 0 : return msg;
270 : }
271 : }
272 :
273 : /* connection established, add to list */
274 186 : c = GDKzalloc(sizeof(struct _connection));
275 186 : if (c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
276 0 : GDKfree(c);
277 0 : mapi_destroy(m);
278 0 : MT_lock_unset(&mal_remoteLock);
279 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
280 : }
281 186 : c->mconn = m;
282 186 : c->nextid = 0;
283 186 : MT_lock_init(&c->lock, c->name);
284 186 : c->next = conns;
285 186 : conns = c;
286 :
287 186 : msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
288 186 : if (msg) {
289 0 : MT_lock_unset(&mal_remoteLock);
290 0 : return msg;
291 : }
292 372 : if (hdl != NULL && mapi_fetch_row(hdl)) {
293 186 : char *val = mapi_fetch_field(hdl, 0);
294 186 : c->type = (unsigned char) atoi(val);
295 186 : mapi_close_handle(hdl);
296 : } else {
297 0 : c->type = 0;
298 : }
299 :
300 : #ifdef _DEBUG_MAPI_
301 : mapi_trace(c->mconn, true);
302 : #endif
303 186 : if (c->type != localtype && (c->type | RMTT_HGE) == localtype) {
304 : /* we support hge, and for remote, we don't know */
305 0 : msg = RMTquery(&hdl, "remote.connect", m, "x := 0:hge;");
306 0 : if (msg) {
307 0 : freeException(msg);
308 0 : c->int128 = false;
309 : } else {
310 0 : mapi_close_handle(hdl);
311 0 : c->int128 = true;
312 0 : c->type |= RMTT_HGE;
313 : }
314 186 : } else if (c->type == localtype) {
315 186 : c->int128 = int128;
316 : }
317 186 : MT_lock_unset(&mal_remoteLock);
318 :
319 186 : *ret = GDKstrdup(conn);
320 186 : if (*ret == NULL)
321 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
322 : return (MAL_SUCCEED);
323 : }
324 :
325 : static str
326 188 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
327 : {
328 188 : (void) cntxt;
329 188 : (void) mb;
330 188 : str *ret = getArgReference_str(stk, pci, 0);
331 188 : const char *uri = *getArgReference_str(stk, pci, 1);
332 188 : const char *user = *getArgReference_str(stk, pci, 2);
333 188 : const char *passwd = *getArgReference_str(stk, pci, 3);
334 :
335 188 : const char *scen = "msql";
336 :
337 188 : if (pci->argc >= 5)
338 188 : scen = *getArgReference_str(stk, pci, 4);
339 :
340 188 : return RMTconnectScen(ret, &uri, &user, &passwd, &scen, NULL);
341 : }
342 :
343 : /**
344 : * Disconnects a connection. The connection needs not to exist in the
345 : * system, it only needs to exist for the client (i.e. it was once
346 : * created).
347 : */
348 : str
349 186 : RMTdisconnect(void *ret, const char *const *conn)
350 : {
351 186 : connection c, t;
352 :
353 186 : if (conn == NULL || *conn == NULL || strcmp(*conn, (str) str_nil) == 0)
354 1 : throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
355 : "is NULL or nil");
356 :
357 :
358 185 : (void) ret;
359 :
360 : /* we need a lock because the same user can be handled by multiple
361 : * threads */
362 185 : MT_lock_set(&mal_remoteLock);
363 186 : c = conns;
364 186 : t = NULL; /* parent */
365 : /* walk through the list */
366 266 : while (c != NULL) {
367 266 : if (strcmp(c->name, *conn) == 0) {
368 : /* ok, delete it... */
369 186 : if (t == NULL) {
370 148 : conns = c->next;
371 : } else {
372 38 : t->next = c->next;
373 : }
374 :
375 186 : MT_lock_set(&c->lock); /* shared connection */
376 186 : mapi_disconnect(c->mconn);
377 186 : mapi_destroy(c->mconn);
378 186 : MT_lock_unset(&c->lock);
379 186 : MT_lock_destroy(&c->lock);
380 186 : GDKfree(c->name);
381 186 : GDKfree(c);
382 186 : MT_lock_unset(&mal_remoteLock);
383 186 : return MAL_SUCCEED;
384 : }
385 80 : t = c;
386 80 : c = c->next;
387 : }
388 :
389 0 : MT_lock_unset(&mal_remoteLock);
390 0 : throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
391 : }
392 :
393 : /**
394 : * Helper function to return a connection matching a given string, or an
395 : * error if it does not exist. Since this function is internal, it
396 : * doesn't check the argument conn, as it should have been checked
397 : * already.
398 : * NOTE: this function acquires the mal_remoteLock before accessing conns
399 : */
400 : static inline str
401 5395 : RMTfindconn(connection *ret, const char *conn)
402 : {
403 5395 : connection c;
404 :
405 : /* just make sure the return isn't garbage */
406 5395 : *ret = NULL;
407 5395 : MT_lock_set(&mal_remoteLock); /* protect c */
408 5403 : c = conns;
409 7503 : while (c != NULL) {
410 7503 : if (strcmp(c->name, conn) == 0) {
411 5403 : *ret = c;
412 5403 : MT_lock_unset(&mal_remoteLock);
413 5403 : return (MAL_SUCCEED);
414 : }
415 2100 : c = c->next;
416 : }
417 0 : MT_lock_unset(&mal_remoteLock);
418 0 : throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
419 : }
420 :
421 : /**
422 : * Little helper function that returns a GDKmalloced string containing a
423 : * valid identifier that is supposed to be unique in the connection's
424 : * remote context. The generated string depends on the module and
425 : * function the caller is in. But also the runtime context is important.
426 : * The format is rmt<id>_<retvar>_<type>. Every RMTgetId uses a fresh id,
427 : * to distinguish amongst different (parallel) execution context.
428 : * Reuse of this remote identifier should be done with care.
429 : * The encoding of the type allows for ease of type checking later on.
430 : */
431 : static inline str
432 3270 : RMTgetId(char *buf, size_t buflen, MalBlkPtr mb, InstrPtr p, int arg)
433 : {
434 3270 : str rt;
435 3270 : char name[IDLENGTH];
436 3270 : static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
437 :
438 3270 : if (p->retc == 0)
439 0 : throw(MAL, "remote.getId",
440 : ILLEGAL_ARGUMENT "MAL instruction misses retc");
441 :
442 3270 : getArgNameIntoBuffer(mb, p, arg, name);
443 3269 : rt = getTypeIdentifier(getArgType(mb, p, arg));
444 3267 : if (rt == NULL)
445 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
446 :
447 3267 : snprintf(buf, buflen, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), name,
448 : rt);
449 :
450 3267 : GDKfree(rt);
451 3267 : return (MAL_SUCCEED);
452 : }
453 :
454 : /**
455 : * Helper function to execute a query over the given connection,
456 : * returning the result handle. If communication fails in one way or
457 : * another, an error is returned. Since this function is internal, it
458 : * doesn't check the input arguments func, conn and query, as they
459 : * should have been checked already.
460 : * NOTE: this function assumes a lock for conn is set
461 : */
462 : static inline str
463 4186 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query)
464 : {
465 4186 : MapiHdl mhdl;
466 :
467 4186 : *ret = NULL;
468 4186 : mhdl = mapi_query(conn, query);
469 4175 : if (mhdl) {
470 4175 : if (mapi_result_error(mhdl) != NULL) {
471 6 : str err = createException(getExceptionType(mapi_result_error(mhdl)),
472 : func,
473 : "(mapi:monetdb://%s@%s/%s) %s",
474 : mapi_get_user(conn),
475 : mapi_get_host(conn),
476 : mapi_get_dbname(conn),
477 : getExceptionMessage(mapi_result_error(mhdl)));
478 6 : mapi_close_handle(mhdl);
479 6 : return (err);
480 : }
481 : } else {
482 0 : if (mapi_error(conn) != MOK) {
483 0 : throw(IO, func, "an error occurred on connection: %s",
484 : mapi_error_str(conn));
485 : } else {
486 0 : throw(MAL, func,
487 : "remote function invocation didn't return a result");
488 : }
489 : }
490 :
491 4170 : *ret = mhdl;
492 4170 : return (MAL_SUCCEED);
493 : }
494 :
495 : static str
496 352 : RMTprelude(void)
497 : {
498 352 : unsigned int type = 0;
499 :
500 : #ifdef WORDS_BIGENDIAN
501 : type |= RMTT_B_ENDIAN;
502 : #else
503 352 : type |= RMTT_L_ENDIAN;
504 : #endif
505 : #if SIZEOF_SIZE_T == SIZEOF_LNG
506 352 : type |= RMTT_64_BITS;
507 : #else
508 : type |= RMTT_32_BITS;
509 : #endif
510 : #if SIZEOF_OID == SIZEOF_LNG
511 352 : type |= RMTT_64_OIDS;
512 : #else
513 : type |= RMTT_32_OIDS;
514 : #endif
515 : #ifdef HAVE_HGE
516 352 : type |= RMTT_HGE;
517 352 : int128 = true;
518 : #endif
519 352 : localtype = (unsigned char) type;
520 :
521 352 : return (MAL_SUCCEED);
522 : }
523 :
524 : static str
525 350 : RMTepilogue(void *ret)
526 : {
527 350 : connection c, t;
528 :
529 350 : (void) ret;
530 :
531 350 : MT_lock_set(&mal_remoteLock); /* nobody allowed here */
532 : /* free connections list */
533 350 : c = conns;
534 350 : while (c != NULL) {
535 0 : t = c;
536 0 : c = c->next;
537 0 : MT_lock_set(&t->lock);
538 0 : mapi_destroy(t->mconn);
539 0 : MT_lock_unset(&t->lock);
540 0 : MT_lock_destroy(&t->lock);
541 0 : GDKfree(t->name);
542 0 : GDKfree(t);
543 : }
544 : /* not sure, but better be safe than sorry */
545 350 : conns = NULL;
546 350 : MT_lock_unset(&mal_remoteLock);
547 :
548 350 : return (MAL_SUCCEED);
549 : }
550 :
551 : static str
552 1397 : RMTreadbatheader(stream *sin, char *buf)
553 : {
554 1397 : ssize_t sz = 0, rd;
555 :
556 : /* read the JSON header */
557 225489 : while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
558 224092 : sz += rd;
559 : }
560 1402 : if (rd < 0) {
561 0 : throw(MAL, "remote.get", "could not read BAT JSON header");
562 : }
563 1402 : if (buf[0] == '!') {
564 0 : char *result;
565 0 : if ((result = GDKstrdup(buf)) == NULL)
566 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
567 : return result;
568 : }
569 :
570 1402 : buf[sz] = '\0';
571 :
572 1402 : return MAL_SUCCEED;
573 : }
574 :
575 : typedef struct _binbat_v1 {
576 : int Ttype;
577 : oid Hseqbase;
578 : oid Tseqbase;
579 : bool
580 : Tsorted:1, Trevsorted:1, Tkey:1, Tnonil:1, Tdense:1;
581 : BUN size;
582 : size_t tailsize;
583 : size_t theapsize;
584 : } binbat;
585 :
586 : static str
587 1401 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, bool cint128)
588 : {
589 1401 : binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
590 1401 : char *nme = NULL;
591 1401 : char *val = NULL;
592 1401 : char tmp;
593 1401 : size_t len;
594 1401 : lng lv, *lvp;
595 :
596 1401 : BAT *b;
597 :
598 1401 : (void) cint128;
599 : /* hdr is a JSON structure that looks like
600 : * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
601 : * we take the binary data directly from the stream */
602 :
603 : /* could skip whitespace, but we just don't allow that */
604 1401 : if (*hdr++ != '{')
605 0 : throw(MAL, "remote.bincopyfrom",
606 : "illegal input, not a JSON header (got '%s')", hdr - 1);
607 226056 : while (*hdr != '\0') {
608 224654 : switch (*hdr) {
609 33624 : case '"':
610 : /* we assume only numeric values, so all strings are
611 : * elems */
612 33624 : if (nme != NULL) {
613 16815 : *hdr = '\0';
614 : } else {
615 16809 : nme = hdr + 1;
616 : }
617 : break;
618 16818 : case ':':
619 16818 : val = hdr + 1;
620 16818 : break;
621 16819 : case ',':
622 : case '}':
623 16819 : if (val == NULL)
624 0 : throw(MAL, "remote.bincopyfrom",
625 : "illegal input, JSON value missing");
626 16819 : *hdr = '\0';
627 :
628 16819 : lvp = &lv;
629 16819 : len = sizeof(lv);
630 : /* tseqbase can be 1<<31/1<<63 which causes overflow
631 : * in lngFromStr, so we check separately */
632 16819 : if (strcmp(val,
633 : #if SIZEOF_OID == 8
634 : "9223372036854775808"
635 : #else
636 : "2147483648"
637 : #endif
638 1375 : ) == 0 && strcmp(nme, "tseqbase") == 0) {
639 1372 : bb.Tseqbase = oid_nil;
640 : } else {
641 : /* all values should be non-negative, so we check that
642 : * here as well */
643 15447 : if (lngFromStr(val, &len, &lvp, true) < 0 ||
644 15448 : lv < 0 /* includes lng_nil */ )
645 0 : throw(MAL, "remote.bincopyfrom",
646 : "bad %s value: %s", nme, val);
647 :
648 : /* deal with nme and val */
649 15448 : if (strcmp(nme, "version") == 0) {
650 1401 : if (lv != 1)
651 0 : throw(MAL, "remote.bincopyfrom",
652 : "unsupported version: %s", val);
653 14047 : } else if (strcmp(nme, "hseqbase") == 0) {
654 : #if SIZEOF_OID < SIZEOF_LNG
655 : if (lv > GDK_oid_max)
656 : throw(MAL, "remote.bincopyfrom",
657 : "bad %s value: %s", nme, val);
658 : #endif
659 1401 : bb.Hseqbase = (oid) lv;
660 12646 : } else if (strcmp(nme, "ttype") == 0) {
661 1402 : if (lv >= GDKatomcnt)
662 0 : throw(MAL, "remote.bincopyfrom",
663 : "bad %s value: GDK atom number %s doesn't exist",
664 : nme, val);
665 1402 : bb.Ttype = (int) lv;
666 11244 : } else if (strcmp(nme, "tseqbase") == 0) {
667 : #if SIZEOF_OID < SIZEOF_LNG
668 : if (lv > GDK_oid_max)
669 : throw(MAL, "remote.bincopyfrom",
670 : "bad %s value: %s", nme, val);
671 : #endif
672 30 : bb.Tseqbase = (oid) lv;
673 11214 : } else if (strcmp(nme, "tsorted") == 0) {
674 1401 : bb.Tsorted = lv != 0;
675 9813 : } else if (strcmp(nme, "trevsorted") == 0) {
676 1402 : bb.Trevsorted = lv != 0;
677 8411 : } else if (strcmp(nme, "tkey") == 0) {
678 1401 : bb.Tkey = lv != 0;
679 7010 : } else if (strcmp(nme, "tnonil") == 0) {
680 1402 : bb.Tnonil = lv != 0;
681 5608 : } else if (strcmp(nme, "tdense") == 0) {
682 1402 : bb.Tdense = lv != 0;
683 4206 : } else if (strcmp(nme, "size") == 0) {
684 1402 : if (lv > (lng) BUN_MAX)
685 0 : throw(MAL, "remote.bincopyfrom",
686 : "bad %s value: %s", nme, val);
687 1402 : bb.size = (BUN) lv;
688 2804 : } else if (strcmp(nme, "tailsize") == 0) {
689 1402 : bb.tailsize = (size_t) lv;
690 1402 : } else if (strcmp(nme, "theapsize") == 0) {
691 1402 : bb.theapsize = (size_t) lv;
692 : } else {
693 0 : throw(MAL, "remote.bincopyfrom",
694 : "unknown element: %s", nme);
695 : }
696 : }
697 : nme = val = NULL;
698 : break;
699 : }
700 224655 : hdr++;
701 : }
702 : #ifdef HAVE_HGE
703 1402 : if (int128 && !cint128 && bb.Ttype >= TYPE_hge)
704 0 : bb.Ttype++;
705 : #else
706 : (void) cint128;
707 : #endif
708 :
709 2712 : b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT,
710 1310 : bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);
711 1402 : if (b == NULL)
712 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
713 :
714 1402 : if (bb.tailsize > 0) {
715 2570 : if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
716 1285 : mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
717 0 : goto bailout;
718 1285 : b->theap->dirty = true;
719 : }
720 1402 : if (bb.theapsize > 0) {
721 152 : if ((b->tvheap->base == NULL &&
722 76 : (*BATatoms[b->ttype].atomHeap) (b->tvheap,
723 : b->batCapacity) != GDK_SUCCEED)
724 76 : || HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED
725 76 : || mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
726 0 : goto bailout;
727 76 : b->tvheap->free = bb.theapsize;
728 76 : b->tvheap->dirty = true;
729 : }
730 :
731 : /* set properties */
732 1402 : b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
733 1402 : b->tsorted = bb.Tsorted;
734 1402 : b->trevsorted = bb.Trevsorted;
735 1402 : b->tkey = bb.Tkey;
736 1402 : b->tnonil = bb.Tnonil;
737 1402 : if (bb.Ttype == TYPE_str && bb.size)
738 76 : BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
739 1402 : BATsetcount(b, bb.size);
740 :
741 : // read blockmode flush
742 1402 : while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
743 0 : TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
744 : }
745 :
746 1401 : BATsettrivprop(b);
747 :
748 1401 : *ret = b;
749 1401 : return (MAL_SUCCEED);
750 :
751 : bailout:
752 0 : BBPreclaim(b);
753 0 : throw(MAL, "remote.bincopyfrom", "reading failed");
754 : }
755 :
756 : /**
757 : * get fetches the object referenced by ident over connection conn.
758 : * We are only interested in retrieving void-headed BATs, i.e. single columns.
759 : */
760 : static str
761 1400 : RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
762 : {
763 1400 : str conn, ident, tmp, rt;
764 1400 : connection c;
765 1400 : char qbuf[BUFSIZ + 1];
766 1400 : MapiHdl mhdl = NULL;
767 1400 : int rtype;
768 1400 : ValPtr v;
769 :
770 1400 : (void) mb;
771 1400 : (void) cntxt;
772 :
773 1400 : conn = *getArgReference_str(stk, pci, 1);
774 1400 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
775 0 : throw(ILLARG, "remote.get",
776 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
777 1400 : ident = *getArgReference_str(stk, pci, 2);
778 1400 : if (ident == 0 || isIdentifier(ident) < 0)
779 0 : throw(ILLARG, "remote.get",
780 : ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
781 :
782 : /* lookup conn, set c if valid */
783 1399 : rethrow("remote.get", tmp, RMTfindconn(&c, conn));
784 :
785 1400 : rtype = getArgType(mb, pci, 0);
786 1400 : v = &stk->stk[pci->argv[0]];
787 :
788 1400 : if (rtype == TYPE_any || isAnyExpression(rtype)) {
789 0 : char *tpe, *msg;
790 0 : tpe = getTypeName(rtype);
791 0 : msg = createException(MAL, "remote.get",
792 : ILLEGAL_ARGUMENT ": unsupported any type: %s",
793 : tpe);
794 0 : GDKfree(tpe);
795 0 : return msg;
796 : }
797 : /* check if the remote type complies with what we expect.
798 : Since the put() encodes the type as known to the remote site
799 : we can simple compare it here */
800 1400 : rt = getTypeIdentifier(rtype);
801 1400 : if (rt == NULL)
802 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
803 1400 : if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
804 0 : tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
805 : ": remote object type %s does not match expected type %s",
806 : rt, ident);
807 0 : GDKfree(rt);
808 0 : return tmp;
809 : }
810 1400 : GDKfree(rt);
811 :
812 1400 : if (isaBatType(rtype) && (localtype == 0177 || (localtype != c->type && localtype != (c->type | RMTT_HGE)))) {
813 0 : int t;
814 0 : size_t s;
815 0 : ptr r;
816 0 : str var;
817 0 : BAT *b;
818 :
819 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
820 :
821 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
822 :
823 : /* this call should be a single transaction over the channel */
824 0 : MT_lock_set(&c->lock);
825 :
826 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
827 : != MAL_SUCCEED) {
828 0 : TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
829 0 : MT_lock_unset(&c->lock);
830 0 : var = createException(MAL, "remote.get", "%s", tmp);
831 0 : freeException(tmp);
832 0 : return var;
833 : }
834 0 : t = getBatType(rtype);
835 0 : b = COLnew(0, t, 0, TRANSIENT);
836 0 : if (b == NULL) {
837 0 : mapi_close_handle(mhdl);
838 0 : MT_lock_unset(&c->lock);
839 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
840 : }
841 :
842 0 : if (ATOMbasetype(t) == TYPE_str) {
843 0 : while (mapi_fetch_row(mhdl)) {
844 0 : var = mapi_fetch_field(mhdl, 1);
845 0 : if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
846 0 : BBPreclaim(b);
847 0 : mapi_close_handle(mhdl);
848 0 : MT_lock_unset(&c->lock);
849 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
850 : }
851 : }
852 : } else
853 0 : while (mapi_fetch_row(mhdl)) {
854 0 : var = mapi_fetch_field(mhdl, 1);
855 0 : if (var == NULL)
856 0 : var = "nil";
857 0 : s = 0;
858 0 : r = NULL;
859 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
860 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
861 0 : BBPreclaim(b);
862 0 : GDKfree(r);
863 0 : mapi_close_handle(mhdl);
864 0 : MT_lock_unset(&c->lock);
865 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
866 : }
867 0 : GDKfree(r);
868 : }
869 :
870 0 : *v = (ValRecord) {
871 0 : .val.bval = b->batCacheid,
872 : .bat = true,
873 0 : .vtype = b->ttype,
874 : };
875 0 : BBPkeepref(b);
876 :
877 0 : mapi_close_handle(mhdl);
878 0 : MT_lock_unset(&c->lock);
879 1400 : } else if (isaBatType(rtype)) {
880 : /* binary compatible remote host, transfer BAT in binary form */
881 1400 : stream *sout;
882 1400 : stream *sin;
883 1400 : char buf[256];
884 1400 : BAT *b = NULL;
885 :
886 : /* this call should be a single transaction over the channel */
887 1400 : MT_lock_set(&c->lock);
888 :
889 : /* bypass Mapi from this point to efficiently write all data to
890 : * the server */
891 1400 : sout = mapi_get_to(c->mconn);
892 1400 : sin = mapi_get_from(c->mconn);
893 1400 : if (sin == NULL || sout == NULL) {
894 0 : MT_lock_unset(&c->lock);
895 0 : throw(MAL, "remote.get", "Connection lost");
896 : }
897 :
898 : /* call our remote helper to do this more efficiently */
899 1400 : mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
900 1400 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
901 :
902 1396 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
903 0 : MT_lock_unset(&c->lock);
904 0 : return tmp;
905 : }
906 :
907 1400 : if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->int128)) != MAL_SUCCEED) {
908 0 : MT_lock_unset(&c->lock);
909 0 : return (tmp);
910 : }
911 :
912 1399 : *v = (ValRecord) {
913 1399 : .val.bval = b->batCacheid,
914 : .bat = true,
915 1399 : .vtype = b->ttype,
916 : };
917 1399 : BBPkeepref(b);
918 :
919 1400 : MT_lock_unset(&c->lock);
920 : } else {
921 0 : ptr p = NULL;
922 0 : str val;
923 0 : size_t len = 0;
924 :
925 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
926 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
927 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED) {
928 0 : return tmp;
929 : }
930 0 : (void) mapi_fetch_row(mhdl); /* should succeed */
931 0 : val = mapi_fetch_field(mhdl, 0);
932 :
933 0 : if (ATOMbasetype(rtype) == TYPE_str) {
934 0 : if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
935 0 : mapi_close_handle(mhdl);
936 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
937 : }
938 0 : } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true)
939 : < 0) {
940 0 : char *msg;
941 0 : msg = createException(MAL, "remote.get",
942 : "unable to parse value: %s",
943 : val == NULL ? "nil" : val);
944 0 : mapi_close_handle(mhdl);
945 0 : GDKfree(p);
946 0 : return msg;
947 : } else {
948 0 : VALset(v, rtype, p);
949 0 : if (ATOMextern(rtype) == 0)
950 0 : GDKfree(p);
951 : }
952 :
953 0 : mapi_close_handle(mhdl);
954 : }
955 :
956 : return (MAL_SUCCEED);
957 : }
958 :
959 : /**
960 : * stores the given object on the remote host. The identifier of the
961 : * object on the remote host is returned for later use.
962 : */
963 : static str
964 3271 : RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
965 : {
966 3271 : str conn, tmp;
967 3271 : char ident[512];
968 3271 : connection c;
969 3271 : ValPtr v;
970 3271 : int type;
971 3271 : ptr value;
972 3271 : MapiHdl mhdl = NULL;
973 :
974 3271 : (void) cntxt;
975 :
976 3271 : conn = *getArgReference_str(stk, pci, 1);
977 3271 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
978 4 : throw(ILLARG, "remote.put",
979 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
980 :
981 : /* lookup conn */
982 3267 : rethrow("remote.put", tmp, RMTfindconn(&c, conn));
983 :
984 : /* put the thing */
985 3270 : type = getArgType(mb, pci, 2);
986 3270 : value = getArgReference(stk, pci, 2);
987 :
988 : /* this call should be a single transaction over the channel */
989 3270 : MT_lock_set(&c->lock);
990 :
991 : /* get a free, typed identifier for the remote host */
992 3270 : tmp = RMTgetId(ident, sizeof(ident), mb, pci, 2);
993 3269 : if (tmp != MAL_SUCCEED) {
994 0 : MT_lock_unset(&c->lock);
995 0 : return tmp;
996 : }
997 :
998 : /* depending on the input object generate actions to store the
999 : * object remotely*/
1000 3269 : if (type == TYPE_any || isAnyExpression(type)) {
1001 4 : char *tpe, *msg;
1002 4 : MT_lock_unset(&c->lock);
1003 0 : tpe = getTypeName(type);
1004 0 : msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
1005 0 : GDKfree(tpe);
1006 0 : return msg;
1007 4666 : } else if (isaBatType(type) && !is_bat_nil(*(bat *) value)) {
1008 1402 : BATiter bi;
1009 : /* naive approach using bat.new() and bat.insert() calls */
1010 1402 : char *tail;
1011 1402 : bat bid;
1012 1402 : BAT *b = NULL;
1013 1402 : BUN p, q;
1014 1402 : str tailv;
1015 1402 : stream *sout;
1016 :
1017 1402 : tail = getTypeIdentifier(getBatType(type));
1018 1402 : if (tail == NULL) {
1019 0 : MT_lock_unset(&c->lock);
1020 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1021 : }
1022 :
1023 1402 : bid = *(bat *) value;
1024 1402 : if (bid != 0) {
1025 1402 : if ((b = BATdescriptor(bid)) == NULL) {
1026 0 : MT_lock_unset(&c->lock);
1027 0 : GDKfree(tail);
1028 0 : throw(MAL, "remote.put",
1029 : SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1030 : }
1031 : }
1032 :
1033 : /* bypass Mapi from this point to efficiently write all data to
1034 : * the server */
1035 1402 : sout = mapi_get_to(c->mconn);
1036 :
1037 : /* call our remote helper to do this more efficiently */
1038 1402 : mnstr_printf(sout,
1039 : "%s := remote.batload(nil:%s, " BUNFMT ");\n",
1040 : ident, tail, (bid == 0 ? 0 : BATcount(b)));
1041 1400 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1042 1401 : GDKfree(tail);
1043 :
1044 : /* b can be NULL if bid == 0 (only type given, ugh) */
1045 1402 : if (b) {
1046 2802 : int tpe = getBatType(type), trivial = tpe < TYPE_date
1047 1401 : || ATOMbasetype(tpe) == TYPE_str;
1048 1401 : const void *nil = ATOMnilptr(tpe);
1049 1401 : int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
1050 :
1051 1401 : bi = bat_iterator(b);
1052 1401 : BATloop(b, p, q) {
1053 0 : const void *v = BUNtail(bi, p);
1054 0 : tailv = ATOMformat(tpe, v);
1055 0 : if (tailv == NULL) {
1056 0 : bat_iterator_end(&bi);
1057 0 : BBPunfix(b->batCacheid);
1058 0 : MT_lock_unset(&c->lock);
1059 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1060 : }
1061 0 : if (trivial || atomcmp(v, nil) == 0)
1062 0 : mnstr_printf(sout, "%s\n", tailv);
1063 : else
1064 0 : mnstr_printf(sout, "\"%s\"\n", tailv);
1065 0 : GDKfree(tailv);
1066 : }
1067 1401 : bat_iterator_end(&bi);
1068 1400 : BBPunfix(b->batCacheid);
1069 : }
1070 :
1071 : /* write the empty line the server is waiting for, handles
1072 : * all errors at the same time, if any */
1073 1403 : if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
1074 : != MAL_SUCCEED) {
1075 0 : MT_lock_unset(&c->lock);
1076 0 : return tmp;
1077 : }
1078 1400 : mapi_close_handle(mhdl);
1079 0 : } else if (isaBatType(type) && is_bat_nil(*(bat *) value)) {
1080 0 : stream *sout;
1081 0 : str typename = getTypeName(type);
1082 0 : sout = mapi_get_to(c->mconn);
1083 0 : mnstr_printf(sout, "%s := nil:%s;\n", ident, typename);
1084 0 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1085 0 : GDKfree(typename);
1086 : } else {
1087 1863 : size_t l;
1088 1863 : str val;
1089 1863 : char *tpe;
1090 1863 : char qbuf[512], *nbuf = qbuf;
1091 1863 : const void *nil = ATOMnilptr(type), *p = value;
1092 1863 : int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
1093 :
1094 1863 : if (ATOMextern(type))
1095 1291 : p = *(ptr *) value;
1096 :
1097 1863 : val = ATOMformat(type, p);
1098 1864 : if (val == NULL) {
1099 0 : MT_lock_unset(&c->lock);
1100 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1101 : }
1102 1864 : tpe = getTypeIdentifier(type);
1103 1864 : if (tpe == NULL) {
1104 0 : MT_lock_unset(&c->lock);
1105 0 : GDKfree(val);
1106 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1107 : }
1108 1864 : l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
1109 1864 : if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
1110 0 : MT_lock_unset(&c->lock);
1111 0 : GDKfree(val);
1112 0 : GDKfree(tpe);
1113 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1114 : }
1115 1864 : if (type < TYPE_date || ATOMbasetype(type) == TYPE_str
1116 8 : || atomcmp(p, nil) == 0)
1117 1859 : snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
1118 : else
1119 5 : snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
1120 1864 : GDKfree(tpe);
1121 1866 : GDKfree(val);
1122 1868 : TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
1123 1868 : tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
1124 1854 : if (nbuf != qbuf)
1125 33 : GDKfree(nbuf);
1126 1863 : if (tmp != MAL_SUCCEED) {
1127 0 : MT_lock_unset(&c->lock);
1128 0 : return tmp;
1129 : }
1130 1863 : mapi_close_handle(mhdl);
1131 : }
1132 3265 : MT_lock_unset(&c->lock);
1133 :
1134 : /* return the identifier */
1135 3270 : v = &stk->stk[pci->argv[0]];
1136 3270 : if (VALinit(v, TYPE_str, ident) == NULL)
1137 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1138 : return (MAL_SUCCEED);
1139 : }
1140 :
1141 : /**
1142 : * stores the given <mod>.<fcn> on the remote host.
1143 : * An error is returned if the function is already known at the remote site.
1144 : * The implementation is based on serialisation of the block into a string
1145 : * followed by remote parsing.
1146 : */
1147 : static str
1148 0 : RMTregisterInternal(Client cntxt, char **fcn_id, const char *conn,
1149 : const char *mod, const char *fcn)
1150 : {
1151 0 : str tmp, qry, msg;
1152 0 : connection c;
1153 0 : char buf[BUFSIZ];
1154 0 : MapiHdl mhdl = NULL;
1155 0 : Symbol sym;
1156 :
1157 0 : if (strNil(conn))
1158 0 : throw(ILLARG, "remote.register",
1159 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1160 :
1161 : /* find local definition */
1162 0 : sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
1163 0 : if (sym == NULL)
1164 0 : throw(MAL, "remote.register",
1165 : ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
1166 :
1167 : /* lookup conn */
1168 0 : rethrow("remote.register", tmp, RMTfindconn(&c, conn));
1169 :
1170 : /* this call should be a single transaction over the channel */
1171 0 : MT_lock_set(&c->lock);
1172 :
1173 : /* get a free, typed identifier for the remote host */
1174 0 : char ident[512];
1175 0 : tmp = RMTgetId(ident, sizeof(ident), sym->def, getInstrPtr(sym->def, 0), 0);
1176 0 : if (tmp != MAL_SUCCEED) {
1177 0 : MT_lock_unset(&c->lock);
1178 0 : return tmp;
1179 : }
1180 :
1181 : /* check remote definition */
1182 0 : snprintf(buf, BUFSIZ,
1183 : "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod,
1184 : ident);
1185 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
1186 0 : if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED) {
1187 0 : MT_lock_unset(&c->lock);
1188 0 : return msg;
1189 : }
1190 :
1191 0 : char *result;
1192 0 : if (mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl)
1193 0 : && (result = mapi_fetch_field(mhdl, 0))) {
1194 0 : if (strcmp(result, "false") != 0)
1195 0 : msg = createException(MAL, "remote.register",
1196 : "function already exists at the remote site: %s.%s",
1197 : mod, fcn);
1198 : } else
1199 0 : msg = createException(MAL, "remote.register", OPERATION_FAILED);
1200 :
1201 0 : mapi_close_handle(mhdl);
1202 :
1203 0 : if (msg) {
1204 0 : MT_lock_unset(&c->lock);
1205 0 : return msg;
1206 : }
1207 :
1208 0 : *fcn_id = GDKstrdup(ident);
1209 0 : if (*fcn_id == NULL) {
1210 0 : MT_lock_unset(&c->lock);
1211 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1212 : }
1213 :
1214 0 : Symbol prg;
1215 0 : if ((prg = newFunctionArgs(putName(mod), putName(*fcn_id), FUNCTIONsymbol, -1)) == NULL) {
1216 0 : MT_lock_unset(&c->lock);
1217 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1218 : }
1219 :
1220 : // We only need the Symbol not the inner program stub. So we clear it.
1221 0 : freeMalBlk(prg->def);
1222 0 : prg->def = NULL;
1223 :
1224 0 : if ((prg->def = copyMalBlk(sym->def)) == NULL) {
1225 0 : MT_lock_unset(&c->lock);
1226 0 : freeSymbol(prg);
1227 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1228 : }
1229 0 : setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
1230 :
1231 : /* make sure the program is error free */
1232 0 : msg = chkProgram(cntxt->usermodule, prg->def);
1233 0 : if (msg != MAL_SUCCEED || prg->def->errors) {
1234 0 : MT_lock_unset(&c->lock);
1235 0 : if (msg)
1236 : return msg;
1237 0 : throw(MAL, "remote.register",
1238 : "function '%s.%s' contains syntax or type errors", mod, *fcn_id);
1239 : }
1240 :
1241 0 : qry = mal2str(prg->def, 0, prg->def->stop);
1242 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
1243 0 : msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
1244 0 : GDKfree(qry);
1245 0 : if (mhdl)
1246 0 : mapi_close_handle(mhdl);
1247 :
1248 0 : freeSymbol(prg);
1249 :
1250 0 : MT_lock_unset(&c->lock);
1251 0 : return msg;
1252 : }
1253 :
1254 : static str
1255 0 : RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1256 : {
1257 0 : char **fcn_id = getArgReference_str(stk, pci, 0);
1258 0 : const char *conn = *getArgReference_str(stk, pci, 1);
1259 0 : const char *mod = *getArgReference_str(stk, pci, 2);
1260 0 : const char *fcn = *getArgReference_str(stk, pci, 3);
1261 0 : (void) mb;
1262 0 : return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
1263 : }
1264 :
1265 : /**
1266 : * exec executes the function with its given arguments on the remote
1267 : * host, returning the function's return value. exec is purposely kept
1268 : * very spartan. All arguments need to be handles to previously put()
1269 : * values. It calls the function with the given arguments at the remote
1270 : * site, and returns the handle which stores the return value of the
1271 : * remotely executed function. This return value can be retrieved using
1272 : * a get call. It handles multiple return arguments.
1273 : */
1274 : static str
1275 733 : RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1276 : {
1277 733 : str conn, mod, func, tmp;
1278 733 : int i;
1279 733 : size_t len, buflen;
1280 733 : connection c = NULL;
1281 733 : char *qbuf;
1282 733 : MapiHdl mhdl;
1283 :
1284 733 : (void) cntxt;
1285 733 : (void) mb;
1286 733 : bool no_return_arguments = 0;
1287 :
1288 733 : columnar_result_callback *rcb = NULL;
1289 733 : ValRecord *v = &(stk)->stk[(pci)->argv[4]];
1290 733 : if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr)) {
1291 1 : rcb = (columnar_result_callback *) v->val.pval;
1292 : }
1293 :
1294 2686 : for (i = 0; i < pci->retc; i++) {
1295 1953 : if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
1296 1952 : tmp = *getArgReference_str(stk, pci, i);
1297 1952 : if (tmp == NULL || strcmp(tmp, (str) str_nil) == 0)
1298 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
1299 : ": return value %d is NULL or nil", i);
1300 : } else
1301 : no_return_arguments = 1;
1302 : }
1303 :
1304 733 : conn = *getArgReference_str(stk, pci, i++);
1305 733 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
1306 0 : throw(ILLARG, "remote.exec",
1307 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1308 733 : mod = *getArgReference_str(stk, pci, i++);
1309 733 : if (mod == NULL || strcmp(mod, (str) str_nil) == 0)
1310 0 : throw(ILLARG, "remote.exec",
1311 : ILLEGAL_ARGUMENT ": module name is NULL or nil");
1312 733 : func = *getArgReference_str(stk, pci, i++);
1313 733 : if (func == NULL || strcmp(func, (str) str_nil) == 0)
1314 0 : throw(ILLARG, "remote.exec",
1315 : ILLEGAL_ARGUMENT ": function name is NULL or nil");
1316 :
1317 : /* lookup conn */
1318 733 : rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
1319 :
1320 : /* this call should be a single transaction over the channel */
1321 733 : MT_lock_set(&c->lock);
1322 :
1323 733 : if (!no_return_arguments && pci->argc - pci->retc < 3) { /* conn, mod, func, ... */
1324 0 : MT_lock_unset(&c->lock);
1325 0 : throw(MAL, "remote.exec",
1326 : ILLEGAL_ARGUMENT " MAL instruction misses arguments");
1327 : }
1328 :
1329 733 : len = 0;
1330 : /* count how big a buffer we need */
1331 733 : len += 2 * (pci->retc > 1);
1332 733 : if (!no_return_arguments)
1333 2685 : for (i = 0; i < pci->retc; i++) {
1334 1953 : len += 2 * (i > 0);
1335 1953 : len += strlen(*getArgReference_str(stk, pci, i));
1336 : }
1337 :
1338 733 : const int arg_index = rcb ? 4 : 3;
1339 :
1340 733 : len += strlen(mod) + strlen(func) + 6;
1341 2050 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1342 1317 : len += 2 * (i > arg_index);
1343 1317 : len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
1344 : }
1345 733 : len += 2;
1346 733 : buflen = len + 1;
1347 733 : if ((qbuf = GDKmalloc(buflen)) == NULL) {
1348 0 : MT_lock_unset(&c->lock);
1349 0 : throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1350 : }
1351 :
1352 733 : len = 0;
1353 :
1354 733 : if (pci->retc > 1)
1355 70 : qbuf[len++] = '(';
1356 733 : if (!no_return_arguments)
1357 2685 : for (i = 0; i < pci->retc; i++)
1358 1953 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1359 1953 : (i > 0 ? ", " : ""), *getArgReference_str(stk, pci,
1360 : i));
1361 :
1362 733 : if (pci->retc > 1)
1363 70 : qbuf[len++] = ')';
1364 :
1365 : /* build the function invocation string in qbuf */
1366 733 : if (!no_return_arguments && pci->retc > 0) {
1367 732 : len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
1368 : } else {
1369 1 : len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
1370 : }
1371 :
1372 : /* handle the arguments to the function */
1373 :
1374 : /* put the arguments one by one, and dynamically build the
1375 : * invocation string */
1376 2049 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1377 1316 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1378 : (i > arg_index ? ", " : ""),
1379 1316 : *(getArgReference_str(stk, pci, pci->retc + i)));
1380 : }
1381 :
1382 : /* finish end execute the invocation string */
1383 733 : len += snprintf(&qbuf[len], buflen - len, ");");
1384 733 : TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
1385 733 : tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
1386 732 : GDKfree(qbuf);
1387 :
1388 : /* Temporary hack:
1389 : * use a callback to immediately handle columnar results before hdl is destroyed. */
1390 733 : if (tmp == MAL_SUCCEED && rcb && mhdl
1391 1 : && (mapi_get_querytype(mhdl) == Q_TABLE
1392 0 : || mapi_get_querytype(mhdl) == Q_PREPARE)) {
1393 1 : int fields = mapi_get_field_count(mhdl);
1394 1 : columnar_result *results = GDKzalloc(sizeof(columnar_result) * fields);
1395 :
1396 1 : if (!results) {
1397 0 : tmp = createException(MAL, "remote.exec",
1398 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1399 : } else {
1400 1 : int i = 0;
1401 1 : char buf[256] = { 0 };
1402 1 : stream *sin = mapi_get_from(c->mconn);
1403 :
1404 4 : for (; i < fields; i++) {
1405 2 : BAT *b = NULL;
1406 :
1407 2 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
1408 2 : (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1, c->int128)) != MAL_SUCCEED) {
1409 : break;
1410 : }
1411 :
1412 2 : results[i].id = b->batCacheid;
1413 2 : BBPkeepref(b);
1414 2 : results[i].colname = mapi_get_name(mhdl, i);
1415 2 : results[i].tpename = mapi_get_type(mhdl, i);
1416 2 : results[i].digits = mapi_get_digits(mhdl, i);
1417 2 : results[i].scale = mapi_get_scale(mhdl, i);
1418 : }
1419 :
1420 1 : if (tmp != MAL_SUCCEED) {
1421 0 : for (int j = 0; j < i; j++)
1422 0 : BBPrelease(results[j].id);
1423 : } else {
1424 1 : assert(rcb->context);
1425 1 : tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results,
1426 : fields);
1427 3 : for (int j = 0; j < i; j++)
1428 2 : BBPrelease(results[j].id);
1429 : }
1430 1 : GDKfree(results);
1431 : }
1432 : }
1433 :
1434 733 : if (rcb) {
1435 1 : GDKfree(rcb->context);
1436 1 : GDKfree(rcb);
1437 : }
1438 :
1439 733 : if (mhdl)
1440 727 : mapi_close_handle(mhdl);
1441 732 : MT_lock_unset(&c->lock);
1442 733 : return tmp;
1443 : }
1444 :
1445 : /**
1446 : * batload is a helper function to make transferring a BAT with RMTput
1447 : * more efficient. It works by creating a BAT, and loading it with the
1448 : * data as comma separated values from the input stream, until an empty
1449 : * line is read. The given size argument is taken as a hint only, and
1450 : * is not enforced to match the number of rows read.
1451 : */
1452 : static str
1453 1402 : RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1454 : {
1455 1402 : ValPtr v;
1456 1402 : int t;
1457 1402 : int size;
1458 1402 : ptr r;
1459 1402 : size_t s;
1460 1402 : BAT *b;
1461 1402 : size_t len;
1462 1402 : char *var;
1463 1402 : str msg = MAL_SUCCEED;
1464 1402 : bstream *fdin = cntxt->fdin;
1465 :
1466 1402 : v = &stk->stk[pci->argv[0]]; /* return */
1467 1402 : t = getArgType(mb, pci, 1); /* tail type */
1468 1402 : size = *getArgReference_int(stk, pci, 2); /* size */
1469 :
1470 1402 : b = COLnew(0, t, size, TRANSIENT);
1471 1402 : if (b == NULL)
1472 0 : throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1473 :
1474 : /* grab the input stream and start reading */
1475 1402 : fdin->eof = false;
1476 1402 : len = fdin->pos;
1477 1402 : while (len < fdin->len || bstream_next(fdin) > 0) {
1478 : /* newline hunting (how spartan) */
1479 1402 : for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n';
1480 0 : len++) ;
1481 : /* unterminated line, request more */
1482 1402 : if (fdin->buf[len] != '\n')
1483 0 : continue;
1484 : /* empty line, end of input */
1485 1402 : if (fdin->pos == len) {
1486 1402 : if (isa_block_stream(fdin->s)) {
1487 1402 : ssize_t n = bstream_next(fdin);
1488 1402 : if (n)
1489 0 : msg = createException(MAL, "remote.load",
1490 : SQLSTATE(HY013)
1491 : "Unexpected return from remote");
1492 : }
1493 : break;
1494 : }
1495 0 : fdin->buf[len] = '\0'; /* kill \n */
1496 0 : var = &fdin->buf[fdin->pos];
1497 : /* skip over this line */
1498 0 : fdin->pos = ++len;
1499 :
1500 0 : s = 0;
1501 0 : r = NULL;
1502 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
1503 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
1504 0 : BBPreclaim(b);
1505 0 : GDKfree(r);
1506 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
1507 : }
1508 0 : GDKfree(r);
1509 : }
1510 :
1511 1402 : *v = (ValRecord) {
1512 1402 : .val.bval = b->batCacheid,
1513 : .bat = true,
1514 1402 : .vtype = b->ttype,
1515 : };
1516 1402 : BBPkeepref(b);
1517 :
1518 1402 : return msg;
1519 : }
1520 :
1521 : /**
1522 : * dump given BAT to stream
1523 : */
1524 : static str
1525 1400 : RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1526 : {
1527 1400 : bat bid = *getArgReference_bat(stk, pci, 1);
1528 1400 : BAT *b = BBPquickdesc(bid), *v = b;
1529 1400 : char sendtheap = 0, sendtvheap = 0;
1530 :
1531 1400 : (void) mb;
1532 1400 : (void) stk;
1533 1400 : (void) pci;
1534 :
1535 1400 : if (b == NULL)
1536 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
1537 :
1538 1400 : if (BBPfix(bid) <= 0)
1539 0 : throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
1540 :
1541 1400 : sendtheap = b->ttype != TYPE_void;
1542 1400 : sendtvheap = sendtheap && b->tvheap;
1543 90 : if (sendtvheap && VIEWvtparent(b)
1544 41 : && BATcount(b) < BATcount(BBP_desc(VIEWvtparent(b)))) {
1545 17 : if ((b = BATdescriptor(bid)) == NULL) {
1546 0 : BBPunfix(bid);
1547 0 : throw(MAL, "remote.bincopyto", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1548 : }
1549 17 : v = COLcopy(b, b->ttype, true, TRANSIENT);
1550 17 : BBPunfix(b->batCacheid);
1551 17 : if (v == NULL) {
1552 0 : BBPunfix(bid);
1553 0 : throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
1554 : }
1555 : }
1556 :
1557 1400 : BATiter vi = bat_iterator(v);
1558 1490 : mnstr_printf(cntxt->fdout, /*JSON*/ "{"
1559 : "\"version\":1,"
1560 : "\"ttype\":%d,"
1561 : "\"hseqbase\":" OIDFMT ","
1562 : "\"tseqbase\":" OIDFMT ","
1563 : "\"tsorted\":%d,"
1564 : "\"trevsorted\":%d,"
1565 : "\"tkey\":%d,"
1566 : "\"tnonil\":%d,"
1567 : "\"tdense\":%d,"
1568 : "\"size\":" BUNFMT ","
1569 : "\"tailsize\":%zu,"
1570 : "\"theapsize\":%zu"
1571 : "}\n",
1572 1400 : vi.type,
1573 : v->hseqbase, v->tseqbase,
1574 1400 : vi.sorted, vi.revsorted,
1575 1400 : vi.key,
1576 1400 : vi.nonil,
1577 1400 : BATtdensebi(&vi),
1578 : vi.count,
1579 1370 : sendtheap ? (size_t) vi.count << vi.shift : 0,
1580 90 : sendtvheap && vi.count > 0 ? vi.vhfree : 0);
1581 :
1582 1400 : if (sendtheap && vi.count > 0) {
1583 1283 : mnstr_write(cntxt->fdout, /* tail */
1584 1283 : vi.base, vi.count * vi.width, 1);
1585 1283 : if (sendtvheap)
1586 75 : mnstr_write(cntxt->fdout, /* theap */
1587 75 : vi.vh->base, vi.vhfree, 1);
1588 : }
1589 1400 : bat_iterator_end(&vi);
1590 : /* flush is done by the calling environment (MAL) */
1591 :
1592 1400 : if (b != v)
1593 17 : BBPreclaim(v);
1594 :
1595 1400 : BBPunfix(bid);
1596 :
1597 1400 : return (MAL_SUCCEED);
1598 : }
1599 :
1600 : /**
1601 : * read from the input stream and give the BAT handle back to the caller
1602 : */
1603 : static str
1604 0 : RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1605 : {
1606 0 : BAT *b = NULL;
1607 0 : ValPtr v;
1608 0 : str err;
1609 :
1610 0 : (void) mb;
1611 :
1612 : /* We receive a normal line, which contains the JSON header, the
1613 : * rest is binary data directly on the stream. We get the first
1614 : * line from the buffered stream we have here, and pass it on
1615 : * together with the raw stream we have. */
1616 0 : cntxt->fdin->eof = false; /* in case it was before */
1617 0 : if (bstream_next(cntxt->fdin) <= 0)
1618 0 : throw(MAL, "remote.bincopyfrom", "expected JSON header");
1619 :
1620 0 : cntxt->fdin->buf[cntxt->fdin->len] = '\0';
1621 0 : err = RMTinternalcopyfrom(&b,
1622 0 : &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true, int128 /* library should be compatible */);
1623 : /* skip the JSON line */
1624 0 : cntxt->fdin->pos = ++cntxt->fdin->len;
1625 0 : if (err !=MAL_SUCCEED)
1626 : return (err);
1627 :
1628 0 : v = &stk->stk[pci->argv[0]];
1629 0 : *v = (ValRecord) {
1630 0 : .val.bval = b->batCacheid,
1631 : .bat = true,
1632 0 : .vtype = b->ttype,
1633 : };
1634 0 : BBPkeepref(b);
1635 :
1636 0 : return (MAL_SUCCEED);
1637 : }
1638 :
1639 : /**
1640 : * bintype identifies the system on its binary profile. This is mainly
1641 : * used to determine if BATs can be sent binary across.
1642 : */
1643 : static str
1644 186 : RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1645 : {
1646 186 : (void)mb;
1647 186 : (void)stk;
1648 186 : (void)pci;
1649 :
1650 : /* TODO bintype should include the (bin) protocol version */
1651 186 : mnstr_printf(cntxt->fdout, "[ %d ]\n", localtype);
1652 186 : return(MAL_SUCCEED);
1653 : }
1654 :
1655 : /**
1656 : * Returns whether the underlying connection is still connected or not.
1657 : * Best effort implementation on top of mapi using a ping.
1658 : */
1659 : static str
1660 0 : RMTisalive(int *ret, const char *const *conn)
1661 : {
1662 0 : str tmp;
1663 0 : connection c;
1664 :
1665 0 : if (*conn == NULL || strcmp(*conn, (str) str_nil) == 0)
1666 0 : throw(ILLARG, "remote.get",
1667 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1668 :
1669 : /* lookup conn, set c if valid */
1670 0 : rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
1671 :
1672 0 : *ret = 0;
1673 0 : if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
1674 0 : *ret = 1;
1675 :
1676 : return MAL_SUCCEED;
1677 : }
1678 :
1679 : // This is basically a no op
1680 : static str
1681 362 : RMTregisterSupervisor(int *ret, const char *const *sup_uuid, const char *const *query_uuid)
1682 : {
1683 362 : (void) sup_uuid;
1684 362 : (void) query_uuid;
1685 :
1686 362 : *ret = 0;
1687 362 : return MAL_SUCCEED;
1688 : }
1689 :
1690 : #include "mel.h"
1691 : mel_func remote_init_funcs[] = {
1692 : command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
1693 : command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
1694 : pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
1695 : command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
1696 : command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
1697 : pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
1698 : pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
1699 : pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
1700 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
1701 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
1702 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
1703 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
1704 : command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
1705 : pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
1706 : pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
1707 : pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
1708 : pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
1709 : command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
1710 : { .imp=NULL }
1711 : };
1712 : #include "mal_import.h"
1713 : #ifdef _MSC_VER
1714 : #undef read
1715 : #pragma section(".CRT$XCU",read)
1716 : #endif
1717 345 : LIB_STARTUP_FUNC(init_remote_mal)
1718 345 : { mal_module2("remote", NULL, remote_init_funcs, RMTprelude, NULL); }
|