LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_mitosis.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 166 196 84.7 %
Date: 2024-12-19 23:10:26 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "opt_mitosis.h"
      15             : #include "mal_interpreter.h"
      16             : #include "gdk_utils.h"
      17             : 
      18             : #define MIN_PART_SIZE 100000    /* minimal record count per partition */
      19             : #define MAX_PARTS2THREADS_RATIO 4       /* There should be at most this many times more partitions than threads */
      20             : 
      21             : 
      22             : str
      23      367461 : OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
      24             :                                                  InstrPtr pci)
      25             : {
      26      367461 :         int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0,
      27      367461 :                 mito_size = 0, row_size = 0, mt = -1, nr_cols = 0, nr_aggrs = 0,
      28      367461 :                 nr_maps = 0;
      29      367461 :         str schema = 0, table = 0;
      30      367461 :         BUN r = 0, rowcnt = 0;          /* table should be sizeable to consider parallel execution */
      31      367461 :         InstrPtr p, q, *old, target = 0;
      32      367461 :         size_t argsize = 6 * sizeof(lng), m = 0;
      33             :         /* estimated size per operator: 4 args + 2 res */
      34      367461 :         int threads = GDKnr_threads ? GDKnr_threads : 1, maxparts = MAXSLICES;
      35      367461 :         str msg = MAL_SUCCEED;
      36             : 
      37             :         /* If the user has an associated limit on the number of threads, respect it when
      38             :          * generating the number of partitions. Beware: this may lead to larger pieces, as it only
      39             :          * limits the CPU power */
      40      367461 :         if (cntxt->workerlimit)
      41           0 :                 threads = cntxt->workerlimit;
      42      367461 :         (void) cntxt;
      43      367461 :         (void) stk;
      44             : 
      45      367461 :         old = mb->stmt;
      46    14911887 :         for (i = 1; i < mb->stop; i++) {
      47    14547042 :                 InstrPtr p = old[i];
      48             : 
      49    14547042 :                 if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef
      50        9212 :                         && p->argc > 2 && getArgType(mb, p, 2) == TYPE_str
      51        9212 :                         && isVarConstant(mb, getArg(p, 2))
      52        9212 :                         && getVarConstant(mb, getArg(p, 2)).val.sval != NULL
      53        9212 :                         &&
      54        9212 :                         (strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
      55             :                                         "PRIMARY KEY constraint")
      56        8005 :                          || strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
      57             :                                            "UNIQUE constraint"))) {
      58        1275 :                         pieces = 0;
      59        1275 :                         goto bailout;
      60             :                 }
      61             : 
      62             :                 /* mitosis/mergetable bailout conditions */
      63             :                 /* Crude protection against self join explosion */
      64    14545767 :                 if (p->retc == 2 && isMatJoinOp(p))
      65    14545766 :                         maxparts = threads;
      66             : 
      67    14545766 :                 nr_aggrs += (p->argc > 2 && getModuleId(p) == aggrRef);
      68    14545766 :                 nr_maps += (isMapOp(p));
      69             : 
      70    14545782 :                 if ((getModuleId(p) == algebraRef &&
      71     1802934 :                     getFunctionId(p) == groupedfirstnRef) ||
      72    14545777 :                     (p->argc > 2 && getModuleId(p) == aggrRef
      73        7090 :                         && getFunctionId(p) != subcountRef && getFunctionId(p) != subminRef
      74        3074 :                         && getFunctionId(p) != submaxRef && getFunctionId(p) != subavgRef
      75        2567 :                         && getFunctionId(p) != subsumRef && getFunctionId(p) != subprodRef
      76         945 :                         && getFunctionId(p) != countRef && getFunctionId(p) != minRef
      77         325 :                         && getFunctionId(p) != maxRef && getFunctionId(p) != avgRef
      78         273 :                         && getFunctionId(p) != sumRef && getFunctionId(p) != prodRef)) {
      79         278 :                         pieces = 0;
      80         278 :                         goto bailout;
      81             :                 }
      82             : 
      83             :                 /* rtree functions should not be optimized by mitosis
      84             :                  * (single-threaded execution) */
      85    14545504 :                 if (getModuleId(p) == rtreeRef) {
      86           0 :                         pieces = 0;
      87           0 :                         goto bailout;
      88             :                 }
      89             : 
      90             :                 /* do not split up floating point bat that is being summed */
      91    14545504 :                 if (p->retc == 1 &&
      92    13893850 :                         (((p->argc == 5 || p->argc == 6)
      93      970773 :                           && getModuleId(p) == aggrRef
      94        6049 :                           && getFunctionId(p) == subsumRef)
      95    13892232 :                          || (p->argc == 4
      96      601818 :                                  && getModuleId(p) == aggrRef
      97           0 :                                  && getFunctionId(p) == sumRef))
      98        1618 :                         && isaBatType(getArgType(mb, p, p->retc))
      99        1618 :                         && (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt
     100             :                                 || getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) {
     101          14 :                         pieces = 0;
     102          14 :                         goto bailout;
     103             :                 }
     104             : 
     105    14545490 :                 if (p->argc > 2
     106     7873873 :                         && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef
     107     7873842 :                                 || getModuleId(p) == pyapi3Ref)
     108         143 :                         && getFunctionId(p) == subeval_aggrRef) {
     109          26 :                         pieces = 0;
     110          26 :                         goto bailout;
     111             :                 }
     112             : 
     113             :                 /* Mergetable cannot handle intersect/except's for now */
     114    14545464 :                 if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) {
     115        1038 :                         pieces = 0;
     116        1038 :                         goto bailout;
     117             :                 }
     118             : 
     119             :                 /* locate the largest non-partitioned table */
     120    14544426 :                 if (getModuleId(p) != sqlRef
     121     1196330 :                         || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef
     122      671235 :                                 && getFunctionId(p) != tidRef))
     123    13887358 :                         continue;
     124             :                 /* don't split insert BATs */
     125      657068 :                 if (p->argc > 5 && getVarConstant(mb, getArg(p, 5)).val.ival == 1)
     126           0 :                         continue;
     127      657068 :                 if (p->argc > 6)
     128      145125 :                         continue;                       /* already partitioned */
     129             :                 /*
     130             :                  * The SQL optimizer already collects the counts of the base
     131             :                  * table and passes them on as a row property.  All pieces for a
     132             :                  * single subplan should ideally fit together.
     133             :                  */
     134      511943 :                 r = getRowCnt(mb, getArg(p, 0));
     135      511943 :                 if (r == rowcnt)
     136      259421 :                         nr_cols++;
     137      511943 :                 if (r > rowcnt) {
     138             :                         /* the rowsize depends on the column types, assume void-headed */
     139       51109 :                         row_size = ATOMsize(getBatType(getArgType(mb, p, 0)));
     140       51109 :                         rowcnt = r;
     141       51109 :                         nr_cols = 1;
     142       51109 :                         target = p;
     143       51109 :                         estimate++;
     144       51109 :                         r = 0;
     145             :                 }
     146             :         }
     147      364845 :         if (target == 0) {
     148      316910 :                 pieces = 0;
     149      316910 :                 goto bailout;
     150             :         }
     151             :         /*
     152             :          * The number of pieces should be based on the footprint of the
     153             :          * query plan, such that preferably it can be handled without
     154             :          * swapping intermediates.  For the time being we just go for pieces
     155             :          * that fit into memory in isolation.  A fictive rowcount is derived
     156             :          * based on argument types, such that all pieces would fit into
     157             :          * memory conveniently for processing. We attempt to use not more
     158             :          * threads than strictly needed.
     159             :          * Experience shows that the pieces should not be too small.
     160             :          * Whether we should limit to |threads| is still an open issue.
     161             :          *
     162             :          * Take into account the number of client connections,
     163             :          * because all users together are responsible for resource contention
     164             :          */
     165       47935 :         MT_lock_set(&mal_contextLock);
     166       47950 :         cntxt->idle = 0;                     // this one is definitely not idle
     167       47950 :         MT_lock_unset(&mal_contextLock);
     168             : 
     169             :         /* improve memory usage estimation */
     170       47950 :         if (nr_cols > 1 || nr_aggrs > 1 || nr_maps > 1)
     171       47849 :                 argsize = (nr_cols + nr_aggrs + nr_maps) * sizeof(lng);
     172             :         /* We haven't assigned the number of pieces.
     173             :          * Determine the memory available for this client
     174             :          */
     175             : 
     176             :         /* respect the memory limit size set for the user
     177             :          * and determine the column part size
     178             :          */
     179       47950 :         m = GDK_mem_maxsize / MCactiveClients();        /* use temporarily */
     180       47950 :         if (cntxt->memorylimit > 0 && (size_t) cntxt->memorylimit << 20 < m)
     181           0 :                 m = ((size_t) cntxt->memorylimit << 20) / argsize;
     182       47950 :         else if (cntxt->maxmem > 0 && cntxt->maxmem < (lng) m)
     183           0 :                 m = (size_t) (cntxt->maxmem / argsize);
     184             :         else
     185       47950 :                 m = m / argsize;
     186             : 
     187             :         /* if data exceeds memory size,
     188             :          * i.e., (rowcnt*argsize > GDK_mem_maxsize),
     189             :          * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
     190       47950 :         if (rowcnt > m && m / threads > 0) {
     191             :                 /* create |pieces| > |threads| partitions such that
     192             :                  * |threads| partitions at a time fit in memory,
     193             :                  * i.e., (threads*(rowcnt/pieces) <= m),
     194             :                  * i.e., (rowcnt/pieces <= m/threads),
     195             :                  * i.e., (pieces >= rowcnt/(m/threads))
     196             :                  * (assuming that (m > threads*MIN_PART_SIZE)) */
     197             :                 /* the number of pieces affects SF-100, going beyond 8x increases
     198             :                  * the optimizer costs beyond the execution time
     199             :                  */
     200           0 :                 pieces = ((int) ceil((double) rowcnt / (m / threads)));
     201           0 :                 if (pieces <= threads)
     202             :                         pieces = threads;
     203       47950 :         } else if (rowcnt > MIN_PART_SIZE) {
     204             :                 /* exploit parallelism, but ensure minimal partition size to
     205             :                  * limit overhead */
     206         189 :                 pieces = MIN((int) ceil((double) rowcnt / MIN_PART_SIZE),
     207             :                                          MAX_PARTS2THREADS_RATIO * threads);
     208             :         }
     209             : 
     210             :         /* when testing, always aim for full parallelism, but avoid
     211             :          * empty pieces */
     212       47950 :         FORCEMITODEBUG if (pieces < threads)
     213       47689 :                  pieces = (int) MIN((BUN) threads, rowcnt);
     214             :         /* prevent plan explosion */
     215       47950 :         if (pieces > maxparts)
     216             :                 pieces = maxparts;
     217             :         /* to enable experimentation we introduce the option to set
     218             :          * the number of partitions required and/or the size of each chunk (in K)
     219             :          */
     220       47950 :         mito_parts = GDKgetenv_int("mito_parts", 0);
     221       47950 :         if (mito_parts > 0)
     222           0 :                 pieces = mito_parts;
     223       47950 :         mito_size = GDKgetenv_int("mito_size", 0);
     224       47950 :         if (mito_size > 0)
     225           0 :                 pieces = (int) ((rowcnt * row_size) / (mito_size * 1024));
     226             : 
     227       47950 :         if (pieces <= 1) {
     228         399 :                 pieces = 0;
     229         399 :                 goto bailout;
     230             :         }
     231             : 
     232             :         /* at this stage we have identified the #chunks to be used for the largest table */
     233       47551 :         limit = mb->stop;
     234       47551 :         slimit = mb->ssize;
     235       47551 :         if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
     236           0 :                 throw(MAL, "optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     237       47551 :         estimate = 0;
     238             : 
     239       47551 :         schema = getVarConstant(mb, getArg(target, 2)).val.sval;
     240       47551 :         table = getVarConstant(mb, getArg(target, 3)).val.sval;
     241     4402419 :         for (i = 0; mb->errors == NULL && i < limit; i++) {
     242     4354873 :                 int upd = 0, qtpe, rtpe = 0, qv, rv;
     243     4354873 :                 InstrPtr matq, matr = NULL;
     244     4354873 :                 p = old[i];
     245             : 
     246     4354873 :                 if (getModuleId(p) != sqlRef
     247      925915 :                         || !(getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef
     248             :                                  || getFunctionId(p) == tidRef)) {
     249     3741729 :                         pushInstruction(mb, p);
     250     3741716 :                         continue;
     251             :                 }
     252             :                 /* don't split insert BATs */
     253      613144 :                 if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
     254           0 :                         pushInstruction(mb, p);
     255           0 :                         continue;
     256             :                 }
     257      613144 :                 r = getRowCnt(mb, getArg(p, 0));
     258      613144 :                 if (r < rowcnt) {
     259      315901 :                         pushInstruction(mb, p);
     260      315901 :                         continue;
     261             :                 }
     262             :                 /* Don't split the (index) bat if we already have identified a range */
     263             :                 /* This will happen if we inline separately optimized routines */
     264      297243 :                 if (p->argc > 7) {
     265           0 :                         pushInstruction(mb, p);
     266           0 :                         continue;
     267             :                 }
     268      297243 :                 if (p->retc == 2)
     269       63240 :                         upd = 1;
     270      297731 :                 if (mt < 0
     271      297243 :                         && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval)
     272      297243 :                                 || strcmp(table,
     273      297243 :                                                   getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) {
     274         486 :                         pushInstruction(mb, p);
     275         488 :                         continue;
     276             :                 }
     277             :                 /* we keep the original bind operation, because it allows for
     278             :                  * easy undo when the mergetable cannot do something */
     279             :                 // pushInstruction(mb, p);
     280             : 
     281      296757 :                 qtpe = getVarType(mb, getArg(p, 0));
     282             : 
     283      296757 :                 matq = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     284      296747 :                 if (matq == NULL) {
     285           0 :                         msg = createException(MAL, "optimizer.mitosis",
     286             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     287           0 :                         break;
     288             :                 }
     289      296747 :                 getArg(matq, 0) = getArg(p, 0);
     290             : 
     291      296747 :                 if (upd) {
     292       63240 :                         matr = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     293       63240 :                         if (matr == NULL) {
     294           0 :                                 freeInstruction(matq);
     295           0 :                                 msg = createException(MAL, "optimizer.mitosis",
     296             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     297           0 :                                 break;
     298             :                         }
     299       63240 :                         getArg(matr, 0) = getArg(p, 1);
     300       63240 :                         rtpe = getVarType(mb, getArg(p, 1));
     301             :                 }
     302             : 
     303     2531086 :                 for (j = 0; j < pieces; j++) {
     304     2234333 :                         q = copyInstruction(p);
     305     2234539 :                         if (q == NULL) {
     306           0 :                                 freeInstruction(matr);
     307           0 :                                 freeInstruction(matq);
     308           0 :                                 for (; i < limit; i++)
     309           0 :                                         if (old[i])
     310           0 :                                                 pushInstruction(mb, old[i]);
     311           0 :                                 GDKfree(old);
     312           0 :                                 throw(MAL, "optimizer.mitosis",
     313             :                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     314             :                         }
     315     2234539 :                         q = pushInt(mb, q, j);
     316     2233469 :                         q = pushInt(mb, q, pieces);
     317             : 
     318     2233806 :                         qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
     319     2234210 :                         if (upd) {
     320      464711 :                                 rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
     321             :                         }
     322     2234210 :                         pushInstruction(mb, q);
     323     2234267 :                         matq = pushArgument(mb, matq, qv);
     324     2234339 :                         if (upd)
     325      464711 :                                 matr = pushArgument(mb, matr, rv);
     326             :                 }
     327      296753 :                 pushInstruction(mb, matq);
     328      296744 :                 if (upd)
     329       63240 :                         pushInstruction(mb, matr);
     330      296744 :                 freeInstruction(p);
     331             :         }
     332     9507189 :         for (; i < slimit; i++)
     333     9459639 :                 if (old[i])
     334           0 :                         pushInstruction(mb, old[i]);
     335       47550 :         GDKfree(old);
     336             : 
     337             :         /* Defense line against incorrect plans */
     338       47551 :         if (msg == MAL_SUCCEED) {
     339       47551 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     340       47549 :                 if (msg == MAL_SUCCEED) {
     341       47549 :                         msg = chkFlow(mb);
     342       47546 :                         if (msg == MAL_SUCCEED)
     343       47548 :                                 msg = chkDeclarations(mb);
     344             :                 }
     345             :         }
     346           0 :   bailout:
     347             :         /* keep actions taken as a fake argument */
     348      367488 :         (void) pushInt(mb, pci, pieces);
     349      367488 :         return msg;
     350             : }

Generated by: LCOV version 1.14