Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * (author) M Kersten, S Mullender
15 : * Dataflow processing only works on a code
16 : * sequence that does not include additional (implicit) flow of control
17 : * statements and, ideally, consists of expensive BAT operations.
18 : * The dataflow portion is identified as a guarded block,
19 : * whose entry is controlled by the function language.dataflow();
20 : *
21 : * The dataflow worker tries to follow the sequence of actions
22 : * as laid out in the plan, but abandons this track when it hits
23 : * a blocking operator, or an instruction for which not all arguments
24 : * are available or resources become scarce.
25 : *
26 : * The flow graph is organized such that parallel threads can
27 : * access it mostly without expensive locking, and dependent
28 : * variables are easy to find.
29 : */
30 : #include "monetdb_config.h"
31 : #include "mal_dataflow.h"
32 : #include "mal_exception.h"
33 : #include "mal_private.h"
34 : #include "mal_internal.h"
35 : #include "mal_runtime.h"
36 : #include "mal_resource.h"
37 : #include "mal_function.h"
38 :
39 : #define DFLOWpending 0 /* runnable */
40 : #define DFLOWrunning 1 /* currently in progress */
41 : #define DFLOWwrapup 2 /* done! */
42 : #define DFLOWretry 3 /* reschedule */
43 : #define DFLOWskipped 4 /* due to errors */
44 :
45 : /* The per instruction status of execution */
46 : typedef struct FLOWEVENT {
47 : struct DATAFLOW *flow; /* execution context */
48 : int pc; /* pc in underlying malblock */
49 : int blocks; /* number of variables still being awaited */
50 : sht state; /* of execution */
51 : lng clk;
52 : sht cost;
53 : lng hotclaim; /* memory footprint of result variables */
54 : lng argclaim; /* memory footprint of arguments */
55 : lng maxclaim; /* memory footprint of largest argument, could be used to indicate result size */
56 : struct FLOWEVENT *next; /* linked list for queues */
57 : } *FlowEvent, FlowEventRec;
58 :
59 : typedef struct queue {
60 : int exitcount; /* how many threads should exit */
61 : FlowEvent first, last; /* first and last element of the queue */
62 : MT_Lock l; /* it's a shared resource, ie we need locks */
63 : MT_Sema s; /* threads wait on empty queues */
64 : } Queue;
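 :
 : /* Two queues are used: the global 'todo' queue holds runnable
 :  * instructions shared by all workers, while each dataflow block has its
 :  * own 'done' queue into which finished instructions are pushed for the
 :  * scheduler to process. */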
65 :
66 : /*
67 : * The dataflow dependency is administered in a graph list structure.
68 : * For each instruction we keep the list of instructions that
69 : * should be checked for eligibility once we are finished with it.
70 : */
71 : typedef struct DATAFLOW {
72 : Client cntxt; /* for debugging and client resolution */
73 : MalBlkPtr mb; /* carry the context */
74 : MalStkPtr stk;
75 : int start, stop; /* guarded block under consideration */
76 : FlowEvent status; /* status of each instruction */
77 : ATOMIC_PTR_TYPE error; /* error encountered */
78 : int *nodes; /* dependency graph nodes */
79 : int *edges; /* dependency graph */
80 : MT_Lock flowlock; /* lock to protect the above */
81 : Queue *done; /* instructions handled */
82 : bool set_qry_ctx;
83 : } *DataFlow, DataFlowRec;
84 :
85 : struct worker {
86 : MT_Id id;
87 : enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
88 : ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
89 : MT_Sema s;
90 : struct worker *next;
91 : char errbuf[GDKMAXERRLEN]; /* GDKerrbuf so that we can allocate before fork */
92 : };
93 : /* heads of three mutually exclusive linked lists, all using the .next
94 : * field in the worker struct */
95 : static struct worker *workers; /* "working" workers */
96 : static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
97 : static struct worker *free_workers; /* free workers (.flag==FREE) */
98 : static int free_count = 0; /* number of free threads */
99 : static int free_max = 0; /* max number of spare free threads */
100 :
101 : static Queue *todo = 0; /* pending instructions */
102 :
103 : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
104 : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
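 : /* dataflowLock protects the worker lists (workers, exited_workers,
 :  * free_workers) and the counters declared above */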
105 :
106 : /*
107 : * Calculate the size of the dataflow dependency graph.
108 : */
109 : static int
110 : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
111 : {
112 : int cnt = 0;
113 :
114 8076385 : for (int i = start; i < stop; i++)
115 7931488 : cnt += getInstrPtr(mb, i)->argc;
116 144897 : return cnt;
117 : }
118 :
119 : /*
120 : * The dataflow execution is confined to a barrier block.
121 : * Within the block there are multiple flows, which, in principle,
122 : * can be executed in parallel.
123 : */
124 :
125 : static Queue *
126 145232 : q_create(const char *name)
127 : {
128 145232 : Queue *q = GDKzalloc(sizeof(Queue));
129 :
130 145232 : if (q == NULL)
131 : return NULL;
132 145232 : MT_lock_init(&q->l, name);
133 145231 : MT_sema_init(&q->s, 0, name);
134 145232 : return q;
135 : }
136 :
137 : static void
138 144896 : q_destroy(Queue *q)
139 : {
140 144896 : assert(q);
141 144896 : MT_lock_destroy(&q->l);
142 144896 : MT_sema_destroy(&q->s);
143 144897 : GDKfree(q);
144 144897 : }
145 :
146 : /* keep a simple LIFO queue. It won't be a large one, so shuffling on requeue is acceptable */
147 : /* we might actually sort it for better scheduling behavior */
148 : static void
149 12501224 : q_enqueue(Queue *q, FlowEvent d)
150 : {
151 12501224 : assert(q);
152 12501224 : assert(d);
153 12501224 : MT_lock_set(&q->l);
154 12498739 : if (q->first == NULL) {
155 4304903 : assert(q->last == NULL);
156 4304903 : q->first = q->last = d;
157 : } else {
158 8193836 : assert(q->last != NULL);
159 8193836 : q->last->next = d;
160 8193836 : q->last = d;
161 : }
162 12498739 : d->next = NULL;
163 12498739 : MT_lock_unset(&q->l);
164 12500697 : MT_sema_up(&q->s);
165 12498631 : }
166 :
167 : /*
168 : * A priority queue over the hot claims of memory may
169 : * be more effective. It prioritizes those instructions
170 : * that want to use a big recent result.
171 : */
172 :
173 : static void
174 0 : q_requeue(Queue *q, FlowEvent d)
175 : {
176 0 : assert(q);
177 0 : assert(d);
178 0 : MT_lock_set(&q->l);
179 0 : if (q->first == NULL) {
180 0 : assert(q->last == NULL);
181 0 : q->first = q->last = d;
182 0 : d->next = NULL;
183 : } else {
184 0 : assert(q->last != NULL);
185 0 : d->next = q->first;
186 0 : q->first = d;
187 : }
188 0 : MT_lock_unset(&q->l);
189 0 : MT_sema_up(&q->s);
190 0 : }
191 :
192 : static FlowEvent
193 12870817 : q_dequeue(Queue *q, Client cntxt)
194 : {
195 12870817 : assert(q);
196 12870817 : MT_sema_down(&q->s);
197 12866849 : if (ATOMIC_GET(&exiting))
198 : return NULL;
199 12865548 : MT_lock_set(&q->l);
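 : /* exit requests are only honoured by callers not bound to a
 :  * specific client (cntxt == NULL), i.e. generic workers */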
200 12847696 : if (cntxt == NULL && q->exitcount > 0) {
201 144897 : q->exitcount--;
202 144897 : MT_lock_unset(&q->l);
203 144897 : return NULL;
204 : }
205 :
206 12702799 : FlowEvent *dp = &q->first;
207 12702799 : FlowEvent pd = NULL;
208 : /* if cntxt == NULL, return the first event, if cntxt != NULL, find
209 : * the first event in the queue with matching cntxt value and return
210 : * that */
211 12702799 : if (cntxt != NULL) {
212 10460825 : while (*dp && (*dp)->flow->cntxt != cntxt) {
213 8733440 : pd = *dp;
214 8733440 : dp = &pd->next;
215 : }
216 : }
217 12702799 : FlowEvent d = *dp;
218 12702799 : if (d) {
219 12471708 : *dp = d->next;
220 12471708 : d->next = NULL;
221 12471708 : if (*dp == NULL)
222 4322163 : q->last = pd;
223 : }
224 12702799 : MT_lock_unset(&q->l);
225 12696114 : return d;
226 : }
227 :
228 : /*
229 : * We simply move an instruction into the front of the queue.
230 : * Beware, we assume that variables are assigned a value only once, otherwise
231 : * the reordering may introduce errors.
232 : * The order of the instructions should be retained as long as possible.
233 : * Delay processing when we run out of memory. Push the instruction back
234 : * on the end of the queue, waiting for another attempt. A problem might
235 : * arise when all threads but one keep cycling through the queue, each time
236 : * finding an eligible instruction, but without enough space to run it.
237 : * Therefore, we wait for a few milliseconds as an initial punishment.
238 : *
239 : * The process could be refined by checking for cheap operations,
240 : * i.e. those that would require no memory at all (e.g. aggr.count).
241 : * This, however, would lead to a dependency on the upper layers,
242 : * because in the kernel we don't know which routines are available
243 : * with this property. Nor do we maintain such properties.
244 : */
245 :
246 : static void
247 7741 : DFLOWworker(void *T)
248 : {
249 7741 : struct worker *t = (struct worker *) T;
250 7741 : bool locked = false;
251 : #ifdef _MSC_VER
252 : srand((unsigned int) GDKusec());
253 : #endif
254 7741 : GDKsetbuf(t->errbuf); /* where to leave errors */
255 7741 : snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
256 :
257 145899 : for (;;) {
258 145899 : DataFlow flow;
259 145899 : FlowEvent fe = 0, fnxt = 0;
260 145899 : str error = 0;
261 145899 : int i;
262 145899 : lng claim;
263 145899 : Client cntxt;
264 145899 : InstrPtr p;
265 :
266 145899 : GDKclrerr();
267 :
268 145900 : if (t->flag == WAITING) {
269 : /* wait until we are allowed to start working */
270 144895 : MT_sema_down(&t->s);
271 144892 : t->flag = RUNNING;
272 144892 : if (ATOMIC_GET(&exiting)) {
273 : break;
274 : }
275 : }
276 145897 : assert(t->flag == RUNNING);
277 145897 : cntxt = ATOMIC_PTR_GET(&t->cntxt);
278 8159651 : while (1) {
279 8159651 : MT_thread_set_qry_ctx(NULL);
280 8155001 : if (fnxt == 0) {
281 5089637 : MT_thread_setworking("waiting for work");
282 5090173 : cntxt = ATOMIC_PTR_GET(&t->cntxt);
283 5090173 : fe = q_dequeue(todo, cntxt);
284 5092371 : if (fe == NULL) {
285 377286 : if (cntxt) {
286 : /* we're not done yet with work for the current
287 : * client (as far as we know), so give up the CPU
288 : * and let the scheduler enter some more work, but
289 : * first compensate for the down we did in
290 : * dequeue */
291 231390 : MT_sema_up(&todo->s);
292 231388 : MT_sleep_ms(1);
293 8391038 : continue;
294 : }
295 : /* no more work to be done: exit */
296 145896 : break;
297 : }
298 4715085 : if (fe->flow->cntxt && fe->flow->cntxt->mythread)
299 4714281 : MT_thread_setworking(fe->flow->cntxt->mythread);
300 : } else
301 : fe = fnxt;
302 7780122 : if (ATOMIC_GET(&exiting)) {
303 : break;
304 : }
305 7780122 : fnxt = 0;
306 7780122 : assert(fe);
307 7780122 : flow = fe->flow;
308 7780122 : assert(flow);
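 : /* switch this worker thread to the client's query context, if the
 :  * dataflow block was started inside one */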
309 7780122 : MT_thread_set_qry_ctx(flow->set_qry_ctx ? &flow->cntxt->
310 : qryctx : NULL);
311 :
312 : /* whenever we have a (concurrent) error, skip it */
313 7782744 : if (ATOMIC_PTR_GET(&flow->error)) {
314 4203 : q_enqueue(flow->done, fe);
315 4207 : continue;
316 : }
317 :
318 7778541 : p = getInstrPtr(flow->mb, fe->pc);
319 7778541 : claim = fe->argclaim;
320 15560423 : if (p->fcn != (MALfcn) deblockdataflow && /* never block on deblockdataflow() */
321 7778541 : !MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
322 0 : fe->hotclaim = 0; /* don't assume priority anymore */
323 0 : fe->maxclaim = 0;
324 0 : MT_lock_set(&todo->l);
325 0 : FlowEvent last = todo->last;
326 0 : MT_lock_unset(&todo->l);
327 0 : if (last == NULL)
328 0 : MT_sleep_ms(DELAYUNIT);
329 0 : q_requeue(todo, fe);
330 0 : continue;
331 : }
332 7781882 : ATOMIC_BASE_TYPE wrks = ATOMIC_INC(&flow->cntxt->workers);
333 7781882 : ATOMIC_BASE_TYPE mwrks = ATOMIC_GET(&flow->mb->workers);
334 7781894 : while (wrks > mwrks) {
335 15542 : if (ATOMIC_CAS(&flow->mb->workers, &mwrks, wrks))
336 : break;
337 : }
338 7781882 : error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
339 : flow->stk, 0, 0);
340 7779018 : ATOMIC_DEC(&flow->cntxt->workers);
341 : /* release the memory claim */
342 7779018 : MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
343 :
344 7781857 : MT_lock_set(&flow->flowlock);
345 7782032 : fe->state = DFLOWwrapup;
346 7782032 : MT_lock_unset(&flow->flowlock);
347 7781815 : if (error) {
348 486 : void *null = NULL;
349 : /* only collect one error (from one thread, needed for stable testing) */
350 486 : if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
351 9 : freeException(error);
352 : /* after an error we skip the rest of the block */
353 486 : q_enqueue(flow->done, fe);
354 486 : continue;
355 : }
356 :
357 : /* see if we can find an eligible instruction that uses the
358 : * result just produced. Then we can continue with it right away.
359 : * We are only looking forward within the guarded block, which means we
360 : * are safe from concurrent actions. No other thread can steal it,
361 : * because we hold the logical lock.
362 : * All eligible instructions are queued.
363 : */
364 7781329 : p = getInstrPtr(flow->mb, fe->pc);
365 7781329 : assert(p);
366 7781329 : fe->hotclaim = 0;
367 7781329 : fe->maxclaim = 0;
368 :
369 16167304 : for (i = 0; i < p->retc; i++) {
370 8388600 : lng footprint;
371 8388600 : footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
372 8385975 : fe->hotclaim += footprint;
373 8385975 : if (footprint > fe->maxclaim)
374 3658658 : fe->maxclaim = footprint;
375 : }
376 :
377 : /* Try to get rid of the hot potato or locate an alternative to proceed.
378 : */
379 : #define HOTPOTATO
380 : #ifdef HOTPOTATO
381 : /* HOT potato choice */
382 7778704 : int last = 0, nxt = -1;
383 7778704 : lng nxtclaim = -1;
384 :
385 7778704 : MT_lock_set(&flow->flowlock);
386 7781618 : for (last = fe->pc - flow->start;
387 29537109 : last >= 0 && (i = flow->nodes[last]) > 0;
388 21755491 : last = flow->edges[last]) {
389 21755491 : if (flow->status[i].state == DFLOWpending
390 21754410 : && flow->status[i].blocks == 1) {
391 : /* find the one with the largest footprint */
392 5945525 : if (nxt == -1 || flow->status[i].argclaim > nxtclaim) {
393 3336184 : nxt = i;
394 3336184 : nxtclaim = flow->status[i].argclaim;
395 : }
396 : }
397 : }
398 : /* hot potato can not be removed, use alternative to proceed */
399 7781618 : if (nxt >= 0) {
400 3071742 : flow->status[nxt].state = DFLOWrunning;
401 3071742 : flow->status[nxt].blocks = 0;
402 3071742 : flow->status[nxt].hotclaim = fe->hotclaim;
403 3071742 : flow->status[nxt].argclaim += fe->hotclaim;
404 3071742 : if (flow->status[nxt].maxclaim < fe->maxclaim)
405 1585478 : flow->status[nxt].maxclaim = fe->maxclaim;
406 : fnxt = flow->status + nxt;
407 : }
408 7781618 : MT_lock_unset(&flow->flowlock);
409 : #endif
410 :
411 7781504 : q_enqueue(flow->done, fe);
412 7777674 : if (fnxt == 0 && profilerStatus) {
413 0 : profilerHeartbeatEvent("wait");
414 : }
415 : }
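 : /* no more work for this worker: either exit or park it on the free list */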
416 145896 : MT_lock_set(&dataflowLock);
417 145897 : if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >= free_max) {
418 : locked = true;
419 : break;
420 : }
421 138750 : free_count++;
422 138750 : struct worker **tp = &workers;
423 543070 : while (*tp && *tp != t)
424 404320 : tp = &(*tp)->next;
425 138750 : assert(*tp && *tp == t);
426 138750 : *tp = t->next;
427 138750 : t->flag = FREE;
428 138750 : t->next = free_workers;
429 138750 : free_workers = t;
430 138750 : MT_lock_unset(&dataflowLock);
431 138750 : MT_thread_setworking("idle, waiting for new client");
432 138750 : MT_sema_down(&t->s);
433 138748 : if (GDKexiting() || ATOMIC_GET(&exiting))
434 : break;
435 138157 : assert(t->flag == WAITING);
436 : }
437 591 : if (!locked)
438 591 : MT_lock_set(&dataflowLock);
439 7738 : if (t->flag != FINISHING) {
440 7106 : struct worker **tp = t->flag == FREE ? &free_workers : &workers;
441 37258 : while (*tp && *tp != t)
442 30152 : tp = &(*tp)->next;
443 7106 : assert(*tp && *tp == t);
444 7106 : *tp = t->next;
445 7106 : t->flag = EXITED;
446 7106 : t->next = exited_workers;
447 7106 : exited_workers = t;
448 : }
449 7738 : MT_lock_unset(&dataflowLock);
450 7738 : GDKsetbuf(NULL);
451 7738 : }
452 :
453 : /*
454 : * Create an interpreter pool.
455 : * One worker will adaptively be available for each client.
456 : * The remainder is taken from the GDKnr_threads argument and
457 : * typically equals the number of cores.
458 : * The workers are assembled in a local table to enable debugging.
459 : */
460 : static int
461 335 : DFLOWinitialize(void)
462 : {
463 335 : int limit;
464 335 : int created = 0;
465 :
466 335 : MT_lock_set(&mal_contextLock);
467 335 : MT_lock_set(&dataflowLock);
468 335 : if (todo) {
469 : /* somebody else beat us to it */
470 0 : MT_lock_unset(&dataflowLock);
471 0 : MT_lock_unset(&mal_contextLock);
472 0 : return 0;
473 : }
474 335 : free_max = GDKgetenv_int("dataflow_max_free",
475 : GDKnr_threads < 4 ? 4 : GDKnr_threads);
476 335 : todo = q_create("todo");
477 335 : if (todo == NULL) {
478 0 : MT_lock_unset(&dataflowLock);
479 0 : MT_lock_unset(&mal_contextLock);
480 0 : return -1;
481 : }
482 335 : limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
483 1338 : while (limit > 0) {
484 1003 : limit--;
485 1003 : struct worker *t = GDKmalloc(sizeof(*t));
486 1003 : if (t == NULL) {
487 0 : TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
488 0 : continue;
489 : }
490 1003 : *t = (struct worker) {
491 : .flag = RUNNING,
492 : };
493 1003 : ATOMIC_PTR_INIT(&t->cntxt, NULL);
494 1003 : MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
495 1003 : if (MT_create_thread(&t->id, DFLOWworker, t,
496 : MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
497 0 : ATOMIC_PTR_DESTROY(&t->cntxt);
498 0 : MT_sema_destroy(&t->s);
499 0 : GDKfree(t);
500 : } else {
501 1003 : t->next = workers;
502 1003 : workers = t;
503 1003 : created++;
504 : }
505 : }
506 335 : if (created == 0) {
507 : /* no threads created */
508 0 : q_destroy(todo);
509 0 : todo = NULL;
510 0 : MT_lock_unset(&dataflowLock);
511 0 : MT_lock_unset(&mal_contextLock);
512 0 : return -1;
513 : }
514 335 : MT_lock_unset(&dataflowLock);
515 335 : MT_lock_unset(&mal_contextLock);
516 335 : return 0;
517 : }
518 :
519 : /*
520 : * The dataflow administration is based on tracking, per instruction,
521 : * how many variables are still missing before it can be executed.
522 : * For each instruction we keep a list of instructions whose
523 : * blocking counter should be decremented upon finishing it.
524 : */
525 : static str
526 144897 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
527 : {
528 144897 : int pc, i, j, k, l, n, etop = 0;
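 : /* assign[v] records the pc of the instruction that assigns variable v */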
529 144897 : int *assign;
530 144897 : InstrPtr p;
531 :
532 144897 : if (flow == NULL)
533 0 : throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
534 144897 : if (mb == NULL)
535 0 : throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
536 144897 : assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
537 144897 : if (assign == NULL)
538 0 : throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
539 144897 : etop = flow->stop - flow->start;
540 7931772 : for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
541 7786875 : p = getInstrPtr(mb, pc);
542 7786875 : if (p == NULL) {
543 0 : GDKfree(assign);
544 0 : throw(MAL, "dataflow",
545 : "DFLOWinitBlk(): getInstrPtr() returned NULL");
546 : }
547 :
548 : /* initial state, ie everything can run */
549 7786875 : flow->status[n].flow = flow;
550 7786875 : flow->status[n].pc = pc;
551 7786875 : flow->status[n].state = DFLOWpending;
552 7786875 : flow->status[n].cost = -1;
553 7786875 : ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
554 :
555 : /* administer flow dependencies */
556 36779763 : for (j = p->retc; j < p->argc; j++) {
557 : /* list of instructions that wake n-th instruction up */
558 28992888 : if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
559 13225138 : assert(k < pc); /* only dependencies on earlier instructions */
560 : /* add edge to the target instruction for wakeup call */
561 13225138 : k -= flow->start;
562 13225138 : if (flow->nodes[k]) {
563 : /* add wakeup to tail of list */
564 97737525 : for (i = k; flow->edges[i] > 0; i = flow->edges[i])
565 : ;
566 11513073 : flow->nodes[etop] = n;
567 11513073 : flow->edges[etop] = -1;
568 11513073 : flow->edges[i] = etop;
569 11513073 : etop++;
570 11513073 : (void) size;
571 11513073 : if (etop == size) {
572 220 : int *tmp;
573 : /* in case of realloc failure, the original
574 : * pointers will be freed by the caller */
575 220 : tmp = (int *) GDKrealloc(flow->nodes,
576 : sizeof(int) * 2 * size);
577 220 : if (tmp == NULL) {
578 0 : GDKfree(assign);
579 0 : throw(MAL, "dataflow",
580 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
581 : }
582 220 : flow->nodes = tmp;
583 220 : tmp = (int *) GDKrealloc(flow->edges,
584 : sizeof(int) * 2 * size);
585 220 : if (tmp == NULL) {
586 0 : GDKfree(assign);
587 0 : throw(MAL, "dataflow",
588 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
589 : }
590 220 : flow->edges = tmp;
591 220 : size *= 2;
592 : }
593 : } else {
594 1712065 : flow->nodes[k] = n;
595 1712065 : flow->edges[k] = -1;
596 : }
597 :
598 13225138 : flow->status[n].blocks++;
599 : }
600 :
601 : /* list of instructions to be woken up explicitly */
602 28992888 : if (!isVarConstant(mb, getArg(p, j))) {
603 : /* be careful, watch out for garbage collection interference */
604 : /* the end-of-scope instruction must be scheduled after all other uses of the variable */
605 14766836 : l = getEndScope(mb, getArg(p, j));
606 14766836 : if (l != pc && l < flow->stop && l > flow->start) {
607 : /* add edge to the target instruction for wakeup call */
608 8543154 : assert(pc < l); /* only dependencies on earlier instructions */
609 8543154 : l -= flow->start;
610 8543154 : if (flow->nodes[n]) {
611 : /* add wakeup to tail of list */
612 39398810 : for (i = n; flow->edges[i] > 0; i = flow->edges[i])
613 : ;
614 4280324 : flow->nodes[etop] = l;
615 4280324 : flow->edges[etop] = -1;
616 4280324 : flow->edges[i] = etop;
617 4280324 : etop++;
618 4280324 : if (etop == size) {
619 116 : int *tmp;
620 : /* in case of realloc failure, the original
621 : * pointers will be freed by the caller */
622 116 : tmp = (int *) GDKrealloc(flow->nodes,
623 : sizeof(int) * 2 * size);
624 116 : if (tmp == NULL) {
625 0 : GDKfree(assign);
626 0 : throw(MAL, "dataflow",
627 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
628 : }
629 116 : flow->nodes = tmp;
630 116 : tmp = (int *) GDKrealloc(flow->edges,
631 : sizeof(int) * 2 * size);
632 116 : if (tmp == NULL) {
633 0 : GDKfree(assign);
634 0 : throw(MAL, "dataflow",
635 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
636 : }
637 116 : flow->edges = tmp;
638 116 : size *= 2;
639 : }
640 : } else {
641 4262830 : flow->nodes[n] = l;
642 4262830 : flow->edges[n] = -1;
643 : }
644 8543154 : flow->status[l].blocks++;
645 : }
646 : }
647 : }
648 :
649 16181740 : for (j = 0; j < p->retc; j++)
650 8394865 : assign[getArg(p, j)] = pc; /* ensure recognition of dependency on first instruction and constant */
651 : }
652 144897 : GDKfree(assign);
653 :
654 144897 : return MAL_SUCCEED;
655 : }
656 :
657 : /*
658 : * Parallel processing is mostly driven by dataflow, but within this context
659 : * there may be different schemes for taking instructions into execution.
660 : * The admission scheme (and wrapup) are the necessary scheduler hooks.
661 : * A scheduler registers the functions needed and should release them
662 : * at the end of the parallel block.
663 : * They take effect after we have ensured that the basic properties for
664 : * execution hold.
665 : */
666 : static str
667 144897 : DFLOWscheduler(DataFlow flow, struct worker *w)
668 : {
669 144897 : int last;
670 144897 : int i;
671 144897 : int j;
672 144897 : InstrPtr p;
673 144897 : int tasks = 0, actions = 0;
674 144897 : str ret = MAL_SUCCEED;
675 144897 : FlowEvent fe, f = 0;
676 :
677 144897 : if (flow == NULL)
678 0 : throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
679 144897 : actions = flow->stop - flow->start;
680 144897 : if (actions == 0)
681 0 : throw(MAL, "dataflow", "Empty dataflow block");
682 : /* initialize the eligible statements */
683 144897 : fe = flow->status;
684 :
685 144897 : ATOMIC_DEC(&flow->cntxt->workers);
686 144897 : MT_lock_set(&flow->flowlock);
687 8076765 : for (i = 0; i < actions; i++)
688 7786971 : if (fe[i].blocks == 0) {
689 897243 : p = getInstrPtr(flow->mb, fe[i].pc);
690 897243 : if (p == NULL) {
691 0 : MT_lock_unset(&flow->flowlock);
692 0 : ATOMIC_INC(&flow->cntxt->workers);
693 0 : throw(MAL, "dataflow",
694 : "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
695 : }
696 897243 : fe[i].argclaim = 0;
697 5276441 : for (j = p->retc; j < p->argc; j++)
698 4379198 : fe[i].argclaim += getMemoryClaim(fe[0].flow->mb,
699 4379200 : fe[0].flow->stk, p, j, FALSE);
700 897241 : flow->status[i].state = DFLOWrunning;
701 897241 : q_enqueue(todo, flow->status + i);
702 : }
703 144897 : MT_lock_unset(&flow->flowlock);
704 144897 : MT_sema_up(&w->s);
705 :
706 144897 : while (actions != tasks) {
707 7786589 : f = q_dequeue(flow->done, NULL);
708 7786705 : if (ATOMIC_GET(&exiting))
709 : break;
710 7786705 : if (f == NULL) {
711 0 : ATOMIC_INC(&flow->cntxt->workers);
712 0 : throw(MAL, "dataflow",
713 : "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
714 : }
715 :
716 : /*
717 : * When an instruction is finished we have to reduce the blocked
718 : * counter of all dependent instructions; those for which it
719 : * drops to zero can be scheduled. We do it here instead of in the scheduler.
720 : */
721 :
722 7786705 : MT_lock_set(&flow->flowlock);
723 7786273 : tasks++;
724 7786273 : for (last = f->pc - flow->start;
725 29552913 : last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
726 21766405 : if (flow->status[i].state == DFLOWpending) {
727 18695540 : flow->status[i].argclaim += f->hotclaim;
728 18695540 : if (flow->status[i].blocks == 1) {
729 3817850 : flow->status[i].blocks--;
730 3817850 : flow->status[i].state = DFLOWrunning;
731 3817850 : q_enqueue(todo, flow->status + i);
732 : } else {
733 14877690 : flow->status[i].blocks--;
734 : }
735 : }
736 7931405 : MT_lock_unset(&flow->flowlock);
737 : }
738 : /* release the worker from its specific task (turn it into a
739 : * generic worker) */
740 144897 : ATOMIC_PTR_SET(&w->cntxt, NULL);
741 144897 : ATOMIC_INC(&flow->cntxt->workers);
742 : /* wrap up errors */
743 144897 : assert(flow->done->last == 0);
744 144897 : if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
745 477 : TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
746 : }
747 : return ret;
748 : }
749 :
750 : /* called and returns with dataflowLock locked, temporarily unlocks it;
751 : * join the thread associated with the worker and destroy the structure */
752 : static inline void
753 7738 : finish_worker(struct worker *t)
754 : {
755 7738 : t->flag = FINISHING;
756 7738 : MT_lock_unset(&dataflowLock);
757 7738 : MT_join_thread(t->id);
758 7738 : MT_sema_destroy(&t->s);
759 7738 : ATOMIC_PTR_DESTROY(&t->cntxt);
760 7738 : GDKfree(t);
761 7738 : MT_lock_set(&dataflowLock);
762 7738 : }
763 :
764 : /* We create a pool of GDKnr_threads-1 generic workers, that is,
765 : * workers that will take on jobs from any clients. In addition, we
766 : * create a single specific worker per client (i.e. each time we enter
767 : * here). This specific worker will only do work for the client for
768 : * which it was started. In this way we can guarantee that there will
769 : * always be progress for the client, even if all other workers are
770 : * doing something big.
771 : *
772 : * When all jobs for a client have been done (there are no more
773 : * entries for the client in the queue), the specific worker turns
774 : * itself into a generic worker. At the same time, we signal that one
775 : * generic worker should exit and this function returns. In this way
776 : * we make sure that there are once again GDKnr_threads-1 generic
777 : * workers. */
778 : str
779 144897 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc,
780 : MalStkPtr stk)
781 : {
782 144897 : DataFlow flow = NULL;
783 144897 : str msg = MAL_SUCCEED;
784 144897 : int size;
785 144897 : bit *ret;
786 144897 : struct worker *t;
787 :
788 144897 : if (stk == NULL)
789 0 : throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
790 144897 : ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
791 144897 : *ret = FALSE;
792 :
793 144897 : assert(stoppc > startpc);
794 :
795 : /* check existence of workers */
796 144897 : if (todo == NULL) {
797 : /* create thread pool */
798 335 : if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
799 : /* no threads created, run serially */
800 0 : *ret = TRUE;
801 0 : return MAL_SUCCEED;
802 : }
803 : }
804 144897 : assert(todo);
805 : /* in addition, create one more worker that will only execute
806 : * tasks for the current client to compensate for our waiting
807 : * until all work is done */
808 144897 : MT_lock_set(&dataflowLock);
809 : /* join with already exited threads */
810 151044 : while (exited_workers != NULL) {
811 6147 : assert(exited_workers->flag == EXITED);
812 6147 : struct worker *t = exited_workers;
813 6147 : exited_workers = exited_workers->next;
814 6147 : finish_worker(t);
815 : }
816 144897 : assert(cntxt != NULL);
817 144897 : if (free_workers != NULL) {
818 138158 : t = free_workers;
819 138158 : assert(t->flag == FREE);
820 138158 : assert(free_count > 0);
821 138158 : free_count--;
822 138158 : free_workers = t->next;
823 138158 : t->next = workers;
824 138158 : workers = t;
825 138158 : t->flag = WAITING;
826 138158 : ATOMIC_PTR_SET(&t->cntxt, cntxt);
827 138158 : MT_sema_up(&t->s);
828 : } else {
829 6739 : t = GDKmalloc(sizeof(*t));
830 6739 : if (t != NULL) {
831 6739 : *t = (struct worker) {
832 : .flag = WAITING,
833 : };
834 6739 : ATOMIC_PTR_INIT(&t->cntxt, cntxt);
835 6739 : MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
836 6739 : if (MT_create_thread(&t->id, DFLOWworker, t,
837 : MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
838 0 : ATOMIC_PTR_DESTROY(&t->cntxt);
839 0 : MT_sema_destroy(&t->s);
840 0 : GDKfree(t);
841 0 : t = NULL;
842 : } else {
843 6739 : t->next = workers;
844 6739 : workers = t;
845 : }
846 : }
847 6739 : if (t == NULL) {
848 : /* cannot start new thread, run serially */
849 0 : *ret = TRUE;
850 0 : MT_lock_unset(&dataflowLock);
851 0 : return MAL_SUCCEED;
852 : }
853 : }
854 144897 : MT_lock_unset(&dataflowLock);
855 :
856 144897 : flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
857 144897 : if (flow == NULL)
858 0 : throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
859 :
860 144897 : flow->cntxt = cntxt;
861 144897 : flow->mb = mb;
862 144897 : flow->stk = stk;
863 144897 : flow->set_qry_ctx = MT_thread_get_qry_ctx() != NULL;
864 :
865 : /* keep real block count, exclude brackets */
866 144897 : flow->start = startpc + 1;
867 144897 : flow->stop = stoppc;
868 :
869 144897 : flow->done = q_create("flow->done");
870 144897 : if (flow->done == NULL) {
871 0 : GDKfree(flow);
872 0 : throw(MAL, "dataflow",
873 : "runMALdataflow(): Failed to create flow->done queue");
874 : }
875 :
876 144897 : flow->status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
877 : sizeof(FlowEventRec));
878 144897 : if (flow->status == NULL) {
879 0 : q_destroy(flow->done);
880 0 : GDKfree(flow);
881 0 : throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
882 : }
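 : /* the node and edge arrays are sized by the total number of arguments
 :  * plus one slot per instruction; DFLOWinitBlk grows them if needed */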
883 144897 : size = DFLOWgraphSize(mb, startpc, stoppc);
884 144897 : size += stoppc - startpc;
885 144897 : flow->nodes = (int *) GDKzalloc(sizeof(int) * size);
886 144897 : if (flow->nodes == NULL) {
887 0 : GDKfree(flow->status);
888 0 : q_destroy(flow->done);
889 0 : GDKfree(flow);
890 0 : throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
891 : }
892 144897 : flow->edges = (int *) GDKzalloc(sizeof(int) * size);
893 144897 : if (flow->edges == NULL) {
894 0 : GDKfree(flow->nodes);
895 0 : GDKfree(flow->status);
896 0 : q_destroy(flow->done);
897 0 : GDKfree(flow);
898 0 : throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
899 : }
900 144897 : MT_lock_init(&flow->flowlock, "flow->flowlock");
901 144897 : ATOMIC_PTR_INIT(&flow->error, NULL);
902 144897 : msg = DFLOWinitBlk(flow, mb, size);
903 :
904 144897 : if (msg == MAL_SUCCEED)
905 144897 : msg = DFLOWscheduler(flow, t);
906 :
907 144897 : GDKfree(flow->status);
908 144896 : GDKfree(flow->edges);
909 144897 : GDKfree(flow->nodes);
910 144896 : q_destroy(flow->done);
911 144897 : MT_lock_destroy(&flow->flowlock);
912 144897 : ATOMIC_PTR_DESTROY(&flow->error);
913 144897 : GDKfree(flow);
914 :
915 : /* we created one worker, now tell one worker to exit again */
916 144897 : MT_lock_set(&todo->l);
917 144897 : todo->exitcount++;
918 144897 : MT_lock_unset(&todo->l);
919 144897 : MT_sema_up(&todo->s);
920 :
921 144897 : return msg;
922 : }
923 :
924 : str
925 0 : deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
926 : {
927 0 : int *ret = getArgReference_int(stk, pci, 0);
928 0 : int *val = getArgReference_int(stk, pci, 1);
929 0 : (void) cntxt;
930 0 : (void) mb;
931 0 : *ret = *val;
932 0 : return MAL_SUCCEED;
933 : }
934 :
935 : static void
936 334 : stopMALdataflow(void)
937 : {
938 334 : ATOMIC_SET(&exiting, 1);
939 334 : if (todo) {
940 334 : MT_lock_set(&dataflowLock);
941 : /* first wake up all running threads */
942 334 : int n = 0;
943 925 : for (struct worker *t = free_workers; t; t = t->next)
944 591 : n++;
945 1334 : for (struct worker *t = workers; t; t = t->next)
946 1000 : n++;
947 1925 : while (n-- > 0) {
948 : /* one UP for each thread we know about */
949 1925 : MT_sema_up(&todo->s);
950 : }
951 925 : while (free_workers) {
952 591 : struct worker *t = free_workers;
953 591 : assert(free_count > 0);
954 591 : free_count--;
955 591 : free_workers = free_workers->next;
956 591 : MT_sema_up(&t->s);
957 591 : finish_worker(t);
958 : }
959 375 : while (workers) {
960 41 : struct worker *t = workers;
961 41 : workers = workers->next;
962 41 : finish_worker(t);
963 : }
964 1293 : while (exited_workers) {
965 959 : struct worker *t = exited_workers;
966 959 : exited_workers = exited_workers->next;
967 959 : finish_worker(t);
968 : }
969 334 : MT_lock_unset(&dataflowLock);
970 : }
971 334 : }
972 :
973 : void
974 334 : mal_dataflow_reset(void)
975 : {
976 334 : stopMALdataflow();
977 334 : workers = exited_workers = NULL;
978 334 : if (todo) {
979 334 : MT_lock_destroy(&todo->l);
980 334 : MT_sema_destroy(&todo->s);
981 334 : GDKfree(todo);
982 : }
983 334 : todo = 0; /* pending instructions */
984 334 : ATOMIC_SET(&exiting, 0);
985 334 : }