Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Fabian Groffen, Martin Kersten
15 : * Remote querying functionality
16 : * Communication with other mservers at the MAL level is a delicate task.
17 : * However, it is indispensable for any distributed functionality. This
18 : * module provides an abstract way to store and retrieve objects on a
19 : * remote site. Additionally, functions on a remote site can be executed
20 : * using objects available in the remote session context. This yields
21 : * four primitive functions that form the basis for distribution methods:
22 : * get, put, register and exec.
23 : *
24 : * The get method simply retrieves a copy of a remote object. Objects can
25 : * be simple values, strings or Column. The same holds for the put method,
26 : * but the other way around. A local object can be stored on a remote
27 : * site. Upon a successful store, the put method returns the remote
28 : * identifier for the stored object. With this identifier the object can
29 : * be addressed, e.g. using the get method to retrieve the object that was
30 : * stored using put.
31 : *
32 : * The get and put methods are symmetric. Performing a get on an
33 : * identifier that was returned by put, results in an object with the same
34 : * value and type as the one that was put. The result of such an operation is
35 : * equivalent to making an (expensive) copy of the original object.
36 : *
37 : * The register function takes a local MAL function and makes it known at a
38 : * remote site. It ensures that it does not overload an already known
39 : * operation remotely, which could create a semantic conflict.
40 : * Deregistering a function is forbidden, because it would allow for taking
41 : * over the remote site completely.
42 : * C-implemented functions, such as io.print() cannot be remotely stored.
43 : * It would require even more complicated (byte) code shipping and remote
44 : * compilation to make it work.
45 : *
46 : * The choice to let exec only execute functions avoids problems
47 : * to decide what should be returned to the caller. With a function it is
48 : * clear and simple to return that what the function signature prescribes.
49 : * Any side effects (e.g. io.print calls) may cause havoc in the system,
50 : * but are currently ignored.
51 : *
52 : * This leads to the final contract of this module. The methods should be
53 : * used correctly, by obeying their contract. Failing to do so will result
54 : * in errors and possibly undefined behaviour.
55 : *
56 : * The resolve() function can be used to query Merovingian. It returns one
57 : * or more databases discovered in its vicinity matching the given pattern.
58 : *
59 : */
60 : #include "monetdb_config.h"
61 : #include "remote.h"
62 :
63 : /*
64 : * Technically, these methods need to be serialised per connection,
65 : * hence a scheduler that interleaves e.g. multiple get calls, simply
66 : * violates this constraint. If parallelism to the same site is
67 : * desired, a user could create a second connection. This is not always
68 : * easy to generate at the proper place, e.g. overloading the dataflow
69 : * optimizer to patch connections structures is not acceptable.
70 : *
71 : * Instead, we maintain a simple lock with each connection, which can be
72 : * used to issue a safe, but blocking get/put/exec/register request.
73 : */
74 : #include "mal_exception.h"
75 : #include "mal_interpreter.h"
76 : #include "mal_function.h" /* for printFunction */
77 : #include "mal_listing.h"
78 : #include "mal_instruction.h" /* for getmodule/func macros */
79 : #include "mal_authorize.h"
80 : #include "mapi.h"
81 : #include "mutils.h"
82 :
/*
 * Bit flags that are OR-ed together into the one-byte binary profile of
 * a server (see RMTprelude): byte order, word size, oid size and
 * availability of the hge type.  Bit 0 is not used by these flags.
 */
#define RMTT_L_ENDIAN	(0 << 1)	/* little endian byte order */
#define RMTT_B_ENDIAN	(1 << 1)	/* big endian byte order */
#define RMTT_32_BITS	(0 << 2)	/* size_t is not as wide as lng */
#define RMTT_64_BITS	(1 << 2)	/* size_t is as wide as lng */
#define RMTT_32_OIDS	(0 << 3)	/* oid is not as wide as lng */
#define RMTT_64_OIDS	(1 << 3)	/* oid is as wide as lng */
#define RMTT_HGE		(1 << 4)	/* hge type is available */
90 :
91 : typedef struct _connection {
92 : MT_Lock lock; /* lock to avoid interference */
93 : str name; /* the handle for this connection */
94 : Mapi mconn; /* the Mapi handle for the connection */
95 : unsigned char type; /* binary profile of the connection target */
96 : bool int128; /* has int128 support */
97 : size_t nextid; /* id counter */
98 : struct _connection *next; /* the next connection in the list */
99 : } *connection;
100 :
101 : #ifndef WIN32
102 : #include <sys/socket.h> /* socket */
103 : #include <sys/un.h> /* sockaddr_un */
104 : #endif
105 : #include <unistd.h> /* gethostname */
106 :
107 : static MT_Lock mal_remoteLock = MT_LOCK_INITIALIZER(mal_remoteLock);
108 :
109 : static connection conns = NULL;
110 : static unsigned char localtype = 0177;
111 : static bool int128 = false;
112 :
113 : static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn,
114 : const char *query);
115 :
116 : /**
117 : * Returns a BAT with valid redirects for the given pattern. If
118 : * merovingian is not running, this function throws an error.
119 : */
120 : static str
121 0 : RMTresolve(bat *ret, str *pat)
122 : {
123 : #ifdef NATIVE_WIN32
124 : (void) ret;
125 : (void) pat;
126 : throw(MAL, "remote.resolve", "merovingian is not available on " "your platform, sorry"); /* please upgrade to Linux, etc. */
127 : #else
128 0 : BAT *list;
129 0 : const char *mero_uri;
130 0 : char *p;
131 0 : unsigned int port;
132 0 : char **redirs;
133 0 : char **or;
134 :
135 0 : if (pat == NULL || *pat == NULL || strcmp(*pat, (str) str_nil) == 0)
136 0 : throw(ILLARG, "remote.resolve",
137 : ILLEGAL_ARGUMENT ": pattern is NULL or nil");
138 :
139 0 : mero_uri = GDKgetenv("merovingian_uri");
140 0 : if (mero_uri == NULL)
141 0 : throw(MAL, "remote.resolve", "this function needs the mserver "
142 : "have been started by merovingian");
143 :
144 0 : list = COLnew(0, TYPE_str, 0, TRANSIENT);
145 0 : if (list == NULL)
146 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
147 :
148 : /* extract port from mero_uri, let mapi figure out the rest */
149 0 : mero_uri += strlen("mapi:monetdb://");
150 0 : if (*mero_uri == '[') {
151 0 : if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
152 0 : BBPreclaim(list);
153 0 : throw(MAL, "remote.resolve",
154 : "illegal IPv6 address on merovingian_uri: %s",
155 : GDKgetenv("merovingian_uri"));
156 : }
157 : }
158 0 : if ((p = strchr(mero_uri, ':')) == NULL) {
159 0 : BBPreclaim(list);
160 0 : throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
161 : GDKgetenv("merovingian_uri"));
162 : }
163 0 : port = (unsigned int) atoi(p + 1);
164 :
165 0 : or = redirs = mapi_resolve(NULL, port, *pat);
166 :
167 0 : if (redirs == NULL) {
168 0 : BBPreclaim(list);
169 0 : throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
170 : }
171 :
172 0 : while (*redirs != NULL) {
173 0 : if (BUNappend(list, (ptr) *redirs, false) != GDK_SUCCEED) {
174 0 : BBPreclaim(list);
175 0 : do
176 0 : free(*redirs);
177 0 : while (*++redirs);
178 0 : free(or);
179 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
180 : }
181 0 : free(*redirs);
182 0 : redirs++;
183 : }
184 0 : free(or);
185 :
186 0 : *ret = list->batCacheid;
187 0 : BBPkeepref(list);
188 0 : return (MAL_SUCCEED);
189 : #endif
190 : }
191 :
192 :
193 : /* for unique connection identifiers */
194 : static size_t connection_id = 0;
195 :
196 : /**
197 : * Returns a connection to the given uri. It always returns a newly
198 : * created connection.
199 : */
200 : static str
201 183 : RMTconnectScen(str *ret,
202 : str *ouri, str *user, str *passwd, str *scen, bit *columnar)
203 : {
204 183 : connection c;
205 183 : char conn[BUFSIZ];
206 183 : char *s;
207 183 : Mapi m;
208 183 : MapiHdl hdl;
209 183 : str msg;
210 :
211 : /* just make sure the return isn't garbage */
212 183 : *ret = 0;
213 :
214 183 : if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str) str_nil) == 0)
215 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
216 : "is NULL or nil");
217 183 : if (user == NULL || *user == NULL || strcmp(*user, (str) str_nil) == 0)
218 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
219 : "NULL or nil");
220 183 : if (passwd == NULL || *passwd == NULL
221 183 : || strcmp(*passwd, (str) str_nil) == 0)
222 0 : throw(ILLARG, "remote.connect",
223 : ILLEGAL_ARGUMENT ": password is " "NULL or nil");
224 183 : if (scen == NULL || *scen == NULL || strcmp(*scen, (str) str_nil) == 0)
225 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
226 : "NULL or nil");
227 183 : if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
228 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
229 : "is not supported", *scen);
230 :
231 183 : m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
232 184 : if (mapi_error(m)) {
233 0 : msg = createException(MAL, "remote.connect",
234 : "unable to connect to '%s': %s",
235 : *ouri, mapi_error_str(m));
236 0 : mapi_destroy(m);
237 0 : return msg;
238 : }
239 :
240 184 : MT_lock_set(&mal_remoteLock);
241 :
242 : /* generate an unique connection name, they are only known
243 : * within one mserver, id is primary key, the rest is super key */
244 186 : snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user,
245 : connection_id++);
246 : /* make sure we can construct MAL identifiers using conn */
247 5190 : for (s = conn; *s != '\0'; s++) {
248 5004 : if (!isalnum((unsigned char) *s)) {
249 717 : *s = '_';
250 : }
251 : }
252 :
253 186 : if (mapi_reconnect(m) != MOK) {
254 5 : MT_lock_unset(&mal_remoteLock);
255 5 : msg = createException(IO, "remote.connect",
256 : "unable to connect to '%s': %s",
257 : *ouri, mapi_error_str(m));
258 5 : mapi_destroy(m);
259 5 : return msg;
260 : }
261 :
262 181 : if (columnar && *columnar) {
263 1 : char set_protocol_query_buf[50];
264 1 : snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);",
265 : PROTOCOL_COLUMNAR);
266 1 : if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
267 0 : mapi_destroy(m);
268 0 : MT_lock_unset(&mal_remoteLock);
269 0 : return msg;
270 : }
271 : }
272 :
273 : /* connection established, add to list */
274 181 : c = GDKzalloc(sizeof(struct _connection));
275 181 : if (c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
276 0 : GDKfree(c);
277 0 : mapi_destroy(m);
278 0 : MT_lock_unset(&mal_remoteLock);
279 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
280 : }
281 181 : c->mconn = m;
282 181 : c->nextid = 0;
283 181 : MT_lock_init(&c->lock, c->name);
284 181 : c->next = conns;
285 181 : conns = c;
286 :
287 181 : msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
288 181 : if (msg) {
289 0 : MT_lock_unset(&mal_remoteLock);
290 0 : return msg;
291 : }
292 362 : if (hdl != NULL && mapi_fetch_row(hdl)) {
293 181 : char *val = mapi_fetch_field(hdl, 0);
294 181 : c->type = (unsigned char) atoi(val);
295 181 : mapi_close_handle(hdl);
296 : } else {
297 0 : c->type = 0;
298 : }
299 :
300 : #ifdef _DEBUG_MAPI_
301 : mapi_trace(c->mconn, true);
302 : #endif
303 181 : if (c->type != localtype && (c->type | RMTT_HGE) == localtype) {
304 : /* we support hge, and for remote, we don't know */
305 0 : msg = RMTquery(&hdl, "remote.connect", m, "x := 0:hge;");
306 0 : if (msg) {
307 0 : c->int128 = false;
308 : } else {
309 0 : mapi_close_handle(hdl);
310 0 : c->int128 = true;
311 0 : c->type |= RMTT_HGE;
312 : }
313 181 : } else if (c->type == localtype) {
314 181 : c->int128 = int128;
315 : }
316 181 : MT_lock_unset(&mal_remoteLock);
317 :
318 181 : *ret = GDKstrdup(conn);
319 181 : if (*ret == NULL)
320 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
321 : return (MAL_SUCCEED);
322 : }
323 :
324 : static str
325 179 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
326 : {
327 179 : (void) cntxt;
328 179 : (void) mb;
329 179 : str *ret = getArgReference_str(stk, pci, 0);
330 179 : str *uri = getArgReference_str(stk, pci, 1);
331 179 : str *user = getArgReference_str(stk, pci, 2);
332 179 : str *passwd = getArgReference_str(stk, pci, 3);
333 :
334 179 : str scen = "msql";
335 :
336 179 : if (pci->argc >= 5)
337 179 : scen = *getArgReference_str(stk, pci, 4);
338 :
339 179 : return RMTconnectScen(ret, uri, user, passwd, &scen, NULL);
340 : }
341 :
342 : /**
343 : * Disconnects a connection. The connection needs not to exist in the
344 : * system, it only needs to exist for the client (i.e. it was once
345 : * created).
346 : */
347 : str
348 180 : RMTdisconnect(void *ret, str *conn)
349 : {
350 180 : connection c, t;
351 :
352 180 : if (conn == NULL || *conn == NULL || strcmp(*conn, (str) str_nil) == 0)
353 0 : throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
354 : "is NULL or nil");
355 :
356 :
357 181 : (void) ret;
358 :
359 : /* we need a lock because the same user can be handled by multiple
360 : * threads */
361 181 : MT_lock_set(&mal_remoteLock);
362 181 : c = conns;
363 181 : t = NULL; /* parent */
364 : /* walk through the list */
365 249 : while (c != NULL) {
366 249 : if (strcmp(c->name, *conn) == 0) {
367 : /* ok, delete it... */
368 181 : if (t == NULL) {
369 151 : conns = c->next;
370 : } else {
371 30 : t->next = c->next;
372 : }
373 :
374 181 : MT_lock_set(&c->lock); /* shared connection */
375 181 : mapi_disconnect(c->mconn);
376 181 : mapi_destroy(c->mconn);
377 181 : MT_lock_unset(&c->lock);
378 181 : MT_lock_destroy(&c->lock);
379 181 : GDKfree(c->name);
380 181 : GDKfree(c);
381 181 : MT_lock_unset(&mal_remoteLock);
382 181 : return MAL_SUCCEED;
383 : }
384 68 : t = c;
385 68 : c = c->next;
386 : }
387 :
388 0 : MT_lock_unset(&mal_remoteLock);
389 0 : throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
390 : }
391 :
392 : /**
393 : * Helper function to return a connection matching a given string, or an
394 : * error if it does not exist. Since this function is internal, it
395 : * doesn't check the argument conn, as it should have been checked
396 : * already.
397 : * NOTE: this function acquires the mal_remoteLock before accessing conns
398 : */
399 : static inline str
400 5325 : RMTfindconn(connection *ret, const char *conn)
401 : {
402 5325 : connection c;
403 :
404 : /* just make sure the return isn't garbage */
405 5325 : *ret = NULL;
406 5325 : MT_lock_set(&mal_remoteLock); /* protect c */
407 5326 : c = conns;
408 7258 : while (c != NULL) {
409 7258 : if (strcmp(c->name, conn) == 0) {
410 5326 : *ret = c;
411 5326 : MT_lock_unset(&mal_remoteLock);
412 5326 : return (MAL_SUCCEED);
413 : }
414 1932 : c = c->next;
415 : }
416 0 : MT_lock_unset(&mal_remoteLock);
417 0 : throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
418 : }
419 :
420 : /**
421 : * Little helper function that returns a GDKmalloced string containing a
422 : * valid identifier that is supposed to be unique in the connection's
423 : * remote context. The generated string depends on the module and
424 : * function the caller is in. But also the runtime context is important.
425 : * The format is rmt<id>_<retvar>_<type>. Every RMTgetId uses a fresh id,
426 : * to distinguish amongst different (parallel) execution context.
427 : * Re-use of this remote identifier should be done with care.
428 : * The encoding of the type allows for ease of type checking later on.
429 : */
430 : static inline str
431 3217 : RMTgetId(char *buf, size_t buflen, MalBlkPtr mb, InstrPtr p, int arg)
432 : {
433 3217 : InstrPtr f;
434 3217 : const char *mod;
435 3217 : char *var;
436 3217 : str rt;
437 3217 : char name[IDLENGTH] = { 0 };
438 3217 : static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
439 :
440 3217 : if (p->retc == 0)
441 0 : throw(MAL, "remote.getId",
442 : ILLEGAL_ARGUMENT "MAL instruction misses retc");
443 :
444 3217 : var = getArgNameIntoBuffer(mb, p, arg, name);
445 3216 : f = getInstrPtr(mb, 0); /* top level function */
446 3216 : mod = getModuleId(f);
447 3216 : if (mod == NULL)
448 3216 : mod = "user";
449 3216 : rt = getTypeIdentifier(getArgType(mb, p, arg));
450 3217 : if (rt == NULL)
451 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
452 :
453 3217 : snprintf(buf, buflen, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var,
454 : rt);
455 :
456 3217 : GDKfree(rt);
457 3217 : return (MAL_SUCCEED);
458 : }
459 :
460 : /**
461 : * Helper function to execute a query over the given connection,
462 : * returning the result handle. If communication fails in one way or
463 : * another, an error is returned. Since this function is internal, it
464 : * doesn't check the input arguments func, conn and query, as they
465 : * should have been checked already.
466 : * NOTE: this function assumes a lock for conn is set
467 : */
468 : static inline str
469 4111 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query)
470 : {
471 4111 : MapiHdl mhdl;
472 :
473 4111 : *ret = NULL;
474 4111 : mhdl = mapi_query(conn, query);
475 4110 : if (mhdl) {
476 4110 : if (mapi_result_error(mhdl) != NULL) {
477 5 : str err = createException(getExceptionType(mapi_result_error(mhdl)),
478 : func,
479 : "(mapi:monetdb://%s@%s/%s) %s",
480 : mapi_get_user(conn),
481 : mapi_get_host(conn),
482 : mapi_get_dbname(conn),
483 : getExceptionMessage(mapi_result_error(mhdl)));
484 5 : mapi_close_handle(mhdl);
485 5 : return (err);
486 : }
487 : } else {
488 0 : if (mapi_error(conn) != MOK) {
489 0 : throw(IO, func, "an error occurred on connection: %s",
490 : mapi_error_str(conn));
491 : } else {
492 0 : throw(MAL, func,
493 : "remote function invocation didn't return a result");
494 : }
495 : }
496 :
497 4104 : *ret = mhdl;
498 4104 : return (MAL_SUCCEED);
499 : }
500 :
501 : static str
502 341 : RMTprelude(void)
503 : {
504 341 : unsigned int type = 0;
505 :
506 : #ifdef WORDS_BIGENDIAN
507 : type |= RMTT_B_ENDIAN;
508 : #else
509 341 : type |= RMTT_L_ENDIAN;
510 : #endif
511 : #if SIZEOF_SIZE_T == SIZEOF_LNG
512 341 : type |= RMTT_64_BITS;
513 : #else
514 : type |= RMTT_32_BITS;
515 : #endif
516 : #if SIZEOF_OID == SIZEOF_LNG
517 341 : type |= RMTT_64_OIDS;
518 : #else
519 : type |= RMTT_32_OIDS;
520 : #endif
521 : #ifdef HAVE_HGE
522 341 : type |= RMTT_HGE;
523 341 : int128 = true;
524 : #endif
525 341 : localtype = (unsigned char) type;
526 :
527 341 : return (MAL_SUCCEED);
528 : }
529 :
530 : static str
531 339 : RMTepilogue(void *ret)
532 : {
533 339 : connection c, t;
534 :
535 339 : (void) ret;
536 :
537 339 : MT_lock_set(&mal_remoteLock); /* nobody allowed here */
538 : /* free connections list */
539 339 : c = conns;
540 339 : while (c != NULL) {
541 0 : t = c;
542 0 : c = c->next;
543 0 : MT_lock_set(&t->lock);
544 0 : mapi_destroy(t->mconn);
545 0 : MT_lock_unset(&t->lock);
546 0 : MT_lock_destroy(&t->lock);
547 0 : GDKfree(t->name);
548 0 : GDKfree(t);
549 : }
550 : /* not sure, but better be safe than sorry */
551 339 : conns = NULL;
552 339 : MT_lock_unset(&mal_remoteLock);
553 :
554 339 : return (MAL_SUCCEED);
555 : }
556 :
557 : static str
558 1393 : RMTreadbatheader(stream *sin, char *buf)
559 : {
560 1393 : ssize_t sz = 0, rd;
561 :
562 : /* read the JSON header */
563 224683 : while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
564 223290 : sz += rd;
565 : }
566 1394 : if (rd < 0) {
567 0 : throw(MAL, "remote.get", "could not read BAT JSON header");
568 : }
569 1394 : if (buf[0] == '!') {
570 0 : char *result;
571 0 : if ((result = GDKstrdup(buf)) == NULL)
572 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
573 : return result;
574 : }
575 :
576 1394 : buf[sz] = '\0';
577 :
578 1394 : return MAL_SUCCEED;
579 : }
580 :
581 : typedef struct _binbat_v1 {
582 : int Ttype;
583 : oid Hseqbase;
584 : oid Tseqbase;
585 : bool
586 : Tsorted:1, Trevsorted:1, Tkey:1, Tnonil:1, Tdense:1;
587 : BUN size;
588 : size_t tailsize;
589 : size_t theapsize;
590 : } binbat;
591 :
592 : static str
593 1393 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, bool cint128)
594 : {
595 1393 : binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
596 1393 : char *nme = NULL;
597 1393 : char *val = NULL;
598 1393 : char tmp;
599 1393 : size_t len;
600 1393 : lng lv, *lvp;
601 :
602 1393 : BAT *b;
603 :
604 1393 : (void) cint128;
605 : /* hdr is a JSON structure that looks like
606 : * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
607 : * we take the binary data directly from the stream */
608 :
609 : /* could skip whitespace, but we just don't allow that */
610 1393 : if (*hdr++ != '{')
611 0 : throw(MAL, "remote.bincopyfrom",
612 : "illegal input, not a JSON header (got '%s')", hdr - 1);
613 225077 : while (*hdr != '\0') {
614 223682 : switch (*hdr) {
615 33477 : case '"':
616 : /* we assume only numeric values, so all strings are
617 : * elems */
618 33477 : if (nme != NULL) {
619 16743 : *hdr = '\0';
620 : } else {
621 16734 : nme = hdr + 1;
622 : }
623 : break;
624 16743 : case ':':
625 16743 : val = hdr + 1;
626 16743 : break;
627 16743 : case ',':
628 : case '}':
629 16743 : if (val == NULL)
630 0 : throw(MAL, "remote.bincopyfrom",
631 : "illegal input, JSON value missing");
632 16743 : *hdr = '\0';
633 :
634 16743 : lvp = &lv;
635 16743 : len = sizeof(lv);
636 : /* tseqbase can be 1<<31/1<<63 which causes overflow
637 : * in lngFromStr, so we check separately */
638 16743 : if (strcmp(val,
639 : #if SIZEOF_OID == 8
640 : "9223372036854775808"
641 : #else
642 : "2147483648"
643 : #endif
644 1363 : ) == 0 && strcmp(nme, "tseqbase") == 0) {
645 1368 : bb.Tseqbase = oid_nil;
646 : } else {
647 : /* all values should be non-negative, so we check that
648 : * here as well */
649 15375 : if (lngFromStr(val, &len, &lvp, true) < 0 ||
650 15377 : lv < 0 /* includes lng_nil */ )
651 0 : throw(MAL, "remote.bincopyfrom",
652 : "bad %s value: %s", nme, val);
653 :
654 : /* deal with nme and val */
655 15377 : if (strcmp(nme, "version") == 0) {
656 1393 : if (lv != 1)
657 0 : throw(MAL, "remote.bincopyfrom",
658 : "unsupported version: %s", val);
659 13984 : } else if (strcmp(nme, "hseqbase") == 0) {
660 : #if SIZEOF_OID < SIZEOF_LNG
661 : if (lv > GDK_oid_max)
662 : throw(MAL, "remote.bincopyfrom",
663 : "bad %s value: %s", nme, val);
664 : #endif
665 1396 : bb.Hseqbase = (oid) lv;
666 12588 : } else if (strcmp(nme, "ttype") == 0) {
667 1394 : if (lv >= GDKatomcnt)
668 0 : throw(MAL, "remote.bincopyfrom",
669 : "bad %s value: GDK atom number %s doesn't exist",
670 : nme, val);
671 1394 : bb.Ttype = (int) lv;
672 11194 : } else if (strcmp(nme, "tseqbase") == 0) {
673 : #if SIZEOF_OID < SIZEOF_LNG
674 : if (lv > GDK_oid_max)
675 : throw(MAL, "remote.bincopyfrom",
676 : "bad %s value: %s", nme, val);
677 : #endif
678 28 : bb.Tseqbase = (oid) lv;
679 11166 : } else if (strcmp(nme, "tsorted") == 0) {
680 1396 : bb.Tsorted = lv != 0;
681 9770 : } else if (strcmp(nme, "trevsorted") == 0) {
682 1396 : bb.Trevsorted = lv != 0;
683 8374 : } else if (strcmp(nme, "tkey") == 0) {
684 1394 : bb.Tkey = lv != 0;
685 6980 : } else if (strcmp(nme, "tnonil") == 0) {
686 1396 : bb.Tnonil = lv != 0;
687 5584 : } else if (strcmp(nme, "tdense") == 0) {
688 1396 : bb.Tdense = lv != 0;
689 4188 : } else if (strcmp(nme, "size") == 0) {
690 1396 : if (lv > (lng) BUN_MAX)
691 0 : throw(MAL, "remote.bincopyfrom",
692 : "bad %s value: %s", nme, val);
693 1396 : bb.size = (BUN) lv;
694 2792 : } else if (strcmp(nme, "tailsize") == 0) {
695 1396 : bb.tailsize = (size_t) lv;
696 1396 : } else if (strcmp(nme, "theapsize") == 0) {
697 1396 : bb.theapsize = (size_t) lv;
698 : } else {
699 0 : throw(MAL, "remote.bincopyfrom",
700 : "unknown element: %s", nme);
701 : }
702 : }
703 : nme = val = NULL;
704 : break;
705 : }
706 223684 : hdr++;
707 : }
708 : #ifdef HAVE_HGE
709 1395 : if (int128 && !cint128 && bb.Ttype >= TYPE_hge)
710 0 : bb.Ttype++;
711 : #else
712 : (void) cint128;
713 : #endif
714 :
715 2702 : b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT,
716 1307 : bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);
717 1395 : if (b == NULL)
718 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
719 :
720 1395 : if (bb.tailsize > 0) {
721 2568 : if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
722 1284 : mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
723 0 : goto bailout;
724 1284 : b->theap->dirty = true;
725 : }
726 1395 : if (bb.theapsize > 0) {
727 152 : if ((b->tvheap->base == NULL &&
728 76 : (*BATatoms[b->ttype].atomHeap) (b->tvheap,
729 : b->batCapacity) != GDK_SUCCEED)
730 76 : || HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED
731 76 : || mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
732 0 : goto bailout;
733 76 : b->tvheap->free = bb.theapsize;
734 76 : b->tvheap->dirty = true;
735 : }
736 :
737 : /* set properties */
738 1395 : b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
739 1395 : b->tsorted = bb.Tsorted;
740 1395 : b->trevsorted = bb.Trevsorted;
741 1395 : b->tkey = bb.Tkey;
742 1395 : b->tnonil = bb.Tnonil;
743 1395 : if (bb.Ttype == TYPE_str && bb.size)
744 76 : BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
745 1394 : BATsetcount(b, bb.size);
746 :
747 : // read blockmode flush
748 1395 : while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
749 0 : TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
750 : }
751 :
752 1395 : BATsettrivprop(b);
753 :
754 1395 : *ret = b;
755 1395 : return (MAL_SUCCEED);
756 :
757 : bailout:
758 0 : BBPreclaim(b);
759 0 : throw(MAL, "remote.bincopyfrom", "reading failed");
760 : }
761 :
762 : /**
763 : * get fetches the object referenced by ident over connection conn.
764 : * We are only interested in retrieving void-headed BATs, i.e. single columns.
765 : */
766 : static str
767 1394 : RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
768 : {
769 1394 : str conn, ident, tmp, rt;
770 1394 : connection c;
771 1394 : char qbuf[BUFSIZ + 1];
772 1394 : MapiHdl mhdl = NULL;
773 1394 : int rtype;
774 1394 : ValPtr v;
775 :
776 1394 : (void) mb;
777 1394 : (void) cntxt;
778 :
779 1394 : conn = *getArgReference_str(stk, pci, 1);
780 1394 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
781 0 : throw(ILLARG, "remote.get",
782 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
783 1394 : ident = *getArgReference_str(stk, pci, 2);
784 1394 : if (ident == 0 || isIdentifier(ident) < 0)
785 0 : throw(ILLARG, "remote.get",
786 : ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
787 :
788 : /* lookup conn, set c if valid */
789 1394 : rethrow("remote.get", tmp, RMTfindconn(&c, conn));
790 :
791 1394 : rtype = getArgType(mb, pci, 0);
792 1394 : v = &stk->stk[pci->argv[0]];
793 :
794 1394 : if (rtype == TYPE_any || isAnyExpression(rtype)) {
795 0 : char *tpe, *msg;
796 0 : tpe = getTypeName(rtype);
797 0 : msg = createException(MAL, "remote.get",
798 : ILLEGAL_ARGUMENT ": unsupported any type: %s",
799 : tpe);
800 0 : GDKfree(tpe);
801 0 : return msg;
802 : }
803 : /* check if the remote type complies with what we expect.
804 : Since the put() encodes the type as known to the remote site
805 : we can simple compare it here */
806 1394 : rt = getTypeIdentifier(rtype);
807 1392 : if (rt == NULL)
808 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
809 1392 : if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
810 0 : tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
811 : ": remote object type %s does not match expected type %s",
812 : rt, ident);
813 0 : GDKfree(rt);
814 0 : return tmp;
815 : }
816 1392 : GDKfree(rt);
817 :
818 1392 : if (isaBatType(rtype) && (localtype == 0177 || (localtype != c->type && localtype != (c->type | RMTT_HGE)))) {
819 0 : int t;
820 0 : size_t s;
821 0 : ptr r;
822 0 : str var;
823 0 : BAT *b;
824 :
825 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
826 :
827 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
828 :
829 : /* this call should be a single transaction over the channel */
830 0 : MT_lock_set(&c->lock);
831 :
832 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
833 : != MAL_SUCCEED) {
834 0 : TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
835 0 : MT_lock_unset(&c->lock);
836 0 : var = createException(MAL, "remote.get", "%s", tmp);
837 0 : freeException(tmp);
838 0 : return var;
839 : }
840 0 : t = getBatType(rtype);
841 0 : b = COLnew(0, t, 0, TRANSIENT);
842 0 : if (b == NULL) {
843 0 : mapi_close_handle(mhdl);
844 0 : MT_lock_unset(&c->lock);
845 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
846 : }
847 :
848 0 : if (ATOMbasetype(t) == TYPE_str) {
849 0 : while (mapi_fetch_row(mhdl)) {
850 0 : var = mapi_fetch_field(mhdl, 1);
851 0 : if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
852 0 : BBPreclaim(b);
853 0 : mapi_close_handle(mhdl);
854 0 : MT_lock_unset(&c->lock);
855 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
856 : }
857 : }
858 : } else
859 0 : while (mapi_fetch_row(mhdl)) {
860 0 : var = mapi_fetch_field(mhdl, 1);
861 0 : if (var == NULL)
862 0 : var = "nil";
863 0 : s = 0;
864 0 : r = NULL;
865 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
866 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
867 0 : BBPreclaim(b);
868 0 : GDKfree(r);
869 0 : mapi_close_handle(mhdl);
870 0 : MT_lock_unset(&c->lock);
871 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
872 : }
873 0 : GDKfree(r);
874 : }
875 :
876 0 : *v = (ValRecord) {
877 0 : .val.bval = b->batCacheid,
878 : .bat = true,
879 0 : .vtype = b->ttype,
880 : };
881 0 : BBPkeepref(b);
882 :
883 0 : mapi_close_handle(mhdl);
884 0 : MT_lock_unset(&c->lock);
885 1392 : } else if (isaBatType(rtype)) {
886 : /* binary compatible remote host, transfer BAT in binary form */
887 1392 : stream *sout;
888 1392 : stream *sin;
889 1392 : char buf[256];
890 1392 : BAT *b = NULL;
891 :
892 : /* this call should be a single transaction over the channel */
893 1392 : MT_lock_set(&c->lock);
894 :
895 : /* bypass Mapi from this point to efficiently write all data to
896 : * the server */
897 1394 : sout = mapi_get_to(c->mconn);
898 1392 : sin = mapi_get_from(c->mconn);
899 1394 : if (sin == NULL || sout == NULL) {
900 0 : MT_lock_unset(&c->lock);
901 0 : throw(MAL, "remote.get", "Connection lost");
902 : }
903 :
904 : /* call our remote helper to do this more efficiently */
905 1394 : mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
906 1391 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
907 :
908 1392 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
909 0 : MT_lock_unset(&c->lock);
910 0 : return tmp;
911 : }
912 :
913 1393 : if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->int128)) != MAL_SUCCEED) {
914 0 : MT_lock_unset(&c->lock);
915 0 : return (tmp);
916 : }
917 :
918 1393 : *v = (ValRecord) {
919 1393 : .val.bval = b->batCacheid,
920 : .bat = true,
921 1393 : .vtype = b->ttype,
922 : };
923 1393 : BBPkeepref(b);
924 :
925 1394 : MT_lock_unset(&c->lock);
926 : } else {
927 0 : ptr p = NULL;
928 0 : str val;
929 0 : size_t len = 0;
930 :
931 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
932 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
933 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED) {
934 0 : return tmp;
935 : }
936 0 : (void) mapi_fetch_row(mhdl); /* should succeed */
937 0 : val = mapi_fetch_field(mhdl, 0);
938 :
939 0 : if (ATOMbasetype(rtype) == TYPE_str) {
940 0 : if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
941 0 : mapi_close_handle(mhdl);
942 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
943 : }
944 0 : } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true)
945 : < 0) {
946 0 : char *msg;
947 0 : msg = createException(MAL, "remote.get",
948 : "unable to parse value: %s",
949 : val == NULL ? "nil" : val);
950 0 : mapi_close_handle(mhdl);
951 0 : GDKfree(p);
952 0 : return msg;
953 : } else {
954 0 : VALset(v, rtype, p);
955 0 : if (ATOMextern(rtype) == 0)
956 0 : GDKfree(p);
957 : }
958 :
959 0 : mapi_close_handle(mhdl);
960 : }
961 :
962 : return (MAL_SUCCEED);
963 : }
964 :
965 : /**
966 : * stores the given object on the remote host. The identifier of the
967 : * object on the remote host is returned for later use.
968 : */
969 : static str
970 3221 : RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
971 : {
972 3221 : str conn, tmp;
973 3221 : char ident[512];
974 3221 : connection c;
975 3221 : ValPtr v;
976 3221 : int type;
977 3221 : ptr value;
978 3221 : MapiHdl mhdl = NULL;
979 :
980 3221 : (void) cntxt;
981 :
982 3221 : conn = *getArgReference_str(stk, pci, 1);
983 3221 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
984 4 : throw(ILLARG, "remote.put",
985 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
986 :
987 : /* lookup conn */
988 3217 : rethrow("remote.put", tmp, RMTfindconn(&c, conn));
989 :
990 : /* put the thing */
991 3217 : type = getArgType(mb, pci, 2);
992 3217 : value = getArgReference(stk, pci, 2);
993 :
994 : /* this call should be a single transaction over the channel */
995 3217 : MT_lock_set(&c->lock);
996 :
997 : /* get a free, typed identifier for the remote host */
998 3217 : tmp = RMTgetId(ident, sizeof(ident), mb, pci, 2);
999 3217 : if (tmp != MAL_SUCCEED) {
1000 0 : MT_lock_unset(&c->lock);
1001 0 : return tmp;
1002 : }
1003 :
1004 : /* depending on the input object generate actions to store the
1005 : * object remotely*/
1006 3217 : if (type == TYPE_any || isAnyExpression(type)) {
1007 0 : char *tpe, *msg;
1008 0 : MT_lock_unset(&c->lock);
1009 0 : tpe = getTypeName(type);
1010 0 : msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
1011 0 : GDKfree(tpe);
1012 0 : return msg;
1013 4612 : } else if (isaBatType(type) && !is_bat_nil(*(bat *) value)) {
1014 1396 : BATiter bi;
1015 : /* naive approach using bat.new() and bat.insert() calls */
1016 1396 : char *tail;
1017 1396 : bat bid;
1018 1396 : BAT *b = NULL;
1019 1396 : BUN p, q;
1020 1396 : str tailv;
1021 1396 : stream *sout;
1022 :
1023 1396 : tail = getTypeIdentifier(getBatType(type));
1024 1396 : if (tail == NULL) {
1025 0 : MT_lock_unset(&c->lock);
1026 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1027 : }
1028 :
1029 1396 : bid = *(bat *) value;
1030 1396 : if (bid != 0) {
1031 1396 : if ((b = BATdescriptor(bid)) == NULL) {
1032 0 : MT_lock_unset(&c->lock);
1033 0 : GDKfree(tail);
1034 0 : throw(MAL, "remote.put",
1035 : SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1036 : }
1037 : }
1038 :
1039 : /* bypass Mapi from this point to efficiently write all data to
1040 : * the server */
1041 1396 : sout = mapi_get_to(c->mconn);
1042 :
1043 : /* call our remote helper to do this more efficiently */
1044 1396 : mnstr_printf(sout,
1045 : "%s := remote.batload(nil:%s, " BUNFMT ");\n",
1046 : ident, tail, (bid == 0 ? 0 : BATcount(b)));
1047 1396 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1048 1394 : GDKfree(tail);
1049 :
1050 : /* b can be NULL if bid == 0 (only type given, ugh) */
1051 1395 : if (b) {
1052 2790 : int tpe = getBatType(type), trivial = tpe < TYPE_date
1053 1395 : || ATOMbasetype(tpe) == TYPE_str;
1054 1395 : const void *nil = ATOMnilptr(tpe);
1055 1395 : int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
1056 :
1057 1395 : bi = bat_iterator(b);
1058 1396 : BATloop(b, p, q) {
1059 0 : const void *v = BUNtail(bi, p);
1060 0 : tailv = ATOMformat(tpe, v);
1061 0 : if (tailv == NULL) {
1062 0 : bat_iterator_end(&bi);
1063 0 : BBPunfix(b->batCacheid);
1064 0 : MT_lock_unset(&c->lock);
1065 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1066 : }
1067 0 : if (trivial || atomcmp(v, nil) == 0)
1068 0 : mnstr_printf(sout, "%s\n", tailv);
1069 : else
1070 0 : mnstr_printf(sout, "\"%s\"\n", tailv);
1071 0 : GDKfree(tailv);
1072 : }
1073 1396 : bat_iterator_end(&bi);
1074 1395 : BBPunfix(b->batCacheid);
1075 : }
1076 :
1077 : /* write the empty line the server is waiting for, handles
1078 : * all errors at the same time, if any */
1079 1396 : if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
1080 : != MAL_SUCCEED) {
1081 0 : MT_lock_unset(&c->lock);
1082 0 : return tmp;
1083 : }
1084 1395 : mapi_close_handle(mhdl);
1085 0 : } else if (isaBatType(type) && is_bat_nil(*(bat *) value)) {
1086 0 : stream *sout;
1087 0 : str typename = getTypeName(type);
1088 0 : sout = mapi_get_to(c->mconn);
1089 0 : mnstr_printf(sout, "%s := nil:%s;\n", ident, typename);
1090 0 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1091 0 : GDKfree(typename);
1092 : } else {
1093 1821 : size_t l;
1094 1821 : str val;
1095 1821 : char *tpe;
1096 1821 : char qbuf[512], *nbuf = qbuf;
1097 1821 : const void *nil = ATOMnilptr(type), *p = value;
1098 1821 : int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
1099 :
1100 1821 : if (ATOMextern(type))
1101 1264 : p = *(ptr *) value;
1102 :
1103 1821 : val = ATOMformat(type, p);
1104 1818 : if (val == NULL) {
1105 0 : MT_lock_unset(&c->lock);
1106 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1107 : }
1108 1818 : tpe = getTypeIdentifier(type);
1109 1819 : if (tpe == NULL) {
1110 0 : MT_lock_unset(&c->lock);
1111 0 : GDKfree(val);
1112 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1113 : }
1114 1819 : l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
1115 1819 : if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
1116 0 : MT_lock_unset(&c->lock);
1117 0 : GDKfree(val);
1118 0 : GDKfree(tpe);
1119 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1120 : }
1121 1819 : if (type < TYPE_date || ATOMbasetype(type) == TYPE_str
1122 8 : || atomcmp(p, nil) == 0)
1123 1814 : snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
1124 : else
1125 5 : snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
1126 1819 : GDKfree(tpe);
1127 1821 : GDKfree(val);
1128 1821 : TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
1129 1821 : tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
1130 1816 : if (nbuf != qbuf)
1131 33 : GDKfree(nbuf);
1132 1816 : if (tmp != MAL_SUCCEED) {
1133 0 : MT_lock_unset(&c->lock);
1134 0 : return tmp;
1135 : }
1136 1816 : mapi_close_handle(mhdl);
1137 : }
1138 3213 : MT_lock_unset(&c->lock);
1139 :
1140 : /* return the identifier */
1141 3217 : v = &stk->stk[pci->argv[0]];
1142 3217 : if (VALinit(v, TYPE_str, ident) == NULL)
1143 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1144 : return (MAL_SUCCEED);
1145 : }
1146 :
1147 : /**
1148 : * stores the given <mod>.<fcn> on the remote host.
1149 : * An error is returned if the function is already known at the remote site.
1150 : * The implementation is based on serialisation of the block into a string
1151 : * followed by remote parsing.
1152 : */
1153 : static str
1154 0 : RMTregisterInternal(Client cntxt, char **fcn_id, const char *conn,
1155 : const char *mod, const char *fcn)
1156 : {
1157 0 : str tmp, qry, msg;
1158 0 : connection c;
1159 0 : char buf[BUFSIZ];
1160 0 : MapiHdl mhdl = NULL;
1161 0 : Symbol sym;
1162 :
1163 0 : if (strNil(conn))
1164 0 : throw(ILLARG, "remote.register",
1165 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1166 :
1167 : /* find local definition */
1168 0 : sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
1169 0 : if (sym == NULL)
1170 0 : throw(MAL, "remote.register",
1171 : ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
1172 :
1173 : /* lookup conn */
1174 0 : rethrow("remote.register", tmp, RMTfindconn(&c, conn));
1175 :
1176 : /* this call should be a single transaction over the channel */
1177 0 : MT_lock_set(&c->lock);
1178 :
1179 : /* get a free, typed identifier for the remote host */
1180 0 : char ident[512];
1181 0 : tmp = RMTgetId(ident, sizeof(ident), sym->def, getInstrPtr(sym->def, 0), 0);
1182 0 : if (tmp != MAL_SUCCEED) {
1183 0 : MT_lock_unset(&c->lock);
1184 0 : return tmp;
1185 : }
1186 :
1187 : /* check remote definition */
1188 0 : snprintf(buf, BUFSIZ,
1189 : "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod,
1190 : ident);
1191 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
1192 0 : if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED) {
1193 0 : MT_lock_unset(&c->lock);
1194 0 : return msg;
1195 : }
1196 :
1197 0 : char *result;
1198 0 : if (mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl)
1199 0 : && (result = mapi_fetch_field(mhdl, 0))) {
1200 0 : if (strcmp(result, "false") != 0)
1201 0 : msg = createException(MAL, "remote.register",
1202 : "function already exists at the remote site: %s.%s",
1203 : mod, fcn);
1204 : } else
1205 0 : msg = createException(MAL, "remote.register", OPERATION_FAILED);
1206 :
1207 0 : mapi_close_handle(mhdl);
1208 :
1209 0 : if (msg) {
1210 0 : MT_lock_unset(&c->lock);
1211 0 : return msg;
1212 : }
1213 :
1214 0 : *fcn_id = GDKstrdup(ident);
1215 0 : if (*fcn_id == NULL) {
1216 0 : MT_lock_unset(&c->lock);
1217 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1218 : }
1219 :
1220 0 : Symbol prg;
1221 0 : if ((prg = newFunctionArgs(putName(mod), putName(*fcn_id), FUNCTIONsymbol, -1)) == NULL) {
1222 0 : MT_lock_unset(&c->lock);
1223 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1224 : }
1225 :
1226 : // We only need the Symbol not the inner program stub. So we clear it.
1227 0 : freeMalBlk(prg->def);
1228 0 : prg->def = NULL;
1229 :
1230 0 : if ((prg->def = copyMalBlk(sym->def)) == NULL) {
1231 0 : MT_lock_unset(&c->lock);
1232 0 : freeSymbol(prg);
1233 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1234 : }
1235 0 : setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
1236 :
1237 : /* make sure the program is error free */
1238 0 : msg = chkProgram(cntxt->usermodule, prg->def);
1239 0 : if (msg != MAL_SUCCEED || prg->def->errors) {
1240 0 : MT_lock_unset(&c->lock);
1241 0 : if (msg)
1242 : return msg;
1243 0 : throw(MAL, "remote.register",
1244 : "function '%s.%s' contains syntax or type errors", mod, *fcn_id);
1245 : }
1246 :
1247 0 : qry = mal2str(prg->def, 0, prg->def->stop);
1248 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
1249 0 : msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
1250 0 : GDKfree(qry);
1251 0 : if (mhdl)
1252 0 : mapi_close_handle(mhdl);
1253 :
1254 0 : freeSymbol(prg);
1255 :
1256 0 : MT_lock_unset(&c->lock);
1257 0 : return msg;
1258 : }
1259 :
1260 : static str
1261 0 : RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1262 : {
1263 0 : char **fcn_id = getArgReference_str(stk, pci, 0);
1264 0 : const char *conn = *getArgReference_str(stk, pci, 1);
1265 0 : const char *mod = *getArgReference_str(stk, pci, 2);
1266 0 : const char *fcn = *getArgReference_str(stk, pci, 3);
1267 0 : (void) mb;
1268 0 : return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
1269 : }
1270 :
1271 : /**
1272 : * exec executes the function with its given arguments on the remote
1273 : * host, returning the function's return value. exec is purposely kept
1274 : * very spartan. All arguments need to be handles to previously put()
1275 : * values. It calls the function with the given arguments at the remote
1276 : * site, and returns the handle which stores the return value of the
1277 : * remotely executed function. This return value can be retrieved using
1278 : * a get call. It handles multiple return arguments.
1279 : */
1280 : static str
1281 715 : RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1282 : {
1283 715 : str conn, mod, func, tmp;
1284 715 : int i;
1285 715 : size_t len, buflen;
1286 715 : connection c = NULL;
1287 715 : char *qbuf;
1288 715 : MapiHdl mhdl;
1289 :
1290 715 : (void) cntxt;
1291 715 : (void) mb;
1292 715 : bool no_return_arguments = 0;
1293 :
1294 715 : columnar_result_callback *rcb = NULL;
1295 715 : ValRecord *v = &(stk)->stk[(pci)->argv[4]];
1296 715 : if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr)) {
1297 1 : rcb = (columnar_result_callback *) v->val.pval;
1298 : }
1299 :
1300 2649 : for (i = 0; i < pci->retc; i++) {
1301 1934 : if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
1302 1933 : tmp = *getArgReference_str(stk, pci, i);
1303 1933 : if (tmp == NULL || strcmp(tmp, (str) str_nil) == 0)
1304 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
1305 : ": return value %d is NULL or nil", i);
1306 : } else
1307 : no_return_arguments = 1;
1308 : }
1309 :
1310 715 : conn = *getArgReference_str(stk, pci, i++);
1311 715 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
1312 0 : throw(ILLARG, "remote.exec",
1313 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1314 715 : mod = *getArgReference_str(stk, pci, i++);
1315 715 : if (mod == NULL || strcmp(mod, (str) str_nil) == 0)
1316 0 : throw(ILLARG, "remote.exec",
1317 : ILLEGAL_ARGUMENT ": module name is NULL or nil");
1318 715 : func = *getArgReference_str(stk, pci, i++);
1319 715 : if (func == NULL || strcmp(func, (str) str_nil) == 0)
1320 0 : throw(ILLARG, "remote.exec",
1321 : ILLEGAL_ARGUMENT ": function name is NULL or nil");
1322 :
1323 : /* lookup conn */
1324 715 : rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
1325 :
1326 : /* this call should be a single transaction over the channel */
1327 715 : MT_lock_set(&c->lock);
1328 :
1329 715 : if (!no_return_arguments && pci->argc - pci->retc < 3) { /* conn, mod, func, ... */
1330 0 : MT_lock_unset(&c->lock);
1331 0 : throw(MAL, "remote.exec",
1332 : ILLEGAL_ARGUMENT " MAL instruction misses arguments");
1333 : }
1334 :
1335 715 : len = 0;
1336 : /* count how big a buffer we need */
1337 715 : len += 2 * (pci->retc > 1);
1338 715 : if (!no_return_arguments)
1339 2647 : for (i = 0; i < pci->retc; i++) {
1340 1933 : len += 2 * (i > 0);
1341 1933 : len += strlen(*getArgReference_str(stk, pci, i));
1342 : }
1343 :
1344 715 : const int arg_index = rcb ? 4 : 3;
1345 :
1346 715 : len += strlen(mod) + strlen(func) + 6;
1347 1999 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1348 1284 : len += 2 * (i > arg_index);
1349 1284 : len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
1350 : }
1351 715 : len += 2;
1352 715 : buflen = len + 1;
1353 715 : if ((qbuf = GDKmalloc(buflen)) == NULL) {
1354 0 : MT_lock_unset(&c->lock);
1355 0 : throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1356 : }
1357 :
1358 715 : len = 0;
1359 :
1360 715 : if (pci->retc > 1)
1361 69 : qbuf[len++] = '(';
1362 715 : if (!no_return_arguments)
1363 2646 : for (i = 0; i < pci->retc; i++)
1364 1932 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1365 1932 : (i > 0 ? ", " : ""), *getArgReference_str(stk, pci,
1366 : i));
1367 :
1368 715 : if (pci->retc > 1)
1369 69 : qbuf[len++] = ')';
1370 :
1371 : /* build the function invocation string in qbuf */
1372 715 : if (!no_return_arguments && pci->retc > 0) {
1373 714 : len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
1374 : } else {
1375 1 : len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
1376 : }
1377 :
1378 : /* handle the arguments to the function */
1379 :
1380 : /* put the arguments one by one, and dynamically build the
1381 : * invocation string */
1382 1995 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1383 1280 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1384 : (i > arg_index ? ", " : ""),
1385 1280 : *(getArgReference_str(stk, pci, pci->retc + i)));
1386 : }
1387 :
1388 : /* finish end execute the invocation string */
1389 715 : len += snprintf(&qbuf[len], buflen - len, ");");
1390 715 : TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
1391 715 : tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
1392 715 : GDKfree(qbuf);
1393 :
1394 : /* Temporary hack:
1395 : * use a callback to immediately handle columnar results before hdl is destroyed. */
1396 713 : if (tmp == MAL_SUCCEED && rcb && mhdl
1397 1 : && (mapi_get_querytype(mhdl) == Q_TABLE
1398 0 : || mapi_get_querytype(mhdl) == Q_PREPARE)) {
1399 1 : int fields = mapi_get_field_count(mhdl);
1400 1 : columnar_result *results = GDKzalloc(sizeof(columnar_result) * fields);
1401 :
1402 1 : if (!results) {
1403 0 : tmp = createException(MAL, "remote.exec",
1404 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1405 : } else {
1406 1 : int i = 0;
1407 1 : char buf[256] = { 0 };
1408 1 : stream *sin = mapi_get_from(c->mconn);
1409 :
1410 4 : for (; i < fields; i++) {
1411 2 : BAT *b = NULL;
1412 :
1413 2 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
1414 2 : (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1, c->int128)) != MAL_SUCCEED) {
1415 : break;
1416 : }
1417 :
1418 2 : results[i].id = b->batCacheid;
1419 2 : BBPkeepref(b);
1420 2 : results[i].colname = mapi_get_name(mhdl, i);
1421 2 : results[i].tpename = mapi_get_type(mhdl, i);
1422 2 : results[i].digits = mapi_get_digits(mhdl, i);
1423 2 : results[i].scale = mapi_get_scale(mhdl, i);
1424 : }
1425 :
1426 1 : if (tmp != MAL_SUCCEED) {
1427 0 : for (int j = 0; j < i; j++)
1428 0 : BBPrelease(results[j].id);
1429 : } else {
1430 1 : assert(rcb->context);
1431 1 : tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results,
1432 : fields);
1433 3 : for (int j = 0; j < i; j++)
1434 2 : BBPrelease(results[j].id);
1435 : }
1436 1 : GDKfree(results);
1437 : }
1438 : }
1439 :
1440 713 : if (rcb) {
1441 1 : GDKfree(rcb->context);
1442 1 : GDKfree(rcb);
1443 : }
1444 :
1445 713 : if (mhdl)
1446 708 : mapi_close_handle(mhdl);
1447 714 : MT_lock_unset(&c->lock);
1448 714 : return tmp;
1449 : }
1450 :
1451 : /**
1452 : * batload is a helper function to make transferring a BAT with RMTput
1453 : * more efficient. It works by creating a BAT, and loading it with the
1454 : * data as comma separated values from the input stream, until an empty
1455 : * line is read. The given size argument is taken as a hint only, and
1456 : * is not enforced to match the number of rows read.
1457 : */
1458 : static str
1459 1396 : RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1460 : {
1461 1396 : ValPtr v;
1462 1396 : int t;
1463 1396 : int size;
1464 1396 : ptr r;
1465 1396 : size_t s;
1466 1396 : BAT *b;
1467 1396 : size_t len;
1468 1396 : char *var;
1469 1396 : str msg = MAL_SUCCEED;
1470 1396 : bstream *fdin = cntxt->fdin;
1471 :
1472 1396 : v = &stk->stk[pci->argv[0]]; /* return */
1473 1396 : t = getArgType(mb, pci, 1); /* tail type */
1474 1396 : size = *getArgReference_int(stk, pci, 2); /* size */
1475 :
1476 1396 : b = COLnew(0, t, size, TRANSIENT);
1477 1396 : if (b == NULL)
1478 0 : throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1479 :
1480 : /* grab the input stream and start reading */
1481 1396 : fdin->eof = false;
1482 1396 : len = fdin->pos;
1483 1396 : while (len < fdin->len || bstream_next(fdin) > 0) {
1484 : /* newline hunting (how spartan) */
1485 1396 : for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n';
1486 0 : len++) ;
1487 : /* unterminated line, request more */
1488 1396 : if (fdin->buf[len] != '\n')
1489 0 : continue;
1490 : /* empty line, end of input */
1491 1396 : if (fdin->pos == len) {
1492 1396 : if (isa_block_stream(fdin->s)) {
1493 1396 : ssize_t n = bstream_next(fdin);
1494 1396 : if (n)
1495 0 : msg = createException(MAL, "remote.load",
1496 : SQLSTATE(HY013)
1497 : "Unexpected return from remote");
1498 : }
1499 : break;
1500 : }
1501 0 : fdin->buf[len] = '\0'; /* kill \n */
1502 0 : var = &fdin->buf[fdin->pos];
1503 : /* skip over this line */
1504 0 : fdin->pos = ++len;
1505 :
1506 0 : s = 0;
1507 0 : r = NULL;
1508 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
1509 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
1510 0 : BBPreclaim(b);
1511 0 : GDKfree(r);
1512 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
1513 : }
1514 0 : GDKfree(r);
1515 : }
1516 :
1517 1396 : *v = (ValRecord) {
1518 1396 : .val.bval = b->batCacheid,
1519 : .bat = true,
1520 1396 : .vtype = b->ttype,
1521 : };
1522 1396 : BBPkeepref(b);
1523 :
1524 1396 : return msg;
1525 : }
1526 :
1527 : /**
1528 : * dump given BAT to stream
1529 : */
1530 : static str
1531 1394 : RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1532 : {
1533 1394 : bat bid = *getArgReference_bat(stk, pci, 1);
1534 1394 : BAT *b = BBPquickdesc(bid), *v = b;
1535 1394 : char sendtheap = 0, sendtvheap = 0;
1536 :
1537 1394 : (void) mb;
1538 1394 : (void) stk;
1539 1394 : (void) pci;
1540 :
1541 1394 : if (b == NULL)
1542 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
1543 :
1544 1394 : if (BBPfix(bid) <= 0)
1545 0 : throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
1546 :
1547 1394 : sendtheap = b->ttype != TYPE_void;
1548 1394 : sendtvheap = sendtheap && b->tvheap;
1549 1394 : if (isVIEW(b) && sendtvheap && VIEWvtparent(b)
1550 42 : && BATcount(b) < BATcount(BBP_desc(VIEWvtparent(b)))) {
1551 18 : if ((b = BATdescriptor(bid)) == NULL) {
1552 0 : BBPunfix(bid);
1553 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_MISSING);
1554 : }
1555 18 : v = COLcopy(b, b->ttype, true, TRANSIENT);
1556 18 : BBPunfix(b->batCacheid);
1557 18 : if (v == NULL) {
1558 0 : BBPunfix(bid);
1559 0 : throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
1560 : }
1561 : }
1562 :
1563 1394 : BATiter vi = bat_iterator(v);
1564 1484 : mnstr_printf(cntxt->fdout, /*JSON*/ "{"
1565 : "\"version\":1,"
1566 : "\"ttype\":%d,"
1567 : "\"hseqbase\":" OIDFMT ","
1568 : "\"tseqbase\":" OIDFMT ","
1569 : "\"tsorted\":%d,"
1570 : "\"trevsorted\":%d,"
1571 : "\"tkey\":%d,"
1572 : "\"tnonil\":%d,"
1573 : "\"tdense\":%d,"
1574 : "\"size\":" BUNFMT ","
1575 : "\"tailsize\":%zu,"
1576 : "\"theapsize\":%zu"
1577 : "}\n",
1578 1394 : vi.type,
1579 : v->hseqbase, v->tseqbase,
1580 1394 : vi.sorted, vi.revsorted,
1581 1394 : vi.key,
1582 1394 : vi.nonil,
1583 1394 : BATtdensebi(&vi),
1584 : vi.count,
1585 1366 : sendtheap ? (size_t) vi.count << vi.shift : 0,
1586 90 : sendtvheap && vi.count > 0 ? vi.vhfree : 0);
1587 :
1588 1394 : if (sendtheap && vi.count > 0) {
1589 1282 : mnstr_write(cntxt->fdout, /* tail */
1590 1282 : vi.base, vi.count * vi.width, 1);
1591 1282 : if (sendtvheap)
1592 75 : mnstr_write(cntxt->fdout, /* theap */
1593 75 : vi.vh->base, vi.vhfree, 1);
1594 : }
1595 1394 : bat_iterator_end(&vi);
1596 : /* flush is done by the calling environment (MAL) */
1597 :
1598 1394 : if (b != v)
1599 18 : BBPreclaim(v);
1600 :
1601 1394 : BBPunfix(bid);
1602 :
1603 1394 : return (MAL_SUCCEED);
1604 : }
1605 :
1606 : /**
1607 : * read from the input stream and give the BAT handle back to the caller
1608 : */
1609 : static str
1610 0 : RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1611 : {
1612 0 : BAT *b = NULL;
1613 0 : ValPtr v;
1614 0 : str err;
1615 :
1616 0 : (void) mb;
1617 :
1618 : /* We receive a normal line, which contains the JSON header, the
1619 : * rest is binary data directly on the stream. We get the first
1620 : * line from the buffered stream we have here, and pass it on
1621 : * together with the raw stream we have. */
1622 0 : cntxt->fdin->eof = false; /* in case it was before */
1623 0 : if (bstream_next(cntxt->fdin) <= 0)
1624 0 : throw(MAL, "remote.bincopyfrom", "expected JSON header");
1625 :
1626 0 : cntxt->fdin->buf[cntxt->fdin->len] = '\0';
1627 0 : err = RMTinternalcopyfrom(&b,
1628 0 : &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true, int128 /* library should be compatible */);
1629 : /* skip the JSON line */
1630 0 : cntxt->fdin->pos = ++cntxt->fdin->len;
1631 0 : if (err !=MAL_SUCCEED)
1632 : return (err);
1633 :
1634 0 : v = &stk->stk[pci->argv[0]];
1635 0 : *v = (ValRecord) {
1636 0 : .val.bval = b->batCacheid,
1637 : .bat = true,
1638 0 : .vtype = b->ttype,
1639 : };
1640 0 : BBPkeepref(b);
1641 :
1642 0 : return (MAL_SUCCEED);
1643 : }
1644 :
1645 : /**
1646 : * bintype identifies the system on its binary profile. This is mainly
1647 : * used to determine if BATs can be sent binary across.
1648 : */
1649 : static str
1650 181 : RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1651 : {
1652 181 : (void)mb;
1653 181 : (void)stk;
1654 181 : (void)pci;
1655 :
1656 : /* TODO bintype should include the (bin) protocol version */
1657 181 : mnstr_printf(cntxt->fdout, "[ %d ]\n", localtype);
1658 181 : return(MAL_SUCCEED);
1659 : }
1660 :
1661 : /**
1662 : * Returns whether the underlying connection is still connected or not.
1663 : * Best effort implementation on top of mapi using a ping.
1664 : */
1665 : static str
1666 0 : RMTisalive(int *ret, str *conn)
1667 : {
1668 0 : str tmp;
1669 0 : connection c;
1670 :
1671 0 : if (*conn == NULL || strcmp(*conn, (str) str_nil) == 0)
1672 0 : throw(ILLARG, "remote.get",
1673 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1674 :
1675 : /* lookup conn, set c if valid */
1676 0 : rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
1677 :
1678 0 : *ret = 0;
1679 0 : if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
1680 0 : *ret = 1;
1681 :
1682 : return MAL_SUCCEED;
1683 : }
1684 :
1685 : // This is basically a no op
1686 : static str
1687 354 : RMTregisterSupervisor(int *ret, str *sup_uuid, str *query_uuid)
1688 : {
1689 354 : (void) sup_uuid;
1690 354 : (void) query_uuid;
1691 :
1692 354 : *ret = 0;
1693 354 : return MAL_SUCCEED;
1694 : }
1695 :
1696 : #include "mel.h"
1697 : mel_func remote_init_funcs[] = {
1698 : command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
1699 : command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
1700 : pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
1701 : command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
1702 : command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
1703 : pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
1704 : pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
1705 : pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
1706 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
1707 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
1708 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
1709 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
1710 : command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
1711 : pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
1712 : pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
1713 : pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
1714 : pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
1715 : command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
1716 : { .imp=NULL }
1717 : };
1718 : #include "mal_import.h"
1719 : #ifdef _MSC_VER
1720 : #undef read
1721 : #pragma section(".CRT$XCU",read)
1722 : #endif
1723 334 : LIB_STARTUP_FUNC(init_remote_mal)
1724 334 : { mal_module2("remote", NULL, remote_init_funcs, RMTprelude, NULL); }
|