Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a N.J. Nes P. Boncz, S. Mullender, M. Kersten
15 : * @v 1.1
16 : * @+ MAPI interface
17 : * The complete Mapi library is available to setup
18 : * communication with another Mserver.
19 : *
20 : * Clients may initialize a private listener to implement
21 : * specific services. For example, in an OLTP environment
22 : * it may make sense to have a listener for each transaction
23 : * type, which simply parses a sequence of transaction parameters.
24 : *
25 : * Authorization of access to the server is handled as part
26 : * of the client record initialization phase.
27 : *
28 : * This library internally uses pointer handles, which we replace with
29 : * an index in a locally maintained table. It provides a handle
30 : * to easily detect havoc clients.
31 : *
32 : * A cleaner and simpler interface for distributed processing is available in
33 : * the module remote.
34 : */
35 : #include "monetdb_config.h"
36 : #include "mal_client.h"
37 : #include "mal_session.h"
38 : #include "mal_exception.h"
39 : #include "mal_interpreter.h"
40 : #include "mal_authorize.h"
41 : #include "mal_internal.h"
42 : #include "msabaoth.h"
43 : #include "mcrypt.h"
44 : #include "stream.h"
45 : #include "streams.h" /* for Stream */
46 : #include <sys/types.h>
47 : #include "stream_socket.h"
48 : #include "mapi.h"
49 : #include "mutils.h"
50 :
51 : #if defined(HAVE_GETENTROPY) && defined(HAVE_SYS_RANDOM_H)
52 : #include <sys/random.h>
53 : #endif
54 :
55 : #ifdef HAVE_SYS_SOCKET_H
56 : # include <sys/select.h>
57 : # include <sys/socket.h>
58 : # include <unistd.h> /* gethostname() */
59 : # include <netinet/in.h> /* hton and ntoh */
60 : # include <arpa/inet.h> /* addr_in */
61 : #else /* UNIX specific */
62 : #ifdef HAVE_WINSOCK_H /* Windows specific */
63 : # include <winsock.h>
64 : #endif
65 : #endif
66 : #ifdef HAVE_SYS_UN_H
67 : # include <sys/un.h>
68 : #endif
69 : #ifdef HAVE_NETDB_H
70 : # include <netdb.h>
71 : # include <netinet/in.h>
72 : #endif
73 : #ifdef HAVE_POLL_H
74 : #include <poll.h>
75 : #endif
76 : #ifdef HAVE_SYS_UIO_H
77 : # include <sys/uio.h>
78 : #endif
79 : #ifdef HAVE_FCNTL_H
80 : #include <fcntl.h>
81 : #endif
82 :
83 : #ifdef HAVE_SOCKLEN_T
84 : #define SOCKLEN socklen_t
85 : #else
86 : #define SOCKLEN int
87 : #endif
88 :
89 : #if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
90 : #define accept4(sockfd, addr, addrlen, flags) accept(sockfd, addr, addrlen)
91 : #endif
92 :
93 : #define SERVERMAXUSERS SOMAXCONN
94 :
95 : static const char seedChars[] ={
96 : 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
97 : 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
98 : 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
99 : 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
100 : '1', '2', '3', '4', '5', '6', '7', '8', '9', '0'
101 : };
102 :
103 :
#if !defined(HAVE_GETENTROPY) && defined(HAVE_RAND_S)
/*
 * Fill buf[0..size-1] with characters taken from seedChars, using
 * rand_s() as the source of randomness.  No terminating NUL is written;
 * that is left to the caller.
 *
 * Returns true when the whole buffer was filled, false as soon as a
 * rand_s() call fails (buf may then be partially written).
 */
static inline bool
gen_win_challenge(char *buf, size_t size)
{
	for (size_t i = 0; i < size;) {
		unsigned int r;
		if (rand_s(&r) != 0)
			return false;
		/* r carries exactly sizeof(r) random bytes; consuming more than
		 * that would shift in zero bytes and so emit runs of
		 * seedChars[0] instead of random characters */
		for (size_t j = 0; j < sizeof(r) && i < size; j++) {
			buf[i++] = seedChars[(r & 0xFF) % 62];
			r >>= 8;
		}
	}
	return true;
}
#endif
120 :
121 : static void
122 38376 : generateChallenge(str buf, int min, int max)
123 : {
124 38376 : size_t size;
125 38376 : size_t i;
126 :
127 : #ifdef __COVERITY__
128 : /* hide rand() calls from analysis */
129 : size = (min + max) / 2;
130 : for (i = 0; i < size; i++)
131 : buf[i] = seedChars[i % 62];
132 : buf[size] = '\0';
133 : #else
134 : /* don't seed the randomiser here, or you get the same challenge
135 : * during the same second */
136 : #if defined(HAVE_GETENTROPY)
137 38376 : if (getentropy(&size, sizeof(size)) < 0)
138 : #elif defined(HAVE_RAND_S)
139 : unsigned int r;
140 : if (rand_s(&r) == 0)
141 : size = (size_t) r;
142 : else
143 : #endif
144 0 : size = rand();
145 38376 : size = (size % (max - min)) + min;
146 : #if defined(HAVE_GETENTROPY)
147 38376 : if (getentropy(buf, size) == 0)
148 402689 : for (i = 0; i < size; i++)
149 364313 : buf[i] = seedChars[((unsigned char *) buf)[i] % 62];
150 : else
151 : #elif defined(HAVE_RAND_S)
152 : if (!gen_win_challenge(buf, size))
153 : #endif
154 0 : for (i = 0; i < size; i++)
155 0 : buf[i] = seedChars[rand() % 62];
156 38376 : buf[size] = '\0';
157 : #endif
158 38376 : }
159 :
160 : struct challengedata {
161 : stream *in;
162 : stream *out;
163 : char challenge[13];
164 : };
165 :
166 : static str SERVERsetAlias(void *ret, int *key, str *dbalias);
167 :
168 : static void
169 38376 : doChallenge(void *data)
170 : {
171 38376 : char *buf = GDKmalloc(BLOCK + 1);
172 38371 : char challenge[13];
173 :
174 38371 : stream *fdin = ((struct challengedata *) data)->in;
175 38371 : stream *fdout = ((struct challengedata *) data)->out;
176 38371 : bstream *bs;
177 38371 : ssize_t len = 0;
178 38371 : protocol_version protocol = PROTOCOL_9;
179 38371 : size_t buflen = BLOCK;
180 :
181 38371 : MT_thread_setworking("challenging client");
182 : #ifdef _MSC_VER
183 : srand((unsigned int) GDKusec());
184 : #endif
185 38375 : memcpy(challenge, ((struct challengedata *) data)->challenge,
186 : sizeof(challenge));
187 38375 : GDKfree(data);
188 38375 : if (buf == NULL) {
189 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
190 0 : close_stream(fdin);
191 0 : close_stream(fdout);
192 0 : return;
193 : }
194 :
195 : // Send the challenge over the block stream
196 38375 : mnstr_printf(fdout, "%s:mserver:9:%s:%s:%s:sql=%d:BINARY=1:",
197 : challenge, mcrypt_getHashAlgorithms(),
198 : #ifdef WORDS_BIGENDIAN
199 : "BIG",
200 : #else
201 : "LIT",
202 : #endif
203 : MONETDB5_PASSWDHASH, MAPI_HANDSHAKE_OPTIONS_LEVEL);
204 38376 : mnstr_flush(fdout, MNSTR_FLUSH_DATA);
205 : /* get response */
206 38376 : if ((len = mnstr_read_block(fdin, buf, 1, BLOCK)) < 0) {
207 : /* the client must have gone away, so no reason to write anything */
208 0 : close_stream(fdin);
209 0 : close_stream(fdout);
210 0 : GDKfree(buf);
211 0 : return;
212 : }
213 38376 : buf[len] = 0;
214 :
215 38376 : bs = bstream_create(fdin, 128 * BLOCK);
216 :
217 38376 : if (bs == NULL) {
218 0 : mnstr_printf(fdout, "!allocation failure in the server\n");
219 0 : close_stream(fdin);
220 0 : close_stream(fdout);
221 0 : GDKfree(buf);
222 0 : GDKsyserror("SERVERlisten:" MAL_MALLOC_FAIL);
223 0 : return;
224 : }
225 38376 : bs->eof = true;
226 38376 : MSscheduleClient(buf, challenge, bs, fdout, protocol, buflen);
227 : }
228 :
229 : static ATOMIC_TYPE nlistener = ATOMIC_VAR_INIT(0); /* nr of listeners */
230 : static ATOMIC_TYPE serveractive = ATOMIC_VAR_INIT(0);
231 : static ATOMIC_TYPE serverexiting = ATOMIC_VAR_INIT(0); /* listeners should exit */
232 :
233 : static void
234 323 : SERVERlistenThread(SOCKET *Sock)
235 : {
236 323 : char *msg = NULL;
237 323 : int retval;
238 323 : SOCKET socks[3] = { Sock[0], Sock[1], Sock[2] };
239 323 : struct challengedata *data;
240 323 : MT_Id tid;
241 323 : stream *s;
242 323 : int i;
243 :
244 323 : GDKfree(Sock);
245 :
246 323 : ATOMIC_INC(&nlistener);
247 :
248 169237 : do {
249 169237 : SOCKET msgsock = INVALID_SOCKET;
250 : #ifdef HAVE_POLL
251 169237 : struct pollfd pfd[3];
252 169237 : nfds_t npfd;
253 169237 : npfd = 0;
254 676948 : for (i = 0; i < 3; i++) {
255 507711 : if (socks[i] != INVALID_SOCKET)
256 338474 : pfd[npfd++] = (struct pollfd) {
257 : .fd = socks[i],
258 : .events = POLLIN
259 : };
260 : }
261 : /* Wait up to 0.1 seconds (0.01 if testing) */
262 169237 : retval = poll(pfd, npfd,
263 169237 : ATOMIC_GET(&GDKdebug) & FORCEMITOMASK ? 10 : 100);
264 169237 : if (retval == -1 && errno == EINTR)
265 130538 : continue;
266 : #else
267 : fd_set fds;
268 : FD_ZERO(&fds);
269 : /* temporarily use msgsock to record the highest socket fd */
270 : for (i = 0; i < 3; i++) {
271 : if (socks[i] != INVALID_SOCKET) {
272 : FD_SET(socks[i], &fds);
273 : if (msgsock == INVALID_SOCKET || socks[i] > msgsock)
274 : msgsock = socks[i];
275 : }
276 : }
277 : /* Wait up to 0.1 seconds (0.01 if testing) */
278 : struct timeval tv = (struct timeval) {
279 : .tv_usec = ATOMIC_GET(&GDKdebug) & FORCEMITOMASK ? 10000 : 100000,
280 : };
281 :
282 : retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv);
283 : msgsock = INVALID_SOCKET;
284 : #endif
285 169237 : if (ATOMIC_GET(&serverexiting) || GDKexiting())
286 : break;
287 168914 : if (retval == 0) {
288 : /* nothing interesting has happened */
289 130538 : continue;
290 : }
291 38376 : if (retval == SOCKET_ERROR) {
292 0 : if (
293 : #ifdef _MSC_VER
294 : WSAGetLastError() != WSAEINTR
295 : #else
296 0 : errno != EINTR
297 : #endif
298 : ) {
299 0 : msg = "select failed";
300 0 : goto error;
301 : }
302 0 : continue;
303 : }
304 73770 : bool isusock = false;
305 : #ifdef HAVE_POLL
306 73770 : for (i = 0; i < (int) npfd; i++) {
307 73770 : if (pfd[i].revents & POLLIN) {
308 38376 : msgsock = pfd[i].fd;
309 38376 : isusock = msgsock == socks[2];
310 38376 : break;
311 : }
312 : }
313 : #else
314 : for (i = 0; i < 3; i++) {
315 : if (socks[i] != INVALID_SOCKET && FD_ISSET(socks[i], &fds)) {
316 : msgsock = socks[i];
317 : isusock = i == 2;
318 : break;
319 : }
320 : }
321 : #endif
322 38376 : if (msgsock == INVALID_SOCKET)
323 0 : continue;
324 :
325 38376 : if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
326 0 : if (
327 : #ifdef _MSC_VER
328 : WSAGetLastError() != WSAEINTR
329 : #else
330 0 : errno != EINTR
331 : #endif
332 0 : || !ATOMIC_GET(&serveractive)) {
333 0 : msg = "accept failed";
334 0 : goto error;
335 : }
336 0 : continue;
337 : }
338 : #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
339 : (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
340 : #endif
341 : #ifdef HAVE_SYS_UN_H
342 38376 : if (isusock) {
343 35394 : struct msghdr msgh;
344 35394 : struct iovec iov;
345 35394 : char buf[1];
346 35394 : int rv;
347 35394 : char ccmsg[CMSG_SPACE(sizeof(int))];
348 35394 : struct cmsghdr *cmsg;
349 :
350 : /* BEWARE: unix domain sockets have a slightly different
351 : * behaviour initialy than normal sockets, because we can
352 : * send filedescriptors or credentials with them. To do so,
353 : * we need to use sendmsg/recvmsg, which operates on a bare
354 : * socket. Unfortunately we *have* to send something, so it
355 : * is one byte that can optionally carry the ancillary data.
356 : * This byte is at this moment defined to contain a character:
357 : * '0' - there is no ancillary data
358 : * '1' - ancillary data for passing a file descriptor
359 : * The future may introduce a state for passing credentials.
360 : * Any unknown character must be interpreted as some unknown
361 : * action, and hence not supported by the server. */
362 :
363 35394 : iov.iov_base = buf;
364 35394 : iov.iov_len = 1;
365 :
366 35394 : msgh.msg_name = 0;
367 35394 : msgh.msg_namelen = 0;
368 35394 : msgh.msg_iov = &iov;
369 35394 : msgh.msg_iovlen = 1;
370 35394 : msgh.msg_flags = 0;
371 35394 : msgh.msg_control = ccmsg;
372 35394 : msgh.msg_controllen = sizeof(ccmsg);
373 :
374 35394 : rv = recvmsg(msgsock, &msgh, 0);
375 35394 : if (rv == -1) {
376 0 : closesocket(msgsock);
377 0 : continue;
378 : }
379 :
380 35394 : switch (buf[0]) {
381 : case '0':
382 : /* nothing special, nothing to do */
383 : break;
384 0 : case '1':
385 : {
386 0 : int *c_d;
387 : /* filedescriptor, put it in place of msgsock */
388 0 : cmsg = CMSG_FIRSTHDR(&msgh);
389 0 : (void) shutdown(msgsock, SHUT_WR);
390 0 : closesocket(msgsock);
391 0 : if (!cmsg || cmsg->cmsg_type != SCM_RIGHTS) {
392 0 : TRC_CRITICAL(MAL_SERVER,
393 : "Expected file descriptor, but received something else\n");
394 0 : continue;
395 : }
396 : /* HACK to avoid
397 : * "dereferencing type-punned pointer will break strict-aliasing rules"
398 : * (with gcc 4.5.1 on Fedora 14)
399 : */
400 0 : c_d = (int *) CMSG_DATA(cmsg);
401 0 : msgsock = *c_d;
402 : }
403 0 : break;
404 0 : default:
405 : /* some unknown state */
406 0 : closesocket(msgsock);
407 0 : TRC_CRITICAL(MAL_SERVER,
408 : "Unknown command type in first byte\n");
409 0 : continue;
410 : }
411 : }
412 : #endif
413 :
414 38376 : data = GDKzalloc(sizeof(*data));
415 38376 : if (data == NULL) {
416 0 : closesocket(msgsock);
417 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
418 0 : continue;
419 : }
420 38376 : data->in = socket_rstream(msgsock, "Server read");
421 38376 : if (data->in == NULL) {
422 0 : stream_alloc_fail:
423 0 : mnstr_destroy(data->in);
424 0 : mnstr_destroy(data->out);
425 0 : GDKfree(data);
426 0 : closesocket(msgsock);
427 0 : TRC_ERROR(MAL_SERVER, "Cannot allocate stream: %s\n",
428 : mnstr_peek_error(NULL));
429 0 : continue;
430 : }
431 38376 : data->out = socket_wstream(msgsock, "Server write");
432 38376 : if (data->out == NULL) {
433 0 : goto stream_alloc_fail;
434 : }
435 38376 : s = block_stream(data->in);
436 38376 : if (s == NULL) {
437 0 : goto stream_alloc_fail;
438 : }
439 38376 : data->in = s;
440 38376 : s = block_stream(data->out);
441 38376 : if (s == NULL) {
442 0 : goto stream_alloc_fail;
443 : }
444 38376 : data->out = s;
445 :
446 : /* generate the challenge string */
447 38376 : generateChallenge(data->challenge, 8, 12);
448 :
449 38376 : if (MT_create_thread(&tid, doChallenge, data, MT_THR_DETACHED, "clientXXXX") < 0) {
450 0 : mnstr_destroy(data->in);
451 0 : mnstr_destroy(data->out);
452 0 : GDKfree(data);
453 0 : closesocket(msgsock);
454 0 : TRC_ERROR(MAL_SERVER, "Cannot fork new client thread\n");
455 0 : continue;
456 : }
457 168914 : } while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
458 0 : error:;
459 : #ifdef HAVE_SYS_UN_H
460 323 : const char *usockfile = GDKgetenv("mapi_usock");
461 646 : if (usockfile && MT_remove(usockfile) == -1 && errno != ENOENT)
462 0 : perror(usockfile);
463 : #endif
464 323 : ATOMIC_DEC(&nlistener);
465 1292 : for (i = 0; i < 3; i++)
466 969 : if (socks[i] != INVALID_SOCKET)
467 646 : closesocket(socks[i]);
468 323 : if (msg)
469 0 : TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
470 323 : return;
471 : }
472 :
473 : #ifdef _MSC_VER
474 : #define HOSTLEN int
475 : #else
476 : #define HOSTLEN size_t
477 : #endif
478 :
479 : static char *
480 323 : start_listen(SOCKET *sockp, int *portp, const char *listenaddr,
481 : char *host, size_t hostlen, int maxusers)
482 : {
483 323 : struct addrinfo *result = NULL;
484 323 : struct addrinfo hints = {
485 : .ai_family = AF_INET6,
486 : .ai_flags = AI_PASSIVE | AI_NUMERICSERV,
487 : .ai_socktype = SOCK_STREAM,
488 : .ai_protocol = IPPROTO_TCP,
489 : };
490 323 : int e = 0;
491 323 : int ipv6_vs6only = -1;
492 323 : SOCKET sock = INVALID_SOCKET;
493 323 : const char *err;
494 323 : int nsock = 0;
495 323 : sockp[0] = sockp[1] = INVALID_SOCKET;
496 323 : host[0] = 0;
497 323 : if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) {
498 0 : hints.ai_family = AF_INET6;
499 0 : hints.ai_flags |= AI_NUMERICHOST;
500 0 : ipv6_vs6only = 0;
501 0 : listenaddr = "::1";
502 0 : strcpy_len(host, "localhost", hostlen);
503 323 : } else if (strcmp(listenaddr, "all") == 0) {
504 323 : hints.ai_family = AF_INET6;
505 323 : ipv6_vs6only = 0;
506 323 : listenaddr = NULL;
507 0 : } else if (strcmp(listenaddr, "::") == 0) {
508 0 : hints.ai_family = AF_INET6;
509 0 : ipv6_vs6only = 1;
510 0 : listenaddr = NULL;
511 0 : } else if (strcmp(listenaddr, "0.0.0.0") == 0) {
512 0 : hints.ai_family = AF_INET;
513 0 : hints.ai_flags |= AI_NUMERICHOST;
514 0 : listenaddr = NULL;
515 0 : } else if (strcmp(listenaddr, "::1") == 0) {
516 0 : hints.ai_family = AF_INET6;
517 0 : hints.ai_flags |= AI_NUMERICHOST;
518 0 : ipv6_vs6only = 1;
519 0 : strcpy_len(host, "localhost", hostlen);
520 0 : } else if (strcmp(listenaddr, "127.0.0.1") == 0) {
521 0 : hints.ai_family = AF_INET;
522 0 : hints.ai_flags |= AI_NUMERICHOST;
523 0 : strcpy_len(host, "localhost", hostlen);
524 : } else {
525 0 : hints.ai_family = AF_INET6;
526 0 : ipv6_vs6only = 0;
527 : }
528 323 : char sport[8]; /* max "65535" */
529 323 : snprintf(sport, sizeof(sport), "%d", *portp);
530 646 : for (;;) { /* max twice */
531 646 : int check = getaddrinfo(listenaddr, sport, &hints, &result);
532 646 : if (check != 0) {
533 : #ifdef _MSC_VER
534 : err = wsaerror(WSAGetLastError());
535 : #else
536 0 : err = gai_strerror(check);
537 : #endif
538 0 : throw(IO, "mal_mapi.listen",
539 : OPERATION_FAILED ": cannot get address "
540 : "information for %s and port %s: %s",
541 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0", sport, err);
542 : }
543 :
544 969 : for (struct addrinfo * rp = result; rp; rp = rp->ai_next) {
545 646 : sock = socket(rp->ai_family, rp->ai_socktype
546 : #ifdef SOCK_CLOEXEC
547 : | SOCK_CLOEXEC
548 : #endif
549 : , rp->ai_protocol);
550 646 : if (sock == INVALID_SOCKET) {
551 : #ifdef _MSC_VER
552 : e = WSAGetLastError();
553 : #else
554 323 : e = errno;
555 : #endif
556 323 : continue;
557 : }
558 : #if defined(HAVE_FCNTL) && !defined(SOCK_CLOEXEC)
559 : (void) fcntl(sock, F_SETFD, FD_CLOEXEC);
560 : #endif
561 323 : if (ipv6_vs6only >= 0)
562 0 : if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
563 : (const char *) &ipv6_vs6only,
564 : (SOCKLEN) sizeof(int)) == -1)
565 0 : perror("setsockopt IPV6_V6ONLY");
566 :
567 : /* do not reuse addresses for ephemeral (autosense) ports */
568 323 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
569 323 : (const char *) &(int) { 1 },
570 : (SOCKLEN) sizeof(int)) == SOCKET_ERROR) {
571 : #ifdef _MSC_VER
572 : e = WSAGetLastError();
573 : #else
574 0 : e = errno;
575 : #endif
576 0 : closesocket(sock);
577 0 : sock = INVALID_SOCKET;
578 0 : continue;
579 : }
580 323 : if ((e = bind(sock, rp->ai_addr, (SOCKLEN) rp->ai_addrlen)) != 0) {
581 : /* return value of 1 is currently undocumented, but
582 : * seems to occur when binding a port to an IPv4 socket
583 : * when the same port is already bound to an IPv6 socket
584 : * that already also listens to IPv4; in this case the
585 : * port that is actually bound to here is a different
586 : * one, and we don't want that, so we close the socket
587 : * without error (if bind returned SOCKET_ERROR, we do
588 : * report the error) */
589 0 : if (e == SOCKET_ERROR) {
590 : #ifdef _MSC_VER
591 : e = WSAGetLastError();
592 : #else
593 0 : e = errno;
594 : #endif
595 0 : } else if (nsock == 0) {
596 0 : assert(e == 1);
597 : e = 0;
598 : }
599 0 : closesocket(sock);
600 0 : sock = INVALID_SOCKET;
601 0 : continue;
602 : }
603 323 : if (listen(sock, maxusers) == SOCKET_ERROR) {
604 : #ifdef _MSC_VER
605 : e = WSAGetLastError();
606 : #else
607 0 : e = errno;
608 : #endif
609 0 : closesocket(sock);
610 0 : sock = INVALID_SOCKET;
611 0 : continue;
612 : }
613 323 : struct sockaddr_storage addr;
614 323 : SOCKLEN addrlen = (SOCKLEN) sizeof(addr);
615 323 : if (getsockname(sock, (struct sockaddr *) &addr, &addrlen) == SOCKET_ERROR) {
616 : #ifdef _MSC_VER
617 : e = WSAGetLastError();
618 : #else
619 0 : e = errno;
620 : #endif
621 0 : closesocket(sock);
622 0 : sock = INVALID_SOCKET;
623 0 : continue;
624 : }
625 323 : if (getnameinfo((struct sockaddr *) &addr, addrlen,
626 : NULL, (SOCKLEN) 0,
627 : sport, (SOCKLEN) sizeof(sport),
628 : NI_NUMERICSERV) == 0)
629 323 : *portp = (int) strtol(sport, NULL, 10);
630 323 : sockp[nsock++] = sock;
631 323 : break;
632 : }
633 646 : freeaddrinfo(result);
634 646 : if (ipv6_vs6only == 0) {
635 323 : ipv6_vs6only = -1;
636 323 : hints.ai_family = AF_INET;
637 323 : if (listenaddr && strcmp(listenaddr, "::1") == 0)
638 0 : listenaddr = "127.0.0.1";
639 : } else
640 : break;
641 : }
642 :
643 323 : if (nsock == 0) {
644 : #ifdef _MSC_VER
645 : err = wsaerror(e);
646 : #else
647 0 : err = GDKstrerror(e, (char[128]) { 0 }, 128);
648 : #endif
649 0 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to "
650 : "stream socket on address %s and port %s failed: %s",
651 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0", sport, err);
652 : }
653 323 : if (host[0] == 0)
654 323 : gethostname(host, (HOSTLEN) hostlen);
655 : return NULL;
656 : }
657 :
658 : static str
659 323 : SERVERlisten(int port, const char *usockfile, int maxusers)
660 : {
661 323 : SOCKET socks[3];
662 323 : SOCKET *psock;
663 : #ifdef HAVE_SYS_UN_H
664 323 : struct sockaddr_un userver;
665 323 : char *usockfilenew = NULL;
666 : #endif
667 323 : SOCKLEN length = 0;
668 323 : MT_Id pid;
669 323 : str buf;
670 323 : char host[128] = "";
671 :
672 : /* early way out, we do not want to listen on any port when running in embedded mode */
673 323 : if (GDKgetenv_istrue("mapi_disable")) {
674 : return MAL_SUCCEED;
675 : }
676 :
677 323 : const char *listenaddr = port < 0 ? "none" : GDKgetenv("mapi_listenaddr");
678 :
679 646 : if (strNil(usockfile) || *usockfile == '\0') {
680 0 : usockfile = NULL;
681 : #ifndef HAVE_SYS_UN_H
682 : } else {
683 : throw(IO, "mal_mapi.listen",
684 : OPERATION_FAILED ": UNIX domain sockets are not supported");
685 : #endif
686 : }
687 323 : maxusers = (maxusers ? maxusers : SERVERMAXUSERS);
688 :
689 323 : if (listenaddr && strcmp(listenaddr, "none") == 0 && usockfile == NULL) {
690 0 : throw(ILLARG, "mal_mapi.listen",
691 : OPERATION_FAILED ": no port or socket file specified");
692 : }
693 :
694 323 : if (port > 65535) {
695 0 : throw(ILLARG, "mal_mapi.listen",
696 : OPERATION_FAILED ": port number should be between 0 and 65535");
697 : }
698 :
699 323 : socks[0] = socks[1] = socks[2] = INVALID_SOCKET;
700 :
701 323 : if (listenaddr == NULL || strcmp(listenaddr, "none") != 0) {
702 323 : char *msg = start_listen(socks, &port, listenaddr, host, sizeof(host),
703 : maxusers);
704 323 : if (msg != MAL_SUCCEED) {
705 0 : return msg;
706 : }
707 323 : char sport[10];
708 323 : snprintf(sport, sizeof(sport), "%d", port);
709 323 : if (GDKsetenv("mapi_port", sport) != GDK_SUCCEED) {
710 0 : for (int i = 0; i < 3; i++) {
711 0 : if (socks[i] != INVALID_SOCKET)
712 0 : closesocket(socks[i]);
713 : }
714 0 : throw(MAL, "mal_mapi.listen", GDK_EXCEPTION);
715 : }
716 : }
717 :
718 : #ifdef HAVE_SYS_UN_H
719 323 : if (usockfile) {
720 : /* prevent silent truncation, sun_path is typically around 108
721 : * chars long :/ */
722 323 : size_t ulen = strlen(usockfile);
723 323 : if (ulen >= sizeof(userver.sun_path)) {
724 0 : if (socks[0] != INVALID_SOCKET)
725 0 : closesocket(socks[0]);
726 0 : if (socks[1] != INVALID_SOCKET)
727 0 : closesocket(socks[1]);
728 0 : throw(MAL, "mal_mapi.listen",
729 : OPERATION_FAILED ": UNIX socket path too long: %s",
730 : usockfile);
731 : }
732 :
733 323 : socks[2] = socket(AF_UNIX, SOCK_STREAM
734 : #ifdef SOCK_CLOEXEC
735 : | SOCK_CLOEXEC
736 : #endif
737 : , 0);
738 323 : if (socks[2] == INVALID_SOCKET) {
739 : #ifdef _MSC_VER
740 : const char *err = wsaerror(WSAGetLastError());
741 : #else
742 0 : const char *err = GDKstrerror(errno, (char[128]) { 0 }, 128);
743 : #endif
744 0 : if (socks[0] != INVALID_SOCKET)
745 0 : closesocket(socks[0]);
746 0 : if (socks[1] != INVALID_SOCKET)
747 0 : closesocket(socks[1]);
748 0 : throw(IO, "mal_mapi.listen",
749 : OPERATION_FAILED ": creation of UNIX socket failed: %s", err);
750 : }
751 : #if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
752 : (void) fcntl(socks[2], F_SETFD, FD_CLOEXEC);
753 : #endif
754 :
755 323 : userver.sun_family = AF_UNIX;
756 323 : const char *p;
757 323 : if ((p = strstr(usockfile, "${PORT}")) != NULL) {
758 323 : usockfilenew = GDKmalloc(ulen + 1);
759 : /* note, "${PORT}" is longer than the longest possible decimal
760 : * representation of a port number ("65535") */
761 323 : if (usockfilenew) {
762 323 : snprintf(usockfilenew, ulen + 1,
763 323 : "%.*s%d%s", (int) (p - usockfile), usockfile,
764 : port < 0 ? 0 : port, p + 7);
765 323 : usockfile = usockfilenew;
766 323 : ulen = strlen(usockfile);
767 : }
768 : }
769 323 : memcpy(userver.sun_path, usockfile, ulen + 1);
770 323 : length = (SOCKLEN) sizeof(userver);
771 323 : if (MT_remove(usockfile) == -1 && errno != ENOENT) {
772 0 : char *e = createException(IO, "mal_mapi.listen",
773 : OPERATION_FAILED
774 : ": remove UNIX socket file: %s",
775 0 : GDKstrerror(errno, (char[128]) { 0 }, 128));
776 0 : if (socks[0] != INVALID_SOCKET)
777 0 : closesocket(socks[0]);
778 0 : if (socks[1] != INVALID_SOCKET)
779 0 : closesocket(socks[1]);
780 0 : closesocket(socks[2]);
781 0 : if (usockfilenew)
782 0 : GDKfree(usockfilenew);
783 0 : return e;
784 : }
785 323 : if (bind(socks[2], (struct sockaddr *) &userver, length) == SOCKET_ERROR) {
786 : #ifdef _MSC_VER
787 : const char *err = wsaerror(WSAGetLastError());
788 : #else
789 0 : const char *err = GDKstrerror(errno, (char[128]) { 0 }, 128);
790 : #endif
791 0 : if (socks[0] != INVALID_SOCKET)
792 0 : closesocket(socks[0]);
793 0 : if (socks[1] != INVALID_SOCKET)
794 0 : closesocket(socks[1]);
795 0 : closesocket(socks[2]);
796 0 : (void) MT_remove(usockfile);
797 0 : buf = createException(IO, "mal_mapi.listen",
798 : OPERATION_FAILED
799 : ": binding to UNIX socket file %s failed: %s",
800 : usockfile, err);
801 0 : if (usockfilenew)
802 0 : GDKfree(usockfilenew);
803 0 : return buf;
804 : }
805 323 : if (listen(socks[2], maxusers) == SOCKET_ERROR) {
806 : #ifdef _MSC_VER
807 : const char *err = wsaerror(WSAGetLastError());
808 : #else
809 0 : const char *err = GDKstrerror(errno, (char[128]) { 0 }, 128);
810 : #endif
811 0 : if (socks[0] != INVALID_SOCKET)
812 0 : closesocket(socks[0]);
813 0 : if (socks[1] != INVALID_SOCKET)
814 0 : closesocket(socks[1]);
815 0 : closesocket(socks[2]);
816 0 : (void) MT_remove(usockfile);
817 0 : buf = createException(IO, "mal_mapi.listen",
818 : OPERATION_FAILED
819 : ": setting UNIX socket file %s to listen failed: %s",
820 : usockfile, err);
821 0 : if (usockfilenew)
822 0 : GDKfree(usockfilenew);
823 0 : return buf;
824 : }
825 323 : if (GDKsetenv("mapi_usock", usockfile) != GDK_SUCCEED) {
826 0 : for (int i = 0; i < 3; i++) {
827 0 : if (socks[i] != INVALID_SOCKET)
828 0 : closesocket(socks[i]);
829 : }
830 0 : throw(MAL, "mal_mapi.listen", GDK_EXCEPTION);
831 : }
832 : }
833 : #endif
834 :
835 : /* seed the randomiser such that our challenges aren't
836 : * predictable... */
837 323 : srand((unsigned int) GDKusec());
838 :
839 323 : psock = GDKmalloc(sizeof(socks));
840 323 : if (psock == NULL) {
841 0 : for (int i = 0; i < 3; i++) {
842 0 : if (socks[i] != INVALID_SOCKET)
843 0 : closesocket(socks[i]);
844 : }
845 0 : throw(MAL, "mal_mapi.listen", SQLSTATE(HY013) MAL_MALLOC_FAIL);
846 : }
847 323 : memcpy(psock, socks, sizeof(socks));
848 323 : if (MT_create_thread(&pid, (void (*)(void *)) SERVERlistenThread, psock,
849 : MT_THR_DETACHED, "listenThread") != 0) {
850 0 : for (int i = 0; i < 3; i++) {
851 0 : if (socks[i] != INVALID_SOCKET)
852 0 : closesocket(socks[i]);
853 : }
854 0 : GDKfree(psock);
855 0 : throw(MAL, "mal_mapi.listen",
856 : OPERATION_FAILED ": starting thread failed");
857 : }
858 :
859 323 : TRC_DEBUG(MAL_SERVER, "Ready to accept connections on: %s:%d\n", host,
860 : port);
861 :
862 323 : if (socks[0] != INVALID_SOCKET || socks[1] != INVALID_SOCKET) {
863 323 : if (!GDKinmemory(0) && (buf = msab_marchConnection(host, port)) != NULL)
864 0 : free(buf);
865 : else
866 : /* announce that we're now reachable */
867 323 : printf("# Listening for connection requests on "
868 : "mapi:monetdb://%s:%i/\n", host, port);
869 : }
870 : #ifdef HAVE_SYS_UN_H
871 323 : if (socks[2] != INVALID_SOCKET) {
872 323 : if (!GDKinmemory(0)
873 323 : && (buf = msab_marchConnection(usockfile, 0)) != NULL)
874 0 : free(buf);
875 : else
876 : /* announce that we're now reachable */
877 323 : printf("# Listening for UNIX domain connection requests on "
878 : "mapi:monetdb://%s\n", usockfile);
879 : }
880 323 : if (usockfilenew)
881 323 : GDKfree(usockfilenew);
882 : #endif
883 :
884 323 : fflush(stdout);
885 :
886 323 : return MAL_SUCCEED;
887 : }
888 :
889 : /*
890 : * @- Wrappers
891 : * The MonetDB Version 5 wrappers are collected here
892 : * The latest port known to gain access is stored
893 : * in the database, so that others can more easily
894 : * be notified.
895 : */
896 : static str
897 323 : MAPIprelude(void)
898 : {
899 323 : int port = MAPI_PORT;
900 323 : const char *p = GDKgetenv("mapi_port");
901 :
902 323 : if (p)
903 323 : port = (int) strtol(p, NULL, 10);
904 323 : p = GDKgetenv("mapi_usock");
905 323 : return SERVERlisten(port, p, SERVERMAXUSERS);
906 : }
907 :
908 : static str
909 0 : SERVERlisten_default(int *ret)
910 : {
911 0 : (void) ret;
912 0 : return MAPIprelude();
913 : }
914 :
915 : static str
916 0 : SERVERlisten_usock(int *ret, str *usock)
917 : {
918 0 : (void) ret;
919 0 : return SERVERlisten(-1, usock ? *usock : NULL, SERVERMAXUSERS);
920 : }
921 :
922 : static str
923 0 : SERVERlisten_port(int *ret, int *pid)
924 : {
925 0 : (void) ret;
926 0 : return SERVERlisten(*pid, NULL, SERVERMAXUSERS);
927 : }
928 :
929 : /*
930 : * The internet connection listener may be terminated from the server console,
931 : * or temporarily suspended to enable system maintenance.
932 : * It is advisable to trace the interactions of clients on the server
933 : * side. At least as far as it concerns requests received.
934 : * The kernel supports this 'spying' behavior with a file descriptor
935 : * field in the client record.
936 : */
937 :
938 : static str
939 0 : SERVERstop(void *ret)
940 : {
941 0 : TRC_INFO(MAL_SERVER, "Server stop\n");
942 0 : ATOMIC_SET(&serverexiting, 1);
943 : /* wait until they all exited, but skip the wait if the whole
944 : * system is going down */
945 0 : while (ATOMIC_GET(&nlistener) > 0 && !GDKexiting())
946 0 : MT_sleep_ms(100);
947 0 : (void) ret; /* fool compiler */
948 0 : return MAL_SUCCEED;
949 : }
950 :
951 :
952 : static str
953 0 : SERVERsuspend(void *res)
954 : {
955 0 : (void) res;
956 0 : ATOMIC_SET(&serveractive, 0);
957 0 : return MAL_SUCCEED;
958 : }
959 :
960 : static str
961 0 : SERVERresume(void *res)
962 : {
963 0 : ATOMIC_SET(&serveractive, 1);
964 0 : (void) res;
965 0 : return MAL_SUCCEED;
966 : }
967 :
968 : static str
969 0 : SERVERclient(void *res, const Stream *In, const Stream *Out)
970 : {
971 0 : struct challengedata *data;
972 0 : MT_Id tid;
973 :
974 0 : (void) res;
975 : /* in embedded mode we allow just one client */
976 0 : data = GDKmalloc(sizeof(*data));
977 0 : if (data == NULL)
978 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
979 0 : data->in = block_stream(*In);
980 0 : data->out = block_stream(*Out);
981 0 : if (data->in == NULL || data->out == NULL) {
982 0 : mnstr_destroy(data->in);
983 0 : mnstr_destroy(data->out);
984 0 : GDKfree(data);
985 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
986 : }
987 :
988 : /* generate the challenge string */
989 0 : generateChallenge(data->challenge, 8, 12);
990 :
991 0 : if (MT_create_thread(&tid, doChallenge, data, MT_THR_DETACHED, "clientXXXX") < 0) {
992 0 : mnstr_destroy(data->in);
993 0 : mnstr_destroy(data->out);
994 0 : GDKfree(data);
995 0 : throw(MAL, "mapi.SERVERclient", "cannot fork new client thread");
996 : }
997 : return MAL_SUCCEED;
998 : }
999 :
1000 : /*
1001 : * @+ Remote Processing
1002 : * The remainder of the file contains the wrappers around
1003 : * the Mapi library used by application programmers.
1004 : * Details on the functions can be found there.
1005 : *
1006 : * Sessions have a lifetime different from dynamic scopes.
1007 : * This means the user should use a session identifier
1008 : * to select the correct handle.
1009 : * For the time being we use the index in the global
1010 : * session table. The client pointer is retained to
1011 : * perform access control.
1012 : *
1013 : * We use a single result set handle. All data should be
1014 : * consumed before continuing.
1015 : *
1016 : * A few extra routines should be defined to
1017 : * dump and inspect the sessions table.
1018 : *
1019 : * The remote site may return a single error
1020 : * with a series of error lines. These contain
1021 : * then a starting !. They are all stripped here.
1022 : */
/*
 * catchErrors(fcn): convert a pending Mapi error into a MAL exception.
 *
 * Expects three names in the caller's scope: `mid` (the Mapi connection),
 * `hdl` (a MapiHdl, may be NULL) and `i` (index into SERVERsessions).
 * If mapi_error(mid) is zero the macro does nothing.  Otherwise it
 * RETURNS from the calling function with an exception built from the
 * result error of `hdl` or, failing that, of SERVERsessions[i].hdl.
 * Every '!' that directly follows a newline in the remote message is
 * replaced by a "MALException:<fcn>:remote error:" prefix.
 *
 * Fixes over the previous version:
 *  - an allocation failure used to `break` out of the do/while, silently
 *    dropping the remote error; it now throws an out-of-memory exception;
 *  - the character before the start of the message is no longer read;
 *  - the entry test is reduced to `rn`, which is what the old compound
 *    condition evaluated to.
 */
#define catchErrors(fcn) \
	do { \
		int rn = mapi_error(mid); \
		if (rn) { \
			const char *err, *e; \
			str newerr; \
			str ret; \
			size_t l; \
			char *f; \
			\
			if (hdl && mapi_result_error(hdl)) \
				err = mapi_result_error(hdl); \
			else \
				err = mapi_result_error(SERVERsessions[i].hdl); \
			\
			if (err == NULL) \
				err = "(no additional error message)"; \
			\
			l = 2 * strlen(err) + 8192; \
			newerr = (str) GDKmalloc(l); \
			if (newerr == NULL) { \
				throw(MAL, fcn, SQLSTATE(HY013) MAL_MALLOC_FAIL); \
			} \
			\
			f = newerr; \
			/* I think this code tries to deal with multiple errors, this \
			 * will fail this way if it does, since no ! is in the error \
			 * string, only newlines to separate them */ \
			for (e = err; *e && l > 1; e++) { \
				/* e > err: never look in front of the message */ \
				if (*e == '!' && e > err && e[-1] == '\n') { \
					snprintf(f, l, "MALException:" fcn ":remote error:"); \
					l -= strlen(f); \
					while (*f) \
						f++; \
				} else { \
					*f++ = *e; \
					l--; \
				} \
			} \
			\
			*f = 0; \
			ret = createException(MAL, fcn, \
								  OPERATION_FAILED ": remote error: %s", \
								  newerr); \
			GDKfree(newerr); \
			return ret; \
		} \
	} while (0)
1069 :
1070 : #define MAXSESSIONS 32
1071 : struct {
1072 : int key;
1073 : str dbalias; /* logical name of the session */
1074 : Client c;
1075 : Mapi mid; /* communication channel */
1076 : MapiHdl hdl; /* result set handle */
1077 : } SERVERsessions[MAXSESSIONS];
1078 :
1079 : static int sessionkey = 0;
1080 :
1081 : /* #define MAPI_TEST*/
1082 :
1083 : static str
1084 6 : SERVERconnectAll(Client cntxt, int *key, str *host, int *port, str *username,
1085 : str *password, str *lang)
1086 : {
1087 6 : Mapi mid;
1088 6 : int i;
1089 :
1090 6 : MT_lock_set(&mal_contextLock);
1091 12 : for (i = 1; i < MAXSESSIONS; i++)
1092 6 : if (SERVERsessions[i].c == 0)
1093 : break;
1094 :
1095 6 : if (i == MAXSESSIONS) {
1096 0 : MT_lock_unset(&mal_contextLock);
1097 0 : throw(IO, "mapi.connect", OPERATION_FAILED ": too many sessions");
1098 : }
1099 6 : SERVERsessions[i].c = cntxt;
1100 6 : SERVERsessions[i].key = ++sessionkey;
1101 6 : MT_lock_unset(&mal_contextLock);
1102 :
1103 6 : mid = mapi_connect(*host, *port, *username, *password, *lang, NULL);
1104 :
1105 6 : if (mid == NULL)
1106 0 : throw(IO, "mapi.connect", MAL_MALLOC_FAIL);
1107 :
1108 6 : if (mapi_error(mid)) {
1109 0 : const char *err = mapi_error_str(mid);
1110 0 : str ex;
1111 0 : if (err == NULL)
1112 0 : err = "(no reason given)";
1113 0 : if (err[0] == '!')
1114 0 : err = err +1;
1115 0 : SERVERsessions[i].c = NULL;
1116 0 : ex = createException(IO, "mapi.connect", "Could not connect: %s", err);
1117 0 : mapi_destroy(mid);
1118 0 : return (ex);
1119 : }
1120 :
1121 : #ifdef MAPI_TEST
1122 : mnstr_printf(SERVERsessions[i].c->fdout,
1123 : "Succeeded to establish session\n");
1124 : #endif
1125 6 : SERVERsessions[i].mid = mid;
1126 6 : *key = SERVERsessions[i].key;
1127 6 : return MAL_SUCCEED;
1128 : }
1129 :
1130 : static str
1131 0 : SERVERdisconnectALL(int *key)
1132 : {
1133 0 : int i;
1134 :
1135 0 : MT_lock_set(&mal_contextLock);
1136 :
1137 0 : for (i = 1; i < MAXSESSIONS; i++)
1138 0 : if (SERVERsessions[i].c != 0) {
1139 : #ifdef MAPI_TEST
1140 : mnstr_printf(SERVERsessions[i].c->fdout, "Close session %d\n", i);
1141 : #endif
1142 0 : SERVERsessions[i].c = 0;
1143 0 : if (SERVERsessions[i].dbalias)
1144 0 : GDKfree(SERVERsessions[i].dbalias);
1145 0 : SERVERsessions[i].dbalias = NULL;
1146 0 : *key = SERVERsessions[i].key;
1147 0 : if (SERVERsessions[i].hdl)
1148 0 : mapi_close_handle(SERVERsessions[i].hdl);
1149 0 : SERVERsessions[i].hdl = NULL;
1150 0 : mapi_disconnect(SERVERsessions[i].mid);
1151 : }
1152 :
1153 0 : MT_lock_unset(&mal_contextLock);
1154 :
1155 0 : return MAL_SUCCEED;
1156 : }
1157 :
1158 : static str
1159 0 : SERVERdisconnectWithAlias(int *key, str *dbalias)
1160 : {
1161 0 : int i;
1162 :
1163 0 : MT_lock_set(&mal_contextLock);
1164 :
1165 0 : for (i = 0; i < MAXSESSIONS; i++)
1166 0 : if (SERVERsessions[i].dbalias &&
1167 0 : strcmp(SERVERsessions[i].dbalias, *dbalias) == 0) {
1168 0 : SERVERsessions[i].c = 0;
1169 0 : if (SERVERsessions[i].dbalias)
1170 0 : GDKfree(SERVERsessions[i].dbalias);
1171 0 : SERVERsessions[i].dbalias = NULL;
1172 0 : *key = SERVERsessions[i].key;
1173 0 : if (SERVERsessions[i].hdl)
1174 0 : mapi_close_handle(SERVERsessions[i].hdl);
1175 0 : SERVERsessions[i].hdl = NULL;
1176 0 : mapi_disconnect(SERVERsessions[i].mid);
1177 0 : break;
1178 : }
1179 :
1180 0 : if (i == MAXSESSIONS) {
1181 0 : MT_lock_unset(&mal_contextLock);
1182 0 : throw(IO, "mapi.disconnect",
1183 : "Impossible to close session for db_alias: '%s'", *dbalias);
1184 : }
1185 :
1186 0 : MT_lock_unset(&mal_contextLock);
1187 0 : return MAL_SUCCEED;
1188 : }
1189 :
1190 : static str
1191 1 : SERVERconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1192 : {
1193 1 : int *key = getArgReference_int(stk, pci, 0);
1194 1 : str *host = getArgReference_str(stk, pci, 1);
1195 1 : int *port = getArgReference_int(stk, pci, 2);
1196 1 : str *username = getArgReference_str(stk, pci, 3);
1197 1 : str *password = getArgReference_str(stk, pci, 4);
1198 1 : str *lang = getArgReference_str(stk, pci, 5);
1199 :
1200 1 : (void) mb;
1201 1 : return SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1202 : }
1203 :
1204 :
1205 : static str
1206 5 : SERVERreconnectAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1207 : {
1208 5 : int *key = getArgReference_int(stk, pci, 0);
1209 5 : str *host = getArgReference_str(stk, pci, 1);
1210 5 : int *port = getArgReference_int(stk, pci, 2);
1211 5 : str *dbalias = getArgReference_str(stk, pci, 3);
1212 5 : str *username = getArgReference_str(stk, pci, 4);
1213 5 : str *password = getArgReference_str(stk, pci, 5);
1214 5 : str *lang = getArgReference_str(stk, pci, 6);
1215 5 : int i;
1216 5 : str msg = MAL_SUCCEED;
1217 :
1218 5 : (void) mb;
1219 :
1220 165 : for (i = 0; i < MAXSESSIONS; i++)
1221 160 : if (SERVERsessions[i].key &&
1222 5 : SERVERsessions[i].dbalias &&
1223 0 : strcmp(SERVERsessions[i].dbalias, *dbalias) == 0) {
1224 0 : *key = SERVERsessions[i].key;
1225 0 : return msg;
1226 : }
1227 :
1228 5 : msg = SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1229 5 : if (msg == MAL_SUCCEED)
1230 5 : msg = SERVERsetAlias(NULL, key, dbalias);
1231 : return msg;
1232 : }
1233 :
1234 : static str
1235 0 : SERVERreconnectWithoutAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
1236 : InstrPtr pci)
1237 : {
1238 0 : int *key = getArgReference_int(stk, pci, 0);
1239 0 : str *host = getArgReference_str(stk, pci, 1);
1240 0 : int *port = getArgReference_int(stk, pci, 2);
1241 0 : str *username = getArgReference_str(stk, pci, 3);
1242 0 : str *password = getArgReference_str(stk, pci, 4);
1243 0 : str *lang = getArgReference_str(stk, pci, 5);
1244 0 : int i;
1245 0 : str msg = MAL_SUCCEED, nme = "anonymous";
1246 :
1247 0 : (void) mb;
1248 :
1249 0 : for (i = 0; i < MAXSESSIONS; i++)
1250 0 : if (SERVERsessions[i].key) {
1251 0 : *key = SERVERsessions[i].key;
1252 0 : return msg;
1253 : }
1254 :
1255 0 : msg = SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1256 0 : if (msg == MAL_SUCCEED)
1257 0 : msg = SERVERsetAlias(NULL, key, &nme);
1258 : return msg;
1259 : }
1260 :
/*
 * accessTest(val, fcn): look up the in-use session whose key equals
 * `val`.  Expects `i` (int) and `mid` (Mapi) in the caller's scope; on
 * success `i` is the slot index and `mid` the slot's connection.  When
 * no such session exists the macro RETURNS from the calling function
 * with a "mapi.<fcn>" access-violation exception.
 */
#define accessTest(val, fcn) \
	do { \
		for (i = 0; i < MAXSESSIONS; i++) { \
			if (SERVERsessions[i].c != 0 && \
				SERVERsessions[i].key == (val)) \
				break; \
		} \
		if (i == MAXSESSIONS) \
			throw(MAL, "mapi." fcn, "Access violation," \
				  " could not find matching session descriptor"); \
		mid = SERVERsessions[i].mid; \
		(void) mid; /* silence compilers */ \
	} while (0)
1272 :
1273 : static str
1274 5 : SERVERsetAlias(void *ret, int *key, str *dbalias)
1275 : {
1276 5 : int i;
1277 5 : Mapi mid;
1278 10 : accessTest(*key, "setAlias");
1279 5 : SERVERsessions[i].dbalias = GDKstrdup(*dbalias);
1280 5 : if (SERVERsessions[i].dbalias == NULL)
1281 0 : throw(MAL, "mapi.set_alias", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1282 : (void) ret;
1283 : return MAL_SUCCEED;
1284 : }
1285 :
1286 : static str
1287 0 : SERVERlookup(int *ret, str *dbalias)
1288 : {
1289 0 : int i;
1290 0 : for (i = 0; i < MAXSESSIONS; i++)
1291 0 : if (SERVERsessions[i].dbalias &&
1292 0 : strcmp(SERVERsessions[i].dbalias, *dbalias) == 0) {
1293 0 : *ret = SERVERsessions[i].key;
1294 0 : return MAL_SUCCEED;
1295 : }
1296 0 : throw(MAL, "mapi.lookup", "Could not find database connection");
1297 : }
1298 :
1299 : static str
1300 0 : SERVERtrace(void *ret, int *key, int *flag)
1301 : {
1302 0 : (void) ret;
1303 0 : mapi_trace(SERVERsessions[*key].mid, (bool) *flag);
1304 0 : return MAL_SUCCEED;
1305 : }
1306 :
1307 : static str
1308 0 : SERVERdisconnect(void *ret, int *key)
1309 : {
1310 0 : int i;
1311 0 : Mapi mid;
1312 0 : (void) ret;
1313 0 : accessTest(*key, "disconnect");
1314 0 : if (SERVERsessions[i].hdl)
1315 0 : mapi_close_handle(SERVERsessions[i].hdl);
1316 0 : SERVERsessions[i].hdl = NULL;
1317 0 : mapi_disconnect(mid);
1318 0 : if (SERVERsessions[i].dbalias)
1319 0 : GDKfree(SERVERsessions[i].dbalias);
1320 0 : SERVERsessions[i].c = 0;
1321 0 : SERVERsessions[i].dbalias = 0;
1322 0 : return MAL_SUCCEED;
1323 : }
1324 :
1325 : static str
1326 6 : SERVERdestroy(void *ret, int *key)
1327 : {
1328 6 : int i;
1329 6 : Mapi mid;
1330 6 : (void) ret;
1331 12 : accessTest(*key, "destroy");
1332 6 : if (SERVERsessions[i].hdl)
1333 4 : mapi_close_handle(SERVERsessions[i].hdl);
1334 6 : SERVERsessions[i].hdl = NULL;
1335 6 : mapi_disconnect(mid);
1336 6 : mapi_destroy(mid);
1337 6 : SERVERsessions[i].c = 0;
1338 6 : if (SERVERsessions[i].dbalias)
1339 5 : GDKfree(SERVERsessions[i].dbalias);
1340 6 : SERVERsessions[i].dbalias = 0;
1341 6 : return MAL_SUCCEED;
1342 : }
1343 :
1344 : static str
1345 0 : SERVERreconnect(void *ret, int *key)
1346 : {
1347 0 : int i;
1348 0 : Mapi mid;
1349 0 : (void) ret;
1350 0 : accessTest(*key, "destroy");
1351 0 : if (SERVERsessions[i].hdl)
1352 0 : mapi_close_handle(SERVERsessions[i].hdl);
1353 0 : SERVERsessions[i].hdl = NULL;
1354 0 : mapi_reconnect(mid);
1355 0 : return MAL_SUCCEED;
1356 : }
1357 :
1358 : static str
1359 0 : SERVERping(int *ret, int *key)
1360 : {
1361 0 : int i;
1362 0 : Mapi mid;
1363 0 : accessTest(*key, "destroy");
1364 0 : *ret = mapi_ping(mid);
1365 0 : return MAL_SUCCEED;
1366 : }
1367 :
1368 : static str
1369 23 : SERVERquery(int *ret, int *key, str *qry)
1370 : {
1371 23 : Mapi mid;
1372 23 : MapiHdl hdl = 0;
1373 23 : int i;
1374 46 : accessTest(*key, "query");
1375 23 : if (SERVERsessions[i].hdl)
1376 19 : mapi_close_handle(SERVERsessions[i].hdl);
1377 23 : SERVERsessions[i].hdl = mapi_query(mid, *qry);
1378 23 : catchErrors("mapi.query");
1379 23 : *ret = *key;
1380 23 : return MAL_SUCCEED;
1381 : }
1382 :
1383 : static str
1384 0 : SERVERquery_handle(int *ret, int *key, str *qry)
1385 : {
1386 0 : Mapi mid;
1387 0 : MapiHdl hdl = 0;
1388 0 : int i;
1389 0 : accessTest(*key, "query_handle");
1390 0 : mapi_query_handle(SERVERsessions[i].hdl, *qry);
1391 0 : catchErrors("mapi.query_handle");
1392 0 : *ret = *key;
1393 0 : return MAL_SUCCEED;
1394 : }
1395 :
1396 : static str
1397 0 : SERVERquery_array(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pc)
1398 : {
1399 0 : (void) cntxt, (void) mb;
1400 0 : (void) stk;
1401 0 : (void) pc;
1402 0 : throw(MAL, "mapi.query_array", SQLSTATE(0 A000) PROGRAM_NYI);
1403 : }
1404 :
1405 : static str
1406 0 : SERVERprepare(int *ret, int *key, str *qry)
1407 : {
1408 0 : Mapi mid;
1409 0 : int i;
1410 0 : accessTest(*key, "prepare");
1411 0 : if (SERVERsessions[i].hdl)
1412 0 : mapi_close_handle(SERVERsessions[i].hdl);
1413 0 : SERVERsessions[i].hdl = mapi_prepare(mid, *qry);
1414 0 : if (mapi_error(mid))
1415 0 : throw(MAL, "mapi.prepare", "%s",
1416 : mapi_result_error(SERVERsessions[i].hdl));
1417 0 : *ret = *key;
1418 0 : return MAL_SUCCEED;
1419 : }
1420 :
1421 : static str
1422 0 : SERVERfinish(int *ret, int *key)
1423 : {
1424 0 : Mapi mid;
1425 0 : int i;
1426 0 : accessTest(*key, "finish");
1427 0 : mapi_finish(SERVERsessions[i].hdl);
1428 0 : if (mapi_error(mid))
1429 0 : throw(MAL, "mapi.finish", "%s",
1430 : mapi_result_error(SERVERsessions[i].hdl));
1431 0 : *ret = *key;
1432 0 : return MAL_SUCCEED;
1433 : }
1434 :
1435 : static str
1436 1 : SERVERget_row_count(lng *ret, int *key)
1437 : {
1438 1 : Mapi mid;
1439 1 : int i;
1440 2 : accessTest(*key, "get_row_count");
1441 1 : *ret = (lng) mapi_get_row_count(SERVERsessions[i].hdl);
1442 1 : if (mapi_error(mid))
1443 0 : throw(MAL, "mapi.get_row_count", "%s",
1444 : mapi_result_error(SERVERsessions[i].hdl));
1445 : return MAL_SUCCEED;
1446 : }
1447 :
1448 : static str
1449 1 : SERVERget_field_count(int *ret, int *key)
1450 : {
1451 1 : Mapi mid;
1452 1 : int i;
1453 2 : accessTest(*key, "get_field_count");
1454 1 : *ret = mapi_get_field_count(SERVERsessions[i].hdl);
1455 1 : if (mapi_error(mid))
1456 0 : throw(MAL, "mapi.get_field_count", "%s",
1457 : mapi_result_error(SERVERsessions[i].hdl));
1458 : return MAL_SUCCEED;
1459 : }
1460 :
1461 : static str
1462 0 : SERVERrows_affected(lng *ret, int *key)
1463 : {
1464 0 : Mapi mid;
1465 0 : int i;
1466 0 : accessTest(*key, "rows_affected");
1467 0 : *ret = (lng) mapi_rows_affected(SERVERsessions[i].hdl);
1468 0 : return MAL_SUCCEED;
1469 : }
1470 :
1471 : static str
1472 1 : SERVERfetch_row(int *ret, int *key)
1473 : {
1474 1 : Mapi mid;
1475 1 : int i;
1476 2 : accessTest(*key, "fetch_row");
1477 1 : *ret = mapi_fetch_row(SERVERsessions[i].hdl);
1478 1 : return MAL_SUCCEED;
1479 : }
1480 :
1481 : static str
1482 0 : SERVERfetch_all_rows(lng *ret, int *key)
1483 : {
1484 0 : Mapi mid;
1485 0 : int i;
1486 0 : accessTest(*key, "fetch_all_rows");
1487 0 : *ret = (lng) mapi_fetch_all_rows(SERVERsessions[i].hdl);
1488 0 : return MAL_SUCCEED;
1489 : }
1490 :
1491 : static str
1492 1 : SERVERfetch_field_str(str *ret, int *key, int *fnr)
1493 : {
1494 1 : Mapi mid;
1495 1 : int i;
1496 1 : str fld;
1497 2 : accessTest(*key, "fetch_field");
1498 1 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1499 1 : *ret = GDKstrdup(fld ? fld : str_nil);
1500 1 : if (*ret == NULL)
1501 0 : throw(MAL, "mapi.fetch_field_str", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1502 1 : if (mapi_error(mid))
1503 0 : throw(MAL, "mapi.fetch_field_str", "%s",
1504 : mapi_result_error(SERVERsessions[i].hdl));
1505 : return MAL_SUCCEED;
1506 : }
1507 :
1508 : static str
1509 1 : SERVERfetch_field_int(int *ret, int *key, int *fnr)
1510 : {
1511 1 : Mapi mid;
1512 1 : int i;
1513 1 : str fld;
1514 2 : accessTest(*key, "fetch_field");
1515 1 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1516 2 : *ret = fld ? (int) atol(fld) : int_nil;
1517 1 : if (mapi_error(mid))
1518 0 : throw(MAL, "mapi.fetch_field_int", "%s",
1519 : mapi_result_error(SERVERsessions[i].hdl));
1520 : return MAL_SUCCEED;
1521 : }
1522 :
1523 : static str
1524 0 : SERVERfetch_field_lng(lng *ret, int *key, int *fnr)
1525 : {
1526 0 : Mapi mid;
1527 0 : int i;
1528 0 : str fld;
1529 0 : accessTest(*key, "fetch_field");
1530 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1531 0 : *ret = fld ? atol(fld) : lng_nil;
1532 0 : if (mapi_error(mid))
1533 0 : throw(MAL, "mapi.fetch_field_lng", "%s",
1534 : mapi_result_error(SERVERsessions[i].hdl));
1535 : return MAL_SUCCEED;
1536 : }
1537 :
#ifdef HAVE_HGE
/*
 * Fetch field *fnr of the current row of the session identified by *key
 * as a hge.  An absent field or the literal text "nil" yields hge_nil.
 *
 * The text is parsed with strtoll() rather than atol(), so at least the
 * full long long range survives on platforms where long is 32 bits.
 * NOTE(review): values outside the long long range still cannot be
 * parsed this way.
 */
static str
SERVERfetch_field_hge(hge *ret, int *key, int *fnr)
{
	Mapi mid;
	int i;
	str fld;

	accessTest(*key, "fetch_field");
	fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
	if (fld == NULL || strcmp(fld, "nil") == 0)
		*ret = hge_nil;
	else
		*ret = (hge) strtoll(fld, NULL, 10);
	if (mapi_error(mid))
		throw(MAL, "mapi.fetch_field_hge", "%s",
			  mapi_result_error(SERVERsessions[i].hdl));
	return MAL_SUCCEED;
}
#endif
1554 :
1555 : static str
1556 0 : SERVERfetch_field_sht(sht *ret, int *key, int *fnr)
1557 : {
1558 0 : Mapi mid;
1559 0 : int i;
1560 0 : str fld;
1561 0 : accessTest(*key, "fetch_field");
1562 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1563 0 : *ret = fld ? (sht) atol(fld) : sht_nil;
1564 0 : if (mapi_error(mid))
1565 0 : throw(MAL, "mapi.fetch_field", "%s",
1566 : mapi_result_error(SERVERsessions[i].hdl));
1567 : return MAL_SUCCEED;
1568 : }
1569 :
1570 : static str
1571 0 : SERVERfetch_field_void(void *ret, int *key, int *fnr)
1572 : {
1573 0 : Mapi mid;
1574 0 : int i;
1575 0 : (void) ret;
1576 0 : (void) fnr;
1577 0 : accessTest(*key, "fetch_field");
1578 0 : throw(MAL, "mapi.fetch_field_void", "defaults to nil");
1579 : }
1580 :
1581 : static str
1582 0 : SERVERfetch_field_oid(oid *ret, int *key, int *fnr)
1583 : {
1584 0 : Mapi mid;
1585 0 : int i;
1586 0 : str fld;
1587 0 : accessTest(*key, "fetch_field");
1588 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1589 0 : if (mapi_error(mid))
1590 0 : throw(MAL, "mapi.fetch_field_oid", "%s",
1591 : mapi_result_error(SERVERsessions[i].hdl));
1592 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1593 0 : *(oid *) ret = void_nil;
1594 : else
1595 0 : *(oid *) ret = (oid) atol(fld);
1596 : return MAL_SUCCEED;
1597 : }
1598 :
1599 : static str
1600 0 : SERVERfetch_field_bte(bte *ret, int *key, int *fnr)
1601 : {
1602 0 : Mapi mid;
1603 0 : int i;
1604 0 : str fld;
1605 0 : accessTest(*key, "fetch_field");
1606 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
1607 0 : if (mapi_error(mid))
1608 0 : throw(MAL, "mapi.fetch_field_bte", "%s",
1609 : mapi_result_error(SERVERsessions[i].hdl));
1610 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1611 0 : *(bte *) ret = bte_nil;
1612 : else
1613 0 : *(bte *) ret = *fld;
1614 : return MAL_SUCCEED;
1615 : }
1616 :
1617 : static str
1618 0 : SERVERfetch_line(str *ret, int *key)
1619 : {
1620 0 : Mapi mid;
1621 0 : int i;
1622 0 : str fld;
1623 0 : accessTest(*key, "fetch_line");
1624 0 : fld = mapi_fetch_line(SERVERsessions[i].hdl);
1625 0 : if (mapi_error(mid))
1626 0 : throw(MAL, "mapi.fetch_line", "%s",
1627 : mapi_result_error(SERVERsessions[i].hdl));
1628 0 : *ret = GDKstrdup(fld ? fld : str_nil);
1629 0 : if (*ret == NULL)
1630 0 : throw(MAL, "mapi.fetch_line", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1631 : return MAL_SUCCEED;
1632 : }
1633 :
1634 : static str
1635 0 : SERVERnext_result(int *ret, int *key)
1636 : {
1637 0 : Mapi mid;
1638 0 : int i;
1639 0 : accessTest(*key, "next_result");
1640 0 : mapi_next_result(SERVERsessions[i].hdl);
1641 0 : if (mapi_error(mid))
1642 0 : throw(MAL, "mapi.next_result", "%s",
1643 : mapi_result_error(SERVERsessions[i].hdl));
1644 0 : *ret = *key;
1645 0 : return MAL_SUCCEED;
1646 : }
1647 :
1648 : static str
1649 0 : SERVERfetch_reset(int *ret, int *key)
1650 : {
1651 0 : Mapi mid;
1652 0 : int i;
1653 0 : accessTest(*key, "fetch_reset");
1654 0 : mapi_fetch_reset(SERVERsessions[i].hdl);
1655 0 : if (mapi_error(mid))
1656 0 : throw(MAL, "mapi.fetch_reset", "%s",
1657 : mapi_result_error(SERVERsessions[i].hdl));
1658 0 : *ret = *key;
1659 0 : return MAL_SUCCEED;
1660 : }
1661 :
1662 : static str
1663 0 : SERVERfetch_field_bat(bat *bid, int *key)
1664 : {
1665 0 : int i, j, cnt;
1666 0 : Mapi mid;
1667 0 : char *fld;
1668 0 : BAT *b;
1669 :
1670 0 : accessTest(*key, "rpc");
1671 0 : b = COLnew(0, TYPE_str, 256, TRANSIENT);
1672 0 : if (b == NULL)
1673 0 : throw(MAL, "mapi.fetch", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1674 0 : cnt = mapi_get_field_count(SERVERsessions[i].hdl);
1675 0 : for (j = 0; j < cnt; j++) {
1676 0 : fld = mapi_fetch_field(SERVERsessions[i].hdl, j);
1677 0 : if (mapi_error(mid)) {
1678 0 : BBPreclaim(b);
1679 0 : throw(MAL, "mapi.fetch_field_bat", "%s",
1680 : mapi_result_error(SERVERsessions[i].hdl));
1681 : }
1682 0 : if (BUNappend(b, fld, false) != GDK_SUCCEED) {
1683 0 : BBPreclaim(b);
1684 0 : throw(MAL, "mapi.fetch_field_bat", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1685 : }
1686 : }
1687 0 : *bid = b->batCacheid;
1688 0 : BBPkeepref(b);
1689 0 : return MAL_SUCCEED;
1690 : }
1691 :
1692 : static str
1693 0 : SERVERerror(int *ret, int *key)
1694 : {
1695 0 : Mapi mid;
1696 0 : int i;
1697 0 : accessTest(*key, "error");
1698 0 : *ret = mapi_error(mid);
1699 0 : return MAL_SUCCEED;
1700 : }
1701 :
1702 : static str
1703 0 : SERVERgetError(str *ret, int *key)
1704 : {
1705 0 : Mapi mid;
1706 0 : int i;
1707 0 : accessTest(*key, "getError");
1708 0 : *ret = GDKstrdup(mapi_error_str(mid));
1709 0 : if (*ret == NULL)
1710 0 : throw(MAL, "mapi.get_error", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1711 : return MAL_SUCCEED;
1712 : }
1713 :
1714 : static str
1715 0 : SERVERexplain(str *ret, int *key)
1716 : {
1717 0 : Mapi mid;
1718 0 : int i;
1719 :
1720 0 : accessTest(*key, "explain");
1721 0 : *ret = GDKstrdup(mapi_error_str(mid));
1722 0 : if (*ret == NULL)
1723 0 : throw(MAL, "mapi.explain", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1724 : return MAL_SUCCEED;
1725 : }
1726 :
1727 : /*
1728 : * The remainder should contain the wrapping of
1729 : * relevant SERVER functions. Furthermore, we
1730 : * should analyse the return value and update
1731 : * the stack trace.
1732 : *
1733 : * Two routines should be
1734 : * mapi.rpc(key,"query")
1735 : *
1736 : * The generic scheme for handling a remote MAL
1737 : * procedure call with a single row answer.
1738 : */
1739 : static int
1740 18 : SERVERfieldAnalysis(str fld, int tpe, ValPtr v)
1741 : {
1742 18 : v->vtype = tpe;
1743 18 : switch (tpe) {
1744 0 : case TYPE_void:
1745 0 : v->val.oval = void_nil;
1746 0 : break;
1747 2 : case TYPE_oid:
1748 2 : if (fld == 0 || strcmp(fld, "nil") == 0)
1749 1 : v->val.oval = void_nil;
1750 : else
1751 1 : v->val.oval = (oid) atol(fld);
1752 : break;
1753 0 : case TYPE_bit:
1754 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1755 0 : v->val.btval = bit_nil;
1756 0 : else if (strcmp(fld, "true") == 0)
1757 0 : v->val.btval = TRUE;
1758 0 : else if (strcmp(fld, "false") == 0)
1759 0 : v->val.btval = FALSE;
1760 : break;
1761 0 : case TYPE_bte:
1762 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1763 0 : v->val.btval = bte_nil;
1764 : else
1765 0 : v->val.btval = *fld;
1766 : break;
1767 0 : case TYPE_sht:
1768 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1769 0 : v->val.shval = sht_nil;
1770 : else
1771 0 : v->val.shval = (sht) atol(fld);
1772 : break;
1773 12 : case TYPE_int:
1774 12 : if (fld == 0 || strcmp(fld, "nil") == 0)
1775 1 : v->val.ival = int_nil;
1776 : else
1777 11 : v->val.ival = (int) atol(fld);
1778 : break;
1779 2 : case TYPE_lng:
1780 2 : if (fld == 0 || strcmp(fld, "nil") == 0)
1781 0 : v->val.lval = lng_nil;
1782 : else
1783 2 : v->val.lval = (lng) atol(fld);
1784 : break;
1785 : #ifdef HAVE_HGE
1786 0 : case TYPE_hge:
1787 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1788 0 : v->val.hval = hge_nil;
1789 : else
1790 0 : v->val.hval = (hge) atol(fld);
1791 : break;
1792 : #endif
1793 0 : case TYPE_flt:
1794 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1795 0 : v->val.fval = flt_nil;
1796 : else
1797 0 : v->val.fval = (flt) atof(fld);
1798 : break;
1799 0 : case TYPE_dbl:
1800 0 : if (fld == 0 || strcmp(fld, "nil") == 0)
1801 0 : v->val.dval = dbl_nil;
1802 : else
1803 0 : v->val.dval = (dbl) atof(fld);
1804 : break;
1805 2 : case TYPE_str:
1806 2 : if (fld == 0 || strcmp(fld, "nil") == 0) {
1807 0 : if (VALinit(v, TYPE_str, str_nil) == NULL)
1808 : return -1;
1809 : } else {
1810 2 : if (VALinit(v, TYPE_str, fld) == NULL)
1811 : return -1;
1812 : }
1813 : break;
1814 : }
1815 : return 0;
1816 : }
1817 :
1818 : static str
1819 6 : SERVERmapi_rpc_single_row(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
1820 : InstrPtr pci)
1821 : {
1822 6 : int key, i, j;
1823 6 : Mapi mid;
1824 6 : MapiHdl hdl;
1825 6 : char *s, *fld, *qry = 0;
1826 :
1827 6 : (void) cntxt;
1828 6 : key = *getArgReference_int(stk, pci, pci->retc);
1829 12 : accessTest(key, "rpc");
1830 : #ifdef MAPI_TEST
1831 : mnstr_printf(cntxt->fdout, "about to send: %s\n", qry);
1832 : #endif
1833 : /* glue all strings together */
1834 12 : for (i = pci->retc + 1; i < pci->argc; i++) {
1835 6 : fld = *getArgReference_str(stk, pci, i);
1836 6 : if (qry == 0) {
1837 6 : qry = GDKstrdup(fld);
1838 6 : if (qry == NULL)
1839 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1840 : } else {
1841 0 : s = (char *) GDKmalloc(strlen(qry) + strlen(fld) + 1);
1842 0 : if (s == NULL) {
1843 0 : GDKfree(qry);
1844 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1845 : }
1846 0 : strcpy(s, qry);
1847 0 : strcat(s, fld);
1848 0 : GDKfree(qry);
1849 0 : qry = s;
1850 : }
1851 : }
1852 6 : hdl = mapi_query(mid, qry);
1853 6 : GDKfree(qry);
1854 6 : catchErrors("mapi.rpc");
1855 :
1856 : i = 0;
1857 12 : while (mapi_fetch_row(hdl)) {
1858 12 : for (j = 0; j < pci->retc; j++) {
1859 6 : fld = mapi_fetch_field(hdl, j);
1860 : #ifdef MAPI_TEST
1861 : mnstr_printf(cntxt->fdout, "Got: %s\n", fld);
1862 : #endif
1863 6 : switch (getVarType(mb, getArg(pci, j))) {
1864 6 : case TYPE_void:
1865 : case TYPE_oid:
1866 : case TYPE_bit:
1867 : case TYPE_bte:
1868 : case TYPE_sht:
1869 : case TYPE_int:
1870 : case TYPE_lng:
1871 : #ifdef HAVE_HGE
1872 : case TYPE_hge:
1873 : #endif
1874 : case TYPE_flt:
1875 : case TYPE_dbl:
1876 : case TYPE_str:
1877 6 : if (SERVERfieldAnalysis(fld, getVarType(mb, getArg(pci, j)),
1878 6 : &stk->stk[pci->argv[j]]) < 0) {
1879 0 : mapi_close_handle(hdl);
1880 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1881 : }
1882 6 : break;
1883 0 : default:
1884 0 : mapi_close_handle(hdl);
1885 0 : throw(MAL, "mapi.rpc", "Missing type implementation ");
1886 : /* all the other basic types come here */
1887 : }
1888 : }
1889 6 : i++;
1890 : }
1891 6 : mapi_close_handle(hdl);
1892 6 : if (i > 1)
1893 0 : throw(MAL, "mapi.rpc", "Too many answers");
1894 : return MAL_SUCCEED;
1895 : }
1896 :
1897 : /*
1898 : * Transport of the BATs is only slightly more complicated.
1899 : * The generic implementation based on a pattern is the next
1900 : * step.
1901 : */
1902 : static str
1903 5 : SERVERmapi_rpc_bat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1904 : {
1905 5 : bat *ret;
1906 5 : int *key;
1907 5 : str *qry, err = MAL_SUCCEED;
1908 5 : Mapi mid;
1909 5 : MapiHdl hdl;
1910 5 : char *fld2;
1911 5 : BAT *b;
1912 5 : ValRecord tval;
1913 5 : int i = 0, tt;
1914 :
1915 5 : (void) cntxt;
1916 5 : ret = getArgReference_bat(stk, pci, 0);
1917 5 : key = getArgReference_int(stk, pci, pci->retc);
1918 5 : qry = getArgReference_str(stk, pci, pci->retc + 1);
1919 10 : accessTest(*key, "rpc");
1920 5 : tt = getBatType(getVarType(mb, getArg(pci, 0)));
1921 :
1922 5 : hdl = mapi_query(mid, *qry);
1923 5 : catchErrors("mapi.rpc");
1924 :
1925 5 : b = COLnew(0, tt, 256, TRANSIENT);
1926 5 : if (b == NULL) {
1927 0 : mapi_close_handle(hdl);
1928 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1929 : }
1930 17 : while (mapi_fetch_row(hdl)) {
1931 12 : fld2 = mapi_fetch_field(hdl, 1);
1932 12 : if (SERVERfieldAnalysis(fld2, tt, &tval) < 0) {
1933 0 : BBPreclaim(b);
1934 0 : mapi_close_handle(hdl);
1935 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1936 : }
1937 12 : if (BUNappend(b, VALptr(&tval), false) != GDK_SUCCEED) {
1938 0 : BBPreclaim(b);
1939 0 : mapi_close_handle(hdl);
1940 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1941 : }
1942 : }
1943 5 : mapi_close_handle(hdl);
1944 5 : *ret = b->batCacheid;
1945 5 : BBPkeepref(b);
1946 :
1947 5 : return err;
1948 : }
1949 :
1950 : static str
1951 2 : SERVERput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1952 : {
1953 2 : int *key;
1954 2 : str *nme;
1955 2 : ptr val;
1956 2 : int i, tpe;
1957 2 : Mapi mid;
1958 2 : MapiHdl hdl = 0;
1959 2 : char *w = 0, buf[BUFSIZ];
1960 :
1961 2 : (void) cntxt;
1962 2 : key = getArgReference_int(stk, pci, pci->retc);
1963 2 : nme = getArgReference_str(stk, pci, pci->retc + 1);
1964 2 : val = getArgReference(stk, pci, pci->retc + 2);
1965 6 : accessTest(*key, "put");
1966 2 : switch ((tpe = getArgType(mb, pci, pci->retc + 2))) {
1967 0 : case TYPE_bat:{
1968 : /* generate a tuple batch */
1969 : /* and reload it into the proper format */
1970 0 : str ht, tt;
1971 0 : BAT *b = BBPquickdesc(BBPindex(*nme));
1972 0 : size_t len;
1973 :
1974 0 : if (!b)
1975 0 : throw(MAL, "mapi.put", RUNTIME_OBJECT_MISSING);
1976 :
1977 : /* reconstruct the object */
1978 0 : ht = getTypeName(TYPE_oid);
1979 0 : tt = getTypeName(getBatType(tpe));
1980 0 : snprintf(buf, BUFSIZ, "%s:= bat.new(:%s,%s);", *nme, ht, tt);
1981 0 : len = strlen(buf);
1982 0 : snprintf(buf + len, BUFSIZ - len, "%s:= io.import(%s,tuples);", *nme,
1983 : *nme);
1984 :
1985 : /* and execute the request */
1986 0 : if (SERVERsessions[i].hdl)
1987 0 : mapi_close_handle(SERVERsessions[i].hdl);
1988 0 : SERVERsessions[i].hdl = mapi_query(mid, buf);
1989 :
1990 0 : GDKfree(ht);
1991 0 : GDKfree(tt);
1992 0 : break;
1993 : }
1994 0 : case TYPE_str:
1995 0 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, *(char **) val);
1996 0 : if (SERVERsessions[i].hdl)
1997 0 : mapi_close_handle(SERVERsessions[i].hdl);
1998 0 : SERVERsessions[i].hdl = mapi_query(mid, buf);
1999 0 : break;
2000 2 : default:
2001 2 : if ((w = ATOMformat(tpe, val)) == NULL)
2002 0 : throw(MAL, "mapi.put", GDK_EXCEPTION);
2003 2 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, w);
2004 2 : GDKfree(w);
2005 2 : if (SERVERsessions[i].hdl)
2006 2 : mapi_close_handle(SERVERsessions[i].hdl);
2007 2 : SERVERsessions[i].hdl = mapi_query(mid, buf);
2008 2 : break;
2009 : }
2010 2 : catchErrors("mapi.put");
2011 : return MAL_SUCCEED;
2012 : }
2013 :
2014 : static str
2015 0 : SERVERputLocal(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2016 : {
2017 0 : str *ret, *nme;
2018 0 : ptr val;
2019 0 : int tpe;
2020 0 : char *w = 0, buf[BUFSIZ];
2021 :
2022 0 : (void) cntxt;
2023 0 : ret = getArgReference_str(stk, pci, 0);
2024 0 : nme = getArgReference_str(stk, pci, pci->retc);
2025 0 : val = getArgReference(stk, pci, pci->retc + 1);
2026 0 : switch ((tpe = getArgType(mb, pci, pci->retc + 1))) {
2027 0 : case TYPE_bat:
2028 : case TYPE_ptr:
2029 0 : throw(MAL, "mapi.glue", "Unsupported type");
2030 0 : case TYPE_str:
2031 0 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, *(char **) val);
2032 0 : break;
2033 0 : default:
2034 0 : if ((w = ATOMformat(tpe, val)) == NULL)
2035 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
2036 0 : snprintf(buf, BUFSIZ, "%s:=%s;", *nme, w);
2037 0 : GDKfree(w);
2038 0 : break;
2039 : }
2040 0 : *ret = GDKstrdup(buf);
2041 0 : if (*ret == NULL)
2042 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
2043 : return MAL_SUCCEED;
2044 : }
2045 :
2046 : static str
2047 1 : SERVERbindBAT(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
2048 : {
2049 1 : int *key;
2050 1 : str *nme, *tab, *col;
2051 1 : int i;
2052 1 : Mapi mid;
2053 1 : MapiHdl hdl = 0;
2054 1 : char buf[BUFSIZ];
2055 :
2056 1 : (void) cntxt;
2057 1 : key = getArgReference_int(stk, pci, pci->retc);
2058 1 : nme = getArgReference_str(stk, pci, pci->retc + 1);
2059 2 : accessTest(*key, "bind");
2060 1 : if (pci->argc == 6) {
2061 0 : char *tn;
2062 0 : tab = getArgReference_str(stk, pci, pci->retc + 2);
2063 0 : col = getArgReference_str(stk, pci, pci->retc + 3);
2064 0 : i = *getArgReference_int(stk, pci, pci->retc + 4);
2065 0 : tn = getTypeName(getBatType(getVarType(mb, getDestVar(pci))));
2066 0 : snprintf(buf, BUFSIZ, "%s:bat[:%s]:=sql.bind(\"%s\",\"%s\",\"%s\",%d);",
2067 : getVarName(mb, getDestVar(pci)), tn, *nme, *tab, *col, i);
2068 0 : GDKfree(tn);
2069 1 : } else if (pci->argc == 5) {
2070 0 : tab = getArgReference_str(stk, pci, pci->retc + 2);
2071 0 : i = *getArgReference_int(stk, pci, pci->retc + 3);
2072 0 : snprintf(buf, BUFSIZ, "%s:bat[:oid]:=sql.bind(\"%s\",\"%s\",0,%d);",
2073 : getVarName(mb, getDestVar(pci)), *nme, *tab, i);
2074 : } else {
2075 1 : str hn, tn;
2076 1 : int target = getArgType(mb, pci, 0);
2077 1 : hn = getTypeName(TYPE_oid);
2078 1 : tn = getTypeName(getBatType(target));
2079 1 : snprintf(buf, BUFSIZ, "%s:bat[:%s]:=bbp.bind(\"%s\");",
2080 : getVarName(mb, getDestVar(pci)), tn, *nme);
2081 1 : GDKfree(hn);
2082 1 : GDKfree(tn);
2083 : }
2084 1 : if (SERVERsessions[i].hdl)
2085 1 : mapi_close_handle(SERVERsessions[i].hdl);
2086 1 : SERVERsessions[i].hdl = mapi_query(mid, buf);
2087 1 : catchErrors("mapi.bind");
2088 : return MAL_SUCCEED;
2089 : }
2090 :
2091 : #include "mel.h"
2092 : mel_func mal_mapi_init_funcs[] = {
2093 : command("mapi", "listen", SERVERlisten_default, false, "Start a Mapi server with the default settings.", args(1,1, arg("",int))),
2094 : command("mapi", "listen", SERVERlisten_port, false, "Start a Mapi listener on the port given.", args(1,2, arg("",int),arg("port",int))),
2095 : command("mapi", "listen", SERVERlisten_usock, false, "Start a Mapi listener on the unix socket file given.", args(1,2, arg("",int),arg("unixsocket",str))),
2096 : command("mapi", "stop", SERVERstop, false, "Terminate connection listeners.", args(1,1, arg("",void))),
2097 : command("mapi", "suspend", SERVERsuspend, false, "Suspend accepting connections.", args(1,1, arg("",void))),
2098 : command("mapi", "resume", SERVERresume, false, "Resume connection listeners.", args(1,1, arg("",void))),
2099 : command("mapi", "malclient", SERVERclient, false, "Start a Mapi client for a particular stream pair.", args(1,3, arg("",void),arg("in",streams),arg("out",streams))),
2100 : command("mapi", "trace", SERVERtrace, false, "Toggle the Mapi library debug tracer.", args(1,3, arg("",void),arg("mid",int),arg("flag",int))),
2101 : pattern("mapi", "reconnect", SERVERreconnectWithoutAlias, false, "Re-establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
2102 : pattern("mapi", "reconnect", SERVERreconnectAlias, false, "Re-establish connection with a remote mserver.", args(1,7, arg("",int),arg("host",str),arg("port",int),arg("db_alias",str),arg("usr",str),arg("passwd",str),arg("lang",str))),
2103 : command("mapi", "reconnect", SERVERreconnect, false, "Re-establish a connection.", args(1,2, arg("",void),arg("mid",int))),
2104 : pattern("mapi", "connect", SERVERconnect, false, "Establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
2105 : command("mapi", "disconnect", SERVERdisconnectWithAlias, false, "Close connection with a remote Mserver.", args(1,2, arg("",int),arg("dbalias",str))),
2106 : command("mapi", "disconnect", SERVERdisconnectALL, false, "Close connections with all remote Mserver.", args(1,1, arg("",int))),
2107 : command("mapi", "setAlias", SERVERsetAlias, false, "Give the channel a logical name.", args(0,2, arg("key",int),arg("dbalias",str))),
2108 : command("mapi", "lookup", SERVERlookup, false, "Retrieve the connection identifier.", args(1,2, arg("",int),arg("dbalias",str))),
2109 : command("mapi", "disconnect", SERVERdisconnect, false, "Terminate the session.", args(1,2, arg("",void),arg("mid",int))),
2110 : command("mapi", "destroy", SERVERdestroy, false, "Destroy the handle for an Mserver.", args(1,2, arg("",void),arg("mid",int))),
2111 : command("mapi", "ping", SERVERping, false, "Test availability of an Mserver.", args(1,2, arg("",int),arg("mid",int))),
2112 : command("mapi", "query", SERVERquery, false, "Send the query for execution", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2113 : command("mapi", "query_handle", SERVERquery_handle, false, "Send the query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2114 : pattern("mapi", "query_array", SERVERquery_array, false, "Send the query for execution replacing '?' by arguments.", args(1,4, arg("",int),arg("mid",int),arg("qry",str),vararg("arg",str))),
2115 : command("mapi", "prepare", SERVERprepare, false, "Prepare a query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2116 : command("mapi", "finish", SERVERfinish, false, "Remove all remaining answers.", args(1,2, arg("",int),arg("hdl",int))),
2117 : command("mapi", "get_field_count", SERVERget_field_count, false, "Return number of fields.", args(1,2, arg("",int),arg("hdl",int))),
2118 : command("mapi", "get_row_count", SERVERget_row_count, false, "Return number of rows.", args(1,2, arg("",lng),arg("hdl",int))),
2119 : command("mapi", "rows_affected", SERVERrows_affected, false, "Return number of affected rows.", args(1,2, arg("",lng),arg("hdl",int))),
2120 : command("mapi", "fetch_row", SERVERfetch_row, false, "Retrieve the next row for analysis.", args(1,2, arg("",int),arg("hdl",int))),
2121 : command("mapi", "fetch_all_rows", SERVERfetch_all_rows, false, "Retrieve all rows into the cache.", args(1,2, arg("",lng),arg("hdl",int))),
2122 : command("mapi", "fetch_field", SERVERfetch_field_str, false, "Retrieve a single field.", args(1,3, arg("",str),arg("hdl",int),arg("fnr",int))),
2123 : command("mapi", "fetch_field", SERVERfetch_field_int, false, "Retrieve a single int field.", args(1,3, arg("",int),arg("hdl",int),arg("fnr",int))),
2124 : command("mapi", "fetch_field", SERVERfetch_field_lng, false, "Retrieve a single lng field.", args(1,3, arg("",lng),arg("hdl",int),arg("fnr",int))),
2125 : command("mapi", "fetch_field", SERVERfetch_field_sht, false, "Retrieve a single sht field.", args(1,3, arg("",sht),arg("hdl",int),arg("fnr",int))),
2126 : command("mapi", "fetch_field", SERVERfetch_field_void, false, "Retrieve a single void field.", args(1,3, arg("",void),arg("hdl",int),arg("fnr",int))),
2127 : command("mapi", "fetch_field", SERVERfetch_field_oid, false, "Retrieve a single void field.", args(1,3, arg("",oid),arg("hdl",int),arg("fnr",int))),
2128 : command("mapi", "fetch_field", SERVERfetch_field_bte, false, "Retrieve a single bte field.", args(1,3, arg("",bte),arg("hdl",int),arg("fnr",int))),
2129 : command("mapi", "fetch_field_array", SERVERfetch_field_bat, false, "Retrieve all fields for a row.", args(1,2, batarg("",str),arg("hdl",int))),
2130 : command("mapi", "fetch_line", SERVERfetch_line, false, "Retrieve a complete line.", args(1,2, arg("",str),arg("hdl",int))),
2131 : command("mapi", "fetch_reset", SERVERfetch_reset, false, "Reset the cache read line.", args(1,2, arg("",int),arg("hdl",int))),
2132 : command("mapi", "next_result", SERVERnext_result, false, "Go to next result set.", args(1,2, arg("",int),arg("hdl",int))),
2133 : command("mapi", "error", SERVERerror, false, "Check for an error in the communication.", args(1,2, arg("",int),arg("mid",int))),
2134 : command("mapi", "getError", SERVERgetError, false, "Get error message.", args(1,2, arg("",str),arg("mid",int))),
2135 : command("mapi", "explain", SERVERexplain, false, "Turn the error seen into a string.", args(1,2, arg("",str),arg("mid",int))),
2136 : pattern("mapi", "put", SERVERput, false, "Send a value to a remote site.", args(1,4, arg("",void),arg("mid",int),arg("nme",str),argany("val",1))),
2137 : pattern("mapi", "put", SERVERputLocal, false, "Prepare sending a value to a remote site.", args(1,3, arg("",str),arg("nme",str),argany("val",1))),
2138 : pattern("mapi", "rpc", SERVERmapi_rpc_single_row, false, "Send a simple query for execution and fetch result.", args(1,3, argany("",0),arg("key",int),vararg("qry",str))),
2139 : pattern("mapi", "rpc", SERVERmapi_rpc_bat, false, "", args(1,3, batargany("",2),arg("key",int),arg("qry",str))),
2140 : command("mapi", "rpc", SERVERquery, false, "Send a simple query for execution.", args(1,3, arg("",int),arg("key",int),arg("qry",str))),
2141 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,6, batargany("",2),arg("key",int),arg("rschema",str),arg("rtable",str),arg("rcolumn",str),arg("i",int))),
2142 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,5, batargany("",2),arg("key",int),arg("rschema",str),arg("rtable",str),arg("i",int))),
2143 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,3, batargany("",2),arg("key",int),arg("remoteName",str))),
2144 : #ifdef HAVE_HGE
2145 : command("mapi", "fetch_field", SERVERfetch_field_hge, false, "Retrieve a single hge field.", args(1,3, arg("",hge),arg("hdl",int),arg("fnr",int))),
2146 : #endif
2147 : { .imp=NULL }
2148 : };
2149 : #include "mal_import.h"
2150 : #ifdef _MSC_VER
2151 : #undef read
2152 : #pragma section(".CRT$XCU",read)
2153 : #endif
2154 329 : LIB_STARTUP_FUNC(init_mal_mapi_mal)
2155 329 : { mal_module2("mapi", NULL, mal_mapi_init_funcs, MAPIprelude, NULL); }
|