Re: MonetDB: Feb2013 - New thread group per dataflow block
Martin, this checkin appears to make several tests run into timeouts; I have not yet been able to analyze why ... A complete Mtest.py run results in * 142 out of 1863 tests could not be executed 1 out of 1863 tests produced slightly different output sql/test/BugTracker-2012/predicate_select.Bug-3090 * 6 out of 1863 tests ran into timeout * sql/benchmarks/ATIS/load * sql/benchmarks/arno/insert_ATOM * sql/benchmarks/arno/insert_BOND * sql/benchmarks/arno_flt/init * sql/benchmarks/moa/load * sql/benchmarks/wisconsin/load 1 out of 1863 tests caused an abort (assertion failure) monetdb5/tests/BugTracker/kunion-and-nil.Bug-1667 7 out of 1863 tests produced SIGNIFICANTLY different output monetdb5/optimizer/inlineFunction monetdb5/optimizer/ifthencst sql/test/BugTracker-2012/conditions_when_for_triggers_do_not_work.Bug-2073 sql/test/BugTracker-2012/create_function.Bug-3172 sql/test/BugTracker-2012/currenttime.Bug-2781 sql/test/BugTracker-2012/null_except_null.Bug-3040 sql/test/BugTracker-2012/day-of-month-localization.Bug-2962 while with the version before this checkin, it results in 41 out of 1863 tests could not be executed 1 out of 1863 tests produced slightly different output sql/test/BugTracker-2012/predicate_select.Bug-3090 1 out of 1863 tests caused an abort (assertion failure) monetdb5/tests/BugTracker/kunion-and-nil.Bug-1667 7 out of 1863 tests produced SIGNIFICANTLY different output monetdb5/optimizer/inlineFunction monetdb5/optimizer/ifthencst sql/test/BugTracker-2012/conditions_when_for_triggers_do_not_work.Bug-2073 sql/test/BugTracker-2012/create_function.Bug-3172 sql/test/BugTracker-2012/currenttime.Bug-2781 sql/test/BugTracker-2012/null_except_null.Bug-3040 sql/test/BugTracker-2012/day-of-month-localization.Bug-2962 Best, Stefan ----- Original Message -----
Changeset: 3669ddd28bf0 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3669ddd28bf0 Modified Files: monetdb5/mal/mal_dataflow.c Branch: Feb2013 Log Message:
New thread group per dataflow block The centralized worker thread group could lead to an unacceptable situation. If a user is heavily processing complex queries, then no other user could even log into the system, because their MAL statements ended up at the end of the shared queues.
The problem has been resolved by introducing a thread group per dataflow block. This may lead to a large number of processes, whose resources are managed by the OS.
It solves bug 3258.
diffs (235 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -78,12 +78,12 @@ typedef struct DATAFLOW { int *nodes; /* dependency graph nodes */ int *edges; /* dependency graph */ MT_Lock flowlock; /* lock to protect the above */ + queue *todo; /* pending instructions */ queue *done; /* instructions handled */ + int threads; /* worker threads active */ + MT_Id workers[THREADS]; } *DataFlow, DataFlowRec;
-static MT_Id workers[THREADS]; -static queue *todo = 0; /* pending instructions */ - /* * Calculate the size of the dataflow dependency graph. */ @@ -138,7 +138,6 @@ q_destroy(queue *q) static void q_enqueue_(queue *q, FlowEvent d) { - assert(d); if (q->last == q->size) { q->size <<= 1; q->data = GDKrealloc(q->data, sizeof(FlowEvent) * q->size); @@ -214,7 +213,6 @@ q_dequeue(queue *q) */
MT_lock_unset(&q->l, "q_dequeue"); - assert(r); return r; }
@@ -239,14 +237,15 @@ q_dequeue(queue *q) static void DFLOWworker(void *t) { - DataFlow flow; + DataFlow flow = (DataFlow) t; FlowEvent fe = 0, fnxt = 0; - int id = (int) ((MT_Id *) t - workers), last = 0; + MT_Id id = MT_getpid(); + int last = 0; Thread thr; str error = 0;
int i; - lng usec = 0; + //lng usec = 0;
thr = THRnew("DFLOWworker");
@@ -254,8 +253,10 @@ DFLOWworker(void *t) GDKerrbuf[0] = 0; while (1) { if (fnxt == 0) - fe = q_dequeue(todo); + fe = q_dequeue(flow->todo); else fe = fnxt; + if ( fe == 0) + break; fnxt = 0; assert(fe); flow = fe->flow; @@ -266,20 +267,20 @@ DFLOWworker(void *t) continue; }
- usec = GDKusec(); + //usec = GDKusec(); /* skip all instructions when we have encontered an error */ if (flow->error == 0) { #ifdef USE_MAL_ADMISSION if (MALadmission(fe->argclaim, fe->hotclaim)) { fe->hotclaim = 0; /* don't assume priority anymore */ - if (todo->last == 0) + if (flow->todo->last == 0) MT_sleep_ms(DELAYUNIT); - q_requeue(todo, fe); + q_requeue(flow->todo, fe); continue; } #endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); - PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n", + PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= "SZFMT" claim= " LLFMT "," LLFMT " %s\n", fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : ""); #ifdef USE_MAL_ADMISSION /* release the memory claim */ @@ -330,12 +331,15 @@ DFLOWworker(void *t)
q_enqueue(flow->done, fe); if ( fnxt == 0) { - if (todo->last == 0) + if (flow->todo->last == 0) profilerHeartbeatEvent("wait"); - else - MALresourceFairness(NULL, NULL, usec); + //else + //MALresourceFairness(NULL, NULL, usec); } } + for( i = 0; i< flow->threads; i++) + if ( flow->workers[i] == id) + flow->workers[i] = 0; GDKfree(GDKerrbuf); GDKsetbuf(0); THRdel(thr); @@ -349,22 +353,51 @@ DFLOWworker(void *t) * The workers are assembled in a local table to enable debugging. */ static void -DFLOWinitialize(void) +DFLOWinitialize(DataFlow flow, int size) { - int i, limit; + int i;
- MT_lock_set(&mal_contextLock, "DFLOWinitialize"); - if (todo) { - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); - return; + MT_lock_init(&flow->flowlock, "DFLOWworker"); + flow->todo = q_create(size); + flow->done = q_create(size); + flow->threads = GDKnr_threads ? GDKnr_threads :1; + for (i = 0; i < flow->threads; i++){ + MT_create_thread(&flow->workers[i], DFLOWworker, (void *) flow, MT_THR_JOINABLE); + /* upon failure of starting threads we reduce the count */ + if ( flow->workers[i]== 0){ + flow->threads --; + i--; + } } - todo = q_create(2048); - limit = GDKnr_threads ? GDKnr_threads : 1; - for (i = 0; i < limit; i++) - MT_create_thread(&workers[i], DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE); - MT_lock_unset(&mal_contextLock, "DFLOWinitialize"); }
+static str +DFLOWfinalize(DataFlow flow) +{ + int i, cnt= flow->threads, runs =0; + + for( i = 0; i< cnt; i++) + q_enqueue(flow->todo, 0); + /* time out when threads are already killed */ + do{ + runs++; + cnt = 0; + MT_sleep_ms(1); + for( i = 0; i < flow->threads; i++) + cnt += flow->workers[i] ==0; + } while( cnt != flow->threads && runs <5000); + + if ( runs == 5000) + throw(MAL,"dataflow","Timeout on thread termination"); + GDKfree(flow->status); + GDKfree(flow->edges); + GDKfree(flow->nodes); + q_destroy(flow->done); + q_destroy(flow->todo); + MT_lock_destroy(&flow->flowlock); + GDKfree(flow); + return MAL_SUCCEED; +} /* * The dataflow administration is based on administration of * how many variables are still missing before it can be executed. @@ -518,7 +551,7 @@ DFLOWscheduler(DataFlow flow) for (j = p->retc; j < p->argc; j++) fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE); #endif - q_enqueue(todo, flow->status + i); + q_enqueue(flow->todo, flow->status + i); flow->status[i].state = DFLOWrunning; PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } @@ -543,7 +576,7 @@ DFLOWscheduler(DataFlow flow) if (flow->status[i].blocks == 1 ) { flow->status[i].state = DFLOWrunning; flow->status[i].blocks--; - q_enqueue(todo, flow->status + i); + q_enqueue(flow->todo, flow->status + i); PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim); } else { @@ -579,13 +612,11 @@ runMALdataflow(Client cntxt, MalBlkPtr m
assert(stoppc > startpc);
- /* check existence of workers */ - if (workers[0] == 0) - DFLOWinitialize(); - assert(workers[0]); - assert(todo); - flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec)); + + DFLOWinitialize(flow, stoppc- startpc +1); + assert(flow->todo); + assert(flow->done);
flow->cntxt = cntxt; flow->mb = mb; @@ -596,9 +627,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m flow->start = startpc + 1; flow->stop = stoppc;
- MT_lock_init(&flow->flowlock, "DFLOWworker"); - flow->done = q_create(stoppc- startpc+1); - flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * sizeof(FlowEventRec)); size = DFLOWgraphSize(mb, startpc, stoppc); size += stoppc - startpc; @@ -608,11 +636,9 @@ runMALdataflow(Client cntxt, MalBlkPtr m
ret = DFLOWscheduler(flow);
- GDKfree(flow->status); - GDKfree(flow->edges); - GDKfree(flow->nodes); - q_destroy(flow->done); - MT_lock_destroy(&flow->flowlock); - GDKfree(flow); + if( ret == MAL_SUCCEED) + ret = DFLOWfinalize(flow); + else (void) + DFLOWfinalize(flow); return ret; } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list
-- | Stefan.Manegold@CWI.nl | DB Architectures (DA) | | www.CWI.nl/~manegold/ | Science Park 123 (L321) | | +31 (0)20 592-4212 | 1098 XG Amsterdam (NL) |
participants (1)
-
Stefan Manegold