LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_mitosis.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 165 195 84.6 %
Date: 2024-10-03 20:03:20 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "opt_mitosis.h"
      15             : #include "mal_interpreter.h"
      16             : #include "gdk_utils.h"
      17             : 
      18             : #define MIN_PART_SIZE 100000    /* minimal record count per partition */
      19             : #define MAX_PARTS2THREADS_RATIO 4       /* There should be at most this multiple more of partitions then threads */
      20             : 
      21             : 
      22             : str
      23      363892 : OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
      24             :                                                  InstrPtr pci)
      25             : {
      26      363892 :         int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0,
      27      363892 :                 mito_size = 0, row_size = 0, mt = -1, nr_cols = 0, nr_aggrs = 0,
      28      363892 :                 nr_maps = 0;
      29      363892 :         str schema = 0, table = 0;
      30      363892 :         BUN r = 0, rowcnt = 0;          /* table should be sizeable to consider parallel execution */
      31      363892 :         InstrPtr p, q, *old, target = 0;
      32      363892 :         size_t argsize = 6 * sizeof(lng), m = 0;
      33             :         /*       estimate size per operator estimate:   4 args + 2 res */
      34      363892 :         int threads = GDKnr_threads ? GDKnr_threads : 1, maxparts = MAXSLICES;
      35      363892 :         str msg = MAL_SUCCEED;
      36             : 
      37             :         /* if the user has associated limitation on the number of threads, respect it in the
      38             :          * generation of the number of partitions. Beware, they may lead to larger pieces, it only
      39             :          * limits the CPU power */
      40      363892 :         if (cntxt->workerlimit)
      41           0 :                 threads = cntxt->workerlimit;
      42      363892 :         (void) cntxt;
      43      363892 :         (void) stk;
      44             : 
      45      363892 :         old = mb->stmt;
      46    14749178 :         for (i = 1; i < mb->stop; i++) {
      47    14387903 :                 InstrPtr p = old[i];
      48             : 
      49    14387903 :                 if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef
      50        9208 :                         && p->argc > 2 && getArgType(mb, p, 2) == TYPE_str
      51        9208 :                         && isVarConstant(mb, getArg(p, 2))
      52        9208 :                         && getVarConstant(mb, getArg(p, 2)).val.sval != NULL
      53        9208 :                         &&
      54        9208 :                         (strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
      55             :                                         "PRIMARY KEY constraint")
      56        8003 :                          || strstr(getVarConstant(mb, getArg(p, 2)).val.sval,
      57             :                                            "UNIQUE constraint"))) {
      58        1273 :                         pieces = 0;
      59        1273 :                         goto bailout;
      60             :                 }
      61             : 
      62             :                 /* mitosis/mergetable bailout conditions */
      63             :                 /* Crude protection against self join explosion */
      64    14386630 :                 if (p->retc == 2 && isMatJoinOp(p))
      65    14386630 :                         maxparts = threads;
      66             : 
      67    14386630 :                 nr_aggrs += (p->argc > 2 && getModuleId(p) == aggrRef);
      68    14386630 :                 nr_maps += (isMapOp(p));
      69             : 
      70    14386623 :                 if (p->argc > 2 && getModuleId(p) == aggrRef
      71        7091 :                         && getFunctionId(p) != subcountRef && getFunctionId(p) != subminRef
      72        3076 :                         && getFunctionId(p) != submaxRef && getFunctionId(p) != subavgRef
      73        2557 :                         && getFunctionId(p) != subsumRef && getFunctionId(p) != subprodRef
      74         931 :                         && getFunctionId(p) != countRef && getFunctionId(p) != minRef
      75         311 :                         && getFunctionId(p) != maxRef && getFunctionId(p) != avgRef
      76         259 :                         && getFunctionId(p) != sumRef && getFunctionId(p) != prodRef) {
      77         259 :                         pieces = 0;
      78         259 :                         goto bailout;
      79             :                 }
      80             : 
      81             :                 /* rtree functions should not be optimized by mitosis
      82             :                  * (single-threaded execution) */
      83    14386364 :                 if (getModuleId(p) == rtreeRef) {
      84           0 :                         pieces = 0;
      85           0 :                         goto bailout;
      86             :                 }
      87             : 
      88             :                 /* do not split up floating point bat that is being summed */
      89    14386364 :                 if (p->retc == 1 &&
      90    13744325 :                         (((p->argc == 5 || p->argc == 6)
      91      947057 :                           && getModuleId(p) == aggrRef
      92        6064 :                           && getFunctionId(p) == subsumRef)
      93    13742703 :                          || (p->argc == 4
      94      593053 :                                  && getModuleId(p) == aggrRef
      95           0 :                                  && getFunctionId(p) == sumRef))
      96        1622 :                         && isaBatType(getArgType(mb, p, p->retc))
      97        1622 :                         && (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt
      98             :                                 || getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) {
      99          14 :                         pieces = 0;
     100          14 :                         goto bailout;
     101             :                 }
     102             : 
     103    14386350 :                 if (p->argc > 2
     104     7787439 :                         && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef
     105     7787408 :                                 || getModuleId(p) == pyapi3Ref)
     106         143 :                         && getFunctionId(p) == subeval_aggrRef) {
     107          26 :                         pieces = 0;
     108          26 :                         goto bailout;
     109             :                 }
     110             : 
     111             :                 /* Mergetable cannot handle intersect/except's for now */
     112    14386324 :                 if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) {
     113        1038 :                         pieces = 0;
     114        1038 :                         goto bailout;
     115             :                 }
     116             : 
     117             :                 /* locate the largest non-partitioned table */
     118    14385286 :                 if (getModuleId(p) != sqlRef
     119     1173775 :                         || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef
     120      658066 :                                 && getFunctionId(p) != tidRef))
     121    13739398 :                         continue;
     122             :                 /* don't split insert BATs */
     123      645888 :                 if (p->argc > 5 && getVarConstant(mb, getArg(p, 5)).val.ival == 1)
     124           0 :                         continue;
     125      645888 :                 if (p->argc > 6)
     126      140355 :                         continue;                       /* already partitioned */
     127             :                 /*
     128             :                  * The SQL optimizer already collects the counts of the base
     129             :                  * table and passes them on as a row property.  All pieces for a
     130             :                  * single subplan should ideally fit together.
     131             :                  */
     132      505533 :                 r = getRowCnt(mb, getArg(p, 0));
     133      505533 :                 if (r == rowcnt)
     134      234010 :                         nr_cols++;
     135      505533 :                 if (r > rowcnt) {
     136             :                         /* the rowsize depends on the column types, assume void-headed */
     137       72060 :                         row_size = ATOMsize(getBatType(getArgType(mb, p, 0)));
     138       72060 :                         rowcnt = r;
     139       72060 :                         nr_cols = 1;
     140       72060 :                         target = p;
     141       72060 :                         estimate++;
     142       72060 :                         r = 0;
     143             :                 }
     144             :         }
     145      361275 :         if (target == 0) {
     146      313712 :                 pieces = 0;
     147      313712 :                 goto bailout;
     148             :         }
     149             :         /*
     150             :          * The number of pieces should be based on the footprint of the
     151             :          * query plan, such that preferably it can be handled without
     152             :          * swapping intermediates.  For the time being we just go for pieces
     153             :          * that fit into memory in isolation.  A fictive rowcount is derived
     154             :          * based on argument types, such that all pieces would fit into
     155             :          * memory conveniently for processing. We attempt to use not more
     156             :          * threads than strictly needed.
     157             :          * Experience shows that the pieces should not be too small.
     158             :          * If we should limit to |threads| is still an open issue.
     159             :          *
     160             :          * Take into account the number of client connections,
     161             :          * because all user together are responsible for resource contentions
     162             :          */
     163       47563 :         MT_lock_set(&mal_contextLock);
     164       47566 :         cntxt->idle = 0;                     // this one is definitely not idle
     165       47566 :         MT_lock_unset(&mal_contextLock);
     166             : 
     167             :         /* improve memory usage estimation */
     168       47566 :         if (nr_cols > 1 || nr_aggrs > 1 || nr_maps > 1)
     169       46230 :                 argsize = (nr_cols + nr_aggrs + nr_maps) * sizeof(lng);
     170             :         /* We haven't assigned the number of pieces.
     171             :          * Determine the memory available for this client
     172             :          */
     173             : 
     174             :         /* respect the memory limit size set for the user
     175             :          * and determine the column part size
     176             :          */
     177       47566 :         m = GDK_mem_maxsize / MCactiveClients();        /* use temporarily */
     178       47566 :         if (cntxt->memorylimit > 0 && (size_t) cntxt->memorylimit << 20 < m)
     179           0 :                 m = ((size_t) cntxt->memorylimit << 20) / argsize;
     180       47566 :         else if (cntxt->maxmem > 0 && cntxt->maxmem < (lng) m)
     181           0 :                 m = (size_t) (cntxt->maxmem / argsize);
     182             :         else
     183       47566 :                 m = m / argsize;
     184             : 
     185             :         /* if data exceeds memory size,
     186             :          * i.e., (rowcnt*argsize > GDK_mem_maxsize),
     187             :          * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
     188       47566 :         if (rowcnt > m && m / threads > 0) {
     189             :                 /* create |pieces| > |threads| partitions such that
     190             :                  * |threads| partitions at a time fit in memory,
     191             :                  * i.e., (threads*(rowcnt/pieces) <= m),
     192             :                  * i.e., (rowcnt/pieces <= m/threads),
     193             :                  * i.e., (pieces => rowcnt/(m/threads))
     194             :                  * (assuming that (m > threads*MIN_PART_SIZE)) */
     195             :                 /* the number of pieces affects SF-100, going beyond 8x increases
     196             :                  * the optimizer costs beyond the execution time
     197             :                  */
     198           0 :                 pieces = ((int) ceil((double) rowcnt / (m / threads)));
     199           0 :                 if (pieces <= threads)
     200             :                         pieces = threads;
     201       47566 :         } else if (rowcnt > MIN_PART_SIZE) {
     202             :                 /* exploit parallelism, but ensure minimal partition size to
     203             :                  * limit overhead */
     204         194 :                 pieces = MIN((int) ceil((double) rowcnt / MIN_PART_SIZE),
     205             :                                          MAX_PARTS2THREADS_RATIO * threads);
     206             :         }
     207             : 
     208             :         /* when testing, always aim for full parallelism, but avoid
     209             :          * empty pieces */
     210       47566 :         FORCEMITODEBUG if (pieces < threads)
     211       47292 :                  pieces = (int) MIN((BUN) threads, rowcnt);
     212             :         /* prevent plan explosion */
     213       47566 :         if (pieces > maxparts)
     214             :                 pieces = maxparts;
     215             :         /* to enable experimentation we introduce the option to set
     216             :          * the number of partitions required and/or the size of each chunk (in K)
     217             :          */
     218       47566 :         mito_parts = GDKgetenv_int("mito_parts", 0);
     219       47566 :         if (mito_parts > 0)
     220           0 :                 pieces = mito_parts;
     221       47566 :         mito_size = GDKgetenv_int("mito_size", 0);
     222       47566 :         if (mito_size > 0)
     223           0 :                 pieces = (int) ((rowcnt * row_size) / (mito_size * 1024));
     224             : 
     225       47566 :         if (pieces <= 1) {
     226         380 :                 pieces = 0;
     227         380 :                 goto bailout;
     228             :         }
     229             : 
     230             :         /* at this stage we have identified the #chunks to be used for the largest table */
     231       47186 :         limit = mb->stop;
     232       47186 :         slimit = mb->ssize;
     233       47186 :         if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
     234           0 :                 throw(MAL, "optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     235       47186 :         estimate = 0;
     236             : 
     237       47186 :         schema = getVarConstant(mb, getArg(target, 2)).val.sval;
     238       47186 :         table = getVarConstant(mb, getArg(target, 3)).val.sval;
     239     4359964 :         for (i = 0; mb->errors == NULL && i < limit; i++) {
     240     4312780 :                 int upd = 0, qtpe, rtpe = 0, qv, rv;
     241     4312780 :                 InstrPtr matq, matr = NULL;
     242     4312780 :                 p = old[i];
     243             : 
     244     4312780 :                 if (getModuleId(p) != sqlRef
     245      911101 :                         || !(getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef
     246      424911 :                                  || getFunctionId(p) == tidRef)) {
     247     3707559 :                         pushInstruction(mb, p);
     248     3707561 :                         continue;
     249             :                 }
     250             :                 /* don't split insert BATs */
     251      605221 :                 if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
     252           0 :                         pushInstruction(mb, p);
     253           0 :                         continue;
     254             :                 }
     255      605221 :                 r = getRowCnt(mb, getArg(p, 0));
     256      605221 :                 if (r < rowcnt) {
     257      333196 :                         pushInstruction(mb, p);
     258      333196 :                         continue;
     259             :                 }
     260             :                 /* Don't split the (index) bat if we already have identified a range */
     261             :                 /* This will happen if we inline separately optimized routines */
     262      272025 :                 if (p->argc > 7) {
     263           0 :                         pushInstruction(mb, p);
     264           0 :                         continue;
     265             :                 }
     266      272025 :                 if (p->retc == 2)
     267       61581 :                         upd = 1;
     268      272510 :                 if (mt < 0
     269      272025 :                         && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval)
     270      272025 :                                 || strcmp(table,
     271      272025 :                                                   getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) {
     272         486 :                         pushInstruction(mb, p);
     273         485 :                         continue;
     274             :                 }
     275             :                 /* we keep the original bind operation, because it allows for
     276             :                  * easy undo when the mergtable can not do something */
     277             :                 // pushInstruction(mb, p);
     278             : 
     279      271539 :                 qtpe = getVarType(mb, getArg(p, 0));
     280             : 
     281      271539 :                 matq = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     282      271543 :                 if (matq == NULL) {
     283           0 :                         msg = createException(MAL, "optimizer.mitosis",
     284             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     285           0 :                         break;
     286             :                 }
     287      271543 :                 getArg(matq, 0) = getArg(p, 0);
     288             : 
     289      271543 :                 if (upd) {
     290       61581 :                         matr = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     291       61581 :                         if (matr == NULL) {
     292           0 :                                 freeInstruction(matq);
     293           0 :                                 msg = createException(MAL, "optimizer.mitosis",
     294             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     295           0 :                                 break;
     296             :                         }
     297       61581 :                         getArg(matr, 0) = getArg(p, 1);
     298       61581 :                         rtpe = getVarType(mb, getArg(p, 1));
     299             :                 }
     300             : 
     301     1334142 :                 for (j = 0; j < pieces; j++) {
     302     1062618 :                         q = copyInstruction(p);
     303     1062776 :                         if (q == NULL) {
     304           0 :                                 freeInstruction(matr);
     305           0 :                                 freeInstruction(matq);
     306           0 :                                 for (; i < limit; i++)
     307           0 :                                         if (old[i])
     308           0 :                                                 pushInstruction(mb, old[i]);
     309           0 :                                 GDKfree(old);
     310           0 :                                 throw(MAL, "optimizer.mitosis",
     311             :                                           SQLSTATE(HY013) MAL_MALLOC_FAIL);
     312             :                         }
     313     1062776 :                         q = pushInt(mb, q, j);
     314     1062667 :                         q = pushInt(mb, q, pieces);
     315             : 
     316     1062663 :                         qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
     317     1062620 :                         if (upd) {
     318      238118 :                                 rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
     319             :                         }
     320     1062620 :                         pushInstruction(mb, q);
     321     1062609 :                         matq = pushArgument(mb, matq, qv);
     322     1062599 :                         if (upd)
     323      238118 :                                 matr = pushArgument(mb, matr, rv);
     324             :                 }
     325      271524 :                 pushInstruction(mb, matq);
     326      271523 :                 if (upd)
     327       61581 :                         pushInstruction(mb, matr);
     328      271523 :                 freeInstruction(p);
     329             :         }
     330     9472722 :         for (; i < slimit; i++)
     331     9425536 :                 if (old[i])
     332           0 :                         pushInstruction(mb, old[i]);
     333       47186 :         GDKfree(old);
     334             : 
     335             :         /* Defense line against incorrect plans */
     336       47186 :         if (msg == MAL_SUCCEED) {
     337       47186 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     338       47186 :                 if (msg == MAL_SUCCEED) {
     339       47186 :                         msg = chkFlow(mb);
     340       47186 :                         if (msg == MAL_SUCCEED)
     341       47186 :                                 msg = chkDeclarations(mb);
     342             :                 }
     343             :         }
     344           0 :   bailout:
     345             :         /* keep actions taken as a fake argument */
     346      363887 :         (void) pushInt(mb, pci, pieces);
     347      363887 :         return msg;
     348             : }

Generated by: LCOV version 1.14