LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - orderidx.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 52 258 20.2 %
Date: 2024-11-15 19:37:45 Functions: 4 8 50.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (c) Martin Kersten, Lefteris Sidirourgos
      15             :  * Implement a parallel sort-merge MAL program generator
      16             :  */
      17             : #include "monetdb_config.h"
      18             : #include "orderidx.h"
      19             : #include "gdk.h"
      20             : #include "mal_exception.h"
      21             : #include "mal_function.h"
      22             : #include "opt_prelude.h"
      23             : 
      24             : #define MIN_PIECE       ((BUN) 1000)    /* TODO use realistic size in production */
      25             : 
      26             : str
      27           0 : OIDXdropImplementation(Client cntxt, BAT *b)
      28             : {
      29           0 :         (void) cntxt;
      30           0 :         OIDXdestroy(b);
      31           0 :         return MAL_SUCCEED;
      32             : }
      33             : 
      34             : str
      35          83 : OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
      36             : {
      37          83 :         int i, loopvar, arg;
      38          83 :         BUN cnt, step = 0, o;
      39          83 :         MalBlkPtr smb;
      40          83 :         MalStkPtr newstk;
      41          83 :         Symbol snew = NULL;
      42          83 :         InstrPtr q, pack;
      43          83 :         char name[IDLENGTH];
      44          83 :         str msg = MAL_SUCCEED;
      45             : 
      46          83 :         if (BATcount(b) <= 1)
      47             :                 return MAL_SUCCEED;
      48             : 
      49             :         /* check if b is sorted, then index does not make sense */
      50          74 :         if (b->tsorted || b->trevsorted)
      51             :                 return MAL_SUCCEED;
      52             : 
      53             :         /* check if b already has index */
      54          74 :         if (BATcheckorderidx(b))
      55             :                 return MAL_SUCCEED;
      56             : 
      57         132 :         switch (ATOMbasetype(b->ttype)) {
      58             :         case TYPE_void:
      59             :                 /* trivially supported */
      60             :                 return MAL_SUCCEED;
      61          53 :         case TYPE_bte:
      62             :         case TYPE_sht:
      63             :         case TYPE_int:
      64             :         case TYPE_lng:
      65             : #ifdef HAVE_HGE
      66             :         case TYPE_hge:
      67             : #endif
      68             :         case TYPE_flt:
      69             :         case TYPE_dbl:
      70          53 :                 if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE
      71           0 :                         && (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) == 0)
      72             :                         break;
      73             :                 /* fall through */
      74             :         default:
      75          73 :                 if (BATorderidx(b, true) != GDK_SUCCEED)
      76           0 :                         throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
      77             :                 return MAL_SUCCEED;
      78             :         }
      79             : 
      80           0 :         if (pieces <= 0) {
      81           0 :                 if (GDKnr_threads <= 1) {
      82             :                         pieces = 1;
      83           0 :                 } else if (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK) {
      84             :                         /* we want many pieces, even tiny ones */
      85           0 :                         if (BATcount(b) < 4)
      86             :                                 pieces = 1;
      87           0 :                         else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
      88           0 :                                 pieces = (int) (BATcount(b) / 2);
      89             :                         else
      90             :                                 pieces = GDKnr_threads;
      91             :                 } else {
      92           0 :                         if (BATcount(b) < 2 * MIN_PIECE)
      93             :                                 pieces = 1;
      94           0 :                         else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
      95           0 :                                 pieces = (int) (BATcount(b) / MIN_PIECE);
      96             :                         else
      97             :                                 pieces = GDKnr_threads;
      98             :                 }
      99           0 :         } else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
     100           0 :                 pieces = 1;
     101             :         }
     102             : 
     103             :         /* create a temporary MAL function to sort the BAT in parallel */
     104           0 :         snprintf(name, IDLENGTH, "sort%d", rand() % 1000);
     105           0 :         snew = newFunction(putName("user"), putName(name), FUNCTIONsymbol);
     106           0 :         if (snew == NULL) {
     107           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     108             :         }
     109           0 :         smb = snew->def;
     110           0 :         q = getInstrPtr(smb, 0);
     111           0 :         if ((arg = newTmpVariable(smb, tpe)) < 0) {
     112           0 :                 msg = createException(MAL, "bat.orderidx",
     113             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     114           0 :                 goto bailout;
     115             :         }
     116           0 :         q = pushArgument(smb, q, arg);
     117           0 :         if ((getArg(q, 0) = newTmpVariable(smb, TYPE_void)) < 0) {
     118           0 :                 msg = createException(MAL, "bat.orderidx",
     119             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     120           0 :                 goto bailout;
     121             :         }
     122             : 
     123           0 :         if (resizeMalBlk(smb, 2 * pieces + 10) < 0)
     124           0 :                 goto bailout;                   // large enough
     125             :         /* create the pack instruction first, as it will hold
     126             :          * intermediate variables */
     127           0 :         pack = newInstruction(0, putName("bat"), putName("orderidx"));
     128           0 :         if (pack == NULL || (pack->argv[0] = newTmpVariable(smb, TYPE_void)) < 0) {
     129           0 :                 freeInstruction(pack);
     130           0 :                 msg = createException(MAL, "bat.orderidx",
     131             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     132           0 :                 goto bailout;
     133             :         }
     134           0 :         pack = pushArgument(smb, pack, arg);
     135           0 :         if (smb->errors) {
     136           0 :                 freeInstruction(pack);
     137           0 :                 msg = smb->errors;
     138           0 :                 smb->errors = NULL;
     139           0 :                 goto bailout;
     140             :         }
     141           0 :         setVarFixed(smb, getArg(pack, 0));
     142             : 
     143             :         /* the costly part executed as a parallel block */
     144           0 :         if ((loopvar = newTmpVariable(smb, TYPE_bit)) < 0) {
     145           0 :                 freeInstruction(pack);
     146           0 :                 msg = createException(MAL, "bat.orderidx",
     147             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     148           0 :                 goto bailout;
     149             :         }
     150           0 :         q = newStmt(smb, putName("language"), putName("dataflow"));
     151           0 :         if (q == NULL) {
     152           0 :                 freeInstruction(pack);
     153           0 :                 msg = createException(MAL, "bat.orderidx",
     154             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     155           0 :                 goto bailout;
     156             :         }
     157           0 :         q->barrier = BARRIERsymbol;
     158           0 :         q->argv[0] = loopvar;
     159           0 :         pushInstruction(smb, q);
     160             : 
     161           0 :         cnt = BATcount(b);
     162           0 :         step = cnt / pieces;
     163           0 :         o = 0;
     164           0 :         for (i = 0; smb->errors == NULL && i < pieces; i++) {
     165             :                 /* add slice instruction */
     166           0 :                 q = newInstruction(smb, algebraRef, putName("slice"));
     167           0 :                 if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
     168           0 :                         freeInstruction(q);
     169           0 :                         freeInstruction(pack);
     170           0 :                         msg = createException(MAL, "bat.orderidx",
     171             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     172           0 :                         goto bailout;
     173             :                 }
     174           0 :                 setVarType(smb, getArg(q, 0), tpe);
     175           0 :                 setVarFixed(smb, getArg(q, 0));
     176           0 :                 q = pushArgument(smb, q, arg);
     177           0 :                 pack = pushArgument(smb, pack, getArg(q, 0));
     178           0 :                 q = pushOid(smb, q, o);
     179           0 :                 if (i == pieces - 1) {
     180             :                         o = cnt;
     181             :                 } else {
     182           0 :                         o += step;
     183             :                 }
     184           0 :                 q = pushOid(smb, q, o - 1);
     185           0 :                 pushInstruction(smb, q);
     186             :         }
     187           0 :         for (i = 0; smb->errors == NULL && i < pieces; i++) {
     188             :                 /* add sort instruction */
     189           0 :                 q = newInstruction(smb, algebraRef, putName("orderidx"));
     190           0 :                 if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
     191           0 :                         freeInstruction(q);
     192           0 :                         freeInstruction(pack);
     193           0 :                         msg = createException(MAL, "bat.orderidx",
     194             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     195           0 :                         goto bailout;
     196             :                 }
     197           0 :                 setVarType(smb, getArg(q, 0), tpe);
     198           0 :                 setVarFixed(smb, getArg(q, 0));
     199           0 :                 q = pushArgument(smb, q, pack->argv[2 + i]);
     200           0 :                 q = pushBit(smb, q, 1);
     201           0 :                 pack->argv[2 + i] = getArg(q, 0);
     202           0 :                 pushInstruction(smb, q);
     203             :         }
     204             :         /* finalize OID packing, check, and evaluate */
     205           0 :         pushInstruction(smb, pack);
     206           0 :         q = newAssignment(smb);
     207           0 :         if (q == NULL) {
     208           0 :                 msg = createException(MAL, "bat.orderidx",
     209             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     210           0 :                 goto bailout;
     211             :         }
     212           0 :         q->barrier = EXITsymbol;
     213           0 :         q->argv[0] = loopvar;
     214           0 :         pushInstruction(smb, q);
     215           0 :         pushEndInstruction(smb);
     216           0 :         if (smb->errors)
     217           0 :                 goto bailout;
     218           0 :         msg = chkProgram(cntxt->usermodule, smb);
     219           0 :         if (msg)
     220           0 :                 goto bailout;
     221             :         /* evaluate MAL block and keep the ordered OID bat */
     222           0 :         newstk = prepareMALstack(smb, smb->vsize);
     223           0 :         if (newstk == NULL) {
     224           0 :                 msg = createException(MAL, "bat.orderidx",
     225             :                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     226           0 :                 goto bailout;
     227             :         }
     228           0 :         newstk->up = 0;
     229           0 :         newstk->stk[arg].vtype = b->ttype;
     230           0 :         newstk->stk[arg].bat = true;
     231           0 :         newstk->stk[arg].val.bval = b->batCacheid;
     232           0 :         BBPretain(newstk->stk[arg].val.bval);
     233           0 :         msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
     234           0 :         freeStack(newstk);
     235             :         /* get rid of temporary MAL block */
     236           0 :   bailout:
     237           0 :         if (msg == MAL_SUCCEED && smb->errors) {
     238           0 :                 msg = smb->errors;
     239           0 :                 smb->errors = NULL;
     240             :         }
     241           0 :         freeSymbol(snew);
     242           0 :         return msg;
     243             : }
     244             : 
     245             : static str
     246           6 : OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     247             : {
     248           6 :         BAT *b;
     249           6 :         str msg = MAL_SUCCEED;
     250           6 :         int pieces = -1;
     251             : 
     252           6 :         if (pci->argc == 3) {
     253           6 :                 pieces = stk->stk[pci->argv[2]].val.ival;
     254           6 :                 if (pieces < 0)
     255           0 :                         throw(MAL, "bat.orderidx", "Positive number expected");
     256             :         }
     257             : 
     258           6 :         b = BATdescriptor(*getArgReference_bat(stk, pci, 1));
     259           6 :         if (b == NULL)
     260           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     261           6 :         msg = OIDXcreateImplementation(cntxt, getArgType(mb, pci, 1), b, pieces);
     262           6 :         BBPunfix(b->batCacheid);
     263           6 :         return msg;
     264             : }
     265             : 
     266             : static str
     267           0 : OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     268             : {
     269           0 :         BAT *b;
     270           0 :         bit *ret = getArgReference_bit(stk, pci, 0);
     271           0 :         bat bid = *getArgReference_bat(stk, pci, 1);
     272             : 
     273           0 :         (void) cntxt;
     274           0 :         (void) mb;
     275             : 
     276           0 :         b = BATdescriptor(bid);
     277           0 :         if (b == NULL)
     278           0 :                 throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     279             : 
     280           0 :         *ret = b->torderidx != NULL;
     281             : 
     282           0 :         BBPunfix(b->batCacheid);
     283           0 :         return MAL_SUCCEED;
     284             : }
     285             : 
     286             : static str
     287           6 : OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     288             : {
     289           6 :         BAT *b;
     290           6 :         BAT *bn;
     291           6 :         bat *ret = getArgReference_bat(stk, pci, 0);
     292           6 :         bat bid = *getArgReference_bat(stk, pci, 1);
     293             : 
     294           6 :         (void) cntxt;
     295           6 :         (void) mb;
     296             : 
     297           6 :         b = BATdescriptor(bid);
     298           6 :         if (b == NULL)
     299           0 :                 throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     300             : 
     301           6 :         if (!BATcheckorderidx(b)) {
     302           0 :                 BBPunfix(b->batCacheid);
     303           0 :                 throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     304             :         }
     305             : 
     306           6 :         if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
     307           0 :                 BBPunfix(b->batCacheid);
     308           0 :                 throw(MAL, "bat.getorderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     309             :         }
     310           6 :         memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
     311           6 :                    BATcount(b) * SIZEOF_OID);
     312           6 :         BATsetcount(bn, BATcount(b));
     313           6 :         bn->tkey = true;
     314           6 :         bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
     315           6 :         bn->tnil = false;
     316           6 :         bn->tnonil = true;
     317           6 :         *ret = bn->batCacheid;
     318           6 :         BBPkeepref(bn);
     319           6 :         BBPunfix(b->batCacheid);
     320           6 :         return MAL_SUCCEED;
     321             : }
     322             : 
     323             : static str
     324           0 : OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
     325             : {
     326           0 :         BAT *b;
     327             : 
     328           0 :         (void) ret;
     329           0 :         b = BATdescriptor(*bid);
     330           0 :         if (b == NULL)
     331           0 :                 throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     332             : 
     333           0 :         if (BATorderidx(b, *stable) != GDK_SUCCEED) {
     334           0 :                 BBPunfix(*bid);
     335           0 :                 throw(MAL, "algebra.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     336             :         }
     337           0 :         *ret = *bid;
     338           0 :         BBPkeepref(b);
     339           0 :         return MAL_SUCCEED;
     340             : }
     341             : 
     342             : /*
     343             :  * Merge the collection of sorted OID BATs into one
     344             :  */
     345             : 
     346             : static str
     347           0 : OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     348             : {
     349           0 :         bat bid;
     350           0 :         BAT *b;
     351           0 :         BAT **a;
     352           0 :         int i, j, n_ar;
     353           0 :         BUN m_sz;
     354           0 :         gdk_return rc;
     355             : 
     356           0 :         (void) cntxt;
     357           0 :         (void) mb;
     358             : 
     359           0 :         if (pci->retc != 1)
     360           0 :                 throw(MAL, "bat.orderidx",
     361             :                           SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
     362           0 :         if (pci->argc < 2)
     363           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
     364             : 
     365           0 :         n_ar = pci->argc - 2;
     366             : 
     367           0 :         bid = *getArgReference_bat(stk, pci, 1);
     368           0 :         b = BATdescriptor(bid);
     369           0 :         if (b == NULL)
     370           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     371             : 
     372           0 :         if (b->torderidx) {
     373           0 :                 BBPunfix(bid);
     374           0 :                 throw(MAL, "bat.orderidx",
     375             :                           SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
     376             :         }
     377             : 
     378           0 :         switch (ATOMbasetype(b->ttype)) {
     379             :         case TYPE_bte:
     380             :         case TYPE_sht:
     381             :         case TYPE_int:
     382             :         case TYPE_lng:
     383             : #ifdef HAVE_HGE
     384             :         case TYPE_hge:
     385             : #endif
     386             :         case TYPE_flt:
     387             :         case TYPE_dbl:
     388           0 :                 break;
     389           0 :         case TYPE_str:
     390             :                 /* TODO: support strings etc. */
     391             :         case TYPE_void:
     392             :         case TYPE_ptr:
     393             :         default:
     394           0 :                 BBPunfix(bid);
     395           0 :                 throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
     396             :         }
     397             : 
     398           0 :         if ((a = (BAT **) GDKmalloc(n_ar * sizeof(BAT *))) == NULL) {
     399           0 :                 BBPunfix(bid);
     400           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     401             :         }
     402             :         m_sz = 0;
     403           0 :         for (i = 0; i < n_ar; i++) {
     404           0 :                 a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i + 2));
     405           0 :                 if (a[i] == NULL) {
     406           0 :                         for (j = i - 1; j >= 0; j--) {
     407           0 :                                 BBPunfix(a[j]->batCacheid);
     408             :                         }
     409           0 :                         GDKfree(a);
     410           0 :                         BBPunfix(bid);
     411           0 :                         throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     412             :                 }
     413           0 :                 m_sz += BATcount(a[i]);
     414           0 :                 if (BATcount(a[i]) == 0) {
     415           0 :                         BBPunfix(a[i]->batCacheid);
     416           0 :                         a[i] = NULL;
     417             :                 }
     418             :         }
     419           0 :         assert(m_sz == BATcount(b));
     420           0 :         for (i = 0; i < n_ar; i++) {
     421           0 :                 if (a[i] == NULL) {
     422           0 :                         n_ar--;
     423           0 :                         if (i < n_ar)
     424           0 :                                 a[i] = a[n_ar];
     425           0 :                         i--;
     426             :                 }
     427             :         }
     428           0 :         if (m_sz != BATcount(b)) {
     429             :                 BBPunfix(bid);
     430             :                 for (i = 0; i < n_ar; i++)
     431             :                         BBPunfix(a[i]->batCacheid);
     432             :                 GDKfree(a);
     433             :                 throw(MAL, "bat.orderidx", "count mismatch");
     434             :         }
     435             : 
     436           0 :         rc = GDKmergeidx(b, a, n_ar);
     437             : 
     438           0 :         for (i = 0; i < n_ar; i++)
     439           0 :                 BBPunfix(a[i]->batCacheid);
     440           0 :         GDKfree(a);
     441           0 :         BBPunfix(bid);
     442             : 
     443           0 :         if (rc != GDK_SUCCEED)
     444           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     445             : 
     446             :         return MAL_SUCCEED;
     447             : }
     448             : 
     449             : #include "mel.h"
/*
 * MAL signature table for this module, handed to mal_module("orderidx", ...)
 * by the startup function in this file.  The table is terminated by an
 * entry with .imp == NULL.
 */
mel_func orderidx_init_funcs[] = {
 /* bat.orderidx(bv): build an order index, number of pieces chosen automatically */
 pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,2, arg("",void),batargany("bv",1))),
 /* bat.orderidx(bv, pieces): same, with an explicit piece count */
 pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,3, arg("",void),batargany("bv",1),arg("pieces",int))),
 /* bat.orderidx(bv, l...): merge per-piece order BATs (used by the generated parallel plan) */
 pattern("bat", "orderidx", OIDXmerge, false, "Consolidates the OID index arrangement", args(1,3, arg("",void),batargany("bv",1),batvarargany("l",1))),
 pattern("bat", "hasorderidx", OIDXhasorderidx, false, "Return true if order index exists", args(1,2, arg("",bit),batargany("bv",1))),
 pattern("bat", "getorderidx", OIDXgetorderidx, false, "Return the order index if it exists", args(1,2, batarg("",oid),batargany("bv",1))),
 /* algebra.orderidx(bv, stable): per-piece index creation, returns bv */
 command("algebra", "orderidx", OIDXorderidx, false, "Create an order index", args(1,3, batargany("",1),batargany("bv",1),arg("stable",bit))),
 { .imp=NULL }
};
     459             : #include "mal_import.h"
     460             : #ifdef _MSC_VER
     461             : #undef read
     462             : #pragma section(".CRT$XCU",read)
     463             : #endif
     464         308 : LIB_STARTUP_FUNC(init_orderidx_mal)
     465         308 : { mal_module("orderidx", NULL, orderidx_init_funcs); }

Generated by: LCOV version 1.14