LCOV - code coverage report
Current view: top level - monetdb5/mal - mal_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 438 530 82.6 %
Date: 2024-04-25 20:03:45 Functions: 12 14 85.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (author) M Kersten, S Mullender
      15             :  * Dataflow processing only works on a code
      16             :  * sequence that does not include additional (implicit) flow of control
      17             :  * statements and, ideally, consists of expensive BAT operations.
      18             :  * The dataflow portion is identified as a guarded block,
      19             :  * whose entry is controlled by the function language.dataflow();
      20             :  *
      21             :  * The dataflow worker tries to follow the sequence of actions
      22             :  * as laid out in the plan, but abandons this track when it hits
      23             :  * a blocking operator, or an instruction for which not all arguments
      24             :  * are available or resources become scarce.
      25             :  *
      26             :  * The flow graph is organized such that parallel threads can
      27             :  * access it mostly without expensive locking and dependent
      28             :  * variables are easy to find.
      29             :  */
      30             : #include "monetdb_config.h"
      31             : #include "mal_dataflow.h"
      32             : #include "mal_exception.h"
      33             : #include "mal_private.h"
      34             : #include "mal_internal.h"
      35             : #include "mal_runtime.h"
      36             : #include "mal_resource.h"
      37             : #include "mal_function.h"
      38             : 
      39             : #define DFLOWpending 0                  /* runnable */
      40             : #define DFLOWrunning 1                  /* currently in progress */
      41             : #define DFLOWwrapup  2                  /* done! */
      42             : #define DFLOWretry   3                  /* reschedule */
      43             : #define DFLOWskipped 4                  /* due to errors */
      44             : 
      45             : /* The per instruction status of execution */
      46             : typedef struct FLOWEVENT {
      47             :         struct DATAFLOW *flow;          /* execution context */
      48             :         int pc;                                         /* pc in underlying malblock */
      49             :         int blocks;                                     /* awaiting for variables */
      50             :         sht state;                                      /* of execution */
      51             :         lng clk;
      52             :         sht cost;
      53             :         lng hotclaim;                           /* memory foot print of result variables */
      54             :         lng argclaim;                           /* memory foot print of arguments */
      55             :         lng maxclaim;                           /* memory foot print of largest argument, could be used to indicate result size */
      56             :         struct FLOWEVENT *next;         /* linked list for queues */
      57             : } *FlowEvent, FlowEventRec;
      58             : 
      59             : typedef struct queue {
      60             :         int exitcount;                          /* how many threads should exit */
      61             :         FlowEvent first, last;          /* first and last element of the queue */
      62             :         MT_Lock l;                                      /* it's a shared resource, ie we need locks */
      63             :         MT_Sema s;                                      /* threads wait on empty queues */
      64             : } Queue;
      65             : 
      66             : /*
      67             :  * The dataflow dependency is administered in a graph list structure.
      68             :  * For each instruction we keep the list of instructions that
      69             :  * should be checked for eligibility once we are finished with it.
      70             :  */
      71             : typedef struct DATAFLOW {
      72             :         Client cntxt;                           /* for debugging and client resolution */
      73             :         MalBlkPtr mb;                           /* carry the context */
      74             :         MalStkPtr stk;
      75             :         int start, stop;                        /* guarded block under consideration */
      76             :         FlowEvent status;                       /* status of each instruction */
      77             :         ATOMIC_PTR_TYPE error;          /* error encountered */
      78             :         int *nodes;                                     /* dependency graph nodes */
      79             :         int *edges;                                     /* dependency graph */
      80             :         MT_Lock flowlock;                       /* lock to protect the above */
      81             :         Queue *done;                            /* instructions handled */
      82             :         bool set_qry_ctx;
      83             : } *DataFlow, DataFlowRec;
      84             : 
      85             : struct worker {
      86             :         MT_Id id;
      87             :         enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
      88             :         ATOMIC_PTR_TYPE cntxt;          /* client we do work for (NULL -> any) */
      89             :         MT_Sema s;
      90             :         struct worker *next;
      91             :         char errbuf[GDKMAXERRLEN];      /* GDKerrbuf so that we can allocate before fork */
      92             : };
      93             : /* heads of three mutually exclusive linked lists, all using the .next
      94             :  * field in the worker struct */
      95             : static struct worker *workers;            /* "working" workers */
      96             : static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
      97             : static struct worker *free_workers;     /* free workers (.flag==FREE) */
      98             : static int free_count = 0;              /* number of free threads */
      99             : static int free_max = 0;                /* max number of spare free threads */
     100             : 
     101             : static Queue *todo = 0;                 /* pending instructions */
     102             : 
     103             : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
     104             : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
     105             : 
     106             : /*
     107             :  * Calculate the size of the dataflow dependency graph.
     108             :  */
     109             : static int
     110             : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
     111             : {
     112             :         int cnt = 0;
     113             : 
     114     8076385 :         for (int i = start; i < stop; i++)
     115     7931488 :                 cnt += getInstrPtr(mb, i)->argc;
     116      144897 :         return cnt;
     117             : }
     118             : 
     119             : /*
     120             :  * The dataflow execution is confined to a barrier block.
     121             :  * Within the block there are multiple flows, which, in principle,
     122             :  * can be executed in parallel.
     123             :  */
     124             : 
     125             : static Queue *
     126      145232 : q_create(const char *name)
     127             : {
     128      145232 :         Queue *q = GDKzalloc(sizeof(Queue));
     129             : 
     130      145232 :         if (q == NULL)
     131             :                 return NULL;
     132      145232 :         MT_lock_init(&q->l, name);
     133      145231 :         MT_sema_init(&q->s, 0, name);
     134      145232 :         return q;
     135             : }
     136             : 
     137             : static void
     138      144896 : q_destroy(Queue *q)
     139             : {
     140      144896 :         assert(q);
     141      144896 :         MT_lock_destroy(&q->l);
     142      144896 :         MT_sema_destroy(&q->s);
     143      144897 :         GDKfree(q);
     144      144897 : }
     145             : 
     146             : /* keep a simple FIFO queue. It won't be a large one, so shuffling entries on requeue is possible */
     147             : /* we might actually sort it for better scheduling behavior */
     148             : static void
     149    12501224 : q_enqueue(Queue *q, FlowEvent d)
     150             : {
     151    12501224 :         assert(q);
     152    12501224 :         assert(d);
     153    12501224 :         MT_lock_set(&q->l);
     154    12498739 :         if (q->first == NULL) {
     155     4304903 :                 assert(q->last == NULL);
     156     4304903 :                 q->first = q->last = d;
     157             :         } else {
     158     8193836 :                 assert(q->last != NULL);
     159     8193836 :                 q->last->next = d;
     160     8193836 :                 q->last = d;
     161             :         }
     162    12498739 :         d->next = NULL;
     163    12498739 :         MT_lock_unset(&q->l);
     164    12500697 :         MT_sema_up(&q->s);
     165    12498631 : }
     166             : 
     167             : /*
     168             :  * A priority queue over the hot claims of memory may
     169             :  * be more effective. It would prioritize those instructions
     170             :  * that want to use a big recent result
     171             :  */
     172             : 
     173             : static void
     174           0 : q_requeue(Queue *q, FlowEvent d)
     175             : {
     176           0 :         assert(q);
     177           0 :         assert(d);
     178           0 :         MT_lock_set(&q->l);
     179           0 :         if (q->first == NULL) {
     180           0 :                 assert(q->last == NULL);
     181           0 :                 q->first = q->last = d;
     182           0 :                 d->next = NULL;
     183             :         } else {
     184           0 :                 assert(q->last != NULL);
     185           0 :                 d->next = q->first;
     186           0 :                 q->first = d;
     187             :         }
     188           0 :         MT_lock_unset(&q->l);
     189           0 :         MT_sema_up(&q->s);
     190           0 : }
     191             : 
     192             : static FlowEvent
     193    12870817 : q_dequeue(Queue *q, Client cntxt)
     194             : {
     195    12870817 :         assert(q);
     196    12870817 :         MT_sema_down(&q->s);
     197    12866849 :         if (ATOMIC_GET(&exiting))
     198             :                 return NULL;
     199    12865548 :         MT_lock_set(&q->l);
     200    12847696 :         if (cntxt == NULL && q->exitcount > 0) {
     201      144897 :                 q->exitcount--;
     202      144897 :                 MT_lock_unset(&q->l);
     203      144897 :                 return NULL;
     204             :         }
     205             : 
     206    12702799 :         FlowEvent *dp = &q->first;
     207    12702799 :         FlowEvent pd = NULL;
     208             :         /* if cntxt == NULL, return the first event, if cntxt != NULL, find
     209             :          * the first event in the queue with matching cntxt value and return
     210             :          * that */
     211    12702799 :         if (cntxt != NULL) {
     212    10460825 :                 while (*dp && (*dp)->flow->cntxt != cntxt) {
     213     8733440 :                         pd = *dp;
     214     8733440 :                         dp = &pd->next;
     215             :                 }
     216             :         }
     217    12702799 :         FlowEvent d = *dp;
     218    12702799 :         if (d) {
     219    12471708 :                 *dp = d->next;
     220    12471708 :                 d->next = NULL;
     221    12471708 :                 if (*dp == NULL)
     222     4322163 :                         q->last = pd;
     223             :         }
     224    12702799 :         MT_lock_unset(&q->l);
     225    12696114 :         return d;
     226             : }
     227             : 
     228             : /*
     229             :  * We simply move an instruction into the front of the queue.
     230             :  * Beware, we assume that variables are assigned a value once, otherwise
     231             :  * the order may really create errors.
     232             :  * The order of the instructions should be retained as long as possible.
     233             :  * Delay processing when we run out of memory.  Push the instruction back
     234             :  * on the front of the queue, waiting for another attempt. A problem might be
     235             :  * that all threads but one are cycling through the queue, each time
     236             :  * finding an eligible instruction, but without enough space.
     237             :  * Therefore, when no other work is queued, we wait DELAYUNIT ms before requeueing.
     238             :  *
     239             :  * The process could be refined by checking for cheap operations,
     240             :  * i.e. those that would require no memory at all (aggr.count)
     241             :  * This, however, would lead to a dependency to the upper layers,
     242             :  * because in the kernel we don't know what routines are available
     243             :  * with this property. Nor do we maintain such properties.
     244             :  */
     245             : 
     246             : static void
     247        7741 : DFLOWworker(void *T)
     248             : {
     249        7741 :         struct worker *t = (struct worker *) T;
     250        7741 :         bool locked = false;
     251             : #ifdef _MSC_VER
     252             :         srand((unsigned int) GDKusec());
     253             : #endif
     254        7741 :         GDKsetbuf(t->errbuf);                /* where to leave errors */
     255        7741 :         snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
     256             : 
     257      145899 :         for (;;) {
     258      145899 :                 DataFlow flow;
     259      145899 :                 FlowEvent fe = 0, fnxt = 0;
     260      145899 :                 str error = 0;
     261      145899 :                 int i;
     262      145899 :                 lng claim;
     263      145899 :                 Client cntxt;
     264      145899 :                 InstrPtr p;
     265             : 
     266      145899 :                 GDKclrerr();
     267             : 
     268      145900 :                 if (t->flag == WAITING) {
     269             :                         /* wait until we are allowed to start working */
     270      144895 :                         MT_sema_down(&t->s);
     271      144892 :                         t->flag = RUNNING;
     272      144892 :                         if (ATOMIC_GET(&exiting)) {
     273             :                                 break;
     274             :                         }
     275             :                 }
     276      145897 :                 assert(t->flag == RUNNING);
     277      145897 :                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     278     8159651 :                 while (1) {
     279     8159651 :                         MT_thread_set_qry_ctx(NULL);
     280     8155001 :                         if (fnxt == 0) {
     281     5089637 :                                 MT_thread_setworking("waiting for work");
     282     5090173 :                                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     283     5090173 :                                 fe = q_dequeue(todo, cntxt);
     284     5092371 :                                 if (fe == NULL) {
     285      377286 :                                         if (cntxt) {
     286             :                                                 /* we're not done yet with work for the current
     287             :                                                  * client (as far as we know), so give up the CPU
     288             :                                                  * and let the scheduler enter some more work, but
     289             :                                                  * first compensate for the down we did in
     290             :                                                  * dequeue */
     291      231390 :                                                 MT_sema_up(&todo->s);
     292      231388 :                                                 MT_sleep_ms(1);
     293     8391038 :                                                 continue;
     294             :                                         }
     295             :                                         /* no more work to be done: exit */
     296      145896 :                                         break;
     297             :                                 }
     298     4715085 :                                 if (fe->flow->cntxt && fe->flow->cntxt->mythread)
     299     4714281 :                                         MT_thread_setworking(fe->flow->cntxt->mythread);
     300             :                         } else
     301             :                                 fe = fnxt;
     302     7780122 :                         if (ATOMIC_GET(&exiting)) {
     303             :                                 break;
     304             :                         }
     305     7780122 :                         fnxt = 0;
     306     7780122 :                         assert(fe);
     307     7780122 :                         flow = fe->flow;
     308     7780122 :                         assert(flow);
     309     7780122 :                         MT_thread_set_qry_ctx(flow->set_qry_ctx ? &flow->cntxt->
     310             :                                                                   qryctx : NULL);
     311             : 
     312             :                         /* whenever we have a (concurrent) error, skip it */
     313     7782744 :                         if (ATOMIC_PTR_GET(&flow->error)) {
     314        4203 :                                 q_enqueue(flow->done, fe);
     315        4207 :                                 continue;
     316             :                         }
     317             : 
     318     7778541 :                         p = getInstrPtr(flow->mb, fe->pc);
     319     7778541 :                         claim = fe->argclaim;
     320    15560423 :                         if (p->fcn != (MALfcn) deblockdataflow &&    /* never block on deblockdataflow() */
     321     7778541 :                                 !MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
     322           0 :                                 fe->hotclaim = 0;    /* don't assume priority anymore */
     323           0 :                                 fe->maxclaim = 0;
     324           0 :                                 MT_lock_set(&todo->l);
     325           0 :                                 FlowEvent last = todo->last;
     326           0 :                                 MT_lock_unset(&todo->l);
     327           0 :                                 if (last == NULL)
     328           0 :                                         MT_sleep_ms(DELAYUNIT);
     329           0 :                                 q_requeue(todo, fe);
     330           0 :                                 continue;
     331             :                         }
     332     7781882 :                         ATOMIC_BASE_TYPE wrks = ATOMIC_INC(&flow->cntxt->workers);
     333     7781882 :                         ATOMIC_BASE_TYPE mwrks = ATOMIC_GET(&flow->mb->workers);
     334     7781894 :                         while (wrks > mwrks) {
     335       15542 :                                 if (ATOMIC_CAS(&flow->mb->workers, &mwrks, wrks))
     336             :                                         break;
     337             :                         }
     338     7781882 :                         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
     339             :                                                                    flow->stk, 0, 0);
     340     7779018 :                         ATOMIC_DEC(&flow->cntxt->workers);
     341             :                         /* release the memory claim */
     342     7779018 :                         MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
     343             : 
     344     7781857 :                         MT_lock_set(&flow->flowlock);
     345     7782032 :                         fe->state = DFLOWwrapup;
     346     7782032 :                         MT_lock_unset(&flow->flowlock);
     347     7781815 :                         if (error) {
     348         486 :                                 void *null = NULL;
     349             :                                 /* only collect one error (from one thread, needed for stable testing) */
     350         486 :                                 if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
     351           9 :                                         freeException(error);
     352             :                                 /* after an error we skip the rest of the block */
     353         486 :                                 q_enqueue(flow->done, fe);
     354         486 :                                 continue;
     355             :                         }
     356             : 
     357             :                         /* see if you can find an eligible instruction that uses the
     358             :                          * result just produced. Then we can continue with it right away.
     359             :                          * We are just looking forward for the last block, which means we
     360             :                          * are safe from concurrent actions. No other thread can steal it,
     361             :                          * because we hold the logical lock.
     362             :                          * All eligible instructions are queued
     363             :                          */
     364     7781329 :                         p = getInstrPtr(flow->mb, fe->pc);
     365     7781329 :                         assert(p);
     366     7781329 :                         fe->hotclaim = 0;
     367     7781329 :                         fe->maxclaim = 0;
     368             : 
     369    16167304 :                         for (i = 0; i < p->retc; i++) {
     370     8388600 :                                 lng footprint;
     371     8388600 :                                 footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
     372     8385975 :                                 fe->hotclaim += footprint;
     373     8385975 :                                 if (footprint > fe->maxclaim)
     374     3658658 :                                         fe->maxclaim = footprint;
     375             :                         }
     376             : 
     377             : /* Try to get rid of the hot potato or locate an alternative to proceed.
     378             :  */
     379             : #define HOTPOTATO
     380             : #ifdef HOTPOTATO
     381             :                         /* HOT potato choice */
     382     7778704 :                         int last = 0, nxt = -1;
     383     7778704 :                         lng nxtclaim = -1;
     384             : 
     385     7778704 :                         MT_lock_set(&flow->flowlock);
     386     7781618 :                         for (last = fe->pc - flow->start;
     387    29537109 :                                  last >= 0 && (i = flow->nodes[last]) > 0;
     388    21755491 :                                  last = flow->edges[last]) {
     389    21755491 :                                 if (flow->status[i].state == DFLOWpending
     390    21754410 :                                         && flow->status[i].blocks == 1) {
     391             :                                         /* find the one with the largest footprint */
     392     5945525 :                                         if (nxt == -1 || flow->status[i].argclaim > nxtclaim) {
     393     3336184 :                                                 nxt = i;
     394     3336184 :                                                 nxtclaim = flow->status[i].argclaim;
     395             :                                         }
     396             :                                 }
     397             :                         }
     398             :                         /* hot potato can not be removed, use alternative to proceed */
     399     7781618 :                         if (nxt >= 0) {
     400     3071742 :                                 flow->status[nxt].state = DFLOWrunning;
     401     3071742 :                                 flow->status[nxt].blocks = 0;
     402     3071742 :                                 flow->status[nxt].hotclaim = fe->hotclaim;
     403     3071742 :                                 flow->status[nxt].argclaim += fe->hotclaim;
     404     3071742 :                                 if (flow->status[nxt].maxclaim < fe->maxclaim)
     405     1585478 :                                         flow->status[nxt].maxclaim = fe->maxclaim;
     406             :                                 fnxt = flow->status + nxt;
     407             :                         }
     408     7781618 :                         MT_lock_unset(&flow->flowlock);
     409             : #endif
     410             : 
     411     7781504 :                         q_enqueue(flow->done, fe);
     412     7777674 :                         if (fnxt == 0 && profilerStatus) {
     413           0 :                                 profilerHeartbeatEvent("wait");
     414             :                         }
     415             :                 }
     416      145896 :                 MT_lock_set(&dataflowLock);
     417      145897 :                 if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >= free_max) {
     418             :                         locked = true;
     419             :                         break;
     420             :                 }
     421      138750 :                 free_count++;
     422      138750 :                 struct worker **tp = &workers;
     423      543070 :                 while (*tp && *tp != t)
     424      404320 :                         tp = &(*tp)->next;
     425      138750 :                 assert(*tp && *tp == t);
     426      138750 :                 *tp = t->next;
     427      138750 :                 t->flag = FREE;
     428      138750 :                 t->next = free_workers;
     429      138750 :                 free_workers = t;
     430      138750 :                 MT_lock_unset(&dataflowLock);
     431      138750 :                 MT_thread_setworking("idle, waiting for new client");
     432      138750 :                 MT_sema_down(&t->s);
     433      138748 :                 if (GDKexiting() || ATOMIC_GET(&exiting))
     434             :                         break;
     435      138157 :                 assert(t->flag == WAITING);
     436             :         }
     437         591 :         if (!locked)
     438         591 :                 MT_lock_set(&dataflowLock);
     439        7738 :         if (t->flag != FINISHING) {
     440        7106 :                 struct worker **tp = t->flag == FREE ? &free_workers : &workers;
     441       37258 :                 while (*tp && *tp != t)
     442       30152 :                         tp = &(*tp)->next;
     443        7106 :                 assert(*tp && *tp == t);
     444        7106 :                 *tp = t->next;
     445        7106 :                 t->flag = EXITED;
     446        7106 :                 t->next = exited_workers;
     447        7106 :                 exited_workers = t;
     448             :         }
     449        7738 :         MT_lock_unset(&dataflowLock);
     450        7738 :         GDKsetbuf(NULL);
     451        7738 : }
     452             : 
     453             : /*
     454             :  * Create an interpreter pool.
     455             :  * One worker will adaptively be available for each client.
     456             :  * The remainder are taken from the GDKnr_threads argument, which
     457             :  * is typically equal to the number of cores.
     458             :  * The workers are assembled in a local table to enable debugging.
     459             :  */
     460             : static int
     461         335 : DFLOWinitialize(void)
     462             : {
     463         335 :         int limit;
     464         335 :         int created = 0;
     465             : 
     466         335 :         MT_lock_set(&mal_contextLock);
     467         335 :         MT_lock_set(&dataflowLock);
     468         335 :         if (todo) {
     469             :                 /* somebody else beat us to it */
     470           0 :                 MT_lock_unset(&dataflowLock);
     471           0 :                 MT_lock_unset(&mal_contextLock);
     472           0 :                 return 0;
     473             :         }
     474         335 :         free_max = GDKgetenv_int("dataflow_max_free",
     475             :                                                          GDKnr_threads < 4 ? 4 : GDKnr_threads);
     476         335 :         todo = q_create("todo");
     477         335 :         if (todo == NULL) {
     478           0 :                 MT_lock_unset(&dataflowLock);
     479           0 :                 MT_lock_unset(&mal_contextLock);
     480           0 :                 return -1;
     481             :         }
     482         335 :         limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
     483        1338 :         while (limit > 0) {
     484        1003 :                 limit--;
     485        1003 :                 struct worker *t = GDKmalloc(sizeof(*t));
     486        1003 :                 if (t == NULL) {
     487           0 :                         TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
     488           0 :                         continue;
     489             :                 }
     490        1003 :                 *t = (struct worker) {
     491             :                         .flag = RUNNING,
     492             :                 };
     493        1003 :                 ATOMIC_PTR_INIT(&t->cntxt, NULL);
     494        1003 :                 MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     495        1003 :                 if (MT_create_thread(&t->id, DFLOWworker, t,
     496             :                                                          MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     497           0 :                         ATOMIC_PTR_DESTROY(&t->cntxt);
     498           0 :                         MT_sema_destroy(&t->s);
     499           0 :                         GDKfree(t);
     500             :                 } else {
     501        1003 :                         t->next = workers;
     502        1003 :                         workers = t;
     503        1003 :                         created++;
     504             :                 }
     505             :         }
     506         335 :         if (created == 0) {
     507             :                 /* no threads created */
     508           0 :                 q_destroy(todo);
     509           0 :                 todo = NULL;
     510           0 :                 MT_lock_unset(&dataflowLock);
     511           0 :                 MT_lock_unset(&mal_contextLock);
     512           0 :                 return -1;
     513             :         }
     514         335 :         MT_lock_unset(&dataflowLock);
     515         335 :         MT_lock_unset(&mal_contextLock);
     516         335 :         return 0;
     517             : }
     518             : 
     519             : /*
     520             :  * The dataflow administration is based on administration of
     521             :  * how many variables are still missing before it can be executed.
     522             :  * For each instruction we keep a list of instructions whose
     523             :  * blocking counter should be decremented upon finishing it.
     524             :  */
     525             : static str
     526      144897 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
     527             : {
     528      144897 :         int pc, i, j, k, l, n, etop = 0;
     529      144897 :         int *assign;
     530      144897 :         InstrPtr p;
     531             : 
     532      144897 :         if (flow == NULL)
     533           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
     534      144897 :         if (mb == NULL)
     535           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
     536      144897 :         assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
     537      144897 :         if (assign == NULL)
     538           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     539      144897 :         etop = flow->stop - flow->start;
     540     7931772 :         for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
     541     7786875 :                 p = getInstrPtr(mb, pc);
     542     7786875 :                 if (p == NULL) {
     543           0 :                         GDKfree(assign);
     544           0 :                         throw(MAL, "dataflow",
     545             :                                   "DFLOWinitBlk(): getInstrPtr() returned NULL");
     546             :                 }
     547             : 
     548             :                 /* initial state, ie everything can run */
     549     7786875 :                 flow->status[n].flow = flow;
     550     7786875 :                 flow->status[n].pc = pc;
     551     7786875 :                 flow->status[n].state = DFLOWpending;
     552     7786875 :                 flow->status[n].cost = -1;
     553     7786875 :                 ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
     554             : 
     555             :                 /* administer flow dependencies */
     556    36779763 :                 for (j = p->retc; j < p->argc; j++) {
     557             :                         /* list of instructions that wake n-th instruction up */
     558    28992888 :                         if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
     559    13225138 :                                 assert(k < pc);      /* only dependencies on earlier instructions */
     560             :                                 /* add edge to the target instruction for wakeup call */
     561    13225138 :                                 k -= flow->start;
     562    13225138 :                                 if (flow->nodes[k]) {
     563             :                                         /* add wakeup to tail of list */
     564    97737525 :                                         for (i = k; flow->edges[i] > 0; i = flow->edges[i])
     565             :                                                 ;
     566    11513073 :                                         flow->nodes[etop] = n;
     567    11513073 :                                         flow->edges[etop] = -1;
     568    11513073 :                                         flow->edges[i] = etop;
     569    11513073 :                                         etop++;
     570    11513073 :                                         (void) size;
     571    11513073 :                                         if (etop == size) {
     572         220 :                                                 int *tmp;
     573             :                                                 /* in case of realloc failure, the original
     574             :                                                  * pointers will be freed by the caller */
     575         220 :                                                 tmp = (int *) GDKrealloc(flow->nodes,
     576             :                                                                                                  sizeof(int) * 2 * size);
     577         220 :                                                 if (tmp == NULL) {
     578           0 :                                                         GDKfree(assign);
     579           0 :                                                         throw(MAL, "dataflow",
     580             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     581             :                                                 }
     582         220 :                                                 flow->nodes = tmp;
     583         220 :                                                 tmp = (int *) GDKrealloc(flow->edges,
     584             :                                                                                                  sizeof(int) * 2 * size);
     585         220 :                                                 if (tmp == NULL) {
     586           0 :                                                         GDKfree(assign);
     587           0 :                                                         throw(MAL, "dataflow",
     588             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     589             :                                                 }
     590         220 :                                                 flow->edges = tmp;
     591         220 :                                                 size *= 2;
     592             :                                         }
     593             :                                 } else {
     594     1712065 :                                         flow->nodes[k] = n;
     595     1712065 :                                         flow->edges[k] = -1;
     596             :                                 }
     597             : 
     598    13225138 :                                 flow->status[n].blocks++;
     599             :                         }
     600             : 
     601             :                         /* list of instructions to be woken up explicitly */
     602    28992888 :                         if (!isVarConstant(mb, getArg(p, j))) {
     603             :                                 /* be careful, watch out for garbage collection interference */
     604             :                                 /* those should be scheduled after all its other uses */
     605    14766836 :                                 l = getEndScope(mb, getArg(p, j));
     606    14766836 :                                 if (l != pc && l < flow->stop && l > flow->start) {
     607             :                                         /* add edge to the target instruction for wakeup call */
     608     8543154 :                                         assert(pc < l);      /* only dependencies on earlier instructions */
     609     8543154 :                                         l -= flow->start;
     610     8543154 :                                         if (flow->nodes[n]) {
     611             :                                                 /* add wakeup to tail of list */
     612    39398810 :                                                 for (i = n; flow->edges[i] > 0; i = flow->edges[i])
     613             :                                                         ;
     614     4280324 :                                                 flow->nodes[etop] = l;
     615     4280324 :                                                 flow->edges[etop] = -1;
     616     4280324 :                                                 flow->edges[i] = etop;
     617     4280324 :                                                 etop++;
     618     4280324 :                                                 if (etop == size) {
     619         116 :                                                         int *tmp;
     620             :                                                         /* in case of realloc failure, the original
     621             :                                                          * pointers will be freed by the caller */
     622         116 :                                                         tmp = (int *) GDKrealloc(flow->nodes,
     623             :                                                                                                          sizeof(int) * 2 * size);
     624         116 :                                                         if (tmp == NULL) {
     625           0 :                                                                 GDKfree(assign);
     626           0 :                                                                 throw(MAL, "dataflow",
     627             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     628             :                                                         }
     629         116 :                                                         flow->nodes = tmp;
     630         116 :                                                         tmp = (int *) GDKrealloc(flow->edges,
     631             :                                                                                                          sizeof(int) * 2 * size);
     632         116 :                                                         if (tmp == NULL) {
     633           0 :                                                                 GDKfree(assign);
     634           0 :                                                                 throw(MAL, "dataflow",
     635             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     636             :                                                         }
     637         116 :                                                         flow->edges = tmp;
     638         116 :                                                         size *= 2;
     639             :                                                 }
     640             :                                         } else {
     641     4262830 :                                                 flow->nodes[n] = l;
     642     4262830 :                                                 flow->edges[n] = -1;
     643             :                                         }
     644     8543154 :                                         flow->status[l].blocks++;
     645             :                                 }
     646             :                         }
     647             :                 }
     648             : 
     649    16181740 :                 for (j = 0; j < p->retc; j++)
     650     8394865 :                         assign[getArg(p, j)] = pc;      /* ensure recognition of dependency on first instruction and constant */
     651             :         }
     652      144897 :         GDKfree(assign);
     653             : 
     654      144897 :         return MAL_SUCCEED;
     655             : }
     656             : 
     657             : /*
     658             :  * Parallel processing is mostly driven by dataflow, but within this context
     659             :  * there may be different schemes to take instructions into execution.
     660             :  * The admission scheme (and wrapup) are the necessary scheduler hooks.
     661             :  * A scheduler registers the functions needed and should release them
     662             :  * at the end of the parallel block.
     663             :  * They take effect after we have ensured that the basic properties for
     664             :  * execution hold.
     665             :  */
     666             : static str
     667      144897 : DFLOWscheduler(DataFlow flow, struct worker *w)
     668             : {
     669      144897 :         int last;
     670      144897 :         int i;
     671      144897 :         int j;
     672      144897 :         InstrPtr p;
     673      144897 :         int tasks = 0, actions = 0;
     674      144897 :         str ret = MAL_SUCCEED;
     675      144897 :         FlowEvent fe, f = 0;
     676             : 
     677      144897 :         if (flow == NULL)
     678           0 :                 throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
     679      144897 :         actions = flow->stop - flow->start;
     680      144897 :         if (actions == 0)
     681           0 :                 throw(MAL, "dataflow", "Empty dataflow block");
     682             :         /* initialize the eligible statements */
     683      144897 :         fe = flow->status;
     684             : 
     685      144897 :         ATOMIC_DEC(&flow->cntxt->workers);
     686      144897 :         MT_lock_set(&flow->flowlock);
     687     8076765 :         for (i = 0; i < actions; i++)
     688     7786971 :                 if (fe[i].blocks == 0) {
     689      897243 :                         p = getInstrPtr(flow->mb, fe[i].pc);
     690      897243 :                         if (p == NULL) {
     691           0 :                                 MT_lock_unset(&flow->flowlock);
     692           0 :                                 ATOMIC_INC(&flow->cntxt->workers);
     693           0 :                                 throw(MAL, "dataflow",
     694             :                                           "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
     695             :                         }
     696      897243 :                         fe[i].argclaim = 0;
     697     5276441 :                         for (j = p->retc; j < p->argc; j++)
     698     4379198 :                                 fe[i].argclaim += getMemoryClaim(fe[0].flow->mb,
     699     4379200 :                                                                                                  fe[0].flow->stk, p, j, FALSE);
     700      897241 :                         flow->status[i].state = DFLOWrunning;
     701      897241 :                         q_enqueue(todo, flow->status + i);
     702             :                 }
     703      144897 :         MT_lock_unset(&flow->flowlock);
     704      144897 :         MT_sema_up(&w->s);
     705             : 
     706      144897 :         while (actions != tasks) {
     707     7786589 :                 f = q_dequeue(flow->done, NULL);
     708     7786705 :                 if (ATOMIC_GET(&exiting))
     709             :                         break;
     710     7786705 :                 if (f == NULL) {
     711           0 :                         ATOMIC_INC(&flow->cntxt->workers);
     712           0 :                         throw(MAL, "dataflow",
     713             :                                   "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
     714             :                 }
     715             : 
     716             :                 /*
     717             :                  * When an instruction is finished we have to reduce the blocked
     718             :                  * counter for all dependent instructions.  for those where it
     719             :                  * drops to zero we can scheduler it we do it here instead of the scheduler
     720             :                  */
     721             : 
     722     7786705 :                 MT_lock_set(&flow->flowlock);
     723     7786273 :                 tasks++;
     724     7786273 :                 for (last = f->pc - flow->start;
     725    29552913 :                          last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
     726    21766405 :                         if (flow->status[i].state == DFLOWpending) {
     727    18695540 :                                 flow->status[i].argclaim += f->hotclaim;
     728    18695540 :                                 if (flow->status[i].blocks == 1) {
     729     3817850 :                                         flow->status[i].blocks--;
     730     3817850 :                                         flow->status[i].state = DFLOWrunning;
     731     3817850 :                                         q_enqueue(todo, flow->status + i);
     732             :                                 } else {
     733    14877690 :                                         flow->status[i].blocks--;
     734             :                                 }
     735             :                         }
     736     7931405 :                 MT_lock_unset(&flow->flowlock);
     737             :         }
     738             :         /* release the worker from its specific task (turn it into a
     739             :          * generic worker) */
     740      144897 :         ATOMIC_PTR_SET(&w->cntxt, NULL);
     741      144897 :         ATOMIC_INC(&flow->cntxt->workers);
     742             :         /* wrap up errors */
     743      144897 :         assert(flow->done->last == 0);
     744      144897 :         if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
     745         477 :                 TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
     746             :         }
     747             :         return ret;
     748             : }
     749             : 
     750             : /* called and returns with dataflowLock locked, temporarily unlocks
     751             :  * join the thread associated with the worker and destroy the structure */
     752             : static inline void
     753        7738 : finish_worker(struct worker *t)
     754             : {
     755        7738 :         t->flag = FINISHING;
     756        7738 :         MT_lock_unset(&dataflowLock);
     757        7738 :         MT_join_thread(t->id);
     758        7738 :         MT_sema_destroy(&t->s);
     759        7738 :         ATOMIC_PTR_DESTROY(&t->cntxt);
     760        7738 :         GDKfree(t);
     761        7738 :         MT_lock_set(&dataflowLock);
     762        7738 : }
     763             : 
     764             : /* We create a pool of GDKnr_threads-1 generic workers, that is,
     765             :  * workers that will take on jobs from any clients.  In addition, we
     766             :  * create a single specific worker per client (i.e. each time we enter
     767             :  * here).  This specific worker will only do work for the client for
     768             :  * which it was started.  In this way we can guarantee that there will
     769             :  * always be progress for the client, even if all other workers are
     770             :  * doing something big.
     771             :  *
     772             :  * When all jobs for a client have been done (there are no more
     773             :  * entries for the client in the queue), the specific worker turns
     774             :  * itself into a generic worker.  At the same time, we signal that one
     775             :  * generic worker should exit and this function returns.  In this way
     776             :  * we make sure that there are once again GDKnr_threads-1 generic
     777             :  * workers. */
     778             : str
     779      144897 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc,
     780             :                            MalStkPtr stk)
     781             : {
     782      144897 :         DataFlow flow = NULL;
     783      144897 :         str msg = MAL_SUCCEED;
     784      144897 :         int size;
     785      144897 :         bit *ret;
     786      144897 :         struct worker *t;
     787             : 
     788      144897 :         if (stk == NULL)
     789           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
     790      144897 :         ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
     791      144897 :         *ret = FALSE;
     792             : 
     793      144897 :         assert(stoppc > startpc);
     794             : 
     795             :         /* check existence of workers */
     796      144897 :         if (todo == NULL) {
     797             :                 /* create thread pool */
     798         335 :                 if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
     799             :                         /* no threads created, run serially */
     800           0 :                         *ret = TRUE;
     801           0 :                         return MAL_SUCCEED;
     802             :                 }
     803             :         }
     804      144897 :         assert(todo);
     805             :         /* in addition, create one more worker that will only execute
     806             :          * tasks for the current client to compensate for our waiting
     807             :          * until all work is done */
     808      144897 :         MT_lock_set(&dataflowLock);
     809             :         /* join with already exited threads */
     810      151044 :         while (exited_workers != NULL) {
     811        6147 :                 assert(exited_workers->flag == EXITED);
     812        6147 :                 struct worker *t = exited_workers;
     813        6147 :                 exited_workers = exited_workers->next;
     814        6147 :                 finish_worker(t);
     815             :         }
     816      144897 :         assert(cntxt != NULL);
     817      144897 :         if (free_workers != NULL) {
     818      138158 :                 t = free_workers;
     819      138158 :                 assert(t->flag == FREE);
     820      138158 :                 assert(free_count > 0);
     821      138158 :                 free_count--;
     822      138158 :                 free_workers = t->next;
     823      138158 :                 t->next = workers;
     824      138158 :                 workers = t;
     825      138158 :                 t->flag = WAITING;
     826      138158 :                 ATOMIC_PTR_SET(&t->cntxt, cntxt);
     827      138158 :                 MT_sema_up(&t->s);
     828             :         } else {
     829        6739 :                 t = GDKmalloc(sizeof(*t));
     830        6739 :                 if (t != NULL) {
     831        6739 :                         *t = (struct worker) {
     832             :                                 .flag = WAITING,
     833             :                         };
     834        6739 :                         ATOMIC_PTR_INIT(&t->cntxt, cntxt);
     835        6739 :                         MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     836        6739 :                         if (MT_create_thread(&t->id, DFLOWworker, t,
     837             :                                                                  MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     838           0 :                                 ATOMIC_PTR_DESTROY(&t->cntxt);
     839           0 :                                 MT_sema_destroy(&t->s);
     840           0 :                                 GDKfree(t);
     841           0 :                                 t = NULL;
     842             :                         } else {
     843        6739 :                                 t->next = workers;
     844        6739 :                                 workers = t;
     845             :                         }
     846             :                 }
     847        6739 :                 if (t == NULL) {
     848             :                         /* cannot start new thread, run serially */
     849           0 :                         *ret = TRUE;
     850           0 :                         MT_lock_unset(&dataflowLock);
     851           0 :                         return MAL_SUCCEED;
     852             :                 }
     853             :         }
     854      144897 :         MT_lock_unset(&dataflowLock);
     855             : 
     856      144897 :         flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
     857      144897 :         if (flow == NULL)
     858           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     859             : 
     860      144897 :         flow->cntxt = cntxt;
     861      144897 :         flow->mb = mb;
     862      144897 :         flow->stk = stk;
     863      144897 :         flow->set_qry_ctx = MT_thread_get_qry_ctx() != NULL;
     864             : 
     865             :         /* keep real block count, exclude brackets */
     866      144897 :         flow->start = startpc + 1;
     867      144897 :         flow->stop = stoppc;
     868             : 
     869      144897 :         flow->done = q_create("flow->done");
     870      144897 :         if (flow->done == NULL) {
     871           0 :                 GDKfree(flow);
     872           0 :                 throw(MAL, "dataflow",
     873             :                           "runMALdataflow(): Failed to create flow->done queue");
     874             :         }
     875             : 
     876      144897 :         flow->status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
     877             :                                                                                  sizeof(FlowEventRec));
     878      144897 :         if (flow->status == NULL) {
     879           0 :                 q_destroy(flow->done);
     880           0 :                 GDKfree(flow);
     881           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     882             :         }
     883      144897 :         size = DFLOWgraphSize(mb, startpc, stoppc);
     884      144897 :         size += stoppc - startpc;
     885      144897 :         flow->nodes = (int *) GDKzalloc(sizeof(int) * size);
     886      144897 :         if (flow->nodes == NULL) {
     887           0 :                 GDKfree(flow->status);
     888           0 :                 q_destroy(flow->done);
     889           0 :                 GDKfree(flow);
     890           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     891             :         }
     892      144897 :         flow->edges = (int *) GDKzalloc(sizeof(int) * size);
     893      144897 :         if (flow->edges == NULL) {
     894           0 :                 GDKfree(flow->nodes);
     895           0 :                 GDKfree(flow->status);
     896           0 :                 q_destroy(flow->done);
     897           0 :                 GDKfree(flow);
     898           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     899             :         }
     900      144897 :         MT_lock_init(&flow->flowlock, "flow->flowlock");
     901      144897 :         ATOMIC_PTR_INIT(&flow->error, NULL);
     902      144897 :         msg = DFLOWinitBlk(flow, mb, size);
     903             : 
     904      144897 :         if (msg == MAL_SUCCEED)
     905      144897 :                 msg = DFLOWscheduler(flow, t);
     906             : 
     907      144897 :         GDKfree(flow->status);
     908      144896 :         GDKfree(flow->edges);
     909      144897 :         GDKfree(flow->nodes);
     910      144896 :         q_destroy(flow->done);
     911      144897 :         MT_lock_destroy(&flow->flowlock);
     912      144897 :         ATOMIC_PTR_DESTROY(&flow->error);
     913      144897 :         GDKfree(flow);
     914             : 
     915             :         /* we created one worker, now tell one worker to exit again */
     916      144897 :         MT_lock_set(&todo->l);
     917      144897 :         todo->exitcount++;
     918      144897 :         MT_lock_unset(&todo->l);
     919      144897 :         MT_sema_up(&todo->s);
     920             : 
     921      144897 :         return msg;
     922             : }
     923             : 
     924             : str
     925           0 : deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     926             : {
     927           0 :         int *ret = getArgReference_int(stk, pci, 0);
     928           0 :         int *val = getArgReference_int(stk, pci, 1);
     929           0 :         (void) cntxt;
     930           0 :         (void) mb;
     931           0 :         *ret = *val;
     932           0 :         return MAL_SUCCEED;
     933             : }
     934             : 
     935             : static void
     936         334 : stopMALdataflow(void)
     937             : {
     938         334 :         ATOMIC_SET(&exiting, 1);
     939         334 :         if (todo) {
     940         334 :                 MT_lock_set(&dataflowLock);
     941             :                 /* first wake up all running threads */
     942         334 :                 int n = 0;
     943         925 :                 for (struct worker *t = free_workers; t; t = t->next)
     944         591 :                         n++;
     945        1334 :                 for (struct worker *t = workers; t; t = t->next)
     946        1000 :                         n++;
     947        1925 :                 while (n-- > 0) {
     948             :                         /* one UP for each thread we know about */
     949        1925 :                         MT_sema_up(&todo->s);
     950             :                 }
     951         925 :                 while (free_workers) {
     952         591 :                         struct worker *t = free_workers;
     953         591 :                         assert(free_count > 0);
     954         591 :                         free_count--;
     955         591 :                         free_workers = free_workers->next;
     956         591 :                         MT_sema_up(&t->s);
     957         591 :                         finish_worker(t);
     958             :                 }
     959         375 :                 while (workers) {
     960          41 :                         struct worker *t = workers;
     961          41 :                         workers = workers->next;
     962          41 :                         finish_worker(t);
     963             :                 }
     964        1293 :                 while (exited_workers) {
     965         959 :                         struct worker *t = exited_workers;
     966         959 :                         exited_workers = exited_workers->next;
     967         959 :                         finish_worker(t);
     968             :                 }
     969         334 :                 MT_lock_unset(&dataflowLock);
     970             :         }
     971         334 : }
     972             : 
     973             : void
     974         334 : mal_dataflow_reset(void)
     975             : {
     976         334 :         stopMALdataflow();
     977         334 :         workers = exited_workers = NULL;
     978         334 :         if (todo) {
     979         334 :                 MT_lock_destroy(&todo->l);
     980         334 :                 MT_sema_destroy(&todo->s);
     981         334 :                 GDKfree(todo);
     982             :         }
     983         334 :         todo = 0;                                       /* pending instructions */
     984         334 :         ATOMIC_SET(&exiting, 0);
     985         334 : }

Generated by: LCOV version 1.14