Martin Kersten wrote:
> Update of /cvsroot/monetdb/MonetDB5/src/modules/mal
> In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv29523
>
> Modified Files:
> bpm.mx
> Log Message:
> 70% of the code needed to handle partitioned BATs in a single server.
> The code is not tested yet (except for compilation errors), but should
> give a head start for those in-experienced in working with MonetDB source code.
>
>
> Index: bpm.mx
> ===================================================================
> RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/bpm.mx,v
> retrieving revision 1.23
> retrieving revision 1.24
> diff -u -d -r1.23 -r1.24
> --- bpm.mx 4 Feb 2007 22:55:32 -0000 1.23
> +++ bpm.mx 6 Feb 2007 07:16:13 -0000 1.24
> @@ -229,7 +229,7 @@
> address BPMdestroy
> comment "Destroy the BAT partition box";
>
> -command deposit(nme:str,b:bat[:oid,:any_2]) :void
> +command deposit(nme:str,b:bat[:oid,:any_2]) :bat[:oid,:any_2]
> address BPMdeposit
> comment "Create a new partitioned BAT by name";
>
> @@ -238,36 +238,36 @@
> If the alias BAT denotes an existing partition, it is
> further broken into pieces.
> @mal
> -command rangePartition(pb:bat[:oid,:any_2],
> - ll:oid, lh:oid, rl:any_2, rh:any_2):void
> +command rangePartition(pb:bat[:any_1,:any_2], rl:any_2, rh:any_2):void
> address BPMrange
> comment "Create a range partition on a BAT";
> -command rangePartition(pb:bat[:oid,:any_2], pv:bat[:oid,:any_2]):void
> +command rangePartition(pb:bat[:any_1,:any_2], pv:bat[:oid,:any_2]):void
> address BPMrangeVector
> comment "Create the partitions based on a range vector";
>
> -command hashPartition(pb:bat[:oid,:any_2], slots:int):void
> +command hashPartition(pb:bat[:any_1,:any_2], slots:int):void
> address BPMhash
> comment "Create a hash partition on a BAT";
> -command derivePartition(pb:bat[:oid,:any_2], src:bat[:oid,:any_2]):bat[:oid,:any_2]
> +command derivePartition(pb:bat[:any_1,:any_2],
> + src:bat[:any_1,:any_2]):bat[:any_1,:any_2]
> address BPMderived
> comment "Create a derived fragmentation over the head using src.";
>
> -command take(pb:str, b:bat[:oid,:any_2]):bat[:oid,:any_2]
> +command take(pb:str, b:bat[:any_1,:any_2]):bat[:any_1,:any_2]
> address BPMtake
> comment "Retrieve the alias for a partitioned BAT";
> -command take(alias:bat[:oid,:any_2],index:int) :bat[:oid,:any_2]
> +command take(alias:bat[:any_1,:any_2],index:int) :bat[:any_1,:any_2]
> address BPMtakePartition
> comment "Retrieve a single component of a partitioned BAT by index";
>
> -command insert(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void
> +command insert(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void
> address BPMinsert
> comment "Insert elements into the BAT partitions";
> -command delete(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void
> +command delete(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void
> address BPMdelete
> comment "Delete elements from the BAT partitions";
> -command replace(pb:bat[:oid,:any_2],old:bat[:oid,:any_2],
> - nwe:bat[:oid,:any_2]) :void
> +command replace(pb:bat[:any_1,:any_2],old:bat[:any_1,:any_2],
> + nwe:bat[:any_1,:any_2]) :void
> address BPMreplace
> comment "Replace the content of the BAT partitions";
>
> @@ -275,7 +275,7 @@
> address BPMgetNames
> comment "Retrieve the names of all known partitioned BATs";
>
> -command discard(alias:bat[:oid,:any_2]) :void
> +command discard(alias:bat[:any_1,:any_2]) :void
> address BPMdiscard
> comment "Release a partitioned BAT from the box";
>
> @@ -285,39 +285,39 @@
> Wherever possible skipping elements that don't qualify
> the bounds given for the head.
> @mal
> -command newIterator(grp:bat[:oid,:any_2]):bat[:oid,:any_2]
> +command newIterator(grp:bat[:any_1,:any_2]):bat[:any_1,:any_2]
> address BPMnewIterator
> comment "Create an iterator over the BAT partitions.";
>
> -command newIterator(grp:bat[:oid,:any_2],first:any_2,last:any_2)
> - :bat[:oid,:any_2]
> +command newIterator(grp:bat[:any_1,:any_2],first:any_2,last:any_2)
> + :bat[:any_1,:any_2]
> address BPMnewIteratorRng
> comment "Create an iterator over the BAT partitions.";
>
> -command newIterator(pb:bat[:oid,:any_2], first:oid,last:oid,
> +command newIterator(pb:bat[:any_1,:any_2], first:any_1,last:any_1,
> vlow:any_2, vhgh:any_2) :bat[:oid,:any_2]
> address BPMnewIteratorRng4
> comment "Create an iterator over the BAT partitions.";
>
> -command hasMoreElements(grp:bat[:oid,:any_2]) :bat[:oid,:any_2]
> +command hasMoreElements(grp:bat[:any_1,:any_2]) :bat[:any_1,:any_2]
> address BPMhasMoreElements
> comment "Localize the next partition for processing.";
>
> -command hasMoreElements(pb:bat[:oid,:any_2],
> - low:any_2, hgh:any_2) :bat[:oid,:any_2]
> +command hasMoreElements(pb:bat[:any_1,:any_2],
> + low:any_2, hgh:any_2) :bat[:any_1,:any_2]
> address BPMhasMoreElementsRng2
> comment "Localize the next partition for processing.";
> -command hasMoreElements(pb:bat[:oid,:any_2], first:oid,last:oid,
> - vlow:any_2, vhgh:any_2) :bat[:oid,:any_2]
> +command hasMoreElements(pb:bat[:any_1,:any_2], first:any_1,last:any_1,
> + vlow:any_2, vhgh:any_2) :bat[:any_1,:any_2]
> address BPMhasMoreElementsRng4
> comment "Localize the next partition for processing.";
>
> -command getDimension(b:bat[:oid,:any_2])(first:oid,last:oid,
> +command getDimension(b:bat[:any_1,:any_2])(first:any_1,last:any_1,
> vlow:any_2, vhgh:any_2)
> address BPMgetDimension
> comment "Obtain the partition boundary values.";
>
> -command dump(alias:bat[:oid,:any_2])
> +command dump(alias:bat[:any_1,:any_2])
> address BPMdumpAlias
> comment "Give the details of the partition tree";
> command dump()
> @@ -403,6 +403,7 @@
> @c
> #include "mal_config.h"
> #include "bpm.h"
> +#include "bat5.h"
>
> @-
> Every partition is the result of at most four operations:
> @@ -430,7 +431,6 @@
> int nxt, prv; /* list of all partitions*/
> } *Partition, PartitionRec;
>
> -/* NOT USED YET
> static void BPMprintRecord(stream *f, Partition p){
> stream_printf(f,"partition: %s alias %d bid %d index %d ",
> p->name, p->alias, p->bid, p->index);
> @@ -438,7 +438,6 @@
> stream_printf(f,"hbucket %d tbucket %d ", p->hbucket,p->tbucket);
> stream_printf(f,"nxt %d prv %d\n", p->nxt,p->prv);
> }
> -*/
> @-
> The number of partitioned BATs is considered low and
> a straight forward array with linear search seems
> @@ -470,7 +469,6 @@
> }
> return 0;
> }
> -/* NOT USED YET
> static Partition getPartition(int bid, int idx){
> return bpmcat+getPartitionIndex(bid,idx);
> }
> @@ -478,6 +476,7 @@
> return getPartition(bid,0);
> }
>
> +/* NOT USED YET
> static void delAlias(int bid){
> int i;
> i= getPartitionIndex(bid,0);
> @@ -595,33 +594,121 @@
>
> BBPkeepref(*ret= bn->batCacheid);
> BBPunfix(b->batCacheid);
> - throw(MAL, "bpm.deposit","NYI");
> + return MAL_SUCCEED;
> }
> -
> +@-
> +Range partitioning simply runs through all partitions
> +and creates new ones, keeping the partitions in order.
> +The 'critical' part is to detect overlap of the range
> +and the bounds of the partition.
> +@c
> str
> BPMrange(int *ret, int *bid, ptr *low, ptr *hgh){
> BAT *b;
> + int i, tpe;
> + int (*cmp) (ptr, ptr);
> + ptr nilptr;
> + int low_nil, hgh_nil;
> + Partition p;
> +
> BPMopen();
> - @:getBATdescriptor(bid,b,"bpm.range")@
> - /* determine the partitioning scheme */
> - (void) b;
> - (void) low;
> - (void) hgh;
> +
> + p= getAlias(*bid);
> + if( p==0){
> + throw(MAL, "bpm.range","Partition not known");
> + }
> + i= p->index;
> + @:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@
> + if( b == 0)
> + throw(MAL,"bpm.range","Could not access BAT");
> +
> +
> + /* get the comparison function */
> + tpe= *bid >0? b->htype:b->ttype;
> + cmp= BATatoms[tpe].atomCmp;
> + nilptr = ATOMnilptr(tpe);
> + low_nil = ((*cmp) (*low, nilptr) == 0);
> + hgh_nil = ((*cmp) (*hgh, nilptr) == 0);
> +
> + BBPunfix(b->batCacheid); /* don't need it anymore */
> +
> + for( ; i ; i= bpmcat[i].nxt){
> + @:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@
> +
> + /* determine overlag by excluding outliers */
> + if( *bid > 0 ){
> + if( (*cmp) ((ptr) &bpmcat[i].thgh, *low) < 0 ||
> + (*cmp) ((ptr) &bpmcat[i].tlow, *hgh) > 0
> + ){
> + /* this fragment need not be split */
> + } else {
> + /* break the fragment */
> +#ifdef _DEBUG_BPM_
> + stream_printf(GDKout, "break fragment %d\n",i);
> +#endif
> + }
> + } else {
> + /* use reversed bat */
> + if( (*cmp) ((ptr) &bpmcat[i].hhgh, *low) < 0 ||
> + (*cmp) ((ptr) &bpmcat[i].hlow, *hgh) > 0
> + ){
> + /* this fragment need not be split */
> + } else {
> +#ifdef _DEBUG_BPM_
> + stream_printf(GDKout, "break fragment %d\n",i);
> +#endif
> + /* break the fragment */
> + }
> + }
> + BBPunfix(b->batCacheid);
> + }
> *ret= 0;
> - throw(MAL, "bpm.range","NYI");
> + return MAL_SUCCEED;
> }
> +@-
> +The vector approach could either use a single column
> +of ordered split points, or a double column with (low,hgh)
> +value pairs. (STILL TO BE DECIDED)
> +We should ensure that the range vector is sorted first.
> +@c
> str
> BPMrangeVector(int *ret, int *bid, int *pv)
> {
> - BAT *b;
> + BAT *b, *bpv;
> + BUN p,q;
> + /* ptr ph=0; */
> +
> BPMopen();
> @:getBATdescriptor(bid,b,"bpm.range")@
> - /* determine the partitioning scheme */
> - (void) b;
> - (void) bid;
> - (void) pv;
> - *ret= 0;
> - throw(MAL, "bpm.range","NYI");
> + if( b == 0)
> + throw(MAL,"bpm.range","Cannot access BAT");
> +
> + @:getBATdescriptor(pv,bpv,"bpm.range")@
> + if( bpv == 0){
> + BBPunfix(b->batCacheid);
> + throw(MAL,"bpm.range","Cannot access BAT");
> + }
> + /* Apply the range partition using pairs */
> + BATloop(b,p,q){
> + ptr ph= BUNhead(b,p);
> + ptr pt= BUNtail(b,p);
> + BPMrange(ret, bid, &ph, &pt);
> + }
> + /* Apply the range partition using single column
> + BATloop(b,p,q){
> + if( ph){
> + ph= BUNtail(b,p);
> + } else {
> + ptr pt= BUNtail(b,p);
> + BPMrange(ret, bid,&ph,&pt);
> + ph= BUNtail(b,p);
> + }
> + }
> + */
> +
> + BBPunfix(b->batCacheid);
> + BBPunfix(bpv->batCacheid);
> + return MAL_SUCCEED;
> }
> str
> BPMhash(int *ret, int *bid, int *slots, int *prime)
> @@ -654,77 +741,140 @@
> str
> BPMtake(int *ret, str *nme)
> {
> + int i;
> BPMopen();
> - (void) nme;
> - *ret =0;
> - throw(MAL, "bpm.take","NYI");
> + i= getPartitionName(*nme);
> + if( i== 0){
> + throw(MAL,"pbm.take","Partitioned BAT does not exist");
> + }
> + *ret = bpmcat[i].alias;
> + return MAL_SUCCEED;
> }
> str
> BPMtakePartition(int *ret, int *bid, int *index)
> {
> + BAT *b;
> + int i;
> +
> BPMopen();
> - (void) bid;
> - (void) index;
> - *ret =0;
> - throw(MAL, "bpm.take","NYI");
> + i= getPartitionIndex(*bid, *index);
> + if( i== 0){
> + throw(MAL,"pbm.take","Partitioned BAT does not exist");
> + }
> + @:getBATdescriptor(bid,b,"bpm.take")@
> + if( b == 0){
> + throw(MAL,"bpm.take","Cannot access BAT");
> + }
> + BBPkeepref(*ret =b->batCacheid);
> + return MAL_SUCCEED;
> }
> @- Updates
> +The update operations simply loop through the partitions
> +and apply the results. We can optimize this by inspection
> +of the range tables, but that is considered relevant in a distributed
> +case.
> +The updates are merely convenient operators, because ideally
> +the optimizer already filters out the partitions of interest.
> @c
> str
> BPMinsert(int *ret, int *bid, int *ins)
> {
> - BAT *b;
> + int i;
> + str msg= MAL_SUCCEED;
> + Partition p;
> +
> BPMopen();
> - @:getBATdescriptor(bid,b,"bpm.insert")@
> + p= getAlias(*bid);
> + if( p== 0)
> + throw(MAL,"bpm.insert","Can not alias BAT");
> /* distributed the content */
> - (void) b;
> - (void) ins;
> + for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> + msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins);
> + }
> *ret= 0;
> - throw(MAL, "bpm.insert","NYI");
> + return msg;
> }
>
> str
> BPMdelete(int *ret, int *bid, int *del)
> {
> - BAT *b;
> + int i;
> + Partition p;
> + str msg= MAL_SUCCEED;
> +
> BPMopen();
> - @:getBATdescriptor(bid,b,"bpm.delete")@
> - /* delete some information from the partitions */
> - (void) b;
> - (void) del;
> + p= getAlias(*bid);
> + if( p== 0)
> + throw(MAL,"bpm.insert","Can not alias BAT");
> + /* distributed the content */
> + for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> + msg=BKCdelete_bat(ret,&bpmcat[i].bid,del);
> + }
> *ret= 0;
> - throw(MAL, "bpm.delete","NYI");
> + return msg;
> }
>
> str
> BPMreplace(int *ret, int *bid, int *ins, int *del)
> {
> - BAT *b;
> + int i;
> + Partition p;
> + str msg= MAL_SUCCEED;
> +
> BPMopen();
> - @:getBATdescriptor(bid,b,"bpm.insert")@
> - /* replace values */
> - (void) b;
> - (void) ins;
> - (void) del;
> + p= getAlias(*bid);
> + if( p== 0)
> + throw(MAL,"bpm.insert","Can not alias BAT");
> + /* distributed the content */
> + for( i=p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> + msg=BKCdelete_bat(ret,&bpmcat[i].bid,del);
It should be
msg=BKCdelete_bat(ret,&bpmcat[i].pid,del);
Right?
> + if( msg == MAL_SUCCEED)
> + msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins);
It should be:
msg=BKCinsert_bat(ret,&bpmcat[i].pid,ins);
Right?
> + }
> *ret= 0;
> - throw(MAL, "bpm.replace","NYI");
> + return msg;
> }
>
> str
> BPMgetNames(int *ret)
> {
> + BAT *b;
> + int i;
> +
> BPMopen();
> - *ret =0;
> - throw(MAL, "bpm.getNames","NYI");
> + b = BATnew(TYPE_int, TYPE_str, BBPsize);
> + if (b == 0)
> + throw(MAL, "catalog.bbpNames", "failed to create BAT");
> +
> + for(i=1; i<bpmcattop; i++)
> + BUNins(b, &i, bpmcat[i].name,TRUE);
> + BBPkeepref(*ret = b->batCacheid);
> + if (!(b->batDirty&2))
> + b = BATsetaccess(b, BAT_READ);
> + return MAL_SUCCEED;
> }
>
> str
> BPMdiscard(int *ret, int *bid)
> {
> + Partition p;
> + int i;
> + signed char r;
> + str msg= MAL_SUCCEED;
> +
> BPMopen();
> - (void) bid;
> + p= getAlias(*bid);
> + if( p== 0)
> + throw(MAL,"bpm.discard","Can not alias BAT");
> + /* discard the content */
> + for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> + msg= BKCdestroy(&r, &bpmcat[i].bid);
It should be:
> + msg= BKCdestroy(&ret, &bpmcat[i].bid);
Right?
If I am right, I will fix and commit it. These differences caused
compilation errors on Windows.
Regards,
Romulo