LCOV - code coverage report
Current view: top level - monetdb5/mal - mal_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 449 519 86.5 %
Date: 2024-10-04 20:04:04 Functions: 13 14 92.9 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (author) M Kersten, S Mullender
      15             :  * Dataflow processing only works on a code
      16             :  * sequence that does not include additional (implicit) flow of control
      17             :  * statements and, ideally, consist of expensive BAT operations.
      18             :  * The dataflow portion is identified as a guarded block,
      19             :  * whose entry is controlled by the function language.dataflow();
      20             :  *
      21             :  * The dataflow worker tries to follow the sequence of actions
      22             :  * as laid out in the plan, but abandon this track when it hits
      23             :  * a blocking operator, or an instruction for which not all arguments
      24             :  * are available or resources become scarce.
      25             :  *
      26             :  * The flow graphs is organized such that parallel threads can
      27             :  * access it mostly without expensive locking and dependent
      28             :  * variables are easy to find..
      29             :  */
      30             : #include "monetdb_config.h"
      31             : #include "mal_dataflow.h"
      32             : #include "mal_exception.h"
      33             : #include "mal_private.h"
      34             : #include "mal_internal.h"
      35             : #include "mal_runtime.h"
      36             : #include "mal_resource.h"
      37             : #include "mal_function.h"
      38             : 
      39             : #define DFLOWpending 0                  /* runnable */
      40             : #define DFLOWrunning 1                  /* currently in progress */
      41             : #define DFLOWwrapup  2                  /* done! */
      42             : #define DFLOWretry   3                  /* reschedule */
      43             : #define DFLOWskipped 4                  /* due to errors */
      44             : 
      45             : /* The per instruction status of execution */
      46             : typedef struct FLOWEVENT {
      47             :         struct DATAFLOW *flow;          /* execution context */
      48             :         int pc;                                         /* pc in underlying malblock */
      49             :         int blocks;                                     /* awaiting for variables */
      50             :         sht state;                                      /* of execution */
      51             :         lng clk;
      52             :         sht cost;
      53             :         lng hotclaim;                           /* memory foot print of result variables */
      54             :         lng argclaim;                           /* memory foot print of arguments */
      55             :         lng maxclaim;                           /* memory foot print of largest argument, could be used to indicate result size */
      56             :         struct FLOWEVENT *next;         /* linked list for queues */
      57             : } *FlowEvent, FlowEventRec;
      58             : 
      59             : typedef struct queue {
      60             :         int exitcount;                          /* how many threads should exit */
      61             :         FlowEvent first, last;          /* first and last element of the queue */
      62             :         MT_Lock l;                                      /* it's a shared resource, ie we need locks */
      63             :         MT_Sema s;                                      /* threads wait on empty queues */
      64             : } Queue;
      65             : 
      66             : /*
      67             :  * The dataflow dependency is administered in a graph list structure.
      68             :  * For each instruction we keep the list of instructions that
      69             :  * should be checked for eligibility once we are finished with it.
      70             :  */
      71             : typedef struct DATAFLOW {
      72             :         Client cntxt;                           /* for debugging and client resolution */
      73             :         MalBlkPtr mb;                           /* carry the context */
      74             :         MalStkPtr stk;
      75             :         int start, stop;                        /* guarded block under consideration */
      76             :         FlowEvent status;                       /* status of each instruction */
      77             :         ATOMIC_PTR_TYPE error;          /* error encountered */
      78             :         int *nodes;                                     /* dependency graph nodes */
      79             :         int *edges;                                     /* dependency graph */
      80             :         MT_Lock flowlock;                       /* lock to protect the above */
      81             :         Queue *done;                            /* instructions handled */
      82             :         bool set_qry_ctx;
      83             : } *DataFlow, DataFlowRec;
      84             : 
      85             : struct worker {
      86             :         MT_Id id;
      87             :         enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
      88             :         ATOMIC_PTR_TYPE cntxt;          /* client we do work for (NULL -> any) */
      89             :         MT_Sema s;
      90             :         struct worker *next;
      91             :         char errbuf[GDKMAXERRLEN];      /* GDKerrbuf so that we can allocate before fork */
      92             : };
      93             : /* heads of three mutually exclusive linked lists, all using the .next
      94             :  * field in the worker struct */
      95             : static struct worker *workers;            /* "working" workers */
      96             : static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
      97             : static struct worker *free_workers;     /* free workers (.flag==FREE) */
      98             : static int free_count = 0;              /* number of free threads */
      99             : static int free_max = 0;                /* max number of spare free threads */
     100             : 
     101             : static Queue *todo = 0;                 /* pending instructions */
     102             : 
     103             : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
     104             : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
     105             : 
     106             : /*
     107             :  * Calculate the size of the dataflow dependency graph.
     108             :  */
     109             : static int
     110             : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
     111             : {
     112             :         int cnt = 0;
     113             : 
     114     8482807 :         for (int i = start; i < stop; i++)
     115     8327064 :                 cnt += getInstrPtr(mb, i)->argc;
     116      155743 :         return cnt;
     117             : }
     118             : 
     119             : /*
     120             :  * The dataflow execution is confined to a barrier block.
     121             :  * Within the block there are multiple flows, which, in principle,
     122             :  * can be executed in parallel.
     123             :  */
     124             : 
     125             : static Queue *
     126      156072 : q_create(const char *name)
     127             : {
     128      156072 :         Queue *q = GDKzalloc(sizeof(Queue));
     129             : 
     130      156072 :         if (q == NULL)
     131             :                 return NULL;
     132      156072 :         MT_lock_init(&q->l, name);
     133      156071 :         MT_sema_init(&q->s, 0, name);
     134      156072 :         return q;
     135             : }
     136             : 
     137             : static void
     138      155743 : q_destroy(Queue *q)
     139             : {
     140      155743 :         assert(q);
     141      155743 :         MT_lock_destroy(&q->l);
     142      155743 :         MT_sema_destroy(&q->s);
     143      155743 :         GDKfree(q);
     144      155741 : }
     145             : 
     146             : /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue is possible */
     147             : /* we might actually sort it for better scheduling behavior */
     148             : static void
     149    13166241 : q_enqueue(Queue *q, FlowEvent d)
     150             : {
     151    13166241 :         assert(q);
     152    13166241 :         assert(d);
     153    13166241 :         MT_lock_set(&q->l);
     154    13163703 :         if (q->first == NULL) {
     155     4501365 :                 assert(q->last == NULL);
     156     4501365 :                 q->first = q->last = d;
     157             :         } else {
     158     8662338 :                 assert(q->last != NULL);
     159     8662338 :                 q->last->next = d;
     160     8662338 :                 q->last = d;
     161             :         }
     162    13163703 :         d->next = NULL;
     163    13163703 :         MT_lock_unset(&q->l);
     164    13165335 :         MT_sema_up(&q->s);
     165    13163134 : }
     166             : 
     167             : /*
     168             :  * A priority queue over the hot claims of memory may
     169             :  * be more effective. It prioritizes those instructions
     170             :  * that want to use a big recent result
     171             :  */
     172             : 
     173             : static void
     174      125503 : q_requeue(Queue *q, FlowEvent d)
     175             : {
     176      125503 :         assert(q);
     177      125503 :         assert(d);
     178      125503 :         MT_lock_set(&q->l);
     179      125503 :         if (q->first == NULL) {
     180           0 :                 assert(q->last == NULL);
     181           0 :                 q->first = q->last = d;
     182           0 :                 d->next = NULL;
     183             :         } else {
     184      125503 :                 assert(q->last != NULL);
     185      125503 :                 d->next = q->first;
     186      125503 :                 q->first = d;
     187             :         }
     188      125503 :         MT_lock_unset(&q->l);
     189      125503 :         MT_sema_up(&q->s);
     190      125503 : }
     191             : 
     192             : static FlowEvent
     193    13677222 : q_dequeue(Queue *q, Client cntxt)
     194             : {
     195    13677222 :         assert(q);
     196    13677231 :         MT_sema_down(&q->s);
     197    13671893 :         if (ATOMIC_GET(&exiting))
     198             :                 return NULL;
     199    13670649 :         MT_lock_set(&q->l);
     200    13649306 :         if (cntxt == NULL && q->exitcount > 0) {
     201      155743 :                 q->exitcount--;
     202      155743 :                 MT_lock_unset(&q->l);
     203      155743 :                 return NULL;
     204             :         }
     205             : 
     206    13493563 :         FlowEvent *dp = &q->first;
     207    13493563 :         FlowEvent pd = NULL;
     208             :         /* if cntxt == NULL, return the first event, if cntxt != NULL, find
     209             :          * the first event in the queue with matching cntxt value and return
     210             :          * that */
     211    13493563 :         if (cntxt != NULL) {
     212    10370945 :                 while (*dp && (*dp)->flow->cntxt != cntxt) {
     213     8430321 :                         pd = *dp;
     214     8430321 :                         dp = &pd->next;
     215             :                 }
     216             :         }
     217    13493563 :         FlowEvent d = *dp;
     218    13493563 :         if (d) {
     219    13254944 :                 *dp = d->next;
     220    13254944 :                 d->next = NULL;
     221    13254944 :                 if (*dp == NULL)
     222     4518925 :                         q->last = pd;
     223             :         }
     224    13493563 :         MT_lock_unset(&q->l);
     225    13488262 :         return d;
     226             : }
     227             : 
     228             : /*
     229             :  * We simply move an instruction into the front of the queue.
     230             :  * Beware, we assume that variables are assigned a value once, otherwise
     231             :  * the order may really create errors.
     232             :  * The order of the instructions should be retained as long as possible.
     233             :  * Delay processing when we run out of memory.  Push the instruction back
     234             :  * on the end of queue, waiting for another attempt. Problem might become
     235             :  * that all threads but one are cycling through the queue, each time
     236             :  * finding an eligible instruction, but without enough space.
     237             :  * Therefore, we wait for a few milliseconds as an initial punishment.
     238             :  *
     239             :  * The process could be refined by checking for cheap operations,
     240             :  * i.e. those that would require no memory at all (aggr.count)
     241             :  * This, however, would lead to a dependency to the upper layers,
     242             :  * because in the kernel we don't know what routines are available
     243             :  * with this property. Nor do we maintain such properties.
     244             :  */
     245             : 
     246             : static void
     247        7722 : DFLOWworker(void *T)
     248             : {
     249        7722 :         struct worker *t = (struct worker *) T;
     250        7722 :         bool locked = false;
     251             : #ifdef _MSC_VER
     252             :         srand((unsigned int) GDKusec());
     253             : #endif
     254        7722 :         GDKsetbuf(t->errbuf);                /* where to leave errors */
     255        7722 :         snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
     256             : 
     257      156728 :         for (;;) {
     258      156728 :                 DataFlow flow;
     259      156728 :                 FlowEvent fe = 0, fnxt = 0;
     260      156728 :                 str error = 0;
     261      156728 :                 int i;
     262      156728 :                 lng claim;
     263      156728 :                 Client cntxt;
     264      156728 :                 InstrPtr p;
     265             : 
     266      156728 :                 GDKclrerr();
     267             : 
     268      156729 :                 if (t->flag == WAITING) {
     269             :                         /* wait until we are allowed to start working */
     270      155743 :                         MT_sema_down(&t->s);
     271      155741 :                         t->flag = RUNNING;
     272      155741 :                         if (ATOMIC_GET(&exiting)) {
     273             :                                 break;
     274             :                         }
     275             :                 }
     276      156727 :                 assert(t->flag == RUNNING);
     277      156727 :                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     278     8688491 :                 while (1) {
     279     8688491 :                         MT_thread_set_qry_ctx(NULL);
     280     8682668 :                         if (fnxt == 0) {
     281     5514132 :                                 MT_thread_setworking("waiting for work");
     282     5514820 :                                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     283     5514820 :                                 fe = q_dequeue(todo, cntxt);
     284     5516791 :                                 if (fe == NULL) {
     285      395605 :                                         if (cntxt) {
     286             :                                                 /* we're not done yet with work for the current
     287             :                                                  * client (as far as we know), so give up the CPU
     288             :                                                  * and let the scheduler enter some more work, but
     289             :                                                  * first compensate for the down we did in
     290             :                                                  * dequeue */
     291      238880 :                                                 MT_sema_up(&todo->s);
     292      238881 :                                                 MT_sleep_ms(1);
     293     8927368 :                                                 continue;
     294             :                                         }
     295             :                                         /* no more work to be done: exit */
     296      156725 :                                         break;
     297             :                                 }
     298     5121186 :                                 if (fe->flow->cntxt && fe->flow->cntxt->mythread)
     299     5120332 :                                         MT_thread_setworking(fe->flow->cntxt->mythread);
     300             :                         } else
     301             :                                 fe = fnxt;
     302     8289304 :                         if (ATOMIC_GET(&exiting)) {
     303             :                                 break;
     304             :                         }
     305     8289304 :                         fnxt = 0;
     306     8289304 :                         assert(fe);
     307     8289304 :                         flow = fe->flow;
     308     8289304 :                         assert(flow);
     309     8289304 :                         MT_thread_set_qry_ctx(flow->set_qry_ctx ? &flow->cntxt->
     310             :                                                                   qryctx : NULL);
     311             : 
     312             :                         /* whenever we have a (concurrent) error, skip it */
     313     8292290 :                         if (ATOMIC_PTR_GET(&flow->error)) {
     314        4095 :                                 q_enqueue(flow->done, fe);
     315        4099 :                                 continue;
     316             :                         }
     317             : 
     318     8288195 :                         p = getInstrPtr(flow->mb, fe->pc);
     319     8288195 :                         claim = fe->argclaim;
     320    16580830 :                         if (p->fcn != (MALfcn) deblockdataflow &&    /* never block on deblockdataflow() */
     321     8288195 :                                 !MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
     322      125503 :                                 fe->hotclaim = 0;    /* don't assume priority anymore */
     323      125503 :                                 fe->maxclaim = 0;
     324      125503 :                                 MT_lock_set(&todo->l);
     325      125503 :                                 FlowEvent last = todo->last;
     326      125503 :                                 MT_lock_unset(&todo->l);
     327      125503 :                                 if (last == NULL)
     328           0 :                                         MT_sleep_ms(DELAYUNIT);
     329      125503 :                                 q_requeue(todo, fe);
     330      125503 :                                 continue;
     331             :                         }
     332     8167132 :                         ATOMIC_BASE_TYPE wrks = ATOMIC_INC(&flow->cntxt->workers);
     333     8167132 :                         ATOMIC_BASE_TYPE mwrks = ATOMIC_GET(&flow->mb->workers);
     334     8167144 :                         while (wrks > mwrks) {
     335       15340 :                                 if (ATOMIC_CAS(&flow->mb->workers, &mwrks, wrks))
     336             :                                         break;
     337             :                         }
     338     8167139 :                         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
     339             :                                                                    flow->stk, 0, 0);
     340     8164361 :                         ATOMIC_DEC(&flow->cntxt->workers);
     341             :                         /* release the memory claim */
     342     8164361 :                         MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
     343             : 
     344     8165022 :                         MT_lock_set(&flow->flowlock);
     345     8166747 :                         fe->state = DFLOWwrapup;
     346     8166747 :                         MT_lock_unset(&flow->flowlock);
     347     8166306 :                         if (error) {
     348         502 :                                 void *null = NULL;
     349             :                                 /* only collect one error (from one thread, needed for stable testing) */
     350         502 :                                 if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
     351          26 :                                         freeException(error);
     352             :                                 /* after an error we skip the rest of the block */
     353         502 :                                 q_enqueue(flow->done, fe);
     354         502 :                                 continue;
     355             :                         }
     356             : 
     357             :                         /* see if you can find an eligible instruction that uses the
     358             :                          * result just produced. Then we can continue with it right away.
     359             :                          * We are just looking forward for the last block, which means we
     360             :                          * are safe from concurrent actions. No other thread can steal it,
     361             :                          * because we hold the logical lock.
     362             :                          * All eligible instructions are queued
     363             :                          */
     364     8165804 :                         p = getInstrPtr(flow->mb, fe->pc);
     365     8165804 :                         assert(p);
     366     8165804 :                         fe->hotclaim = 0;
     367     8165804 :                         fe->maxclaim = 0;
     368             : 
     369    16985477 :                         for (i = 0; i < p->retc; i++) {
     370     8820593 :                                 lng footprint;
     371     8820593 :                                 footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
     372     8819673 :                                 fe->hotclaim += footprint;
     373     8819673 :                                 if (footprint > fe->maxclaim)
     374     3780116 :                                         fe->maxclaim = footprint;
     375             :                         }
     376             : 
     377             : /* Try to get rid of the hot potato or locate an alternative to proceed.
     378             :  */
     379             : #define HOTPOTATO
     380             : #ifdef HOTPOTATO
     381             :                         /* HOT potato choice */
     382     8164884 :                         int last = 0, nxt = -1;
     383     8164884 :                         lng nxtclaim = -1;
     384             : 
     385     8164884 :                         MT_lock_set(&flow->flowlock);
     386     8166305 :                         for (last = fe->pc - flow->start;
     387    30971047 :                                  last >= 0 && (i = flow->nodes[last]) > 0;
     388    22804742 :                                  last = flow->edges[last]) {
     389    22804742 :                                 if (flow->status[i].state == DFLOWpending
     390    22803915 :                                         && flow->status[i].blocks == 1) {
     391             :                                         /* find the one with the largest footprint */
     392     6187278 :                                         if (nxt == -1 || flow->status[i].argclaim > nxtclaim) {
     393     3457217 :                                                 nxt = i;
     394     3457217 :                                                 nxtclaim = flow->status[i].argclaim;
     395             :                                         }
     396             :                                 }
     397             :                         }
     398             :                         /* hot potato can not be removed, use alternative to proceed */
     399     8166305 :                         if (nxt >= 0) {
     400     3175604 :                                 flow->status[nxt].state = DFLOWrunning;
     401     3175604 :                                 flow->status[nxt].blocks = 0;
     402     3175604 :                                 flow->status[nxt].hotclaim = fe->hotclaim;
     403     3175604 :                                 flow->status[nxt].argclaim += fe->hotclaim;
     404     3175604 :                                 if (flow->status[nxt].maxclaim < fe->maxclaim)
     405     1609801 :                                         flow->status[nxt].maxclaim = fe->maxclaim;
     406             :                                 fnxt = flow->status + nxt;
     407             :                         }
     408     8166305 :                         MT_lock_unset(&flow->flowlock);
     409             : #endif
     410             : 
     411     8166128 :                         q_enqueue(flow->done, fe);
     412     8162783 :                         if (fnxt == 0 && profilerStatus) {
     413           0 :                                 profilerHeartbeatEvent("wait");
     414             :                         }
     415             :                 }
     416      156725 :                 MT_lock_set(&dataflowLock);
     417      156725 :                 if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >= free_max) {
     418             :                         locked = true;
     419             :                         break;
     420             :                 }
     421      149606 :                 free_count++;
     422      149606 :                 struct worker **tp = &workers;
     423      575466 :                 while (*tp && *tp != t)
     424      425860 :                         tp = &(*tp)->next;
     425      149606 :                 assert(*tp && *tp == t);
     426      149606 :                 *tp = t->next;
     427      149606 :                 t->flag = FREE;
     428      149606 :                 t->next = free_workers;
     429      149606 :                 free_workers = t;
     430      149606 :                 MT_lock_unset(&dataflowLock);
     431      149606 :                 MT_thread_setworking("idle, waiting for new client");
     432      149606 :                 MT_sema_down(&t->s);
     433      149603 :                 if (GDKexiting() || ATOMIC_GET(&exiting))
     434             :                         break;
     435      149006 :                 assert(t->flag == WAITING);
     436             :         }
     437         599 :         if (!locked)
     438         599 :                 MT_lock_set(&dataflowLock);
     439        7718 :         if (t->flag != FINISHING) {
     440        7100 :                 struct worker **tp = t->flag == FREE ? &free_workers : &workers;
     441       37041 :                 while (*tp && *tp != t)
     442       29941 :                         tp = &(*tp)->next;
     443        7100 :                 assert(*tp && *tp == t);
     444        7100 :                 *tp = t->next;
     445        7100 :                 t->flag = EXITED;
     446        7100 :                 t->next = exited_workers;
     447        7100 :                 exited_workers = t;
     448             :         }
     449        7718 :         MT_lock_unset(&dataflowLock);
     450        7718 :         GDKsetbuf(NULL);
     451        7718 : }
     452             : 
     453             : /*
     454             :  * Create an interpreter pool.
     455             :  * One worker will adaptively be available for each client.
     456             :  * The remainder are taken from the GDKnr_threads argument and
     457             :  * typically is equal to the number of cores
     458             :  * The workers are assembled in a local table to enable debugging.
     459             :  */
     460             : static int
     461         329 : DFLOWinitialize(void)
     462             : {
     463         329 :         int limit;
     464         329 :         int created = 0;
     465             : 
     466         329 :         MT_lock_set(&mal_contextLock);
     467         329 :         MT_lock_set(&dataflowLock);
     468         329 :         if (todo) {
     469             :                 /* somebody else beat us to it */
     470           0 :                 MT_lock_unset(&dataflowLock);
     471           0 :                 MT_lock_unset(&mal_contextLock);
     472           0 :                 return 0;
     473             :         }
     474         329 :         free_max = GDKgetenv_int("dataflow_max_free",
     475             :                                                          GDKnr_threads < 4 ? 4 : GDKnr_threads);
     476         329 :         todo = q_create("todo");
     477         329 :         if (todo == NULL) {
     478           0 :                 MT_lock_unset(&dataflowLock);
     479           0 :                 MT_lock_unset(&mal_contextLock);
     480           0 :                 return -1;
     481             :         }
     482         329 :         limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
     483        1314 :         while (limit > 0) {
     484         985 :                 limit--;
     485         985 :                 struct worker *t = GDKmalloc(sizeof(*t));
     486         985 :                 if (t == NULL) {
     487           0 :                         TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
     488           0 :                         continue;
     489             :                 }
     490         985 :                 *t = (struct worker) {
     491             :                         .flag = RUNNING,
     492             :                         .cntxt = ATOMIC_PTR_VAR_INIT(NULL),
     493             :                 };
     494         985 :                 MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     495         985 :                 if (MT_create_thread(&t->id, DFLOWworker, t,
     496             :                                                          MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     497           0 :                         ATOMIC_PTR_DESTROY(&t->cntxt);
     498           0 :                         MT_sema_destroy(&t->s);
     499           0 :                         GDKfree(t);
     500             :                 } else {
     501         985 :                         t->next = workers;
     502         985 :                         workers = t;
     503         985 :                         created++;
     504             :                 }
     505             :         }
     506         329 :         if (created == 0) {
     507             :                 /* no threads created */
     508           0 :                 q_destroy(todo);
     509           0 :                 todo = NULL;
     510           0 :                 MT_lock_unset(&dataflowLock);
     511           0 :                 MT_lock_unset(&mal_contextLock);
     512           0 :                 return -1;
     513             :         }
     514         329 :         MT_lock_unset(&dataflowLock);
     515         329 :         MT_lock_unset(&mal_contextLock);
     516         329 :         return 0;
     517             : }
     518             : 
     519             : /*
     520             :  * The dataflow administration is based on administration of
     521             :  * how many variables are still missing before it can be executed.
     522             :  * For each instruction we keep a list of instructions whose
     523             :  * blocking counter should be decremented upon finishing it.
     524             :  */
     525             : static str
     526      155743 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
     527             : {
     528      155743 :         int pc, i, j, k, l, n, etop = 0;
     529      155743 :         int *assign;
     530      155743 :         InstrPtr p;
     531             : 
     532      155743 :         if (flow == NULL)
     533           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
     534      155743 :         if (mb == NULL)
     535           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
     536      155743 :         assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
     537      155743 :         if (assign == NULL)
     538           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     539      155743 :         etop = flow->stop - flow->start;
     540     8327079 :         for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
     541     8171336 :                 p = getInstrPtr(mb, pc);
     542     8171336 :                 if (p == NULL) {
     543           0 :                         GDKfree(assign);
     544           0 :                         throw(MAL, "dataflow",
     545             :                                   "DFLOWinitBlk(): getInstrPtr() returned NULL");
     546             :                 }
     547             : 
     548             :                 /* initial state, ie everything can run */
     549     8171336 :                 flow->status[n].flow = flow;
     550     8171336 :                 flow->status[n].pc = pc;
     551     8171336 :                 flow->status[n].state = DFLOWpending;
     552     8171336 :                 flow->status[n].cost = -1;
     553     8171336 :                 ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
     554             : 
     555             :                 /* administer flow dependencies */
     556    38670665 :                 for (j = p->retc; j < p->argc; j++) {
     557             :                         /* list of instructions that wake n-th instruction up */
     558    30499329 :                         if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
     559    13821443 :                                 assert(k < pc);      /* only dependencies on earlier instructions */
     560             :                                 /* add edge to the target instruction for wakeup call */
     561    13821443 :                                 k -= flow->start;
     562    13821443 :                                 if (flow->nodes[k]) {
     563             :                                         /* add wakeup to tail of list */
     564   101342583 :                                         for (i = k; flow->edges[i] > 0; i = flow->edges[i])
     565             :                                                 ;
     566    12088449 :                                         flow->nodes[etop] = n;
     567    12088449 :                                         flow->edges[etop] = -1;
     568    12088449 :                                         flow->edges[i] = etop;
     569    12088449 :                                         etop++;
     570    12088449 :                                         (void) size;
     571    12088449 :                                         if (etop == size) {
     572         220 :                                                 int *tmp;
     573             :                                                 /* in case of realloc failure, the original
     574             :                                                  * pointers will be freed by the caller */
     575         220 :                                                 tmp = (int *) GDKrealloc(flow->nodes,
     576             :                                                                                                  sizeof(int) * 2 * size);
     577         220 :                                                 if (tmp == NULL) {
     578           0 :                                                         GDKfree(assign);
     579           0 :                                                         throw(MAL, "dataflow",
     580             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     581             :                                                 }
     582         220 :                                                 flow->nodes = tmp;
     583         220 :                                                 tmp = (int *) GDKrealloc(flow->edges,
     584             :                                                                                                  sizeof(int) * 2 * size);
     585         220 :                                                 if (tmp == NULL) {
     586           0 :                                                         GDKfree(assign);
     587           0 :                                                         throw(MAL, "dataflow",
     588             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     589             :                                                 }
     590         220 :                                                 flow->edges = tmp;
     591         220 :                                                 size *= 2;
     592             :                                         }
     593             :                                 } else {
     594     1732994 :                                         flow->nodes[k] = n;
     595     1732994 :                                         flow->edges[k] = -1;
     596             :                                 }
     597             : 
     598    13821443 :                                 flow->status[n].blocks++;
     599             :                         }
     600             : 
     601             :                         /* list of instructions to be woken up explicitly */
     602    30499329 :                         if (!isVarConstant(mb, getArg(p, j))) {
     603             :                                 /* be careful, watch out for garbage collection interference */
     604             :                                 /* those should be scheduled after all its other uses */
     605    15542057 :                                 l = getEndScope(mb, getArg(p, j));
     606    15542057 :                                 if (l != pc && l < flow->stop && l > flow->start) {
     607             :                                         /* add edge to the target instruction for wakeup call */
     608     8994705 :                                         assert(pc < l);      /* only dependencies on earlier instructions */
     609     8994705 :                                         l -= flow->start;
     610     8994705 :                                         if (flow->nodes[n]) {
     611             :                                                 /* add wakeup to tail of list */
     612    39837374 :                                                 for (i = n; flow->edges[i] > 0; i = flow->edges[i])
     613             :                                                         ;
     614     4463863 :                                                 flow->nodes[etop] = l;
     615     4463863 :                                                 flow->edges[etop] = -1;
     616     4463863 :                                                 flow->edges[i] = etop;
     617     4463863 :                                                 etop++;
     618     4463863 :                                                 if (etop == size) {
     619         117 :                                                         int *tmp;
     620             :                                                         /* in case of realloc failure, the original
     621             :                                                          * pointers will be freed by the caller */
     622         117 :                                                         tmp = (int *) GDKrealloc(flow->nodes,
     623             :                                                                                                          sizeof(int) * 2 * size);
     624         117 :                                                         if (tmp == NULL) {
     625           0 :                                                                 GDKfree(assign);
     626           0 :                                                                 throw(MAL, "dataflow",
     627             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     628             :                                                         }
     629         117 :                                                         flow->nodes = tmp;
     630         117 :                                                         tmp = (int *) GDKrealloc(flow->edges,
     631             :                                                                                                          sizeof(int) * 2 * size);
     632         117 :                                                         if (tmp == NULL) {
     633           0 :                                                                 GDKfree(assign);
     634           0 :                                                                 throw(MAL, "dataflow",
     635             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     636             :                                                         }
     637         117 :                                                         flow->edges = tmp;
     638         117 :                                                         size *= 2;
     639             :                                                 }
     640             :                                         } else {
     641     4530842 :                                                 flow->nodes[n] = l;
     642     4530842 :                                                 flow->edges[n] = -1;
     643             :                                         }
     644     8994705 :                                         flow->status[l].blocks++;
     645             :                                 }
     646             :                         }
     647             :                 }
     648             : 
     649    16997836 :                 for (j = 0; j < p->retc; j++)
     650     8826500 :                         assign[getArg(p, j)] = pc;      /* ensure recognition of dependency on first instruction and constant */
     651             :         }
     652      155743 :         GDKfree(assign);
     653             : 
     654      155743 :         return MAL_SUCCEED;
     655             : }
     656             : 
     657             : /*
     658             :  * Parallel processing is mostly driven by dataflow, but within this context
     659             :  * there may be different schemes to take instructions into execution.
     660             :  * The admission scheme (and wrapup) are the necessary scheduler hooks.
     661             :  * A scheduler registers the functions needed and should release them
     662             :  * at the end of the parallel block.
     663             :  * They take effect after we have ensured that the basic properties for
     664             :  * execution hold.
     665             :  */
     666             : static str
     667      155743 : DFLOWscheduler(DataFlow flow, struct worker *w)
     668             : {
     669      155743 :         int last;
     670      155743 :         int i;
     671      155743 :         int j;
     672      155743 :         InstrPtr p;
     673      155743 :         int tasks = 0, actions = 0;
     674      155743 :         str ret = MAL_SUCCEED;
     675      155743 :         FlowEvent fe, f = 0;
     676             : 
     677      155743 :         if (flow == NULL)
     678           0 :                 throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
     679      155743 :         actions = flow->stop - flow->start;
     680      155743 :         if (actions == 0)
     681           0 :                 throw(MAL, "dataflow", "Empty dataflow block");
     682             :         /* initialize the eligible statements */
     683      155743 :         fe = flow->status;
     684             : 
     685      155743 :         ATOMIC_DEC(&flow->cntxt->workers);
     686      155743 :         MT_lock_set(&flow->flowlock);
     687     8482910 :         for (i = 0; i < actions; i++)
     688     8171424 :                 if (fe[i].blocks == 0) {
     689      965176 :                         p = getInstrPtr(flow->mb, fe[i].pc);
     690      965176 :                         if (p == NULL) {
     691           0 :                                 MT_lock_unset(&flow->flowlock);
     692           0 :                                 ATOMIC_INC(&flow->cntxt->workers);
     693           0 :                                 throw(MAL, "dataflow",
     694             :                                           "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
     695             :                         }
     696      965176 :                         fe[i].argclaim = 0;
     697     5785186 :                         for (j = p->retc; j < p->argc; j++)
     698     4820010 :                                 fe[i].argclaim += getMemoryClaim(fe[0].flow->mb,
     699     4820013 :                                                                                                  fe[0].flow->stk, p, j, FALSE);
     700      965173 :                         flow->status[i].state = DFLOWrunning;
     701      965173 :                         q_enqueue(todo, flow->status + i);
     702             :                 }
     703      155743 :         MT_lock_unset(&flow->flowlock);
     704      155743 :         MT_sema_up(&w->s);
     705             : 
     706      155743 :         while (actions != tasks) {
     707     8171098 :                 f = q_dequeue(flow->done, NULL);
     708     8171190 :                 if (ATOMIC_GET(&exiting))
     709             :                         break;
     710     8171190 :                 if (f == NULL) {
     711           0 :                         ATOMIC_INC(&flow->cntxt->workers);
     712           0 :                         throw(MAL, "dataflow",
     713             :                                   "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
     714             :                 }
     715             : 
     716             :                 /*
     717             :                  * When an instruction is finished we have to reduce the blocked
     718             :                  * counter for all dependent instructions.  for those where it
     719             :                  * drops to zero we can scheduler it we do it here instead of the scheduler
     720             :                  */
     721             : 
     722     8171190 :                 MT_lock_set(&flow->flowlock);
     723     8170709 :                 tasks++;
     724     8170709 :                 for (last = f->pc - flow->start;
     725    30985071 :                          last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
     726    22814127 :                         if (flow->status[i].state == DFLOWpending) {
     727    19639456 :                                 flow->status[i].argclaim += f->hotclaim;
     728    19639456 :                                 if (flow->status[i].blocks == 1) {
     729     4030484 :                                         flow->status[i].blocks--;
     730     4030484 :                                         flow->status[i].state = DFLOWrunning;
     731     4030484 :                                         q_enqueue(todo, flow->status + i);
     732             :                                 } else {
     733    15608972 :                                         flow->status[i].blocks--;
     734             :                                 }
     735             :                         }
     736     8326687 :                 MT_lock_unset(&flow->flowlock);
     737             :         }
     738             :         /* release the worker from its specific task (turn it into a
     739             :          * generic worker) */
     740      155740 :         ATOMIC_PTR_SET(&w->cntxt, NULL);
     741      155740 :         ATOMIC_INC(&flow->cntxt->workers);
     742             :         /* wrap up errors */
     743      155740 :         assert(flow->done->last == 0);
     744      155740 :         if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
     745         476 :                 TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
     746             :         }
     747             :         return ret;
     748             : }
     749             : 
     750             : /* called and returns with dataflowLock locked, temporarily unlocks
     751             :  * join the thread associated with the worker and destroy the structure */
     752             : static inline void
     753        7718 : finish_worker(struct worker *t)
     754             : {
     755        7718 :         t->flag = FINISHING;
     756        7718 :         MT_lock_unset(&dataflowLock);
     757        7718 :         MT_join_thread(t->id);
     758        7718 :         MT_sema_destroy(&t->s);
     759        7718 :         ATOMIC_PTR_DESTROY(&t->cntxt);
     760        7718 :         GDKfree(t);
     761        7718 :         MT_lock_set(&dataflowLock);
     762        7718 : }
     763             : 
     764             : /* We create a pool of GDKnr_threads-1 generic workers, that is,
     765             :  * workers that will take on jobs from any clients.  In addition, we
     766             :  * create a single specific worker per client (i.e. each time we enter
     767             :  * here).  This specific worker will only do work for the client for
     768             :  * which it was started.  In this way we can guarantee that there will
     769             :  * always be progress for the client, even if all other workers are
     770             :  * doing something big.
     771             :  *
     772             :  * When all jobs for a client have been done (there are no more
     773             :  * entries for the client in the queue), the specific worker turns
     774             :  * itself into a generic worker.  At the same time, we signal that one
     775             :  * generic worker should exit and this function returns.  In this way
     776             :  * we make sure that there are once again GDKnr_threads-1 generic
     777             :  * workers. */
     778             : str
     779      155743 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc,
     780             :                            MalStkPtr stk)
     781             : {
     782      155743 :         DataFlow flow = NULL;
     783      155743 :         str msg = MAL_SUCCEED;
     784      155743 :         int size;
     785      155743 :         bit *ret;
     786      155743 :         struct worker *t;
     787             : 
     788      155743 :         if (stk == NULL)
     789           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
     790      155743 :         ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
     791      155743 :         *ret = FALSE;
     792             : 
     793      155743 :         assert(stoppc > startpc);
     794             : 
     795             :         /* check existence of workers */
     796      155743 :         if (todo == NULL) {
     797             :                 /* create thread pool */
     798         329 :                 if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
     799             :                         /* no threads created, run serially */
     800           0 :                         *ret = TRUE;
     801           0 :                         return MAL_SUCCEED;
     802             :                 }
     803             :         }
     804      155743 :         assert(todo);
     805             :         /* in addition, create one more worker that will only execute
     806             :          * tasks for the current client to compensate for our waiting
     807             :          * until all work is done */
     808      155743 :         MT_lock_set(&dataflowLock);
     809             :         /* join with already exited threads */
     810      161880 :         while (exited_workers != NULL) {
     811        6137 :                 assert(exited_workers->flag == EXITED);
     812        6137 :                 struct worker *t = exited_workers;
     813        6137 :                 exited_workers = exited_workers->next;
     814        6137 :                 finish_worker(t);
     815             :         }
     816      155743 :         assert(cntxt != NULL);
     817      155743 :         if (free_workers != NULL) {
     818      149006 :                 t = free_workers;
     819      149006 :                 assert(t->flag == FREE);
     820      149006 :                 assert(free_count > 0);
     821      149006 :                 free_count--;
     822      149006 :                 free_workers = t->next;
     823      149006 :                 t->next = workers;
     824      149006 :                 workers = t;
     825      149006 :                 t->flag = WAITING;
     826      149006 :                 ATOMIC_PTR_SET(&t->cntxt, cntxt);
     827      149006 :                 MT_sema_up(&t->s);
     828             :         } else {
     829        6737 :                 t = GDKmalloc(sizeof(*t));
     830        6737 :                 if (t != NULL) {
     831        6737 :                         *t = (struct worker) {
     832             :                                 .flag = WAITING,
     833             :                                 .cntxt = ATOMIC_PTR_VAR_INIT(cntxt),
     834             :                         };
     835        6737 :                         MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
     836        6737 :                         if (MT_create_thread(&t->id, DFLOWworker, t,
     837             :                                                                  MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
     838           0 :                                 ATOMIC_PTR_DESTROY(&t->cntxt);
     839           0 :                                 MT_sema_destroy(&t->s);
     840           0 :                                 GDKfree(t);
     841           0 :                                 t = NULL;
     842             :                         } else {
     843        6737 :                                 t->next = workers;
     844        6737 :                                 workers = t;
     845             :                         }
     846             :                 }
     847        6737 :                 if (t == NULL) {
     848             :                         /* cannot start new thread, run serially */
     849           0 :                         *ret = TRUE;
     850           0 :                         MT_lock_unset(&dataflowLock);
     851           0 :                         return MAL_SUCCEED;
     852             :                 }
     853             :         }
     854      155743 :         MT_lock_unset(&dataflowLock);
     855             : 
     856      155743 :         flow = (DataFlow) GDKzalloc(sizeof(DataFlowRec));
     857      155743 :         if (flow == NULL)
     858           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     859             : 
     860      155743 :         size = DFLOWgraphSize(mb, startpc, stoppc);
     861      155743 :         size += stoppc - startpc;
     862             : 
     863      311486 :         *flow = (DataFlowRec) {
     864             :                 .cntxt = cntxt,
     865             :                 .mb = mb,
     866             :                 .stk = stk,
     867      155743 :                 .set_qry_ctx = MT_thread_get_qry_ctx() != NULL,
     868             :                 /* keep real block count, exclude brackets */
     869      155743 :                 .start = startpc + 1,
     870             :                 .stop = stoppc,
     871      155743 :                 .done = q_create("flow->done"),
     872      155743 :                 .status = (FlowEvent) GDKzalloc((stoppc - startpc + 1) *
     873             :                                                                                 sizeof(FlowEventRec)),
     874             :                 .error = ATOMIC_PTR_VAR_INIT(NULL),
     875      155742 :                 .nodes = (int *) GDKzalloc(sizeof(int) * size),
     876      155742 :                 .edges = (int *) GDKzalloc(sizeof(int) * size),
     877             :         };
     878             : 
     879      155743 :         if (flow->done == NULL) {
     880           0 :                 GDKfree(flow->status);
     881           0 :                 GDKfree(flow->nodes);
     882           0 :                 GDKfree(flow->edges);
     883           0 :                 GDKfree(flow);
     884           0 :                 throw(MAL, "dataflow",
     885             :                           "runMALdataflow(): Failed to create flow->done queue");
     886             :         }
     887             : 
     888      155743 :         if (flow->status == NULL || flow->nodes == NULL || flow->edges == NULL) {
     889           0 :                 q_destroy(flow->done);
     890           0 :                 GDKfree(flow->status);
     891           0 :                 GDKfree(flow->nodes);
     892           0 :                 GDKfree(flow->edges);
     893           0 :                 GDKfree(flow);
     894           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     895             :         }
     896             : 
     897      155743 :         MT_lock_init(&flow->flowlock, "flow->flowlock");
     898      155743 :         msg = DFLOWinitBlk(flow, mb, size);
     899             : 
     900      155744 :         if (msg == MAL_SUCCEED)
     901      155743 :                 msg = DFLOWscheduler(flow, t);
     902             : 
     903      155743 :         GDKfree(flow->status);
     904      155742 :         GDKfree(flow->edges);
     905      155742 :         GDKfree(flow->nodes);
     906      155743 :         q_destroy(flow->done);
     907      155741 :         MT_lock_destroy(&flow->flowlock);
     908      155741 :         ATOMIC_PTR_DESTROY(&flow->error);
     909      155741 :         GDKfree(flow);
     910             : 
     911             :         /* we created one worker, now tell one worker to exit again */
     912      155743 :         MT_lock_set(&todo->l);
     913      155743 :         todo->exitcount++;
     914      155743 :         MT_lock_unset(&todo->l);
     915      155743 :         MT_sema_up(&todo->s);
     916             : 
     917      155743 :         return msg;
     918             : }
     919             : 
     920             : str
     921           0 : deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     922             : {
     923           0 :         int *ret = getArgReference_int(stk, pci, 0);
     924           0 :         int *val = getArgReference_int(stk, pci, 1);
     925           0 :         (void) cntxt;
     926           0 :         (void) mb;
     927           0 :         *ret = *val;
     928           0 :         return MAL_SUCCEED;
     929             : }
     930             : 
     931             : static void
     932         328 : stopMALdataflow(void)
     933             : {
     934         328 :         ATOMIC_SET(&exiting, 1);
     935         328 :         if (todo) {
     936         328 :                 MT_lock_set(&dataflowLock);
     937             :                 /* first wake up all running threads */
     938         328 :                 int n = 0;
     939         927 :                 for (struct worker *t = free_workers; t; t = t->next)
     940         599 :                         n++;
     941        1310 :                 for (struct worker *t = workers; t; t = t->next)
     942         982 :                         n++;
     943        1909 :                 while (n-- > 0) {
     944             :                         /* one UP for each thread we know about */
     945        1909 :                         MT_sema_up(&todo->s);
     946             :                 }
     947         927 :                 while (free_workers) {
     948         599 :                         struct worker *t = free_workers;
     949         599 :                         assert(free_count > 0);
     950         599 :                         free_count--;
     951         599 :                         free_workers = free_workers->next;
     952         599 :                         MT_sema_up(&t->s);
     953         599 :                         finish_worker(t);
     954             :                 }
     955         347 :                 while (workers) {
     956          19 :                         struct worker *t = workers;
     957          19 :                         workers = workers->next;
     958          19 :                         finish_worker(t);
     959             :                 }
     960        1291 :                 while (exited_workers) {
     961         963 :                         struct worker *t = exited_workers;
     962         963 :                         exited_workers = exited_workers->next;
     963         963 :                         finish_worker(t);
     964             :                 }
     965         328 :                 MT_lock_unset(&dataflowLock);
     966             :         }
     967         328 : }
     968             : 
     969             : void
     970         328 : mal_dataflow_reset(void)
     971             : {
     972         328 :         stopMALdataflow();
     973         328 :         workers = exited_workers = NULL;
     974         328 :         if (todo) {
     975         328 :                 MT_lock_destroy(&todo->l);
     976         328 :                 MT_sema_destroy(&todo->s);
     977         328 :                 GDKfree(todo);
     978             :         }
     979         328 :         todo = 0;                                       /* pending instructions */
     980         328 :         ATOMIC_SET(&exiting, 0);
     981         328 : }

Generated by: LCOV version 1.14