Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a N.J. Nes P. Boncz, S. Mullender, M. Kersten
15 : * @v 1.1
16 : * @+ MAPI interface
17 : * The complete Mapi library is available to setup
18 : * communication with another Mserver.
19 : *
20 : * Clients may initialize a private listener to implement
21 : * specific services. For example, in an OLTP environment
22 : * it may make sense to have a listener for each transaction
23 : * type, which simply parses a sequence of transaction parameters.
24 : *
25 : * Authorization of access to the server is handled as part
26 : * of the client record initialization phase.
27 : *
28 : * This library internally uses pointer handles, which we replace with
29 : * an index in a locally maintained table. It provides a handle
30 : * to easily detect havoc clients.
31 : *
32 : * A cleaner and simpler interface for distributed processing is available in
33 : * the module remote.
34 : */
35 : #include "monetdb_config.h"
36 : #include "mal_client.h"
37 : #include "mal_session.h"
38 : #include "mal_exception.h"
39 : #include "mal_interpreter.h"
40 : #include "mal_authorize.h"
41 : #include "mal_internal.h"
42 : #include "msabaoth.h"
43 : #include "mcrypt.h"
44 : #include "stream.h"
45 : #include "streams.h" /* for Stream */
46 : #include <sys/types.h>
47 : #include "stream_socket.h"
48 : #include "mapi.h"
49 : #include "mutils.h"
50 :
51 : #if defined(HAVE_GETENTROPY) && defined(HAVE_SYS_RANDOM_H)
52 : #include <sys/random.h>
53 : #endif
54 :
55 : #ifdef HAVE_SYS_SOCKET_H
56 : # include <sys/select.h>
57 : # include <sys/socket.h>
58 : # include <unistd.h> /* gethostname() */
59 : # include <netinet/in.h> /* hton and ntoh */
60 : # include <arpa/inet.h> /* addr_in */
61 : #else /* UNIX specific */
62 : #ifdef HAVE_WINSOCK_H /* Windows specific */
63 : # include <winsock.h>
64 : #endif
65 : #endif
66 : #ifdef HAVE_SYS_UN_H
67 : # include <sys/un.h>
68 : #endif
69 : #ifdef HAVE_NETDB_H
70 : # include <netdb.h>
71 : # include <netinet/in.h>
72 : #endif
73 : #ifdef HAVE_POLL_H
74 : #include <poll.h>
75 : #endif
76 : #ifdef HAVE_SYS_UIO_H
77 : # include <sys/uio.h>
78 : #endif
79 : #ifdef HAVE_FCNTL_H
80 : #include <fcntl.h>
81 : #endif
82 :
83 : #ifdef HAVE_SOCKLEN_T
84 : #define SOCKLEN socklen_t
85 : #else
86 : #define SOCKLEN int
87 : #endif
88 :
89 : #if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
90 : #define accept4(sockfd, addr, addrlen, flags) accept(sockfd, addr, addrlen)
91 : #endif
92 :
93 : #define SERVERMAXUSERS SOMAXCONN
94 :
/* Alphabet from which challenge strings are drawn: 26 lower-case
 * letters, 26 upper-case letters and 10 digits, 62 entries in total.
 * The array is not NUL-terminated; the challenge generators in this
 * file index it with a value reduced modulo 62. */
95 : static const char seedChars[] ={
96 : 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
97 : 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
98 : 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
99 : 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
100 : '1', '2', '3', '4', '5', '6', '7', '8', '9', '0'
101 : };
102 :
103 :
#if !defined(HAVE_GETENTROPY) && defined(HAVE_RAND_S)
/*
 * Fill buf[0 .. size-1] with characters from seedChars, using rand_s()
 * as the source of randomness.  The buffer is NOT NUL-terminated here;
 * that is left to the caller.
 *
 * Returns true on success, false as soon as rand_s() reports an error
 * (in which case buf may have been partially written).
 */
static inline bool
gen_win_challenge(char *buf, size_t size)
{
	for (size_t i = 0; i < size;) {
		unsigned int r;
		if (rand_s(&r) != 0)
			return false;
		/* Use each byte of r exactly once.  Iterating sizeof(size_t)
		 * times would, where size_t is wider than unsigned int, keep
		 * shifting an already exhausted (zero) value and emit
		 * seedChars[0] over and over. */
		for (size_t j = 0; j < sizeof(r) && i < size; j++) {
			buf[i++] = seedChars[(r & 0xFF) % 62];
			r >>= 8;
		}
	}
	return true;
}
#endif
120 :
121 : static void
122 38151 : generateChallenge(str buf, int min, int max)
123 : {
124 38151 : size_t size;
125 38151 : size_t i;
126 :
127 : #ifdef __COVERITY__
128 : /* hide rand() calls from analysis */
129 : size = (min + max) / 2;
130 : for (i = 0; i < size; i++)
131 : buf[i] = seedChars[i % 62];
132 : buf[size] = '\0';
133 : #else
134 : /* don't seed the randomiser here, or you get the same challenge
135 : * during the same second */
136 : #if defined(HAVE_GETENTROPY)
137 38151 : if (getentropy(&size, sizeof(size)) < 0)
138 : #elif defined(HAVE_RAND_S)
139 : unsigned int r;
140 : if (rand_s(&r) == 0)
141 : size = (size_t) r;
142 : else
143 : #endif
144 0 : size = rand();
145 38151 : size = (size % (max - min)) + min;
146 : #if defined(HAVE_GETENTROPY)
147 38151 : if (getentropy(buf, size) == 0)
148 400931 : for (i = 0; i < size; i++)
149 362780 : buf[i] = seedChars[((unsigned char *) buf)[i] % 62];
150 : else
151 : #elif defined(HAVE_RAND_S)
152 : if (!gen_win_challenge(buf, size))
153 : #endif
154 0 : for (i = 0; i < size; i++)
155 0 : buf[i] = seedChars[rand() % 62];
156 38151 : buf[size] = '\0';
157 : #endif
158 38151 : }
159 :
/* Argument block handed to the doChallenge thread; doChallenge copies
 * what it needs and frees the block. */
160 : struct challengedata {
/* stream the client's response is read from */
161 : stream *in;
/* stream the challenge is written to */
162 : stream *out;
/* NUL-terminated challenge string; the callers in this file fill it
 * with generateChallenge(..., 8, 12) */
163 : char challenge[13];
164 : };
165 :
166 : static str SERVERsetAlias(void *ret, int *key, str *dbalias);
167 :
168 : static void
169 38145 : doChallenge(void *data)
170 : {
171 38145 : char *buf = GDKmalloc(BLOCK + 1);
172 38126 : char challenge[13];
173 :
174 38126 : stream *fdin = ((struct challengedata *) data)->in;
175 38126 : stream *fdout = ((struct challengedata *) data)->out;
176 38126 : bstream *bs;
177 38126 : ssize_t len = 0;
178 38126 : protocol_version protocol = PROTOCOL_9;
179 38126 : size_t buflen = BLOCK;
180 :
181 38126 : MT_thread_setworking("challenging client");
182 : #ifdef _MSC_VER
183 : srand((unsigned int) GDKusec());
184 : #endif
185 38144 : memcpy(challenge, ((struct challengedata *) data)->challenge,
186 : sizeof(challenge));
187 38144 : GDKfree(data);
188 38151 : if (buf == NULL) {
189 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
190 0 : close_stream(fdin);
191 0 : close_stream(fdout);
192 0 : return;
193 : }
194 :
195 : /* Send the challenge over the block stream
196 : * We can do binary transfers, and we can interrupt queries using
197 : * out-of-band messages */
198 38151 : mnstr_printf(fdout, "%s:mserver:9:%s:%s:%s:sql=%d:BINARY=1:OOBINTR=1:",
199 : challenge, mcrypt_getHashAlgorithms(),
200 : #ifdef WORDS_BIGENDIAN
201 : "BIG",
202 : #else
203 : "LIT",
204 : #endif
205 : MONETDB5_PASSWDHASH, MAPI_HANDSHAKE_OPTIONS_LEVEL);
206 38136 : mnstr_flush(fdout, MNSTR_FLUSH_DATA);
207 : /* get response */
208 38146 : if ((len = mnstr_read_block(fdin, buf, 1, BLOCK)) < 0) {
209 : /* the client must have gone away, so no reason to write anything */
210 0 : close_stream(fdin);
211 0 : close_stream(fdout);
212 0 : GDKfree(buf);
213 0 : return;
214 : }
215 38133 : buf[len] = 0;
216 :
217 38133 : bs = bstream_create(fdin, 128 * BLOCK);
218 :
219 38146 : if (bs == NULL) {
220 0 : mnstr_printf(fdout, "!allocation failure in the server\n");
221 0 : close_stream(fdin);
222 0 : close_stream(fdout);
223 0 : GDKfree(buf);
224 0 : GDKsyserror("SERVERlisten:" MAL_MALLOC_FAIL);
225 0 : return;
226 : }
227 38146 : bs->eof = true;
228 38146 : MSscheduleClient(buf, challenge, bs, fdout, protocol, buflen);
229 : }
230 :
/* Shared state of the listener machinery.
 * nlistener is incremented by SERVERlistenThread when it starts and
 * decremented when it terminates; SERVERstop polls it.
 * serveractive is cleared by SERVERsuspend and set by SERVERresume; the
 * listener consults it when accept fails.
 * serverexiting is set by SERVERstop and makes the listener loop end. */
231 : static ATOMIC_TYPE nlistener = ATOMIC_VAR_INIT(0); /* nr of listeners */
232 : static ATOMIC_TYPE serveractive = ATOMIC_VAR_INIT(0);
233 : static ATOMIC_TYPE serverexiting = ATOMIC_VAR_INIT(0); /* listeners should exit */
234 :
235 : static void
236 328 : SERVERlistenThread(SOCKET *Sock)
237 : {
238 328 : char *msg = NULL;
239 328 : int retval;
240 328 : SOCKET socks[3] = { Sock[0], Sock[1], Sock[2] };
241 328 : struct challengedata *data;
242 328 : MT_Id tid;
243 328 : stream *s;
244 328 : int i;
245 :
246 328 : GDKfree(Sock);
247 :
248 328 : ATOMIC_INC(&nlistener);
249 :
250 99891 : do {
251 99891 : SOCKET msgsock = INVALID_SOCKET;
252 : #ifdef HAVE_POLL
253 99891 : struct pollfd pfd[3];
254 99891 : nfds_t npfd;
255 99891 : npfd = 0;
256 399564 : for (i = 0; i < 3; i++) {
257 299673 : if (socks[i] != INVALID_SOCKET)
258 199782 : pfd[npfd++] = (struct pollfd) {
259 : .fd = socks[i],
260 : .events = POLLIN
261 : };
262 : }
263 : /* Wait up to 0.1 seconds (0.01 if testing) */
264 99891 : retval = poll(pfd, npfd,
265 99891 : ATOMIC_GET(&GDKdebug) & FORCEMITOMASK ? 10 : 100);
266 99891 : if (retval == -1 && errno == EINTR)
267 61412 : continue;
268 : #else
269 : fd_set fds;
270 : FD_ZERO(&fds);
271 : /* temporarily use msgsock to record the highest socket fd */
272 : for (i = 0; i < 3; i++) {
273 : if (socks[i] != INVALID_SOCKET) {
274 : FD_SET(socks[i], &fds);
275 : if (msgsock == INVALID_SOCKET || socks[i] > msgsock)
276 : msgsock = socks[i];
277 : }
278 : }
279 : /* Wait up to 0.1 seconds (0.01 if testing) */
280 : struct timeval tv = (struct timeval) {
281 : .tv_usec = ATOMIC_GET(&GDKdebug) & FORCEMITOMASK ? 10000 : 100000,
282 : };
283 :
284 : retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv);
285 : msgsock = INVALID_SOCKET;
286 : #endif
287 99891 : if (ATOMIC_GET(&serverexiting) || GDKexiting())
288 : break;
289 99563 : if (retval == 0) {
290 : /* nothing interesting has happened */
291 61412 : continue;
292 : }
293 38151 : if (retval == SOCKET_ERROR) {
294 0 : if (
295 : #ifdef _MSC_VER
296 : WSAGetLastError() != WSAEINTR
297 : #else
298 0 : errno != EINTR
299 : #endif
300 : ) {
301 0 : msg = "select failed";
302 0 : goto error;
303 : }
304 0 : continue;
305 : }
306 73325 : bool isusock = false;
307 : #ifdef HAVE_POLL
308 73325 : for (i = 0; i < (int) npfd; i++) {
309 73325 : if (pfd[i].revents & POLLIN) {
310 38151 : msgsock = pfd[i].fd;
311 38151 : isusock = msgsock == socks[2];
312 38151 : break;
313 : }
314 : }
315 : #else
316 : for (i = 0; i < 3; i++) {
317 : if (socks[i] != INVALID_SOCKET && FD_ISSET(socks[i], &fds)) {
318 : msgsock = socks[i];
319 : isusock = i == 2;
320 : break;
321 : }
322 : }
323 : #endif
324 38151 : if (msgsock == INVALID_SOCKET)
325 0 : continue;
326 :
327 38151 : if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
328 0 : if (
329 : #ifdef _MSC_VER
330 : WSAGetLastError() != WSAEINTR
331 : #else
332 0 : errno != EINTR
333 : #endif
334 0 : || !ATOMIC_GET(&serveractive)) {
335 0 : msg = "accept failed";
336 0 : goto error;
337 : }
338 0 : continue;
339 : }
340 : #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
341 : (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
342 : #endif
343 : #ifdef HAVE_SYS_UN_H
344 38151 : if (isusock) {
345 35174 : struct msghdr msgh;
346 35174 : struct iovec iov;
347 35174 : char buf[1];
348 35174 : int rv;
349 35174 : char ccmsg[CMSG_SPACE(sizeof(int))];
350 35174 : struct cmsghdr *cmsg;
351 :
352 : /* BEWARE: unix domain sockets have a slightly different
353 : * behaviour initialy than normal sockets, because we can
354 : * send filedescriptors or credentials with them. To do so,
355 : * we need to use sendmsg/recvmsg, which operates on a bare
356 : * socket. Unfortunately we *have* to send something, so it
357 : * is one byte that can optionally carry the ancillary data.
358 : * This byte is at this moment defined to contain a character:
359 : * '0' - there is no ancillary data
360 : * '1' - ancillary data for passing a file descriptor
361 : * The future may introduce a state for passing credentials.
362 : * Any unknown character must be interpreted as some unknown
363 : * action, and hence not supported by the server. */
364 :
365 35174 : iov.iov_base = buf;
366 35174 : iov.iov_len = 1;
367 :
368 35174 : msgh.msg_name = 0;
369 35174 : msgh.msg_namelen = 0;
370 35174 : msgh.msg_iov = &iov;
371 35174 : msgh.msg_iovlen = 1;
372 35174 : msgh.msg_flags = 0;
373 35174 : msgh.msg_control = ccmsg;
374 35174 : msgh.msg_controllen = sizeof(ccmsg);
375 :
376 35174 : rv = recvmsg(msgsock, &msgh, 0);
377 35174 : if (rv == -1) {
378 0 : closesocket(msgsock);
379 0 : continue;
380 : }
381 :
382 35174 : switch (buf[0]) {
383 : case '0':
384 : /* nothing special, nothing to do */
385 : break;
386 0 : case '1':
387 : {
388 0 : int *c_d;
389 : /* filedescriptor, put it in place of msgsock */
390 0 : cmsg = CMSG_FIRSTHDR(&msgh);
391 0 : (void) shutdown(msgsock, SHUT_WR);
392 0 : closesocket(msgsock);
393 0 : if (!cmsg || cmsg->cmsg_type != SCM_RIGHTS) {
394 0 : TRC_CRITICAL(MAL_SERVER,
395 : "Expected file descriptor, but received something else\n");
396 0 : continue;
397 : }
398 : /* HACK to avoid
399 : * "dereferencing type-punned pointer will break strict-aliasing rules"
400 : * (with gcc 4.5.1 on Fedora 14)
401 : */
402 0 : c_d = (int *) CMSG_DATA(cmsg);
403 0 : msgsock = *c_d;
404 : }
405 0 : break;
406 0 : default:
407 : /* some unknown state */
408 0 : closesocket(msgsock);
409 0 : TRC_CRITICAL(MAL_SERVER,
410 : "Unknown command type in first byte\n");
411 0 : continue;
412 : }
413 : }
414 : #endif
415 :
416 38151 : data = GDKzalloc(sizeof(*data));
417 38151 : if (data == NULL) {
418 0 : closesocket(msgsock);
419 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
420 0 : continue;
421 : }
422 38151 : data->in = socket_rstream(msgsock, "Server read");
423 38151 : if (data->in == NULL) {
424 0 : stream_alloc_fail:
425 0 : mnstr_destroy(data->in);
426 0 : mnstr_destroy(data->out);
427 0 : GDKfree(data);
428 0 : closesocket(msgsock);
429 0 : TRC_ERROR(MAL_SERVER, "Cannot allocate stream: %s\n",
430 : mnstr_peek_error(NULL));
431 0 : continue;
432 : }
433 38151 : data->out = socket_wstream(msgsock, "Server write");
434 38151 : if (data->out == NULL) {
435 0 : goto stream_alloc_fail;
436 : }
437 38151 : s = block_stream(data->in);
438 38151 : if (s == NULL) {
439 0 : goto stream_alloc_fail;
440 : }
441 38151 : data->in = s;
442 38151 : s = block_stream(data->out);
443 38151 : if (s == NULL) {
444 0 : goto stream_alloc_fail;
445 : }
446 38151 : data->out = s;
447 :
448 : /* generate the challenge string */
449 38151 : generateChallenge(data->challenge, 8, 12);
450 :
451 38151 : if (MT_create_thread(&tid, doChallenge, data, MT_THR_DETACHED, "clientXXXX") < 0) {
452 0 : mnstr_destroy(data->in);
453 0 : mnstr_destroy(data->out);
454 0 : GDKfree(data);
455 0 : closesocket(msgsock);
456 0 : TRC_ERROR(MAL_SERVER, "Cannot fork new client thread\n");
457 0 : continue;
458 : }
459 99563 : } while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
460 0 : error:;
461 : #ifdef HAVE_SYS_UN_H
462 328 : const char *usockfile = GDKgetenv("mapi_usock");
463 656 : if (usockfile && MT_remove(usockfile) == -1 && errno != ENOENT)
464 0 : perror(usockfile);
465 : #endif
466 328 : ATOMIC_DEC(&nlistener);
467 1312 : for (i = 0; i < 3; i++)
468 984 : if (socks[i] != INVALID_SOCKET)
469 656 : closesocket(socks[i]);
470 328 : if (msg)
471 0 : TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
472 328 : return;
473 : }
474 :
475 : #ifdef _MSC_VER
476 : #define HOSTLEN int
477 : #else
478 : #define HOSTLEN size_t
479 : #endif
480 :
481 : static char *
482 328 : start_listen(SOCKET *sockp, int *portp, const char *listenaddr,
483 : char *host, size_t hostlen, int maxusers)
484 : {
485 328 : struct addrinfo *result = NULL;
486 328 : struct addrinfo hints = {
487 : .ai_family = AF_INET6,
488 : .ai_flags = AI_PASSIVE | AI_NUMERICSERV,
489 : .ai_socktype = SOCK_STREAM,
490 : .ai_protocol = IPPROTO_TCP,
491 : };
492 328 : int e = 0;
493 328 : int ipv6_vs6only = -1;
494 328 : SOCKET sock = INVALID_SOCKET;
495 328 : const char *err;
496 328 : int nsock = 0;
497 328 : sockp[0] = sockp[1] = INVALID_SOCKET;
498 328 : host[0] = 0;
499 328 : if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) {
500 0 : hints.ai_family = AF_INET6;
501 0 : hints.ai_flags |= AI_NUMERICHOST;
502 0 : ipv6_vs6only = 0;
503 0 : listenaddr = "::1";
504 0 : strcpy_len(host, "localhost", hostlen);
505 328 : } else if (strcmp(listenaddr, "all") == 0) {
506 328 : hints.ai_family = AF_INET6;
507 328 : ipv6_vs6only = 0;
508 328 : listenaddr = NULL;
509 0 : } else if (strcmp(listenaddr, "::") == 0) {
510 0 : hints.ai_family = AF_INET6;
511 0 : ipv6_vs6only = 1;
512 0 : listenaddr = NULL;
513 0 : } else if (strcmp(listenaddr, "0.0.0.0") == 0) {
514 0 : hints.ai_family = AF_INET;
515 0 : hints.ai_flags |= AI_NUMERICHOST;
516 0 : listenaddr = NULL;
517 0 : } else if (strcmp(listenaddr, "::1") == 0) {
518 0 : hints.ai_family = AF_INET6;
519 0 : hints.ai_flags |= AI_NUMERICHOST;
520 0 : ipv6_vs6only = 1;
521 0 : strcpy_len(host, "localhost", hostlen);
522 0 : } else if (strcmp(listenaddr, "127.0.0.1") == 0) {
523 0 : hints.ai_family = AF_INET;
524 0 : hints.ai_flags |= AI_NUMERICHOST;
525 0 : strcpy_len(host, "localhost", hostlen);
526 : } else {
527 0 : hints.ai_family = AF_INET6;
528 0 : ipv6_vs6only = 0;
529 : }
530 328 : char sport[8]; /* max "65535" */
531 328 : snprintf(sport, sizeof(sport), "%d", *portp);
532 656 : for (;;) { /* max twice */
533 656 : int check = getaddrinfo(listenaddr, sport, &hints, &result);
534 656 : if (check != 0) {
535 : #ifdef _MSC_VER
536 : err = wsaerror(WSAGetLastError());
537 : #else
538 0 : err = gai_strerror(check);
539 : #endif
540 0 : throw(IO, "mal_mapi.listen",
541 : OPERATION_FAILED ": cannot get address "
542 : "information for %s and port %s: %s",
543 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0", sport, err);
544 : }
545 :
546 984 : for (struct addrinfo * rp = result; rp; rp = rp->ai_next) {
547 656 : sock = socket(rp->ai_family, rp->ai_socktype
548 : #ifdef SOCK_CLOEXEC
549 : | SOCK_CLOEXEC
550 : #endif
551 : , rp->ai_protocol);
552 656 : if (sock == INVALID_SOCKET) {
553 : #ifdef _MSC_VER
554 : e = WSAGetLastError();
555 : #else
556 328 : e = errno;
557 : #endif
558 328 : continue;
559 : }
560 : #if defined(HAVE_FCNTL) && !defined(SOCK_CLOEXEC)
561 : (void) fcntl(sock, F_SETFD, FD_CLOEXEC);
562 : #endif
563 328 : if (ipv6_vs6only >= 0)
564 0 : if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
565 : (const char *) &ipv6_vs6only,
566 : (SOCKLEN) sizeof(int)) == -1)
567 0 : perror("setsockopt IPV6_V6ONLY");
568 :
569 : /* do not reuse addresses for ephemeral (autosense) ports */
570 328 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
571 328 : (const char *) &(int) { 1 },
572 : (SOCKLEN) sizeof(int)) == SOCKET_ERROR) {
573 : #ifdef _MSC_VER
574 : e = WSAGetLastError();
575 : #else
576 0 : e = errno;
577 : #endif
578 0 : closesocket(sock);
579 0 : sock = INVALID_SOCKET;
580 0 : continue;
581 : }
582 328 : if ((e = bind(sock, rp->ai_addr, (SOCKLEN) rp->ai_addrlen)) != 0) {
583 : /* return value of 1 is currently undocumented, but
584 : * seems to occur when binding a port to an IPv4 socket
585 : * when the same port is already bound to an IPv6 socket
586 : * that already also listens to IPv4; in this case the
587 : * port that is actually bound to here is a different
588 : * one, and we don't want that, so we close the socket
589 : * without error (if bind returned SOCKET_ERROR, we do
590 : * report the error) */
591 0 : if (e == SOCKET_ERROR) {
592 : #ifdef _MSC_VER
593 : e = WSAGetLastError();
594 : #else
595 0 : e = errno;
596 : #endif
597 0 : } else if (nsock == 0) {
598 0 : assert(e == 1);
599 : e = 0;
600 : }
601 0 : closesocket(sock);
602 0 : sock = INVALID_SOCKET;
603 0 : continue;
604 : }
605 328 : if (listen(sock, maxusers) == SOCKET_ERROR) {
606 : #ifdef _MSC_VER
607 : e = WSAGetLastError();
608 : #else
609 0 : e = errno;
610 : #endif
611 0 : closesocket(sock);
612 0 : sock = INVALID_SOCKET;
613 0 : continue;
614 : }
615 328 : struct sockaddr_storage addr;
616 328 : SOCKLEN addrlen = (SOCKLEN) sizeof(addr);
617 328 : if (getsockname(sock, (struct sockaddr *) &addr, &addrlen) == SOCKET_ERROR) {
618 : #ifdef _MSC_VER
619 : e = WSAGetLastError();
620 : #else
621 0 : e = errno;
622 : #endif
623 0 : closesocket(sock);
624 0 : sock = INVALID_SOCKET;
625 0 : continue;
626 : }
627 328 : if (getnameinfo((struct sockaddr *) &addr, addrlen,
628 : NULL, (SOCKLEN) 0,
629 : sport, (SOCKLEN) sizeof(sport),
630 : NI_NUMERICSERV) == 0)
631 328 : *portp = (int) strtol(sport, NULL, 10);
632 328 : sockp[nsock++] = sock;
633 328 : break;
634 : }
635 656 : freeaddrinfo(result);
636 656 : if (ipv6_vs6only == 0) {
637 328 : ipv6_vs6only = -1;
638 328 : hints.ai_family = AF_INET;
639 328 : if (listenaddr && strcmp(listenaddr, "::1") == 0)
640 0 : listenaddr = "127.0.0.1";
641 : } else
642 : break;
643 : }
644 :
645 328 : if (nsock == 0) {
646 : #ifdef _MSC_VER
647 : err = wsaerror(e);
648 : #else
649 0 : err = GDKstrerror(e, (char[128]) { 0 }, 128);
650 : #endif
651 0 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to "
652 : "stream socket on address %s and port %s failed: %s",
653 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0", sport, err);
654 : }
655 328 : if (host[0] == 0)
656 328 : gethostname(host, (HOSTLEN) hostlen);
657 : return NULL;
658 : }
659 :
660 : static str
661 328 : SERVERlisten(int port, const char *usockfile, int maxusers)
662 : {
663 328 : SOCKET socks[3];
664 328 : SOCKET *psock;
665 : #ifdef HAVE_SYS_UN_H
666 328 : struct sockaddr_un userver;
667 328 : char *usockfilenew = NULL;
668 : #endif
669 328 : SOCKLEN length = 0;
670 328 : MT_Id pid;
671 328 : str buf;
672 328 : char host[128] = "";
673 :
674 : /* early way out, we do not want to listen on any port when running in embedded mode */
675 328 : if (GDKgetenv_istrue("mapi_disable")) {
676 : return MAL_SUCCEED;
677 : }
678 :
679 328 : const char *listenaddr = port < 0 ? "none" : GDKgetenv("mapi_listenaddr");
680 :
681 656 : if (strNil(usockfile) || *usockfile == '\0') {
682 0 : usockfile = NULL;
683 : #ifndef HAVE_SYS_UN_H
684 : } else {
685 : throw(IO, "mal_mapi.listen",
686 : OPERATION_FAILED ": UNIX domain sockets are not supported");
687 : #endif
688 : }
689 328 : maxusers = (maxusers ? maxusers : SERVERMAXUSERS);
690 :
691 328 : if (listenaddr && strcmp(listenaddr, "none") == 0 && usockfile == NULL) {
692 0 : throw(ILLARG, "mal_mapi.listen",
693 : OPERATION_FAILED ": no port or socket file specified");
694 : }
695 :
696 328 : if (port > 65535) {
697 0 : throw(ILLARG, "mal_mapi.listen",
698 : OPERATION_FAILED ": port number should be between 0 and 65535");
699 : }
700 :
701 328 : socks[0] = socks[1] = socks[2] = INVALID_SOCKET;
702 :
703 328 : if (listenaddr == NULL || strcmp(listenaddr, "none") != 0) {
704 328 : char *msg = start_listen(socks, &port, listenaddr, host, sizeof(host),
705 : maxusers);
706 328 : if (msg != MAL_SUCCEED) {
707 0 : return msg;
708 : }
709 328 : char sport[10];
710 328 : snprintf(sport, sizeof(sport), "%d", port);
711 328 : if (GDKsetenv("mapi_port", sport) != GDK_SUCCEED) {
712 0 : for (int i = 0; i < 3; i++) {
713 0 : if (socks[i] != INVALID_SOCKET)
714 0 : closesocket(socks[i]);
715 : }
716 0 : throw(MAL, "mal_mapi.listen", GDK_EXCEPTION);
717 : }
718 : }
719 :
720 : #ifdef HAVE_SYS_UN_H
721 328 : if (usockfile) {
722 : /* prevent silent truncation, sun_path is typically around 108
723 : * chars long :/ */
724 328 : size_t ulen = strlen(usockfile);
725 328 : if (ulen >= sizeof(userver.sun_path)) {
726 0 : if (socks[0] != INVALID_SOCKET)
727 0 : closesocket(socks[0]);
728 0 : if (socks[1] != INVALID_SOCKET)
729 0 : closesocket(socks[1]);
730 0 : throw(MAL, "mal_mapi.listen",
731 : OPERATION_FAILED ": UNIX socket path too long: %s",
732 : usockfile);
733 : }
734 :
735 328 : socks[2] = socket(AF_UNIX, SOCK_STREAM
736 : #ifdef SOCK_CLOEXEC
737 : | SOCK_CLOEXEC
738 : #endif
739 : , 0);
740 328 : if (socks[2] == INVALID_SOCKET) {
741 : #ifdef _MSC_VER
742 : const char *err = wsaerror(WSAGetLastError());
743 : #else
744 0 : const char *err = GDKstrerror(errno, (char[128]) { 0 }, 128);
745 : #endif
746 0 : if (socks[0] != INVALID_SOCKET)
747 0 : closesocket(socks[0]);
748 0 : if (socks[1] != INVALID_SOCKET)
749 0 : closesocket(socks[1]);
750 0 : throw(IO, "mal_mapi.listen",
751 : OPERATION_FAILED ": creation of UNIX socket failed: %s", err);
752 : }
753 : #if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
754 : (void) fcntl(socks[2], F_SETFD, FD_CLOEXEC);
755 : #endif
756 :
757 328 : userver.sun_family = AF_UNIX;
758 328 : const char *p;
759 328 : if ((p = strstr(usockfile, "${PORT}")) != NULL) {
760 328 : usockfilenew = GDKmalloc(ulen + 1);
761 : /* note, "${PORT}" is longer than the longest possible decimal
762 : * representation of a port number ("65535") */
763 328 : if (usockfilenew) {
764 328 : snprintf(usockfilenew, ulen + 1,
765 328 : "%.*s%d%s", (int) (p - usockfile), usockfile,
766 : port < 0 ? 0 : port, p + 7);
767 328 : usockfile = usockfilenew;
768 328 : ulen = strlen(usockfile);
769 : }
770 : }
771 328 : memcpy(userver.sun_path, usockfile, ulen + 1);
772 328 : length = (SOCKLEN) sizeof(userver);
773 328 : if (MT_remove(usockfile) == -1 && errno != ENOENT) {
774 0 : char *e = createException(IO, "mal_mapi.listen",
775 : OPERATION_FAILED
776 : ": remove UNIX socket file: %s",
777 0 : GDKstrerror(errno, (char[128]) { 0 }, 128));
778 0 : if (socks[0] != INVALID_SOCKET)
779 0 : closesocket(socks[0]);
780 0 : if (socks[1] != INVALID_SOCKET)
781 0 : closesocket(socks[1]);
782 0 : closesocket(socks[2]);
783 0 : if (usockfilenew)
784 0 : GDKfree(usockfilenew);
785 0 : return e;
786 : }
787 328 : if (bind(socks[2], (struct sockaddr *) &userver, length) == SOCKET_ERROR) {
788 : #ifdef _MSC_VER
789 : const char *err = wsaerror(WSAGetLastError());
790 : #else
791 0 : const char *err = GDKstrerror(errno, (char[128]) { 0 }, 128);
792 : #endif
793 0 : if (socks[0] != INVALID_SOCKET)
794 0 : closesocket(socks[0]);
795 0 : if (socks[1] != INVALID_SOCKET)
796 0 : closesocket(socks[1]);
797 0 : closesocket(socks[2]);
798 0 : (void) MT_remove(usockfile);
799 0 : buf = createException(IO, "mal_mapi.listen",
800 : OPERATION_FAILED
801 : ": binding to UNIX socket file %s failed: %s",
802 : usockfile, err);
803 0 : if (usockfilenew)
804 0 : GDKfree(usockfilenew);
805 0 : return buf;
806 : }
807 328 : if (listen(socks[2], maxusers) == SOCKET_ERROR) {
808 : #ifdef _MSC_VER
809 : const char *err = wsaerror(WSAGetLastError());
810 : #else
811 0 : const char *err = GDKstrerror(errno, (char[128]) { 0 }, 128);
812 : #endif
813 0 : if (socks[0] != INVALID_SOCKET)
814 0 : closesocket(socks[0]);
815 0 : if (socks[1] != INVALID_SOCKET)
816 0 : closesocket(socks[1]);
817 0 : closesocket(socks[2]);
818 0 : (void) MT_remove(usockfile);
819 0 : buf = createException(IO, "mal_mapi.listen",
820 : OPERATION_FAILED
821 : ": setting UNIX socket file %s to listen failed: %s",
822 : usockfile, err);
823 0 : if (usockfilenew)
824 0 : GDKfree(usockfilenew);
825 0 : return buf;
826 : }
827 328 : if (GDKsetenv("mapi_usock", usockfile) != GDK_SUCCEED) {
828 0 : for (int i = 0; i < 3; i++) {
829 0 : if (socks[i] != INVALID_SOCKET)
830 0 : closesocket(socks[i]);
831 : }
832 0 : throw(MAL, "mal_mapi.listen", GDK_EXCEPTION);
833 : }
834 : }
835 : #endif
836 :
837 : /* seed the randomiser such that our challenges aren't
838 : * predictable... */
839 328 : srand((unsigned int) GDKusec());
840 :
841 328 : psock = GDKmalloc(sizeof(socks));
842 328 : if (psock == NULL) {
843 0 : for (int i = 0; i < 3; i++) {
844 0 : if (socks[i] != INVALID_SOCKET)
845 0 : closesocket(socks[i]);
846 : }
847 0 : throw(MAL, "mal_mapi.listen", SQLSTATE(HY013) MAL_MALLOC_FAIL);
848 : }
849 328 : memcpy(psock, socks, sizeof(socks));
850 328 : if (MT_create_thread(&pid, (void (*)(void *)) SERVERlistenThread, psock,
851 : MT_THR_DETACHED, "listenThread") != 0) {
852 0 : for (int i = 0; i < 3; i++) {
853 0 : if (socks[i] != INVALID_SOCKET)
854 0 : closesocket(socks[i]);
855 : }
856 0 : GDKfree(psock);
857 0 : throw(MAL, "mal_mapi.listen",
858 : OPERATION_FAILED ": starting thread failed");
859 : }
860 :
861 328 : TRC_DEBUG(MAL_SERVER, "Ready to accept connections on: %s:%d\n", host,
862 : port);
863 :
864 328 : if (socks[0] != INVALID_SOCKET || socks[1] != INVALID_SOCKET) {
865 328 : if (!GDKinmemory(0) && (buf = msab_marchConnection(host, port)) != NULL)
866 0 : free(buf);
867 : else
868 : /* announce that we're now reachable */
869 328 : printf("# Listening for connection requests on "
870 : "mapi:monetdb://%s:%i/\n", host, port);
871 : }
872 : #ifdef HAVE_SYS_UN_H
873 328 : if (socks[2] != INVALID_SOCKET) {
874 328 : if (!GDKinmemory(0)
875 328 : && (buf = msab_marchConnection(usockfile, 0)) != NULL)
876 0 : free(buf);
877 : else
878 : /* announce that we're now reachable */
879 328 : printf("# Listening for UNIX domain connection requests on "
880 : "mapi:monetdb://%s\n", usockfile);
881 : }
882 328 : if (usockfilenew)
883 328 : GDKfree(usockfilenew);
884 : #endif
885 :
886 328 : fflush(stdout);
887 :
888 328 : return MAL_SUCCEED;
889 : }
890 :
891 : /*
892 : * @- Wrappers
893 : * The MonetDB Version 5 wrappers are collected here
894 : * The latest port known to gain access is stored
895 : * in the database, so that others can more easily
896 : * be notified.
897 : */
898 : static str
899 328 : MAPIprelude(void)
900 : {
901 328 : int port = MAPI_PORT;
902 328 : const char *p = GDKgetenv("mapi_port");
903 :
904 328 : if (p)
905 328 : port = (int) strtol(p, NULL, 10);
906 328 : p = GDKgetenv("mapi_usock");
907 328 : return SERVERlisten(port, p, SERVERMAXUSERS);
908 : }
909 :
910 : static str
911 0 : SERVERlisten_default(int *ret)
912 : {
913 0 : (void) ret;
914 0 : return MAPIprelude();
915 : }
916 :
917 : static str
918 0 : SERVERlisten_usock(int *ret, str *usock)
919 : {
920 0 : (void) ret;
921 0 : return SERVERlisten(-1, usock ? *usock : NULL, SERVERMAXUSERS);
922 : }
923 :
924 : static str
925 0 : SERVERlisten_port(int *ret, int *pid)
926 : {
927 0 : (void) ret;
928 0 : return SERVERlisten(*pid, NULL, SERVERMAXUSERS);
929 : }
930 :
931 : /*
932 : * The internet connection listener may be terminated from the server console,
933 : * or temporarily suspended to enable system maintenance.
934 : * It is advisable to trace the interactions of clients on the server
935 : * side. At least as far as it concerns requests received.
936 : * The kernel supports this 'spying' behavior with a file descriptor
937 : * field in the client record.
938 : */
939 :
940 : static str
941 0 : SERVERstop(void *ret)
942 : {
943 0 : TRC_INFO(MAL_SERVER, "Server stop\n");
944 0 : ATOMIC_SET(&serverexiting, 1);
945 : /* wait until they all exited, but skip the wait if the whole
946 : * system is going down */
947 0 : while (ATOMIC_GET(&nlistener) > 0 && !GDKexiting())
948 0 : MT_sleep_ms(100);
949 0 : (void) ret; /* fool compiler */
950 0 : return MAL_SUCCEED;
951 : }
952 :
953 :
954 : static str
955 0 : SERVERsuspend(void *res)
956 : {
957 0 : (void) res;
958 0 : ATOMIC_SET(&serveractive, 0);
959 0 : return MAL_SUCCEED;
960 : }
961 :
962 : static str
963 0 : SERVERresume(void *res)
964 : {
965 0 : ATOMIC_SET(&serveractive, 1);
966 0 : (void) res;
967 0 : return MAL_SUCCEED;
968 : }
969 :
970 : static str
971 0 : SERVERclient(void *res, const Stream *In, const Stream *Out)
972 : {
973 0 : struct challengedata *data;
974 0 : MT_Id tid;
975 :
976 0 : (void) res;
977 : /* in embedded mode we allow just one client */
978 0 : data = GDKmalloc(sizeof(*data));
979 0 : if (data == NULL)
980 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
981 0 : data->in = block_stream(*In);
982 0 : data->out = block_stream(*Out);
983 0 : if (data->in == NULL || data->out == NULL) {
984 0 : mnstr_destroy(data->in);
985 0 : mnstr_destroy(data->out);
986 0 : GDKfree(data);
987 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
988 : }
989 :
990 : /* generate the challenge string */
991 0 : generateChallenge(data->challenge, 8, 12);
992 :
993 0 : if (MT_create_thread(&tid, doChallenge, data, MT_THR_DETACHED, "clientXXXX") < 0) {
994 0 : mnstr_destroy(data->in);
995 0 : mnstr_destroy(data->out);
996 0 : GDKfree(data);
997 0 : throw(MAL, "mapi.SERVERclient", "cannot fork new client thread");
998 : }
999 : return MAL_SUCCEED;
1000 : }
1001 :
1002 : /*
1003 : * @+ Remote Processing
1004 : * The remainder of the file contains the wrappers around
1005 : * the Mapi library used by application programmers.
1006 : * Details on the functions can be found there.
1007 : *
1008 : * Sessions have a lifetime different from dynamic scopes.
1009 : * This means the user should use a session identifier
1010 : * to select the correct handle.
1011 : * For the time being we use the index in the global
1012 : * session table. The client pointer is retained to
1013 : * perform access control.
1014 : *
 * We use a single result set handle. All data should be
 * consumed before continuing.
1017 : *
1018 : * A few extra routines should be defined to
1019 : * dump and inspect the sessions table.
1020 : *
 * The remote site may return a single error
 * consisting of a series of error lines, each of which
 * starts with a '!'. These markers are all stripped here.
1024 : */
/*
 * catchErrors(fcn) -- turn a pending MAPI error into a MAL exception.
 *
 * The macro relies on three names in the scope where it is expanded:
 *   mid  the Mapi connection that is tested with mapi_error(),
 *   hdl  a (possibly NULL) handle whose result error is preferred,
 *   i    the index into SERVERsessions used as fallback handle.
 *
 * When mapi_error(mid) is non-zero the macro RETURNS from the
 * enclosing function with a freshly created exception; otherwise it
 * does nothing.  `fcn` must be a string literal, it is pasted into
 * the replacement text for embedded '!' markers.
 *
 * Compared to the previous version:
 *  - a '!' in the very first position is no longer compared against
 *    the byte in front of the message buffer;
 *  - an allocation failure is reported as an exception instead of
 *    silently falling through as if the call had succeeded.
 */
#define catchErrors(fcn) \
	do { \
		int rn = mapi_error(mid); \
		if (rn) { \
			const char *err, *e; \
			str newerr; \
			str ret; \
			size_t l; \
			char *f; \
 \
			if (hdl && mapi_result_error(hdl)) \
				err = mapi_result_error(hdl); \
			else \
				err = mapi_result_error(SERVERsessions[i].hdl); \
 \
			if (err == NULL) \
				err = "(no additional error message)"; \
 \
			l = 2 * strlen(err) + 8192; \
			newerr = (str) GDKmalloc(l); \
			if (newerr == NULL) \
				throw(MAL, fcn, SQLSTATE(HY013) MAL_MALLOC_FAIL); \
 \
			f = newerr; \
			/* I think this code tries to deal with multiple errors, this \
			 * will fail this way if it does, since no ! is in the error \
			 * string, only newlines to separate them */ \
			for (e = err; *e && l > 1; e++) { \
				/* only look back when there is a previous character */ \
				if (*e == '!' && e > err && *(e - 1) == '\n') { \
					snprintf(f, l, "MALException:" fcn ":remote error:"); \
					l -= strlen(f); \
					while (*f) \
						f++; \
				} else { \
					*f++ = *e; \
					l--; \
				} \
			} \
 \
			*f = 0; \
			ret = createException(MAL, fcn, \
								  OPERATION_FAILED ": remote error: %s", \
								  newerr); \
			GDKfree(newerr); \
			return ret; \
		} \
	} while (0)
1071 :
/* number of slots in the session table below */
#define MAXSESSIONS 32
/*
 * Table of outgoing MAPI sessions.  A slot is considered free when
 * its `c` field is zero; SERVERconnectAll claims a slot by storing the
 * client there and hands out `key` as the session identifier.
 */
struct {
	int key;					/* session identifier, taken from sessionkey */
	str dbalias;				/* logical name of the session */
	Client c;					/* client owning the slot; 0 when the slot is free */
	Mapi mid;					/* communication channel */
	MapiHdl hdl;				/* result set handle */
} SERVERsessions[MAXSESSIONS];

/* last session key handed out; incremented for every new session */
static int sessionkey = 0;
1082 :
1083 : /* #define MAPI_TEST*/
1084 :
1085 : static str
1086 6 : SERVERconnectAll(Client cntxt, int *key, str *host, int *port, str *username,
1087 : str *password, str *lang)
1088 : {
1089 6 : Mapi mid;
1090 6 : int i;
1091 :
1092 6 : MT_lock_set(&mal_contextLock);
1093 12 : for (i = 1; i < MAXSESSIONS; i++)
1094 6 : if (SERVERsessions[i].c == 0)
1095 : break;
1096 :
1097 6 : if (i == MAXSESSIONS) {
1098 0 : MT_lock_unset(&mal_contextLock);
1099 0 : throw(IO, "mapi.connect", OPERATION_FAILED ": too many sessions");
1100 : }
1101 6 : SERVERsessions[i].c = cntxt;
1102 6 : SERVERsessions[i].key = ++sessionkey;
1103 6 : MT_lock_unset(&mal_contextLock);
1104 :
1105 6 : mid = mapi_connect(*host, *port, *username, *password, *lang, NULL);
1106 :
1107 6 : if (mid == NULL)
1108 0 : throw(IO, "mapi.connect", MAL_MALLOC_FAIL);
1109 :
1110 6 : if (mapi_error(mid)) {
1111 0 : const char *err = mapi_error_str(mid);
1112 0 : str ex;
1113 0 : if (err == NULL)
1114 0 : err = "(no reason given)";
1115 0 : if (err[0] == '!')
1116 0 : err = err +1;
1117 0 : SERVERsessions[i].c = NULL;
1118 0 : ex = createException(IO, "mapi.connect", "Could not connect: %s", err);
1119 0 : mapi_destroy(mid);
1120 0 : return (ex);
1121 : }
1122 :
1123 : #ifdef MAPI_TEST
1124 : mnstr_printf(SERVERsessions[i].c->fdout,
1125 : "Succeeded to establish session\n");
1126 : #endif
1127 6 : SERVERsessions[i].mid = mid;
1128 6 : *key = SERVERsessions[i].key;
1129 6 : return MAL_SUCCEED;
1130 : }
1131 :
1132 : static str
1133 0 : SERVERdisconnectALL(int *key)
1134 : {
1135 0 : int i;
1136 :
1137 0 : MT_lock_set(&mal_contextLock);
1138 :
1139 0 : for (i = 1; i < MAXSESSIONS; i++)
1140 0 : if (SERVERsessions[i].c != 0) {
1141 : #ifdef MAPI_TEST
1142 : mnstr_printf(SERVERsessions[i].c->fdout, "Close session %d\n", i);
1143 : #endif
1144 0 : SERVERsessions[i].c = 0;
1145 0 : if (SERVERsessions[i].dbalias)
1146 0 : GDKfree(SERVERsessions[i].dbalias);
1147 0 : SERVERsessions[i].dbalias = NULL;
1148 0 : *key = SERVERsessions[i].key;
1149 0 : if (SERVERsessions[i].hdl)
1150 0 : mapi_close_handle(SERVERsessions[i].hdl);
1151 0 : SERVERsessions[i].hdl = NULL;
1152 0 : mapi_disconnect(SERVERsessions[i].mid);
1153 : }
1154 :
1155 0 : MT_lock_unset(&mal_contextLock);
1156 :
1157 0 : return MAL_SUCCEED;
1158 : }
1159 :
1160 : static str
1161 0 : SERVERdisconnectWithAlias(int *key, str *dbalias)
1162 : {
1163 0 : int i;
1164 :
1165 0 : MT_lock_set(&mal_contextLock);
1166 :
1167 0 : for (i = 0; i < MAXSESSIONS; i++)
1168 0 : if (SERVERsessions[i].dbalias &&
1169 0 : strcmp(SERVERsessions[i].dbalias, *dbalias) == 0) {
1170 0 : SERVERsessions[i].c = 0;
1171 0 : if (SERVERsessions[i].dbalias)
1172 0 : GDKfree(SERVERsessions[i].dbalias);
1173 0 : SERVERsessions[i].dbalias = NULL;
1174 0 : *key = SERVERsessions[i].key;
1175 0 : if (SERVERsessions[i].hdl)
1176 0 : mapi_close_handle(SERVERsessions[i].hdl);
1177 0 : SERVERsessions[i].hdl = NULL;
1178 0 : mapi_disconnect(SERVERsessions[i].mid);
1179 0 : break;
1180 : }
1181 :
1182 0 : if (i == MAXSESSIONS) {
1183 0 : MT_lock_unset(&mal_contextLock);
1184 0 : throw(IO, "mapi.disconnect",
1185 : "Impossible to close session for db_alias: '%s'", *dbalias);
1186 : }
1187 :
1188 0 : MT_lock_unset(&mal_contextLock);
1189 0 : return MAL_SUCCEED;
1190 : }
1191 :
1192 : static str
1193 1 : SERVERconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1194 : {
1195 1 : int *key = getArgReference_int(stk, pci, 0);
1196 1 : str *host = getArgReference_str(stk, pci, 1);
1197 1 : int *port = getArgReference_int(stk, pci, 2);
1198 1 : str *username = getArgReference_str(stk, pci, 3);
1199 1 : str *password = getArgReference_str(stk, pci, 4);
1200 1 : str *lang = getArgReference_str(stk, pci, 5);
1201 :
1202 1 : (void) mb;
1203 1 : return SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1204 : }
1205 :
1206 :
1207 : static str
1208 5 : SERVERreconnectAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1209 : {
1210 5 : int *key = getArgReference_int(stk, pci, 0);
1211 5 : str *host = getArgReference_str(stk, pci, 1);
1212 5 : int *port = getArgReference_int(stk, pci, 2);
1213 5 : str *dbalias = getArgReference_str(stk, pci, 3);
1214 5 : str *username = getArgReference_str(stk, pci, 4);
1215 5 : str *password = getArgReference_str(stk, pci, 5);
1216 5 : str *lang = getArgReference_str(stk, pci, 6);
1217 5 : int i;
1218 5 : str msg = MAL_SUCCEED;
1219 :
1220 5 : (void) mb;
1221 :
1222 165 : for (i = 0; i < MAXSESSIONS; i++)
1223 160 : if (SERVERsessions[i].key &&
1224 5 : SERVERsessions[i].dbalias &&
1225 0 : strcmp(SERVERsessions[i].dbalias, *dbalias) == 0) {
1226 0 : *key = SERVERsessions[i].key;
1227 0 : return msg;
1228 : }
1229 :
1230 5 : msg = SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1231 5 : if (msg == MAL_SUCCEED)
1232 5 : msg = SERVERsetAlias(NULL, key, dbalias);
1233 : return msg;
1234 : }
1235 :
1236 : static str
1237 0 : SERVERreconnectWithoutAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
1238 : InstrPtr pci)
1239 : {
1240 0 : int *key = getArgReference_int(stk, pci, 0);
1241 0 : str *host = getArgReference_str(stk, pci, 1);
1242 0 : int *port = getArgReference_int(stk, pci, 2);
1243 0 : str *username = getArgReference_str(stk, pci, 3);
1244 0 : str *password = getArgReference_str(stk, pci, 4);
1245 0 : str *lang = getArgReference_str(stk, pci, 5);
1246 0 : int i;
1247 0 : str msg = MAL_SUCCEED, nme = "anonymous";
1248 :
1249 0 : (void) mb;
1250 :
1251 0 : for (i = 0; i < MAXSESSIONS; i++)
1252 0 : if (SERVERsessions[i].key) {
1253 0 : *key = SERVERsessions[i].key;
1254 0 : return msg;
1255 : }
1256 :
1257 0 : msg = SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1258 0 : if (msg == MAL_SUCCEED)
1259 0 : msg = SERVERsetAlias(NULL, key, &nme);
1260 : return msg;
1261 : }
1262 :
/*
 * accessTest(val, fcn) -- locate the in-use session with key `val`.
 *
 * Uses the variables `i` and `mid` of the enclosing function: `i` is
 * left at the index of the matching slot and `mid` is set to that
 * slot's connection.  When no used slot carries the key, the macro
 * RETURNS from the enclosing function with a MAL exception for
 * "mapi." fcn (fcn must be a string literal).
 */
#define accessTest(val, fcn) \
	do { \
		i = 0; \
		while (i < MAXSESSIONS && \
			   !(SERVERsessions[i].c && \
				 SERVERsessions[i].key == (val))) \
			i++; \
		if (i == MAXSESSIONS) \
			throw(MAL, "mapi." fcn, "Access violation," \
				  " could not find matching session descriptor"); \
		mid = SERVERsessions[i].mid; \
		(void) mid; /* silence compilers */ \
	} while (0)
1274 :
1275 : static str
1276 5 : SERVERsetAlias(void *ret, int *key, str *dbalias)
1277 : {
1278 5 : int i;
1279 5 : Mapi mid;
1280 10 : accessTest(*key, "setAlias");
1281 5 : SERVERsessions[i].dbalias = GDKstrdup(*dbalias);
1282 5 : if (SERVERsessions[i].dbalias == NULL)
1283 0 : throw(MAL, "mapi.set_alias", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1284 : (void) ret;
1285 : return MAL_SUCCEED;
1286 : }
1287 :
1288 : static str
1289 0 : SERVERlookup(int *ret, str *dbalias)
1290 : {
1291 0 : int i;
1292 0 : for (i = 0; i < MAXSESSIONS; i++)
1293 0 : if (SERVERsessions[i].dbalias &&
1294 0 : strcmp(SERVERsessions[i].dbalias, *dbalias) == 0) {
1295 0 : *ret = SERVERsessions[i].key;
1296 0 : return MAL_SUCCEED;
1297 : }
1298 0 : throw(MAL, "mapi.lookup", "Could not find database connection");
1299 : }
1300 :
1301 : static str
1302 0 : SERVERtrace(void *ret, int *key, int *flag)
1303 : {
1304 0 : (void) ret;
1305 0 : mapi_trace(SERVERsessions[*key].mid, (bool) *flag);
1306 0 : return MAL_SUCCEED;
1307 : }
1308 :
1309 : static str
1310 0 : SERVERdisconnect(void *ret, int *key)
1311 : {
1312 0 : int i;
1313 0 : Mapi mid;
1314 0 : (void) ret;
1315 0 : accessTest(*key, "disconnect");
1316 0 : if (SERVERsessions[i].hdl)
1317 0 : mapi_close_handle(SERVERsessions[i].hdl);
1318 0 : SERVERsessions[i].hdl = NULL;
1319 0 : mapi_disconnect(mid);
1320 0 : if (SERVERsessions[i].dbalias)
1321 0 : GDKfree(SERVERsessions[i].dbalias);
1322 0 : SERVERsessions[i].c = 0;
1323 0 : SERVERsessions[i].dbalias = 0;
1324 0 : return MAL_SUCCEED;
1325 : }
1326 :
1327 : static str
1328 6 : SERVERdestroy(void *ret, int *key)
1329 : {
1330 6 : int i;
1331 6 : Mapi mid;
1332 6 : (void) ret;
1333 12 : accessTest(*key, "destroy");
1334 6 : if (SERVERsessions[i].hdl)
1335 4 : mapi_close_handle(SERVERsessions[i].hdl);
1336 6 : SERVERsessions[i].hdl = NULL;
1337 6 : mapi_disconnect(mid);
1338 6 : mapi_destroy(mid);
1339 6 : SERVERsessions[i].c = 0;
1340 6 : if (SERVERsessions[i].dbalias)
1341 5 : GDKfree(SERVERsessions[i].dbalias);
1342 6 : SERVERsessions[i].dbalias = 0;
1343 6 : return MAL_SUCCEED;
1344 : }
1345 :
1346 : static str
1347 0 : SERVERreconnect(void *ret, int *key)
1348 : {
1349 0 : int i;
1350 0 : Mapi mid;
1351 0 : (void) ret;
1352 0 : accessTest(*key, "destroy");
1353 0 : if (SERVERsessions[i].hdl)
1354 0 : mapi_close_handle(SERVERsessions[i].hdl);
1355 0 : SERVERsessions[i].hdl = NULL;
1356 0 : mapi_reconnect(mid);
1357 0 : return MAL_SUCCEED;
1358 : }
1359 :
1360 : static str
1361 0 : SERVERping(int *ret, int *key)
1362 : {
1363 0 : int i;
1364 0 : Mapi mid;
1365 0 : accessTest(*key, "destroy");
1366 0 : *ret = mapi_ping(mid);
1367 0 : return MAL_SUCCEED;
1368 : }
1369 :
1370 : static str
1371 23 : SERVERquery(int *ret, int *key, str *qry)
1372 : {
1373 23 : Mapi mid;
1374 23 : MapiHdl hdl = 0;
1375 23 : int i;
1376 46 : accessTest(*key, "query");
1377 23 : if (SERVERsessions[i].hdl)
1378 19 : mapi_close_handle(SERVERsessions[i].hdl);
1379 23 : SERVERsessions[i].hdl = mapi_query(mid, *qry);
1380 23 : catchErrors("mapi.query");
1381 23 : *ret = *key;
1382 23 : return MAL_SUCCEED;
1383 : }
1384 :
1385 : static str
1386 0 : SERVERquery_handle(int *ret, int *key, str *qry)
1387 : {
1388 0 : Mapi mid;
1389 0 : MapiHdl hdl = 0;
1390 0 : int i;
1391 0 : accessTest(*key, "query_handle");
1392 0 : mapi_query_handle(SERVERsessions[i].hdl, *qry);
1393 0 : catchErrors("mapi.query_handle");
1394 0 : *ret = *key;
1395 0 : return MAL_SUCCEED;
1396 : }
1397 :
1398 : static str
1399 0 : SERVERquery_array(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pc)
1400 : {
1401 0 : (void) cntxt, (void) mb;
1402 0 : (void) stk;
1403 0 : (void) pc;
1404 0 : throw(MAL, "mapi.query_array", SQLSTATE(0 A000) PROGRAM_NYI);
1405 : }
1406 :
1407 : static str
1408 0 : SERVERprepare(int *ret, int *key, str *qry)
1409 : {
1410 0 : Mapi mid;
1411 0 : int i;
1412 0 : accessTest(*key, "prepare");
1413 0 : if (SERVERsessions[i].hdl)
1414 0 : mapi_close_handle(SERVERsessions[i].hdl);
1415 0 : SERVERsessions[i].hdl = mapi_prepare(mid, *qry);
1416 0 : if (mapi_error(mid))
1417 0 : throw(MAL, "mapi.prepare", "%s",
1418 : mapi_result_error(SERVERsessions[i].hdl));
1419 0 : *ret = *key;
1420 0 : return MAL_SUCCEED;
1421 : }
1422 :
1423 : static str
1424 0 : SERVERfinish(int *ret, int *key)
1425 : {
1426 0 : Mapi mid;
1427 0 : int i;
1428 0 : accessTest(*key, "finish");
1429 0 : mapi_finish(SERVERsessions[i].hdl);
1430 0 : if (mapi_error(mid))
1431 0 : throw(MAL, "mapi.finish", "%s",
1432 : mapi_result_error(SERVERsessions[i].hdl));
1433 0 : *ret = *key;
1434 0 : return MAL_SUCCEED;
1435 : }
1436 :
1437 : static str
1438 1 : SERVERget_row_count(lng *ret, int *key)
1439 : {
1440 1 : Mapi mid;
1441 1 : int i;
1442 2 : accessTest(*key, "get_row_count");
1443 1 : *ret = (lng) mapi_get_row_count(SERVERsessions[i].hdl);
1444 1 : if (mapi_error(mid))
1445 0 : throw(MAL, "mapi.get_row_count", "%s",
1446 : mapi_result_error(SERVERsessions[i].hdl));
1447 : return MAL_SUCCEED;
1448 : }
1449 :
1450 : static str
1451 1 : SERVERget_field_count(int *ret, int *key)
1452 : {
1453 1 : Mapi mid;
1454 1 : int i;
1455 2 : accessTest(*key, "get_field_count");
1456 1 : *ret = mapi_get_field_count(SERVERsessions[i].hdl);
1457 1 : if (mapi_error(mid))
1458 0 : throw(MAL, "mapi.get_field_count", "%s",
1459 : mapi_result_error(SERVERsessions[i].hdl));
1460 : return MAL_SUCCEED;
1461 : }
1462 :
1463 : static str
1464 0 : SERVERrows_affected(lng *ret, int *key)
1465 : {
1466 0 : Mapi mid;
1467 0 : int i;
1468 0 : accessTest(*key, "rows_affected");
1469 0 : *ret = (lng) mapi_rows_affected(SERVERsessions[i].hdl);
1470 0 : return MAL_SUCCEED;
1471 : }
1472 :
1473 : static str
1474 1 : SERVERfetch_row(int *ret, int *key)
1475 : {
1476 1 : Mapi mid;
1477 1 : int i;
1478 2 : accessTest(*key, "fetch_row");
1479 1 : *ret = mapi_fetch_row(SERVERsessions[i].hdl);
1480 1 : return MAL_SUCCEED;
1481 : }
1482 :
1483 : static str
1484 0 : SERVERfetch_all_rows(lng *ret, int *key)
1485 : {
1486 0 : Mapi mid;
1487 0 : int i;
1488 0 : accessTest(*key, "fetch_all_rows");
1489 0 : *ret = (lng) mapi_fetch_all_rows(SERVERsessions[i].hdl);
1490 0 : return MAL_SUCCEED;
1491 : }
1492 :
1493 : static str
1494 1 : SERVERfetch_field_str(str *ret, int *key, int *fnr)
1495 : {
1496 1 : Mapi mid;
1497 1 : int i;
1498 1 : str fld;
1499 2 : accessTest(*key, "fetch_field");
1500 1 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1501 1 : *ret = GDKstrdup(fld ? fld : str_nil);
1502 1 : if (*ret == NULL)
1503 0 : throw(MAL, "mapi.fetch_field_str", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1504 1 : if (mapi_error(mid))
1505 0 : throw(MAL, "mapi.fetch_field_str", "%s",
1506 : mapi_result_error(SERVERsessions[i].hdl));
1507 : return MAL_SUCCEED;
1508 : }
1509 :
1510 : static str
1511 1 : SERVERfetch_field_int(int *ret, int *key, int *fnr)
1512 : {
1513 1 : Mapi mid;
1514 1 : int i;
1515 1 : str fld;
1516 2 : accessTest(*key, "fetch_field");
1517 1 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1518 2 : *ret = fld ? (int) atol(fld) : int_nil;
1519 1 : if (mapi_error(mid))
1520 0 : throw(MAL, "mapi.fetch_field_int", "%s",
1521 : mapi_result_error(SERVERsessions[i].hdl));
1522 : return MAL_SUCCEED;
1523 : }
1524 :
1525 : static str
1526 0 : SERVERfetch_field_lng(lng *ret, int *key, int *fnr)
1527 : {
1528 0 : Mapi mid;
1529 0 : int i;
1530 0 : str fld;
1531 0 : accessTest(*key, "fetch_field");
1532 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1533 0 : *ret = fld ? atol(fld) : lng_nil;
1534 0 : if (mapi_error(mid))
1535 0 : throw(MAL, "mapi.fetch_field_lng", "%s",
1536 : mapi_result_error(SERVERsessions[i].hdl));
1537 : return MAL_SUCCEED;
1538 : }
1539 :
#ifdef HAVE_HGE
/*
 * Fetch field *fnr of the current row of the session identified by
 * *key as a hge; a NULL field yields hge_nil.  *ret is written
 * before the connection is checked for errors.
 *
 * The text is converted with strtoll() rather than atol(), so at
 * least the full 64-bit range is handled on every platform.
 * NOTE(review): values outside the long long range still cannot be
 * represented by this conversion.
 */
static str
SERVERfetch_field_hge(hge *ret, int *key, int *fnr)
{
	Mapi mid;
	int i;
	str fld;
	accessTest(*key, "fetch_field");
	fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
	*ret = fld ? (hge) strtoll(fld, NULL, 10) : hge_nil;
	if (mapi_error(mid))
		throw(MAL, "mapi.fetch_field_hge", "%s",
			  mapi_result_error(SERVERsessions[i].hdl));
	return MAL_SUCCEED;
}
#endif
1556 :
1557 : static str
1558 0 : SERVERfetch_field_sht(sht *ret, int *key, int *fnr)
1559 : {
1560 0 : Mapi mid;
1561 0 : int i;
1562 0 : str fld;
1563 0 : accessTest(*key, "fetch_field");
1564 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1565 0 : *ret = fld ? (sht) atol(fld) : sht_nil;
1566 0 : if (mapi_error(mid))
1567 0 : throw(MAL, "mapi.fetch_field", "%s",
1568 : mapi_result_error(SERVERsessions[i].hdl));
1569 : return MAL_SUCCEED;
1570 : }
1571 :
1572 : static str
1573 0 : SERVERfetch_field_void(void *ret, int *key, int *fnr)
1574 : {
1575 0 : Mapi mid;
1576 0 : int i;
1577 0 : (void) ret;
1578 0 : (void) fnr;
1579 0 : accessTest(*key, "fetch_field");
1580 0 : throw(MAL, "mapi.fetch_field_void", "defaults to nil");
1581 : }
1582 :
1583 : static str
1584 0 : SERVERfetch_field_oid(oid *ret, int *key, int *fnr)
1585 : {
1586 0 : Mapi mid;
1587 0 : int i;
1588 0 : str fld;
1589 0 : accessTest(*key, "fetch_field");
1590 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1591 0 : if (mapi_error(mid))
1592 0 : throw(MAL, "mapi.fetch_field_oid", "%s",
1593 : mapi_result_error(SERVERsessions[i].hdl));
1594 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1595 0 : *(oid *) ret = void_nil;
1596 : else
1597 0 : *(oid *) ret = (oid) atol(fld);
1598 : return MAL_SUCCEED;
1599 : }
1600 :
1601 : static str
1602 0 : SERVERfetch_field_bte(bte *ret, int *key, int *fnr)
1603 : {
1604 0 : Mapi mid;
1605 0 : int i;
1606 0 : str fld;
1607 0 : accessTest(*key, "fetch_field");
1608 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1609 0 : if (mapi_error(mid))
1610 0 : throw(MAL, "mapi.fetch_field_bte", "%s",
1611 : mapi_result_error(SERVERsessions[i].hdl));
1612 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1613 0 : *(bte *) ret = bte_nil;
1614 : else
1615 0 : *(bte *) ret = *fld;
1616 : return MAL_SUCCEED;
1617 : }
1618 :
1619 : static str
1620 0 : SERVERfetch_line(str *ret, int *key)
1621 : {
1622 0 : Mapi mid;
1623 0 : int i;
1624 0 : str fld;
1625 0 : accessTest(*key, "fetch_line");
1626 0 : fld = mapi_fetch_line(SERVERsessions[i].hdl);
1627 0 : if (mapi_error(mid))
1628 0 : throw(MAL, "mapi.fetch_line", "%s",
1629 : mapi_result_error(SERVERsessions[i].hdl));
1630 0 : *ret = GDKstrdup(fld ? fld : str_nil);
1631 0 : if (*ret == NULL)
1632 0 : throw(MAL, "mapi.fetch_line", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1633 : return MAL_SUCCEED;
1634 : }
1635 :
1636 : static str
1637 0 : SERVERnext_result(int *ret, int *key)
1638 : {
1639 0 : Mapi mid;
1640 0 : int i;
1641 0 : accessTest(*key, "next_result");
1642 0 : mapi_next_result(SERVERsessions[i].hdl);
1643 0 : if (mapi_error(mid))
1644 0 : throw(MAL, "mapi.next_result", "%s",
1645 : mapi_result_error(SERVERsessions[i].hdl));
1646 0 : *ret = *key;
1647 0 : return MAL_SUCCEED;
1648 : }
1649 :
1650 : static str
1651 0 : SERVERfetch_reset(int *ret, int *key)
1652 : {
1653 0 : Mapi mid;
1654 0 : int i;
1655 0 : accessTest(*key, "fetch_reset");
1656 0 : mapi_fetch_reset(SERVERsessions[i].hdl);
1657 0 : if (mapi_error(mid))
1658 0 : throw(MAL, "mapi.fetch_reset", "%s",
1659 : mapi_result_error(SERVERsessions[i].hdl));
1660 0 : *ret = *key;
1661 0 : return MAL_SUCCEED;
1662 : }
1663 :
1664 : static str
1665 0 : SERVERfetch_field_bat(bat *bid, int *key)
1666 : {
1667 0 : int i, j, cnt;
1668 0 : Mapi mid;
1669 0 : char *fld;
1670 0 : BAT *b;
1671 :
1672 0 : accessTest(*key, "rpc");
1673 0 : b = COLnew(0, TYPE_str, 256, TRANSIENT);
1674 0 : if (b == NULL)
1675 0 : throw(MAL, "mapi.fetch", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1676 0 : cnt = mapi_get_field_count(SERVERsessions[i].hdl);
1677 0 : for (j = 0; j < cnt; j++) {
1678 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, j);
1679 0 : if (mapi_error(mid)) {
1680 0 : BBPreclaim(b);
1681 0 : throw(MAL, "mapi.fetch_field_bat", "%s",
1682 : mapi_result_error(SERVERsessions[i].hdl));
1683 : }
1684 0 : if (BUNappend(b, fld, false) != GDK_SUCCEED) {
1685 0 : BBPreclaim(b);
1686 0 : throw(MAL, "mapi.fetch_field_bat", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1687 : }
1688 : }
1689 0 : *bid = b->batCacheid;
1690 0 : BBPkeepref(b);
1691 0 : return MAL_SUCCEED;
1692 : }
1693 :
1694 : static str
1695 0 : SERVERerror(int *ret, int *key)
1696 : {
1697 0 : Mapi mid;
1698 0 : int i;
1699 0 : accessTest(*key, "error");
1700 0 : *ret = mapi_error(mid);
1701 0 : return MAL_SUCCEED;
1702 : }
1703 :
1704 : static str
1705 0 : SERVERgetError(str *ret, int *key)
1706 : {
1707 0 : Mapi mid;
1708 0 : int i;
1709 0 : accessTest(*key, "getError");
1710 0 : *ret = GDKstrdup(mapi_error_str(mid));
1711 0 : if (*ret == NULL)
1712 0 : throw(MAL, "mapi.get_error", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1713 : return MAL_SUCCEED;
1714 : }
1715 :
1716 : static str
1717 0 : SERVERexplain(str *ret, int *key)
1718 : {
1719 0 : Mapi mid;
1720 0 : int i;
1721 :
1722 0 : accessTest(*key, "explain");
1723 0 : *ret = GDKstrdup(mapi_error_str(mid));
1724 0 : if (*ret == NULL)
1725 0 : throw(MAL, "mapi.explain", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1726 : return MAL_SUCCEED;
1727 : }
1728 :
1729 : /*
1730 : * The remainder should contain the wrapping of
1731 : * relevant SERVER functions. Furthermore, we
1732 : * should analyse the return value and update
1733 : * the stack trace.
1734 : *
1735 : * Two routines should be
1736 : * mapi.rpc(key,"query")
1737 : *
1738 : * The generic scheme for handling a remote MAL
1739 : * procedure call with a single row answer.
1740 : */
1741 : static int
1742 18 : SERVERfieldAnalysis(str fld, int tpe, ValPtr v)
1743 : {
1744 18 : v->bat = false;
1745 18 : v->vtype = tpe;
1746 18 : switch (tpe) {
1747 0 : case TYPE_void:
1748 0 : v->val.oval = void_nil;
1749 0 : break;
1750 2 : case TYPE_oid:
1751 2 : if (fld == 0 || strcmp(fld, "nil") == 0)
1752 1 : v->val.oval = void_nil;
1753 : else
1754 1 : v->val.oval = (oid) atol(fld);
1755 : break;
1756 0 : case TYPE_bit:
1757 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1758 0 : v->val.btval = bit_nil;
1759 0 : else if (strcmp(fld, "true") == 0)
1760 0 : v->val.btval = TRUE;
1761 0 : else if (strcmp(fld, "false") == 0)
1762 0 : v->val.btval = FALSE;
1763 : break;
1764 0 : case TYPE_bte:
1765 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1766 0 : v->val.btval = bte_nil;
1767 : else
1768 0 : v->val.btval = *fld;
1769 : break;
1770 0 : case TYPE_sht:
1771 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1772 0 : v->val.shval = sht_nil;
1773 : else
1774 0 : v->val.shval = (sht) atol(fld);
1775 : break;
1776 12 : case TYPE_int:
1777 12 : if (fld == 0 || strcmp(fld, "nil") == 0)
1778 1 : v->val.ival = int_nil;
1779 : else
1780 11 : v->val.ival = (int) atol(fld);
1781 : break;
1782 2 : case TYPE_lng:
1783 2 : if (fld == 0 || strcmp(fld, "nil") == 0)
1784 0 : v->val.lval = lng_nil;
1785 : else
1786 2 : v->val.lval = (lng) atol(fld);
1787 : break;
1788 : #ifdef HAVE_HGE
1789 0 : case TYPE_hge:
1790 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1791 0 : v->val.hval = hge_nil;
1792 : else
1793 0 : v->val.hval = (hge) atol(fld);
1794 : break;
1795 : #endif
1796 0 : case TYPE_flt:
1797 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1798 0 : v->val.fval = flt_nil;
1799 : else
1800 0 : v->val.fval = (flt) atof(fld);
1801 : break;
1802 0 : case TYPE_dbl:
1803 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1804 0 : v->val.dval = dbl_nil;
1805 : else
1806 0 : v->val.dval = (dbl) atof(fld);
1807 : break;
1808 2 : case TYPE_str:
1809 2 : if (fld == 0 || strcmp(fld, "nil") == 0) {
1810 0 : if (VALinit(v, TYPE_str, str_nil) == NULL)
1811 : return -1;
1812 : } else {
1813 2 : if (VALinit(v, TYPE_str, fld) == NULL)
1814 : return -1;
1815 : }
1816 : break;
1817 : }
1818 : return 0;
1819 : }
1820 :
1821 : static str
1822 6 : SERVERmapi_rpc_single_row(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
1823 : InstrPtr pci)
1824 : {
1825 6 : int key, i, j;
1826 6 : Mapi mid;
1827 6 : MapiHdl hdl;
1828 6 : char *s, *fld, *qry = 0;
1829 :
1830 6 : (void) cntxt;
1831 6 : key = *getArgReference_int(stk, pci, pci->retc);
1832 12 : accessTest(key, "rpc");
1833 : #ifdef MAPI_TEST
1834 : mnstr_printf(cntxt->fdout, "about to send: %s\n", qry);
1835 : #endif
1836 : /* glue all strings together */
1837 12 : for (i = pci->retc + 1; i < pci->argc; i++) {
1838 6 : fld = *getArgReference_str(stk, pci, i);
1839 6 : if (qry == 0) {
1840 6 : qry = GDKstrdup(fld);
1841 6 : if (qry == NULL)
1842 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1843 : } else {
1844 0 : s = (char *) GDKmalloc(strlen(qry) + strlen(fld) + 1);
1845 0 : if (s == NULL) {
1846 0 : GDKfree(qry);
1847 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1848 : }
1849 0 : strcpy(s, qry);
1850 0 : strcat(s, fld);
1851 0 : GDKfree(qry);
1852 0 : qry = s;
1853 : }
1854 : }
1855 6 : hdl = mapi_query(mid, qry);
1856 6 : GDKfree(qry);
1857 6 : catchErrors("mapi.rpc");
1858 :
1859 : i = 0;
1860 12 : while (mapi_fetch_row(hdl)) {
1861 12 : for (j = 0; j < pci->retc; j++) {
1862 6 : fld = mapi_fetch_field(hdl, j);
1863 : #ifdef MAPI_TEST
1864 : mnstr_printf(cntxt->fdout, "Got: %s\n", fld);
1865 : #endif
1866 6 : switch (getVarType(mb, getArg(pci, j))) {
1867 6 : case TYPE_void:
1868 : case TYPE_oid:
1869 : case TYPE_bit:
1870 : case TYPE_bte:
1871 : case TYPE_sht:
1872 : case TYPE_int:
1873 : case TYPE_lng:
1874 : #ifdef HAVE_HGE
1875 : case TYPE_hge:
1876 : #endif
1877 : case TYPE_flt:
1878 : case TYPE_dbl:
1879 : case TYPE_str:
1880 6 : if (SERVERfieldAnalysis(fld, getVarType(mb, getArg(pci, j)),
1881 6 : &stk->stk[pci->argv[j]]) < 0) {
1882 0 : mapi_close_handle(hdl);
1883 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1884 : }
1885 6 : break;
1886 0 : default:
1887 0 : mapi_close_handle(hdl);
1888 0 : throw(MAL, "mapi.rpc", "Missing type implementation ");
1889 : /* all the other basic types come here */
1890 : }
1891 : }
1892 6 : i++;
1893 : }
1894 6 : mapi_close_handle(hdl);
1895 6 : if (i > 1)
1896 0 : throw(MAL, "mapi.rpc", "Too many answers");
1897 : return MAL_SUCCEED;
1898 : }
1899 :
1900 : /*
1901 : * Transport of the BATs is only slightly more complicated.
1902 : * The generic implementation based on a pattern is the next
1903 : * step.
1904 : */
1905 : static str
1906 5 : SERVERmapi_rpc_bat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1907 : {
1908 5 : bat *ret;
1909 5 : int *key;
1910 5 : str *qry, err = MAL_SUCCEED;
1911 5 : Mapi mid;
1912 5 : MapiHdl hdl;
1913 5 : char *fld2;
1914 5 : BAT *b;
1915 5 : ValRecord tval;
1916 5 : int i = 0, tt;
1917 :
1918 5 : (void) cntxt;
1919 5 : ret = getArgReference_bat(stk, pci, 0);
1920 5 : key = getArgReference_int(stk, pci, pci->retc);
1921 5 : qry = getArgReference_str(stk, pci, pci->retc + 1);
1922 10 : accessTest(*key, "rpc");
1923 5 : tt = getBatType(getVarType(mb, getArg(pci, 0)));
1924 :
1925 5 : hdl = mapi_query(mid, *qry);
1926 5 : catchErrors("mapi.rpc");
1927 :
1928 5 : b = COLnew(0, tt, 256, TRANSIENT);
1929 5 : if (b == NULL) {
1930 0 : mapi_close_handle(hdl);
1931 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1932 : }
1933 17 : while (mapi_fetch_row(hdl)) {
1934 12 : fld2 = mapi_fetch_field(hdl, 1);
1935 12 : if (SERVERfieldAnalysis(fld2, tt, &tval) < 0) {
1936 0 : BBPreclaim(b);
1937 0 : mapi_close_handle(hdl);
1938 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1939 : }
1940 12 : if (BUNappend(b, VALptr(&tval), false) != GDK_SUCCEED) {
1941 0 : BBPreclaim(b);
1942 0 : mapi_close_handle(hdl);
1943 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1944 : }
1945 : }
1946 5 : mapi_close_handle(hdl);
1947 5 : *ret = b->batCacheid;
1948 5 : BBPkeepref(b);
1949 :
1950 5 : return err;
1951 : }
1952 :
1953 : static str
1954 2 : SERVERput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1955 : {
1956 2 : int *key;
1957 2 : str *nme;
1958 2 : ptr val;
1959 2 : int i, tpe;
1960 2 : Mapi mid;
1961 2 : MapiHdl hdl = 0;
1962 2 : char *w = 0, buf[BUFSIZ];
1963 :
1964 2 : (void) cntxt;
1965 2 : key = getArgReference_int(stk, pci, pci->retc);
1966 2 : nme = getArgReference_str(stk, pci, pci->retc + 1);
1967 2 : val = getArgReference(stk, pci, pci->retc + 2);
1968 6 : accessTest(*key, "put");
1969 :
1970 2 : tpe = getArgType(mb, pci, pci->retc + 2);
1971 2 : if (isaBatType(tpe)) {
1972 : /* generate a tuple batch */
1973 : /* and reload it into the proper format */
1974 0 : str ht, tt;
1975 0 : BAT *b = BBPquickdesc(BBPindex(*nme));
1976 0 : size_t len;
1977 :
1978 0 : if (!b)
1979 0 : throw(MAL, "mapi.put", RUNTIME_OBJECT_MISSING);
1980 :
1981 : /* reconstruct the object */
1982 0 : ht = getTypeName(TYPE_oid);
1983 0 : tt = getTypeName(getBatType(tpe));
1984 0 : snprintf(buf, BUFSIZ, "%s:= bat.new(:%s,%s);", *nme, ht, tt);
1985 0 : len = strlen(buf);
1986 0 : snprintf(buf + len, BUFSIZ - len, "%s:= io.import(%s,tuples);", *nme,
1987 : *nme);
1988 :
1989 : /* and execute the request */
1990 0 : if (SERVERsessions[i].hdl)
1991 0 : mapi_close_handle(SERVERsessions[i].hdl);
1992 0 : SERVERsessions[i].hdl = mapi_query(mid, buf);
1993 :
1994 0 : GDKfree(ht);
1995 0 : GDKfree(tt);
1996 : } else {
1997 2 : switch (tpe) {
1998 0 : case TYPE_str:
1999 0 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, *(char **) val);
2000 0 : if (SERVERsessions[i].hdl)
2001 0 : mapi_close_handle(SERVERsessions[i].hdl);
2002 0 : SERVERsessions[i].hdl = mapi_query(mid, buf);
2003 0 : break;
2004 2 : default:
2005 2 : if ((w = ATOMformat(tpe, val)) == NULL)
2006 0 : throw(MAL, "mapi.put", GDK_EXCEPTION);
2007 2 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, w);
2008 2 : GDKfree(w);
2009 2 : if (SERVERsessions[i].hdl)
2010 2 : mapi_close_handle(SERVERsessions[i].hdl);
2011 2 : SERVERsessions[i].hdl = mapi_query(mid, buf);
2012 2 : break;
2013 : }
2014 : }
2015 2 : catchErrors("mapi.put");
2016 : return MAL_SUCCEED;
2017 : }
2018 :
2019 : static str
2020 0 : SERVERputLocal(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2021 : {
2022 0 : str *ret, *nme;
2023 0 : ptr val;
2024 0 : int tpe;
2025 0 : char *w = 0, buf[BUFSIZ];
2026 :
2027 0 : (void) cntxt;
2028 0 : ret = getArgReference_str(stk, pci, 0);
2029 0 : nme = getArgReference_str(stk, pci, pci->retc);
2030 0 : val = getArgReference(stk, pci, pci->retc + 1);
2031 0 : tpe = getArgType(mb, pci, pci->retc + 1);
2032 0 : if (isaBatType(tpe))
2033 0 : throw(MAL, "mapi.glue", "Unsupported type");
2034 0 : switch (tpe) {
2035 0 : case TYPE_ptr:
2036 0 : throw(MAL, "mapi.glue", "Unsupported type");
2037 0 : case TYPE_str:
2038 0 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, *(char **) val);
2039 0 : break;
2040 0 : default:
2041 0 : if ((w = ATOMformat(tpe, val)) == NULL)
2042 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
2043 0 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, w);
2044 0 : GDKfree(w);
2045 0 : break;
2046 : }
2047 0 : *ret = GDKstrdup(buf);
2048 0 : if (*ret == NULL)
2049 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
2050 : return MAL_SUCCEED;
2051 : }
2052 :
2053 : static str
2054 1 : SERVERbindBAT(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2055 : {
2056 1 : int *key;
2057 1 : str *nme, *tab, *col;
2058 1 : int i;
2059 1 : Mapi mid;
2060 1 : MapiHdl hdl = 0;
2061 1 : char buf[BUFSIZ];
2062 1 : char name[IDLENGTH] = { 0 };
2063 :
2064 1 : (void) cntxt;
2065 1 : key = getArgReference_int(stk, pci, pci->retc);
2066 1 : nme = getArgReference_str(stk, pci, pci->retc + 1);
2067 2 : accessTest(*key, "bind");
2068 1 : if (pci->argc == 6) {
2069 0 : char *tn;
2070 0 : tab = getArgReference_str(stk, pci, pci->retc + 2);
2071 0 : col = getArgReference_str(stk, pci, pci->retc + 3);
2072 0 : i = *getArgReference_int(stk, pci, pci->retc + 4);
2073 0 : tn = getTypeName(getBatType(getVarType(mb, getDestVar(pci))));
2074 0 : snprintf(buf, BUFSIZ, "%s:bat[:%s]:=sql.bind(\"%s\",\"%s\",\"%s\",%d);",
2075 : getVarNameIntoBuffer(mb, getDestVar(pci), name), tn, *nme, *tab, *col, i);
2076 0 : GDKfree(tn);
2077 1 : } else if (pci->argc == 5) {
2078 0 : tab = getArgReference_str(stk, pci, pci->retc + 2);
2079 0 : i = *getArgReference_int(stk, pci, pci->retc + 3);
2080 0 : snprintf(buf, BUFSIZ, "%s:bat[:oid]:=sql.bind(\"%s\",\"%s\",0,%d);",
2081 : getVarNameIntoBuffer(mb, getDestVar(pci), name), *nme, *tab, i);
2082 : } else {
2083 1 : str hn, tn;
2084 1 : int target = getArgType(mb, pci, 0);
2085 1 : hn = getTypeName(TYPE_oid);
2086 1 : tn = getTypeName(getBatType(target));
2087 1 : snprintf(buf, BUFSIZ, "%s:bat[:%s]:=bbp.bind(\"%s\");",
2088 : getVarNameIntoBuffer(mb, getDestVar(pci), name), tn, *nme);
2089 1 : GDKfree(hn);
2090 1 : GDKfree(tn);
2091 : }
2092 1 : if (SERVERsessions[i].hdl)
2093 1 : mapi_close_handle(SERVERsessions[i].hdl);
2094 1 : SERVERsessions[i].hdl = mapi_query(mid, buf);
2095 1 : catchErrors("mapi.bind");
2096 : return MAL_SUCCEED;
2097 : }
2098 :
2099 : #include "mel.h"
2100 : mel_func mal_mapi_init_funcs[] = {
2101 : command("mapi", "listen", SERVERlisten_default, false, "Start a Mapi server with the default settings.", args(1,1, arg("",int))),
2102 : command("mapi", "listen", SERVERlisten_port, false, "Start a Mapi listener on the port given.", args(1,2, arg("",int),arg("port",int))),
2103 : command("mapi", "listen", SERVERlisten_usock, false, "Start a Mapi listener on the unix socket file given.", args(1,2, arg("",int),arg("unixsocket",str))),
2104 : command("mapi", "stop", SERVERstop, false, "Terminate connection listeners.", args(1,1, arg("",void))),
2105 : command("mapi", "suspend", SERVERsuspend, false, "Suspend accepting connections.", args(1,1, arg("",void))),
2106 : command("mapi", "resume", SERVERresume, false, "Resume connection listeners.", args(1,1, arg("",void))),
2107 : command("mapi", "malclient", SERVERclient, false, "Start a Mapi client for a particular stream pair.", args(1,3, arg("",void),arg("in",streams),arg("out",streams))),
2108 : command("mapi", "trace", SERVERtrace, false, "Toggle the Mapi library debug tracer.", args(1,3, arg("",void),arg("mid",int),arg("flag",int))),
2109 : pattern("mapi", "reconnect", SERVERreconnectWithoutAlias, false, "Re-establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
2110 : pattern("mapi", "reconnect", SERVERreconnectAlias, false, "Re-establish connection with a remote mserver.", args(1,7, arg("",int),arg("host",str),arg("port",int),arg("db_alias",str),arg("usr",str),arg("passwd",str),arg("lang",str))),
2111 : command("mapi", "reconnect", SERVERreconnect, false, "Re-establish a connection.", args(1,2, arg("",void),arg("mid",int))),
2112 : pattern("mapi", "connect", SERVERconnect, false, "Establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
2113 : command("mapi", "disconnect", SERVERdisconnectWithAlias, false, "Close connection with a remote Mserver.", args(1,2, arg("",int),arg("dbalias",str))),
2114 : command("mapi", "disconnect", SERVERdisconnectALL, false, "Close connections with all remote Mserver.", args(1,1, arg("",int))),
2115 : command("mapi", "setAlias", SERVERsetAlias, false, "Give the channel a logical name.", args(0,2, arg("key",int),arg("dbalias",str))),
2116 : command("mapi", "lookup", SERVERlookup, false, "Retrieve the connection identifier.", args(1,2, arg("",int),arg("dbalias",str))),
2117 : command("mapi", "disconnect", SERVERdisconnect, false, "Terminate the session.", args(1,2, arg("",void),arg("mid",int))),
2118 : command("mapi", "destroy", SERVERdestroy, false, "Destroy the handle for an Mserver.", args(1,2, arg("",void),arg("mid",int))),
2119 : command("mapi", "ping", SERVERping, false, "Test availability of an Mserver.", args(1,2, arg("",int),arg("mid",int))),
2120 : command("mapi", "query", SERVERquery, false, "Send the query for execution", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2121 : command("mapi", "query_handle", SERVERquery_handle, false, "Send the query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2122 : pattern("mapi", "query_array", SERVERquery_array, false, "Send the query for execution replacing '?' by arguments.", args(1,4, arg("",int),arg("mid",int),arg("qry",str),vararg("arg",str))),
2123 : command("mapi", "prepare", SERVERprepare, false, "Prepare a query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2124 : command("mapi", "finish", SERVERfinish, false, "Remove all remaining answers.", args(1,2, arg("",int),arg("hdl",int))),
2125 : command("mapi", "get_field_count", SERVERget_field_count, false, "Return number of fields.", args(1,2, arg("",int),arg("hdl",int))),
2126 : command("mapi", "get_row_count", SERVERget_row_count, false, "Return number of rows.", args(1,2, arg("",lng),arg("hdl",int))),
2127 : command("mapi", "rows_affected", SERVERrows_affected, false, "Return number of affected rows.", args(1,2, arg("",lng),arg("hdl",int))),
2128 : command("mapi", "fetch_row", SERVERfetch_row, false, "Retrieve the next row for analysis.", args(1,2, arg("",int),arg("hdl",int))),
2129 : command("mapi", "fetch_all_rows", SERVERfetch_all_rows, false, "Retrieve all rows into the cache.", args(1,2, arg("",lng),arg("hdl",int))),
2130 : command("mapi", "fetch_field", SERVERfetch_field_str, false, "Retrieve a single field.", args(1,3, arg("",str),arg("hdl",int),arg("fnr",int))),
2131 : command("mapi", "fetch_field", SERVERfetch_field_int, false, "Retrieve a single int field.", args(1,3, arg("",int),arg("hdl",int),arg("fnr",int))),
2132 : command("mapi", "fetch_field", SERVERfetch_field_lng, false, "Retrieve a single lng field.", args(1,3, arg("",lng),arg("hdl",int),arg("fnr",int))),
2133 : command("mapi", "fetch_field", SERVERfetch_field_sht, false, "Retrieve a single sht field.", args(1,3, arg("",sht),arg("hdl",int),arg("fnr",int))),
2134 : command("mapi", "fetch_field", SERVERfetch_field_void, false, "Retrieve a single void field.", args(1,3, arg("",void),arg("hdl",int),arg("fnr",int))),
2135 : command("mapi", "fetch_field", SERVERfetch_field_oid, false, "Retrieve a single void field.", args(1,3, arg("",oid),arg("hdl",int),arg("fnr",int))),
2136 : command("mapi", "fetch_field", SERVERfetch_field_bte, false, "Retrieve a single bte field.", args(1,3, arg("",bte),arg("hdl",int),arg("fnr",int))),
2137 : command("mapi", "fetch_field_array", SERVERfetch_field_bat, false, "Retrieve all fields for a row.", args(1,2, batarg("",str),arg("hdl",int))),
2138 : command("mapi", "fetch_line", SERVERfetch_line, false, "Retrieve a complete line.", args(1,2, arg("",str),arg("hdl",int))),
2139 : command("mapi", "fetch_reset", SERVERfetch_reset, false, "Reset the cache read line.", args(1,2, arg("",int),arg("hdl",int))),
2140 : command("mapi", "next_result", SERVERnext_result, false, "Go to next result set.", args(1,2, arg("",int),arg("hdl",int))),
2141 : command("mapi", "error", SERVERerror, false, "Check for an error in the communication.", args(1,2, arg("",int),arg("mid",int))),
2142 : command("mapi", "getError", SERVERgetError, false, "Get error message.", args(1,2, arg("",str),arg("mid",int))),
2143 : command("mapi", "explain", SERVERexplain, false, "Turn the error seen into a string.", args(1,2, arg("",str),arg("mid",int))),
2144 : pattern("mapi", "put", SERVERput, false, "Send a value to a remote site.", args(1,4, arg("",void),arg("mid",int),arg("nme",str),argany("val",1))),
2145 : pattern("mapi", "put", SERVERputLocal, false, "Prepare sending a value to a remote site.", args(1,3, arg("",str),arg("nme",str),argany("val",1))),
2146 : pattern("mapi", "rpc", SERVERmapi_rpc_single_row, false, "Send a simple query for execution and fetch result.", args(1,3, argany("",1),arg("key",int),vararg("qry",str))),
2147 : pattern("mapi", "rpc", SERVERmapi_rpc_bat, false, "", args(1,3, batargany("",1),arg("key",int),arg("qry",str))),
2148 : command("mapi", "rpc", SERVERquery, false, "Send a simple query for execution.", args(1,3, arg("",int),arg("key",int),arg("qry",str))),
2149 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,6, batargany("",1),arg("key",int),arg("rschema",str),arg("rtable",str),arg("rcolumn",str),arg("i",int))),
2150 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,5, batargany("",1),arg("key",int),arg("rschema",str),arg("rtable",str),arg("i",int))),
2151 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,3, batargany("",1),arg("key",int),arg("remoteName",str))),
2152 : #ifdef HAVE_HGE
2153 : command("mapi", "fetch_field", SERVERfetch_field_hge, false, "Retrieve a single hge field.", args(1,3, arg("",hge),arg("hdl",int),arg("fnr",int))),
2154 : #endif
2155 : { .imp=NULL }
2156 : };
2157 : #include "mal_import.h"
2158 : #ifdef _MSC_VER
2159 : #undef read
2160 : #pragma section(".CRT$XCU",read)
2161 : #endif
2162 334 : LIB_STARTUP_FUNC(init_mal_mapi_mal)
2163 334 : { mal_module2("mapi", NULL, mal_mapi_init_funcs, MAPIprelude, NULL); }
|