LCOV - code coverage report
Current view: top level - monetdb5/mal - mal_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 449 519 86.5 %
Date: 2024-10-03 20:03:20 Functions: 13 14 92.9 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (author) M Kersten, S Mullender
      15             :  * Dataflow processing only works on a code
      16             :  * sequence that does not include additional (implicit) flow of control
      17             :  * statements and, ideally, consists of expensive BAT operations.
      18             :  * The dataflow portion is identified as a guarded block,
      19             :  * whose entry is controlled by the function language.dataflow();
      20             :  *
      21             :  * The dataflow worker tries to follow the sequence of actions
      22             :  * as laid out in the plan, but abandons this track when it hits
      23             :  * a blocking operator, an instruction for which not all arguments
      24             :  * are available, or a shortage of resources.
      25             :  *
      26             :  * The flow graph is organized such that parallel threads can
      27             :  * access it mostly without expensive locking and dependent
      28             :  * variables are easy to find.
      29             :  */
      30             : #include "monetdb_config.h"
      31             : #include "mal_dataflow.h"
      32             : #include "mal_exception.h"
      33             : #include "mal_private.h"
      34             : #include "mal_internal.h"
      35             : #include "mal_runtime.h"
      36             : #include "mal_resource.h"
      37             : #include "mal_function.h"
      38             : 
      39             : #define DFLOWpending 0                  /* runnable */
      40             : #define DFLOWrunning 1                  /* currently in progress */
      41             : #define DFLOWwrapup  2                  /* done! */
      42             : #define DFLOWretry   3                  /* reschedule */
      43             : #define DFLOWskipped 4                  /* due to errors */
      44             : 
      45             : /* The per instruction status of execution */
      46             : typedef struct FLOWEVENT {
      47             :         struct DATAFLOW *flow;          /* execution context */
      48             :         int pc;                                         /* pc in underlying malblock */
      49             :         int blocks;                                     /* awaiting for variables */
      50             :         sht state;                                      /* of execution */
      51             :         lng clk;
      52             :         sht cost;
      53             :         lng hotclaim;                           /* memory foot print of result variables */
      54             :         lng argclaim;                           /* memory foot print of arguments */
      55             :         lng maxclaim;                           /* memory foot print of largest argument, could be used to indicate result size */
      56             :         struct FLOWEVENT *next;         /* linked list for queues */
      57             : } *FlowEvent, FlowEventRec;
      58             : 
      59             : typedef struct queue {
      60             :         int exitcount;                          /* how many threads should exit */
      61             :         FlowEvent first, last;          /* first and last element of the queue */
      62             :         MT_Lock l;                                      /* it's a shared resource, ie we need locks */
      63             :         MT_Sema s;                                      /* threads wait on empty queues */
      64             : } Queue;
      65             : 
      66             : /*
      67             :  * The dataflow dependency is administered in a graph list structure.
      68             :  * For each instruction we keep the list of instructions that
      69             :  * should be checked for eligibility once we are finished with it.
      70             :  */
      71             : typedef struct DATAFLOW {
      72             :         Client cntxt;                           /* for debugging and client resolution */
      73             :         MalBlkPtr mb;                           /* carry the context */
      74             :         MalStkPtr stk;
      75             :         int start, stop;                        /* guarded block under consideration */
      76             :         FlowEvent status;                       /* status of each instruction */
      77             :         ATOMIC_PTR_TYPE error;          /* error encountered */
      78             :         int *nodes;                                     /* dependency graph nodes */
      79             :         int *edges;                                     /* dependency graph */
      80             :         MT_Lock flowlock;                       /* lock to protect the above */
      81             :         Queue *done;                            /* instructions handled */
      82             :         bool set_qry_ctx;
      83             : } *DataFlow, DataFlowRec;
      84             : 
      85             : struct worker {
      86             :         MT_Id id;
      87             :         enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
      88             :         ATOMIC_PTR_TYPE cntxt;          /* client we do work for (NULL -> any) */
      89             :         MT_Sema s;
      90             :         struct worker *next;
      91             :         char errbuf[GDKMAXERRLEN];      /* GDKerrbuf so that we can allocate before fork */
      92             : };
      93             : /* heads of three mutually exclusive linked lists, all using the .next
      94             :  * field in the worker struct */
      95             : static struct worker *workers;            /* "working" workers */
      96             : static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
      97             : static struct worker *free_workers;     /* free workers (.flag==FREE) */
      98             : static int free_count = 0;              /* number of free threads */
      99             : static int free_max = 0;                /* max number of spare free threads */
     100             : 
     101             : static Queue *todo = 0;                 /* pending instructions */
     102             : 
     103             : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
     104             : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
     105             : 
     106             : /*
     107             :  * Calculate the size of the dataflow dependency graph.
     108             :  */
     109             : static int
     110             : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
     111             : {
     112             :         int cnt = 0;
     113             : 
     114     8348408 :         for (int i = start; i < stop; i++)
     115     8203513 :                 cnt += getInstrPtr(mb, i)->argc;
     116      144895 :         return cnt;
     117             : }
     118             : 
     119             : /*
     120             :  * The dataflow execution is confined to a barrier block.
     121             :  * Within the block there are multiple flows, which, in principle,
     122             :  * can be executed in parallel.
     123             :  */
     124             : 
     125             : static Queue *
     126      145221 : q_create(const char *name)
     127             : {
     128      145221 :         Queue *q = GDKzalloc(sizeof(Queue));
     129             : 
     130      145221 :         if (q == NULL)
     131             :                 return NULL;
     132      145221 :         MT_lock_init(&q->l, name);
     133      145221 :         MT_sema_init(&q->s, 0, name);
     134      145221 :         return q;
     135             : }
     136             : 
     137             : static void
     138      144895 : q_destroy(Queue *q)
     139             : {
     140      144895 :         assert(q);
     141      144895 :         MT_lock_destroy(&q->l);
     142      144894 :         MT_sema_destroy(&q->s);
     143      144895 :         GDKfree(q);
     144      144895 : }
     145             : 
     146             : /* Keep a simple FIFO queue: q_enqueue appends at the tail and q_dequeue scans from the head. It won't be a large one, so the shuffling caused by a requeue is acceptable. */
     147             : /* we might actually sort it for better scheduling behavior */
     148             : static void
     149    12955155 : q_enqueue(Queue *q, FlowEvent d)
     150             : {
     151    12955155 :         assert(q);
     152    12955155 :         assert(d);
     153    12955155 :         MT_lock_set(&q->l);
     154    12952048 :         if (q->first == NULL) {
     155     4400373 :                 assert(q->last == NULL);
     156     4400373 :                 q->first = q->last = d;
     157             :         } else {
     158     8551675 :                 assert(q->last != NULL);
     159     8551675 :                 q->last->next = d;
     160     8551675 :                 q->last = d;
     161             :         }
     162    12952048 :         d->next = NULL;
     163    12952048 :         MT_lock_unset(&q->l);
     164    12951492 :         MT_sema_up(&q->s);
     165    12946776 : }
     166             : 
     167             : /*
     168             :  * A priority queue over the hot claims of memory may
     169             :  * be more effective. It prioritizes those instructions
     170             :  * that want to use a big recent result
     171             :  */
     172             : 
     173             : static void
     174         323 : q_requeue(Queue *q, FlowEvent d)
     175             : {
     176         323 :         assert(q);
     177         323 :         assert(d);
     178         323 :         MT_lock_set(&q->l);
     179         323 :         if (q->first == NULL) {
     180           0 :                 assert(q->last == NULL);
     181           0 :                 q->first = q->last = d;
     182           0 :                 d->next = NULL;
     183             :         } else {
     184         323 :                 assert(q->last != NULL);
     185         323 :                 d->next = q->first;
     186         323 :                 q->first = d;
     187             :         }
     188         323 :         MT_lock_unset(&q->l);
     189         323 :         MT_sema_up(&q->s);
     190         323 : }
     191             : 
     192             : static FlowEvent
     193    13333721 : q_dequeue(Queue *q, Client cntxt)
     194             : {
     195    13333721 :         assert(q);
     196    13333721 :         MT_sema_down(&q->s);
     197    13326930 :         if (ATOMIC_GET(&exiting))
     198             :                 return NULL;
     199    13325691 :         MT_lock_set(&q->l);
     200    13315276 :         if (cntxt == NULL && q->exitcount > 0) {
     201      144895 :                 q->exitcount--;
     202      144895 :                 MT_lock_unset(&q->l);
     203      144895 :                 return NULL;
     204             :         }
     205             : 
     206    13170381 :         FlowEvent *dp = &q->first;
     207    13170381 :         FlowEvent pd = NULL;
     208             :         /* if cntxt == NULL, return the first event, if cntxt != NULL, find
     209             :          * the first event in the queue with matching cntxt value and return
     210             :          * that */
     211    13170381 :         if (cntxt != NULL) {
     212    10610399 :                 while (*dp && (*dp)->flow->cntxt != cntxt) {
     213     8827186 :                         pd = *dp;
     214     8827186 :                         dp = &pd->next;
     215             :                 }
     216             :         }
     217    13170381 :         FlowEvent d = *dp;
     218    13170381 :         if (d) {
     219    12925993 :                 *dp = d->next;
     220    12925993 :                 d->next = NULL;
     221    12925993 :                 if (*dp == NULL)
     222     4418253 :                         q->last = pd;
     223             :         }
     224    13170381 :         MT_lock_unset(&q->l);
     225    13159460 :         return d;
     226             : }
     227             : 
     228             : /*
     229             :  * We simply move an instruction into the front of the queue.
     230             :  * Beware, we assume that variables are assigned a value once, otherwise
     231             :  * the order may really create errors.
     232             :  * The order of the instructions should be retained as long as possible.
     233             :  * Delay processing when we run out of memory.  Push the instruction back
     234             :  * onto the queue, waiting for another attempt. Problem might become
     235             :  * that all threads but one are cycling through the queue, each time
     236             :  * finding an eligible instruction, but without enough space.
     237             :  * Therefore, we wait for a few milliseconds as an initial punishment.
     238             :  *
     239             :  * The process could be refined by checking for cheap operations,
     240             :  * i.e. those that would require no memory at all (aggr.count)
     241             :  * This, however, would lead to a dependency to the upper layers,
     242             :  * because in the kernel we don't know what routines are available
     243             :  * with this property. Nor do we maintain such properties.
     244             :  */
     245             : 
     246             : static void
     247        7761 : DFLOWworker(void *T)
     248             : {
     249        7761 :         struct worker *t = (struct worker *) T;
     250        7761 :         bool locked = false;
     251             : #ifdef _MSC_VER
     252             :         srand((unsigned int) GDKusec());
     253             : #endif
     254        7761 :         GDKsetbuf(t->errbuf);                /* where to leave errors */
     255        7761 :         snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
     256             : 
     257      145869 :         for (;;) {
     258      145869 :                 DataFlow flow;
     259      145869 :                 FlowEvent fe = 0, fnxt = 0;
     260      145869 :                 str error = 0;
     261      145869 :                 int i;
     262      145869 :                 lng claim;
     263      145869 :                 Client cntxt;
     264      145869 :                 InstrPtr p;
     265             : 
     266      145869 :                 GDKclrerr();
     267             : 
     268      145869 :                 if (t->flag == WAITING) {
     269             :                         /* wait until we are allowed to start working */
     270      144894 :                         MT_sema_down(&t->s);
     271      144893 :                         t->flag = RUNNING;
     272      144893 :                         if (ATOMIC_GET(&exiting)) {
     273             :                                 break;
     274             :                         }
     275             :                 }
     276      145868 :                 assert(t->flag == RUNNING);
     277      145868 :                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     278     8447761 :                 while (1) {
     279     8447761 :                         MT_thread_set_qry_ctx(NULL);
     280     8440237 :                         if (fnxt == 0) {
     281     5284400 :                                 MT_thread_setworking("waiting for work");
     282     5286143 :                                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     283     5286143 :                                 fe = q_dequeue(todo, cntxt);
     284     5287650 :                                 if (fe == NULL) {
     285      390519 :                                         if (cntxt) {
     286             :                                                 /* we're not done yet with work for the current
     287             :                                                  * client (as far as we know), so give up the CPU
     288             :                                                  * and let the scheduler enter some more work, but
     289             :                                                  * first compensate for the down we did in
     290             :                                                  * dequeue */
     291      244654 :                                                 MT_sema_up(&todo->s);
     292      244649 :                                                 MT_sleep_ms(1);
     293     8692407 :                                                 continue;
     294             :                                         }
     295             :                                         /* no more work to be done: exit */
     296      145865 :                                         break;
     297             :                                 }
     298     4897131 :                                 if (fe->flow->cntxt && fe->flow->cntxt->mythread)
     299     4896282 :                                         MT_thread_setworking(fe->flow->cntxt->mythread);
     300             :                         } else
     301             :                                 fe = fnxt;
     302     8052656 :                         if (ATOMIC_GET(&exiting)) {
     303             :                                 break;
     304             :                         }
     305     8052656 :                         fnxt = 0;
     306     8052656 :                         assert(fe);
     307     8052656 :                         flow = fe->flow;
     308     8052656 :                         assert(flow);
     309     8052656 :                         MT_thread_set_qry_ctx(flow->set_qry_ctx ? &flow->cntxt->
     310             :                                                                   qryctx : NULL);
     311             : 
     312             :                         /* whenever we have a (concurrent) error, skip it */
     313     8054049 :                         if (ATOMIC_PTR_GET(&flow->error)) {
     314        4084 :                                 q_enqueue(flow->done, fe);
     315        4087 :                                 continue;
     316             :                         }
     317             : 
     318     8049965 :                         p = getInstrPtr(flow->mb, fe->pc);
     319     8049965 :                         claim = fe->argclaim;
     320    16104530 :                         if (p->fcn != (MALfcn) deblockdataflow &&    /* never block on deblockdataflow() */
     321     8049965 :                                 !MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
     322         323 :                                 fe->hotclaim = 0;    /* don't assume priority anymore */
     323         323 :                                 fe->maxclaim = 0;
     324         323 :                                 MT_lock_set(&todo->l);
     325         323 :                                 FlowEvent last = todo->last;
     326         323 :                                 MT_lock_unset(&todo->l);
     327         323 :                                 if (last == NULL)
     328           0 :                                         MT_sleep_ms(DELAYUNIT);
     329         323 :                                 q_requeue(todo, fe);
     330         323 :                                 continue;
     331             :                         }
     332     8054242 :                         ATOMIC_BASE_TYPE wrks = ATOMIC_INC(&flow->cntxt->workers);
     333     8054242 :                         ATOMIC_BASE_TYPE mwrks = ATOMIC_GET(&flow->mb->workers);
     334     8054250 :                         while (wrks > mwrks) {
     335       15246 :                                 if (ATOMIC_CAS(&flow->mb->workers, &mwrks, wrks))
     336             :                                         break;
     337             :                         }
     338     8054244 :                         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
     339             :                                                                    flow->stk, 0, 0);
     340     8051396 :                         ATOMIC_DEC(&flow->cntxt->workers);
     341             :                         /* release the memory claim */
     342     8051396 :                         MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
     343             : 
     344     8052357 :                         MT_lock_set(&flow->flowlock);
     345     8054532 :                         fe->state = DFLOWwrapup;
     346     8054532 :                         MT_lock_unset(&flow->flowlock);
     347     8054174 :                         if (error) {
     348         504 :                                 void *null = NULL;
     349             :                                 /* only collect one error (from one thread, needed for stable testing) */
     350         504 :                                 if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
     351          28 :                                         freeException(error);
     352             :                                 /* after an error we skip the rest of the block */
     353         504 :                                 q_enqueue(flow->done, fe);
     354         504 :                                 continue;
     355             :                         }
     356             : 
     357             :                         /* see if you can find an eligible instruction that uses the
     358             :                          * result just produced. Then we can continue with it right away.
     359             :                          * We are just looking forward for the last block, which means we
     360             :                          * are safe from concurrent actions. No other thread can steal it,
     361             :                          * because we hold the logical lock.
     362             :                          * All eligible instructions are queued
     363             :                          */
     364     8053670 :                         p = getInstrPtr(flow->mb, fe->pc);
     365     8053670 :                         assert(p);
     366     8053670 :                         fe->hotclaim = 0;
     367     8053670 :                         fe->maxclaim = 0;
     368             : 
     369    16759537 :                         for (i = 0; i < p->retc; i++) {
     370     8708148 :                                 lng footprint;
     371     8708148 :                                 footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
     372     8705867 :                                 fe->hotclaim += footprint;
     373     8705867 :                                 if (footprint > fe->maxclaim)
     374     3767717 :                                         fe->maxclaim = footprint;
     375             :                         }
     376             : 
     377             : /* Try to get rid of the hot potato or locate an alternative to proceed.
     378             :  */
     379             : #define HOTPOTATO
     380             : #ifdef HOTPOTATO
     381             :                         /* HOT potato choice */
     382     8051389 :                         int last = 0, nxt = -1;
     383     8051389 :                         lng nxtclaim = -1;
     384             : 
     385     8051389 :                         MT_lock_set(&flow->flowlock);
     386     8054155 :                         for (last = fe->pc - flow->start;
     387    30564223 :                                  last >= 0 && (i = flow->nodes[last]) > 0;
     388    22510068 :                                  last = flow->edges[last]) {
     389    22510068 :                                 if (flow->status[i].state == DFLOWpending
     390    22509779 :                                         && flow->status[i].blocks == 1) {
     391             :                                         /* find the one with the largest footprint */
     392     6164670 :                                         if (nxt == -1 || flow->status[i].argclaim > nxtclaim) {
     393     3443815 :                                                 nxt = i;
     394     3443815 :                                                 nxtclaim = flow->status[i].argclaim;
     395             :                                         }
     396             :                                 }
     397             :                         }
     398             :                         /* hot potato can not be removed, use alternative to proceed */
     399     8054155 :                         if (nxt >= 0) {
     400     3162199 :                                 flow->status[nxt].state = DFLOWrunning;
     401     3162199 :                                 flow->status[nxt].blocks = 0;
     402     3162199 :                                 flow->status[nxt].hotclaim = fe->hotclaim;
     403     3162199 :                                 flow->status[nxt].argclaim += fe->hotclaim;
     404     3162199 :                                 if (flow->status[nxt].maxclaim < fe->maxclaim)
     405     1599108 :                                         flow->status[nxt].maxclaim = fe->maxclaim;
     406             :                                 fnxt = flow->status + nxt;
     407             :                         }
     408     8054155 :                         MT_lock_unset(&flow->flowlock);
     409             : #endif
     410             : 
     411     8053819 :                         q_enqueue(flow->done, fe);
     412     8052333 :                         if (fnxt == 0 && profilerStatus) {
     413           0 :                                 profilerHeartbeatEvent("wait");
     414             :                         }
     415             :                 }
     416      145865 :                 MT_lock_set(&dataflowLock);
     417      145868 :                 if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >= free_max) {
     418             :                         locked = true;
     419             :                         break;
     420             :                 }
     421      138706 :                 free_count++;
     422      138706 :                 struct worker **tp = &workers;
     423      547227 :                 while (*tp && *tp != t)
     424      408521 :                         tp = &(*tp)->next;
     425      138706 :                 assert(*tp && *tp == t);
     426      138706 :                 *tp = t->next;
     427      138706 :                 t->flag = FREE;
     428      138706 :                 t->next = free_workers;
     429      138706 :                 free_workers = t;
     430      138706 :                 MT_lock_unset(&dataflowLock);
     431      138706 :                 MT_thread_setworking("idle, waiting for new client");
     432      138706 :                 MT_sema_down(&t->s);
     433      138705 :                 if (GDKexiting() || ATOMIC_GET(&exiting))
     434             :                         break;
     435      138109 :                 assert(t->flag == WAITING);
     436             :         }
     437         596 :         if (!locked)
     438         596 :                 MT_lock_set(&dataflowLock);
     439        7758 :         if (t->flag != FINISHING) {
     440        7138 :                 struct worker **tp = t->flag == FREE ? &free_workers : &workers;
     441       37859 :                 while (*tp && *tp != t)
     442       30721 :                         tp = &(*tp)->next;
     443        7138 :                 assert(*tp && *tp == t);
     444        7138 :                 *tp = t->next;
     445        7138 :                 t->flag = EXITED;
     446        7138 :                 t->next = exited_workers;
     447        7138 :                 exited_workers = t;
     448             :         }
     449        7758 :         MT_lock_unset(&dataflowLock);
     450        7758 :         GDKsetbuf(NULL);
     451        7758 : }
     452             : 
     453             : /*
     454             :  * Create an interpreter pool.
     455             :  * One worker will adaptively be available for each client.
     456             :  * The remainder is derived from the GDKnr_threads setting, which
     457             :  * typically is equal to the number of cores.
     458             :  * The workers are assembled in a local table to enable debugging.
     459             :  */
     460             : static int
     461         326 : DFLOWinitialize(void)
     462             : {
     463         326 :         int limit;
     464         326 :         int created = 0;
     465             : 
     466         326 :         MT_lock_set(&mal_contextLock);
     467         326 :         MT_lock_set(&dataflowLock);
     468         326 :         if (todo) {
     469             :                 /* somebody else beat us to it */
     470           0 :                 MT_lock_unset(&dataflowLock);
     471           0 :                 MT_lock_unset(&mal_contextLock);
     472           0 :                 return 0;
     473             :         }
     474         326 :         free_max = GDKgetenv_int("dataflow_max_free",
     475             :                                                          GDKnr_threads < 4 ? 4 : GDKnr_threads);
     476         326 :         todo = q_create("todo");
     477         326 :         if (todo == NULL) {
     478           0 :                 MT_lock_unset(&dataflowLock);
     479           0 :                 MT_lock_unset(&mal_contextLock);
     480           0 :                 return -1;
     481             :         }
     482         326 :         limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
     483        1302 :         while (limit > 0) {
     484         976 :                 limit--;
     485         976 :                 struct worker *t = GDKmalloc(sizeof(*t));
     486         976 :                 if (t == NULL) {
     487           0 :                         TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
     488           0 :                         continue;
     489             :                 }
     490         976 :                 *t = (struct worker) {
     491             :                         .flag = RUNNING,
     492             :                         .cntxt = ATOMIC_PTR_VAR_INIT(NULL),
     493             :                 };
     494         976 :                 MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     495         976 :                 if (MT_create_thread(&t->id, DFLOWworker, t,
     496             :                                                          MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     497           0 :                         ATOMIC_PTR_DESTROY(&t->cntxt);
     498           0 :                         MT_sema_destroy(&t->s);
     499           0 :                         GDKfree(t);
     500             :                 } else {
     501         976 :                         t->next = workers;
     502         976 :                         workers = t;
     503         976 :                         created++;
     504             :                 }
     505             :         }
     506         326 :         if (created == 0) {
     507             :                 /* no threads created */
     508           0 :                 q_destroy(todo);
     509           0 :                 todo = NULL;
     510           0 :                 MT_lock_unset(&dataflowLock);
     511           0 :                 MT_lock_unset(&mal_contextLock);
     512           0 :                 return -1;
     513             :         }
     514         326 :         MT_lock_unset(&dataflowLock);
     515         326 :         MT_lock_unset(&mal_contextLock);
     516         326 :         return 0;
     517             : }
     518             : 
     519             : /*
     520             :  * The dataflow administration is based on tracking, per instruction,
     521             :  * how many variables are still missing before it can be executed.
     522             :  * For each instruction we keep a list of instructions whose
     523             :  * blocking counter should be decremented upon finishing it.
     524             :  */
     525             : static str
     526      144895 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
     527             : {
     528      144895 :         int pc, i, j, k, l, n, etop = 0;
     529      144895 :         int *assign;
     530      144895 :         InstrPtr p;
     531             : 
     532      144895 :         if (flow == NULL)
     533           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
     534      144895 :         if (mb == NULL)
     535           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
     536      144895 :         assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
     537      144895 :         if (assign == NULL)
     538           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     539      144895 :         etop = flow->stop - flow->start;
     540     8203957 :         for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
     541     8059062 :                 p = getInstrPtr(mb, pc);
     542     8059062 :                 if (p == NULL) {
     543           0 :                         GDKfree(assign);
     544           0 :                         throw(MAL, "dataflow",
     545             :                                   "DFLOWinitBlk(): getInstrPtr() returned NULL");
     546             :                 }
     547             : 
     548             :                 /* initial state, ie everything can run */
     549     8059062 :                 flow->status[n].flow = flow;
     550     8059062 :                 flow->status[n].pc = pc;
     551     8059062 :                 flow->status[n].state = DFLOWpending;
     552     8059062 :                 flow->status[n].cost = -1;
     553     8059062 :                 ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
     554             : 
     555             :                 /* administer flow dependencies */
     556    37867218 :                 for (j = p->retc; j < p->argc; j++) {
     557             :                         /* list of instructions that wake n-th instruction up */
     558    29808156 :                         if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
     559    13796675 :                                 assert(k < pc);      /* only dependencies on earlier instructions */
     560             :                                 /* add edge to the target instruction for wakeup call */
     561    13796675 :                                 k -= flow->start;
     562    13796675 :                                 if (flow->nodes[k]) {
     563             :                                         /* add wakeup to tail of list */
     564   101094933 :                                         for (i = k; flow->edges[i] > 0; i = flow->edges[i])
     565             :                                                 ;
     566    12067403 :                                         flow->nodes[etop] = n;
     567    12067403 :                                         flow->edges[etop] = -1;
     568    12067403 :                                         flow->edges[i] = etop;
     569    12067403 :                                         etop++;
     570    12067403 :                                         (void) size;
     571    12067403 :                                         if (etop == size) {
     572         220 :                                                 int *tmp;
     573             :                                                 /* in case of realloc failure, the original
     574             :                                                  * pointers will be freed by the caller */
     575         220 :                                                 tmp = (int *) GDKrealloc(flow->nodes,
     576             :                                                                                                  sizeof(int) * 2 * size);
     577         220 :                                                 if (tmp == NULL) {
     578           0 :                                                         GDKfree(assign);
     579           0 :                                                         throw(MAL, "dataflow",
     580             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     581             :                                                 }
     582         220 :                                                 flow->nodes = tmp;
     583         220 :                                                 tmp = (int *) GDKrealloc(flow->edges,
     584             :                                                                                                  sizeof(int) * 2 * size);
     585         220 :                                                 if (tmp == NULL) {
     586           0 :                                                         GDKfree(assign);
     587           0 :                                                         throw(MAL, "dataflow",
     588             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     589             :                                                 }
     590         220 :                                                 flow->edges = tmp;
     591         220 :                                                 size *= 2;
     592             :                                         }
     593             :                                 } else {
     594     1729272 :                                         flow->nodes[k] = n;
     595     1729272 :                                         flow->edges[k] = -1;
     596             :                                 }
     597             : 
     598    13796675 :                                 flow->status[n].blocks++;
     599             :                         }
     600             : 
     601             :                         /* list of instructions to be woken up explicitly */
     602    29808156 :                         if (!isVarConstant(mb, getArg(p, j))) {
     603             :                                 /* be careful, watch out for garbage collection interference */
     604             :                                 /* those should be scheduled after all its other uses */
     605    15227129 :                                 l = getEndScope(mb, getArg(p, j));
     606    15227129 :                                 if (l != pc && l < flow->stop && l > flow->start) {
     607             :                                         /* add edge to the target instruction for wakeup call */
     608     8725254 :                                         assert(pc < l);      /* only dependencies on earlier instructions */
     609     8725254 :                                         l -= flow->start;
     610     8725254 :                                         if (flow->nodes[n]) {
     611             :                                                 /* add wakeup to tail of list */
     612    39576796 :                                                 for (i = n; flow->edges[i] > 0; i = flow->edges[i])
     613             :                                                         ;
     614     4290326 :                                                 flow->nodes[etop] = l;
     615     4290326 :                                                 flow->edges[etop] = -1;
     616     4290326 :                                                 flow->edges[i] = etop;
     617     4290326 :                                                 etop++;
     618     4290326 :                                                 if (etop == size) {
     619         117 :                                                         int *tmp;
     620             :                                                         /* in case of realloc failure, the original
     621             :                                                          * pointers will be freed by the caller */
     622         117 :                                                         tmp = (int *) GDKrealloc(flow->nodes,
     623             :                                                                                                          sizeof(int) * 2 * size);
     624         117 :                                                         if (tmp == NULL) {
     625           0 :                                                                 GDKfree(assign);
     626           0 :                                                                 throw(MAL, "dataflow",
     627             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     628             :                                                         }
     629         117 :                                                         flow->nodes = tmp;
     630         117 :                                                         tmp = (int *) GDKrealloc(flow->edges,
     631             :                                                                                                          sizeof(int) * 2 * size);
     632         117 :                                                         if (tmp == NULL) {
     633           0 :                                                                 GDKfree(assign);
     634           0 :                                                                 throw(MAL, "dataflow",
     635             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     636             :                                                         }
     637         117 :                                                         flow->edges = tmp;
     638         117 :                                                         size *= 2;
     639             :                                                 }
     640             :                                         } else {
     641     4434928 :                                                 flow->nodes[n] = l;
     642     4434928 :                                                 flow->edges[n] = -1;
     643             :                                         }
     644     8725254 :                                         flow->status[l].blocks++;
     645             :                                 }
     646             :                         }
     647             :                 }
     648             : 
     649    16772432 :                 for (j = 0; j < p->retc; j++)
     650     8713370 :                         assign[getArg(p, j)] = pc;      /* ensure recognition of dependency on first instruction and constant */
     651             :         }
     652      144895 :         GDKfree(assign);
     653             : 
     654      144895 :         return MAL_SUCCEED;
     655             : }
     656             : 
     657             : /*
     658             :  * Parallel processing is mostly driven by dataflow, but within this context
     659             :  * there may be different schemes to take instructions into execution.
     660             :  * The admission scheme (and wrapup) are the necessary scheduler hooks.
     661             :  * A scheduler registers the functions needed and should release them
     662             :  * at the end of the parallel block.
     663             :  * They take effect after we have ensured that the basic properties for
     664             :  * execution hold.
     665             :  */
     666             : static str
     667      144895 : DFLOWscheduler(DataFlow flow, struct worker *w)
     668             : {
     669      144895 :         int last;
     670      144895 :         int i;
     671      144895 :         int j;
     672      144895 :         InstrPtr p;
     673      144895 :         int tasks = 0, actions = 0;
     674      144895 :         str ret = MAL_SUCCEED;
     675      144895 :         FlowEvent fe, f = 0;
     676             : 
     677      144895 :         if (flow == NULL)
     678           0 :                 throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
     679      144895 :         actions = flow->stop - flow->start;
     680      144895 :         if (actions == 0)
     681           0 :                 throw(MAL, "dataflow", "Empty dataflow block");
     682             :         /* initialize the eligible statements */
     683      144895 :         fe = flow->status;
     684             : 
     685      144895 :         ATOMIC_DEC(&flow->cntxt->workers);
     686      144895 :         MT_lock_set(&flow->flowlock);
     687     8348920 :         for (i = 0; i < actions; i++)
     688     8059130 :                 if (fe[i].blocks == 0) {
     689      878223 :                         p = getInstrPtr(flow->mb, fe[i].pc);
     690      878223 :                         if (p == NULL) {
     691           0 :                                 MT_lock_unset(&flow->flowlock);
     692           0 :                                 ATOMIC_INC(&flow->cntxt->workers);
     693           0 :                                 throw(MAL, "dataflow",
     694             :                                           "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
     695             :                         }
     696      878223 :                         fe[i].argclaim = 0;
     697     5098785 :                         for (j = p->retc; j < p->argc; j++)
     698     4220562 :                                 fe[i].argclaim += getMemoryClaim(fe[0].flow->mb,
     699     4220565 :                                                                                                  fe[0].flow->stk, p, j, FALSE);
     700      878220 :                         flow->status[i].state = DFLOWrunning;
     701      878220 :                         q_enqueue(todo, flow->status + i);
     702             :                 }
     703      144895 :         MT_lock_unset(&flow->flowlock);
     704      144895 :         MT_sema_up(&w->s);
     705             : 
     706      144895 :         while (actions != tasks) {
     707     8058621 :                 f = q_dequeue(flow->done, NULL);
     708     8058703 :                 if (ATOMIC_GET(&exiting))
     709             :                         break;
     710     8058703 :                 if (f == NULL) {
     711           0 :                         ATOMIC_INC(&flow->cntxt->workers);
     712           0 :                         throw(MAL, "dataflow",
     713             :                                   "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
     714             :                 }
     715             : 
     716             :                 /*
     717             :                  * When an instruction is finished we have to reduce the blocked
     718             :                  * counter for all dependent instructions.  For those where it
     719             :                  * drops to zero we can schedule it; we do it here instead of the scheduler
     720             :                  */
     721             : 
     722     8058703 :                 MT_lock_set(&flow->flowlock);
     723     8058383 :                 tasks++;
     724     8058383 :                 for (last = f->pc - flow->start;
     725    30578596 :                          last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
     726    22520020 :                         if (flow->status[i].state == DFLOWpending) {
     727    19358701 :                                 flow->status[i].argclaim += f->hotclaim;
     728    19358701 :                                 if (flow->status[i].blocks == 1) {
     729     4018599 :                                         flow->status[i].blocks--;
     730     4018599 :                                         flow->status[i].state = DFLOWrunning;
     731     4018599 :                                         q_enqueue(todo, flow->status + i);
     732             :                                 } else {
     733    15340102 :                                         flow->status[i].blocks--;
     734             :                                 }
     735             :                         }
     736     8203470 :                 MT_lock_unset(&flow->flowlock);
     737             :         }
     738             :         /* release the worker from its specific task (turn it into a
     739             :          * generic worker) */
     740      144894 :         ATOMIC_PTR_SET(&w->cntxt, NULL);
     741      144894 :         ATOMIC_INC(&flow->cntxt->workers);
     742             :         /* wrap up errors */
     743      144894 :         assert(flow->done->last == 0);
     744      144894 :         if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
     745         476 :                 TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
     746             :         }
     747             :         return ret;
     748             : }
     749             : 
     750             : /* called and returns with dataflowLock locked, temporarily unlocks it;
     751             :  * joins the thread associated with the worker and destroys the structure */
     752             : static inline void
     753        7758 : finish_worker(struct worker *t)
     754             : {
     755        7758 :         t->flag = FINISHING;
     756        7758 :         MT_lock_unset(&dataflowLock);
     757        7758 :         MT_join_thread(t->id);
     758        7758 :         MT_sema_destroy(&t->s);
     759        7758 :         ATOMIC_PTR_DESTROY(&t->cntxt);
     760        7758 :         GDKfree(t);
     761        7758 :         MT_lock_set(&dataflowLock);
     762        7758 : }
     763             : 
     764             : /* We create a pool of GDKnr_threads-1 generic workers, that is,
     765             :  * workers that will take on jobs from any clients.  In addition, we
     766             :  * create a single specific worker per client (i.e. each time we enter
     767             :  * here).  This specific worker will only do work for the client for
     768             :  * which it was started.  In this way we can guarantee that there will
     769             :  * always be progress for the client, even if all other workers are
     770             :  * doing something big.
     771             :  *
     772             :  * When all jobs for a client have been done (there are no more
     773             :  * entries for the client in the queue), the specific worker turns
     774             :  * itself into a generic worker.  At the same time, we signal that one
     775             :  * generic worker should exit and this function returns.  In this way
     776             :  * we make sure that there are once again GDKnr_threads-1 generic
     777             :  * workers. */
     778             : str
     779      144895 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc,
     780             :                            MalStkPtr stk)
     781             : {
     782      144895 :         DataFlow flow = NULL;
     783      144895 :         str msg = MAL_SUCCEED;
     784      144895 :         int size;
     785      144895 :         bit *ret;
     786      144895 :         struct worker *t;
     787             : 
     788      144895 :         if (stk == NULL)
     789           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
     790      144895 :         ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
     791      144895 :         *ret = FALSE;
     792             : 
     793      144895 :         assert(stoppc > startpc);
     794             : 
     795             :         /* check existence of workers */
     796      144895 :         if (todo == NULL) {
     797             :                 /* create thread pool */
     798         326 :                 if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
     799             :                         /* no threads created, run serially */
     800           0 :                         *ret = TRUE;
     801           0 :                         return MAL_SUCCEED;
     802             :                 }
     803             :         }
     804      144895 :         assert(todo);
     805             :         /* in addition, create one more worker that will only execute
     806             :          * tasks for the current client to compensate for our waiting
     807             :          * until all work is done */
     808      144895 :         MT_lock_set(&dataflowLock);
     809             :         /* join with already exited threads */
     810      151084 :         while (exited_workers != NULL) {
     811        6189 :                 assert(exited_workers->flag == EXITED);
     812        6189 :                 struct worker *t = exited_workers;
     813        6189 :                 exited_workers = exited_workers->next;
     814        6189 :                 finish_worker(t);
     815             :         }
     816      144895 :         assert(cntxt != NULL);
     817      144895 :         if (free_workers != NULL) {
     818      138109 :                 t = free_workers;
     819      138109 :                 assert(t->flag == FREE);
     820      138109 :                 assert(free_count > 0);
     821      138109 :                 free_count--;
     822      138109 :                 free_workers = t->next;
     823      138109 :                 t->next = workers;
     824      138109 :                 workers = t;
     825      138109 :                 t->flag = WAITING;
     826      138109 :                 ATOMIC_PTR_SET(&t->cntxt, cntxt);
     827      138109 :                 MT_sema_up(&t->s);
     828             :         } else {
     829        6786 :                 t = GDKmalloc(sizeof(*t));
     830        6786 :                 if (t != NULL) {
     831        6786 :                         *t = (struct worker) {
     832             :                                 .flag = WAITING,
     833             :                                 .cntxt = ATOMIC_PTR_VAR_INIT(cntxt),
     834             :                         };
     835        6786 :                         MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     836        6786 :                         if (MT_create_thread(&t->id, DFLOWworker, t,
     837             :                                                                  MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     838           0 :                                 ATOMIC_PTR_DESTROY(&t->cntxt);
     839           0 :                                 MT_sema_destroy(&t->s);
     840           0 :                                 GDKfree(t);
     841           0 :                                 t = NULL;
     842             :                         } else {
     843        6786 :                                 t->next = workers;
     844        6786 :                                 workers = t;
     845             :                         }
     846             :                 }
     847        6786 :                 if (t == NULL) {
     848             :                         /* cannot start new thread, run serially */
     849           0 :                         *ret = TRUE;
     850           0 :                         MT_lock_unset(&dataflowLock);
     851           0 :                         return MAL_SUCCEED;
     852             :                 }
     853             :         }
     854      144895 :         MT_lock_unset(&dataflowLock);
     855             : 
     856      144895 :         flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
     857      144895 :         if (flow == NULL)
     858           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     859             : 
     860      144895 :         size = DFLOWgraphSize(mb, startpc, stoppc);
     861      144895 :         size += stoppc - startpc;
     862             : 
     863      289790 :         *flow = (DataFlowRec) {
     864             :                 .cntxt = cntxt,
     865             :                 .mb = mb,
     866             :                 .stk = stk,
     867      144895 :                 .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
     868             :                 /* keep real block count, exclude brackets */
     869      144895 :                 .start = startpc + 1,
     870             :                 .stop = stoppc,
     871      144895 :                 .done = q_create("flow->done"),
     872      144895 :                 .status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
     873             :                                                                                 sizeof(FlowEventRec)),
     874             :                 .error = ATOMIC_PTR_VAR_INIT(NULL),
     875      144895 :                 .nodes = (int *) GDKzalloc(sizeof(int) * size),
     876      144894 :                 .edges = (int *) GDKzalloc(sizeof(int) * size),
     877             :         };
     878             : 
     879      144895 :         if (flow->done == NULL) {
     880           0 :                 GDKfree(flow->status);
     881           0 :                 GDKfree(flow->nodes);
     882           0 :                 GDKfree(flow->edges);
     883           0 :                 GDKfree(flow);
     884           0 :                 throw(MAL, "dataflow",
     885             :                           "runMALdataflow(): Failed to create flow->done queue");
     886             :         }
     887             : 
     888      144895 :         if (flow->status == NULL || flow->nodes == NULL || flow->edges == NULL) {
     889           0 :                 q_destroy(flow->done);
     890           0 :                 GDKfree(flow->status);
     891           0 :                 GDKfree(flow->nodes);
     892           0 :                 GDKfree(flow->edges);
     893           0 :                 GDKfree(flow);
     894           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     895             :         }
     896             : 
     897      144895 :         MT_lock_init(&flow->flowlock, "flow->flowlock");
     898      144892 :         msg = DFLOWinitBlk(flow, mb, size);
     899             : 
     900      144896 :         if (msg == MAL_SUCCEED)
     901      144895 :                 msg = DFLOWscheduler(flow, t);
     902             : 
     903      144895 :         GDKfree(flow->status);
     904      144895 :         GDKfree(flow->edges);
     905      144895 :         GDKfree(flow->nodes);
     906      144895 :         q_destroy(flow->done);
     907      144895 :         MT_lock_destroy(&flow->flowlock);
     908      144895 :         ATOMIC_PTR_DESTROY(&flow->error);
     909      144895 :         GDKfree(flow);
     910             : 
     911             :         /* we created one worker, now tell one worker to exit again */
     912      144895 :         MT_lock_set(&todo->l);
     913      144895 :         todo->exitcount++;
     914      144895 :         MT_lock_unset(&todo->l);
     915      144895 :         MT_sema_up(&todo->s);
     916             : 
     917      144895 :         return msg;
     918             : }
     919             : 
     920             : str
     921           0 : deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     922             : {
     923           0 :         int *ret = getArgReference_int(stk, pci, 0);
     924           0 :         int *val = getArgReference_int(stk, pci, 1);
     925           0 :         (void) cntxt;
     926           0 :         (void) mb;
     927           0 :         *ret = *val;
     928           0 :         return MAL_SUCCEED;
     929             : }
     930             : 
     931             : static void
     932         325 : stopMALdataflow(void)
     933             : {
     934         325 :         ATOMIC_SET(&exiting, 1);
     935         325 :         if (todo) {
     936         325 :                 MT_lock_set(&dataflowLock);
     937             :                 /* first wake up all running threads */
     938         325 :                 int n = 0;
     939         921 :                 for (struct worker *t = free_workers; t; t = t->next)
     940         596 :                         n++;
     941        1298 :                 for (struct worker *t = workers; t; t = t->next)
     942         973 :                         n++;
     943        1894 :                 while (n-- > 0) {
     944             :                         /* one UP for each thread we know about */
     945        1894 :                         MT_sema_up(&todo->s);
     946             :                 }
     947         921 :                 while (free_workers) {
     948         596 :                         struct worker *t = free_workers;
     949         596 :                         assert(free_count > 0);
     950         596 :                         free_count--;
     951         596 :                         free_workers = free_workers->next;
     952         596 :                         MT_sema_up(&t->s);
     953         596 :                         finish_worker(t);
     954             :                 }
     955         349 :                 while (workers) {
     956          24 :                         struct worker *t = workers;
     957          24 :                         workers = workers->next;
     958          24 :                         finish_worker(t);
     959             :                 }
     960        1274 :                 while (exited_workers) {
     961         949 :                         struct worker *t = exited_workers;
     962         949 :                         exited_workers = exited_workers->next;
     963         949 :                         finish_worker(t);
     964             :                 }
     965         325 :                 MT_lock_unset(&dataflowLock);
     966             :         }
     967         325 : }
     968             : 
     969             : void
     970         325 : mal_dataflow_reset(void)
     971             : {
     972         325 :         stopMALdataflow();
     973         325 :         workers = exited_workers = NULL;
     974         325 :         if (todo) {
     975         325 :                 MT_lock_destroy(&todo->l);
     976         325 :                 MT_sema_destroy(&todo->s);
     977         325 :                 GDKfree(todo);
     978             :         }
     979         325 :         todo = 0;                                       /* pending instructions */
     980         325 :         ATOMIC_SET(&exiting, 0);
     981         325 : }

Generated by: LCOV version 1.14