LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - remote.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 518 886 58.5 %
Date: 2024-11-13 19:37:10 Functions: 18 23 78.3 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (c) Fabian Groffen, Martin Kersten
      15             :  * Remote querying functionality
      16             :  * Communication with other mservers at the MAL level is a delicate task.
      17             :  * However, it is indispensable for any distributed functionality.  This
      18             :  * module provides an abstract way to store and retrieve objects on a
      19             :  * remote site.  Additionally, functions on a remote site can be executed
      20             :  * using objects available in the remote session context.  This yields
      21             :  * four primitive functions that form the basis for distribution methods:
      22             :  * get, put, register and exec.
      23             :  *
      24             :  * The get method simply retrieves a copy of a remote object.  Objects can
      25             :  * be simple values, strings or Column.  The same holds for the put method,
      26             :  * but the other way around.  A local object can be stored on a remote
      27             :  * site.  Upon a successful store, the put method returns the remote
      28             :  * identifier for the stored object.  With this identifier the object can
      29             :  * be addressed, e.g. using the get method to retrieve the object that was
      30             :  * stored using put.
      31             :  *
      32             :  * The get and put methods are symmetric.  Performing a get on an
      33             :  * identifier that was returned by put, results in an object with the same
      34             :  * value and type as the one that was put.  The result of such an operation is
      35             :  * equivalent to making an (expensive) copy of the original object.
      36             :  *
      37             :  * The register function takes a local MAL function and makes it known at a
      38             :  * remote site. It ensures that it does not overload an already known
      39             :  * operation remotely, which could create a semantic conflict.
      40             :  * Deregistering a function is forbidden, because it would allow for taking
      41             :  * over the remote site completely.
      42             :  * C-implemented functions, such as io.print() cannot be remotely stored.
      43             :  * It would require even more complicated (byte) code shipping and remote
      44             :  * compilation to make it work.
      45             :  *
      46             :  * The choice to let exec only execute functions avoids problems
      47             :  * to decide what should be returned to the caller.  With a function it is
      48             :  * clear and simple to return that what the function signature prescribes.
      49             :  * Any side effect (e.g. io.print calls) may cause havoc in the system,
      50             :  * but are currently ignored.
      51             :  *
      52             :  * This leads to the final contract of this module.  The methods should be
      53             :  * used correctly, by obeying their contract.  Failing to do so will result
      54             :  * in errors and possibly undefined behaviour.
      55             :  *
      56             :  * The resolve() function can be used to query Merovingian.  It returns one
      57             :  * or more databases discovered in its vicinity matching the given pattern.
      58             :  *
      59             :  */
      60             : #include "monetdb_config.h"
      61             : #include "remote.h"
      62             : 
      63             : /*
      64             :  * Technically, these methods need to be serialised per connection,
      65             :  * hence a scheduler that interleaves e.g. multiple get calls, simply
      66             :  * violates this constraint.  If parallelism to the same site is
      67             :  * desired, a user could create a second connection.  This is not always
      68             :  * easy to generate at the proper place, e.g. overloading the dataflow
      69             :  * optimizer to patch connections structures is not acceptable.
      70             :  *
      71             :  * Instead, we maintain a simple lock with each connection, which can be
      72             :  * used to issue a safe, but blocking get/put/exec/register request.
      73             :  */
      74             : #include "mal_exception.h"
      75             : #include "mal_interpreter.h"
      76             : #include "mal_function.h"             /* for printFunction */
      77             : #include "mal_listing.h"
      78             : #include "mal_instruction.h"  /* for getmodule/func macros */
      79             : #include "mal_authorize.h"
      80             : #include "mapi.h"
      81             : #include "mutils.h"
      82             : 
/*
 * Bit flags describing the binary profile of a server.  RMTprelude ORs
 * the applicable flags together into localtype, and RMTconnectScen
 * compares the value reported by the remote side against it.
 * Bit 1 selects the byte order, bit 2 the word size, bit 3 the OID width
 * and bit 4 signals HGE support.  The "0<<n" variants evaluate to zero
 * and only exist to name the alternative explicitly.
 */
#define RMTT_L_ENDIAN   (0<<1)
#define RMTT_B_ENDIAN   (1<<1)
#define RMTT_32_BITS    (0<<2)
#define RMTT_64_BITS    (1<<2)
#define RMTT_32_OIDS    (0<<3)
#define RMTT_64_OIDS    (1<<3)
#define RMTT_HGE            (1<<4)
      90             : 
/*
 * Descriptor of one open remote connection.  Descriptors are chained
 * through `next` into the singly linked list rooted at `conns`.
 * Note that `connection` is a pointer typedef.
 */
typedef struct _connection {
	MT_Lock lock;				/* lock to avoid interference */
	str name;					/* the handle for this connection */
	Mapi mconn;					/* the Mapi handle for the connection */
	unsigned char type;			/* binary profile of the connection target (RMTT_* flags) */
	bool int128;				/* has int128 support */
	size_t nextid;				/* id counter, starts at 0 for a new connection */
	struct _connection *next;	/* the next connection in the list */
} *connection;
     100             : 
     101             : #ifndef WIN32
     102             : #include <sys/socket.h>                   /* socket */
     103             : #include <sys/un.h>                               /* sockaddr_un */
     104             : #endif
     105             : #include <unistd.h>                               /* gethostname */
     106             : 
/* taken around every traversal or modification of the conns list */
static MT_Lock mal_remoteLock = MT_LOCK_INITIALIZER(mal_remoteLock);

/* head of the list of open connections */
static connection conns = NULL;
/* binary profile of this server (RMTT_* flags); the initial 0177 is
 * replaced by the real value in RMTprelude */
static unsigned char localtype = 0177;
/* set to true by RMTprelude when compiled with HAVE_HGE */
static bool int128 = false;

/* forward declaration, RMTconnectScen uses RMTquery before its definition */
static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn,
						   const char *query);
     115             : 
     116             : /**
     117             :  * Returns a BAT with valid redirects for the given pattern.  If
     118             :  * merovingian is not running, this function throws an error.
     119             :  */
     120             : static str
     121           0 : RMTresolve(bat *ret, const char *const *pat)
     122             : {
     123             : #ifdef NATIVE_WIN32
     124             :         (void) ret;
     125             :         (void) pat;
     126             :         throw(MAL, "remote.resolve", "merovingian is not available on " "your platform, sorry");  /* please upgrade to Linux, etc. */
     127             : #else
     128           0 :         BAT *list;
     129           0 :         const char *mero_uri;
     130           0 :         char *p;
     131           0 :         unsigned int port;
     132           0 :         char **redirs;
     133           0 :         char **or;
     134             : 
     135           0 :         if (pat == NULL || *pat == NULL || strcmp(*pat, (str) str_nil) == 0)
     136           0 :                 throw(ILLARG, "remote.resolve",
     137             :                           ILLEGAL_ARGUMENT ": pattern is NULL or nil");
     138             : 
     139           0 :         mero_uri = GDKgetenv("merovingian_uri");
     140           0 :         if (mero_uri == NULL)
     141           0 :                 throw(MAL, "remote.resolve", "this function needs the mserver "
     142             :                           "have been started by merovingian");
     143             : 
     144           0 :         list = COLnew(0, TYPE_str, 0, TRANSIENT);
     145           0 :         if (list == NULL)
     146           0 :                 throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     147             : 
     148             :         /* extract port from mero_uri, let mapi figure out the rest */
     149           0 :         mero_uri += strlen("mapi:monetdb://");
     150           0 :         if (*mero_uri == '[') {
     151           0 :                 if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
     152           0 :                         BBPreclaim(list);
     153           0 :                         throw(MAL, "remote.resolve",
     154             :                                   "illegal IPv6 address on merovingian_uri: %s",
     155             :                                   GDKgetenv("merovingian_uri"));
     156             :                 }
     157             :         }
     158           0 :         if ((p = strchr(mero_uri, ':')) == NULL) {
     159           0 :                 BBPreclaim(list);
     160           0 :                 throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
     161             :                           GDKgetenv("merovingian_uri"));
     162             :         }
     163           0 :         port = (unsigned int) atoi(p + 1);
     164             : 
     165           0 :         or = redirs = mapi_resolve(NULL, port, *pat);
     166             : 
     167           0 :         if (redirs == NULL) {
     168           0 :                 BBPreclaim(list);
     169           0 :                 throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
     170             :         }
     171             : 
     172           0 :         while (*redirs != NULL) {
     173           0 :                 if (BUNappend(list, (ptr) *redirs, false) != GDK_SUCCEED) {
     174           0 :                         BBPreclaim(list);
     175           0 :                         do
     176           0 :                                 free(*redirs);
     177           0 :                         while (*++redirs);
     178           0 :                         free(or);
     179           0 :                         throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     180             :                 }
     181           0 :                 free(*redirs);
     182           0 :                 redirs++;
     183             :         }
     184           0 :         free(or);
     185             : 
     186           0 :         *ret = list->batCacheid;
     187           0 :         BBPkeepref(list);
     188           0 :         return (MAL_SUCCEED);
     189             : #endif
     190             : }
     191             : 
     192             : 
/* Counter appended to generated connection names to make them unique;
 * RMTconnectScen increments it while holding mal_remoteLock. */
static size_t connection_id = 0;
     195             : 
     196             : /**
     197             :  * Returns a connection to the given uri.  It always returns a newly
     198             :  * created connection.
     199             :  */
     200             : static str
     201         187 : RMTconnectScen(str *ret,
     202             :                            const char *const *ouri, const char *const *user, const char *const *passwd, const char *const *scen, bit *columnar)
     203             : {
     204         187 :         connection c;
     205         187 :         char conn[BUFSIZ];
     206         187 :         char *s;
     207         187 :         Mapi m;
     208         187 :         MapiHdl hdl;
     209         187 :         str msg;
     210             : 
     211             :         /* just make sure the return isn't garbage */
     212         187 :         *ret = 0;
     213             : 
     214         187 :         if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str) str_nil) == 0)
     215           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
     216             :                           "is NULL or nil");
     217         187 :         if (user == NULL || *user == NULL || strcmp(*user, (str) str_nil) == 0)
     218           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
     219             :                           "NULL or nil");
     220         187 :         if (passwd == NULL || *passwd == NULL
     221         187 :                 || strcmp(*passwd, (str) str_nil) == 0)
     222           0 :                 throw(ILLARG, "remote.connect",
     223             :                           ILLEGAL_ARGUMENT ": password is " "NULL or nil");
     224         187 :         if (scen == NULL || *scen == NULL || strcmp(*scen, (str) str_nil) == 0)
     225           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
     226             :                           "NULL or nil");
     227         187 :         if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
     228           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
     229             :                           "is not supported", *scen);
     230             : 
     231         187 :         m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
     232         188 :         if (mapi_error(m)) {
     233           0 :                 msg = createException(MAL, "remote.connect",
     234             :                                                           "unable to connect to '%s': %s",
     235             :                                                           *ouri, mapi_error_str(m));
     236           0 :                 mapi_destroy(m);
     237           0 :                 return msg;
     238             :         }
     239             : 
     240         188 :         MT_lock_set(&mal_remoteLock);
     241             : 
     242             :         /* generate an unique connection name, they are only known
     243             :          * within one mserver, id is primary key, the rest is super key */
     244         191 :         snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user,
     245             :                          connection_id++);
     246             :         /* make sure we can construct MAL identifiers using conn */
     247        5380 :         for (s = conn; *s != '\0'; s++) {
     248        5189 :                 if (!isalnum((unsigned char) *s)) {
     249         743 :                         *s = '_';
     250             :                 }
     251             :         }
     252             : 
     253         191 :         if (mapi_reconnect(m) != MOK) {
     254           5 :                 MT_lock_unset(&mal_remoteLock);
     255           5 :                 msg = createException(IO, "remote.connect",
     256             :                                                           "unable to connect to '%s': %s",
     257             :                                                           *ouri, mapi_error_str(m));
     258           5 :                 mapi_destroy(m);
     259           5 :                 return msg;
     260             :         }
     261             : 
     262         186 :         if (columnar && *columnar) {
     263           1 :                 char set_protocol_query_buf[50];
     264           1 :                 snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);",
     265             :                                  PROTOCOL_COLUMNAR);
     266           1 :                 if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
     267           0 :                         mapi_destroy(m);
     268           0 :                         MT_lock_unset(&mal_remoteLock);
     269           0 :                         return msg;
     270             :                 }
     271             :         }
     272             : 
     273             :         /* connection established, add to list */
     274         186 :         c = GDKzalloc(sizeof(struct _connection));
     275         186 :         if (c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
     276           0 :                 GDKfree(c);
     277           0 :                 mapi_destroy(m);
     278           0 :                 MT_lock_unset(&mal_remoteLock);
     279           0 :                 throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     280             :         }
     281         186 :         c->mconn = m;
     282         186 :         c->nextid = 0;
     283         186 :         MT_lock_init(&c->lock, c->name);
     284         186 :         c->next = conns;
     285         186 :         conns = c;
     286             : 
     287         186 :         msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
     288         186 :         if (msg) {
     289           0 :                 MT_lock_unset(&mal_remoteLock);
     290           0 :                 return msg;
     291             :         }
     292         372 :         if (hdl != NULL && mapi_fetch_row(hdl)) {
     293         186 :                 char *val = mapi_fetch_field(hdl, 0);
     294         186 :                 c->type = (unsigned char) atoi(val);
     295         186 :                 mapi_close_handle(hdl);
     296             :         } else {
     297           0 :                 c->type = 0;
     298             :         }
     299             : 
     300             : #ifdef _DEBUG_MAPI_
     301             :         mapi_trace(c->mconn, true);
     302             : #endif
     303         186 :         if (c->type != localtype && (c->type | RMTT_HGE) == localtype) {
     304             :                 /* we support hge, and for remote, we don't know */
     305           0 :                 msg = RMTquery(&hdl, "remote.connect", m, "x := 0:hge;");
     306           0 :                 if (msg) {
     307           0 :                         freeException(msg);
     308           0 :                         c->int128 = false;
     309             :                 } else {
     310           0 :                         mapi_close_handle(hdl);
     311           0 :                         c->int128 = true;
     312           0 :                         c->type |= RMTT_HGE;
     313             :                 }
     314         186 :         } else if (c->type == localtype) {
     315         186 :                 c->int128 = int128;
     316             :         }
     317         186 :         MT_lock_unset(&mal_remoteLock);
     318             : 
     319         186 :         *ret = GDKstrdup(conn);
     320         186 :         if (*ret == NULL)
     321           0 :                 throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     322             :         return (MAL_SUCCEED);
     323             : }
     324             : 
     325             : static str
     326         185 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     327             : {
     328         185 :         (void) cntxt;
     329         185 :         (void) mb;
     330         185 :         str *ret = getArgReference_str(stk, pci, 0);
     331         185 :         const char *uri = *getArgReference_str(stk, pci, 1);
     332         185 :         const char *user = *getArgReference_str(stk, pci, 2);
     333         185 :         const char *passwd = *getArgReference_str(stk, pci, 3);
     334             : 
     335         185 :         const char *scen = "msql";
     336             : 
     337         185 :         if (pci->argc >= 5)
     338         185 :                 scen = *getArgReference_str(stk, pci, 4);
     339             : 
     340         185 :         return RMTconnectScen(ret, &uri, &user, &passwd, &scen, NULL);
     341             : }
     342             : 
     343             : /**
     344             :  * Disconnects a connection.  The connection needs not to exist in the
     345             :  * system, it only needs to exist for the client (i.e. it was once
     346             :  * created).
     347             :  */
     348             : str
     349         186 : RMTdisconnect(void *ret, const char *const *conn)
     350             : {
     351         186 :         connection c, t;
     352             : 
     353         186 :         if (conn == NULL || *conn == NULL || strcmp(*conn, (str) str_nil) == 0)
     354           0 :                 throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
     355             :                           "is NULL or nil");
     356             : 
     357             : 
     358         186 :         (void) ret;
     359             : 
     360             :         /* we need a lock because the same user can be handled by multiple
     361             :          * threads */
     362         186 :         MT_lock_set(&mal_remoteLock);
     363         186 :         c = conns;
     364         186 :         t = NULL;                                       /* parent */
     365             :         /* walk through the list */
     366         262 :         while (c != NULL) {
     367         262 :                 if (strcmp(c->name, *conn) == 0) {
     368             :                         /* ok, delete it... */
     369         186 :                         if (t == NULL) {
     370         150 :                                 conns = c->next;
     371             :                         } else {
     372          36 :                                 t->next = c->next;
     373             :                         }
     374             : 
     375         186 :                         MT_lock_set(&c->lock);   /* shared connection */
     376         186 :                         mapi_disconnect(c->mconn);
     377         186 :                         mapi_destroy(c->mconn);
     378         186 :                         MT_lock_unset(&c->lock);
     379         186 :                         MT_lock_destroy(&c->lock);
     380         186 :                         GDKfree(c->name);
     381         186 :                         GDKfree(c);
     382         186 :                         MT_lock_unset(&mal_remoteLock);
     383         186 :                         return MAL_SUCCEED;
     384             :                 }
     385          76 :                 t = c;
     386          76 :                 c = c->next;
     387             :         }
     388             : 
     389           0 :         MT_lock_unset(&mal_remoteLock);
     390           0 :         throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
     391             : }
     392             : 
     393             : /**
     394             :  * Helper function to return a connection matching a given string, or an
     395             :  * error if it does not exist.  Since this function is internal, it
     396             :  * doesn't check the argument conn, as it should have been checked
     397             :  * already.
     398             :  * NOTE: this function acquires the mal_remoteLock before accessing conns
     399             :  */
     400             : static inline str
     401        5393 : RMTfindconn(connection *ret, const char *conn)
     402             : {
     403        5393 :         connection c;
     404             : 
     405             :         /* just make sure the return isn't garbage */
     406        5393 :         *ret = NULL;
     407        5393 :         MT_lock_set(&mal_remoteLock);       /* protect c */
     408        5403 :         c = conns;
     409        7483 :         while (c != NULL) {
     410        7483 :                 if (strcmp(c->name, conn) == 0) {
     411        5403 :                         *ret = c;
     412        5403 :                         MT_lock_unset(&mal_remoteLock);
     413        5403 :                         return (MAL_SUCCEED);
     414             :                 }
     415        2080 :                 c = c->next;
     416             :         }
     417           0 :         MT_lock_unset(&mal_remoteLock);
     418           0 :         throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
     419             : }
     420             : 
     421             : /**
     422             :  * Little helper function that returns a GDKmalloced string containing a
     423             :  * valid identifier that is supposed to be unique in the connection's
     424             :  * remote context.  The generated string depends on the module and
     425             :  * function the caller is in. But also the runtime context is important.
     426             :  * The format is rmt<id>_<retvar>_<type>.  Every RMTgetId uses a fresh id,
     427             :  * to distinguish amongst different (parallel) execution context.
     428             :  * Reuse of this remote identifier should be done with care.
     429             :  * The encoding of the type allows for ease of type checking later on.
     430             :  */
     431             : static inline str
     432        3270 : RMTgetId(char *buf, size_t buflen, MalBlkPtr mb, InstrPtr p, int arg)
     433             : {
     434        3270 :         InstrPtr f;
     435        3270 :         const char *mod;
     436        3270 :         char *var;
     437        3270 :         str rt;
     438        3270 :         char name[IDLENGTH] = { 0 };
     439        3270 :         static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
     440             : 
     441        3270 :         if (p->retc == 0)
     442           0 :                 throw(MAL, "remote.getId",
     443             :                           ILLEGAL_ARGUMENT "MAL instruction misses retc");
     444             : 
     445        3270 :         var = getArgNameIntoBuffer(mb, p, arg, name);
     446        3268 :         f = getInstrPtr(mb, 0);         /* top level function */
     447        3268 :         mod = getModuleId(f);
     448        3268 :         if (mod == NULL)
     449        3268 :                 mod = "user";
     450        3268 :         rt = getTypeIdentifier(getArgType(mb, p, arg));
     451        3266 :         if (rt == NULL)
     452           0 :                 throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     453             : 
     454        3266 :         snprintf(buf, buflen, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var,
     455             :                          rt);
     456             : 
     457        3266 :         GDKfree(rt);
     458        3266 :         return (MAL_SUCCEED);
     459             : }
     460             : 
     461             : /**
     462             :  * Helper function to execute a query over the given connection,
     463             :  * returning the result handle.  If communication fails in one way or
     464             :  * another, an error is returned.  Since this function is internal, it
     465             :  * doesn't check the input arguments func, conn and query, as they
     466             :  * should have been checked already.
     467             :  * NOTE: this function assumes a lock for conn is set
     468             :  */
     469             : static inline str
     470        4182 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query)
     471             : {
     472        4182 :         MapiHdl mhdl;
     473             : 
     474        4182 :         *ret = NULL;
     475        4182 :         mhdl = mapi_query(conn, query);
     476        4178 :         if (mhdl) {
     477        4178 :                 if (mapi_result_error(mhdl) != NULL) {
     478           6 :                         str err = createException(getExceptionType(mapi_result_error(mhdl)),
     479             :                                                                           func,
     480             :                                                                           "(mapi:monetdb://%s@%s/%s) %s",
     481             :                                                                           mapi_get_user(conn),
     482             :                                                                           mapi_get_host(conn),
     483             :                                                                           mapi_get_dbname(conn),
     484             :                                                                           getExceptionMessage(mapi_result_error(mhdl)));
     485           6 :                         mapi_close_handle(mhdl);
     486           6 :                         return (err);
     487             :                 }
     488             :         } else {
     489           0 :                 if (mapi_error(conn) != MOK) {
     490           0 :                         throw(IO, func, "an error occurred on connection: %s",
     491             :                                   mapi_error_str(conn));
     492             :                 } else {
     493           0 :                         throw(MAL, func,
     494             :                                   "remote function invocation didn't return a result");
     495             :                 }
     496             :         }
     497             : 
     498        4169 :         *ret = mhdl;
     499        4169 :         return (MAL_SUCCEED);
     500             : }
     501             : /*
                     :  * Module prelude: compute this server's binary-layout signature and
                     :  * store it in `localtype` (declared outside this function).
                     :  *
                     :  * The signature is a bit mask assembled from compile-time settings:
                     :  * byte order (WORDS_BIGENDIAN), whether size_t is as wide as lng,
                     :  * whether oid is as wide as lng and, when HAVE_HGE is defined, the
                     :  * RMTT_HGE flag.  In that last case `int128` is also set to true.
                     :  * RMTget compares `localtype` with a connection's `type` to choose
                     :  * between binary and textual BAT transfer.
                     :  *
                     :  * Takes no arguments and always returns MAL_SUCCEED.
                     :  */
     502             : static str
     503         328 : RMTprelude(void)
     504             : {
     505         328 :         unsigned int type = 0;
     506             : 
     507             : #ifdef WORDS_BIGENDIAN
     508             :         type |= RMTT_B_ENDIAN;
     509             : #else
     510         328 :         type |= RMTT_L_ENDIAN;
     511             : #endif
     512             : #if SIZEOF_SIZE_T == SIZEOF_LNG
     513         328 :         type |= RMTT_64_BITS;
     514             : #else
     515             :         type |= RMTT_32_BITS;
     516             : #endif
     517             : #if SIZEOF_OID == SIZEOF_LNG
     518         328 :         type |= RMTT_64_OIDS;
     519             : #else
     520             :         type |= RMTT_32_OIDS;
     521             : #endif
     522             : #ifdef HAVE_HGE
     523         328 :         type |= RMTT_HGE;
     524         328 :         int128 = true;
     525             : #endif
     526         328 :         localtype = (unsigned char) type;  /* narrowing cast: assumes all RMTT_* flags fit in 8 bits -- TODO confirm */
     527             : 
     528         328 :         return (MAL_SUCCEED);
     529             : }
     530             : /*
                     :  * Module epilogue: tear down every connection still linked in `conns`.
                     :  *
                     :  * While holding mal_remoteLock the list is walked node by node.  For
                     :  * each node the node's own lock is taken, its Mapi connection is
                     :  * destroyed, the lock is released and destroyed, and the name and
                     :  * the node itself are freed.  Afterwards the list head is reset.
                     :  *
                     :  * ret is unused.  Always returns MAL_SUCCEED.
                     :  */
     531             : static str
     532         326 : RMTepilogue(void *ret)
     533             : {
     534         326 :         connection c, t;
     535             : 
     536         326 :         (void) ret;
     537             : 
     538         326 :         MT_lock_set(&mal_remoteLock);       /* keep everybody else off the list while it is dismantled */
     539             :         /* free connections list */
     540         326 :         c = conns;
     541         326 :         while (c != NULL) {
     542           0 :                 t = c;
     543           0 :                 c = c->next;  /* advance first: t is freed below */
     544           0 :                 MT_lock_set(&t->lock);
     545           0 :                 mapi_destroy(t->mconn);
     546           0 :                 MT_lock_unset(&t->lock);
     547           0 :                 MT_lock_destroy(&t->lock);
     548           0 :                 GDKfree(t->name);
     549           0 :                 GDKfree(t);
     550             :         }
     551             :         /* every node has been freed, so do not leave the head pointing at one */
     552         326 :         conns = NULL;
     553         326 :         MT_lock_unset(&mal_remoteLock);
     554             : 
     555         326 :         return (MAL_SUCCEED);
     556             : }
     557             : /*
                     :  * Read the single-line JSON header that precedes a binary BAT.
                     :  *
                     :  * sin - stream to read from
                     :  * buf - caller-supplied buffer that receives the line
                     :  *
                     :  * Bytes are read one at a time into buf until a newline arrives or
                     :  * mnstr_read no longer returns 1.  On the normal path the position
                     :  * after the last stored byte (where the newline was read) is
                     :  * overwritten with NUL, so buf holds a C string without the newline.
                     :  *
                     :  * Returns MAL_SUCCEED on the normal path; an exception when the last
                     :  * read returned a negative value; or, when the first byte is '!'
                     :  * (presumably an error line sent by the peer), a GDKstrdup'ed copy
                     :  * of buf, or an HY013 exception if that copy cannot be allocated.
                     :  *
                     :  * NOTE(review): no buffer length is passed in and the loop does not
                     :  * bound sz; the caller visible in RMTget supplies a 256-byte array,
                     :  * so a longer line would be written past its end.
                     :  * NOTE(review): on the '!' path GDKstrdup(buf) runs before buf[sz]
                     :  * is set to NUL, so the copy reads a buffer that this function has
                     :  * not terminated.
                     :  * NOTE(review): if the very first read returns 0, buf[0] is tested
                     :  * although nothing was stored in it here.
                     :  */
     558             : static str
     559        1402 : RMTreadbatheader(stream *sin, char *buf)
     560             : {
     561        1402 :         ssize_t sz = 0, rd;
     562             : 
     563             :         /* read the JSON header, byte by byte, stopping at the newline */
     564      226113 :         while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
     565      224711 :                 sz += rd;
     566             :         }
     567        1402 :         if (rd < 0) {
     568           0 :                 throw(MAL, "remote.get", "could not read BAT JSON header");
     569             :         }
     570        1402 :         if (buf[0] == '!') {
     571           0 :                 char *result;
     572           0 :                 if ((result = GDKstrdup(buf)) == NULL)
     573           0 :                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     574             :                 return result;
     575             :         }
     576             : 
     577        1402 :         buf[sz] = '\0';  /* replaces the newline (or the unread slot) with the terminator */
     578             : 
     579        1402 :         return MAL_SUCCEED;
     580             : }
     581             : /*
                     :  * Decoded form of the JSON header that describes a BAT sent in
                     :  * binary form.  RMTinternalcopyfrom fills one of these from the
                     :  * header elements named in the field comments and only accepts
                     :  * headers whose "version" element is 1.
                     :  */
     582             : typedef struct _binbat_v1 {
     583             :         int Ttype;  /* "ttype": atom type number, must be below GDKatomcnt */
     584             :         oid Hseqbase;  /* "hseqbase" */
     585             :         oid Tseqbase;  /* "tseqbase"; oid_nil when the header carries the 1<<63 (or 1<<31) marker */
     586             :         bool  /* "tsorted", "trevsorted", "tkey", "tnonil", "tdense": non-zero in the header means true */
     587             :          Tsorted:1, Trevsorted:1, Tkey:1, Tnonil:1, Tdense:1;
     588             :         BUN size;  /* "size": used as capacity and as count of the new BAT */
     589             :         size_t tailsize;  /* "tailsize": number of bytes read into the tail heap */
     590             :         size_t theapsize;  /* "theapsize": number of bytes read into the tvheap */
     591             : } binbat;
     592             : /*
                     :  * Build a transient BAT from a JSON header and the raw heap bytes
                     :  * that follow it on a stream.
                     :  *
                     :  * ret        - out: the new BAT; written only on success
                     :  * hdr        - NUL-terminated JSON header (RMTget passes the buffer
                     :  *              filled by RMTreadbatheader); parsed destructively:
                     :  *              closing quotes, ',' and '}' are overwritten with NUL
                     :  * in         - stream from which first tailsize bytes (tail heap) and
                     :  *              then theapsize bytes (tvheap) are read
                     :  * must_flush - when true, single bytes are read from `in` after the
                     :  *              payload until the read no longer returns a positive
                     :  *              count; every byte obtained that way is logged
                     :  * cint128    - RMTget passes the connection's int128 field here,
                     :  *              presumably "the peer has the hge type"; it is only
                     :  *              consulted when HAVE_HGE is defined
                     :  *
                     :  * Returns MAL_SUCCEED, or an exception for a malformed or unsupported
                     :  * header, a failed BAT allocation, or a failed heap extension/read
                     :  * (in which case the BAT is reclaimed at the bailout label).
                     :  *
                     :  * NOTE(review): the ','/'}' case checks only val for NULL; nme is
                     :  * handed to strcmp() even if no quoted name preceded the value.
                     :  * NOTE(review): tailsize and theapsize come from the header and are
                     :  * only checked for being non-negative before they are used as heap
                     :  * and read sizes.
                     :  * NOTE(review): only a negative mnstr_read result is treated as a
                     :  * failure -- confirm that a short read cannot yield a value >= 0.
                     :  */
     593             : static str
     594        1401 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, bool cint128)
     595             : {
     596        1401 :         binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
     597        1401 :         char *nme = NULL;
     598        1401 :         char *val = NULL;
     599        1401 :         char tmp;
     600        1401 :         size_t len;
     601        1401 :         lng lv, *lvp;
     602             : 
     603        1401 :         BAT *b;
     604             : 
     605        1401 :         (void) cint128;  /* NOTE(review): redundant when HAVE_HGE is defined, cint128 is read further down */
     606             :         /* hdr is a JSON structure that looks like
     607             :          * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
     608             :          * we take the binary data directly from the stream */
     609             : 
     610             :         /* could skip whitespace, but we just don't allow that */
     611        1401 :         if (*hdr++ != '{')
     612           0 :                 throw(MAL, "remote.bincopyfrom",
     613             :                           "illegal input, not a JSON header (got '%s')", hdr - 1);
     614      226086 :         while (*hdr != '\0') {
     615      224684 :                 switch (*hdr) {
     616       33632 :                 case '"':
     617             :                         /* values are assumed to be numeric, so every quoted
     618             :                          * string is an element name: the opening quote marks
                     :                          * where the name starts, the closing quote is
                     :                          * overwritten with NUL to terminate it */
     619       33632 :                         if (nme != NULL) {
     620       16819 :                                 *hdr = '\0';
     621             :                         } else {
     622       16813 :                                 nme = hdr + 1;
     623             :                         }
     624             :                         break;
     625       16819 :                 case ':':
     626       16819 :                         val = hdr + 1;  /* the value text starts right after the colon */
     627       16819 :                         break;
     628       16823 :                 case ',':
     629             :                 case '}':
     630       16823 :                         if (val == NULL)
     631           0 :                                 throw(MAL, "remote.bincopyfrom",
     632             :                                           "illegal input, JSON value missing");
     633       16823 :                         *hdr = '\0';  /* terminate the value text */
     634             : 
     635       16823 :                         lvp = &lv;
     636       16823 :                         len = sizeof(lv);
     637             :                         /* tseqbase can be 1<<31/1<<63 which causes overflow
     638             :                          * in lngFromStr, so we check separately */
     639       16823 :                         if (strcmp(val,
     640             : #if SIZEOF_OID == 8
     641             :                                            "9223372036854775808"
     642             : #else
     643             :                                            "2147483648"
     644             : #endif
     645        1375 :                                 ) == 0 && strcmp(nme, "tseqbase") == 0) {
     646        1372 :                                 bb.Tseqbase = oid_nil;
     647             :                         } else {
     648             :                                 /* all values should be non-negative, so we check that
     649             :                                  * here as well */
     650       15451 :                                 if (lngFromStr(val, &len, &lvp, true) < 0 ||
     651       15452 :                                         lv < 0 /* includes lng_nil */ )
     652           0 :                                         throw(MAL, "remote.bincopyfrom",
     653             :                                                   "bad %s value: %s", nme, val);
     654             : 
     655             :                                 /* dispatch on the element name and store the parsed value */
     656       15452 :                                 if (strcmp(nme, "version") == 0) {
     657        1402 :                                         if (lv != 1)
     658           0 :                                                 throw(MAL, "remote.bincopyfrom",
     659             :                                                           "unsupported version: %s", val);
     660       14050 :                                 } else if (strcmp(nme, "hseqbase") == 0) {
     661             : #if SIZEOF_OID < SIZEOF_LNG
     662             :                                         if (lv > GDK_oid_max)
     663             :                                                 throw(MAL, "remote.bincopyfrom",
     664             :                                                           "bad %s value: %s", nme, val);
     665             : #endif
     666        1402 :                                         bb.Hseqbase = (oid) lv;
     667       12648 :                                 } else if (strcmp(nme, "ttype") == 0) {
     668        1402 :                                         if (lv >= GDKatomcnt)
     669           0 :                                                 throw(MAL, "remote.bincopyfrom",
     670             :                                                           "bad %s value: GDK atom number %s doesn't exist",
     671             :                                                           nme, val);
     672        1402 :                                         bb.Ttype = (int) lv;
     673       11246 :                                 } else if (strcmp(nme, "tseqbase") == 0) {
     674             : #if SIZEOF_OID < SIZEOF_LNG
     675             :                                         if (lv > GDK_oid_max)
     676             :                                                 throw(MAL, "remote.bincopyfrom",
     677             :                                                           "bad %s value: %s", nme, val);
     678             : #endif
     679          30 :                                         bb.Tseqbase = (oid) lv;
     680       11216 :                                 } else if (strcmp(nme, "tsorted") == 0) {
     681        1402 :                                         bb.Tsorted = lv != 0;
     682        9814 :                                 } else if (strcmp(nme, "trevsorted") == 0) {
     683        1402 :                                         bb.Trevsorted = lv != 0;
     684        8412 :                                 } else if (strcmp(nme, "tkey") == 0) {
     685        1402 :                                         bb.Tkey = lv != 0;
     686        7010 :                                 } else if (strcmp(nme, "tnonil") == 0) {
     687        1402 :                                         bb.Tnonil = lv != 0;
     688        5608 :                                 } else if (strcmp(nme, "tdense") == 0) {
     689        1402 :                                         bb.Tdense = lv != 0;
     690        4206 :                                 } else if (strcmp(nme, "size") == 0) {
     691        1402 :                                         if (lv > (lng) BUN_MAX)
     692           0 :                                                 throw(MAL, "remote.bincopyfrom",
     693             :                                                           "bad %s value: %s", nme, val);
     694        1402 :                                         bb.size = (BUN) lv;
     695        2804 :                                 } else if (strcmp(nme, "tailsize") == 0) {
     696        1402 :                                         bb.tailsize = (size_t) lv;
     697        1402 :                                 } else if (strcmp(nme, "theapsize") == 0) {
     698        1402 :                                         bb.theapsize = (size_t) lv;
     699             :                                 } else {
     700           0 :                                         throw(MAL, "remote.bincopyfrom",
     701             :                                                   "unknown element: %s", nme);
     702             :                                 }
     703             :                         }
     704             :                         nme = val = NULL;  /* ready for the next name/value pair */
     705             :                         break;
     706             :                 }
     707      224685 :                 hdr++;
     708             :         }
     709             : #ifdef HAVE_HGE
     710        1402 :         if (int128 && !cint128 && bb.Ttype >= TYPE_hge)
     711           0 :                 bb.Ttype++;  /* presumably maps the peer's atom number past the local hge slot -- TODO confirm */
     712             : #else
     713             :         (void) cint128;
     714             : #endif
     715             : 
     716        2712 :         b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT,
     717        1310 :                                 bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);  /* last argument: tail bytes per value as implied by the header, 0 for an empty BAT */
     718        1402 :         if (b == NULL)
     719           0 :                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     720             : 
     721        1402 :         if (bb.tailsize > 0) {
     722        2570 :                 if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
     723        1285 :                         mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
     724           0 :                         goto bailout;
     725        1285 :                 b->theap->dirty = true;
     726             :         }
     727        1402 :         if (bb.theapsize > 0) {
     728         152 :                 if ((b->tvheap->base == NULL &&
     729          76 :                          (*BATatoms[b->ttype].atomHeap) (b->tvheap,
     730             :                                                                                          b->batCapacity) != GDK_SUCCEED)
     731          76 :                         || HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED
     732          76 :                         || mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
     733           0 :                         goto bailout;
     734          76 :                 b->tvheap->free = bb.theapsize;
     735          76 :                 b->tvheap->dirty = true;
     736             :         }
     737             : 
     738             :         /* set properties as announced in the header */
     739        1402 :         b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
     740        1402 :         b->tsorted = bb.Tsorted;
     741        1402 :         b->trevsorted = bb.Trevsorted;
     742        1402 :         b->tkey = bb.Tkey;
     743        1402 :         b->tnonil = bb.Tnonil;
     744        1402 :         if (bb.Ttype == TYPE_str && bb.size)
     745          76 :                 BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
     746        1402 :         BATsetcount(b, bb.size);
     747             : 
     748             :         // read blockmode flush; any byte that still arrives is unexpected and gets logged
     749        1402 :         while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
     750           0 :                 TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
     751             :         }
     752             : 
     753        1402 :         BATsettrivprop(b);
     754             : 
     755        1402 :         *ret = b;
     756        1402 :         return (MAL_SUCCEED);
     757             : 
     758             :   bailout:
     759           0 :         BBPreclaim(b);  /* drop the partially filled BAT */
     760           0 :         throw(MAL, "remote.bincopyfrom", "reading failed");
     761             : }
     762             : 
     763             : /**
     764             :  * get fetches the object referenced by ident over connection conn.
     765             :  * We are only interested in retrieving void-headed BATs, i.e. single columns.
                     :  *
                     :  * MAL stack arguments: argument 1 is the connection name, argument 2
                     :  * the remote identifier.  The fetched value is stored in the result
                     :  * slot (argument 0), whose declared type selects one of three paths:
                     :  *  - BAT type and localtype is 0177 or equals neither the
                     :  *    connection's type nor that type with RMTT_HGE added: the query
                     :  *    "io.print(ident);" is run remotely and a BAT is rebuilt row by
                     :  *    row from field 1 of every result row;
                     :  *  - BAT type otherwise: "remote.batbincopy(ident);" is written
                     :  *    straight to the connection's output stream and a JSON header
                     :  *    plus raw heaps are read back from its input stream;
                     :  *  - any other type: "io.print(ident);" is run and field 0 of the
                     :  *    first row is parsed into the result.
                     :  * A result type of any (or an any expression) is rejected, and the
                     :  * identifier has to end in the type identifier of the result type.
                     :  *
                     :  * Returns MAL_SUCCEED or an exception string.
                     :  *
                     :  * NOTE(review): both BAT paths hold c->lock around the exchange, the
                     :  * scalar path does not take it.
                     :  * NOTE(review): the suffix test computes
                     :  * ident + strlen(ident) - strlen(rt) without checking that ident is
                     :  * at least as long as rt.
                     :  * NOTE(review): when ident is NULL the "identifier expected"
                     :  * exception still formats it with %s.
                     :  * NOTE(review): the type-mismatch message receives (rt, ident), so
                     :  * the locally expected type lands in the "remote object type" slot
                     :  * -- confirm the intended order.
     766             :  */
     767             : static str
     768        1400 : RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     769             : {
     770        1400 :         str conn, ident, tmp, rt;
     771        1400 :         connection c;
     772        1400 :         char qbuf[BUFSIZ + 1];
     773        1400 :         MapiHdl mhdl = NULL;
     774        1400 :         int rtype;
     775        1400 :         ValPtr v;
     776             : 
     777        1400 :         (void) mb;
     778        1400 :         (void) cntxt;
     779             : 
     780        1400 :         conn = *getArgReference_str(stk, pci, 1);
     781        1400 :         if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
     782           0 :                 throw(ILLARG, "remote.get",
     783             :                           ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     784        1400 :         ident = *getArgReference_str(stk, pci, 2);
     785        1400 :         if (ident == 0 || isIdentifier(ident) < 0)
     786           0 :                 throw(ILLARG, "remote.get",
     787             :                           ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
     788             : 
     789             :         /* lookup conn, set c if valid */
     790        1400 :         rethrow("remote.get", tmp, RMTfindconn(&c, conn));
     791             : 
     792        1400 :         rtype = getArgType(mb, pci, 0);
     793        1400 :         v = &stk->stk[pci->argv[0]];  /* result slot that receives the fetched value */
     794             : 
     795        1400 :         if (rtype == TYPE_any || isAnyExpression(rtype)) {
     796           0 :                 char *tpe, *msg;
     797           0 :                 tpe = getTypeName(rtype);
     798           0 :                 msg = createException(MAL, "remote.get",
     799             :                                                           ILLEGAL_ARGUMENT ": unsupported any type: %s",
     800             :                                                           tpe);
     801           0 :                 GDKfree(tpe);
     802           0 :                 return msg;
     803             :         }
     804             :         /* check if the remote type complies with what we expect.
     805             :            Since the put() encodes the type as known to the remote site
     806             :            we can simply compare it here */
     807        1400 :         rt = getTypeIdentifier(rtype);
     808        1400 :         if (rt == NULL)
     809           0 :                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     810        1400 :         if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
     811           0 :                 tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
     812             :                                                           ": remote object type %s does not match expected type %s",
     813             :                                                           rt, ident);
     814           0 :                 GDKfree(rt);
     815           0 :                 return tmp;
     816             :         }
     817        1400 :         GDKfree(rt);
     818             : 
     819        1400 :         if (isaBatType(rtype) && (localtype == 0177 || (localtype != c->type && localtype != (c->type | RMTT_HGE)))) {
     820           0 :                 int t;
     821           0 :                 size_t s;
     822           0 :                 ptr r;
     823           0 :                 str var;
     824           0 :                 BAT *b;
     825             : 
     826           0 :                 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
     827             : 
     828           0 :                 TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
     829             : 
     830             :                 /* this call should be a single transaction over the channel */
     831           0 :                 MT_lock_set(&c->lock);
     832             : 
     833           0 :                 if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
     834             :                         != MAL_SUCCEED) {
     835           0 :                         TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
     836           0 :                         MT_lock_unset(&c->lock);
     837           0 :                         var = createException(MAL, "remote.get", "%s", tmp);
     838           0 :                         freeException(tmp);
     839           0 :                         return var;
     840             :                 }
     841           0 :                 t = getBatType(rtype);
     842           0 :                 b = COLnew(0, t, 0, TRANSIENT);
     843           0 :                 if (b == NULL) {
     844           0 :                         mapi_close_handle(mhdl);
     845           0 :                         MT_lock_unset(&c->lock);
     846           0 :                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     847             :                 }
     848             : 
     849           0 :                 if (ATOMbasetype(t) == TYPE_str) {
     850           0 :                         while (mapi_fetch_row(mhdl)) {
     851           0 :                                 var = mapi_fetch_field(mhdl, 1);
     852           0 :                                 if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
     853           0 :                                         BBPreclaim(b);
     854           0 :                                         mapi_close_handle(mhdl);
     855           0 :                                         MT_lock_unset(&c->lock);
     856           0 :                                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     857             :                                 }
     858             :                         }
     859             :                 } else
     860           0 :                         while (mapi_fetch_row(mhdl)) {
     861           0 :                                 var = mapi_fetch_field(mhdl, 1);
     862           0 :                                 if (var == NULL)
     863           0 :                                         var = "nil";
     864           0 :                                 s = 0;
     865           0 :                                 r = NULL;
     866           0 :                                 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
     867           0 :                                         BUNappend(b, r, false) != GDK_SUCCEED) {
     868           0 :                                         BBPreclaim(b);
     869           0 :                                         GDKfree(r);
     870           0 :                                         mapi_close_handle(mhdl);
     871           0 :                                         MT_lock_unset(&c->lock);
     872           0 :                                         throw(MAL, "remote.get", GDK_EXCEPTION);
     873             :                                 }
     874           0 :                                 GDKfree(r);
     875             :                         }
     876             : 
     877           0 :                 *v = (ValRecord) {
     878           0 :                         .val.bval = b->batCacheid,
     879             :                         .bat = true,
     880           0 :                         .vtype = b->ttype,
     881             :                 };
     882           0 :                 BBPkeepref(b);
     883             : 
     884           0 :                 mapi_close_handle(mhdl);
     885           0 :                 MT_lock_unset(&c->lock);
     886        1400 :         } else if (isaBatType(rtype)) {
     887             :                 /* binary compatible remote host, transfer BAT in binary form */
     888        1400 :                 stream *sout;
     889        1400 :                 stream *sin;
     890        1400 :                 char buf[256];
     891        1400 :                 BAT *b = NULL;
     892             : 
     893             :                 /* this call should be a single transaction over the channel */
     894        1400 :                 MT_lock_set(&c->lock);
     895             : 
     896             :                 /* bypass Mapi from this point and talk to the server over
     897             :                  * the connection's raw streams */
     898        1400 :                 sout = mapi_get_to(c->mconn);
     899        1400 :                 sin = mapi_get_from(c->mconn);
     900        1400 :                 if (sin == NULL || sout == NULL) {
     901           0 :                         MT_lock_unset(&c->lock);
     902           0 :                         throw(MAL, "remote.get", "Connection lost");
     903             :                 }
     904             : 
     905             :                 /* call our remote helper to do this more efficiently */
     906        1400 :                 mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
     907        1400 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
     908             : 
     909        1400 :                 if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
     910           0 :                         MT_lock_unset(&c->lock);
     911           0 :                         return tmp;
     912             :                 }
     913             : 
     914        1399 :                 if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->int128)) != MAL_SUCCEED) {
     915           0 :                         MT_lock_unset(&c->lock);
     916           0 :                         return (tmp);
     917             :                 }
     918             : 
     919        1400 :                 *v = (ValRecord) {
     920        1400 :                         .val.bval = b->batCacheid,
     921             :                         .bat = true,
     922        1400 :                         .vtype = b->ttype,
     923             :                 };
     924        1400 :                 BBPkeepref(b);
     925             : 
     926        1400 :                 MT_lock_unset(&c->lock);
     927             :         } else {  /* non-BAT result: fetch its printed form and parse it locally */
     928           0 :                 ptr p = NULL;
     929           0 :                 str val;
     930           0 :                 size_t len = 0;
     931             : 
     932           0 :                 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
     933           0 :                 TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
     934           0 :                 if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED) {
     935           0 :                         return tmp;
     936             :                 }
     937           0 :                 (void) mapi_fetch_row(mhdl);    /* should succeed */
     938           0 :                 val = mapi_fetch_field(mhdl, 0);
     939             : 
     940           0 :                 if (ATOMbasetype(rtype) == TYPE_str) {
     941           0 :                         if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
     942           0 :                                 mapi_close_handle(mhdl);
     943           0 :                                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     944             :                         }
     945           0 :                 } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true)
     946             :                                    < 0) {
     947           0 :                         char *msg;
     948           0 :                         msg = createException(MAL, "remote.get",
     949             :                                                                   "unable to parse value: %s",
     950             :                                                                   val == NULL ? "nil" : val);
     951           0 :                         mapi_close_handle(mhdl);
     952           0 :                         GDKfree(p);
     953           0 :                         return msg;
     954             :                 } else {
     955           0 :                         VALset(v, rtype, p);
     956           0 :                         if (ATOMextern(rtype) == 0)  /* presumably VALset copied a fixed-size value, so p can go -- TODO confirm */
     957           0 :                                 GDKfree(p);
     958             :                 }
     959             : 
     960           0 :                 mapi_close_handle(mhdl);
     961             :         }
     962             : 
     963             :         return (MAL_SUCCEED);
     964             : }
     965             : 
     966             : /**
     967             :  * stores the given object on the remote host.  The identifier of the
     968             :  * object on the remote host is returned for later use.
     969             :  */
     970             : static str
     971        3268 : RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     972             : {
     973        3268 :         str conn, tmp;
     974        3268 :         char ident[512];
     975        3268 :         connection c;
     976        3268 :         ValPtr v;
     977        3268 :         int type;
     978        3268 :         ptr value;
     979        3268 :         MapiHdl mhdl = NULL;
     980             : 
     981        3268 :         (void) cntxt;
     982             : 
     983        3268 :         conn = *getArgReference_str(stk, pci, 1);
     984        3268 :         if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
     985           3 :                 throw(ILLARG, "remote.put",
     986             :                           ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     987             : 
     988             :         /* lookup conn */
     989        3265 :         rethrow("remote.put", tmp, RMTfindconn(&c, conn));
     990             : 
     991             :         /* put the thing */
     992        3270 :         type = getArgType(mb, pci, 2);
     993        3270 :         value = getArgReference(stk, pci, 2);
     994             : 
     995             :         /* this call should be a single transaction over the channel */
     996        3270 :         MT_lock_set(&c->lock);
     997             : 
     998             :         /* get a free, typed identifier for the remote host */
     999        3270 :         tmp = RMTgetId(ident, sizeof(ident), mb, pci, 2);
    1000        3263 :         if (tmp != MAL_SUCCEED) {
    1001           0 :                 MT_lock_unset(&c->lock);
    1002           0 :                 return tmp;
    1003             :         }
    1004             : 
    1005             :         /* depending on the input object generate actions to store the
    1006             :          * object remotely*/
    1007        3263 :         if (type == TYPE_any || isAnyExpression(type)) {
    1008           0 :                 char *tpe, *msg;
    1009           0 :                 MT_lock_unset(&c->lock);
    1010           0 :                 tpe = getTypeName(type);
    1011           0 :                 msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
    1012           0 :                 GDKfree(tpe);
    1013           0 :                 return msg;
    1014        4664 :         } else if (isaBatType(type) && !is_bat_nil(*(bat *) value)) {
    1015        1402 :                 BATiter bi;
    1016             :                 /* naive approach using bat.new() and bat.insert() calls */
    1017        1402 :                 char *tail;
    1018        1402 :                 bat bid;
    1019        1402 :                 BAT *b = NULL;
    1020        1402 :                 BUN p, q;
    1021        1402 :                 str tailv;
    1022        1402 :                 stream *sout;
    1023             : 
    1024        1402 :                 tail = getTypeIdentifier(getBatType(type));
    1025        1402 :                 if (tail == NULL) {
    1026           0 :                         MT_lock_unset(&c->lock);
    1027           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1028             :                 }
    1029             : 
    1030        1402 :                 bid = *(bat *) value;
    1031        1402 :                 if (bid != 0) {
    1032        1401 :                         if ((b = BATdescriptor(bid)) == NULL) {
    1033           0 :                                 MT_lock_unset(&c->lock);
    1034           0 :                                 GDKfree(tail);
    1035           0 :                                 throw(MAL, "remote.put",
    1036             :                                           SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
    1037             :                         }
    1038             :                 }
    1039             : 
    1040             :                 /* bypass Mapi from this point to efficiently write all data to
    1041             :                  * the server */
    1042        1403 :                 sout = mapi_get_to(c->mconn);
    1043             : 
    1044             :                 /* call our remote helper to do this more efficiently */
    1045        1402 :                 mnstr_printf(sout,
    1046             :                                          "%s := remote.batload(nil:%s, " BUNFMT ");\n",
    1047             :                                          ident, tail, (bid == 0 ? 0 : BATcount(b)));
    1048        1401 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
    1049        1401 :                 GDKfree(tail);
    1050             : 
    1051             :                 /* b can be NULL if bid == 0 (only type given, ugh) */
    1052        1402 :                 if (b) {
    1053        2804 :                         int tpe = getBatType(type), trivial = tpe < TYPE_date
    1054        1402 :                                         || ATOMbasetype(tpe) == TYPE_str;
    1055        1402 :                         const void *nil = ATOMnilptr(tpe);
    1056        1402 :                         int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
    1057             : 
    1058        1402 :                         bi = bat_iterator(b);
    1059        1402 :                         BATloop(b, p, q) {
    1060           0 :                                 const void *v = BUNtail(bi, p);
    1061           0 :                                 tailv = ATOMformat(tpe, v);
    1062           0 :                                 if (tailv == NULL) {
    1063           0 :                                         bat_iterator_end(&bi);
    1064           0 :                                         BBPunfix(b->batCacheid);
    1065           0 :                                         MT_lock_unset(&c->lock);
    1066           0 :                                         throw(MAL, "remote.put", GDK_EXCEPTION);
    1067             :                                 }
    1068           0 :                                 if (trivial || atomcmp(v, nil) == 0)
    1069           0 :                                         mnstr_printf(sout, "%s\n", tailv);
    1070             :                                 else
    1071           0 :                                         mnstr_printf(sout, "\"%s\"\n", tailv);
    1072           0 :                                 GDKfree(tailv);
    1073             :                         }
    1074        1402 :                         bat_iterator_end(&bi);
    1075        1402 :                         BBPunfix(b->batCacheid);
    1076             :                 }
    1077             : 
    1078             :                 /* write the empty line the server is waiting for, handles
    1079             :                  * all errors at the same time, if any */
    1080        1401 :                 if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
    1081             :                         != MAL_SUCCEED) {
    1082           0 :                         MT_lock_unset(&c->lock);
    1083           0 :                         return tmp;
    1084             :                 }
    1085        1401 :                 mapi_close_handle(mhdl);
    1086           0 :         } else if (isaBatType(type) && is_bat_nil(*(bat *) value)) {
    1087           0 :                 stream *sout;
    1088           0 :                 str typename = getTypeName(type);
    1089           0 :                 sout = mapi_get_to(c->mconn);
    1090           0 :                 mnstr_printf(sout, "%s := nil:%s;\n", ident, typename);
    1091           0 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
    1092           0 :                 GDKfree(typename);
    1093             :         } else {
    1094        1861 :                 size_t l;
    1095        1861 :                 str val;
    1096        1861 :                 char *tpe;
    1097        1861 :                 char qbuf[512], *nbuf = qbuf;
    1098        1861 :                 const void *nil = ATOMnilptr(type), *p = value;
    1099        1861 :                 int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
    1100             : 
    1101        1861 :                 if (ATOMextern(type))
    1102        1289 :                         p = *(ptr *) value;
    1103             : 
    1104        1861 :                 val = ATOMformat(type, p);
    1105        1863 :                 if (val == NULL) {
    1106           0 :                         MT_lock_unset(&c->lock);
    1107           0 :                         throw(MAL, "remote.put", GDK_EXCEPTION);
    1108             :                 }
    1109        1863 :                 tpe = getTypeIdentifier(type);
    1110        1859 :                 if (tpe == NULL) {
    1111           0 :                         MT_lock_unset(&c->lock);
    1112           0 :                         GDKfree(val);
    1113           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1114             :                 }
    1115        1859 :                 l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
    1116        1859 :                 if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
    1117           0 :                         MT_lock_unset(&c->lock);
    1118           0 :                         GDKfree(val);
    1119           0 :                         GDKfree(tpe);
    1120           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1121             :                 }
    1122        1860 :                 if (type < TYPE_date || ATOMbasetype(type) == TYPE_str
    1123           8 :                         || atomcmp(p, nil) == 0)
    1124        1854 :                         snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
    1125             :                 else
    1126           5 :                         snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
    1127        1859 :                 GDKfree(tpe);
    1128        1868 :                 GDKfree(val);
    1129        1868 :                 TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
    1130        1868 :                 tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
    1131        1856 :                 if (nbuf != qbuf)
    1132          32 :                         GDKfree(nbuf);
    1133        1855 :                 if (tmp != MAL_SUCCEED) {
    1134           0 :                         MT_lock_unset(&c->lock);
    1135           0 :                         return tmp;
    1136             :                 }
    1137        1855 :                 mapi_close_handle(mhdl);
    1138             :         }
    1139        3263 :         MT_lock_unset(&c->lock);
    1140             : 
    1141             :         /* return the identifier */
    1142        3267 :         v = &stk->stk[pci->argv[0]];
    1143        3267 :         if (VALinit(v, TYPE_str, ident) == NULL)
    1144           0 :                 throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1145             :         return (MAL_SUCCEED);
    1146             : }
    1147             : 
    1148             : /**
    1149             :  * stores the given <mod>.<fcn> on the remote host.
    1150             :  * An error is returned if the function is already known at the remote site.
    1151             :  * The implementation is based on serialisation of the block into a string
    1152             :  * followed by remote parsing.
    1153             :  */
    1154             : static str
    1155           0 : RMTregisterInternal(Client cntxt, char **fcn_id, const char *conn,
    1156             :                                         const char *mod, const char *fcn)
    1157             : {
    1158           0 :         str tmp, qry, msg;
    1159           0 :         connection c;
    1160           0 :         char buf[BUFSIZ];
    1161           0 :         MapiHdl mhdl = NULL;
    1162           0 :         Symbol sym;
    1163             : 
    1164           0 :         if (strNil(conn))
    1165           0 :                 throw(ILLARG, "remote.register",
    1166             :                           ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1167             : 
    1168             :         /* find local definition */
    1169           0 :         sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
    1170           0 :         if (sym == NULL)
    1171           0 :                 throw(MAL, "remote.register",
    1172             :                           ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
    1173             : 
    1174             :         /* lookup conn */
    1175           0 :         rethrow("remote.register", tmp, RMTfindconn(&c, conn));
    1176             : 
    1177             :         /* this call should be a single transaction over the channel */
    1178           0 :         MT_lock_set(&c->lock);
    1179             : 
    1180             :         /* get a free, typed identifier for the remote host */
    1181           0 :         char ident[512];
    1182           0 :         tmp = RMTgetId(ident, sizeof(ident), sym->def, getInstrPtr(sym->def, 0), 0);
    1183           0 :         if (tmp != MAL_SUCCEED) {
    1184           0 :                 MT_lock_unset(&c->lock);
    1185           0 :                 return tmp;
    1186             :         }
    1187             : 
    1188             :         /* check remote definition */
    1189           0 :         snprintf(buf, BUFSIZ,
    1190             :                          "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod,
    1191             :                          ident);
    1192           0 :         TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
    1193           0 :         if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED) {
    1194           0 :                 MT_lock_unset(&c->lock);
    1195           0 :                 return msg;
    1196             :         }
    1197             : 
    1198           0 :         char *result;
    1199           0 :         if (mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl)
    1200           0 :                 && (result = mapi_fetch_field(mhdl, 0))) {
    1201           0 :                 if (strcmp(result, "false") != 0)
    1202           0 :                         msg = createException(MAL, "remote.register",
    1203             :                                                                   "function already exists at the remote site: %s.%s",
    1204             :                                                                   mod, fcn);
    1205             :         } else
    1206           0 :                 msg = createException(MAL, "remote.register", OPERATION_FAILED);
    1207             : 
    1208           0 :         mapi_close_handle(mhdl);
    1209             : 
    1210           0 :         if (msg) {
    1211           0 :                 MT_lock_unset(&c->lock);
    1212           0 :                 return msg;
    1213             :         }
    1214             : 
    1215           0 :         *fcn_id = GDKstrdup(ident);
    1216           0 :         if (*fcn_id == NULL) {
    1217           0 :                 MT_lock_unset(&c->lock);
    1218           0 :                 throw(MAL, "Remote register", MAL_MALLOC_FAIL);
    1219             :         }
    1220             : 
    1221           0 :         Symbol prg;
    1222           0 :         if ((prg = newFunctionArgs(putName(mod), putName(*fcn_id), FUNCTIONsymbol, -1)) == NULL) {
    1223           0 :                 MT_lock_unset(&c->lock);
    1224           0 :                 throw(MAL, "Remote register", MAL_MALLOC_FAIL);
    1225             :         }
    1226             : 
    1227             :         // We only need the Symbol not the inner program stub. So we clear it.
    1228           0 :         freeMalBlk(prg->def);
    1229           0 :         prg->def = NULL;
    1230             : 
    1231           0 :         if ((prg->def = copyMalBlk(sym->def)) == NULL) {
    1232           0 :                 MT_lock_unset(&c->lock);
    1233           0 :                 freeSymbol(prg);
    1234           0 :                 throw(MAL, "Remote register", MAL_MALLOC_FAIL);
    1235             :         }
    1236           0 :         setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
    1237             : 
    1238             :         /* make sure the program is error free */
    1239           0 :         msg = chkProgram(cntxt->usermodule, prg->def);
    1240           0 :         if (msg != MAL_SUCCEED || prg->def->errors) {
    1241           0 :                 MT_lock_unset(&c->lock);
    1242           0 :                 if (msg)
    1243             :                         return msg;
    1244           0 :                 throw(MAL, "remote.register",
    1245             :                           "function '%s.%s' contains syntax or type errors", mod, *fcn_id);
    1246             :         }
    1247             : 
    1248           0 :         qry = mal2str(prg->def, 0, prg->def->stop);
    1249           0 :         TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
    1250           0 :         msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
    1251           0 :         GDKfree(qry);
    1252           0 :         if (mhdl)
    1253           0 :                 mapi_close_handle(mhdl);
    1254             : 
    1255           0 :         freeSymbol(prg);
    1256             : 
    1257           0 :         MT_lock_unset(&c->lock);
    1258           0 :         return msg;
    1259             : }
    1260             : 
    1261             : static str
    1262           0 : RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1263             : {
    1264           0 :         char **fcn_id = getArgReference_str(stk, pci, 0);
    1265           0 :         const char *conn = *getArgReference_str(stk, pci, 1);
    1266           0 :         const char *mod = *getArgReference_str(stk, pci, 2);
    1267           0 :         const char *fcn = *getArgReference_str(stk, pci, 3);
    1268           0 :         (void) mb;
    1269           0 :         return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
    1270             : }
    1271             : 
    1272             : /**
    1273             :  * exec executes the function with its given arguments on the remote
    1274             :  * host, returning the function's return value.  exec is purposely kept
    1275             :  * very spartan.  All arguments need to be handles to previously put()
    1276             :  * values.  It calls the function with the given arguments at the remote
    1277             :  * site, and returns the handle which stores the return value of the
    1278             :  * remotely executed function.  This return value can be retrieved using
    1279             :  * a get call. It handles multiple return arguments.
    1280             :  */
    1281             : static str
    1282         733 : RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1283             : {
    1284         733 :         str conn, mod, func, tmp;
    1285         733 :         int i;
    1286         733 :         size_t len, buflen;
    1287         733 :         connection c = NULL;
    1288         733 :         char *qbuf;
    1289         733 :         MapiHdl mhdl;
    1290             : 
    1291         733 :         (void) cntxt;
    1292         733 :         (void) mb;
    1293         733 :         bool no_return_arguments = 0;
    1294             : 
    1295         733 :         columnar_result_callback *rcb = NULL;
    1296         733 :         ValRecord *v = &(stk)->stk[(pci)->argv[4]];
    1297         733 :         if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr)) {
    1298           1 :                 rcb = (columnar_result_callback *) v->val.pval;
    1299             :         }
    1300             : 
    1301        2687 :         for (i = 0; i < pci->retc; i++) {
    1302        1954 :                 if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
    1303        1953 :                         tmp = *getArgReference_str(stk, pci, i);
    1304        1953 :                         if (tmp == NULL || strcmp(tmp, (str) str_nil) == 0)
    1305           0 :                                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
    1306             :                                           ": return value %d is NULL or nil", i);
    1307             :                 } else
    1308             :                         no_return_arguments = 1;
    1309             :         }
    1310             : 
    1311         733 :         conn = *getArgReference_str(stk, pci, i++);
    1312         733 :         if (conn == NULL || strcmp(conn, (str) str_nil) == 0)
    1313           0 :                 throw(ILLARG, "remote.exec",
    1314             :                           ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1315         733 :         mod = *getArgReference_str(stk, pci, i++);
    1316         733 :         if (mod == NULL || strcmp(mod, (str) str_nil) == 0)
    1317           0 :                 throw(ILLARG, "remote.exec",
    1318             :                           ILLEGAL_ARGUMENT ": module name is NULL or nil");
    1319         733 :         func = *getArgReference_str(stk, pci, i++);
    1320         733 :         if (func == NULL || strcmp(func, (str) str_nil) == 0)
    1321           0 :                 throw(ILLARG, "remote.exec",
    1322             :                           ILLEGAL_ARGUMENT ": function name is NULL or nil");
    1323             : 
    1324             :         /* lookup conn */
    1325         733 :         rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
    1326             : 
    1327             :         /* this call should be a single transaction over the channel */
    1328         733 :         MT_lock_set(&c->lock);
    1329             : 
    1330         733 :         if (!no_return_arguments && pci->argc - pci->retc < 3) {       /* conn, mod, func, ... */
    1331           0 :                 MT_lock_unset(&c->lock);
    1332           0 :                 throw(MAL, "remote.exec",
    1333             :                           ILLEGAL_ARGUMENT " MAL instruction misses arguments");
    1334             :         }
    1335             : 
    1336         733 :         len = 0;
    1337             :         /* count how big a buffer we need */
    1338         733 :         len += 2 * (pci->retc > 1);
    1339         733 :         if (!no_return_arguments)
    1340        2685 :                 for (i = 0; i < pci->retc; i++) {
    1341        1953 :                         len += 2 * (i > 0);
    1342        1953 :                         len += strlen(*getArgReference_str(stk, pci, i));
    1343             :                 }
    1344             : 
    1345         733 :         const int arg_index = rcb ? 4 : 3;
    1346             : 
    1347         733 :         len += strlen(mod) + strlen(func) + 6;
    1348        2050 :         for (i = arg_index; i < pci->argc - pci->retc; i++) {
    1349        1317 :                 len += 2 * (i > arg_index);
    1350        1317 :                 len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
    1351             :         }
    1352         733 :         len += 2;
    1353         733 :         buflen = len + 1;
    1354         733 :         if ((qbuf = GDKmalloc(buflen)) == NULL) {
    1355           0 :                 MT_lock_unset(&c->lock);
    1356           0 :                 throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1357             :         }
    1358             : 
    1359         729 :         len = 0;
    1360             : 
    1361         729 :         if (pci->retc > 1)
    1362          70 :                 qbuf[len++] = '(';
    1363         729 :         if (!no_return_arguments)
    1364        2685 :                 for (i = 0; i < pci->retc; i++)
    1365        1953 :                         len += snprintf(&qbuf[len], buflen - len, "%s%s",
    1366        1953 :                                                         (i > 0 ? ", " : ""), *getArgReference_str(stk, pci,
    1367             :                                                                                                                                           i));
    1368             : 
    1369         729 :         if (pci->retc > 1)
    1370          70 :                 qbuf[len++] = ')';
    1371             : 
    1372             :         /* build the function invocation string in qbuf */
    1373         729 :         if (!no_return_arguments && pci->retc > 0) {
    1374         731 :                 len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
    1375             :         } else {
    1376           0 :                 len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
    1377             :         }
    1378             : 
    1379             :         /* handle the arguments to the function */
    1380             : 
    1381             :         /* put the arguments one by one, and dynamically build the
    1382             :          * invocation string */
    1383        2044 :         for (i = arg_index; i < pci->argc - pci->retc; i++) {
    1384        1315 :                 len += snprintf(&qbuf[len], buflen - len, "%s%s",
    1385             :                                                 (i > arg_index ? ", " : ""),
    1386        1315 :                                                 *(getArgReference_str(stk, pci, pci->retc + i)));
    1387             :         }
    1388             : 
    1389             :         /* finish end execute the invocation string */
    1390         729 :         len += snprintf(&qbuf[len], buflen - len, ");");
    1391         729 :         TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
    1392         729 :         tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
    1393         732 :         GDKfree(qbuf);
    1394             : 
    1395             :         /* Temporary hack:
    1396             :          * use a callback to immediately handle columnar results before hdl is destroyed. */
    1397         732 :         if (tmp == MAL_SUCCEED && rcb && mhdl
    1398           1 :                 && (mapi_get_querytype(mhdl) == Q_TABLE
    1399           0 :                         || mapi_get_querytype(mhdl) == Q_PREPARE)) {
    1400           1 :                 int fields = mapi_get_field_count(mhdl);
    1401           1 :                 columnar_result *results = GDKzalloc(sizeof(columnar_result) * fields);
    1402             : 
    1403           1 :                 if (!results) {
    1404           0 :                         tmp = createException(MAL, "remote.exec",
    1405             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1406             :                 } else {
    1407           1 :                         int i = 0;
    1408           1 :                         char buf[256] = { 0 };
    1409           1 :                         stream *sin = mapi_get_from(c->mconn);
    1410             : 
    1411           4 :                         for (; i < fields; i++) {
    1412           2 :                                 BAT *b = NULL;
    1413             : 
    1414           2 :                                 if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
    1415           2 :                                         (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1, c->int128)) != MAL_SUCCEED) {
    1416             :                                         break;
    1417             :                                 }
    1418             : 
    1419           2 :                                 results[i].id = b->batCacheid;
    1420           2 :                                 BBPkeepref(b);
    1421           2 :                                 results[i].colname = mapi_get_name(mhdl, i);
    1422           2 :                                 results[i].tpename = mapi_get_type(mhdl, i);
    1423           2 :                                 results[i].digits = mapi_get_digits(mhdl, i);
    1424           2 :                                 results[i].scale = mapi_get_scale(mhdl, i);
    1425             :                         }
    1426             : 
    1427           1 :                         if (tmp != MAL_SUCCEED) {
    1428           0 :                                 for (int j = 0; j < i; j++)
    1429           0 :                                         BBPrelease(results[j].id);
    1430             :                         } else {
    1431           1 :                                 assert(rcb->context);
    1432           1 :                                 tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results,
    1433             :                                                                 fields);
    1434           3 :                                 for (int j = 0; j < i; j++)
    1435           2 :                                         BBPrelease(results[j].id);
    1436             :                         }
    1437           1 :                         GDKfree(results);
    1438             :                 }
    1439             :         }
    1440             : 
    1441         732 :         if (rcb) {
    1442           1 :                 GDKfree(rcb->context);
    1443           1 :                 GDKfree(rcb);
    1444             :         }
    1445             : 
    1446         732 :         if (mhdl)
    1447         726 :                 mapi_close_handle(mhdl);
    1448         733 :         MT_lock_unset(&c->lock);
    1449         733 :         return tmp;
    1450             : }
    1451             : 
    1452             : /**
    1453             :  * batload is a helper function to make transferring a BAT with RMTput
    1454             :  * more efficient.  It works by creating a BAT, and loading it with the
    1455             :  * data as comma separated values from the input stream, until an empty
    1456             :  * line is read.  The given size argument is taken as a hint only, and
    1457             :  * is not enforced to match the number of rows read.
    1458             :  */
    1459             : static str
    1460        1402 : RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1461             : {
    1462        1402 :         ValPtr v;
    1463        1402 :         int t;
    1464        1402 :         int size;
    1465        1402 :         ptr r;
    1466        1402 :         size_t s;
    1467        1402 :         BAT *b;
    1468        1402 :         size_t len;
    1469        1402 :         char *var;
    1470        1402 :         str msg = MAL_SUCCEED;
    1471        1402 :         bstream *fdin = cntxt->fdin;
    1472             : 
    1473        1402 :         v = &stk->stk[pci->argv[0]];  /* return */
    1474        1402 :         t = getArgType(mb, pci, 1);     /* tail type */
    1475        1402 :         size = *getArgReference_int(stk, pci, 2);       /* size */
    1476             : 
    1477        1402 :         b = COLnew(0, t, size, TRANSIENT);
    1478        1402 :         if (b == NULL)
    1479           0 :                 throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1480             : 
    1481             :         /* grab the input stream and start reading */
    1482        1402 :         fdin->eof = false;
    1483        1402 :         len = fdin->pos;
    1484        1402 :         while (len < fdin->len || bstream_next(fdin) > 0) {
    1485             :                 /* newline hunting (how spartan) */
    1486        1402 :                 for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n';
    1487           0 :                          len++) ;
    1488             :                 /* unterminated line, request more */
    1489        1402 :                 if (fdin->buf[len] != '\n')
    1490           0 :                         continue;
    1491             :                 /* empty line, end of input */
    1492        1402 :                 if (fdin->pos == len) {
    1493        1402 :                         if (isa_block_stream(fdin->s)) {
    1494        1402 :                                 ssize_t n = bstream_next(fdin);
    1495        1402 :                                 if (n)
    1496           0 :                                         msg = createException(MAL, "remote.load",
    1497             :                                                                                   SQLSTATE(HY013)
    1498             :                                                                                   "Unexpected return from remote");
    1499             :                         }
    1500             :                         break;
    1501             :                 }
    1502           0 :                 fdin->buf[len] = '\0';       /* kill \n */
    1503           0 :                 var = &fdin->buf[fdin->pos];
    1504             :                 /* skip over this line */
    1505           0 :                 fdin->pos = ++len;
    1506             : 
    1507           0 :                 s = 0;
    1508           0 :                 r = NULL;
    1509           0 :                 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
    1510           0 :                         BUNappend(b, r, false) != GDK_SUCCEED) {
    1511           0 :                         BBPreclaim(b);
    1512           0 :                         GDKfree(r);
    1513           0 :                         throw(MAL, "remote.get", GDK_EXCEPTION);
    1514             :                 }
    1515           0 :                 GDKfree(r);
    1516             :         }
    1517             : 
    1518        1402 :         *v = (ValRecord) {
    1519        1402 :                 .val.bval = b->batCacheid,
    1520             :                 .bat = true,
    1521        1402 :                 .vtype = b->ttype,
    1522             :         };
    1523        1402 :         BBPkeepref(b);
    1524             : 
    1525        1402 :         return msg;
    1526             : }
    1527             : 
    1528             : /**
    1529             :  * dump given BAT to stream
    1530             :  */
    1531             : static str
    1532        1400 : RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1533             : {
    1534        1400 :         bat bid = *getArgReference_bat(stk, pci, 1);
    1535        1400 :         BAT *b = BBPquickdesc(bid), *v = b;
    1536        1400 :         char sendtheap = 0, sendtvheap = 0;
    1537             : 
    1538        1400 :         (void) mb;
    1539        1400 :         (void) stk;
    1540        1400 :         (void) pci;
    1541             : 
    1542        1400 :         if (b == NULL)
    1543           0 :                 throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
    1544             : 
    1545        1400 :         if (BBPfix(bid) <= 0)
    1546           0 :                 throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
    1547             : 
    1548        1400 :         sendtheap = b->ttype != TYPE_void;
    1549        1400 :         sendtvheap = sendtheap && b->tvheap;
    1550          90 :         if (sendtvheap && VIEWvtparent(b)
    1551          41 :                 && BATcount(b) < BATcount(BBP_desc(VIEWvtparent(b)))) {
    1552          17 :                 if ((b = BATdescriptor(bid)) == NULL) {
    1553           0 :                         BBPunfix(bid);
    1554           0 :                         throw(MAL, "remote.bincopyto", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
    1555             :                 }
    1556          17 :                 v = COLcopy(b, b->ttype, true, TRANSIENT);
    1557          17 :                 BBPunfix(b->batCacheid);
    1558          17 :                 if (v == NULL) {
    1559           0 :                         BBPunfix(bid);
    1560           0 :                         throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
    1561             :                 }
    1562             :         }
    1563             : 
    1564        1400 :         BATiter vi = bat_iterator(v);
    1565        1490 :         mnstr_printf(cntxt->fdout, /*JSON*/ "{"
    1566             :                                  "\"version\":1,"
    1567             :                                  "\"ttype\":%d,"
    1568             :                                  "\"hseqbase\":" OIDFMT ","
    1569             :                                  "\"tseqbase\":" OIDFMT ","
    1570             :                                  "\"tsorted\":%d,"
    1571             :                                  "\"trevsorted\":%d,"
    1572             :                                  "\"tkey\":%d,"
    1573             :                                  "\"tnonil\":%d,"
    1574             :                                  "\"tdense\":%d,"
    1575             :                                  "\"size\":" BUNFMT ","
    1576             :                                  "\"tailsize\":%zu,"
    1577             :                                  "\"theapsize\":%zu"
    1578             :                                  "}\n",
    1579        1400 :                                  vi.type,
    1580             :                                  v->hseqbase, v->tseqbase,
    1581        1400 :                                  vi.sorted, vi.revsorted,
    1582        1400 :                                  vi.key,
    1583        1400 :                                  vi.nonil,
    1584        1400 :                                  BATtdensebi(&vi),
    1585             :                                  vi.count,
    1586        1370 :                                  sendtheap ? (size_t) vi.count << vi.shift : 0,
    1587          90 :                                  sendtvheap && vi.count > 0 ? vi.vhfree : 0);
    1588             : 
    1589        1400 :         if (sendtheap && vi.count > 0) {
    1590        1283 :                 mnstr_write(cntxt->fdout,    /* tail */
    1591        1283 :                                         vi.base, vi.count * vi.width, 1);
    1592        1283 :                 if (sendtvheap)
    1593          75 :                         mnstr_write(cntxt->fdout,    /* theap */
    1594          75 :                                                 vi.vh->base, vi.vhfree, 1);
    1595             :         }
    1596        1400 :         bat_iterator_end(&vi);
    1597             :         /* flush is done by the calling environment (MAL) */
    1598             : 
    1599        1400 :         if (b != v)
    1600          17 :                 BBPreclaim(v);
    1601             : 
    1602        1400 :         BBPunfix(bid);
    1603             : 
    1604        1400 :         return (MAL_SUCCEED);
    1605             : }
    1606             : 
    1607             : /**
    1608             :  * read from the input stream and give the BAT handle back to the caller
    1609             :  */
    1610             : static str
    1611           0 : RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1612             : {
    1613           0 :         BAT *b = NULL;
    1614           0 :         ValPtr v;
    1615           0 :         str err;
    1616             : 
    1617           0 :         (void) mb;
    1618             : 
    1619             :         /* We receive a normal line, which contains the JSON header, the
    1620             :          * rest is binary data directly on the stream.  We get the first
    1621             :          * line from the buffered stream we have here, and pass it on
    1622             :          * together with the raw stream we have. */
    1623           0 :         cntxt->fdin->eof = false; /* in case it was before */
    1624           0 :         if (bstream_next(cntxt->fdin) <= 0)
    1625           0 :                 throw(MAL, "remote.bincopyfrom", "expected JSON header");
    1626             : 
    1627           0 :         cntxt->fdin->buf[cntxt->fdin->len] = '\0';
    1628           0 :         err = RMTinternalcopyfrom(&b,
    1629           0 :                         &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true, int128 /* library should be compatible */);
    1630             :         /* skip the JSON line */
    1631           0 :         cntxt->fdin->pos = ++cntxt->fdin->len;
    1632           0 :         if (err !=MAL_SUCCEED)
    1633             :                 return (err);
    1634             : 
    1635           0 :         v = &stk->stk[pci->argv[0]];
    1636           0 :         *v = (ValRecord) {
    1637           0 :                 .val.bval = b->batCacheid,
    1638             :                 .bat = true,
    1639           0 :                 .vtype = b->ttype,
    1640             :         };
    1641           0 :         BBPkeepref(b);
    1642             : 
    1643           0 :         return (MAL_SUCCEED);
    1644             : }
    1645             : 
    1646             : /**
    1647             :  * bintype identifies the system on its binary profile.  This is mainly
    1648             :  * used to determine if BATs can be sent binary across.
    1649             :  */
    1650             : static str
    1651         186 : RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1652             : {
    1653         186 :         (void)mb;
    1654         186 :         (void)stk;
    1655         186 :         (void)pci;
    1656             : 
    1657             :         /* TODO bintype should include the (bin) protocol version */
    1658         186 :         mnstr_printf(cntxt->fdout, "[ %d ]\n", localtype);
    1659         186 :         return(MAL_SUCCEED);
    1660             : }
    1661             : 
    1662             : /**
    1663             :  * Returns whether the underlying connection is still connected or not.
    1664             :  * Best effort implementation on top of mapi using a ping.
    1665             :  */
    1666             : static str
    1667           0 : RMTisalive(int *ret, const char *const *conn)
    1668             : {
    1669           0 :         str tmp;
    1670           0 :         connection c;
    1671             : 
    1672           0 :         if (*conn == NULL || strcmp(*conn, (str) str_nil) == 0)
    1673           0 :                 throw(ILLARG, "remote.get",
    1674             :                           ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1675             : 
    1676             :         /* lookup conn, set c if valid */
    1677           0 :         rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
    1678             : 
    1679           0 :         *ret = 0;
    1680           0 :         if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
    1681           0 :                 *ret = 1;
    1682             : 
    1683             :         return MAL_SUCCEED;
    1684             : }
    1685             : 
    1686             : // This is basically a no op
    1687             : static str
    1688         362 : RMTregisterSupervisor(int *ret, const char *const *sup_uuid, const char *const *query_uuid)
    1689             : {
    1690         362 :         (void) sup_uuid;
    1691         362 :         (void) query_uuid;
    1692             : 
    1693         362 :         *ret = 0;
    1694         362 :         return MAL_SUCCEED;
    1695             : }
    1696             : 
    1697             : #include "mel.h"
    1698             : mel_func remote_init_funcs[] = {
    1699             :  command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
    1700             :  command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
    1701             :  pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
    1702             :  command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
    1703             :  command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
    1704             :  pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
    1705             :  pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
    1706             :  pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
    1707             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
    1708             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
    1709             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
    1710             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
    1711             :  command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
    1712             :  pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
    1713             :  pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
    1714             :  pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
    1715             :  pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
    1716             :  command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
    1717             :  { .imp=NULL }
    1718             : };
    1719             : #include "mal_import.h"
    1720             : #ifdef _MSC_VER
    1721             : #undef read
    1722             : #pragma section(".CRT$XCU",read)
    1723             : #endif
    1724         321 : LIB_STARTUP_FUNC(init_remote_mal)
    1725         321 : { mal_module2("remote", NULL, remote_init_funcs, RMTprelude, NULL); }

Generated by: LCOV version 1.14