Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Clients gain access to the Monet server through an internet connection.
15 : * Access through the internet requires a client program at the source,
16 : * which addresses the default port of a running server. It is a textual
17 : * interface for expert use.
18 : *
19 : * At the server side, each client is represented by a session record
20 : * with the current status, such as name, file descriptors, namespace,
21 : * and local stack. Each client session has a dedicated thread of
22 : * control.
23 : *
24 : * The number of clients permitted concurrent access is a run time
25 : * option.
26 : *
27 : * Client sessions remain in existence until the corresponding
28 : * communication channels break.
29 : *
30 : * A client record is initialized upon acceptance of a connection. The
31 : * client runs in its own thread of control until it finds a
32 : * soft-termination request mode (FINISHCLIENT) or its IO file descriptors
33 : * are closed. The latter generates an IO error, which leads to a safe
34 : * termination.
35 : *
36 : * The system administrator client runs in the primary thread of control
37 : * to simplify debugging with external debuggers.
38 : *
39 : * Searching a free client record is encapsulated in a critical section
40 : * to hand them out one-at-a-time. Marking them as being claimed avoids
41 : * any interference from parallel actions to obtain client records.
42 : */
43 :
44 : /* (author) M.L. Kersten */
#include "monetdb_config.h"
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>
#include "mal_client.h"
#include "mal_import.h"
#include "mal_parser.h"
#include "mal_namespace.h"
#include "mal_private.h"
#include "mal_internal.h"
#include "mal_interpreter.h"
#include "mal_runtime.h"
#include "mal_authorize.h"
#include "mapi_prompt.h"
56 :
57 : int MAL_MAXCLIENTS = 0;
58 : ClientRec *mal_clients = NULL;
59 :
60 : void
61 339 : mal_client_reset(void)
62 : {
63 339 : if (mal_clients) {
64 339 : GDKfree(mal_clients);
65 339 : mal_clients = NULL;
66 : }
67 339 : MAL_MAXCLIENTS = 0;
68 339 : }
69 :
70 : bool
71 341 : MCinit(void)
72 : {
73 341 : const char *max_clients = GDKgetenv("max_clients");
74 341 : int maxclients = 0;
75 :
76 341 : if (max_clients != NULL)
77 1 : maxclients = atoi(max_clients);
78 1 : if (maxclients <= 0) {
79 340 : maxclients = 64;
80 340 : if (GDKsetenv("max_clients", "64") != GDK_SUCCEED) {
81 0 : TRC_CRITICAL(MAL_SERVER,
82 : "Initialization failed: " MAL_MALLOC_FAIL "\n");
83 0 : return false;
84 : }
85 : }
86 :
87 341 : MAL_MAXCLIENTS = /* client connections */ maxclients;
88 341 : mal_clients = GDKzalloc(sizeof(ClientRec) * MAL_MAXCLIENTS);
89 341 : if (mal_clients == NULL) {
90 0 : TRC_CRITICAL(MAL_SERVER,
91 : "Initialization failed: " MAL_MALLOC_FAIL "\n");
92 0 : return false;
93 : }
94 22105 : for (int i = 0; i < MAL_MAXCLIENTS; i++) {
95 21764 : ATOMIC_INIT(&mal_clients[i].lastprint, 0);
96 21764 : ATOMIC_INIT(&mal_clients[i].workers, 1);
97 21764 : ATOMIC_INIT(&mal_clients[i].qryctx.datasize, 0);
98 21764 : mal_clients[i].idx = -1; /* indicate it's available */
99 : }
100 : return true;
101 : }
102 :
103 : /* stack the files from which you read */
104 : int
105 0 : MCpushClientInput(Client c, bstream *new_input, int listing, const char *prompt)
106 : {
107 0 : ClientInput *x = (ClientInput *) GDKmalloc(sizeof(ClientInput));
108 0 : if (x == 0)
109 : return -1;
110 0 : x->fdin = c->fdin;
111 0 : x->yycur = c->yycur;
112 0 : x->listing = c->listing;
113 0 : x->prompt = c->prompt;
114 0 : x->next = c->bak;
115 0 : c->bak = x;
116 0 : c->fdin = new_input;
117 0 : c->qryctx.bs = new_input;
118 0 : c->listing = listing;
119 0 : c->prompt = prompt ? prompt : "";
120 0 : c->promptlength = strlen(c->prompt);
121 0 : c->yycur = 0;
122 0 : return 0;
123 : }
124 :
125 : void
126 0 : MCpopClientInput(Client c)
127 : {
128 0 : ClientInput *x = c->bak;
129 0 : if (c->fdin) {
130 : /* missing protection against closing stdin stream */
131 0 : bstream_destroy(c->fdin);
132 : }
133 0 : c->fdin = x->fdin;
134 0 : c->qryctx.bs = c->fdin;
135 0 : c->yycur = x->yycur;
136 0 : c->listing = x->listing;
137 0 : c->prompt = x->prompt;
138 0 : c->promptlength = strlen(c->prompt);
139 0 : c->bak = x->next;
140 0 : GDKfree(x);
141 0 : }
142 :
143 : static Client
144 38339 : MCnewClient(void)
145 : {
146 508336 : for (Client c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
147 508336 : if (c->idx == -1) {
148 38339 : assert(c->mode == FREECLIENT);
149 38339 : c->mode = RUNCLIENT;
150 38339 : c->idx = (int) (c - mal_clients);
151 38339 : return c;
152 : }
153 : }
154 :
155 : return NULL;
156 : }
157 :
158 : /*
159 : * You can always retrieve a client record using the thread identifier,
160 : * because we maintain a 1-1 mapping between client and thread of
161 : * control. Therefore, we don't need locks either.
162 : * If the number of clients becomes too large, we have to change the
163 : * allocation and lookup scheme.
164 : *
165 : * Finding a client record is tricky when we are spawning threads as
166 : * co-workers. It is currently passed as an argument.
167 : */
168 :
169 : Client
170 376791 : MCgetClient(int id)
171 : {
172 376791 : if (id <0 || id >=MAL_MAXCLIENTS)
173 : return NULL;
174 376791 : return mal_clients + id;
175 : }
176 :
177 : /*
178 : * The resetProfiler is called when the owner of the event stream
179 : * leaves the scene. (Unclear if parallelism may cause errors)
180 : */
181 :
182 : static void
183 38338 : MCresetProfiler(stream *fdout)
184 : {
185 38338 : if (fdout != maleventstream)
186 : return;
187 0 : MT_lock_set(&mal_profileLock);
188 0 : maleventstream = NULL;
189 0 : profilerStatus = 0;
190 0 : profilerMode = 0;
191 0 : MT_lock_unset(&mal_profileLock);
192 : }
193 :
194 : static void
195 38338 : MCexitClient(Client c)
196 : {
197 38338 : MCresetProfiler(c->fdout);
198 : // Remove any left over constant symbols
199 38338 : if (c->curprg)
200 340 : resetMalBlk(c->curprg->def);
201 38338 : if (c->father == NULL) { /* normal client */
202 38338 : if (c->fdout && c->fdout != GDKstdout)
203 37975 : close_stream(c->fdout);
204 38338 : assert(c->bak == NULL);
205 38338 : if (c->fdin) {
206 : /* protection against closing stdin stream */
207 38338 : if (c->fdin->s == GDKstdin)
208 363 : c->fdin->s = NULL;
209 38338 : bstream_destroy(c->fdin);
210 : }
211 38338 : c->fdout = NULL;
212 38338 : c->fdin = NULL;
213 38338 : c->qryctx.bs = NULL;
214 : }
215 38338 : assert(c->query == NULL);
216 38338 : if (profilerStatus > 0) {
217 0 : lng Tend = GDKusec();
218 0 : profilerEvent(NULL,
219 : &(struct NonMalEvent)
220 0 : { CLIENT_END, c, Tend, NULL, NULL, 0,
221 0 : Tend - (c->session) });
222 : }
223 38338 : }
224 :
225 : static Client
226 38339 : MCinitClientRecord(Client c, oid user, bstream *fin, stream *fout)
227 : {
228 : /* mal_contextLock is held when this is called */
229 38339 : c->user = user;
230 38339 : c->username = 0;
231 38339 : c->scenario = NULL;
232 38339 : c->srcFile = NULL;
233 38339 : c->blkmode = 0;
234 :
235 38339 : c->fdin = fin ? fin : bstream_create(GDKstdin, 0);
236 38339 : if (c->fdin == NULL) {
237 0 : c->mode = FREECLIENT;
238 0 : c->idx = -1;
239 0 : TRC_ERROR(MAL_SERVER, "No stdin channel available\n");
240 0 : return NULL;
241 : }
242 38339 : c->qryctx.bs = c->fdin;
243 38339 : c->yycur = 0;
244 38339 : c->bak = NULL;
245 :
246 38339 : c->listing = 0;
247 38339 : c->fdout = fout ? fout : GDKstdout;
248 38339 : c->curprg = c->backup = 0;
249 38339 : c->glb = 0;
250 :
251 : /* remove garbage from previous connection
252 : * be aware, a user can introduce several modules
253 : * that should be freed to avoid memory leaks */
254 38339 : c->usermodule = c->curmodule = 0;
255 :
256 38339 : c->father = NULL;
257 38339 : c->idle = c->login = c->lastcmd = time(0);
258 38339 : c->session = GDKusec();
259 38339 : strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
260 38339 : c->workerlimit = 0;
261 38339 : c->memorylimit = 0;
262 38339 : c->querytimeout = 0;
263 38339 : c->sessiontimeout = 0;
264 38339 : c->logical_sessiontimeout = 0;
265 38339 : c->qryctx.starttime = 0;
266 38339 : c->qryctx.endtime = 0;
267 38339 : ATOMIC_SET(&c->qryctx.datasize, 0);
268 38339 : c->qryctx.maxmem = 0;
269 38339 : c->maxmem = 0;
270 38339 : c->errbuf = 0;
271 :
272 38339 : c->prompt = PROMPT1;
273 38339 : c->promptlength = strlen(c->prompt);
274 :
275 38339 : c->profticks = c->profstmt = c->profevents = NULL;
276 38339 : c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
277 38339 : c->sqlprofiler = 0;
278 38339 : c->blocksize = BLOCK;
279 38339 : c->protocol = PROTOCOL_9;
280 :
281 38339 : c->filetrans = false;
282 38339 : c->handshake_options = NULL;
283 38339 : c->query = NULL;
284 :
285 38339 : char name[MT_NAME_LEN];
286 38339 : snprintf(name, sizeof(name), "Client%d->s", (int) (c - mal_clients));
287 38339 : MT_sema_init(&c->s, 0, name);
288 38339 : return c;
289 : }
290 :
291 : Client
292 38339 : MCinitClient(oid user, bstream *fin, stream *fout)
293 : {
294 38339 : Client c = NULL;
295 :
296 38339 : MT_lock_set(&mal_contextLock);
297 38339 : c = MCnewClient();
298 38339 : if (c) {
299 38339 : c = MCinitClientRecord(c, user, fin, fout);
300 38339 : MT_thread_set_qry_ctx(&c->qryctx);
301 : }
302 38339 : MT_lock_unset(&mal_contextLock);
303 :
304 38339 : if (c && profilerStatus > 0)
305 0 : profilerEvent(NULL,
306 : &(struct NonMalEvent)
307 0 : { CLIENT_START, c, c->session, NULL, NULL, 0, 0 }
308 : );
309 38339 : return c;
310 : }
311 :
312 :
313 : /*
314 : * The administrator should be initialized to enable interpretation of
315 : * the command line arguments, before it starts servicing statements
316 : */
317 : int
318 38303 : MCinitClientThread(Client c)
319 : {
320 : /*
321 : * The GDK thread administration should be set to reflect use of
322 : * the proper IO descriptors.
323 : */
324 38303 : c->mythread = MT_thread_getname();
325 38304 : c->errbuf = GDKerrbuf;
326 38304 : if (c->errbuf == NULL) {
327 38304 : char *n = GDKzalloc(GDKMAXERRLEN);
328 38305 : if (n == NULL) {
329 0 : MCresetProfiler(c->fdout);
330 0 : return -1;
331 : }
332 38305 : GDKsetbuf(n);
333 38304 : c->errbuf = GDKerrbuf;
334 : } else
335 0 : c->errbuf[0] = 0;
336 : return 0;
337 : }
338 :
339 : static bool shutdowninprogress = false;
340 :
341 : bool
342 0 : MCshutdowninprogress(void)
343 : {
344 0 : MT_lock_set(&mal_contextLock);
345 0 : bool ret = shutdowninprogress;
346 0 : MT_lock_unset(&mal_contextLock);
347 0 : return ret;
348 : }
349 :
350 : /*
351 : * When a client needs to be terminated then the file descriptors for
352 : * its input/output are simply closed. This leads to a graceful
353 : * degradation, but may take some time when the client is busy. A more
354 : * forceful method is to kill the client thread, but this may leave
355 : * locks and semaphores in an undesirable state.
356 : *
357 : * The routine freeClient ends a single client session, but through side
358 : * effects of sharing IO descriptors, also its children. Conversely, a
359 : * child can not close a parent.
360 : */
361 : void
362 38336 : MCcloseClient(Client c)
363 : {
364 38336 : MT_lock_set(&mal_contextLock);
365 38338 : if (c->mode == FREECLIENT) {
366 0 : assert(c->idx == -1);
367 0 : MT_lock_unset(&mal_contextLock);
368 0 : return;
369 : }
370 38338 : c->mode = FINISHCLIENT;
371 38338 : MT_lock_unset(&mal_contextLock);
372 :
373 38338 : MCexitClient(c);
374 :
375 : /* scope list and curprg can not be removed, because the client may
376 : * reside in a quit() command. Therefore the scopelist is re-used.
377 : */
378 38339 : c->scenario = NULL;
379 38339 : c->prompt = NULL;
380 38339 : c->promptlength = -1;
381 38339 : if (c->errbuf) {
382 : /* no client threads in embedded mode */
383 38305 : GDKsetbuf(NULL);
384 38303 : if (c->father == NULL)
385 38304 : GDKfree(c->errbuf);
386 38304 : c->errbuf = NULL;
387 : }
388 38338 : if (c->usermodule)
389 343 : freeModule(c->usermodule);
390 38338 : c->usermodule = c->curmodule = 0;
391 38338 : c->father = 0;
392 38338 : strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
393 38336 : c->workerlimit = 0;
394 38336 : c->memorylimit = 0;
395 38336 : c->querytimeout = 0;
396 38336 : c->qryctx.endtime = 0;
397 38336 : c->sessiontimeout = 0;
398 38336 : c->logical_sessiontimeout = 0;
399 38336 : c->user = oid_nil;
400 38336 : if (c->username) {
401 37970 : GDKfree(c->username);
402 37972 : c->username = 0;
403 : }
404 38338 : c->mythread = NULL;
405 38338 : if (c->glb) {
406 38334 : freeStack(c->glb);
407 38334 : c->glb = NULL;
408 : }
409 38338 : if (c->profticks) {
410 21 : BBPunfix(c->profticks->batCacheid);
411 21 : BBPunfix(c->profstmt->batCacheid);
412 21 : BBPunfix(c->profevents->batCacheid);
413 21 : c->profticks = c->profstmt = c->profevents = NULL;
414 : }
415 38338 : if (c->error_row) {
416 451 : BBPunfix(c->error_row->batCacheid);
417 451 : BBPunfix(c->error_fld->batCacheid);
418 451 : BBPunfix(c->error_msg->batCacheid);
419 451 : BBPunfix(c->error_input->batCacheid);
420 451 : c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
421 : }
422 38338 : c->sqlprofiler = 0;
423 38338 : free(c->handshake_options);
424 38338 : c->handshake_options = NULL;
425 38338 : MT_thread_set_qry_ctx(NULL);
426 38338 : assert(c->qryctx.datasize == 0);
427 38338 : MT_sema_destroy(&c->s);
428 38337 : MT_lock_set(&mal_contextLock);
429 38338 : c->idle = c->login = c->lastcmd = 0;
430 38338 : if (shutdowninprogress) {
431 32 : c->mode = BLOCKCLIENT;
432 : } else {
433 38306 : c->mode = FREECLIENT;
434 38306 : c->idx = -1;
435 : }
436 38338 : MT_lock_unset(&mal_contextLock);
437 : }
438 :
439 : /*
440 : * If a client disappears from the scene (eof on stream), we should
441 : * terminate all its children. This is in principle a forceful action,
442 : * because the children may be ignoring the primary IO streams.
443 : * (Instead they may be blocked in an infinite loop)
444 : *
445 : * Special care should be taken by closing the 'adm' thread. It is
446 : * permitted to leave only when it is the sole user of the system.
447 : *
448 : * Furthermore, once we enter closeClient, the process in which it is
449 : * raised has already lost its file descriptors.
450 : *
451 : * When the server is about to shutdown, we should softly terminate
452 : * all outstanding sessions.
453 : */
454 : void
455 341 : MCstopClients(Client cntxt)
456 : {
457 341 : MT_lock_set(&mal_contextLock);
458 22105 : for (int i = 0; i < MAL_MAXCLIENTS; i++) {
459 21764 : Client c = mal_clients + i;
460 21764 : if (cntxt != c) {
461 21762 : if (c->mode == RUNCLIENT)
462 0 : c->mode = FINISHCLIENT;
463 21762 : else if (c->mode == FREECLIENT) {
464 21607 : assert(c->idx == -1);
465 21607 : c->idx = i;
466 21607 : c->mode = BLOCKCLIENT;
467 : }
468 : }
469 : }
470 341 : shutdowninprogress = true;
471 341 : MT_lock_unset(&mal_contextLock);
472 341 : }
473 :
474 : int
475 45485 : MCactiveClients(void)
476 : {
477 45485 : int active = 0;
478 :
479 45485 : MT_lock_set(&mal_contextLock);
480 2955025 : for (Client cntxt = mal_clients; cntxt < mal_clients + MAL_MAXCLIENTS;
481 2909540 : cntxt++) {
482 5663964 : active += (cntxt->idle == 0 && cntxt->mode == RUNCLIENT);
483 : }
484 45485 : MT_lock_unset(&mal_contextLock);
485 45485 : return active;
486 : }
487 :
488 : str
489 0 : MCsuspendClient(int id)
490 : {
491 0 : if (id <0 || id >=MAL_MAXCLIENTS)
492 0 : throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
493 : return MAL_SUCCEED;
494 : }
495 :
496 : str
497 0 : MCawakeClient(int id)
498 : {
499 0 : if (id <0 || id >=MAL_MAXCLIENTS)
500 0 : throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
501 : return MAL_SUCCEED;
502 : }
503 :
504 : /*
505 : * Input to be processed is collected in a Client specific buffer. It
506 : * is filled by reading information from a stream, a terminal, or by
507 : * scheduling strings constructed internally. The latter involves
508 : * removing any escape character needed to manipulate the string within
509 : * the kernel. The buffer space is automatically expanded to
510 : * accommodate new information and the read pointers are adjusted.
511 : *
512 : * The input is read from a (blocked) stream and stored in the client
513 : * record input buffer. The storage area grows automatically upon need.
514 : * The origin of the input stream depends on the connectivity mode.
515 : *
516 : * Each operation received from a front-end consists of at least one
517 : * line. To simplify misaligned communication with front-ends, we use
518 : * different prompts structures.
519 : *
520 : * The default action is to read information from an ascii-stream one
521 : * line at a time. This is the preferred mode for reading from terminal.
522 : *
523 : * The next statement block is to be read. Send a prompt to warn the
524 : * front-end to issue the request.
525 : */
526 : int
527 10908 : MCreadClient(Client c)
528 : {
529 10908 : bstream *in = c->fdin;
530 :
531 12304 : while (in->pos < in->len &&
532 1396 : (isspace((unsigned char) (in->buf[in->pos])) ||
533 0 : in->buf[in->pos] == ';' || !in->buf[in->pos]))
534 1396 : in->pos++;
535 :
536 10908 : if (in->pos >= in->len || in->mode) {
537 10908 : ssize_t rd;
538 :
539 10908 : if (in->eof || !isa_block_stream(c->fdout)) {
540 10908 : if (!isa_block_stream(c->fdout) && c->promptlength > 0)
541 0 : mnstr_write(c->fdout, c->prompt, c->promptlength, 1);
542 10908 : mnstr_flush(c->fdout, MNSTR_FLUSH_DATA);
543 10902 : in->eof = false;
544 : }
545 21313 : while ((rd = bstream_next(in)) > 0 && !in->eof) {
546 10411 : if (!in->mode) /* read one line at a time in line mode */
547 : break;
548 : }
549 10905 : if (in->mode) { /* find last new line */
550 10905 : char *p = in->buf + in->len - 1;
551 :
552 10905 : while (p > in->buf && *p != '\n') {
553 0 : *(p + 1) = *p;
554 0 : p--;
555 : }
556 10905 : if (p > in->buf)
557 10413 : *(p + 1) = 0;
558 10905 : if (p != in->buf + in->len - 1)
559 0 : in->len++;
560 : }
561 : }
562 10905 : if (in->pos >= in->len) {
563 : /* end of stream reached */
564 493 : if (c->bak) {
565 0 : MCpopClientInput(c);
566 0 : if (c->fdin == NULL)
567 : return 0;
568 : return MCreadClient(c);
569 : }
570 : return 0;
571 : }
572 : return 1;
573 : }
574 :
575 : int
576 51 : MCvalid(Client tc)
577 : {
578 51 : if (tc == NULL) {
579 : return 0;
580 : }
581 51 : MT_lock_set(&mal_contextLock);
582 111 : for (Client c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
583 111 : if (c == tc && c->mode == RUNCLIENT) {
584 51 : MT_lock_unset(&mal_contextLock);
585 51 : return 1;
586 : }
587 : }
588 0 : MT_lock_unset(&mal_contextLock);
589 0 : return 0;
590 : }
591 :
592 : void
593 114 : MCprintinfo(void)
594 : {
595 114 : int nrun = 0, nfinish = 0, nblock = 0;
596 :
597 114 : MT_lock_set(&mal_contextLock);
598 7350 : for (Client c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
599 7236 : switch (c->mode) {
600 0 : case RUNCLIENT:
601 : /* running */
602 0 : nrun++;
603 0 : if (c->idle)
604 0 : printf("client %d, user %s, using %"PRIu64" bytes of transient space, idle since %s", c->idx, c->username, (uint64_t) ATOMIC_GET(&c->qryctx.datasize), ctime(&c->idle));
605 : else
606 0 : printf("client %d, user %s, using %"PRIu64" bytes of transient space\n", c->idx, c->username, (uint64_t) ATOMIC_GET(&c->qryctx.datasize));
607 : break;
608 0 : case FINISHCLIENT:
609 : /* finishing */
610 0 : nfinish++;
611 0 : break;
612 0 : case BLOCKCLIENT:
613 : /* blocked */
614 0 : nblock++;
615 0 : break;
616 : case FREECLIENT:
617 : break;
618 : }
619 : }
620 114 : MT_lock_unset(&mal_contextLock);
621 114 : printf("%d active clients, %d finishing clients, %d blocked clients\n",
622 : nrun, nfinish, nblock);
623 114 : }
|