LCOV - code coverage report
Current view: top level - monetdb5/mal - mal_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 430 519 82.9 %
Date: 2024-12-19 23:10:26 Functions: 12 14 85.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (author) M Kersten, S Mullender
      15             :  * Dataflow processing only works on a code
      16             :  * sequence that does not include additional (implicit) flow of control
      17             :  * statements and, ideally, consists of expensive BAT operations.
      18             :  * The dataflow portion is identified as a guarded block,
      19             :  * whose entry is controlled by the function language.dataflow();
      20             :  *
      21             :  * The dataflow worker tries to follow the sequence of actions
      22             :  * as laid out in the plan, but abandons this track when it hits
      23             :  * a blocking operator, or an instruction for which not all arguments
      24             :  * are available, or when resources become scarce.
      25             :  *
      26             :  * The flow graph is organized such that parallel threads can
      27             :  * access it mostly without expensive locking and dependent
      28             :  * variables are easy to find.
      29             :  */
      30             : #include "monetdb_config.h"
      31             : #include "mal_dataflow.h"
      32             : #include "mal_exception.h"
      33             : #include "mal_private.h"
      34             : #include "mal_internal.h"
      35             : #include "mal_runtime.h"
      36             : #include "mal_resource.h"
      37             : #include "mal_function.h"
      38             : 
      39             : #define DFLOWpending 0                  /* runnable */
      40             : #define DFLOWrunning 1                  /* currently in progress */
      41             : #define DFLOWwrapup  2                  /* done! */
      42             : #define DFLOWretry   3                  /* reschedule */
      43             : #define DFLOWskipped 4                  /* due to errors */
      44             : 
      45             : /* The per instruction status of execution */
      46             : typedef struct FLOWEVENT {
      47             :         struct DATAFLOW *flow;          /* execution context */
      48             :         int pc;                                         /* pc in underlying malblock */
      49             :         int blocks;                                     /* awaiting for variables */
      50             :         sht state;                                      /* of execution */
      51             :         lng clk;
      52             :         sht cost;
      53             :         lng hotclaim;                           /* memory foot print of result variables */
      54             :         lng argclaim;                           /* memory foot print of arguments */
      55             :         lng maxclaim;                           /* memory foot print of largest argument, could be used to indicate result size */
      56             :         struct FLOWEVENT *next;         /* linked list for queues */
      57             : } *FlowEvent, FlowEventRec;
      58             : 
      59             : typedef struct queue {
      60             :         int exitcount;                          /* how many threads should exit */
      61             :         FlowEvent first, last;          /* first and last element of the queue */
      62             :         MT_Lock l;                                      /* it's a shared resource, ie we need locks */
      63             :         MT_Sema s;                                      /* threads wait on empty queues */
      64             : } Queue;
      65             : 
      66             : /*
      67             :  * The dataflow dependency is administered in a graph list structure.
      68             :  * For each instruction we keep the list of instructions that
      69             :  * should be checked for eligibility once we are finished with it.
      70             :  */
      71             : typedef struct DATAFLOW {
      72             :         Client cntxt;                           /* for debugging and client resolution */
      73             :         MalBlkPtr mb;                           /* carry the context */
      74             :         MalStkPtr stk;
      75             :         int start, stop;                        /* guarded block under consideration */
      76             :         FlowEvent status;                       /* status of each instruction */
      77             :         ATOMIC_PTR_TYPE error;          /* error encountered */
      78             :         int *nodes;                                     /* dependency graph nodes */
      79             :         int *edges;                                     /* dependency graph */
      80             :         MT_Lock flowlock;                       /* lock to protect the above */
      81             :         Queue *done;                            /* instructions handled */
      82             :         bool set_qry_ctx;
      83             : } *DataFlow, DataFlowRec;
      84             : 
      85             : struct worker {
      86             :         MT_Id id;
      87             :         enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
      88             :         ATOMIC_PTR_TYPE cntxt;          /* client we do work for (NULL -> any) */
      89             :         MT_Sema s;
      90             :         struct worker *next;
      91             :         char errbuf[GDKMAXERRLEN];      /* GDKerrbuf so that we can allocate before fork */
      92             : };
      93             : /* heads of three mutually exclusive linked lists, all using the .next
      94             :  * field in the worker struct */
      95             : static struct worker *workers;            /* "working" workers */
      96             : static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
      97             : static struct worker *free_workers;     /* free workers (.flag==FREE) */
      98             : static int free_count = 0;              /* number of free threads */
      99             : static int free_max = 0;                /* max number of spare free threads */
     100             : 
     101             : static Queue *todo = 0;                 /* pending instructions */
     102             : 
     103             : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
     104             : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
     105             : 
     106             : /*
     107             :  * Calculate the size of the dataflow dependency graph.
     108             :  */
     109             : static int
     110             : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
     111             : {
     112             :         int cnt = 0;
     113             : 
     114    14270637 :         for (int i = start; i < stop; i++)
     115    14113503 :                 cnt += getInstrPtr(mb, i)->argc;
     116      157134 :         return cnt;
     117             : }
     118             : 
     119             : /*
     120             :  * The dataflow execution is confined to a barrier block.
     121             :  * Within the block there are multiple flows, which, in principle,
     122             :  * can be executed in parallel.
     123             :  */
     124             : 
     125             : static Queue *
     126      157485 : q_create(const char *name)
     127             : {
     128      157485 :         Queue *q = GDKzalloc(sizeof(Queue));
     129             : 
     130      157485 :         if (q == NULL)
     131             :                 return NULL;
     132      157485 :         MT_lock_init(&q->l, name);
     133      157485 :         MT_sema_init(&q->s, 0, name);
     134      157485 :         return q;
     135             : }
     136             : 
     137             : static void
     138      157133 : q_destroy(Queue *q)
     139             : {
     140      157133 :         assert(q);
     141      157133 :         MT_lock_destroy(&q->l);
     142      157134 :         MT_sema_destroy(&q->s);
     143      157126 :         GDKfree(q);
     144      157132 : }
     145             : 
     146             : /* keep a simple FIFO queue (events are appended at the tail). It won't be a large one, so shuffling on requeue is possible */
     147             : /* we might actually sort it for better scheduling behavior */
     148             : static void
     149    22013058 : q_enqueue(Queue *q, FlowEvent d)
     150             : {
     151    22013058 :         assert(q);
     152    22013058 :         assert(d);
     153    22013058 :         MT_lock_set(&q->l);
     154    22040053 :         if (q->first == NULL) {
     155     5343037 :                 assert(q->last == NULL);
     156     5343037 :                 q->first = q->last = d;
     157             :         } else {
     158    16697016 :                 assert(q->last != NULL);
     159    16697016 :                 q->last->next = d;
     160    16697016 :                 q->last = d;
     161             :         }
     162    22040053 :         d->next = NULL;
     163    22040053 :         MT_lock_unset(&q->l);
     164    22069102 :         MT_sema_up(&q->s);
     165    22025507 : }
     166             : 
     167             : /*
     168             :  * A priority queue over the hot claims of memory may
     169             :  * be more effective. It would prioritize those instructions
     170             :  * that want to use a big recent result.
     171             :  */
     172             : 
     173             : static void
     174           0 : q_requeue(Queue *q, FlowEvent d)
     175             : {
     176           0 :         assert(q);
     177           0 :         assert(d);
     178           0 :         MT_lock_set(&q->l);
     179           0 :         if (q->first == NULL) {
     180           0 :                 assert(q->last == NULL);
     181           0 :                 q->first = q->last = d;
     182           0 :                 d->next = NULL;
     183             :         } else {
     184           0 :                 assert(q->last != NULL);
     185           0 :                 d->next = q->first;
     186           0 :                 q->first = d;
     187             :         }
     188           0 :         MT_lock_unset(&q->l);
     189           0 :         MT_sema_up(&q->s);
     190           0 : }
     191             : 
     192             : static FlowEvent
     193    22262856 : q_dequeue(Queue *q, Client cntxt)
     194             : {
     195    22262856 :         assert(q);
     196    22262856 :         MT_sema_down(&q->s);
     197    22324362 :         if (ATOMIC_GET(&exiting))
     198             :                 return NULL;
     199    22321747 :         MT_lock_set(&q->l);
     200    22305821 :         if (cntxt == NULL && q->exitcount > 0) {
     201      157134 :                 q->exitcount--;
     202      157134 :                 MT_lock_unset(&q->l);
     203      157134 :                 return NULL;
     204             :         }
     205             : 
     206    22148687 :         FlowEvent *dp = &q->first;
     207    22148687 :         FlowEvent pd = NULL;
     208             :         /* if cntxt == NULL, return the first event, if cntxt != NULL, find
     209             :          * the first event in the queue with matching cntxt value and return
     210             :          * that */
     211    22148687 :         if (cntxt != NULL) {
     212    29639914 :                 while (*dp && (*dp)->flow->cntxt != cntxt) {
     213    28020824 :                         pd = *dp;
     214    28020824 :                         dp = &pd->next;
     215             :                 }
     216             :         }
     217    22148687 :         FlowEvent d = *dp;
     218    22148687 :         if (d) {
     219    21988189 :                 *dp = d->next;
     220    21988189 :                 d->next = NULL;
     221    21988189 :                 if (*dp == NULL)
     222     5363108 :                         q->last = pd;
     223             :         }
     224    22148687 :         MT_lock_unset(&q->l);
     225    22122855 :         return d;
     226             : }
     227             : 
     228             : /*
     229             :  * We simply move an instruction into the front of the queue.
     230             :  * Beware, we assume that variables are assigned a value once, otherwise
     231             :  * the order may really create errors.
     232             :  * The order of the instructions should be retained as long as possible.
     233             :  * Delay processing when we run out of memory.  Push the instruction back
     234             :  * at the front of the queue, waiting for another attempt. The problem may be
     235             :  * that all threads but one are cycling through the queue, each time
     236             :  * finding an eligible instruction, but without enough space.
     237             :  * Therefore, we wait for a few milliseconds as an initial punishment.
     238             :  *
     239             :  * The process could be refined by checking for cheap operations,
     240             :  * i.e. those that would require no memory at all (aggr.count).
     241             :  * This, however, would lead to a dependency to the upper layers,
     242             :  * because in the kernel we don't know what routines are available
     243             :  * with this property. Nor do we maintain such properties.
     244             :  */
     245             : 
     246             : static void
     247        5003 : DFLOWworker(void *T)
     248             : {
     249        5003 :         struct worker *t = (struct worker *) T;
     250        5003 :         bool locked = false;
     251             : #ifdef _MSC_VER
     252             :         srand((unsigned int) GDKusec());
     253             : #endif
     254        5003 :         GDKsetbuf(t->errbuf);                /* where to leave errors */
     255        5010 :         snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
     256             : 
     257      159564 :         for (;;) {
     258      159564 :                 DataFlow flow;
     259      159564 :                 FlowEvent fe = 0, fnxt = 0;
     260      159564 :                 str error = 0;
     261      159564 :                 int i;
     262      159564 :                 lng claim;
     263      159564 :                 Client cntxt;
     264      159564 :                 InstrPtr p;
     265             : 
     266      159564 :                 GDKclrerr();
     267             : 
     268      159524 :                 if (t->flag == WAITING) {
     269             :                         /* wait until we are allowed to start working */
     270      157127 :                         MT_sema_down(&t->s);
     271      157128 :                         t->flag = RUNNING;
     272      157128 :                         if (ATOMIC_GET(&exiting)) {
     273             :                                 break;
     274             :                         }
     275             :                 }
     276      159525 :                 assert(t->flag == RUNNING);
     277      159525 :                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     278    14231118 :                 while (1) {
     279    14231118 :                         MT_thread_set_qry_ctx(NULL);
     280    14198988 :                         if (fnxt == 0) {
     281     8419385 :                                 MT_thread_setworking("waiting for work");
     282     8418647 :                                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     283     8418647 :                                 fe = q_dequeue(todo, cntxt);
     284     8439217 :                                 if (fe == NULL) {
     285      319940 :                                         if (cntxt) {
     286             :                                                 /* we're not done yet with work for the current
     287             :                                                  * client (as far as we know), so give up the CPU
     288             :                                                  * and let the scheduler enter some more work, but
     289             :                                                  * first compensate for the down we did in
     290             :                                                  * dequeue */
     291      160875 :                                                 MT_sema_up(&todo->s);
     292      160869 :                                                 MT_sleep_ms(1);
     293    14391812 :                                                 continue;
     294             :                                         }
     295             :                                         /* no more work to be done: exit */
     296      159065 :                                         break;
     297             :                                 }
     298     8119277 :                                 if (fe->flow->cntxt && fe->flow->cntxt->mythread)
     299     8118446 :                                         MT_thread_setworking(fe->flow->cntxt->mythread);
     300             :                         } else
     301             :                                 fe = fnxt;
     302    13898282 :                         if (ATOMIC_GET(&exiting)) {
     303             :                                 break;
     304             :                         }
     305    13898282 :                         fnxt = 0;
     306    13898282 :                         assert(fe);
     307    13898282 :                         flow = fe->flow;
     308    13898282 :                         assert(flow);
     309    13898282 :                         MT_thread_set_qry_ctx(flow->set_qry_ctx ? &flow->cntxt->
     310             :                                                                   qryctx : NULL);
     311             : 
     312             :                         /* whenever we have a (concurrent) error, skip it */
     313    13918785 :                         if (ATOMIC_PTR_GET(&flow->error)) {
     314        4155 :                                 q_enqueue(flow->done, fe);
     315        4151 :                                 continue;
     316             :                         }
     317             : 
     318    13914630 :                         p = getInstrPtr(flow->mb, fe->pc);
     319    13914630 :                         claim = fe->argclaim;
     320    27857738 :                         if (p->fcn != (MALfcn) deblockdataflow &&    /* never block on deblockdataflow() */
     321    13914630 :                                 !MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
     322           0 :                                 fe->hotclaim = 0;    /* don't assume priority anymore */
     323           0 :                                 fe->maxclaim = 0;
     324           0 :                                 MT_lock_set(&todo->l);
     325           0 :                                 FlowEvent last = todo->last;
     326           0 :                                 MT_lock_unset(&todo->l);
     327           0 :                                 if (last == NULL)
     328           0 :                                         MT_sleep_ms(DELAYUNIT);
     329           0 :                                 q_requeue(todo, fe);
     330           0 :                                 continue;
     331             :                         }
     332    13943108 :                         ATOMIC_BASE_TYPE wrks = ATOMIC_INC(&flow->cntxt->workers);
     333    13943108 :                         ATOMIC_BASE_TYPE mwrks = ATOMIC_GET(&flow->mb->workers);
     334    13943518 :                         while (wrks > mwrks) {
     335       38609 :                                 if (ATOMIC_CAS(&flow->mb->workers, &mwrks, wrks))
     336             :                                         break;
     337             :                         }
     338    13943147 :                         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
     339             :                                                                    flow->stk, 0, 0);
     340    13784117 :                         ATOMIC_DEC(&flow->cntxt->workers);
     341             :                         /* release the memory claim */
     342    13784117 :                         MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
     343             : 
     344    13863845 :                         MT_lock_set(&flow->flowlock);
     345    13948824 :                         fe->state = DFLOWwrapup;
     346    13948824 :                         MT_lock_unset(&flow->flowlock);
     347    13931827 :                         if (error) {
     348         531 :                                 void *null = NULL;
     349             :                                 /* only collect one error (from one thread, needed for stable testing) */
     350         531 :                                 if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
     351          55 :                                         freeException(error);
     352             :                                 /* after an error we skip the rest of the block */
     353         531 :                                 q_enqueue(flow->done, fe);
     354         531 :                                 continue;
     355             :                         }
     356             : 
     357             :                         /* see if you can find an eligible instruction that uses the
     358             :                          * result just produced. Then we can continue with it right away.
     359             :                          * We are just looking forward for the last block, which means we
     360             :                          * are safe from concurrent actions. No other thread can steal it,
     361             :                          * because we hold the logical lock.
     362             :                          * All eligible instructions are queued
     363             :                          */
     364    13931296 :                         p = getInstrPtr(flow->mb, fe->pc);
     365    13931296 :                         assert(p);
     366    13931296 :                         fe->hotclaim = 0;
     367    13931296 :                         fe->maxclaim = 0;
     368             : 
     369    29008662 :                         for (i = 0; i < p->retc; i++) {
     370    15085228 :                                 lng footprint;
     371    15085228 :                                 footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
     372    15077366 :                                 fe->hotclaim += footprint;
     373    15077366 :                                 if (footprint > fe->maxclaim)
     374     5438169 :                                         fe->maxclaim = footprint;
     375             :                         }
     376             : 
     377             : /* Try to get rid of the hot potato or locate an alternative to proceed.
     378             :  */
     379             : #define HOTPOTATO
     380             : #ifdef HOTPOTATO
     381             :                         /* HOT potato choice */
     382    13923434 :                         int last = 0, nxt = -1;
     383    13923434 :                         lng nxtclaim = -1;
     384             : 
     385    13923434 :                         MT_lock_set(&flow->flowlock);
     386    13950181 :                         for (last = fe->pc - flow->start;
     387    55943869 :                                  last >= 0 && (i = flow->nodes[last]) > 0;
     388    41993688 :                                  last = flow->edges[last]) {
     389    41993688 :                                 if (flow->status[i].state == DFLOWpending
     390    41987842 :                                         && flow->status[i].blocks == 1) {
     391             :                                         /* find the one with the largest footprint */
     392    11393551 :                                         if (nxt == -1 || flow->status[i].argclaim > nxtclaim) {
     393     6306275 :                                                 nxt = i;
     394     6306275 :                                                 nxtclaim = flow->status[i].argclaim;
     395             :                                         }
     396             :                                 }
     397             :                         }
     398             :                         /* hot potato can not be removed, use alternative to proceed */
     399    13950181 :                         if (nxt >= 0) {
     400     5835214 :                                 flow->status[nxt].state = DFLOWrunning;
     401     5835214 :                                 flow->status[nxt].blocks = 0;
     402     5835214 :                                 flow->status[nxt].hotclaim = fe->hotclaim;
     403     5835214 :                                 flow->status[nxt].argclaim += fe->hotclaim;
     404     5835214 :                                 if (flow->status[nxt].maxclaim < fe->maxclaim)
     405     2265997 :                                         flow->status[nxt].maxclaim = fe->maxclaim;
     406             :                                 fnxt = flow->status + nxt;
     407             :                         }
     408    13950181 :                         MT_lock_unset(&flow->flowlock);
     409             : #endif
     410             : 
     411    13932543 :                         q_enqueue(flow->done, fe);
     412    13906217 :                         if (fnxt == 0 && profilerStatus) {
     413           0 :                                 profilerHeartbeatEvent("wait");
     414             :                         }
     415             :                 }
     416      159065 :                 MT_lock_set(&dataflowLock);
     417      159578 :                 if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >= free_max) {
     418             :                         locked = true;
     419             :                         break;
     420             :                 }
     421      155193 :                 free_count++;
     422      155193 :                 struct worker **tp = &workers;
     423      948170 :                 while (*tp && *tp != t)
     424      792977 :                         tp = &(*tp)->next;
     425      155193 :                 assert(*tp && *tp == t);
     426      155193 :                 *tp = t->next;
     427      155193 :                 t->flag = FREE;
     428      155193 :                 t->next = free_workers;
     429      155193 :                 free_workers = t;
     430      155193 :                 MT_lock_unset(&dataflowLock);
     431      155193 :                 MT_thread_setworking("idle, waiting for new client");
     432      155193 :                 MT_sema_down(&t->s);
     433      155189 :                 if (GDKexiting() || ATOMIC_GET(&exiting))
     434             :                         break;
     435      154560 :                 assert(t->flag == WAITING);
     436             :         }
     437         624 :         if (!locked)
     438         624 :                 MT_lock_set(&dataflowLock);
     439        5009 :         if (t->flag != FINISHING) {
     440        4347 :                 struct worker **tp = t->flag == FREE ? &free_workers : &workers;
     441       19714 :                 while (*tp && *tp != t)
     442       15367 :                         tp = &(*tp)->next;
     443        4347 :                 assert(*tp && *tp == t);
     444        4347 :                 *tp = t->next;
     445        4347 :                 t->flag = EXITED;
     446        4347 :                 t->next = exited_workers;
     447        4347 :                 exited_workers = t;
     448             :         }
     449        5009 :         MT_lock_unset(&dataflowLock);
     450        5007 :         GDKsetbuf(NULL);
     451        5000 : }
     452             : 
     453             : /*
     454             :  * Create an interpreter pool.
     455             :  * One worker will adaptively be available for each client.
     456             :  * The remainder is taken from the GDKnr_threads argument, which
     457             :  * typically is equal to the number of cores.
     458             :  * The workers are kept in file-local linked lists to enable debugging.
     459             :  */
     460             : static int
     461         351 : DFLOWinitialize(void)
     462             : {
     463         351 :         int limit;
     464         351 :         int created = 0;
     465             : 
     466         351 :         MT_lock_set(&mal_contextLock);
     467         351 :         MT_lock_set(&dataflowLock);
     468         351 :         if (todo) {
     469             :                 /* somebody else beat us to it */
     470           0 :                 MT_lock_unset(&dataflowLock);
     471           0 :                 MT_lock_unset(&mal_contextLock);
     472           0 :                 return 0;
     473             :         }
     474         351 :         free_max = GDKgetenv_int("dataflow_max_free",
     475             :                                                          GDKnr_threads < 4 ? 4 : GDKnr_threads);
     476         351 :         todo = q_create("todo");
     477         351 :         if (todo == NULL) {
     478           0 :                 MT_lock_unset(&dataflowLock);
     479           0 :                 MT_lock_unset(&mal_contextLock);
     480           0 :                 return -1;
     481             :         }
     482         351 :         limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
     483        2802 :         while (limit > 0) {
     484        2451 :                 limit--;
     485        2451 :                 struct worker *t = GDKmalloc(sizeof(*t));
     486        2451 :                 if (t == NULL) {
     487           0 :                         TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
     488           0 :                         continue;
     489             :                 }
     490        2451 :                 *t = (struct worker) {
     491             :                         .flag = RUNNING,
     492             :                         .cntxt = ATOMIC_PTR_VAR_INIT(NULL),
     493             :                 };
     494        2451 :                 MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     495        2451 :                 if (MT_create_thread(&t->id, DFLOWworker, t,
     496             :                                                          MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     497           0 :                         ATOMIC_PTR_DESTROY(&t->cntxt);
     498           0 :                         MT_sema_destroy(&t->s);
     499           0 :                         GDKfree(t);
     500             :                 } else {
     501        2451 :                         t->next = workers;
     502        2451 :                         workers = t;
     503        2451 :                         created++;
     504             :                 }
     505             :         }
     506         351 :         if (created == 0) {
     507             :                 /* no threads created */
     508           0 :                 q_destroy(todo);
     509           0 :                 todo = NULL;
     510           0 :                 MT_lock_unset(&dataflowLock);
     511           0 :                 MT_lock_unset(&mal_contextLock);
     512           0 :                 return -1;
     513             :         }
     514         351 :         MT_lock_unset(&dataflowLock);
     515         351 :         MT_lock_unset(&mal_contextLock);
     516         351 :         return 0;
     517             : }
     518             : 
     519             : /*
     520             :  * DFLOWinitBlk: build the dependency administration for the instructions
     521             :  * flow->start .. flow->stop - 1 of mb.  For each instruction we count how many
     522             :  * inputs are still missing before it can be executed (status[n].blocks), and we
     523             :  * keep a list (flow->nodes / flow->edges) of the instructions whose blocking counter should be decremented upon finishing it.  size is the number of entries allocated for nodes and edges; both are doubled when they fill up.  Returns MAL_SUCCEED, or an error message on bad arguments or allocation failure.
     524             :  */
     525             : static str
     526      157134 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
     527             : {
     528      157134 :         int pc, i, j, k, l, n, etop = 0;
     529      157134 :         int *assign;
     530      157134 :         InstrPtr p;
     531             : 
     532      157134 :         if (flow == NULL)
     533           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
     534      157134 :         if (mb == NULL)
     535           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
     536      157134 :         assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
     537      157134 :         if (assign == NULL)
     538           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     539      157134 :         etop = flow->stop - flow->start;
     540    14113087 :         for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
     541    13955953 :                 p = getInstrPtr(mb, pc);
     542    13955953 :                 if (p == NULL) {
     543           0 :                         GDKfree(assign);
     544           0 :                         throw(MAL, "dataflow",
     545             :                                   "DFLOWinitBlk(): getInstrPtr() returned NULL");
     546             :                 }
     547             : 
     548             :                 /* initial state, i.e. everything can run */
     549    13955953 :                 flow->status[n].flow = flow;
     550    13955953 :                 flow->status[n].pc = pc;
     551    13955953 :                 flow->status[n].state = DFLOWpending;
     552    13955953 :                 flow->status[n].cost = -1;
     553    13955953 :                 ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
     554             : 
     555             :                 /* administer flow dependencies */
     556    66084679 :                 for (j = p->retc; j < p->argc; j++) {
     557             :                         /* list of instructions that wake n-th instruction up */
     558    52128726 :                         if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
     559    25907541 :                                 assert(k < pc);      /* only dependencies on earlier instructions */
     560             :                                 /* add edge to the target instruction for wakeup call */
     561    25907541 :                                 k -= flow->start;
     562    25907541 :                                 if (flow->nodes[k]) {
     563             :                                         /* add wakeup to tail of list */
     564   226241462 :                                         for (i = k; flow->edges[i] > 0; i = flow->edges[i])
     565             :                                                 ;
     566    23299908 :                                         flow->nodes[etop] = n;
     567    23299908 :                                         flow->edges[etop] = -1;
     568    23299908 :                                         flow->edges[i] = etop;
     569    23299908 :                                         etop++;
     570    23299908 :                                         (void) size;
     571    23299908 :                                         if (etop == size) {
     572         212 :                                                 int *tmp;
     573             :                                                 /* in case of realloc failure, the original
     574             :                                                  * pointers will be freed by the caller */
     575         212 :                                                 tmp = (int *) GDKrealloc(flow->nodes,
     576             :                                                                                                  sizeof(int) * 2 * size);
     577         212 :                                                 if (tmp == NULL) {
     578           0 :                                                         GDKfree(assign);
     579           0 :                                                         throw(MAL, "dataflow",
     580             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     581             :                                                 }
     582         212 :                                                 flow->nodes = tmp;
     583         212 :                                                 tmp = (int *) GDKrealloc(flow->edges,
     584             :                                                                                                  sizeof(int) * 2 * size);
     585         212 :                                                 if (tmp == NULL) {
     586           0 :                                                         GDKfree(assign);
     587           0 :                                                         throw(MAL, "dataflow",
     588             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     589             :                                                 }
     590         212 :                                                 flow->edges = tmp;
     591         212 :                                                 size *= 2;
     592             :                                         }
     593             :                                 } else {
     594     2607633 :                                         flow->nodes[k] = n;
     595     2607633 :                                         flow->edges[k] = -1;
     596             :                                 }
     597             : 
     598    25907541 :                                 flow->status[n].blocks++;
     599             :                         }
     600             : 
     601             :                         /* list of instructions to be woken up explicitly */
     602    52128726 :                         if (!isVarConstant(mb, getArg(p, j))) {
     603             :                                 /* be careful, watch out for garbage collection interference */
     604             :                                 /* the instruction that ends the variable's scope should be scheduled after all its other uses */
     605    27704131 :                                 l = getEndScope(mb, getArg(p, j));
     606    27704131 :                                 if (l != pc && l < flow->stop && l > flow->start) {
     607             :                                         /* add edge to the target instruction for wakeup call */
     608    16111980 :                                         assert(pc < l);      /* the end-of-scope instruction must come after this one */
     609    16111980 :                                         l -= flow->start;
     610    16111980 :                                         if (flow->nodes[n]) {
     611             :                                                 /* add wakeup to tail of list */
     612    78223822 :                                                 for (i = n; flow->edges[i] > 0; i = flow->edges[i])
     613             :                                                         ;
     614     7908514 :                                                 flow->nodes[etop] = l;
     615     7908514 :                                                 flow->edges[etop] = -1;
     616     7908514 :                                                 flow->edges[i] = etop;
     617     7908514 :                                                 etop++;
     618     7908514 :                                                 if (etop == size) {
     619         209 :                                                         int *tmp;
     620             :                                                         /* in case of realloc failure, the original
     621             :                                                          * pointers will be freed by the caller */
     622         209 :                                                         tmp = (int *) GDKrealloc(flow->nodes,
     623             :                                                                                                          sizeof(int) * 2 * size);
     624         209 :                                                         if (tmp == NULL) {
     625           0 :                                                                 GDKfree(assign);
     626           0 :                                                                 throw(MAL, "dataflow",
     627             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     628             :                                                         }
     629         209 :                                                         flow->nodes = tmp;
     630         209 :                                                         tmp = (int *) GDKrealloc(flow->edges,
     631             :                                                                                                          sizeof(int) * 2 * size);
     632         209 :                                                         if (tmp == NULL) {
     633           0 :                                                                 GDKfree(assign);
     634           0 :                                                                 throw(MAL, "dataflow",
     635             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     636             :                                                         }
     637         209 :                                                         flow->edges = tmp;
     638         209 :                                                         size *= 2;
     639             :                                                 }
     640             :                                         } else {
     641     8203466 :                                                 flow->nodes[n] = l;
     642     8203466 :                                                 flow->edges[n] = -1;
     643             :                                         }
     644    16111980 :                                         flow->status[l].blocks++;
     645             :                                 }
     646             :                         }
     647             :                 }
     648             : 
     649    29072295 :                 for (j = 0; j < p->retc; j++)
     650    15116342 :                         assign[getArg(p, j)] = pc;      /* remember the instruction that last assigned this variable; a zero entry is read above as "not assigned inside this block" (pc is presumably never 0 here) */
     651             :         }
     652      157134 :         GDKfree(assign);
     653             : 
     654      157134 :         return MAL_SUCCEED;
     655             : }
     656             : 
     657             : /*
     658             :  * Parallel processing is mostly driven by dataflow, but within this context
     659             :  * there may be different schemes to take instructions into execution.
     660             :  * The admission scheme (and wrapup) are the necessary scheduler hooks.
     661             :  * A scheduler registers the functions needed and should release them at the end of the parallel block.
     662             :  * They take effect after we have ensured that the basic properties for execution hold.
     663             :  *
     664             :  * DFLOWscheduler: put every instruction of flow whose blocks counter is 0 on the todo queue and raise the semaphore of worker w; then take finished instructions from flow->done and release their dependents until all instructions of the block have been counted (or the exiting flag is set).  At the end w->cntxt is cleared.  Returns the error taken from flow->error, or MAL_SUCCEED.
     665             :  */
     666             : static str
     667      157134 : DFLOWscheduler(DataFlow flow, struct worker *w)
     668             : {
     669      157134 :         int last;
     670      157134 :         int i;
     671      157134 :         int j;
     672      157134 :         InstrPtr p;
     673      157134 :         int tasks = 0, actions = 0;
     674      157134 :         str ret = MAL_SUCCEED;
     675      157134 :         FlowEvent fe, f = 0;
     676             : 
     677      157134 :         if (flow == NULL)
     678           0 :                 throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
     679      157134 :         actions = flow->stop - flow->start;
     680      157134 :         if (actions == 0)
     681           0 :                 throw(MAL, "dataflow", "Empty dataflow block");
     682             :         /* initialize the eligible statements */
     683      157134 :         fe = flow->status;
     684             : 
     685      157134 :         ATOMIC_DEC(&flow->cntxt->workers);
     686      157134 :         MT_lock_set(&flow->flowlock);
     687    14270715 :         for (i = 0; i < actions; i++)
     688    13956447 :                 if (fe[i].blocks == 0) {
     689     1002457 :                         p = getInstrPtr(flow->mb, fe[i].pc);
     690     1002457 :                         if (p == NULL) {
     691           0 :                                 MT_lock_unset(&flow->flowlock);
     692           0 :                                 ATOMIC_INC(&flow->cntxt->workers);
     693           0 :                                 throw(MAL, "dataflow",
     694             :                                           "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
     695             :                         }
     696     1002457 :                         fe[i].argclaim = 0;
     697     6026158 :                         for (j = p->retc; j < p->argc; j++)
     698     5023701 :                                 fe[i].argclaim += getMemoryClaim(fe[0].flow->mb,
     699     5023701 :                                                                                                  fe[0].flow->stk, p, j, FALSE);
     700     1002457 :                         flow->status[i].state = DFLOWrunning;
     701     1002457 :                         q_enqueue(todo, flow->status + i);
     702             :                 }
     703      157134 :         MT_lock_unset(&flow->flowlock);
     704      157134 :         MT_sema_up(&w->s);
     705             : 
     706      157134 :         while (actions != tasks) {
     707    13955345 :                 f = q_dequeue(flow->done, NULL);
     708    13955575 :                 if (ATOMIC_GET(&exiting))
     709             :                         break;
     710    13955575 :                 if (f == NULL) {
     711           0 :                         ATOMIC_INC(&flow->cntxt->workers);
     712           0 :                         throw(MAL, "dataflow",
     713             :                                   "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
     714             :                 }
     715             : 
     716             :                 /*
     717             :                  * When an instruction is finished we have to reduce the blocked
     718             :                  * counter for all dependent instructions.  For those where it
     719             :                  * drops to zero we can schedule the instruction, which we do right here.
     720             :                  */
     721             : 
     722    13955575 :                 MT_lock_set(&flow->flowlock);
     723    13955217 :                 tasks++;
     724    13955217 :                 for (last = f->pc - flow->start;
     725    55965713 :                          last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
     726    42010810 :                         if (flow->status[i].state == DFLOWpending) {
     727    36181609 :                                 flow->status[i].argclaim += f->hotclaim;
     728    36181609 :                                 if (flow->status[i].blocks == 1) {
     729     7118139 :                                         flow->status[i].blocks--;
     730     7118139 :                                         flow->status[i].state = DFLOWrunning;
     731     7118139 :                                         q_enqueue(todo, flow->status + i);
     732             :                                 } else {
     733    29063470 :                                         flow->status[i].blocks--;
     734             :                                 }
     735             :                         }
     736    14112034 :                 MT_lock_unset(&flow->flowlock);
     737             :         }
     738             :         /* release the worker from its specific task (turn it into a
     739             :          * generic worker) */
     740      157133 :         ATOMIC_PTR_SET(&w->cntxt, NULL);
     741      157133 :         ATOMIC_INC(&flow->cntxt->workers);
     742             :         /* wrap up errors */
     743      157133 :         assert(flow->done->last == 0);
     744      157133 :         if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
     745         476 :                 TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
     746             :         }
     747             :         return ret;
     748             : }
     749             : 
     750             : /* Called with dataflowLock held and returns with it held; the lock is released
     751             :  * while we join the thread associated with the worker and destroy the structure. */
     752             : static inline void
     753        5009 : finish_worker(struct worker *t)
     754             : {
     755        5009 :         t->flag = FINISHING;
     756        5009 :         MT_lock_unset(&dataflowLock);
     757        5009 :         MT_join_thread(t->id);
     758        5009 :         MT_sema_destroy(&t->s);
     759        5009 :         ATOMIC_PTR_DESTROY(&t->cntxt);
     760        5009 :         GDKfree(t);
     761        5009 :         MT_lock_set(&dataflowLock);
     762        5009 : }
     763             : 
     764             : /* We create a pool of GDKnr_threads-1 generic workers, that is,
     765             :  * workers that will take on jobs from any client.  In addition, we
     766             :  * create a single specific worker per client (i.e. each time we enter
     767             :  * here).  This specific worker will only do work for the client for
     768             :  * which it was started.  In this way we can guarantee that there will
     769             :  * always be progress for the client, even if all other workers are
     770             :  * doing something big.
     771             :  *
     772             :  * When all jobs for a client have been done (there are no more
     773             :  * entries for the client in the queue), the specific worker turns
     774             :  * itself into a generic worker.  At the same time, we signal that one
     775             :  * generic worker should exit and this function returns.  In this way
     776             :  * we make sure that there are once again GDKnr_threads-1 generic
     777             :  * workers.  The block that is run consists of the instructions strictly between startpc and stoppc of mb (the brackets themselves are excluded); the function returns MAL_SUCCEED or an error message. */
     778             : str
     779      157130 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc,
     780             :                            MalStkPtr stk)
     781             : {
     782      157130 :         DataFlow flow = NULL;
     783      157130 :         str msg = MAL_SUCCEED;
     784      157130 :         int size;
     785      157130 :         bit *ret;
     786      157130 :         struct worker *t;
     787             : 
     788      157130 :         if (stk == NULL)
     789           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
     790      157130 :         ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
     791      157130 :         *ret = FALSE;
     792             :         /* *ret stays FALSE when the block is handled here; the fallback paths below set it to TRUE to request serial execution */
     793      157130 :         assert(stoppc > startpc);
     794             : 
     795             :         /* check existence of workers */
     796      157130 :         if (todo == NULL) {
     797             :                 /* create thread pool */
     798         351 :                 if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
     799             :                         /* no threads created, run serially */
     800           0 :                         *ret = TRUE;
     801           0 :                         return MAL_SUCCEED;
     802             :                 }
     803             :         }
     804      157130 :         assert(todo);
     805             :         /* in addition, create one more worker that will only execute
     806             :          * tasks for the current client to compensate for our waiting
     807             :          * until all work is done */
     808      157130 :         MT_lock_set(&dataflowLock);
     809             :         /* join with already exited threads */
     810      159075 :         while (exited_workers != NULL) {
     811        1941 :                 assert(exited_workers->flag == EXITED);
     812        1941 :                 struct worker *t = exited_workers;
     813        1941 :                 exited_workers = exited_workers->next;
     814        1941 :                 finish_worker(t);
     815             :         }
     816      157134 :         assert(cntxt != NULL);
     817      157134 :         if (free_workers != NULL) {
     818      154568 :                 t = free_workers;
     819      154568 :                 assert(t->flag == FREE);
     820      154568 :                 assert(free_count > 0);
     821      154568 :                 free_count--;
     822      154568 :                 free_workers = t->next;
     823      154568 :                 t->next = workers;
     824      154568 :                 workers = t;
     825      154568 :                 t->flag = WAITING;
     826      154568 :                 ATOMIC_PTR_SET(&t->cntxt, cntxt);
     827      154568 :                 MT_sema_up(&t->s);
     828             :         } else {
     829        2566 :                 t = GDKmalloc(sizeof(*t));
     830        2566 :                 if (t != NULL) {
     831        2566 :                         *t = (struct worker) {
     832             :                                 .flag = WAITING,
     833             :                                 .cntxt = ATOMIC_PTR_VAR_INIT(cntxt),
     834             :                         };
     835        2566 :                         MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     836        2566 :                         if (MT_create_thread(&t->id, DFLOWworker, t,
     837             :                                                                  MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     838           0 :                                 ATOMIC_PTR_DESTROY(&t->cntxt);
     839           0 :                                 MT_sema_destroy(&t->s);
     840           0 :                                 GDKfree(t);
     841           0 :                                 t = NULL;
     842             :                         } else {
     843        2566 :                                 t->next = workers;
     844        2566 :                                 workers = t;
     845             :                         }
     846             :                 }
     847        2566 :                 if (t == NULL) {
     848             :                         /* cannot start new thread, run serially */
     849           0 :                         *ret = TRUE;
     850           0 :                         MT_lock_unset(&dataflowLock);
     851           0 :                         return MAL_SUCCEED;
     852             :                 }
     853             :         }
     854      157134 :         MT_lock_unset(&dataflowLock);
     855             :         /* NOTE(review): the error returns from here up to the DFLOWinitBlk() call leave without the exitcount/MT_sema_up signalling done at the end of this function, although a worker has already been set up for this client above -- confirm that this is intended */
     856      157134 :         flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
     857      157134 :         if (flow == NULL)
     858           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     859             :         /* size = entries for nodes/edges: the DFLOWgraphSize() result (presumably an estimate of the number of edges) plus room for the per-instruction slots at the start of both arrays */
     860      157134 :         size = DFLOWgraphSize(mb, startpc, stoppc);
     861      157134 :         size += stoppc - startpc;
     862             : 
     863      314268 :         *flow = (DataFlowRec) {
     864             :                 .cntxt = cntxt,
     865             :                 .mb = mb,
     866             :                 .stk = stk,
     867      157134 :                 .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
     868             :                 /* keep real block count, exclude brackets */
     869      157134 :                 .start = startpc + 1,
     870             :                 .stop = stoppc,
     871      157134 :                 .done = q_create("flow->done"),
     872      157134 :                 .status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
     873             :                                                                                 sizeof(FlowEventRec)),
     874             :                 .error = ATOMIC_PTR_VAR_INIT(NULL),
     875      157134 :                 .nodes = (int *) GDKzalloc(sizeof(int) * size),
     876      157134 :                 .edges = (int *) GDKzalloc(sizeof(int) * size),
     877             :         };
     878             : 
     879      157134 :         if (flow->done == NULL) {
     880           0 :                 GDKfree(flow->status);
     881           0 :                 GDKfree(flow->nodes);
     882           0 :                 GDKfree(flow->edges);
     883           0 :                 GDKfree(flow);
     884           0 :                 throw(MAL, "dataflow",
     885             :                           "runMALdataflow(): Failed to create flow->done queue");
     886             :         }
     887             : 
     888      157134 :         if (flow->status == NULL || flow->nodes == NULL || flow->edges == NULL) {
     889           0 :                 q_destroy(flow->done);
     890           0 :                 GDKfree(flow->status);
     891           0 :                 GDKfree(flow->nodes);
     892           0 :                 GDKfree(flow->edges);
     893           0 :                 GDKfree(flow);
     894           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     895             :         }
     896             : 
     897      157134 :         MT_lock_init(&flow->flowlock, "flow->flowlock");
     898      157134 :         msg = DFLOWinitBlk(flow, mb, size);
     899             : 
     900      157134 :         if (msg == MAL_SUCCEED)
     901      157134 :                 msg = DFLOWscheduler(flow, t);
     902             : 
     903      157134 :         GDKfree(flow->status);
     904      157134 :         GDKfree(flow->edges);
     905      157134 :         GDKfree(flow->nodes);
     906      157134 :         q_destroy(flow->done);
     907      157132 :         MT_lock_destroy(&flow->flowlock);
     908      157133 :         ATOMIC_PTR_DESTROY(&flow->error);
     909      157133 :         GDKfree(flow);
     910             : 
     911             :         /* we created one worker, now tell one worker to exit again */
     912      157134 :         MT_lock_set(&todo->l);
     913      157134 :         todo->exitcount++;
     914      157134 :         MT_lock_unset(&todo->l);
     915      157134 :         MT_sema_up(&todo->s);
     916             : 
     917      157134 :         return msg;
     918             : }
     919             : /* deblockdataflow: copy the int value of argument 1 into result argument 0; cntxt and mb are unused.  Always returns MAL_SUCCEED. */
     920             : str
     921           0 : deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     922             : {
     923           0 :         int *ret = getArgReference_int(stk, pci, 0);
     924           0 :         int *val = getArgReference_int(stk, pci, 1);
     925           0 :         (void) cntxt;
     926           0 :         (void) mb;
     927           0 :         *ret = *val;
     928           0 :         return MAL_SUCCEED;
     929             : }
     930             : /* stopMALdataflow: set the exiting flag and, when the todo queue exists, raise todo->s once per known worker and then join and destroy every worker on the free, active and exited lists. */
     931             : static void
     932         350 : stopMALdataflow(void)
     933             : {
     934         350 :         ATOMIC_SET(&exiting, 1);
     935         350 :         if (todo) {
     936         350 :                 MT_lock_set(&dataflowLock);
     937             :                 /* first wake up all running threads */
     938         350 :                 int n = 0;
     939         974 :                 for (struct worker *t = free_workers; t; t = t->next)
     940         624 :                         n++;
     941        2794 :                 for (struct worker *t = workers; t; t = t->next)
     942        2444 :                         n++;
     943        3418 :                 while (n-- > 0) {
     944             :                         /* one UP for each thread we know about */
     945        3418 :                         MT_sema_up(&todo->s);
     946             :                 }
     947         974 :                 while (free_workers) {
     948         624 :                         struct worker *t = free_workers;
     949         624 :                         assert(free_count > 0);
     950         624 :                         free_count--;
     951         624 :                         free_workers = free_workers->next;
     952         624 :                         MT_sema_up(&t->s);
     953         624 :                         finish_worker(t);
     954             :                 }
     955         388 :                 while (workers) {
     956          38 :                         struct worker *t = workers;
     957          38 :                         workers = workers->next;
     958          38 :                         finish_worker(t);
     959             :                 }
     960        2756 :                 while (exited_workers) {
     961        2406 :                         struct worker *t = exited_workers;
     962        2406 :                         exited_workers = exited_workers->next;
     963        2406 :                         finish_worker(t);
     964             :                 }
     965         350 :                 MT_lock_unset(&dataflowLock);
     966             :         }
     967         350 : }
     968             : /* mal_dataflow_reset: stop all dataflow workers, destroy and free the todo queue, and clear the exiting flag (presumably so that the pool can be created again later). */
     969             : void
     970         350 : mal_dataflow_reset(void)
     971             : {
     972         350 :         stopMALdataflow();
     973         350 :         workers = exited_workers = NULL;
     974         350 :         if (todo) {
     975         350 :                 MT_lock_destroy(&todo->l);
     976         350 :                 MT_sema_destroy(&todo->s);
     977         350 :                 GDKfree(todo);
     978             :         }
     979         350 :         todo = 0;                                       /* pending instructions */
     980         350 :         ATOMIC_SET(&exiting, 0);
     981         350 : }

Generated by: LCOV version 1.14