Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (c) Fabian Groffen, Martin Kersten
15 : * Remote querying functionality
16 : * Communication with other mservers at the MAL level is a delicate task.
17 : * However, it is indispensable for any distributed functionality. This
18 : * module provides an abstract way to store and retrieve objects on a
19 : * remote site. Additionally, functions on a remote site can be executed
20 : * using objects available in the remote session context. This yields
21 : * four primitive functions that form the basis for distribution methods:
22 : * get, put, register and exec.
23 : *
24 : * The get method simply retrieves a copy of a remote object. Objects can
25 : * be simple values, strings or Column. The same holds for the put method,
26 : * but the other way around. A local object can be stored on a remote
27 : * site. Upon a successful store, the put method returns the remote
28 : * identifier for the stored object. With this identifier the object can
29 : * be addressed, e.g. using the get method to retrieve the object that was
30 : * stored using put.
31 : *
32 : * The get and put methods are symmetric. Performing a get on an
33 : * identifier that was returned by put, results in an object with the same
34 : * value and type as the one that was put. The result of such an operation is
35 : * equivalent to making an (expensive) copy of the original object.
36 : *
37 : * The register function takes a local MAL function and makes it known at a
38 : * remote site. It ensures that it does not overload an already known
39 : * operation remotely, which could create a semantic conflict.
40 : * Deregistering a function is forbidden, because it would allow for taking
41 : * over the remote site completely.
42 : * C-implemented functions, such as io.print() cannot be remotely stored.
43 : * It would require even more complicated (byte) code shipping and remote
44 : * compilation to make it work.
45 : *
46 : * The choice to let exec only execute functions avoids problems
47 : * to decide what should be returned to the caller. With a function it is
48 : * clear and simple to return that what the function signature prescribes.
49 : * Any side effect (e.g. io.print calls) may cause havoc in the system,
50 : * but are currently ignored.
51 : *
52 : * This leads to the final contract of this module. The methods should be
53 : * used correctly, by obeying their contract. Failing to do so will result
54 : * in errors and possibly undefined behaviour.
55 : *
56 : * The resolve() function can be used to query Merovingian. It returns one
57 : * or more databases discovered in its vicinity matching the given pattern.
58 : *
59 : */
60 : #include "monetdb_config.h"
61 : #include "remote.h"
62 :
63 : /*
64 : * Technically, these methods need to be serialised per connection,
65 : * hence a scheduler that interleaves e.g. multiple get calls, simply
66 : * violates this constraint. If parallelism to the same site is
67 : * desired, a user could create a second connection. This is not always
68 : * easy to generate at the proper place, e.g. overloading the dataflow
69 : * optimizer to patch connections structures is not acceptable.
70 : *
71 : * Instead, we maintain a simple lock with each connection, which can be
72 : * used to issue a safe, but blocking get/put/exec/register request.
73 : */
74 : #include "mal_exception.h"
75 : #include "mal_interpreter.h"
76 : #include "mal_function.h" /* for printFunction */
77 : #include "mal_listing.h"
78 : #include "mal_instruction.h" /* for getmodule/func macros */
79 : #include "mal_authorize.h"
80 : #include "mapi.h"
81 : #include "mutils.h"
82 :
/*
 * Bit flags that together form the "binary profile" of a server (see
 * connection.type and localtype).  Two servers with the same profile
 * can exchange BATs in binary form.
 */
#define RMTT_L_ENDIAN	(0 << 1)	/* little-endian byte order */
#define RMTT_B_ENDIAN	(1 << 1)	/* big-endian byte order */
#define RMTT_32_BITS	(0 << 2)	/* size_t is not as wide as lng */
#define RMTT_64_BITS	(1 << 2)	/* size_t is as wide as lng */
#define RMTT_32_OIDS	(0 << 3)	/* oid is not as wide as lng */
#define RMTT_64_OIDS	(1 << 3)	/* oid is as wide as lng */
#define RMTT_HGE		(1 << 4)	/* built with HAVE_HGE */
90 :
91 : typedef struct _connection {
92 : MT_Lock lock; /* lock to avoid interference */
93 : str name; /* the handle for this connection */
94 : Mapi mconn; /* the Mapi handle for the connection */
95 : unsigned char type; /* binary profile of the connection target */
96 : bool int128; /* has int128 support */
97 : size_t nextid; /* id counter */
98 : struct _connection *next; /* the next connection in the list */
99 : } *connection;
100 :
101 : #ifndef WIN32
102 : #include <sys/socket.h> /* socket */
103 : #include <sys/un.h> /* sockaddr_un */
104 : #endif
105 : #include <unistd.h> /* gethostname */
106 :
107 : static MT_Lock mal_remoteLock = MT_LOCK_INITIALIZER(mal_remoteLock);
108 :
109 : static connection conns = NULL;
110 : static unsigned char localtype = 0177;
111 : static bool int128 = false;
112 :
113 : static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn,
114 : const char *query);
115 :
116 : /**
117 : * Returns a BAT with valid redirects for the given pattern. If
118 : * merovingian is not running, this function throws an error.
119 : */
120 : static str
121 0 : RMTresolve(bat *ret, str *pat)
122 : {
123 : #ifdef NATIVE_WIN32
124 : (void) ret;
125 : (void) pat;
126 : throw(MAL, "remote.resolve", "merovingian is not available on " "your platform, sorry"); /* please upgrade to Linux, etc. */
127 : #else
128 0 : BAT *list;
129 0 : const char *mero_uri;
130 0 : char *p;
131 0 : unsigned int port;
132 0 : char **redirs;
133 0 : char **or;
134 :
135 0 : if (pat == NULL || *pat == NULL || strcmp(*pat, (str) str_nil) == 0)
136 0 : throw(ILLARG, "remote.resolve",
137 : ILLEGAL_ARGUMENT ": pattern is NULL or nil");
138 :
139 0 : mero_uri = GDKgetenv("merovingian_uri");
140 0 : if (mero_uri == NULL)
141 0 : throw(MAL, "remote.resolve", "this function needs the mserver "
142 : "have been started by merovingian");
143 :
144 0 : list = COLnew(0, TYPE_str, 0, TRANSIENT);
145 0 : if (list == NULL)
146 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
147 :
148 : /* extract port from mero_uri, let mapi figure out the rest */
149 0 : mero_uri += strlen("mapi:monetdb://");
150 0 : if (*mero_uri == '[') {
151 0 : if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
152 0 : BBPreclaim(list);
153 0 : throw(MAL, "remote.resolve",
154 : "illegal IPv6 address on merovingian_uri: %s",
155 : GDKgetenv("merovingian_uri"));
156 : }
157 : }
158 0 : if ((p = strchr(mero_uri, ':')) == NULL) {
159 0 : BBPreclaim(list);
160 0 : throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
161 : GDKgetenv("merovingian_uri"));
162 : }
163 0 : port = (unsigned int) atoi(p + 1);
164 :
165 0 : or = redirs = mapi_resolve(NULL, port, *pat);
166 :
167 0 : if (redirs == NULL) {
168 0 : BBPreclaim(list);
169 0 : throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
170 : }
171 :
172 0 : while (*redirs != NULL) {
173 0 : if (BUNappend(list, (ptr) *redirs, false) != GDK_SUCCEED) {
174 0 : BBPreclaim(list);
175 0 : do
176 0 : free(*redirs);
177 0 : while (*++redirs);
178 0 : free(or);
179 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
180 : }
181 0 : free(*redirs);
182 0 : redirs++;
183 : }
184 0 : free(or);
185 :
186 0 : *ret = list->batCacheid;
187 0 : BBPkeepref(list);
188 0 : return (MAL_SUCCEED);
189 : #endif
190 : }
191 :
192 :
193 : /* for unique connection identifiers */
194 : static size_t connection_id = 0;
195 :
196 : /**
197 : * Returns a connection to the given uri. It always returns a newly
198 : * created connection.
199 : */
200 : static str
201 189 : RMTconnectScen(str *ret,
202 : str *ouri, str *user, str *passwd, str *scen, bit *columnar)
203 : {
204 189 : connection c;
205 189 : char conn[BUFSIZ];
206 189 : char *s;
207 189 : Mapi m;
208 189 : MapiHdl hdl;
209 189 : str msg;
210 :
211 : /* just make sure the return isn't garbage */
212 189 : *ret = 0;
213 :
214 189 : if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str) str_nil) == 0)
215 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
216 : "is NULL or nil");
217 189 : if (user == NULL || *user == NULL || strcmp(*user, (str) str_nil) == 0)
218 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
219 : "NULL or nil");
220 189 : if (passwd == NULL || *passwd == NULL
221 189 : || strcmp(*passwd, (str) str_nil) == 0)
222 0 : throw(ILLARG, "remote.connect",
223 : ILLEGAL_ARGUMENT ": password is " "NULL or nil");
224 189 : if (scen == NULL || *scen == NULL || strcmp(*scen, (str) str_nil) == 0)
225 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
226 : "NULL or nil");
227 189 : if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
228 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
229 : "is not supported", *scen);
230 :
231 189 : m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
232 190 : if (mapi_error(m)) {
233 0 : msg = createException(MAL, "remote.connect",
234 : "unable to connect to '%s': %s",
235 : *ouri, mapi_error_str(m));
236 0 : mapi_destroy(m);
237 0 : return msg;
238 : }
239 :
240 190 : MT_lock_set(&mal_remoteLock);
241 :
242 : /* generate an unique connection name, they are only known
243 : * within one mserver, id is primary key, the rest is super key */
244 190 : snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user,
245 : connection_id++);
246 : /* make sure we can construct MAL identifiers using conn */
247 5343 : for (s = conn; *s != '\0'; s++) {
248 5153 : if (!isalnum((unsigned char) *s)) {
249 738 : *s = '_';
250 : }
251 : }
252 :
253 190 : if (mapi_reconnect(m) != MOK) {
254 5 : MT_lock_unset(&mal_remoteLock);
255 5 : msg = createException(IO, "remote.connect",
256 : "unable to connect to '%s': %s",
257 : *ouri, mapi_error_str(m));
258 5 : mapi_destroy(m);
259 5 : return msg;
260 : }
261 :
262 185 : if (columnar && *columnar) {
263 1 : char set_protocol_query_buf[50];
264 1 : snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);",
265 : PROTOCOL_COLUMNAR);
266 1 : if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
267 0 : mapi_destroy(m);
268 0 : MT_lock_unset(&mal_remoteLock);
269 0 : return msg;
270 : }
271 : }
272 :
273 : /* connection established, add to list */
274 185 : c = GDKzalloc(sizeof(struct _connection));
275 185 : if (c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
276 0 : GDKfree(c);
277 0 : mapi_destroy(m);
278 0 : MT_lock_unset(&mal_remoteLock);
279 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
280 : }
281 185 : c->mconn = m;
282 185 : c->nextid = 0;
283 185 : MT_lock_init(&c->lock, c->name);
284 185 : c->next = conns;
285 185 : conns = c;
286 :
287 185 : msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
288 185 : if (msg) {
289 0 : MT_lock_unset(&mal_remoteLock);
290 0 : return msg;
291 : }
292 370 : if (hdl != NULL && mapi_fetch_row(hdl)) {
293 185 : char *val = mapi_fetch_field(hdl, 0);
294 185 : c->type = (unsigned char) atoi(val);
295 185 : mapi_close_handle(hdl);
296 : } else {
297 0 : c->type = 0;
298 : }
299 :
300 : #ifdef _DEBUG_MAPI_
301 : mapi_trace(c->mconn, true);
302 : #endif
303 185 : if (c->type != localtype && (c->type | RMTT_HGE) == localtype) {
304 : /* we support hge, and for remote, we don't know */
305 0 : msg = RMTquery(&hdl, "remote.connect", m, "x := 0:hge;");
306 0 : if (msg) {
307 0 : c->int128 = false;
308 : } else {
309 0 : mapi_close_handle(hdl);
310 0 : c->int128 = true;
311 0 : c->type |= RMTT_HGE;
312 : }
313 185 : } else if (c->type == localtype) {
314 185 : c->int128 = int128;
315 : }
316 185 : MT_lock_unset(&mal_remoteLock);
317 :
318 185 : *ret = GDKstrdup(conn);
319 185 : if (*ret == NULL)
320 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
321 : return (MAL_SUCCEED);
322 : }
323 :
324 : static str
325 189 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
326 : {
327 189 : (void) cntxt;
328 189 : (void) mb;
329 189 : str *ret = getArgReference_str(stk, pci, 0);
330 189 : str *uri = getArgReference_str(stk, pci, 1);
331 189 : str *user = getArgReference_str(stk, pci, 2);
332 189 : str *passwd = getArgReference_str(stk, pci, 3);
333 :
334 189 : str scen = "msql";
335 :
336 189 : if (pci->argc >= 5)
337 189 : scen = *getArgReference_str(stk, pci, 4);
338 :
339 189 : return RMTconnectScen(ret, uri, user, passwd, &scen, NULL);
340 : }
341 :
342 : /**
343 : * Disconnects a connection. The connection needs not to exist in the
344 : * system, it only needs to exist for the client (i.e. it was once
345 : * created).
346 : */
347 : str
348 185 : RMTdisconnect(void *ret, str *conn)
349 : {
350 185 : connection c, t;
351 :
352 185 : if (conn == NULL || *conn == NULL || strcmp(*conn, (str) str_nil) == 0)
353 0 : throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
354 : "is NULL or nil");
355 :
356 :
357 185 : (void) ret;
358 :
359 : /* we need a lock because the same user can be handled by multiple
360 : * threads */
361 185 : MT_lock_set(&mal_remoteLock);
362 185 : c = conns;
363 185 : t = NULL; /* parent */
364 : /* walk through the list */
365 248 : while (c != NULL) {
366 248 : if (strcmp(c->name, *conn) == 0) {
367 : /* ok, delete it... */
368 185 : if (t == NULL) {
369 150 : conns = c->next;
370 : } else {
371 35 : t->next = c->next;
372 : }
373 :
374 185 : MT_lock_set(&c->lock); /* shared connection */
375 185 : mapi_disconnect(c->mconn);
376 185 : mapi_destroy(c->mconn);
377 185 : MT_lock_unset(&c->lock);
378 185 : MT_lock_destroy(&c->lock);
379 185 : GDKfree(c->name);
380 185 : GDKfree(c);
381 185 : MT_lock_unset(&mal_remoteLock);
382 185 : return MAL_SUCCEED;
383 : }
384 63 : t = c;
385 63 : c = c->next;
386 : }
387 :
388 0 : MT_lock_unset(&mal_remoteLock);
389 0 : throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
390 : }
391 :
392 : /**
393 : * Helper function to return a connection matching a given string, or an
394 : * error if it does not exist. Since this function is internal, it
395 : * doesn't check the argument conn, as it should have been checked
396 : * already.
397 : * NOTE: this function acquires the mal_remoteLock before accessing conns
398 : */
399 : static inline str
400 5369 : RMTfindconn(connection *ret, const char *conn)
401 : {
402 5369 : connection c;
403 :
404 : /* just make sure the return isn't garbage */
405 5369 : *ret = NULL;
406 5369 : MT_lock_set(&mal_remoteLock); /* protect c */
407 5369 : c = conns;
408 7015 : while (c != NULL) {
409 7015 : if (strcmp(c->name, conn) == 0) {
410 5369 : *ret = c;
411 5369 : MT_lock_unset(&mal_remoteLock);
412 5369 : return (MAL_SUCCEED);
413 : }
414 1646 : c = c->next;
415 : }
416 0 : MT_lock_unset(&mal_remoteLock);
417 0 : throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
418 : }
419 :
420 : /**
421 : * Little helper function that returns a GDKmalloced string containing a
422 : * valid identifier that is supposed to be unique in the connection's
423 : * remote context. The generated string depends on the module and
424 : * function the caller is in. But also the runtime context is important.
425 : * The format is rmt<id>_<retvar>_<type>. Every RMTgetId uses a fresh id,
426 : * to distinguish amongst different (parallel) execution context.
427 : * Re-use of this remote identifier should be done with care.
428 : * The encoding of the type allows for ease of type checking later on.
429 : */
430 : static inline str
431 3249 : RMTgetId(char *buf, size_t buflen, MalBlkPtr mb, InstrPtr p, int arg)
432 : {
433 3249 : InstrPtr f;
434 3249 : const char *mod;
435 3249 : char *var;
436 3249 : str rt;
437 3249 : static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
438 :
439 3249 : if (p->retc == 0)
440 0 : throw(MAL, "remote.getId",
441 : ILLEGAL_ARGUMENT "MAL instruction misses retc");
442 :
443 3249 : var = getArgName(mb, p, arg);
444 3249 : f = getInstrPtr(mb, 0); /* top level function */
445 3249 : mod = getModuleId(f);
446 3249 : if (mod == NULL)
447 3249 : mod = "user";
448 3249 : rt = getTypeIdentifier(getArgType(mb, p, arg));
449 3249 : if (rt == NULL)
450 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
451 :
452 3249 : snprintf(buf, buflen, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var,
453 : rt);
454 :
455 3249 : GDKfree(rt);
456 3249 : return (MAL_SUCCEED);
457 : }
458 :
459 : /**
460 : * Helper function to execute a query over the given connection,
461 : * returning the result handle. If communication fails in one way or
462 : * another, an error is returned. Since this function is internal, it
463 : * doesn't check the input arguments func, conn and query, as they
464 : * should have been checked already.
465 : * NOTE: this function assumes a lock for conn is set
466 : */
467 : static inline str
468 4160 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query)
469 : {
470 4160 : MapiHdl mhdl;
471 :
472 4160 : *ret = NULL;
473 4160 : mhdl = mapi_query(conn, query);
474 4160 : if (mhdl) {
475 4160 : if (mapi_result_error(mhdl) != NULL) {
476 8 : str err = createException(getExceptionType(mapi_result_error(mhdl)),
477 : func,
478 : "(mapi:monetdb://%s@%s/%s) %s",
479 : mapi_get_user(conn),
480 : mapi_get_host(conn),
481 : mapi_get_dbname(conn),
482 : getExceptionMessage(mapi_result_error(mhdl)));
483 8 : mapi_close_handle(mhdl);
484 8 : return (err);
485 : }
486 : } else {
487 0 : if (mapi_error(conn) != MOK) {
488 0 : throw(IO, func, "an error occurred on connection: %s",
489 : mapi_error_str(conn));
490 : } else {
491 0 : throw(MAL, func,
492 : "remote function invocation didn't return a result");
493 : }
494 : }
495 :
496 4151 : *ret = mhdl;
497 4151 : return (MAL_SUCCEED);
498 : }
499 :
500 : static str
501 336 : RMTprelude(void)
502 : {
503 336 : unsigned int type = 0;
504 :
505 : #ifdef WORDS_BIGENDIAN
506 : type |= RMTT_B_ENDIAN;
507 : #else
508 336 : type |= RMTT_L_ENDIAN;
509 : #endif
510 : #if SIZEOF_SIZE_T == SIZEOF_LNG
511 336 : type |= RMTT_64_BITS;
512 : #else
513 : type |= RMTT_32_BITS;
514 : #endif
515 : #if SIZEOF_OID == SIZEOF_LNG
516 336 : type |= RMTT_64_OIDS;
517 : #else
518 : type |= RMTT_32_OIDS;
519 : #endif
520 : #ifdef HAVE_HGE
521 336 : type |= RMTT_HGE;
522 336 : int128 = true;
523 : #endif
524 336 : localtype = (unsigned char) type;
525 :
526 336 : return (MAL_SUCCEED);
527 : }
528 :
529 : static str
530 334 : RMTepilogue(void *ret)
531 : {
532 334 : connection c, t;
533 :
534 334 : (void) ret;
535 :
536 334 : MT_lock_set(&mal_remoteLock); /* nobody allowed here */
537 : /* free connections list */
538 334 : c = conns;
539 334 : while (c != NULL) {
540 0 : t = c;
541 0 : c = c->next;
542 0 : MT_lock_set(&t->lock);
543 0 : mapi_destroy(t->mconn);
544 0 : MT_lock_unset(&t->lock);
545 0 : MT_lock_destroy(&t->lock);
546 0 : GDKfree(t->name);
547 0 : GDKfree(t);
548 : }
549 : /* not sure, but better be safe than sorry */
550 334 : conns = NULL;
551 334 : MT_lock_unset(&mal_remoteLock);
552 :
553 334 : return (MAL_SUCCEED);
554 : }
555 :
556 : static str
557 1397 : RMTreadbatheader(stream *sin, char *buf)
558 : {
559 1397 : ssize_t sz = 0, rd;
560 :
561 : /* read the JSON header */
562 226836 : while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
563 225439 : sz += rd;
564 : }
565 1397 : if (rd < 0) {
566 0 : throw(MAL, "remote.get", "could not read BAT JSON header");
567 : }
568 1397 : if (buf[0] == '!') {
569 0 : char *result;
570 0 : if ((result = GDKstrdup(buf)) == NULL)
571 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
572 : return result;
573 : }
574 :
575 1397 : buf[sz] = '\0';
576 :
577 1397 : return MAL_SUCCEED;
578 : }
579 :
580 : typedef struct _binbat_v1 {
581 : int Ttype;
582 : oid Hseqbase;
583 : oid Tseqbase;
584 : bool
585 : Tsorted:1, Trevsorted:1, Tkey:1, Tnonil:1, Tdense:1;
586 : BUN size;
587 : size_t tailsize;
588 : size_t theapsize;
589 : } binbat;
590 :
591 : static str
592 1397 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, bool cint128)
593 : {
594 1397 : binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
595 1397 : char *nme = NULL;
596 1397 : char *val = NULL;
597 1397 : char tmp;
598 1397 : size_t len;
599 1397 : lng lv, *lvp;
600 :
601 1397 : BAT *b;
602 :
603 1397 : (void) cint128;
604 : /* hdr is a JSON structure that looks like
605 : * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
606 : * we take the binary data directly from the stream */
607 :
608 : /* could skip whitespace, but we just don't allow that */
609 1397 : if (*hdr++ != '{')
610 0 : throw(MAL, "remote.bincopyfrom",
611 : "illegal input, not a JSON header (got '%s')", hdr - 1);
612 225367 : while (*hdr != '\0') {
613 223970 : switch (*hdr) {
614 33524 : case '"':
615 : /* we assume only numeric values, so all strings are
616 : * elems */
617 33524 : if (nme != NULL) {
618 16762 : *hdr = '\0';
619 : } else {
620 16762 : nme = hdr + 1;
621 : }
622 : break;
623 16762 : case ':':
624 16762 : val = hdr + 1;
625 16762 : break;
626 16764 : case ',':
627 : case '}':
628 16764 : if (val == NULL)
629 0 : throw(MAL, "remote.bincopyfrom",
630 : "illegal input, JSON value missing");
631 16764 : *hdr = '\0';
632 :
633 16764 : lvp = &lv;
634 16764 : len = sizeof(lv);
635 : /* tseqbase can be 1<<31/1<<63 which causes overflow
636 : * in lngFromStr, so we check separately */
637 16764 : if (strcmp(val,
638 : #if SIZEOF_OID == 8
639 : "9223372036854775808"
640 : #else
641 : "2147483648"
642 : #endif
643 1369 : ) == 0 && strcmp(nme, "tseqbase") == 0) {
644 1368 : bb.Tseqbase = oid_nil;
645 : } else {
646 : /* all values should be non-negative, so we check that
647 : * here as well */
648 15396 : if (lngFromStr(val, &len, &lvp, true) < 0 ||
649 15396 : lv < 0 /* includes lng_nil */ )
650 0 : throw(MAL, "remote.bincopyfrom",
651 : "bad %s value: %s", nme, val);
652 :
653 : /* deal with nme and val */
654 15396 : if (strcmp(nme, "version") == 0) {
655 1397 : if (lv != 1)
656 0 : throw(MAL, "remote.bincopyfrom",
657 : "unsupported version: %s", val);
658 13999 : } else if (strcmp(nme, "hseqbase") == 0) {
659 : #if SIZEOF_OID < SIZEOF_LNG
660 : if (lv > GDK_oid_max)
661 : throw(MAL, "remote.bincopyfrom",
662 : "bad %s value: %s", nme, val);
663 : #endif
664 1397 : bb.Hseqbase = (oid) lv;
665 12602 : } else if (strcmp(nme, "ttype") == 0) {
666 1397 : if (lv >= GDKatomcnt)
667 0 : throw(MAL, "remote.bincopyfrom",
668 : "bad %s value: GDK atom number %s doesn't exist",
669 : nme, val);
670 1397 : bb.Ttype = (int) lv;
671 11205 : } else if (strcmp(nme, "tseqbase") == 0) {
672 : #if SIZEOF_OID < SIZEOF_LNG
673 : if (lv > GDK_oid_max)
674 : throw(MAL, "remote.bincopyfrom",
675 : "bad %s value: %s", nme, val);
676 : #endif
677 29 : bb.Tseqbase = (oid) lv;
678 11176 : } else if (strcmp(nme, "tsorted") == 0) {
679 1397 : bb.Tsorted = lv != 0;
680 9779 : } else if (strcmp(nme, "trevsorted") == 0) {
681 1397 : bb.Trevsorted = lv != 0;
682 8382 : } else if (strcmp(nme, "tkey") == 0) {
683 1397 : bb.Tkey = lv != 0;
684 6985 : } else if (strcmp(nme, "tnonil") == 0) {
685 1397 : bb.Tnonil = lv != 0;
686 5588 : } else if (strcmp(nme, "tdense") == 0) {
687 1397 : bb.Tdense = lv != 0;
688 4191 : } else if (strcmp(nme, "size") == 0) {
689 1397 : if (lv > (lng) BUN_MAX)
690 0 : throw(MAL, "remote.bincopyfrom",
691 : "bad %s value: %s", nme, val);
692 1397 : bb.size = (BUN) lv;
693 2794 : } else if (strcmp(nme, "tailsize") == 0) {
694 1397 : bb.tailsize = (size_t) lv;
695 1397 : } else if (strcmp(nme, "theapsize") == 0) {
696 1397 : bb.theapsize = (size_t) lv;
697 : } else {
698 0 : throw(MAL, "remote.bincopyfrom",
699 : "unknown element: %s", nme);
700 : }
701 : }
702 : nme = val = NULL;
703 : break;
704 : }
705 223970 : hdr++;
706 : }
707 : #ifdef HAVE_HGE
708 1397 : if (int128 && !cint128 && bb.Ttype >= TYPE_hge)
709 0 : bb.Ttype++;
710 : #else
711 : (void) cint128;
712 : #endif
713 :
714 2705 : b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT,
715 1308 : bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);
716 1397 : if (b == NULL)
717 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
718 :
719 1397 : if (bb.tailsize > 0) {
720 2568 : if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
721 1284 : mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
722 0 : goto bailout;
723 1284 : b->theap->dirty = true;
724 : }
725 1397 : if (bb.theapsize > 0) {
726 152 : if ((b->tvheap->base == NULL &&
727 76 : (*BATatoms[b->ttype].atomHeap) (b->tvheap,
728 : b->batCapacity) != GDK_SUCCEED)
729 76 : || HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED
730 76 : || mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
731 0 : goto bailout;
732 76 : b->tvheap->free = bb.theapsize;
733 76 : b->tvheap->dirty = true;
734 : }
735 :
736 : /* set properties */
737 1397 : b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
738 1397 : b->tsorted = bb.Tsorted;
739 1397 : b->trevsorted = bb.Trevsorted;
740 1397 : b->tkey = bb.Tkey;
741 1397 : b->tnonil = bb.Tnonil;
742 1397 : if (bb.Ttype == TYPE_str && bb.size)
743 76 : BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
744 1397 : BATsetcount(b, bb.size);
745 :
746 : // read blockmode flush
747 1397 : while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
748 0 : TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
749 : }
750 :
751 1397 : BATsettrivprop(b);
752 :
753 1397 : *ret = b;
754 1397 : return (MAL_SUCCEED);
755 :
756 : bailout:
757 0 : BBPreclaim(b);
758 0 : throw(MAL, "remote.bincopyfrom", "reading failed");
759 : }
760 :
761 : /**
762 : * get fetches the object referenced by ident over connection conn.
763 : * We are only interested in retrieving void-headed BATs, i.e. single columns.
764 : */
765 : static str
766 1395 : RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
767 : {
768 1395 : str conn, ident, tmp, rt;
769 1395 : connection c;
770 1395 : char qbuf[BUFSIZ + 1];
771 1395 : MapiHdl mhdl = NULL;
772 1395 : int rtype;
773 1395 : ValPtr v;
774 :
775 1395 : (void) mb;
776 1395 : (void) cntxt;
777 :
778 1395 : conn = *getArgReference_str(stk, pci, 1);
779 1395 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
780 0 : throw(ILLARG, "remote.get",
781 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
782 1395 : ident = *getArgReference_str(stk, pci, 2);
783 1395 : if (ident == 0 || isIdentifier(ident) < 0)
784 0 : throw(ILLARG, "remote.get",
785 : ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
786 :
787 : /* lookup conn, set c if valid */
788 1395 : rethrow("remote.get", tmp, RMTfindconn(&c, conn));
789 :
790 1395 : rtype = getArgType(mb, pci, 0);
791 1395 : v = &stk->stk[pci->argv[0]];
792 :
793 1395 : if (rtype == TYPE_any || isAnyExpression(rtype)) {
794 0 : char *tpe, *msg;
795 0 : tpe = getTypeName(rtype);
796 0 : msg = createException(MAL, "remote.get",
797 : ILLEGAL_ARGUMENT ": unsupported any type: %s",
798 : tpe);
799 0 : GDKfree(tpe);
800 0 : return msg;
801 : }
802 : /* check if the remote type complies with what we expect.
803 : Since the put() encodes the type as known to the remote site
804 : we can simple compare it here */
805 1395 : rt = getTypeIdentifier(rtype);
806 1395 : if (rt == NULL)
807 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
808 1395 : if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
809 0 : tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
810 : ": remote object type %s does not match expected type %s",
811 : rt, ident);
812 0 : GDKfree(rt);
813 0 : return tmp;
814 : }
815 1395 : GDKfree(rt);
816 :
817 1395 : if (isaBatType(rtype) && (localtype == 0177 || (localtype != c->type && localtype != (c->type | RMTT_HGE)))) {
818 0 : int t;
819 0 : size_t s;
820 0 : ptr r;
821 0 : str var;
822 0 : BAT *b;
823 :
824 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
825 :
826 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
827 :
828 : /* this call should be a single transaction over the channel */
829 0 : MT_lock_set(&c->lock);
830 :
831 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
832 : != MAL_SUCCEED) {
833 0 : TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
834 0 : MT_lock_unset(&c->lock);
835 0 : var = createException(MAL, "remote.get", "%s", tmp);
836 0 : freeException(tmp);
837 0 : return var;
838 : }
839 0 : t = getBatType(rtype);
840 0 : b = COLnew(0, t, 0, TRANSIENT);
841 0 : if (b == NULL) {
842 0 : mapi_close_handle(mhdl);
843 0 : MT_lock_unset(&c->lock);
844 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
845 : }
846 :
847 0 : if (ATOMbasetype(t) == TYPE_str) {
848 0 : while (mapi_fetch_row(mhdl)) {
849 0 : var = mapi_fetch_field(mhdl, 1);
850 0 : if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
851 0 : BBPreclaim(b);
852 0 : mapi_close_handle(mhdl);
853 0 : MT_lock_unset(&c->lock);
854 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
855 : }
856 : }
857 : } else
858 0 : while (mapi_fetch_row(mhdl)) {
859 0 : var = mapi_fetch_field(mhdl, 1);
860 0 : if (var == NULL)
861 0 : var = "nil";
862 0 : s = 0;
863 0 : r = NULL;
864 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
865 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
866 0 : BBPreclaim(b);
867 0 : GDKfree(r);
868 0 : mapi_close_handle(mhdl);
869 0 : MT_lock_unset(&c->lock);
870 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
871 : }
872 0 : GDKfree(r);
873 : }
874 :
875 0 : v->val.bval = b->batCacheid;
876 0 : v->vtype = TYPE_bat;
877 0 : BBPkeepref(b);
878 :
879 0 : mapi_close_handle(mhdl);
880 0 : MT_lock_unset(&c->lock);
881 2790 : } else if (isaBatType(rtype)) {
882 : /* binary compatible remote host, transfer BAT in binary form */
883 1395 : stream *sout;
884 1395 : stream *sin;
885 1395 : char buf[256];
886 1395 : BAT *b = NULL;
887 :
888 : /* this call should be a single transaction over the channel */
889 1395 : MT_lock_set(&c->lock);
890 :
891 : /* bypass Mapi from this point to efficiently write all data to
892 : * the server */
893 1395 : sout = mapi_get_to(c->mconn);
894 1395 : sin = mapi_get_from(c->mconn);
895 1395 : if (sin == NULL || sout == NULL) {
896 0 : MT_lock_unset(&c->lock);
897 0 : throw(MAL, "remote.get", "Connection lost");
898 : }
899 :
900 : /* call our remote helper to do this more efficiently */
901 1395 : mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
902 1395 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
903 :
904 1395 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
905 0 : MT_lock_unset(&c->lock);
906 0 : return tmp;
907 : }
908 :
909 1395 : if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->int128)) != MAL_SUCCEED) {
910 0 : MT_lock_unset(&c->lock);
911 0 : return (tmp);
912 : }
913 :
914 1395 : v->val.bval = b->batCacheid;
915 1395 : v->vtype = TYPE_bat;
916 1395 : BBPkeepref(b);
917 :
918 1395 : MT_lock_unset(&c->lock);
919 : } else {
920 0 : ptr p = NULL;
921 0 : str val;
922 0 : size_t len = 0;
923 :
924 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
925 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
926 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED) {
927 0 : return tmp;
928 : }
929 0 : (void) mapi_fetch_row(mhdl); /* should succeed */
930 0 : val = mapi_fetch_field(mhdl, 0);
931 :
932 0 : if (ATOMbasetype(rtype) == TYPE_str) {
933 0 : if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
934 0 : mapi_close_handle(mhdl);
935 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
936 : }
937 0 : } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true)
938 : < 0) {
939 0 : char *msg;
940 0 : msg = createException(MAL, "remote.get",
941 : "unable to parse value: %s",
942 : val == NULL ? "nil" : val);
943 0 : mapi_close_handle(mhdl);
944 0 : GDKfree(p);
945 0 : return msg;
946 : } else {
947 0 : VALset(v, rtype, p);
948 0 : if (ATOMextern(rtype) == 0)
949 0 : GDKfree(p);
950 : }
951 :
952 0 : mapi_close_handle(mhdl);
953 : }
954 :
955 : return (MAL_SUCCEED);
956 : }
957 :
958 : /**
959 : * stores the given object on the remote host. The identifier of the
960 : * object on the remote host is returned for later use.
961 : */
962 : static str
963 3254 : RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
964 : {
965 3254 : str conn, tmp;
966 3254 : char ident[512];
967 3254 : connection c;
968 3254 : ValPtr v;
969 3254 : int type;
970 3254 : ptr value;
971 3254 : MapiHdl mhdl = NULL;
972 :
973 3254 : (void) cntxt;
974 :
975 3254 : conn = *getArgReference_str(stk, pci, 1);
976 3254 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
977 5 : throw(ILLARG, "remote.put",
978 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
979 :
980 : /* lookup conn */
981 3249 : rethrow("remote.put", tmp, RMTfindconn(&c, conn));
982 :
983 : /* put the thing */
984 3249 : type = getArgType(mb, pci, 2);
985 3249 : value = getArgReference(stk, pci, 2);
986 :
987 : /* this call should be a single transaction over the channel */
988 3249 : MT_lock_set(&c->lock);
989 :
990 : /* get a free, typed identifier for the remote host */
991 3249 : tmp = RMTgetId(ident, sizeof(ident), mb, pci, 2);
992 3249 : if (tmp != MAL_SUCCEED) {
993 0 : MT_lock_unset(&c->lock);
994 0 : return tmp;
995 : }
996 :
997 : /* depending on the input object generate actions to store the
998 : * object remotely*/
999 3249 : if (type == TYPE_any || type == TYPE_bat || isAnyExpression(type)) {
1000 0 : char *tpe, *msg;
1001 0 : MT_lock_unset(&c->lock);
1002 0 : tpe = getTypeName(type);
1003 0 : msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
1004 0 : GDKfree(tpe);
1005 0 : return msg;
1006 4646 : } else if (isaBatType(type) && !is_bat_nil(*(bat *) value)) {
1007 1397 : BATiter bi;
1008 : /* naive approach using bat.new() and bat.insert() calls */
1009 1397 : char *tail;
1010 1397 : bat bid;
1011 1397 : BAT *b = NULL;
1012 1397 : BUN p, q;
1013 1397 : str tailv;
1014 1397 : stream *sout;
1015 :
1016 1397 : tail = getTypeIdentifier(getBatType(type));
1017 1397 : if (tail == NULL) {
1018 0 : MT_lock_unset(&c->lock);
1019 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1020 : }
1021 :
1022 1397 : bid = *(bat *) value;
1023 1397 : if (bid != 0) {
1024 1397 : if ((b = BATdescriptor(bid)) == NULL) {
1025 0 : MT_lock_unset(&c->lock);
1026 0 : GDKfree(tail);
1027 0 : throw(MAL, "remote.put",
1028 : SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1029 : }
1030 : }
1031 :
1032 : /* bypass Mapi from this point to efficiently write all data to
1033 : * the server */
1034 1397 : sout = mapi_get_to(c->mconn);
1035 :
1036 : /* call our remote helper to do this more efficiently */
1037 1397 : mnstr_printf(sout,
1038 : "%s := remote.batload(nil:%s, " BUNFMT ");\n",
1039 : ident, tail, (bid == 0 ? 0 : BATcount(b)));
1040 1397 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1041 1397 : GDKfree(tail);
1042 :
1043 : /* b can be NULL if bid == 0 (only type given, ugh) */
1044 1397 : if (b) {
1045 2794 : int tpe = getBatType(type), trivial = tpe < TYPE_date
1046 1397 : || ATOMbasetype(tpe) == TYPE_str;
1047 1397 : const void *nil = ATOMnilptr(tpe);
1048 1397 : int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
1049 :
1050 1397 : bi = bat_iterator(b);
1051 1397 : BATloop(b, p, q) {
1052 0 : const void *v = BUNtail(bi, p);
1053 0 : tailv = ATOMformat(tpe, v);
1054 0 : if (tailv == NULL) {
1055 0 : bat_iterator_end(&bi);
1056 0 : BBPunfix(b->batCacheid);
1057 0 : MT_lock_unset(&c->lock);
1058 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1059 : }
1060 0 : if (trivial || atomcmp(v, nil) == 0)
1061 0 : mnstr_printf(sout, "%s\n", tailv);
1062 : else
1063 0 : mnstr_printf(sout, "\"%s\"\n", tailv);
1064 0 : GDKfree(tailv);
1065 : }
1066 1397 : bat_iterator_end(&bi);
1067 1397 : BBPunfix(b->batCacheid);
1068 : }
1069 :
1070 : /* write the empty line the server is waiting for, handles
1071 : * all errors at the same time, if any */
1072 1397 : if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
1073 : != MAL_SUCCEED) {
1074 0 : MT_lock_unset(&c->lock);
1075 0 : return tmp;
1076 : }
1077 1397 : mapi_close_handle(mhdl);
1078 0 : } else if (isaBatType(type) && is_bat_nil(*(bat *) value)) {
1079 0 : stream *sout;
1080 0 : str typename = getTypeName(type);
1081 0 : sout = mapi_get_to(c->mconn);
1082 0 : mnstr_printf(sout, "%s := nil:%s;\n", ident, typename);
1083 0 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1084 0 : GDKfree(typename);
1085 : } else {
1086 1852 : size_t l;
1087 1852 : str val;
1088 1852 : char *tpe;
1089 1852 : char qbuf[512], *nbuf = qbuf;
1090 1852 : const void *nil = ATOMnilptr(type), *p = value;
1091 1852 : int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
1092 :
1093 1852 : if (ATOMextern(type))
1094 1286 : p = *(ptr *) value;
1095 :
1096 1852 : val = ATOMformat(type, p);
1097 1851 : if (val == NULL) {
1098 0 : MT_lock_unset(&c->lock);
1099 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1100 : }
1101 1851 : tpe = getTypeIdentifier(type);
1102 1851 : if (tpe == NULL) {
1103 0 : MT_lock_unset(&c->lock);
1104 0 : GDKfree(val);
1105 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1106 : }
1107 1851 : l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
1108 1851 : if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
1109 0 : MT_lock_unset(&c->lock);
1110 0 : GDKfree(val);
1111 0 : GDKfree(tpe);
1112 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1113 : }
1114 1851 : if (type < TYPE_date || ATOMbasetype(type) == TYPE_str
1115 8 : || atomcmp(p, nil) == 0)
1116 1846 : snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
1117 : else
1118 5 : snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
1119 1851 : GDKfree(tpe);
1120 1852 : GDKfree(val);
1121 1852 : TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
1122 1852 : tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
1123 1852 : if (nbuf != qbuf)
1124 33 : GDKfree(nbuf);
1125 1852 : if (tmp != MAL_SUCCEED) {
1126 0 : MT_lock_unset(&c->lock);
1127 0 : return tmp;
1128 : }
1129 1852 : mapi_close_handle(mhdl);
1130 : }
1131 3249 : MT_lock_unset(&c->lock);
1132 :
1133 : /* return the identifier */
1134 3249 : v = &stk->stk[pci->argv[0]];
1135 3249 : if (VALinit(v, TYPE_str, ident) == NULL)
1136 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1137 : return (MAL_SUCCEED);
1138 : }
1139 :
1140 : /**
1141 : * stores the given <mod>.<fcn> on the remote host.
1142 : * An error is returned if the function is already known at the remote site.
1143 : * The implementation is based on serialisation of the block into a string
1144 : * followed by remote parsing.
1145 : */
1146 : static str
1147 0 : RMTregisterInternal(Client cntxt, char **fcn_id, const char *conn,
1148 : const char *mod, const char *fcn)
1149 : {
1150 0 : str tmp, qry, msg;
1151 0 : connection c;
1152 0 : char buf[BUFSIZ];
1153 0 : MapiHdl mhdl = NULL;
1154 0 : Symbol sym;
1155 :
1156 0 : if (strNil(conn))
1157 0 : throw(ILLARG, "remote.register",
1158 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1159 :
1160 : /* find local definition */
1161 0 : sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
1162 0 : if (sym == NULL)
1163 0 : throw(MAL, "remote.register",
1164 : ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
1165 :
1166 : /* lookup conn */
1167 0 : rethrow("remote.register", tmp, RMTfindconn(&c, conn));
1168 :
1169 : /* this call should be a single transaction over the channel */
1170 0 : MT_lock_set(&c->lock);
1171 :
1172 : /* get a free, typed identifier for the remote host */
1173 0 : char ident[512];
1174 0 : tmp = RMTgetId(ident, sizeof(ident), sym->def, getInstrPtr(sym->def, 0), 0);
1175 0 : if (tmp != MAL_SUCCEED) {
1176 0 : MT_lock_unset(&c->lock);
1177 0 : return tmp;
1178 : }
1179 :
1180 : /* check remote definition */
1181 0 : snprintf(buf, BUFSIZ,
1182 : "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod,
1183 : ident);
1184 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
1185 0 : if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED) {
1186 0 : MT_lock_unset(&c->lock);
1187 0 : return msg;
1188 : }
1189 :
1190 0 : char *result;
1191 0 : if (mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl)
1192 0 : && (result = mapi_fetch_field(mhdl, 0))) {
1193 0 : if (strcmp(result, "false") != 0)
1194 0 : msg = createException(MAL, "remote.register",
1195 : "function already exists at the remote site: %s.%s",
1196 : mod, fcn);
1197 : } else
1198 0 : msg = createException(MAL, "remote.register", OPERATION_FAILED);
1199 :
1200 0 : mapi_close_handle(mhdl);
1201 :
1202 0 : if (msg) {
1203 0 : MT_lock_unset(&c->lock);
1204 0 : return msg;
1205 : }
1206 :
1207 0 : *fcn_id = GDKstrdup(ident);
1208 0 : if (*fcn_id == NULL) {
1209 0 : MT_lock_unset(&c->lock);
1210 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1211 : }
1212 :
1213 0 : Symbol prg;
1214 0 : if ((prg = newFunctionArgs(putName(mod), putName(*fcn_id), FUNCTIONsymbol, -1)) == NULL) {
1215 0 : MT_lock_unset(&c->lock);
1216 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1217 : }
1218 :
1219 : // We only need the Symbol not the inner program stub. So we clear it.
1220 0 : freeMalBlk(prg->def);
1221 0 : prg->def = NULL;
1222 :
1223 0 : if ((prg->def = copyMalBlk(sym->def)) == NULL) {
1224 0 : MT_lock_unset(&c->lock);
1225 0 : freeSymbol(prg);
1226 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1227 : }
1228 0 : setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
1229 :
1230 : /* make sure the program is error free */
1231 0 : msg = chkProgram(cntxt->usermodule, prg->def);
1232 0 : if (msg != MAL_SUCCEED || prg->def->errors) {
1233 0 : MT_lock_unset(&c->lock);
1234 0 : if (msg)
1235 : return msg;
1236 0 : throw(MAL, "remote.register",
1237 : "function '%s.%s' contains syntax or type errors", mod, *fcn_id);
1238 : }
1239 :
1240 0 : qry = mal2str(prg->def, 0, prg->def->stop);
1241 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
1242 0 : msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
1243 0 : GDKfree(qry);
1244 0 : if (mhdl)
1245 0 : mapi_close_handle(mhdl);
1246 :
1247 0 : freeSymbol(prg);
1248 :
1249 0 : MT_lock_unset(&c->lock);
1250 0 : return msg;
1251 : }
1252 :
1253 : static str
1254 0 : RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1255 : {
1256 0 : char **fcn_id = getArgReference_str(stk, pci, 0);
1257 0 : const char *conn = *getArgReference_str(stk, pci, 1);
1258 0 : const char *mod = *getArgReference_str(stk, pci, 2);
1259 0 : const char *fcn = *getArgReference_str(stk, pci, 3);
1260 0 : (void) mb;
1261 0 : return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
1262 : }
1263 :
1264 : /**
1265 : * exec executes the function with its given arguments on the remote
1266 : * host, returning the function's return value. exec is purposely kept
1267 : * very spartan. All arguments need to be handles to previously put()
1268 : * values. It calls the function with the given arguments at the remote
1269 : * site, and returns the handle which stores the return value of the
1270 : * remotely executed function. This return value can be retrieved using
1271 : * a get call. It handles multiple return arguments.
1272 : */
1273 : static str
1274 725 : RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1275 : {
1276 725 : str conn, mod, func, tmp;
1277 725 : int i;
1278 725 : size_t len, buflen;
1279 725 : connection c = NULL;
1280 725 : char *qbuf;
1281 725 : MapiHdl mhdl;
1282 :
1283 725 : (void) cntxt;
1284 725 : (void) mb;
1285 725 : bool no_return_arguments = 0;
1286 :
1287 725 : columnar_result_callback *rcb = NULL;
1288 725 : ValRecord *v = &(stk)->stk[(pci)->argv[4]];
1289 725 : if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr)) {
1290 1 : rcb = (columnar_result_callback *) v->val.pval;
1291 : }
1292 :
1293 2669 : for (i = 0; i < pci->retc; i++) {
1294 1944 : if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
1295 1943 : tmp = *getArgReference_str(stk, pci, i);
1296 1943 : if (tmp == NULL || strcmp(tmp, (str) str_nil) == 0)
1297 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
1298 : ": return value %d is NULL or nil", i);
1299 : } else
1300 : no_return_arguments = 1;
1301 : }
1302 :
1303 725 : conn = *getArgReference_str(stk, pci, i++);
1304 725 : if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
1305 0 : throw(ILLARG, "remote.exec",
1306 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1307 725 : mod = *getArgReference_str(stk, pci, i++);
1308 725 : if (mod == NULL || strcmp(mod, (str) str_nil) == 0)
1309 0 : throw(ILLARG, "remote.exec",
1310 : ILLEGAL_ARGUMENT ": module name is NULL or nil");
1311 725 : func = *getArgReference_str(stk, pci, i++);
1312 725 : if (func == NULL || strcmp(func, (str) str_nil) == 0)
1313 0 : throw(ILLARG, "remote.exec",
1314 : ILLEGAL_ARGUMENT ": function name is NULL or nil");
1315 :
1316 : /* lookup conn */
1317 725 : rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
1318 :
1319 : /* this call should be a single transaction over the channel */
1320 725 : MT_lock_set(&c->lock);
1321 :
1322 725 : if (!no_return_arguments && pci->argc - pci->retc < 3) { /* conn, mod, func, ... */
1323 0 : MT_lock_unset(&c->lock);
1324 0 : throw(MAL, "remote.exec",
1325 : ILLEGAL_ARGUMENT " MAL instruction misses arguments");
1326 : }
1327 :
1328 725 : len = 0;
1329 : /* count how big a buffer we need */
1330 725 : len += 2 * (pci->retc > 1);
1331 725 : if (!no_return_arguments)
1332 2667 : for (i = 0; i < pci->retc; i++) {
1333 1943 : len += 2 * (i > 0);
1334 1943 : len += strlen(*getArgReference_str(stk, pci, i));
1335 : }
1336 :
1337 725 : const int arg_index = rcb ? 4 : 3;
1338 :
1339 725 : len += strlen(mod) + strlen(func) + 6;
1340 2031 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1341 1306 : len += 2 * (i > arg_index);
1342 1306 : len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
1343 : }
1344 725 : len += 2;
1345 725 : buflen = len + 1;
1346 725 : if ((qbuf = GDKmalloc(buflen)) == NULL) {
1347 0 : MT_lock_unset(&c->lock);
1348 0 : throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1349 : }
1350 :
1351 725 : len = 0;
1352 :
1353 725 : if (pci->retc > 1)
1354 69 : qbuf[len++] = '(';
1355 725 : if (!no_return_arguments)
1356 2667 : for (i = 0; i < pci->retc; i++)
1357 1943 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1358 1943 : (i > 0 ? ", " : ""), *getArgReference_str(stk, pci,
1359 : i));
1360 :
1361 725 : if (pci->retc > 1)
1362 69 : qbuf[len++] = ')';
1363 :
1364 : /* build the function invocation string in qbuf */
1365 725 : if (!no_return_arguments && pci->retc > 0) {
1366 724 : len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
1367 : } else {
1368 1 : len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
1369 : }
1370 :
1371 : /* handle the arguments to the function */
1372 :
1373 : /* put the arguments one by one, and dynamically build the
1374 : * invocation string */
1375 2031 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1376 1306 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1377 : (i > arg_index ? ", " : ""),
1378 1306 : *(getArgReference_str(stk, pci, pci->retc + i)));
1379 : }
1380 :
1381 : /* finish end execute the invocation string */
1382 725 : len += snprintf(&qbuf[len], buflen - len, ");");
1383 725 : TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
1384 725 : tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
1385 725 : GDKfree(qbuf);
1386 :
1387 : /* Temporary hack:
1388 : * use a callback to immediately handle columnar results before hdl is destroyed. */
1389 725 : if (tmp == MAL_SUCCEED && rcb && mhdl
1390 1 : && (mapi_get_querytype(mhdl) == Q_TABLE
1391 0 : || mapi_get_querytype(mhdl) == Q_PREPARE)) {
1392 1 : int fields = mapi_get_field_count(mhdl);
1393 1 : columnar_result *results = GDKzalloc(sizeof(columnar_result) * fields);
1394 :
1395 1 : if (!results) {
1396 0 : tmp = createException(MAL, "remote.exec",
1397 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
1398 : } else {
1399 1 : int i = 0;
1400 1 : char buf[256] = { 0 };
1401 1 : stream *sin = mapi_get_from(c->mconn);
1402 :
1403 4 : for (; i < fields; i++) {
1404 2 : BAT *b = NULL;
1405 :
1406 2 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
1407 2 : (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1, c->int128)) != MAL_SUCCEED) {
1408 : break;
1409 : }
1410 :
1411 2 : results[i].id = b->batCacheid;
1412 2 : BBPkeepref(b);
1413 2 : results[i].colname = mapi_get_name(mhdl, i);
1414 2 : results[i].tpename = mapi_get_type(mhdl, i);
1415 2 : results[i].digits = mapi_get_digits(mhdl, i);
1416 2 : results[i].scale = mapi_get_scale(mhdl, i);
1417 : }
1418 :
1419 1 : if (tmp != MAL_SUCCEED) {
1420 0 : for (int j = 0; j < i; j++)
1421 0 : BBPrelease(results[j].id);
1422 : } else {
1423 1 : assert(rcb->context);
1424 1 : tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results,
1425 : fields);
1426 3 : for (int j = 0; j < i; j++)
1427 2 : BBPrelease(results[j].id);
1428 : }
1429 1 : GDKfree(results);
1430 : }
1431 : }
1432 :
1433 725 : if (rcb) {
1434 1 : GDKfree(rcb->context);
1435 1 : GDKfree(rcb);
1436 : }
1437 :
1438 725 : if (mhdl)
1439 717 : mapi_close_handle(mhdl);
1440 725 : MT_lock_unset(&c->lock);
1441 725 : return tmp;
1442 : }
1443 :
1444 : /**
1445 : * batload is a helper function to make transferring a BAT with RMTput
1446 : * more efficient. It works by creating a BAT, and loading it with the
1447 : * data as comma separated values from the input stream, until an empty
1448 : * line is read. The given size argument is taken as a hint only, and
1449 : * is not enforced to match the number of rows read.
1450 : */
1451 : static str
1452 1397 : RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1453 : {
1454 1397 : ValPtr v;
1455 1397 : int t;
1456 1397 : int size;
1457 1397 : ptr r;
1458 1397 : size_t s;
1459 1397 : BAT *b;
1460 1397 : size_t len;
1461 1397 : char *var;
1462 1397 : str msg = MAL_SUCCEED;
1463 1397 : bstream *fdin = cntxt->fdin;
1464 :
1465 1397 : v = &stk->stk[pci->argv[0]]; /* return */
1466 1397 : t = getArgType(mb, pci, 1); /* tail type */
1467 1397 : size = *getArgReference_int(stk, pci, 2); /* size */
1468 :
1469 1397 : b = COLnew(0, t, size, TRANSIENT);
1470 1397 : if (b == NULL)
1471 0 : throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1472 :
1473 : /* grab the input stream and start reading */
1474 1397 : fdin->eof = false;
1475 1397 : len = fdin->pos;
1476 1397 : while (len < fdin->len || bstream_next(fdin) > 0) {
1477 : /* newline hunting (how spartan) */
1478 1397 : for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n';
1479 0 : len++) ;
1480 : /* unterminated line, request more */
1481 1397 : if (fdin->buf[len] != '\n')
1482 0 : continue;
1483 : /* empty line, end of input */
1484 1397 : if (fdin->pos == len) {
1485 1397 : if (isa_block_stream(fdin->s)) {
1486 1397 : ssize_t n = bstream_next(fdin);
1487 1397 : if (n)
1488 0 : msg = createException(MAL, "remote.load",
1489 : SQLSTATE(HY013)
1490 : "Unexpected return from remote");
1491 : }
1492 : break;
1493 : }
1494 0 : fdin->buf[len] = '\0'; /* kill \n */
1495 0 : var = &fdin->buf[fdin->pos];
1496 : /* skip over this line */
1497 0 : fdin->pos = ++len;
1498 :
1499 0 : s = 0;
1500 0 : r = NULL;
1501 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
1502 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
1503 0 : BBPreclaim(b);
1504 0 : GDKfree(r);
1505 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
1506 : }
1507 0 : GDKfree(r);
1508 : }
1509 :
1510 1397 : v->val.bval = b->batCacheid;
1511 1397 : v->vtype = TYPE_bat;
1512 1397 : BBPkeepref(b);
1513 :
1514 1397 : return msg;
1515 : }
1516 :
1517 : /**
1518 : * dump given BAT to stream
1519 : */
1520 : static str
1521 1395 : RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1522 : {
1523 1395 : bat bid = *getArgReference_bat(stk, pci, 1);
1524 1395 : BAT *b = BBPquickdesc(bid), *v = b;
1525 1395 : char sendtheap = 0, sendtvheap = 0;
1526 :
1527 1395 : (void) mb;
1528 1395 : (void) stk;
1529 1395 : (void) pci;
1530 :
1531 1395 : if (b == NULL)
1532 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
1533 :
1534 1395 : if (BBPfix(bid) <= 0)
1535 0 : throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
1536 :
1537 1395 : sendtheap = b->ttype != TYPE_void;
1538 1395 : sendtvheap = sendtheap && b->tvheap;
1539 1395 : if (isVIEW(b) && sendtvheap && VIEWvtparent(b)
1540 42 : && BATcount(b) < BATcount(BBP_desc(VIEWvtparent(b)))) {
1541 18 : if ((b = BATdescriptor(bid)) == NULL) {
1542 0 : BBPunfix(bid);
1543 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_MISSING);
1544 : }
1545 18 : v = COLcopy(b, b->ttype, true, TRANSIENT);
1546 18 : BBPunfix(b->batCacheid);
1547 18 : if (v == NULL) {
1548 0 : BBPunfix(bid);
1549 0 : throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
1550 : }
1551 : }
1552 :
1553 1395 : BATiter vi = bat_iterator(v);
1554 1485 : mnstr_printf(cntxt->fdout, /*JSON*/ "{"
1555 : "\"version\":1,"
1556 : "\"ttype\":%d,"
1557 : "\"hseqbase\":" OIDFMT ","
1558 : "\"tseqbase\":" OIDFMT ","
1559 : "\"tsorted\":%d,"
1560 : "\"trevsorted\":%d,"
1561 : "\"tkey\":%d,"
1562 : "\"tnonil\":%d,"
1563 : "\"tdense\":%d,"
1564 : "\"size\":" BUNFMT ","
1565 : "\"tailsize\":%zu,"
1566 : "\"theapsize\":%zu"
1567 : "}\n",
1568 1395 : vi.type,
1569 : v->hseqbase, v->tseqbase,
1570 1395 : vi.sorted, vi.revsorted,
1571 1395 : vi.key,
1572 1395 : vi.nonil,
1573 1395 : BATtdensebi(&vi),
1574 : vi.count,
1575 1366 : sendtheap ? (size_t) vi.count << vi.shift : 0,
1576 90 : sendtvheap && vi.count > 0 ? vi.vhfree : 0);
1577 :
1578 1395 : if (sendtheap && vi.count > 0) {
1579 1282 : mnstr_write(cntxt->fdout, /* tail */
1580 1282 : vi.base, vi.count * vi.width, 1);
1581 1282 : if (sendtvheap)
1582 75 : mnstr_write(cntxt->fdout, /* theap */
1583 75 : vi.vh->base, vi.vhfree, 1);
1584 : }
1585 1395 : bat_iterator_end(&vi);
1586 : /* flush is done by the calling environment (MAL) */
1587 :
1588 1395 : if (b != v)
1589 18 : BBPreclaim(v);
1590 :
1591 1395 : BBPunfix(bid);
1592 :
1593 1395 : return (MAL_SUCCEED);
1594 : }
1595 :
1596 : /**
1597 : * read from the input stream and give the BAT handle back to the caller
1598 : */
1599 : static str
1600 0 : RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1601 : {
1602 0 : BAT *b = NULL;
1603 0 : ValPtr v;
1604 0 : str err;
1605 :
1606 0 : (void) mb;
1607 :
1608 : /* We receive a normal line, which contains the JSON header, the
1609 : * rest is binary data directly on the stream. We get the first
1610 : * line from the buffered stream we have here, and pass it on
1611 : * together with the raw stream we have. */
1612 0 : cntxt->fdin->eof = false; /* in case it was before */
1613 0 : if (bstream_next(cntxt->fdin) <= 0)
1614 0 : throw(MAL, "remote.bincopyfrom", "expected JSON header");
1615 :
1616 0 : cntxt->fdin->buf[cntxt->fdin->len] = '\0';
1617 0 : err = RMTinternalcopyfrom(&b,
1618 0 : &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true, int128 /* library should be compatible */);
1619 : /* skip the JSON line */
1620 0 : cntxt->fdin->pos = ++cntxt->fdin->len;
1621 0 : if (err !=MAL_SUCCEED)
1622 : return (err);
1623 :
1624 0 : v = &stk->stk[pci->argv[0]];
1625 0 : v->val.bval = b->batCacheid;
1626 0 : v->vtype = TYPE_bat;
1627 0 : BBPkeepref(b);
1628 :
1629 0 : return (MAL_SUCCEED);
1630 : }
1631 :
1632 : /**
1633 : * bintype identifies the system on its binary profile. This is mainly
1634 : * used to determine if BATs can be sent binary across.
1635 : */
1636 : static str
1637 185 : RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1638 : {
1639 185 : (void)mb;
1640 185 : (void)stk;
1641 185 : (void)pci;
1642 :
1643 : /* TODO bintype should include the (bin) protocol version */
1644 185 : mnstr_printf(cntxt->fdout, "[ %d ]\n", localtype);
1645 185 : return(MAL_SUCCEED);
1646 : }
1647 :
1648 : /**
1649 : * Returns whether the underlying connection is still connected or not.
1650 : * Best effort implementation on top of mapi using a ping.
1651 : */
1652 : static str
1653 0 : RMTisalive(int *ret, str *conn)
1654 : {
1655 0 : str tmp;
1656 0 : connection c;
1657 :
1658 0 : if (*conn == NULL || strcmp(*conn, (str) str_nil) == 0)
1659 0 : throw(ILLARG, "remote.get",
1660 : ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1661 :
1662 : /* lookup conn, set c if valid */
1663 0 : rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
1664 :
1665 0 : *ret = 0;
1666 0 : if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
1667 0 : *ret = 1;
1668 :
1669 : return MAL_SUCCEED;
1670 : }
1671 :
1672 : // This is basically a no op
1673 : static str
1674 356 : RMTregisterSupervisor(int *ret, str *sup_uuid, str *query_uuid)
1675 : {
1676 356 : (void) sup_uuid;
1677 356 : (void) query_uuid;
1678 :
1679 356 : *ret = 0;
1680 356 : return MAL_SUCCEED;
1681 : }
1682 :
1683 : #include "mel.h"
1684 : mel_func remote_init_funcs[] = {
1685 : command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
1686 : command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
1687 : pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
1688 : command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
1689 : command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
1690 : pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
1691 : pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
1692 : pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
1693 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
1694 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
1695 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
1696 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
1697 : command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
1698 : pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
1699 : pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
1700 : pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
1701 : pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
1702 : command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
1703 : { .imp=NULL }
1704 : };
1705 : #include "mal_import.h"
1706 : #ifdef _MSC_VER
1707 : #undef read
1708 : #pragma section(".CRT$XCU",read)
1709 : #endif
1710 329 : LIB_STARTUP_FUNC(init_remote_mal)
1711 329 : { mal_module2("remote", NULL, remote_init_funcs, RMTprelude, NULL); }
|