Martin, could you please also add sql/test/BugTracker-2013/Tests/nestedcalls.sql? And could you please also add the test scripts and stable output for tests sql/test/BugTracker-2013/Tests/oid_handling, sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335, sql/test/BugTracker-2013/Tests/pivot.Bug-3339, and sql/test/BugTracker-2013/Tests/recursion, or alternatively remove them from sql/test/BugTracker-2013/Tests/All? Thanks! In any case, propagating this checkin will result in (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch] and (avoidable) conflicts in sql/test/BugTracker-2013/Tests. All of these will need to be resolved by hand ... Stefan ----- Original Message -----
Changeset: 489815265a61 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61 Added Files: sql/test/BugTracker-2013/Tests/nestedcalls.stable.err sql/test/BugTracker-2013/Tests/nestedcalls.stable.out Modified Files: monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_dataflow.h monetdb5/modules/mal/language.c monetdb5/modules/mal/language.h sql/test/BugTracker-2013/Tests/All Branch: Feb2013 Log Message:
Organize parallelism per worker pool. Each (recursive) dataflow block is now handed a worker pool. Once we run out of worker pools or fail to create a worker, we continue in sequential mode.
Cleaning up the threads is implicit, as it is for all other non-pool interpreters that might be active at system shutdown.
diffs (truncated from 762 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -35,6 +35,7 @@ * The flow graphs should be organized such that parallel threads can * access it mostly without expensive locking. */ +#include "monetdb_config.h" #include "mal_dataflow.h" #include "mal_client.h"
@@ -82,10 +83,10 @@ typedef struct DATAFLOW { Queue *done; /* instructions handled */ } *DataFlow, DataFlowRec;
-#define MAXQ 1024 -static MT_Id workers[THREADS] = {0}; -static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */ -static Queue *todo[MAXQ] = {0}; /* pending instructions organized by user MAXTODO > #users */ +#define MAXQ 256 +static Queue *todos[MAXQ] = {0}; /* pending instructions organized by dataflow block */ +static bit occupied[MAXQ]={0}; /* worker pool is in use? */ +static int volatile exiting = 0;
/* * Calculate the size of the dataflow dependency graph. @@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start, */
static Queue* -q_create(int sz) +q_create(int sz, const char *name) { - const char* name = "q_create"; Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
if (q == NULL) @@ -208,6 +208,8 @@ q_dequeue(Queue *q)
assert(q); MT_sema_down(&q->s, "q_dequeue"); + if (exiting) + return NULL; MT_lock_set(&q->l, "q_dequeue"); assert(q->last > 0); if (q->last > 0) { @@ -255,25 +257,23 @@ DFLOWworker(void *t) { DataFlow flow; FlowEvent fe = 0, fnxt = 0; - int id = (int) ((MT_Id *) t - workers), last = 0; - int wq; Thread thr; str error = 0; - - int i; - lng usec = 0; + Queue *todo = *(Queue **) t; + int i,last;
thr = THRnew("DFLOWworker");
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; while (1) { - assert(workerqueue[id] > 0); - wq = workerqueue[id] - 1; if (fnxt == 0) - fe = q_dequeue(todo[wq]); + fe = q_dequeue(todo); else fe = fnxt; + if (exiting) { + break; + } fnxt = 0; assert(fe); flow = fe->flow; @@ -285,22 +285,20 @@ DFLOWworker(void *t) continue; }
- usec = GDKusec(); /* skip all instructions when we have encontered an error */ if (flow->error == 0) { #ifdef USE_MAL_ADMISSION if (MALadmission(fe->argclaim, fe->hotclaim)) { fe->hotclaim = 0; /* don't assume priority anymore */ - assert(todo[wq]); - if (todo[wq]->last == 0) + if (todo->last == 0) MT_sleep_ms(DELAYUNIT); - q_requeue(todo[wq], fe); + q_requeue(todo, fe); continue; } #endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n", - fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : ""); + fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : ""); #ifdef USE_MAL_ADMISSION /* release the memory claim */ MALadmission(-fe->argclaim, -fe->hotclaim); @@ -331,8 +329,8 @@ DFLOWworker(void *t) InstrPtr p = getInstrPtr(flow->mb, fe->pc); assert(p); fe->hotclaim = 0; - for (i = 0; i < p->retc; i++) - fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE); + //for (i = 0; i < p->retc; i++) + //fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE); } #endif MT_lock_set(&flow->flowlock, "MALworker"); @@ -351,56 +349,64 @@ DFLOWworker(void *t)
q_enqueue(flow->done, fe); if ( fnxt == 0) { - assert(todo[wq]); - if (todo[wq]->last == 0) + if (todo->last == 0) profilerHeartbeatEvent("wait"); - else - MALresourceFairness(NULL, NULL, usec); } } GDKfree(GDKerrbuf); GDKsetbuf(0); - workerqueue[wq] = 0; - workers[wq] = 0; THRdel(thr); }
/* - * Create a set of DFLOW interpreters. + * Create an interpreter pool. * One worker will adaptively be available for each client. * The remainder are taken from the GDKnr_threads argument and - * typically is equal to the number of cores + * typically is equal to the number of cores. + * A recursive MAL function call would make for one worker less, + * which limits the number of cores for parallel processing. * The workers are assembled in a local table to enable debugging. + * + * BEWARE, failure to create a new worker thread is not an error + * but would lead to serial execution. */ -static void -DFLOWinitialize(int index) +static int +DFLOWinitialize(void) { - int i, worker, limit; + int i, threads, grp; + MT_Id worker;
- assert(index >= 0); - assert(index < THREADS); + threads = GDKnr_threads ? GDKnr_threads : 1; MT_lock_set(&mal_contextLock, "DFLOWinitialize"); - if (todo[index]) { - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); - return; + for(grp = 0; grp< MAXQ; grp++) + if ( occupied[grp] == FALSE){ + occupied[grp] = TRUE; + break; + } + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + if (grp > THREADS) { + // continue non-parallel + return -1; } - todo[index] = q_create(2048); - assert(todo[index]); - limit = GDKnr_threads ? GDKnr_threads : 1; - assert(limit <= THREADS); - for (worker = 0, i = 0; i < limit; i++){ - for (; worker < THREADS; worker++) - if( workers[worker] == 0) - break; - assert(worker < THREADS); - if (worker < THREADS) { - assert(workers[worker] == 0); - MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE); - assert(workers[worker] > 0); - workerqueue[worker] = index + 1; + if ( todos[grp] ) + return grp; + + todos[grp] = q_create(2048, "todo"); + if (todos[grp] == NULL) + return -1; + + // associate a set of workers with the pool + for (i = 0; grp>= 0 && i < threads; i++){ + if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) { + //Can not create interpreter thread + grp = -1; + } + if (worker == 0) { + //Failed to create interpreter thread + grp = -1; } } - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return grp; }
/* @@ -409,18 +415,28 @@ DFLOWinitialize(int index) * For each instruction we keep a list of instructions whose * blocking counter should be decremented upon finishing it. */ -static void +static str DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size) { int pc, i, j, k, l, n, etop = 0; int *assign; InstrPtr p;
+ if (flow == NULL) + throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL"); + if (mb == NULL) + throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL"); PARDEBUG printf("Initialize dflow block\n"); assign = (int *) GDKzalloc(mb->vtop * sizeof(int)); + if (assign == NULL) + throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign"); etop = flow->stop - flow->start; for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) { p = getInstrPtr(mb, pc); + if (p == NULL) { + GDKfree(assign); + throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL"); + }
/* initial state, ie everything can run */ flow->status[n].flow = flow; @@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb #ifdef USE_MAL_ADMISSION memorypool = memoryclaims = 0; #endif + return MAL_SUCCEED; }
/* @@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow, */
static str -DFLOWscheduler(DataFlow flow) +DFLOWscheduler(DataFlow flow, Queue *todo) { int last; int i; #ifdef USE_MAL_ADMISSION - int j; + //int j; InstrPtr p; #endif int tasks=0, actions; str ret = MAL_SUCCEED; FlowEvent fe, f = 0; - int wq;
if (flow == NULL) throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL"); @@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow) /* initialize the eligible statements */ fe = flow->status;
- if (fe[0].flow->cntxt->flags & timerFlag) - fe[0].flow->cntxt->timer = GDKusec(); - MT_lock_set(&flow->flowlock, "MALworker"); - wq = flow->cntxt->idx; for (i = 0; i < actions; i++) if (fe[i].blocks == 0) { #ifdef USE_MAL_ADMISSION p = getInstrPtr(flow->mb,fe[i].pc); - for (j = p->retc; j < p->argc; j++) - fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE); + if (p == NULL) { + MT_lock_unset(&flow->flowlock, "MALworker"); + throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL"); + } + //for (j = p->retc; j < p->argc; j++) + //fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE); #endif - q_enqueue(todo[wq], flow->status + i); + q_enqueue(todo, flow->status + i); flow->status[i].state = DFLOWrunning; PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } @@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
while (actions != tasks ) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list
-- | Stefan.Manegold@CWI.nl | DB Architectures (DA) | | www.CWI.nl/~manegold/ | Science Park 123 (L321) | | +31 (0)20 592-4212 | 1098 XG Amsterdam (NL) |