LCOV - code coverage report
Current view: top level - monetdb5/mal - mal_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 428 515 83.1 %
Date: 2025-03-24 23:16:36 Functions: 12 14 85.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024, 2025 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (author) M Kersten, S Mullender
      15             :  * Dataflow processing only works on a code
      16             :  * sequence that does not include additional (implicit) flow of control
       17             :  * statements and, ideally, consists of expensive BAT operations.
      18             :  * The dataflow portion is identified as a guarded block,
      19             :  * whose entry is controlled by the function language.dataflow();
      20             :  *
      21             :  * The dataflow worker tries to follow the sequence of actions
       22             :  * as laid out in the plan, but abandons this track when it hits
      23             :  * a blocking operator, or an instruction for which not all arguments
      24             :  * are available or resources become scarce.
      25             :  *
       26             :  * The flow graph is organized such that parallel threads can
       27             :  * access it mostly without expensive locking and dependent
       28             :  * variables are easy to find.
      29             :  */
      30             : #include "monetdb_config.h"
      31             : #include "mal_dataflow.h"
      32             : #include "mal_exception.h"
      33             : #include "mal_private.h"
      34             : #include "mal_internal.h"
      35             : #include "mal_runtime.h"
      36             : #include "mal_resource.h"
      37             : #include "mal_function.h"
      38             : 
      39             : #define DFLOWpending 0                  /* runnable */
      40             : #define DFLOWrunning 1                  /* currently in progress */
      41             : #define DFLOWwrapup  2                  /* done! */
      42             : #define DFLOWretry   3                  /* reschedule */
      43             : #define DFLOWskipped 4                  /* due to errors */
      44             : 
      45             : /* The per instruction status of execution */
      46             : typedef struct FLOWEVENT {
      47             :         struct DATAFLOW *flow;          /* execution context */
      48             :         int pc;                                         /* pc in underlying malblock */
      49             :         int blocks;                                     /* awaiting for variables */
      50             :         sht state;                                      /* of execution */
      51             :         lng clk;
      52             :         sht cost;
      53             :         lng hotclaim;                           /* memory foot print of result variables */
      54             :         lng argclaim;                           /* memory foot print of arguments */
      55             :         lng maxclaim;                           /* memory foot print of largest argument, could be used to indicate result size */
      56             :         struct FLOWEVENT *next;         /* linked list for queues */
      57             : } *FlowEvent, FlowEventRec;
      58             : 
      59             : typedef struct queue {
      60             :         int exitcount;                          /* how many threads should exit */
      61             :         FlowEvent first, last;          /* first and last element of the queue */
      62             :         MT_Lock l;                                      /* it's a shared resource, ie we need locks */
      63             :         MT_Sema s;                                      /* threads wait on empty queues */
      64             : } Queue;
      65             : 
      66             : /*
      67             :  * The dataflow dependency is administered in a graph list structure.
      68             :  * For each instruction we keep the list of instructions that
      69             :  * should be checked for eligibility once we are finished with it.
      70             :  */
      71             : typedef struct DATAFLOW {
      72             :         Client cntxt;                           /* for debugging and client resolution */
      73             :         MalBlkPtr mb;                           /* carry the context */
      74             :         MalStkPtr stk;
      75             :         int start, stop;                        /* guarded block under consideration */
      76             :         FlowEvent status;                       /* status of each instruction */
      77             :         ATOMIC_PTR_TYPE error;          /* error encountered */
      78             :         int *nodes;                                     /* dependency graph nodes */
      79             :         int *edges;                                     /* dependency graph */
      80             :         MT_Lock flowlock;                       /* lock to protect the above */
      81             :         Queue *done;                            /* instructions handled */
      82             :         bool set_qry_ctx;
      83             : } *DataFlow, DataFlowRec;
      84             : 
      85             : struct worker {
      86             :         MT_Id id;
      87             :         enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
      88             :         ATOMIC_PTR_TYPE cntxt;          /* client we do work for (NULL -> any) */
      89             :         MT_Sema s;
      90             :         struct worker *next;
      91             :         char errbuf[GDKMAXERRLEN];      /* GDKerrbuf so that we can allocate before fork */
      92             : };
      93             : /* heads of three mutually exclusive linked lists, all using the .next
      94             :  * field in the worker struct */
      95             : static struct worker *workers;            /* "working" workers */
      96             : static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
      97             : static struct worker *free_workers;     /* free workers (.flag==FREE) */
      98             : static int free_count = 0;              /* number of free threads */
      99             : static int free_max = 0;                /* max number of spare free threads */
     100             : 
     101             : static Queue *todo = 0;                 /* pending instructions */
     102             : 
     103             : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
     104             : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
     105             : 
     106             : /*
     107             :  * Calculate the size of the dataflow dependency graph.
     108             :  */
     109             : static int
     110             : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
     111             : {
     112             :         int cnt = 0;
     113             : 
     114    14354313 :         for (int i = start; i < stop; i++)
     115    14194096 :                 cnt += getInstrPtr(mb, i)->argc;
     116      160217 :         return cnt;
     117             : }
     118             : 
     119             : /*
     120             :  * The dataflow execution is confined to a barrier block.
     121             :  * Within the block there are multiple flows, which, in principle,
     122             :  * can be executed in parallel.
     123             :  */
     124             : 
     125             : static Queue *
     126      160573 : q_create(const char *name)
     127             : {
     128      160573 :         Queue *q = GDKzalloc(sizeof(Queue));
     129             : 
     130      160573 :         if (q == NULL)
     131             :                 return NULL;
     132      160573 :         MT_lock_init(&q->l, name);
     133      160573 :         MT_sema_init(&q->s, 0, name);
     134      160573 :         return q;
     135             : }
     136             : 
     137             : static void
     138      160217 : q_destroy(Queue *q)
     139             : {
     140      160217 :         assert(q);
     141      160217 :         MT_lock_destroy(&q->l);
     142      160216 :         MT_sema_destroy(&q->s);
     143      160214 :         GDKfree(q);
     144      160217 : }
     145             : 
     146             : /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue is possible */
     147             : /* we might actually sort it for better scheduling behavior */
     148             : static void
     149    22140354 : q_enqueue(Queue *q, FlowEvent d)
     150             : {
     151    22140354 :         assert(q);
     152    22140354 :         assert(d);
     153    22140354 :         MT_lock_set(&q->l);
     154    22165597 :         if (q->first == NULL) {
     155     5345394 :                 assert(q->last == NULL);
     156     5345394 :                 q->first = q->last = d;
     157             :         } else {
     158    16820203 :                 assert(q->last != NULL);
     159    16820203 :                 q->last->next = d;
     160    16820203 :                 q->last = d;
     161             :         }
     162    22165597 :         d->next = NULL;
     163    22165597 :         MT_lock_unset(&q->l);
     164    22196522 :         MT_sema_up(&q->s);
     165    22153639 : }
     166             : 
     167             : /*
     168             :  * A priority queue over the hot claims of memory may
     169             :  * be more effective. It prioritizes those instructions
     170             :  * that want to use a big recent result
     171             :  */
     172             : 
     173             : static void
     174           0 : q_requeue(Queue *q, FlowEvent d)
     175             : {
     176           0 :         assert(q);
     177           0 :         assert(d);
     178           0 :         MT_lock_set(&q->l);
     179           0 :         if (q->first == NULL) {
     180           0 :                 assert(q->last == NULL);
     181           0 :                 q->first = q->last = d;
     182           0 :                 d->next = NULL;
     183             :         } else {
     184           0 :                 assert(q->last != NULL);
     185           0 :                 d->next = q->first;
     186           0 :                 q->first = d;
     187             :         }
     188           0 :         MT_lock_unset(&q->l);
     189           0 :         MT_sema_up(&q->s);
     190           0 : }
     191             : 
     192             : static FlowEvent
     193    22437486 : q_dequeue(Queue *q, Client cntxt)
     194             : {
     195    22437486 :         assert(q);
     196    22437486 :         MT_sema_down(&q->s);
     197    22440777 :         if (ATOMIC_GET(&exiting))
     198             :                 return NULL;
     199    22438233 :         MT_lock_set(&q->l);
     200    22403346 :         if (cntxt == NULL && q->exitcount > 0) {
     201      160217 :                 q->exitcount--;
     202      160217 :                 MT_lock_unset(&q->l);
     203      160217 :                 return NULL;
     204             :         }
     205             : 
     206    22243129 :         FlowEvent *dp = &q->first;
     207    22243129 :         FlowEvent pd = NULL;
     208             :         /* if cntxt == NULL, return the first event, if cntxt != NULL, find
     209             :          * the first event in the queue with matching cntxt value and return
     210             :          * that */
     211    22243129 :         if (cntxt != NULL) {
     212    22580971 :                 while (*dp && (*dp)->flow->cntxt != cntxt) {
     213    21032769 :                         pd = *dp;
     214    21032769 :                         dp = &pd->next;
     215             :                 }
     216             :         }
     217    22243129 :         FlowEvent d = *dp;
     218    22243129 :         if (d) {
     219    22105210 :                 *dp = d->next;
     220    22105210 :                 d->next = NULL;
     221    22105210 :                 if (*dp == NULL)
     222     5363536 :                         q->last = pd;
     223             :         }
     224    22243129 :         MT_lock_unset(&q->l);
     225    22213041 :         return d;
     226             : }
     227             : 
     228             : /*
     229             :  * We simply move an instruction into the front of the queue.
     230             :  * Beware, we assume that variables are assigned a value once, otherwise
     231             :  * the order may really create errors.
     232             :  * The order of the instructions should be retained as long as possible.
      233             :  * Delay processing when we run out of memory.  Push the instruction back
      234             :  * onto the front of the queue, waiting for another attempt. A problem might be
      235             :  * that all threads but one are cycling through the queue, each time
      236             :  * finding an eligible instruction, but without enough space.
      237             :  * Therefore, when the queue is otherwise empty, we first wait DELAYUNIT ms as a punishment.
     238             :  *
     239             :  * The process could be refined by checking for cheap operations,
     240             :  * i.e. those that would require no memory at all (aggr.count)
     241             :  * This, however, would lead to a dependency to the upper layers,
     242             :  * because in the kernel we don't know what routines are available
     243             :  * with this property. Nor do we maintain such properties.
     244             :  */
     245             : 
      246             : static void /* DFLOWworker(): thread body of one dataflow worker; T is its struct worker. Runs instructions taken from the todo queue */
      247        4764 : DFLOWworker(void *T)
      248             : {
      249        4764 :         struct worker *t = (struct worker *) T;
      250        4764 :         bool locked = false; /* set when the outer loop is left with dataflowLock already held */
      251             : #ifdef _MSC_VER
      252             :         srand((unsigned int) GDKusec());
      253             : #endif
      254        4764 :         GDKsetbuf(t->errbuf);                /* where to leave errors */
      255        4754 :         snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid()); /* per-thread semaphore name */
      256             : 
      257      162659 :         for (;;) { /* outer loop: do work, then park on the free list or exit */
      258      162659 :                 DataFlow flow;
      259      162659 :                 FlowEvent fe = 0, fnxt = 0;
      260      162659 :                 str error = 0;
      261      162659 :                 int i;
      262      162659 :                 lng claim;
      263      162659 :                 Client cntxt;
      264      162659 :                 InstrPtr p;
      265             : 
      266      162659 :                 GDKclrerr();
      267             : 
      268      162614 :                 if (t->flag == WAITING) {
      269             :                         /* wait until we are allowed to start working */
      270      160203 :                         MT_sema_down(&t->s);
      271      160210 :                         t->flag = RUNNING;
      272      160210 :                         if (ATOMIC_GET(&exiting)) {
      273             :                                 break;
      274             :                         }
      275             :                 }
      276      162621 :                 assert(t->flag == RUNNING);
      277      162621 :                 cntxt = ATOMIC_PTR_GET(&t->cntxt); /* client to serve; NULL means any */
      278    14290379 :                 while (1) { /* inner loop: execute one instruction per iteration */
      279    14290379 :                         MT_thread_set_qry_ctx(NULL);
      280    14266520 :                         if (fnxt == 0) { /* no successor picked last round: take one from todo */
      281     8451260 :                                 MT_thread_setworking("waiting for work");
      282     8459212 :                                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
      283     8459212 :                                 fe = q_dequeue(todo, cntxt);
      284     8470511 :                                 if (fe == NULL) {
      285      300417 :                                         if (cntxt) {
      286             :                                                 /* we're not done yet with work for the current
      287             :                                                  * client (as far as we know), so give up the CPU
      288             :                                                  * and let the scheduler enter some more work, but
      289             :                                                  * first compensate for the down we did in
      290             :                                                  * dequeue */
      291      138315 :                                                 MT_sema_up(&todo->s);
      292      138312 :                                                 MT_sleep_ms(1);
      293    14428541 :                                                 continue;
      294             :                                         }
      295             :                                         /* no more work to be done: exit */
      296      162102 :                                         break;
      297             :                                 }
      298     8170094 :                                 if (fe->flow->cntxt && fe->flow->cntxt->mythread)
      299     8169258 :                                         MT_thread_setworking(fe->flow->cntxt->mythread);
      300             :                         } else /* run the successor claimed below (hot potato) */
      301             :                                 fe = fnxt;
      302    13986600 :                         if (ATOMIC_GET(&exiting)) {
      303             :                                 break;
      304             :                         }
      305    13986600 :                         fnxt = 0;
      306    13986600 :                         assert(fe);
      307    13986600 :                         flow = fe->flow;
      308    13986600 :                         assert(flow);
      309    13986600 :                         MT_thread_set_qry_ctx(flow->set_qry_ctx ? &flow->cntxt->
      310             :                                                                   qryctx : NULL);
      311             : 
      312             :                         /* whenever we have a (concurrent) error, skip it */
      313    14000266 :                         if (ATOMIC_PTR_GET(&flow->error)) {
      314        4322 :                                 q_enqueue(flow->done, fe);
      315        4311 :                                 continue;
      316             :                         }
      317             : 
      318    13995944 :                         p = getInstrPtr(flow->mb, fe->pc);
      319    13995944 :                         claim = fe->argclaim;
      320    28018370 :                         if (p->fcn != (MALfcn) deblockdataflow &&    /* never block on deblockdataflow() */
      321    13995944 :                                 !MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
      322           0 :                                 fe->hotclaim = 0;    /* don't assume priority anymore */
      323           0 :                                 fe->maxclaim = 0;
      324           0 :                                 MT_lock_set(&todo->l); /* admission refused: is anything else queued? */
      325           0 :                                 FlowEvent last = todo->last;
      326           0 :                                 MT_lock_unset(&todo->l);
      327           0 :                                 if (last == NULL)
      328           0 :                                         MT_sleep_ms(DELAYUNIT);
      329           0 :                                 q_requeue(todo, fe); /* put it back at the head of the queue */
      330           0 :                                 continue;
      331             :                         }
      332    14022426 :                         ATOMIC_BASE_TYPE wrks = ATOMIC_INC(&flow->cntxt->workers); /* record peak concurrency in mb->workers */
      333    14022426 :                         ATOMIC_BASE_TYPE mwrks = ATOMIC_GET(&flow->mb->workers);
      334    14023102 :                         while (wrks > mwrks) {
      335       39629 :                                 if (ATOMIC_CAS(&flow->mb->workers, &mwrks, wrks))
      336             :                                         break;
      337             :                         }
      338    14022467 :                         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, /* execute exactly this instruction */
      339             :                                                                    flow->stk, 0, 0);
      340    13874370 :                         ATOMIC_DEC(&flow->cntxt->workers);
      341             :                         /* release the memory claim */
      342    13874370 :                         MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
      343             : 
      344    13943377 :                         MT_lock_set(&flow->flowlock);
      345    14026143 :                         fe->state = DFLOWwrapup;
      346    14026143 :                         MT_lock_unset(&flow->flowlock);
      347    14007756 :                         if (error) {
      348         536 :                                 void *null = NULL;
      349             :                                 /* only collect one error (from one thread, needed for stable testing) */
      350         536 :                                 if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
      351          54 :                                         freeException(error);
      352             :                                 /* after an error we skip the rest of the block */
      353         536 :                                 q_enqueue(flow->done, fe);
      354         536 :                                 continue;
      355             :                         }
      356             : 
      357             :                         /* see if you can find an eligible instruction that uses the
      358             :                          * result just produced. Then we can continue with it right away.
      359             :                          * We are just looking forward for the last block, which means we
      360             :                          * are safe from concurrent actions. No other thread can steal it,
      361             :                          * because we hold the logical lock.
      362             :                          * All eligible instructions are queued
      363             :                          */
      364    14007220 :                         p = getInstrPtr(flow->mb, fe->pc);
      365    14007220 :                         assert(p);
      366    14007220 :                         fe->hotclaim = 0;
      367    14007220 :                         fe->maxclaim = 0;
      368             : 
      369    29165561 :                         for (i = 0; i < p->retc; i++) { /* footprint of the results just produced */
      370    15164296 :                                 lng footprint;
      371    15164296 :                                 footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
      372    15158341 :                                 fe->hotclaim += footprint;
      373    15158341 :                                 if (footprint > fe->maxclaim)
      374     5491251 :                                         fe->maxclaim = footprint;
      375             :                         }
      376             : 
      377             : /* Try to get rid of the hot potato or locate an alternative to proceed.
      378             :  */
      379             : #define HOTPOTATO
      380             : #ifdef HOTPOTATO
      381             :                         /* HOT potato choice */
      382    14001265 :                         int last = 0, nxt = -1;
      383    14001265 :                         lng nxtclaim = -1;
      384             : 
      385    14001265 :                         MT_lock_set(&flow->flowlock);
      386    14027606 :                         for (last = fe->pc - flow->start; /* walk the dependents of fe */
      387    56166996 :                                  last >= 0 && (i = flow->nodes[last]) > 0;
      388    42139390 :                                  last = flow->edges[last]) {
      389    42139390 :                                 if (flow->status[i].state == DFLOWpending
      390    42134251 :                                         && flow->status[i].blocks == 1) { /* presumably only fe's result is still awaited - confirm */
      391             :                                         /* find the one with the largest footprint */
      392    11436726 :                                         if (nxt == -1 || flow->status[i].argclaim > nxtclaim) {
      393     6332871 :                                                 nxt = i;
      394     6332871 :                                                 nxtclaim = flow->status[i].argclaim;
      395             :                                         }
      396             :                                 }
      397             :                         }
      398             :                         /* hot potato can not be removed, use alternative to proceed */
      399    14027606 :                         if (nxt >= 0) { /* claim it for this thread (state running) instead of queueing it */
      400     5861980 :                                 flow->status[nxt].state = DFLOWrunning;
      401     5861980 :                                 flow->status[nxt].blocks = 0;
      402     5861980 :                                 flow->status[nxt].hotclaim = fe->hotclaim;
      403     5861980 :                                 flow->status[nxt].argclaim += fe->hotclaim;
      404     5861980 :                                 if (flow->status[nxt].maxclaim < fe->maxclaim)
      405     2280707 :                                         flow->status[nxt].maxclaim = fe->maxclaim;
      406             :                                 fnxt = flow->status + nxt;
      407             :                         }
      408    14027606 :                         MT_lock_unset(&flow->flowlock);
      409             : #endif
      410             : 
      411    14009007 :                         q_enqueue(flow->done, fe); /* report fe as finished */
      412    13984749 :                         if (fnxt == 0 && profilerStatus) {
      413           0 :                                 profilerHeartbeatEvent("wait");
      414             :                         }
      415             :                 }
      416      162102 :                 MT_lock_set(&dataflowLock); /* out of work: park as a free worker or exit */
      417      162696 :                 if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >= free_max) {
      418             :                         locked = true;
      419             :                         break;
      420             :                 }
      421      158566 :                 free_count++; /* move t from workers to free_workers */
      422      158566 :                 struct worker **tp = &workers;
      423      941444 :                 while (*tp && *tp != t)
      424      782878 :                         tp = &(*tp)->next;
      425      158566 :                 assert(*tp && *tp == t);
      426      158566 :                 *tp = t->next;
      427      158566 :                 t->flag = FREE;
      428      158566 :                 t->next = free_workers;
      429      158566 :                 free_workers = t;
      430      158566 :                 MT_lock_unset(&dataflowLock);
      431      158566 :                 MT_thread_setworking("idle, waiting for new client");
      432      158566 :                 MT_sema_down(&t->s); /* sleep until reactivated or shut down */
      433      158563 :                 if (GDKexiting() || ATOMIC_GET(&exiting))
      434             :                         break;
      435      157923 :                 assert(t->flag == WAITING);
      436             :         }
      437         633 :         if (!locked)
      438         633 :                 MT_lock_set(&dataflowLock);
      439        4763 :         if (t->flag != FINISHING) { /* FINISHING workers are presumably unlinked by whoever set that flag */
      440        4100 :                 struct worker **tp = t->flag == FREE ? &free_workers : &workers; /* move t to exited_workers so it can be joined */
      441       17712 :                 while (*tp && *tp != t)
      442       13612 :                         tp = &(*tp)->next;
      443        4100 :                 assert(*tp && *tp == t);
      444        4100 :                 *tp = t->next;
      445        4100 :                 t->flag = EXITED;
      446        4100 :                 t->next = exited_workers;
      447        4100 :                 exited_workers = t;
      448             :         }
      449        4763 :         MT_lock_unset(&dataflowLock);
      450        4753 :         GDKsetbuf(NULL);
      451        4751 : }
     452             : 
     453             : /*
     454             :  * Create an interpreter pool.
     455             :  * One worker will adaptively be available for each client.
     456             :  * The remainder are taken from the GDKnr_threads argument and
     457             :  * typically is equal to the number of cores
     458             :  * The workers are assembled in a local table to enable debugging.
     459             :  */
     460             : static int
     461         356 : DFLOWinitialize(void)
     462             : {
     463         356 :         int limit;
     464         356 :         int created = 0;
     465             : 
     466         356 :         MT_lock_set(&mal_contextLock);
     467         356 :         MT_lock_set(&dataflowLock);
     468         356 :         if (todo) {
     469             :                 /* somebody else beat us to it */
     470           0 :                 MT_lock_unset(&dataflowLock);
     471           0 :                 MT_lock_unset(&mal_contextLock);
     472           0 :                 return 0;
     473             :         }
     474         356 :         free_max = GDKgetenv_int("dataflow_max_free",
     475             :                                                          GDKnr_threads < 4 ? 4 : GDKnr_threads);
     476         356 :         todo = q_create("todo");
     477         356 :         if (todo == NULL) {
     478           0 :                 MT_lock_unset(&dataflowLock);
     479           0 :                 MT_lock_unset(&mal_contextLock);
     480           0 :                 return -1;
     481             :         }
     482         356 :         limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
     483        2842 :         while (limit > 0) {
     484        2486 :                 limit--;
     485        2486 :                 struct worker *t = GDKmalloc(sizeof(*t));
     486        2486 :                 if (t == NULL) {
     487           0 :                         TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
     488           0 :                         continue;
     489             :                 }
     490        2486 :                 *t = (struct worker) {
     491             :                         .flag = RUNNING,
     492             :                         .cntxt = ATOMIC_PTR_VAR_INIT(NULL),
     493             :                 };
     494        2486 :                 MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     495        2486 :                 if (MT_create_thread(&t->id, DFLOWworker, t,
     496             :                                                          MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     497           0 :                         MT_sema_destroy(&t->s);
     498           0 :                         GDKfree(t);
     499             :                 } else {
     500        2486 :                         t->next = workers;
     501        2486 :                         workers = t;
     502        2486 :                         created++;
     503             :                 }
     504             :         }
     505         356 :         if (created == 0) {
     506             :                 /* no threads created */
     507           0 :                 q_destroy(todo);
     508           0 :                 todo = NULL;
     509           0 :                 MT_lock_unset(&dataflowLock);
     510           0 :                 MT_lock_unset(&mal_contextLock);
     511           0 :                 return -1;
     512             :         }
     513         356 :         MT_lock_unset(&dataflowLock);
     514         356 :         MT_lock_unset(&mal_contextLock);
     515         356 :         return 0;
     516             : }
     517             : 
     518             : /*
     519             :  * The dataflow administration is based on administration of
     520             :  * how many variables are still missing before it can be executed.
     521             :  * For each instruction we keep a list of instructions whose
     522             :  * blocking counter should be decremented upon finishing it.
     523             :  */
     524             : static str
     525      160217 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
     526             : {
     527      160217 :         int pc, i, j, k, l, n, etop = 0;
     528      160217 :         int *assign;
     529      160217 :         InstrPtr p;
     530             : 
     531      160217 :         if (flow == NULL)
     532           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
     533      160217 :         if (mb == NULL)
     534           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
     535      160217 :         assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
     536      160217 :         if (assign == NULL)
     537           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     538      160217 :         etop = flow->stop - flow->start;
     539    14193374 :         for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
     540    14033158 :                 p = getInstrPtr(mb, pc);
     541    14033158 :                 if (p == NULL) {
     542           0 :                         GDKfree(assign);
     543           0 :                         throw(MAL, "dataflow",
     544             :                                   "DFLOWinitBlk(): getInstrPtr() returned NULL");
     545             :                 }
     546             : 
     547             :                 /* initial state, ie everything can run */
     548    14033158 :                 flow->status[n].flow = flow;
     549    14033158 :                 flow->status[n].pc = pc;
     550    14033158 :                 flow->status[n].state = DFLOWpending;
     551    14033158 :                 flow->status[n].cost = -1;
     552    14033158 :                 ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
     553             : 
     554             :                 /* administer flow dependencies */
     555    66394491 :                 for (j = p->retc; j < p->argc; j++) {
     556             :                         /* list of instructions that wake n-th instruction up */
     557    52361334 :                         if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
     558    26005132 :                                 assert(k < pc);      /* only dependencies on earlier instructions */
     559             :                                 /* add edge to the target instruction for wakeup call */
     560    26005132 :                                 k -= flow->start;
     561    26005132 :                                 if (flow->nodes[k]) {
     562             :                                         /* add wakeup to tail of list */
     563   226731884 :                                         for (i = k; flow->edges[i] > 0; i = flow->edges[i])
     564             :                                                 ;
     565    23370616 :                                         flow->nodes[etop] = n;
     566    23370616 :                                         flow->edges[etop] = -1;
     567    23370616 :                                         flow->edges[i] = etop;
     568    23370616 :                                         etop++;
     569    23370616 :                                         (void) size;
     570    23370616 :                                         if (etop == size) {
     571         213 :                                                 int *tmp;
     572             :                                                 /* in case of realloc failure, the original
     573             :                                                  * pointers will be freed by the caller */
     574         213 :                                                 tmp = (int *) GDKrealloc(flow->nodes,
     575             :                                                                                                  sizeof(int) * 2 * size);
     576         212 :                                                 if (tmp == NULL) {
     577           0 :                                                         GDKfree(assign);
     578           0 :                                                         throw(MAL, "dataflow",
     579             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     580             :                                                 }
     581         212 :                                                 flow->nodes = tmp;
     582         212 :                                                 tmp = (int *) GDKrealloc(flow->edges,
     583             :                                                                                                  sizeof(int) * 2 * size);
     584         212 :                                                 if (tmp == NULL) {
     585           0 :                                                         GDKfree(assign);
     586           0 :                                                         throw(MAL, "dataflow",
     587             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     588             :                                                 }
     589         212 :                                                 flow->edges = tmp;
     590         212 :                                                 size *= 2;
     591             :                                         }
     592             :                                 } else {
     593     2634516 :                                         flow->nodes[k] = n;
     594     2634516 :                                         flow->edges[k] = -1;
     595             :                                 }
     596             : 
     597    26005131 :                                 flow->status[n].blocks++;
     598             :                         }
     599             : 
     600             :                         /* list of instructions to be woken up explicitly */
     601    52361333 :                         if (!isVarConstant(mb, getArg(p, j))) {
     602             :                                 /* be careful, watch out for garbage collection interference */
     603             :                                 /* those should be scheduled after all its other uses */
     604    27799584 :                                 l = getEndScope(mb, getArg(p, j));
     605    27799584 :                                 if (l != pc && l < flow->stop && l > flow->start) {
     606             :                                         /* add edge to the target instruction for wakeup call */
     607    16157025 :                                         assert(pc < l);      /* only dependencies on earlier instructions */
     608    16157025 :                                         l -= flow->start;
     609    16157025 :                                         if (flow->nodes[n]) {
     610             :                                                 /* add wakeup to tail of list */
     611    78242996 :                                                 for (i = n; flow->edges[i] > 0; i = flow->edges[i])
     612             :                                                         ;
     613     7927688 :                                                 flow->nodes[etop] = l;
     614     7927688 :                                                 flow->edges[etop] = -1;
     615     7927688 :                                                 flow->edges[i] = etop;
     616     7927688 :                                                 etop++;
     617     7927688 :                                                 if (etop == size) {
     618         209 :                                                         int *tmp;
     619             :                                                         /* in case of realloc failure, the original
     620             :                                                          * pointers will be freed by the caller */
     621         209 :                                                         tmp = (int *) GDKrealloc(flow->nodes,
     622             :                                                                                                          sizeof(int) * 2 * size);
     623         209 :                                                         if (tmp == NULL) {
     624           0 :                                                                 GDKfree(assign);
     625           0 :                                                                 throw(MAL, "dataflow",
     626             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     627             :                                                         }
     628         209 :                                                         flow->nodes = tmp;
     629         209 :                                                         tmp = (int *) GDKrealloc(flow->edges,
     630             :                                                                                                          sizeof(int) * 2 * size);
     631         209 :                                                         if (tmp == NULL) {
     632           0 :                                                                 GDKfree(assign);
     633           0 :                                                                 throw(MAL, "dataflow",
     634             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     635             :                                                         }
     636         209 :                                                         flow->edges = tmp;
     637         209 :                                                         size *= 2;
     638             :                                                 }
     639             :                                         } else {
     640     8229337 :                                                 flow->nodes[n] = l;
     641     8229337 :                                                 flow->edges[n] = -1;
     642             :                                         }
     643    16157025 :                                         flow->status[l].blocks++;
     644             :                                 }
     645             :                         }
     646             :                 }
     647             : 
     648    29229626 :                 for (j = 0; j < p->retc; j++)
     649    15196469 :                         assign[getArg(p, j)] = pc;      /* ensure recognition of dependency on first instruction and constant */
     650             :         }
     651      160216 :         GDKfree(assign);
     652             : 
     653      160216 :         return MAL_SUCCEED;
     654             : }
     655             : 
     656             : /*
     657             :  * Parallel processing is mostly driven by dataflow, but within this context
     658             :  * there may be different schemes to take instructions into execution.
     659             :  * The admission scheme (and wrapup) are the necessary scheduler hooks.
     660             :  * A scheduler registers the functions needed and should release them
     661             :  * at the end of the parallel block.
     662             :  * They take effect after we have ensured that the basic properties for
     663             :  * execution hold.
     664             :  */
     665             : static str
     666      160217 : DFLOWscheduler(DataFlow flow, struct worker *w)
     667             : {
     668      160217 :         int last;
     669      160217 :         int i;
     670      160217 :         int j;
     671      160217 :         InstrPtr p;
     672      160217 :         int tasks = 0, actions = 0;
     673      160217 :         str ret = MAL_SUCCEED;
     674      160217 :         FlowEvent fe, f = 0;
     675             : 
     676      160217 :         if (flow == NULL)
     677           0 :                 throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
     678      160217 :         actions = flow->stop - flow->start;
     679      160217 :         if (actions == 0)
     680           0 :                 throw(MAL, "dataflow", "Empty dataflow block");
     681             :         /* initialize the eligible statements */
     682      160217 :         fe = flow->status;
     683             : 
     684      160217 :         ATOMIC_DEC(&flow->cntxt->workers);
     685      160217 :         MT_lock_set(&flow->flowlock);
     686    14354158 :         for (i = 0; i < actions; i++)
     687    14033724 :                 if (fe[i].blocks == 0) {
     688     1022564 :                         p = getInstrPtr(flow->mb, fe[i].pc);
     689     1022564 :                         if (p == NULL) {
     690           0 :                                 MT_lock_unset(&flow->flowlock);
     691           0 :                                 ATOMIC_INC(&flow->cntxt->workers);
     692           0 :                                 throw(MAL, "dataflow",
     693             :                                           "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
     694             :                         }
     695     1022564 :                         fe[i].argclaim = 0;
     696     6078002 :                         for (j = p->retc; j < p->argc; j++)
     697     5055438 :                                 fe[i].argclaim += getMemoryClaim(fe[0].flow->mb,
     698     5055440 :                                                                                                  fe[0].flow->stk, p, j, FALSE);
     699     1022562 :                         flow->status[i].state = DFLOWrunning;
     700     1022562 :                         q_enqueue(todo, flow->status + i);
     701             :                 }
     702      160217 :         MT_lock_unset(&flow->flowlock);
     703      160217 :         MT_sema_up(&w->s);
     704             : 
     705      160217 :         while (actions != tasks) {
     706    14032736 :                 f = q_dequeue(flow->done, NULL);
     707    14033140 :                 if (ATOMIC_GET(&exiting))
     708             :                         break;
     709    14033140 :                 if (f == NULL) {
     710           0 :                         ATOMIC_INC(&flow->cntxt->workers);
     711           0 :                         throw(MAL, "dataflow",
     712             :                                   "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
     713             :                 }
     714             : 
     715             :                 /*
     716             :                  * When an instruction is finished we have to reduce the blocked
     717             :                  * counter for all dependent instructions.  for those where it
     718             :                  * drops to zero we can scheduler it we do it here instead of the scheduler
     719             :                  */
     720             : 
     721    14033140 :                 MT_lock_set(&flow->flowlock);
     722    14032670 :                 tasks++;
     723    14032670 :                 for (last = f->pc - flow->start;
     724    56186131 :                          last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
     725    42153767 :                         if (flow->status[i].state == DFLOWpending) {
     726    36297846 :                                 flow->status[i].argclaim += f->hotclaim;
     727    36297846 :                                 if (flow->status[i].blocks == 1) {
     728     7148597 :                                         flow->status[i].blocks--;
     729     7148597 :                                         flow->status[i].state = DFLOWrunning;
     730     7148597 :                                         q_enqueue(todo, flow->status + i);
     731             :                                 } else {
     732    29149249 :                                         flow->status[i].blocks--;
     733             :                                 }
     734             :                         }
     735    14192581 :                 MT_lock_unset(&flow->flowlock);
     736             :         }
     737             :         /* release the worker from its specific task (turn it into a
     738             :          * generic worker) */
     739      160217 :         ATOMIC_PTR_SET(&w->cntxt, NULL);
     740      160217 :         ATOMIC_INC(&flow->cntxt->workers);
     741             :         /* wrap up errors */
     742      160217 :         assert(flow->done->last == 0);
     743      160217 :         if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
     744         482 :                 TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
     745             :         }
     746             :         return ret;
     747             : }
     748             : 
     749             : /* called and returns with dataflowLock locked, temporarily unlocks
     750             :  * join the thread associated with the worker and destroy the structure */
     751             : static inline void
     752        4763 : finish_worker(struct worker *t)
     753             : {
     754        4763 :         t->flag = FINISHING;
     755        4763 :         MT_lock_unset(&dataflowLock);
     756        4763 :         MT_join_thread(t->id);
     757        4763 :         MT_sema_destroy(&t->s);
     758        4763 :         GDKfree(t);
     759        4763 :         MT_lock_set(&dataflowLock);
     760        4763 : }
     761             : 
     762             : /* We create a pool of GDKnr_threads-1 generic workers, that is,
     763             :  * workers that will take on jobs from any clients.  In addition, we
     764             :  * create a single specific worker per client (i.e. each time we enter
     765             :  * here).  This specific worker will only do work for the client for
     766             :  * which it was started.  In this way we can guarantee that there will
     767             :  * always be progress for the client, even if all other workers are
     768             :  * doing something big.
     769             :  *
     770             :  * When all jobs for a client have been done (there are no more
     771             :  * entries for the client in the queue), the specific worker turns
     772             :  * itself into a generic worker.  At the same time, we signal that one
     773             :  * generic worker should exit and this function returns.  In this way
     774             :  * we make sure that there are once again GDKnr_threads-1 generic
     775             :  * workers. */
     776             : str
     777      160198 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc,
     778             :                            MalStkPtr stk)
     779             : {
     780      160198 :         DataFlow flow = NULL;
     781      160198 :         str msg = MAL_SUCCEED;
     782      160198 :         int size;
     783      160198 :         bit *ret;
     784      160198 :         struct worker *t;
     785             : 
     786      160198 :         if (stk == NULL)
     787           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
     788      160198 :         ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
     789      160198 :         *ret = FALSE;
     790             : 
     791      160198 :         assert(stoppc > startpc);
     792             : 
     793             :         /* check existence of workers */
     794      160198 :         if (todo == NULL) {
     795             :                 /* create thread pool */
     796         356 :                 if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
     797             :                         /* no threads created, run serially */
     798           0 :                         *ret = TRUE;
     799           0 :                         return MAL_SUCCEED;
     800             :                 }
     801             :         }
     802      160198 :         assert(todo);
     803             :         /* in addition, create one more worker that will only execute
     804             :          * tasks for the current client to compensate for our waiting
     805             :          * until all work is done */
     806      160198 :         MT_lock_set(&dataflowLock);
     807             :         /* join with already exited threads */
     808      161868 :         while (exited_workers != NULL) {
     809        1651 :                 assert(exited_workers->flag == EXITED);
     810        1651 :                 struct worker *t = exited_workers;
     811        1651 :                 exited_workers = exited_workers->next;
     812        1651 :                 finish_worker(t);
     813             :         }
     814      160217 :         assert(cntxt != NULL);
     815      160217 :         if (free_workers != NULL) {
     816      157932 :                 t = free_workers;
     817      157932 :                 assert(t->flag == FREE);
     818      157932 :                 assert(free_count > 0);
     819      157932 :                 free_count--;
     820      157932 :                 free_workers = t->next;
     821      157932 :                 t->next = workers;
     822      157932 :                 workers = t;
     823      157932 :                 t->flag = WAITING;
     824      157932 :                 ATOMIC_PTR_SET(&t->cntxt, cntxt);
     825      157932 :                 MT_sema_up(&t->s);
     826             :         } else {
     827        2285 :                 t = GDKmalloc(sizeof(*t));
     828        2285 :                 if (t != NULL) {
     829        2285 :                         *t = (struct worker) {
     830             :                                 .flag = WAITING,
     831             :                                 .cntxt = ATOMIC_PTR_VAR_INIT(cntxt),
     832             :                         };
     833        2285 :                         MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     834        2285 :                         if (MT_create_thread(&t->id, DFLOWworker, t,
     835             :                                                                  MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     836           0 :                                 MT_sema_destroy(&t->s);
     837           0 :                                 GDKfree(t);
     838           0 :                                 t = NULL;
     839             :                         } else {
     840        2285 :                                 t->next = workers;
     841        2285 :                                 workers = t;
     842             :                         }
     843             :                 }
     844        2285 :                 if (t == NULL) {
     845             :                         /* cannot start new thread, run serially */
     846           0 :                         *ret = TRUE;
     847           0 :                         MT_lock_unset(&dataflowLock);
     848           0 :                         return MAL_SUCCEED;
     849             :                 }
     850             :         }
     851      160217 :         MT_lock_unset(&dataflowLock);
     852             : 
     853      160217 :         flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
     854      160217 :         if (flow == NULL)
     855           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     856             : 
     857      160217 :         size = DFLOWgraphSize(mb, startpc, stoppc);
     858      160217 :         size += stoppc - startpc;
     859             : 
     860      320434 :         *flow = (DataFlowRec) {
     861             :                 .cntxt = cntxt,
     862             :                 .mb = mb,
     863             :                 .stk = stk,
     864      160217 :                 .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
     865             :                 /* keep real block count, exclude brackets */
     866      160217 :                 .start = startpc + 1,
     867             :                 .stop = stoppc,
     868      160217 :                 .done = q_create("flow->done"),
     869      160217 :                 .status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
     870             :                                                                                 sizeof(FlowEventRec)),
     871             :                 .error = ATOMIC_PTR_VAR_INIT(NULL),
     872      160217 :                 .nodes = (int *) GDKzalloc(sizeof(int) * size),
     873      160217 :                 .edges = (int *) GDKzalloc(sizeof(int) * size),
     874             :         };
     875             : 
     876      160217 :         if (flow->done == NULL) {
     877           0 :                 GDKfree(flow->status);
     878           0 :                 GDKfree(flow->nodes);
     879           0 :                 GDKfree(flow->edges);
     880           0 :                 GDKfree(flow);
     881           0 :                 throw(MAL, "dataflow",
     882             :                           "runMALdataflow(): Failed to create flow->done queue");
     883             :         }
     884             : 
     885      160217 :         if (flow->status == NULL || flow->nodes == NULL || flow->edges == NULL) {
     886           0 :                 q_destroy(flow->done);
     887           0 :                 GDKfree(flow->status);
     888           0 :                 GDKfree(flow->nodes);
     889           0 :                 GDKfree(flow->edges);
     890           0 :                 GDKfree(flow);
     891           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     892             :         }
     893             : 
     894      160217 :         MT_lock_init(&flow->flowlock, "flow->flowlock");
     895      160216 :         msg = DFLOWinitBlk(flow, mb, size);
     896             : 
     897      160217 :         if (msg == MAL_SUCCEED)
     898      160217 :                 msg = DFLOWscheduler(flow, t);
     899             : 
     900      160217 :         GDKfree(flow->status);
     901      160217 :         GDKfree(flow->edges);
     902      160217 :         GDKfree(flow->nodes);
     903      160217 :         q_destroy(flow->done);
     904      160217 :         MT_lock_destroy(&flow->flowlock);
     905      160217 :         GDKfree(flow);
     906             : 
     907             :         /* we created one worker, now tell one worker to exit again */
     908      160217 :         MT_lock_set(&todo->l);
     909      160217 :         todo->exitcount++;
     910      160217 :         MT_lock_unset(&todo->l);
     911      160217 :         MT_sema_up(&todo->s);
     912             : 
     913      160217 :         return msg;
     914             : }
     915             : 
     916             : str
     917           0 : deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     918             : {
     919           0 :         int *ret = getArgReference_int(stk, pci, 0);
     920           0 :         int *val = getArgReference_int(stk, pci, 1);
     921           0 :         (void) cntxt;
     922           0 :         (void) mb;
     923           0 :         *ret = *val;
     924           0 :         return MAL_SUCCEED;
     925             : }
     926             : 
     927             : static void
     928         355 : stopMALdataflow(void)
     929             : {
     930         355 :         ATOMIC_SET(&exiting, 1);
     931         355 :         if (todo) {
     932         355 :                 MT_lock_set(&dataflowLock);
     933             :                 /* first wake up all running threads */
     934         355 :                 int n = 0;
     935         988 :                 for (struct worker *t = free_workers; t; t = t->next)
     936         633 :                         n++;
     937        2834 :                 for (struct worker *t = workers; t; t = t->next)
     938        2479 :                         n++;
     939        3467 :                 while (n-- > 0) {
     940             :                         /* one UP for each thread we know about */
     941        3467 :                         MT_sema_up(&todo->s);
     942             :                 }
     943         988 :                 while (free_workers) {
     944         633 :                         struct worker *t = free_workers;
     945         633 :                         assert(free_count > 0);
     946         633 :                         free_count--;
     947         633 :                         free_workers = free_workers->next;
     948         633 :                         MT_sema_up(&t->s);
     949         633 :                         finish_worker(t);
     950             :                 }
     951         385 :                 while (workers) {
     952          30 :                         struct worker *t = workers;
     953          30 :                         workers = workers->next;
     954          30 :                         finish_worker(t);
     955             :                 }
     956        2804 :                 while (exited_workers) {
     957        2449 :                         struct worker *t = exited_workers;
     958        2449 :                         exited_workers = exited_workers->next;
     959        2449 :                         finish_worker(t);
     960             :                 }
     961         355 :                 MT_lock_unset(&dataflowLock);
     962             :         }
     963         355 : }
     964             : 
     965             : void
     966         355 : mal_dataflow_reset(void)
     967             : {
     968         355 :         stopMALdataflow();
     969         355 :         workers = exited_workers = NULL;
     970         355 :         if (todo) {
     971         355 :                 MT_lock_destroy(&todo->l);
     972         355 :                 MT_sema_destroy(&todo->s);
     973         355 :                 GDKfree(todo);
     974             :         }
     975         355 :         todo = 0;                                       /* pending instructions */
     976         355 :         ATOMIC_SET(&exiting, 0);
     977         355 : }

Generated by: LCOV version 1.14