Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Clients gain access to the Monet server through a internet connection.
15 : * Access through the internet requires a client program at the source,
16 : * which addresses the default port of a running server. It is a textual
17 : * interface for expert use.
18 : *
19 : * At the server side, each client is represented by a session record
20 : * with the current status, such as name, file descriptors, namespace,
21 : * and local stack. Each client session has a dedicated thread of
22 : * control.
23 : *
24 : * The number of clients permitted concurrent access is a run time
25 : * option.
26 : *
27 : * Client sessions remain in existence until the corresponding
28 : * communication channels break.
29 : *
30 : * A client record is initialized upon acceptance of a connection. The
31 : * client runs in his own thread of control until it finds a
32 : * soft-termination request mode (FINISHCLIENT) or its IO file descriptors
33 : * are closed. The latter generates an IO error, which leads to a safe
34 : * termination.
35 : *
36 : * The system administrator client runs in the primary thread of control
37 : * to simplify debugging with external debuggers.
38 : *
39 : * Searching a free client record is encapsulated in a critical section
40 : * to hand them out one-at-a-time. Marking them as being claimed avoids
41 : * any interference from parallel actions to obtain client records.
42 : */
43 :
44 : /* (author) M.L. Kersten */
45 : #include "monetdb_config.h"
46 : #include "mal_client.h"
47 : #include "mal_import.h"
48 : #include "mal_parser.h"
49 : #include "mal_namespace.h"
50 : #include "mal_private.h"
51 : #include "mal_internal.h"
52 : #include "mal_interpreter.h"
53 : #include "mal_runtime.h"
54 : #include "mal_authorize.h"
55 : #include "mapi_prompt.h"
56 :
57 : int MAL_MAXCLIENTS = 0;
58 : ClientRec *mal_clients = NULL;
59 :
60 : void
61 334 : mal_client_reset(void)
62 : {
63 334 : if (mal_clients) {
64 334 : for (int i = 0; i < MAL_MAXCLIENTS; i++) {
65 : ATOMIC_DESTROY(&mal_clients[i].lastprint);
66 : ATOMIC_DESTROY(&mal_clients[i].workers);
67 : ATOMIC_DESTROY(&mal_clients[i].qryctx.datasize);
68 : }
69 334 : GDKfree(mal_clients);
70 334 : mal_clients = NULL;
71 : }
72 334 : MAL_MAXCLIENTS = 0;
73 334 : }
74 :
75 : bool
76 336 : MCinit(void)
77 : {
78 336 : const char *max_clients = GDKgetenv("max_clients");
79 336 : int maxclients = 0;
80 :
81 336 : if (max_clients != NULL)
82 1 : maxclients = atoi(max_clients);
83 1 : if (maxclients <= 0) {
84 335 : maxclients = 64;
85 335 : if (GDKsetenv("max_clients", "64") != GDK_SUCCEED) {
86 0 : TRC_CRITICAL(MAL_SERVER,
87 : "Initialization failed: " MAL_MALLOC_FAIL "\n");
88 0 : return false;
89 : }
90 : }
91 :
92 336 : MAL_MAXCLIENTS = /* client connections */ maxclients;
93 336 : mal_clients = GDKzalloc(sizeof(ClientRec) * MAL_MAXCLIENTS);
94 336 : if (mal_clients == NULL) {
95 0 : TRC_CRITICAL(MAL_SERVER,
96 : "Initialization failed: " MAL_MALLOC_FAIL "\n");
97 0 : return false;
98 : }
99 21780 : for (int i = 0; i < MAL_MAXCLIENTS; i++) {
100 21444 : ATOMIC_INIT(&mal_clients[i].lastprint, 0);
101 21444 : ATOMIC_INIT(&mal_clients[i].workers, 1);
102 21444 : ATOMIC_INIT(&mal_clients[i].qryctx.datasize, 0);
103 21444 : mal_clients[i].idx = -1; /* indicate it's available */
104 : }
105 : return true;
106 : }
107 :
108 : /* stack the files from which you read */
109 : int
110 0 : MCpushClientInput(Client c, bstream *new_input, int listing, const char *prompt)
111 : {
112 0 : ClientInput *x = (ClientInput *) GDKmalloc(sizeof(ClientInput));
113 0 : if (x == 0)
114 : return -1;
115 0 : x->fdin = c->fdin;
116 0 : x->yycur = c->yycur;
117 0 : x->listing = c->listing;
118 0 : x->prompt = c->prompt;
119 0 : x->next = c->bak;
120 0 : c->bak = x;
121 0 : c->fdin = new_input;
122 0 : c->listing = listing;
123 0 : c->prompt = prompt ? prompt : "";
124 0 : c->promptlength = strlen(c->prompt);
125 0 : c->yycur = 0;
126 0 : return 0;
127 : }
128 :
129 : void
130 0 : MCpopClientInput(Client c)
131 : {
132 0 : ClientInput *x = c->bak;
133 0 : if (c->fdin) {
134 : /* missing protection against closing stdin stream */
135 0 : bstream_destroy(c->fdin);
136 : }
137 0 : c->fdin = x->fdin;
138 0 : c->yycur = x->yycur;
139 0 : c->listing = x->listing;
140 0 : c->prompt = x->prompt;
141 0 : c->promptlength = strlen(c->prompt);
142 0 : c->bak = x->next;
143 0 : GDKfree(x);
144 0 : }
145 :
146 : static Client
147 38562 : MCnewClient(void)
148 : {
149 471938 : for (Client c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
150 471938 : if (c->idx == -1) {
151 38562 : assert(c->mode == FREECLIENT);
152 38562 : c->mode = RUNCLIENT;
153 38562 : c->idx = (int) (c - mal_clients);
154 38562 : return c;
155 : }
156 : }
157 :
158 : return NULL;
159 : }
160 :
161 : /*
162 : * You can always retrieve a client record using the thread identifier,
163 : * because we maintain a 1-1 mapping between client and thread of
164 : * control. Therefore, we don't need locks either.
165 : * If the number of clients becomes too large, we have to change the
166 : * allocation and lookup scheme.
167 : *
168 : * Finding a client record is tricky when we are spawning threads as
169 : * co-workers. It is currently passed as an argument.
170 : */
171 :
172 : Client
173 409396 : MCgetClient(int id)
174 : {
175 409396 : if (id <0 || id >=MAL_MAXCLIENTS)
176 : return NULL;
177 409396 : return mal_clients + id;
178 : }
179 :
180 : /*
181 : * The resetProfiler is called when the owner of the event stream
182 : * leaves the scene. (Unclear if parallelism may cause errors)
183 : */
184 :
185 : static void
186 38561 : MCresetProfiler(stream *fdout)
187 : {
188 38561 : if (fdout != maleventstream)
189 : return;
190 0 : MT_lock_set(&mal_profileLock);
191 0 : maleventstream = NULL;
192 0 : profilerStatus = 0;
193 0 : profilerMode = 0;
194 0 : MT_lock_unset(&mal_profileLock);
195 : }
196 :
197 : static void
198 38561 : MCexitClient(Client c)
199 : {
200 38561 : MCresetProfiler(c->fdout);
201 : // Remove any left over constant symbols
202 38561 : if (c->curprg)
203 335 : resetMalBlk(c->curprg->def);
204 38561 : if (c->father == NULL) { /* normal client */
205 38561 : if (c->fdout && c->fdout != GDKstdout)
206 38203 : close_stream(c->fdout);
207 38561 : assert(c->bak == NULL);
208 38561 : if (c->fdin) {
209 : /* protection against closing stdin stream */
210 38561 : if (c->fdin->s == GDKstdin)
211 358 : c->fdin->s = NULL;
212 38561 : bstream_destroy(c->fdin);
213 : }
214 38561 : c->fdout = NULL;
215 38561 : c->fdin = NULL;
216 : }
217 38561 : assert(c->query == NULL);
218 38561 : if (profilerStatus > 0) {
219 0 : lng Tend = GDKusec();
220 0 : profilerEvent(NULL,
221 : &(struct NonMalEvent)
222 0 : { CLIENT_END, c, Tend, NULL, NULL, 0,
223 0 : Tend - (c->session) });
224 : }
225 38561 : }
226 :
227 : static Client
228 38562 : MCinitClientRecord(Client c, oid user, bstream *fin, stream *fout)
229 : {
230 : /* mal_contextLock is held when this is called */
231 38562 : c->user = user;
232 38562 : c->username = 0;
233 38562 : c->scenario = NULL;
234 38562 : c->srcFile = NULL;
235 38562 : c->blkmode = 0;
236 :
237 38562 : c->fdin = fin ? fin : bstream_create(GDKstdin, 0);
238 38562 : if (c->fdin == NULL) {
239 0 : c->mode = FREECLIENT;
240 0 : c->idx = -1;
241 0 : TRC_ERROR(MAL_SERVER, "No stdin channel available\n");
242 0 : return NULL;
243 : }
244 38562 : c->yycur = 0;
245 38562 : c->bak = NULL;
246 :
247 38562 : c->listing = 0;
248 38562 : c->fdout = fout ? fout : GDKstdout;
249 38562 : c->curprg = c->backup = 0;
250 38562 : c->glb = 0;
251 :
252 : /* remove garbage from previous connection
253 : * be aware, a user can introduce several modules
254 : * that should be freed to avoid memory leaks */
255 38562 : c->usermodule = c->curmodule = 0;
256 :
257 38562 : c->father = NULL;
258 38562 : c->idle = c->login = c->lastcmd = time(0);
259 38562 : c->session = GDKusec();
260 38562 : strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
261 38562 : c->workerlimit = 0;
262 38562 : c->memorylimit = 0;
263 38562 : c->qryctx.querytimeout = 0;
264 38562 : c->sessiontimeout = 0;
265 38562 : c->logical_sessiontimeout = 0;
266 38562 : c->qryctx.starttime = 0;
267 38562 : ATOMIC_SET(&c->qryctx.datasize, 0);
268 38562 : c->qryctx.maxmem = 0;
269 38562 : c->maxmem = 0;
270 38562 : c->errbuf = 0;
271 :
272 38562 : c->prompt = PROMPT1;
273 38562 : c->promptlength = strlen(c->prompt);
274 :
275 38562 : c->profticks = c->profstmt = c->profevents = NULL;
276 38562 : c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
277 38562 : c->sqlprofiler = 0;
278 38562 : c->blocksize = BLOCK;
279 38562 : c->protocol = PROTOCOL_9;
280 :
281 38562 : c->filetrans = false;
282 38562 : c->handshake_options = NULL;
283 38562 : c->query = NULL;
284 :
285 38562 : char name[MT_NAME_LEN];
286 38562 : snprintf(name, sizeof(name), "Client%d->s", (int) (c - mal_clients));
287 38562 : MT_sema_init(&c->s, 0, name);
288 38562 : return c;
289 : }
290 :
291 : Client
292 38562 : MCinitClient(oid user, bstream *fin, stream *fout)
293 : {
294 38562 : Client c = NULL;
295 :
296 38562 : MT_lock_set(&mal_contextLock);
297 38562 : c = MCnewClient();
298 38562 : if (c) {
299 38562 : c = MCinitClientRecord(c, user, fin, fout);
300 38562 : MT_thread_set_qry_ctx(&c->qryctx);
301 : }
302 38562 : MT_lock_unset(&mal_contextLock);
303 :
304 38562 : if (c && profilerStatus > 0)
305 0 : profilerEvent(NULL,
306 : &(struct NonMalEvent)
307 0 : { CLIENT_START, c, c->session, NULL, NULL, 0, 0 }
308 : );
309 38562 : return c;
310 : }
311 :
312 :
313 : /*
314 : * The administrator should be initialized to enable interpretation of
315 : * the command line arguments, before it starts servicing statements
316 : */
317 : int
318 38528 : MCinitClientThread(Client c)
319 : {
320 : /*
321 : * The GDK thread administration should be set to reflect use of
322 : * the proper IO descriptors.
323 : */
324 38528 : c->mythread = MT_thread_getname();
325 38529 : c->errbuf = GDKerrbuf;
326 38529 : if (c->errbuf == NULL) {
327 38529 : char *n = GDKzalloc(GDKMAXERRLEN);
328 38528 : if (n == NULL) {
329 0 : MCresetProfiler(c->fdout);
330 0 : return -1;
331 : }
332 38528 : GDKsetbuf(n);
333 38529 : c->errbuf = GDKerrbuf;
334 : } else
335 0 : c->errbuf[0] = 0;
336 : return 0;
337 : }
338 :
339 : static bool shutdowninprogress = false;
340 :
341 : bool
342 0 : MCshutdowninprogress(void)
343 : {
344 0 : MT_lock_set(&mal_contextLock);
345 0 : bool ret = shutdowninprogress;
346 0 : MT_lock_unset(&mal_contextLock);
347 0 : return ret;
348 : }
349 :
350 : /*
351 : * When a client needs to be terminated then the file descriptors for
352 : * its input/output are simply closed. This leads to a graceful
353 : * degradation, but may take some time when the client is busy. A more
354 : * forceful method is to kill the client thread, but this may leave
355 : * locks and semaphores in an undesirable state.
356 : *
357 : * The routine freeClient ends a single client session, but through side
358 : * effects of sharing IO descriptors, also its children. Conversely, a
359 : * child can not close a parent.
360 : */
361 : void
362 38561 : MCcloseClient(Client c)
363 : {
364 38561 : MT_lock_set(&mal_contextLock);
365 38561 : if (c->mode == FREECLIENT) {
366 0 : assert(c->idx == -1);
367 0 : MT_lock_unset(&mal_contextLock);
368 0 : return;
369 : }
370 38561 : c->mode = FINISHCLIENT;
371 38561 : MT_lock_unset(&mal_contextLock);
372 :
373 38561 : MCexitClient(c);
374 :
375 : /* scope list and curprg can not be removed, because the client may
376 : * reside in a quit() command. Therefore the scopelist is re-used.
377 : */
378 38561 : c->scenario = NULL;
379 38561 : c->prompt = NULL;
380 38561 : c->promptlength = -1;
381 38561 : if (c->errbuf) {
382 : /* no client threads in embedded mode */
383 38528 : GDKsetbuf(NULL);
384 38528 : if (c->father == NULL)
385 38528 : GDKfree(c->errbuf);
386 38528 : c->errbuf = NULL;
387 : }
388 38561 : if (c->usermodule)
389 338 : freeModule(c->usermodule);
390 38561 : c->usermodule = c->curmodule = 0;
391 38561 : c->father = 0;
392 38561 : strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
393 38561 : c->workerlimit = 0;
394 38561 : c->memorylimit = 0;
395 38561 : c->qryctx.querytimeout = 0;
396 38561 : c->sessiontimeout = 0;
397 38561 : c->logical_sessiontimeout = 0;
398 38561 : c->user = oid_nil;
399 38561 : if (c->username) {
400 38200 : GDKfree(c->username);
401 38200 : c->username = 0;
402 : }
403 38561 : c->mythread = NULL;
404 38561 : if (c->glb) {
405 38557 : freeStack(c->glb);
406 38557 : c->glb = NULL;
407 : }
408 38561 : if (c->profticks) {
409 22 : BBPunfix(c->profticks->batCacheid);
410 22 : BBPunfix(c->profstmt->batCacheid);
411 22 : BBPunfix(c->profevents->batCacheid);
412 22 : c->profticks = c->profstmt = c->profevents = NULL;
413 : }
414 38561 : if (c->error_row) {
415 472 : BBPunfix(c->error_row->batCacheid);
416 472 : BBPunfix(c->error_fld->batCacheid);
417 472 : BBPunfix(c->error_msg->batCacheid);
418 472 : BBPunfix(c->error_input->batCacheid);
419 472 : c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
420 : }
421 38561 : c->sqlprofiler = 0;
422 38561 : free(c->handshake_options);
423 38561 : c->handshake_options = NULL;
424 38561 : MT_thread_set_qry_ctx(NULL);
425 38561 : assert(c->qryctx.datasize == 0);
426 38561 : MT_sema_destroy(&c->s);
427 38561 : MT_lock_set(&mal_contextLock);
428 38561 : c->idle = c->login = c->lastcmd = 0;
429 38561 : if (shutdowninprogress) {
430 32 : c->mode = BLOCKCLIENT;
431 : } else {
432 38529 : c->mode = FREECLIENT;
433 38529 : c->idx = -1;
434 : }
435 38561 : MT_lock_unset(&mal_contextLock);
436 : }
437 :
438 : /*
439 : * If a client disappears from the scene (eof on stream), we should
440 : * terminate all its children. This is in principle a forceful action,
441 : * because the children may be ignoring the primary IO streams.
442 : * (Instead they may be blocked in an infinite loop)
443 : *
444 : * Special care should be taken by closing the 'adm' thread. It is
445 : * permitted to leave only when it is the sole user of the system.
446 : *
447 : * Furthermore, once we enter closeClient, the process in which it is
448 : * raised has already lost its file descriptors.
449 : *
450 : * When the server is about to shutdown, we should softly terminate
451 : * all outstanding session.
452 : */
453 : void
454 336 : MCstopClients(Client cntxt)
455 : {
456 336 : MT_lock_set(&mal_contextLock);
457 21780 : for (int i = 0; i < MAL_MAXCLIENTS; i++) {
458 21444 : Client c = mal_clients + i;
459 21444 : if (cntxt != c) {
460 21442 : if (c->mode == RUNCLIENT)
461 0 : c->mode = FINISHCLIENT;
462 21442 : else if (c->mode == FREECLIENT) {
463 21287 : assert(c->idx == -1);
464 21287 : c->idx = i;
465 21287 : c->mode = BLOCKCLIENT;
466 : }
467 : }
468 : }
469 336 : shutdowninprogress = true;
470 336 : MT_lock_unset(&mal_contextLock);
471 336 : }
472 :
473 : int
474 45820 : MCactiveClients(void)
475 : {
476 45820 : int active = 0;
477 :
478 45820 : MT_lock_set(&mal_contextLock);
479 2976800 : for (Client cntxt = mal_clients; cntxt < mal_clients + MAL_MAXCLIENTS;
480 2930980 : cntxt++) {
481 5681514 : active += (cntxt->idle == 0 && cntxt->mode == RUNCLIENT);
482 : }
483 45820 : MT_lock_unset(&mal_contextLock);
484 45820 : return active;
485 : }
486 :
487 : str
488 0 : MCsuspendClient(int id)
489 : {
490 0 : if (id <0 || id >=MAL_MAXCLIENTS)
491 0 : throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
492 : return MAL_SUCCEED;
493 : }
494 :
495 : str
496 0 : MCawakeClient(int id)
497 : {
498 0 : if (id <0 || id >=MAL_MAXCLIENTS)
499 0 : throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
500 : return MAL_SUCCEED;
501 : }
502 :
503 : /*
504 : * Input to be processed is collected in a Client specific buffer. It
505 : * is filled by reading information from a stream, a terminal, or by
506 : * scheduling strings constructed internally. The latter involves
507 : * removing any escape character needed to manipulate the string within
508 : * the kernel. The buffer space is automatically expanded to
509 : * accommodate new information and the read pointers are adjusted.
510 : *
511 : * The input is read from a (blocked) stream and stored in the client
512 : * record input buffer. The storage area grows automatically upon need.
513 : * The origin of the input stream depends on the connectivity mode.
514 : *
515 : * Each operation received from a front-end consists of at least one
516 : * line. To simplify misaligned communication with front-ends, we use
517 : * different prompts structures.
518 : *
519 : * The default action is to read information from an ascii-stream one
520 : * line at a time. This is the preferred mode for reading from terminal.
521 : *
522 : * The next statement block is to be read. Send a prompt to warn the
523 : * front-end to issue the request.
524 : */
525 : int
526 10959 : MCreadClient(Client c)
527 : {
528 10959 : bstream *in = c->fdin;
529 :
530 12356 : while (in->pos < in->len &&
531 1397 : (isspace((unsigned char) (in->buf[in->pos])) ||
532 0 : in->buf[in->pos] == ';' || !in->buf[in->pos]))
533 1397 : in->pos++;
534 :
535 10959 : if (in->pos >= in->len || in->mode) {
536 10959 : ssize_t rd;
537 :
538 10959 : if (in->eof || !isa_block_stream(c->fdout)) {
539 10959 : if (!isa_block_stream(c->fdout) && c->promptlength > 0)
540 0 : mnstr_write(c->fdout, c->prompt, c->promptlength, 1);
541 10959 : mnstr_flush(c->fdout, MNSTR_FLUSH_DATA);
542 10959 : in->eof = false;
543 : }
544 21420 : while ((rd = bstream_next(in)) > 0 && !in->eof) {
545 10461 : if (!in->mode) /* read one line at a time in line mode */
546 : break;
547 : }
548 10959 : if (in->mode) { /* find last new line */
549 10959 : char *p = in->buf + in->len - 1;
550 :
551 10959 : while (p > in->buf && *p != '\n') {
552 0 : *(p + 1) = *p;
553 0 : p--;
554 : }
555 10959 : if (p > in->buf)
556 10461 : *(p + 1) = 0;
557 10959 : if (p != in->buf + in->len - 1)
558 0 : in->len++;
559 : }
560 : }
561 10959 : if (in->pos >= in->len) {
562 : /* end of stream reached */
563 498 : if (c->bak) {
564 0 : MCpopClientInput(c);
565 0 : if (c->fdin == NULL)
566 : return 0;
567 : return MCreadClient(c);
568 : }
569 : return 0;
570 : }
571 : return 1;
572 : }
573 :
574 : int
575 51 : MCvalid(Client tc)
576 : {
577 51 : if (tc == NULL) {
578 : return 0;
579 : }
580 51 : MT_lock_set(&mal_contextLock);
581 111 : for (Client c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
582 111 : if (c == tc && c->mode == RUNCLIENT) {
583 51 : MT_lock_unset(&mal_contextLock);
584 51 : return 1;
585 : }
586 : }
587 0 : MT_lock_unset(&mal_contextLock);
588 0 : return 0;
589 : }
590 :
591 : void
592 114 : MCprintinfo(void)
593 : {
594 114 : int nrun = 0, nfinish = 0, nblock = 0;
595 :
596 114 : MT_lock_set(&mal_contextLock);
597 7350 : for (Client c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
598 7236 : switch (c->mode) {
599 0 : case RUNCLIENT:
600 : /* running */
601 0 : nrun++;
602 0 : if (c->idle)
603 0 : printf("client %d, user %s, using %"PRIu64" bytes of transient space, idle since %s", c->idx, c->username, (uint64_t) ATOMIC_GET(&c->qryctx.datasize), ctime(&c->idle));
604 : else
605 0 : printf("client %d, user %s, using %"PRIu64" bytes of transient space\n", c->idx, c->username, (uint64_t) ATOMIC_GET(&c->qryctx.datasize));
606 : break;
607 0 : case FINISHCLIENT:
608 : /* finishing */
609 0 : nfinish++;
610 0 : break;
611 0 : case BLOCKCLIENT:
612 : /* blocked */
613 0 : nblock++;
614 0 : break;
615 : case FREECLIENT:
616 : break;
617 : }
618 : }
619 114 : MT_lock_unset(&mal_contextLock);
620 114 : printf("%d active clients, %d finishing clients, %d blocked clients\n",
621 : nrun, nfinish, nblock);
622 114 : }
|