Re: MonetDB: Feb2013 - Organize parallelism per worker pool
Martin,

could you please also add sql/test/BugTracker-2013/Tests/nestedcalls.sql ?

And could you please also add the test scripts and stable output for the tests
  sql/test/BugTracker-2013/Tests/oid_handling
  sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335
  sql/test/BugTracker-2013/Tests/pivot.Bug-3339
  sql/test/BugTracker-2013/Tests/recursion
or alternatively remove them from sql/test/BugTracker-2013/Tests/All

Thanks!

In any case, propagation of this checkin will result in (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch] and (avoidable) conflicts in sql/test/BugTracker-2013/Tests
All of these will need to be solved by hand ...

Stefan

----- Original Message -----
Changeset: 489815265a61 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61
Added Files:
        sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
        sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/modules/mal/language.c
        monetdb5/modules/mal/language.h
        sql/test/BugTracker-2013/Tests/All
Branch: Feb2013
Log Message:
Organize parallelism per worker pool

Each (recursive) dataflow block is handed a worker pool now. Once we run out of worker pools or fail to create a worker, we continue in sequential mode.
Cleaning up the threads is implicit, like all other non-pool interpreters that might be active at system shutdown.
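For illustration, the claim-or-fall-back pattern described in the log message boils down to the standalone sketch below. It uses plain pthreads instead of MonetDB's MT_/GDK primitives, and MAXPOOLS, WORKERS_PER_POOL, claim_pool(), release_pool() and the trivial worker body are illustrative assumptions, not code taken from the changeset.

/*
 * Sketch: claim one slot of a fixed set of worker pools; if no slot is
 * free or a worker thread cannot be created, report -1 so the caller
 * executes the dataflow block sequentially instead of failing.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define MAXPOOLS 4
#define WORKERS_PER_POOL 2

static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
static bool occupied[MAXPOOLS];        /* worker pool is in use? */

static void *worker(void *arg)
{
    printf("worker serving pool %d\n", *(int *) arg);
    return NULL;                       /* real code would drain a todo queue */
}

/* returns the claimed pool index, or -1 meaning "run sequentially" */
static int claim_pool(pthread_t tid[WORKERS_PER_POOL], int *pool_id)
{
    int grp = -1;

    pthread_mutex_lock(&pool_lock);
    for (int i = 0; i < MAXPOOLS; i++)
        if (!occupied[i]) {
            occupied[i] = true;
            grp = i;
            break;
        }
    pthread_mutex_unlock(&pool_lock);
    if (grp < 0)
        return -1;                     /* all pools taken */

    *pool_id = grp;
    for (int i = 0; i < WORKERS_PER_POOL; i++)
        if (pthread_create(&tid[i], NULL, worker, pool_id) != 0) {
            /* failure to create a worker is not an error: fall back */
            while (--i >= 0)
                pthread_join(tid[i], NULL);
            pthread_mutex_lock(&pool_lock);
            occupied[grp] = false;
            pthread_mutex_unlock(&pool_lock);
            return -1;
        }
    return grp;
}

static void release_pool(int grp, pthread_t tid[WORKERS_PER_POOL])
{
    for (int i = 0; i < WORKERS_PER_POOL; i++)
        pthread_join(tid[i], NULL);
    pthread_mutex_lock(&pool_lock);
    occupied[grp] = false;             /* slot reusable by the next block */
    pthread_mutex_unlock(&pool_lock);
}

int main(void)
{
    pthread_t tid[WORKERS_PER_POOL];
    int pool_id;
    int grp = claim_pool(tid, &pool_id);

    if (grp < 0)
        printf("no pool/worker available: running sequentially\n");
    else
        release_pool(grp, tid);
    return 0;
}

The point of the pattern is that parallelism is treated as an optimization only: every failure path degrades to sequential execution rather than aborting the query.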
diffs (truncated from 762 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -35,6 +35,7 @@ * The flow graphs should be organized such that parallel threads can * access it mostly without expensive locking. */ +#include "monetdb_config.h" #include "mal_dataflow.h" #include "mal_client.h"
@@ -82,10 +83,10 @@ typedef struct DATAFLOW { Queue *done; /* instructions handled */ } *DataFlow, DataFlowRec;
-#define MAXQ 1024 -static MT_Id workers[THREADS] = {0}; -static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */ -static Queue *todo[MAXQ] = {0}; /* pending instructions organized by user MAXTODO > #users */ +#define MAXQ 256 +static Queue *todos[MAXQ] = {0}; /* pending instructions organized by dataflow block */ +static bit occupied[MAXQ]={0}; /* worker pool is in use? */ +static int volatile exiting = 0;
/* * Calculate the size of the dataflow dependency graph. @@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start, */
static Queue* -q_create(int sz) +q_create(int sz, const char *name) { - const char* name = "q_create"; Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
if (q == NULL) @@ -208,6 +208,8 @@ q_dequeue(Queue *q)
assert(q); MT_sema_down(&q->s, "q_dequeue"); + if (exiting) + return NULL; MT_lock_set(&q->l, "q_dequeue"); assert(q->last > 0); if (q->last > 0) { @@ -255,25 +257,23 @@ DFLOWworker(void *t) { DataFlow flow; FlowEvent fe = 0, fnxt = 0; - int id = (int) ((MT_Id *) t - workers), last = 0; - int wq; Thread thr; str error = 0; - - int i; - lng usec = 0; + Queue *todo = *(Queue **) t; + int i,last;
thr = THRnew("DFLOWworker");
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; while (1) { - assert(workerqueue[id] > 0); - wq = workerqueue[id] - 1; if (fnxt == 0) - fe = q_dequeue(todo[wq]); + fe = q_dequeue(todo); else fe = fnxt; + if (exiting) { + break; + } fnxt = 0; assert(fe); flow = fe->flow; @@ -285,22 +285,20 @@ DFLOWworker(void *t) continue; }
- usec = GDKusec(); /* skip all instructions when we have encontered an error */ if (flow->error == 0) { #ifdef USE_MAL_ADMISSION if (MALadmission(fe->argclaim, fe->hotclaim)) { fe->hotclaim = 0; /* don't assume priority anymore */ - assert(todo[wq]); - if (todo[wq]->last == 0) + if (todo->last == 0) MT_sleep_ms(DELAYUNIT); - q_requeue(todo[wq], fe); + q_requeue(todo, fe); continue; } #endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n", - fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : ""); + fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : ""); #ifdef USE_MAL_ADMISSION /* release the memory claim */ MALadmission(-fe->argclaim, -fe->hotclaim); @@ -331,8 +329,8 @@ DFLOWworker(void *t) InstrPtr p = getInstrPtr(flow->mb, fe->pc); assert(p); fe->hotclaim = 0; - for (i = 0; i < p->retc; i++) - fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE); + //for (i = 0; i < p->retc; i++) + //fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE); } #endif MT_lock_set(&flow->flowlock, "MALworker"); @@ -351,56 +349,64 @@ DFLOWworker(void *t)
q_enqueue(flow->done, fe); if ( fnxt == 0) { - assert(todo[wq]); - if (todo[wq]->last == 0) + if (todo->last == 0) profilerHeartbeatEvent("wait"); - else - MALresourceFairness(NULL, NULL, usec); } } GDKfree(GDKerrbuf); GDKsetbuf(0); - workerqueue[wq] = 0; - workers[wq] = 0; THRdel(thr); }
/* - * Create a set of DFLOW interpreters. + * Create an interpreter pool. * One worker will adaptively be available for each client. * The remainder are taken from the GDKnr_threads argument and - * typically is equal to the number of cores + * typically is equal to the number of cores. + * A recursive MAL function call would make for one worker less, + * which limits the number of cores for parallel processing. * The workers are assembled in a local table to enable debugging. + * + * BEWARE, failure to create a new worker thread is not an error + * but would lead to serial execution. */ -static void -DFLOWinitialize(int index) +static int +DFLOWinitialize(void) { - int i, worker, limit; + int i, threads, grp; + MT_Id worker;
- assert(index >= 0); - assert(index < THREADS); + threads = GDKnr_threads ? GDKnr_threads : 1; MT_lock_set(&mal_contextLock, "DFLOWinitialize"); - if (todo[index]) { - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); - return; + for(grp = 0; grp< MAXQ; grp++) + if ( occupied[grp] == FALSE){ + occupied[grp] = TRUE; + break; + } + MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + if (grp > THREADS) { + // continue non-parallel + return -1; } - todo[index] = q_create(2048); - assert(todo[index]); - limit = GDKnr_threads ? GDKnr_threads : 1; - assert(limit <= THREADS); - for (worker = 0, i = 0; i < limit; i++){ - for (; worker < THREADS; worker++) - if( workers[worker] == 0) - break; - assert(worker < THREADS); - if (worker < THREADS) { - assert(workers[worker] == 0); - MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE); - assert(workers[worker] > 0); - workerqueue[worker] = index + 1; + if ( todos[grp] ) + return grp; + + todos[grp] = q_create(2048, "todo"); + if (todos[grp] == NULL) + return -1; + + // associate a set of workers with the pool + for (i = 0; grp>= 0 && i < threads; i++){ + if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) { + //Can not create interpreter thread + grp = -1; + } + if (worker == 0) { + //Failed to create interpreter thread + grp = -1; } } - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); + return grp; }
/* @@ -409,18 +415,28 @@ DFLOWinitialize(int index) * For each instruction we keep a list of instructions whose * blocking counter should be decremented upon finishing it. */ -static void +static str DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size) { int pc, i, j, k, l, n, etop = 0; int *assign; InstrPtr p;
+ if (flow == NULL) + throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL"); + if (mb == NULL) + throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL"); PARDEBUG printf("Initialize dflow block\n"); assign = (int *) GDKzalloc(mb->vtop * sizeof(int)); + if (assign == NULL) + throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign"); etop = flow->stop - flow->start; for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) { p = getInstrPtr(mb, pc); + if (p == NULL) { + GDKfree(assign); + throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL"); + }
/* initial state, ie everything can run */ flow->status[n].flow = flow; @@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb #ifdef USE_MAL_ADMISSION memorypool = memoryclaims = 0; #endif + return MAL_SUCCEED; }
/* @@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow, */
static str -DFLOWscheduler(DataFlow flow) +DFLOWscheduler(DataFlow flow, Queue *todo) { int last; int i; #ifdef USE_MAL_ADMISSION - int j; + //int j; InstrPtr p; #endif int tasks=0, actions; str ret = MAL_SUCCEED; FlowEvent fe, f = 0; - int wq;
if (flow == NULL) throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL"); @@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow) /* initialize the eligible statements */ fe = flow->status;
- if (fe[0].flow->cntxt->flags & timerFlag) - fe[0].flow->cntxt->timer = GDKusec(); - MT_lock_set(&flow->flowlock, "MALworker"); - wq = flow->cntxt->idx; for (i = 0; i < actions; i++) if (fe[i].blocks == 0) { #ifdef USE_MAL_ADMISSION p = getInstrPtr(flow->mb,fe[i].pc); - for (j = p->retc; j < p->argc; j++) - fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE); + if (p == NULL) { + MT_lock_unset(&flow->flowlock, "MALworker"); + throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL"); + } + //for (j = p->retc; j < p->argc; j++) + //fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE); #endif - q_enqueue(todo[wq], flow->status + i); + q_enqueue(todo, flow->status + i); flow->status[i].state = DFLOWrunning; PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } @@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
while (actions != tasks ) {
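The diff above also introduces the volatile `exiting` flag: q_dequeue() now returns NULL once it is set, and DFLOWworker() breaks out of its loop, so workers blocked on the queue's semaphore can be woken at shutdown instead of sleeping forever. The code that raises the flag and posts the semaphore is not part of the truncated diff, so stop_workers() below is only an assumption about how such a wake-up could look; the sketch uses POSIX threads and semaphores rather than MT_create_thread()/MT_sema_down().

/*
 * Sketch of the wake-and-check shutdown pattern: every worker blocks on
 * a counting semaphore; shutdown sets `exiting` and posts the semaphore
 * once per worker, so each one wakes, sees the flag and leaves its loop.
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define NWORKERS 4

static sem_t todo_sem;                 /* counts pending work items */
static volatile int exiting = 0;

static void *dflow_worker(void *arg)
{
    (void) arg;
    for (;;) {
        sem_wait(&todo_sem);           /* sleep until work or shutdown */
        if (exiting)
            break;                     /* woken only to terminate */
        /* ... dequeue and run one instruction here ... */
    }
    return NULL;
}

static void stop_workers(pthread_t tid[NWORKERS])
{
    exiting = 1;
    for (int i = 0; i < NWORKERS; i++)
        sem_post(&todo_sem);           /* one wake-up per blocked worker */
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tid[i], NULL);
}

int main(void)
{
    pthread_t tid[NWORKERS];

    sem_init(&todo_sem, 0, 0);
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tid[i], NULL, dflow_worker, NULL);
    stop_workers(tid);
    sem_destroy(&todo_sem);
    printf("all workers exited cleanly\n");
    return 0;
}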
--
| Stefan.Manegold@CWI.nl | DB Architectures   (DA) |
| www.CWI.nl/~manegold/  | Science Park 123 (L321) |
| +31 (0)20 592-4212     | 1098 XG Amsterdam  (NL) |
Martin,

Also, by adding and exporting the new functions stopMALdataflow & CMDcallFunction, this changeset changes the API, and thus must not remain as-is in the Feb2013 release branch.

Are these two new functions indeed inherently required for fixing bug 3346? At least I cannot even find any place where these new functions are used / called ...

Thanks!
Stefan
Martin,

Compilation and testing appear to work also without these two new functions. Hence, I'll feel free to remove them again, if you don't mind.

Could you please take care of the tests I mentioned in my first mail above? (Or let me know what I shall do?)

Thanks!
Stefan
participants (1)
- Stefan Manegold