Hey Stefano,
Indeed, all you have to do to parallelize the function is create it with LANGUAGE PYTHON_MAP instead of LANGUAGE PYTHON. Note that the parallelization uses a heuristic based on table size to determine whether parallelization is worth the effort. If you are doing expensive operations on a small table, the function might not be parallelized, because the heuristic incorrectly assumes that parallelization is not worth the effort. You can force parallelization of queries regardless of table size by starting mserver5 with the --forcemito option.
If you want to check whether your function is parallelized, you can look at the explain output (type 'explain <SQL query>;' instead of <SQL query>). This presents you with the generated MAL plan, which specifies how the SQL query is executed. An example of a MAL plan with parallel Python execution is given below (note that 'batpyapimap.eval', which is the Python MAL operator, is called multiple times).
The number of Python processes launched should be equal to the number of cores on your machine, as reported in the startup message of mserver5 (e.g. "# Serving database 'demo', using 4 threads"). You can change the number of threads with the option '--set gdk_nr_threads=n' if you want to use a different number of threads.
Hope that helps.
Mark
+--------------------------------------------------------------------------------------------------+
| mal |
+==================================================================================================+
| function user.s8_1():void; |
| X_30:void := querylog.define("explain select python_function(i) from integers;","default_pip |
: e",46); :
| barrier X_93 := language.dataflow(); |
| X_13 := bat.new(nil:oid,nil:str); |
| X_21 := bat.append(X_13,"sys.L"); |
| X_16 := bat.new(nil:oid,nil:str); |
| X_23 := bat.append(X_16,"python_function_i"); |
| X_17 := bat.new(nil:oid,nil:str); |
| X_25 := bat.append(X_17,"int"); |
| X_18 := bat.new(nil:oid,nil:int); |
| X_27 := bat.append(X_18,32); |
| X_20 := bat.new(nil:oid,nil:int); |
| X_29 := bat.append(X_20,0); |
| X_1 := sql.mvc(); |
| C_48:bat[:oid] := sql.tid(X_1,"sys","integers",0,4); |
| X_53:bat[:int] := sql.bind(X_1,"sys","integers","i",0,0,4); |
| (C_57:bat[:oid],X_58:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,0,4); |
| X_66 := sql.delta(X_53,C_57,X_58); |
| X_70 := algebra.projection(C_48,X_66); |
| X_74 := batpyapimap.eval(nil,"{ return i * 2 };",X_70); |
| C_49:bat[:oid] := sql.tid(X_1,"sys","integers",1,4); |
| X_54:bat[:int] := sql.bind(X_1,"sys","integers","i",0,1,4); |
| (C_59:bat[:oid],X_60:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,1,4); |
| X_67 := sql.delta(X_54,C_59,X_60); |
| X_71 := algebra.projection(C_49,X_67); |
| X_75 := batpyapimap.eval(nil,"{ return i * 2 };",X_71); |
| C_50:bat[:oid] := sql.tid(X_1,"sys","integers",2,4); |
| X_55:bat[:int] := sql.bind(X_1,"sys","integers","i",0,2,4); |
| (C_61:bat[:oid],X_62:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,2,4); |
| X_68 := sql.delta(X_55,C_61,X_62); |
| X_72 := algebra.projection(C_50,X_68); |
| X_76 := batpyapimap.eval(nil,"{ return i * 2 };",X_72); |
| C_52:bat[:oid] := sql.tid(X_1,"sys","integers",3,4); |
| X_56:bat[:int] := sql.bind(X_1,"sys","integers","i",0,3,4); |
| (C_63:bat[:oid],X_64:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,3,4); |
| X_7:bat[:int] := sql.bind(X_1,"sys","integers","i",1); |
| X_69 := sql.delta(X_56,C_63,X_64,X_7); |
| X_73 := algebra.projection(C_52,X_69); |
| X_77 := batpyapimap.eval(nil,"{ return i * 2 };",X_73); |
| X_87 := mat.packIncrement(X_74,4); |
| X_89 := mat.packIncrement(X_87,X_75); |
| X_90 := mat.packIncrement(X_89,X_76); |
| X_9:bat[:int] := mat.packIncrement(X_90,X_77); |
| exit X_93; |
| sql.resultSet(X_21,X_23,X_25,X_27,X_29,X_9); |
| end user.s8_1; |
+--------------------------------------------------------------------------------------------------+
----- Original Message -----
From: "Stefano Fioravanzo"