LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_mitosis.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 165 195 84.6 %
Date: 2024-12-20 20:06:10 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "opt_mitosis.h"
      15             : #include "mal_interpreter.h"
      16             : #include "gdk_utils.h"
      17             : 
      18             : #define MIN_PART_SIZE 100000    /* minimal record count per partition */
      19             : #define MAX_PARTS2THREADS_RATIO 4       /* There should be at most this multiple more of partitions then threads */
      20             : 
      21             : 
      22             : str
      23      364553 : OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
      24             :                                                  InstrPtr pci)
      25             : {
      26      364553 :         int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0,
      27      364553 :                 mito_size = 0, row_size = 0, mt = -1, nr_cols = 0, nr_aggrs = 0,
      28      364553 :                 nr_maps = 0;
      29      364553 :         str schema = 0, table = 0;
      30      364553 :         BUN r = 0, rowcnt = 0;          /* table should be sizeable to consider parallel execution */
      31      364553 :         InstrPtr p, q, *old, target = 0;
      32      364553 :         size_t argsize = 6 * sizeof(lng), m = 0;
      33             :         /*       estimate size per operator estimate:   4 args + 2 res */
      34      364553 :         int threads = GDKnr_threads ? GDKnr_threads : 1, maxparts = MAXSLICES;
      35      364553 :         str msg = MAL_SUCCEED;
      36             : 
      37             :         /* if the user has associated limitation on the number of threads, respect it in the
      38             :          * generation of the number of partitions. Beware, they may lead to larger pieces, it only
      39             :          * limits the CPU power */
      40      364553 :         if (cntxt->workerlimit)
      41           0 :                 threads = cntxt->workerlimit;
      42      364553 :         (void) cntxt;
      43      364553 :         (void) stk;
      44             : 
      45      364553 :         old = mb->stmt;
      46    14796404 :         for (i = 1; i < mb->stop; i++) {
      47    14434469 :                 InstrPtr p = old[i];
      48             : 
      49    14434469 :                 if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef
      50        9210 :                         && p->argc > 2 && getArgType(mb, p, 2) == TYPE_str
      51        9210 :                         && isVarConstant(mb, getArg(p, 2))
      52        9210 :                         && getVarConstant(mb, getArg(p, 2)).val.sval != NULL
      53        9210 :                         &&
      54        9210 :                         (strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
      55             :                                         "PRIMARY KEY constraint")
      56        8004 :                          || strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
      57             :                                            "UNIQUE constraint"))) {
      58        1274 :                         pieces = 0;
      59        1274 :                         goto bailout;
      60             :                 }
      61             : 
      62             :                 /* mitosis/mergetable bailout conditions */
      63             :                 /* Crude protection against self join explosion */
      64    14433195 :                 if (p->retc == 2 && isMatJoinOp(p))
      65    14433195 :                         maxparts = threads;
      66             : 
      67    14433195 :                 nr_aggrs += (p->argc > 2 && getModuleId(p) == aggrRef);
      68    14433195 :                 nr_maps += (isMapOp(p));
      69             : 
      70    14433188 :                 if (p->argc > 2 && getModuleId(p) == aggrRef
      71        7094 :                         && getFunctionId(p) != subcountRef && getFunctionId(p) != subminRef
      72        3078 :                         && getFunctionId(p) != submaxRef && getFunctionId(p) != subavgRef
      73        2557 :                         && getFunctionId(p) != subsumRef && getFunctionId(p) != subprodRef
      74         931 :                         && getFunctionId(p) != countRef && getFunctionId(p) != minRef
      75         311 :                         && getFunctionId(p) != maxRef && getFunctionId(p) != avgRef
      76         259 :                         && getFunctionId(p) != sumRef && getFunctionId(p) != prodRef) {
      77         259 :                         pieces = 0;
      78         259 :                         goto bailout;
      79             :                 }
      80             : 
      81             :                 /* rtree functions should not be optimized by mitosis
      82             :                  * (single-threaded execution) */
      83    14432929 :                 if (getModuleId(p) == rtreeRef) {
      84           0 :                         pieces = 0;
      85           0 :                         goto bailout;
      86             :                 }
      87             : 
      88             :                 /* do not split up floating point bat that is being summed */
      89    14432929 :                 if (p->retc == 1 &&
      90    13786836 :                         (((p->argc == 5 || p->argc == 6)
      91      953614 :                           && getModuleId(p) == aggrRef
      92        6067 :                           && getFunctionId(p) == subsumRef)
      93    13785214 :                          || (p->argc == 4
      94      598379 :                                  && getModuleId(p) == aggrRef
      95           0 :                                  && getFunctionId(p) == sumRef))
      96        1622 :                         && isaBatType(getArgType(mb, p, p->retc))
      97        1622 :                         && (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt
      98             :                                 || getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) {
      99          14 :                         pieces = 0;
     100          14 :                         goto bailout;
     101             :                 }
     102             : 
     103    14432915 :                 if (p->argc > 2
     104     7818804 :                         && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef
     105     7818773 :                                 || getModuleId(p) == pyapi3Ref)
     106         143 :                         && getFunctionId(p) == subeval_aggrRef) {
     107          26 :                         pieces = 0;
     108          26 :                         goto bailout;
     109             :                 }
     110             : 
     111             :                 /* Mergetable cannot handle intersect/except's for now */
     112    14432889 :                 if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) {
     113        1038 :                         pieces = 0;
     114        1038 :                         goto bailout;
     115             :                 }
     116             : 
     117             :                 /* locate the largest non-partitioned table */
     118    14431851 :                 if (getModuleId(p) != sqlRef
     119     1185450 :                         || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef
     120      664280 :                                 && getFunctionId(p) != tidRef))
     121    13779485 :                         continue;
     122             :                 /* don't split insert BATs */
     123      652366 :                 if (p->argc > 5 && getVarConstant(mb, getArg(p, 5)).val.ival == 1)
     124           0 :                         continue;
     125      652366 :                 if (p->argc > 6)
     126      143026 :                         continue;                       /* already partitioned */
     127             :                 /*
     128             :                  * The SQL optimizer already collects the counts of the base
     129             :                  * table and passes them on as a row property.  All pieces for a
     130             :                  * single subplan should ideally fit together.
     131             :                  */
     132      509340 :                 r = getRowCnt(mb, getArg(p, 0));
     133      509340 :                 if (r == rowcnt)
     134      257475 :                         nr_cols++;
     135      509340 :                 if (r > rowcnt) {
     136             :                         /* the rowsize depends on the column types, assume void-headed */
     137       51005 :                         row_size = ATOMsize(getBatType(getArgType(mb, p, 0)));
     138       51005 :                         rowcnt = r;
     139       51005 :                         nr_cols = 1;
     140       51005 :                         target = p;
     141       51005 :                         estimate++;
     142       51005 :                         r = 0;
     143             :                 }
     144             :         }
     145      361935 :         if (target == 0) {
     146      314054 :                 pieces = 0;
     147      314054 :                 goto bailout;
     148             :         }
     149             :         /*
     150             :          * The number of pieces should be based on the footprint of the
     151             :          * query plan, such that preferably it can be handled without
     152             :          * swapping intermediates.  For the time being we just go for pieces
     153             :          * that fit into memory in isolation.  A fictive rowcount is derived
     154             :          * based on argument types, such that all pieces would fit into
     155             :          * memory conveniently for processing. We attempt to use not more
     156             :          * threads than strictly needed.
     157             :          * Experience shows that the pieces should not be too small.
     158             :          * If we should limit to |threads| is still an open issue.
     159             :          *
     160             :          * Take into account the number of client connections,
     161             :          * because all user together are responsible for resource contentions
     162             :          */
     163       47881 :         MT_lock_set(&mal_contextLock);
     164       47885 :         cntxt->idle = 0;                     // this one is definitely not idle
     165       47885 :         MT_lock_unset(&mal_contextLock);
     166             : 
     167             :         /* improve memory usage estimation */
     168       47885 :         if (nr_cols > 1 || nr_aggrs > 1 || nr_maps > 1)
     169       47784 :                 argsize = (nr_cols + nr_aggrs + nr_maps) * sizeof(lng);
     170             :         /* We haven't assigned the number of pieces.
     171             :          * Determine the memory available for this client
     172             :          */
     173             : 
     174             :         /* respect the memory limit size set for the user
     175             :          * and determine the column part size
     176             :          */
     177       47885 :         m = GDK_mem_maxsize / MCactiveClients();        /* use temporarily */
     178       47885 :         if (cntxt->memorylimit > 0 && (size_t) cntxt->memorylimit << 20 < m)
     179           0 :                 m = ((size_t) cntxt->memorylimit << 20) / argsize;
     180       47885 :         else if (cntxt->maxmem > 0 && cntxt->maxmem < (lng) m)
     181           0 :                 m = (size_t) (cntxt->maxmem / argsize);
     182             :         else
     183       47885 :                 m = m / argsize;
     184             : 
     185             :         /* if data exceeds memory size,
     186             :          * i.e., (rowcnt*argsize > GDK_mem_maxsize),
     187             :          * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
     188       47885 :         if (rowcnt > m && m / threads > 0) {
     189             :                 /* create |pieces| > |threads| partitions such that
     190             :                  * |threads| partitions at a time fit in memory,
     191             :                  * i.e., (threads*(rowcnt/pieces) <= m),
     192             :                  * i.e., (rowcnt/pieces <= m/threads),
     193             :                  * i.e., (pieces => rowcnt/(m/threads))
     194             :                  * (assuming that (m > threads*MIN_PART_SIZE)) */
     195             :                 /* the number of pieces affects SF-100, going beyond 8x increases
     196             :                  * the optimizer costs beyond the execution time
     197             :                  */
     198           0 :                 pieces = ((int) ceil((double) rowcnt / (m / threads)));
     199           0 :                 if (pieces <= threads)
     200             :                         pieces = threads;
     201       47885 :         } else if (rowcnt > MIN_PART_SIZE) {
     202             :                 /* exploit parallelism, but ensure minimal partition size to
     203             :                  * limit overhead */
     204         193 :                 pieces = MIN((int) ceil((double) rowcnt / MIN_PART_SIZE),
     205             :                                          MAX_PARTS2THREADS_RATIO * threads);
     206             :         }
     207             : 
     208             :         /* when testing, always aim for full parallelism, but avoid
     209             :          * empty pieces */
     210       47885 :         FORCEMITODEBUG if (pieces < threads)
     211       47617 :                  pieces = (int) MIN((BUN) threads, rowcnt);
     212             :         /* prevent plan explosion */
     213       47885 :         if (pieces > maxparts)
     214             :                 pieces = maxparts;
     215             :         /* to enable experimentation we introduce the option to set
     216             :          * the number of partitions required and/or the size of each chunk (in K)
     217             :          */
     218       47885 :         mito_parts = GDKgetenv_int("mito_parts", 0);
     219       47885 :         if (mito_parts > 0)
     220           0 :                 pieces = mito_parts;
     221       47885 :         mito_size = GDKgetenv_int("mito_size", 0);
     222       47885 :         if (mito_size > 0)
     223           0 :                 pieces = (int) ((rowcnt * row_size) / (mito_size * 1024));
     224             : 
     225       47885 :         if (pieces <= 1) {
     226         387 :                 pieces = 0;
     227         387 :                 goto bailout;
     228             :         }
     229             : 
     230             :         /* at this stage we have identified the #chunks to be used for the largest table */
     231       47498 :         limit = mb->stop;
     232       47498 :         slimit = mb->ssize;
     233       47498 :         if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
     234           0 :                 throw(MAL, "optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     235       47498 :         estimate = 0;
     236             : 
     237       47498 :         schema = getVarConstant(mb, getArg(target, 2)).val.sval;
     238       47498 :         table = getVarConstant(mb, getArg(target, 3)).val.sval;
     239     4391334 :         for (i = 0; mb->errors == NULL && i < limit; i++) {
     240     4343836 :                 int upd = 0, qtpe, rtpe = 0, qv, rv;
     241     4343836 :                 InstrPtr matq, matr = NULL;
     242     4343836 :                 p = old[i];
     243             : 
     244     4343836 :                 if (getModuleId(p) != sqlRef
     245      922608 :                         || !(getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef
     246      430977 :                                  || getFunctionId(p) == tidRef)) {
     247     3732171 :                         pushInstruction(mb, p);
     248     3732171 :                         continue;
     249             :                 }
     250             :                 /* don't split insert BATs */
     251      611665 :                 if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
     252           0 :                         pushInstruction(mb, p);
     253           0 :                         continue;
     254             :                 }
     255      611665 :                 r = getRowCnt(mb, getArg(p, 0));
     256      611665 :                 if (r < rowcnt) {
     257      315143 :                         pushInstruction(mb, p);
     258      315143 :                         continue;
     259             :                 }
     260             :                 /* Don't split the (index) bat if we already have identified a range */
     261             :                 /* This will happen if we inline separately optimized routines */
     262      296522 :                 if (p->argc > 7) {
     263           0 :                         pushInstruction(mb, p);
     264           0 :                         continue;
     265             :                 }
     266      296522 :                 if (p->retc == 2)
     267       62579 :                         upd = 1;
     268      297010 :                 if (mt < 0
     269      296522 :                         && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval)
     270      296522 :                                 || strcmp(table,
     271      296522 :                                                   getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) {
     272         488 :                         pushInstruction(mb, p);
     273         488 :                         continue;
     274             :                 }
     275             :                 /* we keep the original bind operation, because it allows for
     276             :                  * easy undo when the mergtable can not do something */
     277             :                 // pushInstruction(mb, p);
     278             : 
     279      296034 :                 qtpe = getVarType(mb, getArg(p, 0));
     280             : 
     281      296034 :                 matq = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     282      296028 :                 if (matq == NULL) {
     283           0 :                         msg = createException(MAL, "optimizer.mitosis",
     284             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     285           0 :                         break;
     286             :                 }
     287      296028 :                 getArg(matq, 0) = getArg(p, 0);
     288             : 
     289      296028 :                 if (upd) {
     290       62579 :                         matr = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     291       62579 :                         if (matr == NULL) {
     292           0 :                                 freeInstruction(matq);
     293           0 :                                 msg = createException(MAL, "optimizer.mitosis",
     294             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     295           0 :                                 break;
     296             :                         }
     297       62579 :                         getArg(matr, 0) = getArg(p, 1);
     298       62579 :                         rtpe = getVarType(mb, getArg(p, 1));
     299             :                 }
     300             : 
     301     1451771 :                 for (j = 0; j < pieces; j++) {
     302     1155758 :                         q = copyInstruction(p);
     303     1155846 :                         if (q == NULL) {
     304           0 :                                 freeInstruction(matr);
     305           0 :                                 freeInstruction(matq);
     306           0 :                                 for (; i < limit; i++)
     307           0 :                                         if (old[i])
     308           0 :                                                 pushInstruction(mb, old[i]);
     309           0 :                                 GDKfree(old);
     310           0 :                                 throw(MAL, "optimizer.mitosis",
     311             :                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     312             :                         }
     313     1155846 :                         q = pushInt(mb, q, j);
     314     1155743 :                         q = pushInt(mb, q, pieces);
     315             : 
     316     1155710 :                         qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
     317     1155839 :                         if (upd) {
     318      240711 :                                 rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
     319             :                         }
     320     1155839 :                         pushInstruction(mb, q);
     321     1155781 :                         matq = pushArgument(mb, matq, qv);
     322     1155743 :                         if (upd)
     323      240711 :                                 matr = pushArgument(mb, matr, rv);
     324             :                 }
     325      296013 :                 pushInstruction(mb, matq);
     326      296011 :                 if (upd)
     327       62579 :                         pushInstruction(mb, matr);
     328      296011 :                 freeInstruction(p);
     329             :         }
     330     9527741 :         for (; i < slimit; i++)
     331     9480243 :                 if (old[i])
     332           0 :                         pushInstruction(mb, old[i]);
     333       47498 :         GDKfree(old);
     334             : 
     335             :         /* Defense line against incorrect plans */
     336       47498 :         if (msg == MAL_SUCCEED) {
     337       47498 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     338       47498 :                 if (msg == MAL_SUCCEED) {
     339       47498 :                         msg = chkFlow(mb);
     340       47498 :                         if (msg == MAL_SUCCEED)
     341       47498 :                                 msg = chkDeclarations(mb);
     342             :                 }
     343             :         }
     344           0 :   bailout:
     345             :         /* keep actions taken as a fake argument */
     346      364549 :         (void) pushInt(mb, pci, pieces);
     347      364549 :         return msg;
     348             : }

Generated by: LCOV version 1.14