Re: MonetDB: default - Low-level task scheduler.
There are a few things wrong with this code: - sz = ((sz << 1) >> 1); does *not* turn sz into a multiple of two (as suggested by the comment). This statement basically is a no-op. - when you create joinable threads, you should join them. - it's not a great idea to use assert to make sure that GDKmalloc succeeds. It is better to return an error. - The include of monet_options.h should be in the C file, not in the include file (which would be included elsewhere where monet_options.h will already be included). On 2012-11-07 13:03, Martin Kersten wrote:
Changeset: 5ff3c16e865f for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5ff3c16e865f Added Files: gdk/gdk_mapreduce.c gdk/gdk_mapreduce.h Modified Files: gdk/Makefile.ag monetdb5/modules/mal/groups.c Branch: default Log Message:
Low-level task scheduler. This module provides a lightweight map-reduce scheduler for multicore systems. A limited number of workers are initialized upfront, which take the tasks from a central queue. The header of these task descriptors should comply with the MRtask structure.
diffs (239 lines):
diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag --- a/gdk/Makefile.ag +++ b/gdk/Makefile.ag @@ -36,7 +36,7 @@ lib_gdk = { gdk_private.h gdk_delta.h gdk_logger.h gdk_posix.h \ gdk_system.h gdk_tm.h gdk_storage.h \ gdk_calc.c gdk_calc.h gdk_calc_compare.h gdk_calc_private.h \ - gdk_aggr.c gdk_group.c \ + gdk_aggr.c gdk_group.c gdk_mapreduce.c gdk_mapreduce.h \ bat.feps bat1.feps bat2.feps \ libbat.rc LIBS = ../common/options/libmoptions \ diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c new file mode 100644 --- /dev/null +++ b/gdk/gdk_mapreduce.c @@ -0,0 +1,141 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.monetdb.org/Legal/MonetDBLicense + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * + * The Initial Developer of the Original Code is CWI. + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. + * Copyright August 2008-2012 MonetDB B.V. + * All Rights Reserved. + */ + +/* + * (co) Martin L. Kersten + * This module provide a lightweight map-reduce scheduler for multicore systems. + * A limited number of workers are initialized upfront, which take the tasks + * from a central queue. The header of these task descriptors should comply + * with the MRtask structure. 
+ * + */ +#include "monetdb_config.h" +#include "gdk.h" +#include "gdk_mapreduce.h" + +/* each entry in the queue contains a list of tasks */ +typedef struct MRQUEUE { + MRtask **tasks; + int index; /* next available task */ + int size; /* number of tasks */ +} MRqueue; + +static MRqueue *mrqueue; +static int mrqsize= -1; /* size of queue */ +static int mrqlast= -1; +static MT_Lock mrqlock; /* its a shared resource, ie we need locks */ +static MT_Sema mrqsema; /* threads wait on empty queues */ + + +static void MRworker(void *); + +static void +MRqueueCreate(int sz) +{ + int i; + MT_Id tid; + + MT_lock_init(&mrqlock, "q_create"); + MT_lock_set(&mrqlock,"q_create"); + MT_sema_init(&mrqsema, 0, "q_create"); + sz = ((sz << 1) >> 1); /* we want a multiple of 2 */ + mrqueue = (MRqueue*)GDKzalloc(sizeof(MRqueue) *sz); + assert(mrqueue); + mrqsize = sz; + mrqlast = 0; + /* create a worker thread for each core as specified as system parameter*/ + for ( i =0; i < GDKnr_threads; i++) + MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_JOINABLE); + MT_lock_unset(&mrqlock,"q_create"); +} + +static void +MRenqueue(int taskcnt, MRtask **tasks) +{ + assert(taskcnt > 0); + MT_lock_set(&mrqlock, "mrqlock"); + if (mrqlast == mrqsize) { + mrqsize <<= 1; + mrqueue = (MRqueue*) GDKrealloc(mrqueue, sizeof(MRqueue) * mrqsize); + } + mrqueue[mrqlast].index = 0; + mrqueue[mrqlast].tasks = tasks; + mrqueue[mrqlast].size = taskcnt; + mrqlast++; + MT_lock_unset(&mrqlock, "mrqlock"); + /* a task list is added for consumption*/ + while (taskcnt-- > 0) + MT_sema_up(&mrqsema, "mrqsema"); +} + +static MRtask * +MRdequeue(void) +{ + MRtask *r = NULL; + int idx; + + MT_sema_down(&mrqsema, "mrqsema"); + assert(mrqlast); + MT_lock_set(&mrqlock, "mrqlock"); + if (mrqlast > 0) { + idx = mrqueue[mrqlast-1].index; + r = mrqueue[mrqlast-1].tasks[idx++]; + if ( mrqueue[mrqlast-1].size == idx) + mrqlast--; + else + mrqueue[mrqlast-1].index = idx; + } + MT_lock_unset(&mrqlock, "mrqlock"); + assert(r); + 
return r; +} + +static void +MRworker(void * arg) +{ + MRtask *task; + (void) arg; + do{ + task= MRdequeue(); + (task->cmd)(task); + MT_sema_up(task->sema, "mrqsema"); + } while (1); +} + +/* schedule the tasks and return when all are done */ +void +MRschedule(int taskcnt, void **arg, void (*cmd)(void*p)) +{ + int i; + MT_Sema sema; + MRtask **task = (MRtask**) arg; + + if ( mrqueue == 0) + MRqueueCreate(1024); + + MT_sema_init(&sema, 0, "q_create"); + for ( i= 0; i < taskcnt; i++){ + task[i]->sema = & sema; + task[i]->cmd = cmd; + } + MRenqueue(taskcnt,task); + /* waiting for all report result */ + for ( i= 0; i < taskcnt; i++) + MT_sema_down(&sema, "mrqsema"); +} diff --git a/gdk/gdk_mapreduce.h b/gdk/gdk_mapreduce.h new file mode 100644 --- /dev/null +++ b/gdk/gdk_mapreduce.h @@ -0,0 +1,32 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.monetdb.org/Legal/MonetDBLicense + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * + * The Initial Developer of the Original Code is CWI. + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. + * Copyright August 2008-2012 MonetDB B.V. + * All Rights Reserved. + */ + +#ifndef _GDK_MAPREDUCE_H_ +#define _GDK_MAPREDUCE_H_ + +#include
+ +typedef struct{ + MT_Sema *sema; /* micro scheduler handle */ + void (*cmd)(void *); /* the function to be executed */ +}MRtask; + +gdk_export void MRschedule(int taskcnt, void **arg, void (*cmd)(void *p)); + +#endif /* _GDK_MAPREDUCE_H_ */ diff --git a/monetdb5/modules/mal/groups.c b/monetdb5/modules/mal/groups.c --- a/monetdb5/modules/mal/groups.c +++ b/monetdb5/modules/mal/groups.c @@ -66,11 +66,15 @@ GRPmulticolumngroup(Client cntxt, MalBlk /* sort order may have influences */ /* SF100 Q16 showed < ordering is 2 times faster as > ordering */ for ( i = 3; i< pci->argc; i++) - for ( j = i+1; j<pci->argc; j++) - if ( sizes[j] < sizes[i]){ - l = sizes[j]; sizes[j]= sizes[i]; sizes[i]= l; - bi = bid[j]; bid[j]= bid[i]; bid[i]= bi; - } + for ( j = i+1; j<pci->argc; j++) + if ( sizes[j] < sizes[i]){ + l = sizes[j]; + sizes[j]= sizes[i]; + sizes[i]= l; + bi = bid[j]; + bid[j]= bid[i]; + bid[i]= bi; + } /* for (i=2; i<pci->argc; i++) mnstr_printf(cntxt->fdout,"# after [%d] "LLFMT"\n",i, sizes[i]); */ @@ -82,8 +86,6 @@ GRPmulticolumngroup(Client cntxt, MalBlk i = 4; if (msg == MAL_SUCCEED && pci->argc > 4 ) do { - if (*ext) - BBPdecref(*ext, TRUE); /* early break when there are as many groups as histogram entries */ b = BATdescriptor(*hist); if ( b ){ @@ -91,8 +93,8 @@ GRPmulticolumngroup(Client cntxt, MalBlk BBPreleaseref(*hist); if ( j) break; } - if (*hist) - BBPdecref(*hist, TRUE); + BBPdecref(*ext, TRUE); + BBPdecref(*hist, TRUE);
/* (grp,ext,hist) := group.subgroupdone(arg,grp) */ oldgrp= *grp; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list
-- Sjoerd Mullender _______________________________________________ developers-list mailing list developers-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/developers-list
participants (1)
-
Sjoerd Mullender