Martin Kersten wrote:
Update of /cvsroot/monetdb/MonetDB5/src/modules/mal In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv29523
Modified Files: bpm.mx Log Message: 70% of the code needed to handle partitioned BATs in a single server. The code is not tested yet (except for compilation errors), but should give a head start for those inexperienced in working with MonetDB source code.
Index: bpm.mx =================================================================== RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/bpm.mx,v retrieving revision 1.23 retrieving revision 1.24 diff -u -d -r1.23 -r1.24 --- bpm.mx 4 Feb 2007 22:55:32 -0000 1.23 +++ bpm.mx 6 Feb 2007 07:16:13 -0000 1.24 @@ -229,7 +229,7 @@ address BPMdestroy comment "Destroy the BAT partition box";
-command deposit(nme:str,b:bat[:oid,:any_2]) :void +command deposit(nme:str,b:bat[:oid,:any_2]) :bat[:oid,:any_2] address BPMdeposit comment "Create a new partitioned BAT by name";
@@ -238,36 +238,36 @@ If the alias BAT denotes an existing partition, it is further broken into pieces. @mal -command rangePartition(pb:bat[:oid,:any_2], - ll:oid, lh:oid, rl:any_2, rh:any_2):void +command rangePartition(pb:bat[:any_1,:any_2], rl:any_2, rh:any_2):void address BPMrange comment "Create a range partition on a BAT"; -command rangePartition(pb:bat[:oid,:any_2], pv:bat[:oid,:any_2]):void +command rangePartition(pb:bat[:any_1,:any_2], pv:bat[:oid,:any_2]):void address BPMrangeVector comment "Create the partitions based on a range vector";
-command hashPartition(pb:bat[:oid,:any_2], slots:int):void +command hashPartition(pb:bat[:any_1,:any_2], slots:int):void address BPMhash comment "Create a hash partition on a BAT"; -command derivePartition(pb:bat[:oid,:any_2], src:bat[:oid,:any_2]):bat[:oid,:any_2] +command derivePartition(pb:bat[:any_1,:any_2], + src:bat[:any_1,:any_2]):bat[:any_1,:any_2] address BPMderived comment "Create a derived fragmentation over the head using src.";
-command take(pb:str, b:bat[:oid,:any_2]):bat[:oid,:any_2] +command take(pb:str, b:bat[:any_1,:any_2]):bat[:any_1,:any_2] address BPMtake comment "Retrieve the alias for a partitioned BAT"; -command take(alias:bat[:oid,:any_2],index:int) :bat[:oid,:any_2] +command take(alias:bat[:any_1,:any_2],index:int) :bat[:any_1,:any_2] address BPMtakePartition comment "Retrieve a single component of a partitioned BAT by index";
-command insert(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void +command insert(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void address BPMinsert comment "Insert elements into the BAT partitions"; -command delete(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void +command delete(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void address BPMdelete comment "Delete elements from the BAT partitions"; -command replace(pb:bat[:oid,:any_2],old:bat[:oid,:any_2], - nwe:bat[:oid,:any_2]) :void +command replace(pb:bat[:any_1,:any_2],old:bat[:any_1,:any_2], + nwe:bat[:any_1,:any_2]) :void address BPMreplace comment "Replace the content of the BAT partitions";
@@ -275,7 +275,7 @@ address BPMgetNames comment "Retrieve the names of all known partitioned BATs";
-command discard(alias:bat[:oid,:any_2]) :void +command discard(alias:bat[:any_1,:any_2]) :void address BPMdiscard comment "Release a partitioned BAT from the box";
@@ -285,39 +285,39 @@ Wherever possible skipping elements that don't qualify the bounds given for the head. @mal -command newIterator(grp:bat[:oid,:any_2]):bat[:oid,:any_2] +command newIterator(grp:bat[:any_1,:any_2]):bat[:any_1,:any_2] address BPMnewIterator comment "Create an iterator over the BAT partitions.";
-command newIterator(grp:bat[:oid,:any_2],first:any_2,last:any_2) - :bat[:oid,:any_2] +command newIterator(grp:bat[:any_1,:any_2],first:any_2,last:any_2) + :bat[:any_1,:any_2] address BPMnewIteratorRng comment "Create an iterator over the BAT partitions.";
-command newIterator(pb:bat[:oid,:any_2], first:oid,last:oid, +command newIterator(pb:bat[:any_1,:any_2], first:any_1,last:any_1, vlow:any_2, vhgh:any_2) :bat[:oid,:any_2] address BPMnewIteratorRng4 comment "Create an iterator over the BAT partitions.";
-command hasMoreElements(grp:bat[:oid,:any_2]) :bat[:oid,:any_2] +command hasMoreElements(grp:bat[:any_1,:any_2]) :bat[:any_1,:any_2] address BPMhasMoreElements comment "Localize the next partition for processing.";
-command hasMoreElements(pb:bat[:oid,:any_2], - low:any_2, hgh:any_2) :bat[:oid,:any_2] +command hasMoreElements(pb:bat[:any_1,:any_2], + low:any_2, hgh:any_2) :bat[:any_1,:any_2] address BPMhasMoreElementsRng2 comment "Localize the next partition for processing."; -command hasMoreElements(pb:bat[:oid,:any_2], first:oid,last:oid, - vlow:any_2, vhgh:any_2) :bat[:oid,:any_2] +command hasMoreElements(pb:bat[:any_1,:any_2], first:any_1,last:any_1, + vlow:any_2, vhgh:any_2) :bat[:any_1,:any_2] address BPMhasMoreElementsRng4 comment "Localize the next partition for processing.";
-command getDimension(b:bat[:oid,:any_2])(first:oid,last:oid, +command getDimension(b:bat[:any_1,:any_2])(first:any_1,last:any_1, vlow:any_2, vhgh:any_2) address BPMgetDimension comment "Obtain the partition boundary values.";
-command dump(alias:bat[:oid,:any_2]) +command dump(alias:bat[:any_1,:any_2]) address BPMdumpAlias comment "Give the details of the partition tree"; command dump() @@ -403,6 +403,7 @@ @c #include "mal_config.h" #include "bpm.h" +#include "bat5.h"
@- Every partition is the result of at most four operations: @@ -430,7 +431,6 @@ int nxt, prv; /* list of all partitions*/ } *Partition, PartitionRec;
-/* NOT USED YET static void BPMprintRecord(stream *f, Partition p){ stream_printf(f,"partition: %s alias %d bid %d index %d ", p->name, p->alias, p->bid, p->index); @@ -438,7 +438,6 @@ stream_printf(f,"hbucket %d tbucket %d ", p->hbucket,p->tbucket); stream_printf(f,"nxt %d prv %d\n", p->nxt,p->prv); } -*/ @- The number of partitioned BATs is considered low and a straight forward array with linear search seems @@ -470,7 +469,6 @@ } return 0; } -/* NOT USED YET static Partition getPartition(int bid, int idx){ return bpmcat+getPartitionIndex(bid,idx); } @@ -478,6 +476,7 @@ return getPartition(bid,0); }
+/* NOT USED YET static void delAlias(int bid){ int i; i= getPartitionIndex(bid,0); @@ -595,33 +594,121 @@
BBPkeepref(*ret= bn->batCacheid); BBPunfix(b->batCacheid); - throw(MAL, "bpm.deposit","NYI"); + return MAL_SUCCEED; } - +@- +Range partitioning simply runs through all partitions +and creates new ones, keeping the partitions in order. +The 'critical' part is to detect overlap of the range +and the bounds of the partition. +@c str BPMrange(int *ret, int *bid, ptr *low, ptr *hgh){ BAT *b; + int i, tpe; + int (*cmp) (ptr, ptr); + ptr nilptr; + int low_nil, hgh_nil; + Partition p; + BPMopen(); - @:getBATdescriptor(bid,b,"bpm.range")@ - /* determine the partitioning scheme */ - (void) b; - (void) low; - (void) hgh; + + p= getAlias(*bid); + if( p==0){ + throw(MAL, "bpm.range","Partition not known"); + } + i= p->index; + @:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@ + if( b == 0) + throw(MAL,"bpm.range","Could not access BAT"); + + + /* get the comparison function */ + tpe= *bid >0? b->htype:b->ttype; + cmp= BATatoms[tpe].atomCmp; + nilptr = ATOMnilptr(tpe); + low_nil = ((*cmp) (*low, nilptr) == 0); + hgh_nil = ((*cmp) (*hgh, nilptr) == 0); + + BBPunfix(b->batCacheid); /* don't need it anymore */ + + for( ; i ; i= bpmcat[i].nxt){ + @:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@ + + /* determine overlag by excluding outliers */ + if( *bid > 0 ){ + if( (*cmp) ((ptr) &bpmcat[i].thgh, *low) < 0 || + (*cmp) ((ptr) &bpmcat[i].tlow, *hgh) > 0 + ){ + /* this fragment need not be split */ + } else { + /* break the fragment */ +#ifdef _DEBUG_BPM_ + stream_printf(GDKout, "break fragment %d\n",i); +#endif + } + } else { + /* use reversed bat */ + if( (*cmp) ((ptr) &bpmcat[i].hhgh, *low) < 0 || + (*cmp) ((ptr) &bpmcat[i].hlow, *hgh) > 0 + ){ + /* this fragment need not be split */ + } else { +#ifdef _DEBUG_BPM_ + stream_printf(GDKout, "break fragment %d\n",i); +#endif + /* break the fragment */ + } + } + BBPunfix(b->batCacheid); + } *ret= 0; - throw(MAL, "bpm.range","NYI"); + return MAL_SUCCEED; } +@- +The vector approach could either use a single column +of ordered split 
points, or a double column with (low,hgh) +value pairs. (STILL TO BE DECIDED) +We should ensure that the range vector is sorted first. +@c str BPMrangeVector(int *ret, int *bid, int *pv) { - BAT *b; + BAT *b, *bpv; + BUN p,q; + /* ptr ph=0; */ + BPMopen(); @:getBATdescriptor(bid,b,"bpm.range")@ - /* determine the partitioning scheme */ - (void) b; - (void) bid; - (void) pv; - *ret= 0; - throw(MAL, "bpm.range","NYI"); + if( b == 0) + throw(MAL,"bpm.range","Cannot access BAT"); + + @:getBATdescriptor(pv,bpv,"bpm.range")@ + if( bpv == 0){ + BBPunfix(b->batCacheid); + throw(MAL,"bpm.range","Cannot access BAT"); + } + /* Apply the range partition using pairs */ + BATloop(b,p,q){ + ptr ph= BUNhead(b,p); + ptr pt= BUNtail(b,p); + BPMrange(ret, bid, &ph, &pt); + } + /* Apply the range partition using single column + BATloop(b,p,q){ + if( ph){ + ph= BUNtail(b,p); + } else { + ptr pt= BUNtail(b,p); + BPMrange(ret, bid,&ph,&pt); + ph= BUNtail(b,p); + } + } + */ + + BBPunfix(b->batCacheid); + BBPunfix(bpv->batCacheid); + return MAL_SUCCEED; } str BPMhash(int *ret, int *bid, int *slots, int *prime) @@ -654,77 +741,140 @@ str BPMtake(int *ret, str *nme) { + int i; BPMopen(); - (void) nme; - *ret =0; - throw(MAL, "bpm.take","NYI"); + i= getPartitionName(*nme); + if( i== 0){ + throw(MAL,"pbm.take","Partitioned BAT does not exist"); + } + *ret = bpmcat[i].alias; + return MAL_SUCCEED; } str BPMtakePartition(int *ret, int *bid, int *index) { + BAT *b; + int i; + BPMopen(); - (void) bid; - (void) index; - *ret =0; - throw(MAL, "bpm.take","NYI"); + i= getPartitionIndex(*bid, *index); + if( i== 0){ + throw(MAL,"pbm.take","Partitioned BAT does not exist"); + } + @:getBATdescriptor(bid,b,"bpm.take")@ + if( b == 0){ + throw(MAL,"bpm.take","Cannot access BAT"); + } + BBPkeepref(*ret =b->batCacheid); + return MAL_SUCCEED; } @- Updates +The update operations simply loop through the partitions +and apply the results. 
We can optimize this by inspection +of the range tables, but that is considered relevant in a distributed +case. +The updates are merely convenient operators, because ideally +the optimizer already filters out the partitions of interest. @c str BPMinsert(int *ret, int *bid, int *ins) { - BAT *b; + int i; + str msg= MAL_SUCCEED; + Partition p; + BPMopen(); - @:getBATdescriptor(bid,b,"bpm.insert")@ + p= getAlias(*bid); + if( p== 0) + throw(MAL,"bpm.insert","Can not alias BAT"); /* distributed the content */ - (void) b; - (void) ins; + for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){ + msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins); + } *ret= 0; - throw(MAL, "bpm.insert","NYI"); + return msg; }
str BPMdelete(int *ret, int *bid, int *del) { - BAT *b; + int i; + Partition p; + str msg= MAL_SUCCEED; + BPMopen(); - @:getBATdescriptor(bid,b,"bpm.delete")@ - /* delete some information from the partitions */ - (void) b; - (void) del; + p= getAlias(*bid); + if( p== 0) + throw(MAL,"bpm.insert","Can not alias BAT"); + /* distributed the content */ + for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){ + msg=BKCdelete_bat(ret,&bpmcat[i].bid,del); + } *ret= 0; - throw(MAL, "bpm.delete","NYI"); + return msg; }
str BPMreplace(int *ret, int *bid, int *ins, int *del) { - BAT *b; + int i; + Partition p; + str msg= MAL_SUCCEED; + BPMopen(); - @:getBATdescriptor(bid,b,"bpm.insert")@ - /* replace values */ - (void) b; - (void) ins; - (void) del; + p= getAlias(*bid); + if( p== 0) + throw(MAL,"bpm.insert","Can not alias BAT"); + /* distributed the content */ + for( i=p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){ + msg=BKCdelete_bat(ret,&bpmcat[i].bid,del); It should be msg=BKCdelete_bat(ret,&bpmcat[i].pid,del);
Right?
+ if( msg == MAL_SUCCEED) + msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins); It should be: msg=BKCinsert_bat(ret,&bpmcat[i].pid,ins); Right? + } *ret= 0; - throw(MAL, "bpm.replace","NYI"); + return msg; }
str BPMgetNames(int *ret) { + BAT *b; + int i; + BPMopen(); - *ret =0; - throw(MAL, "bpm.getNames","NYI"); + b = BATnew(TYPE_int, TYPE_str, BBPsize); + if (b == 0) + throw(MAL, "catalog.bbpNames", "failed to create BAT"); + + for(i=1; i
batCacheid); + if (!(b->batDirty&2)) + b = BATsetaccess(b, BAT_READ); + return MAL_SUCCEED; } str BPMdiscard(int *ret, int *bid) { + Partition p; + int i; + signed char r; + str msg= MAL_SUCCEED; + BPMopen(); - (void) bid; + p= getAlias(*bid); + if( p== 0) + throw(MAL,"bpm.discard","Can not alias BAT"); + /* discard the content */ + for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){ + msg= BKCdestroy(&r, &bpmcat[i].bid); It should be: + msg= BKCdestroy(&ret, &bpmcat[i].bid); Right?
If I am right, I will fix and commit it. These differences caused compilation errors on Windows. Regards, Romulo