Hi Mark,

I have tried what you suggested, but with no results. Here (http://pastie.org/private/cdd0rzhpdyojuq6i0drkra) is the explain output of my query without forcing parallelization (I deleted a few lines because it was too long).

So I started mserver5 with --forcemito:

    mserver5 --set embedded_r=true --set embedded_py=true --dbpath=<path> --daemon=yes --set mapi_open=true --debug=2 --forcemito

I ran the explain again and... nothing changed. The explain output is exactly the same as before. The server starts with "Serving database 'dbname', using 16 threads", so at least MonetDB knows it can use up to 16 threads.

The input columns of the Python function have only 550 rows, which is not much, but the function takes a long time to execute, so I really need to run it in parallel.

Thanks,
Stefano
On 28 Jun 2016, at 23:24, Mark Raasveldt wrote:

Hey Stefano,
Indeed, all you have to do to parallelize the function is create it with LANGUAGE PYTHON_MAP instead of LANGUAGE PYTHON. Note that the parallelization uses a heuristic based on table size to decide whether parallelization is worth the effort. If you are doing expensive operations on a small table, the function might not be parallelized, because the heuristic incorrectly assumes that parallelization is not worth the effort. You can force parallelization of queries regardless of table size by starting mserver5 with the --forcemito option.
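To make the size heuristic concrete, here is a minimal Python sketch of the kind of decision described above. It is not MonetDB's actual mitosis logic: the function name `should_split`, the `min_rows_per_slice` threshold and its default value are all hypothetical, and `force` merely stands in for starting mserver5 with --forcemito.

```python
def should_split(row_count: int, nr_threads: int,
                 min_rows_per_slice: int = 1000, force: bool = False) -> bool:
    """Hypothetical size-only heuristic: split the input only when every
    slice would still be 'big enough' to justify its own Python process.

    min_rows_per_slice is an illustrative threshold, NOT MonetDB's real value;
    force models starting mserver5 with --forcemito.
    """
    if nr_threads <= 1:
        return False  # a single thread leaves nothing to parallelize over
    if force:
        return True  # table size is ignored when parallelization is forced
    return row_count // nr_threads >= min_rows_per_slice


# 550 rows over 16 threads is only ~34 rows per slice, so a size-only
# heuristic declines to split, however expensive each row is to process.
print(should_split(550, 16))              # False
print(should_split(550, 16, force=True))  # True
```

The point of the sketch is that a row-count threshold cannot see the per-row cost of the function, which is how an expensive function on a small table can end up running in a single process.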
If you want to check whether your function is parallelized, look at the explain output (type 'explain <SQL query>;' instead of '<SQL query>;'). This shows the generated MAL plan, which specifies how the SQL query is executed. An example of a MAL plan with parallel Python execution is given below; note that 'batpyapimap.eval', the Python MAL operator, is called multiple times, once per partition of the input.
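A quick way to apply this check is to count how often the Python MAL operator appears in the explain output. The sketch below assumes you have copied the plan text (for example from mclient) into a string; `count_python_calls` is a hypothetical helper, and it only looks for the operator name 'batpyapimap.eval' mentioned above.

```python
import re


def count_python_calls(plan_text: str) -> int:
    """Count calls to the parallel Python operator in EXPLAIN output.

    More than one call suggests the function was split into several slices;
    zero means no 'batpyapimap.eval' call was found at all.
    """
    return len(re.findall(r"\bbatpyapimap\.eval\(", plan_text))


plan = """
X_74 := batpyapimap.eval(nil,"{ return i * 2 };",X_70);
X_75 := batpyapimap.eval(nil,"{ return i * 2 };",X_71);
"""
print(count_python_calls(plan))  # 2
```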
The number of Python processes launched should equal the number of cores on your machine, i.e. the thread count mserver5 reports at startup (e.g. # Serving database 'demo', using 4 threads). If you want to use a different number of threads, you can change it with the option '--set gdk_nr_threads=n'.
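As a back-of-the-envelope check for the case in this thread (550 input rows, 16 threads), the sketch below computes slice sizes under the assumption that rows are divided as evenly as possible among the threads. That even split is an illustrative assumption, not MonetDB's documented partitioning scheme, and `slice_bounds` is a hypothetical helper.

```python
def slice_bounds(row_count: int, nr_threads: int) -> list[tuple[int, int]]:
    """Split row_count rows into nr_threads contiguous [start, end) ranges
    whose sizes differ by at most one row (assumed even partitioning)."""
    base, extra = divmod(row_count, nr_threads)
    bounds = []
    start = 0
    for part in range(nr_threads):
        # the first `extra` slices each take one leftover row
        size = base + (1 if part < extra else 0)
        bounds.append((start, start + size))
        start += size
    return bounds


sizes = [end - start for start, end in slice_bounds(550, 16)]
print(sizes)  # six slices of 35 rows, then ten slices of 34 rows
```

Under this assumption each of the 16 Python processes would handle only about 34 rows, so any speed-up comes from the per-row cost of the function, not from the table size.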
Hope that helps.
Mark
    function user.s8_1():void;
        X_30:void := querylog.define("explain select python_function(i) from integers;","default_pipe",46);
    barrier X_93 := language.dataflow();
        X_13 := bat.new(nil:oid,nil:str);
        X_21 := bat.append(X_13,"sys.L");
        X_16 := bat.new(nil:oid,nil:str);
        X_23 := bat.append(X_16,"python_function_i");
        X_17 := bat.new(nil:oid,nil:str);
        X_25 := bat.append(X_17,"int");
        X_18 := bat.new(nil:oid,nil:int);
        X_27 := bat.append(X_18,32);
        X_20 := bat.new(nil:oid,nil:int);
        X_29 := bat.append(X_20,0);
        X_1 := sql.mvc();
        C_48:bat[:oid] := sql.tid(X_1,"sys","integers",0,4);
        X_53:bat[:int] := sql.bind(X_1,"sys","integers","i",0,0,4);
        (C_57:bat[:oid],X_58:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,0,4);
        X_66 := sql.delta(X_53,C_57,X_58);
        X_70 := algebra.projection(C_48,X_66);
        X_74 := batpyapimap.eval(nil,"{ return i * 2 };",X_70);
        C_49:bat[:oid] := sql.tid(X_1,"sys","integers",1,4);
        X_54:bat[:int] := sql.bind(X_1,"sys","integers","i",0,1,4);
        (C_59:bat[:oid],X_60:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,1,4);
        X_67 := sql.delta(X_54,C_59,X_60);
        X_71 := algebra.projection(C_49,X_67);
        X_75 := batpyapimap.eval(nil,"{ return i * 2 };",X_71);
        C_50:bat[:oid] := sql.tid(X_1,"sys","integers",2,4);
        X_55:bat[:int] := sql.bind(X_1,"sys","integers","i",0,2,4);
        (C_61:bat[:oid],X_62:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,2,4);
        X_68 := sql.delta(X_55,C_61,X_62);
        X_72 := algebra.projection(C_50,X_68);
        X_76 := batpyapimap.eval(nil,"{ return i * 2 };",X_72);
        C_52:bat[:oid] := sql.tid(X_1,"sys","integers",3,4);
        X_56:bat[:int] := sql.bind(X_1,"sys","integers","i",0,3,4);
        (C_63:bat[:oid],X_64:bat[:int]) := sql.bind(X_1,"sys","integers","i",2,3,4);
        X_7:bat[:int] := sql.bind(X_1,"sys","integers","i",1);
        X_69 := sql.delta(X_56,C_63,X_64,X_7);
        X_73 := algebra.projection(C_52,X_69);
        X_77 := batpyapimap.eval(nil,"{ return i * 2 };",X_73);
        X_87 := mat.packIncrement(X_74,4);
        X_89 := mat.packIncrement(X_87,X_75);
        X_90 := mat.packIncrement(X_89,X_76);
        X_9:bat[:int] := mat.packIncrement(X_90,X_77);
    exit X_93;
        sql.resultSet(X_21,X_23,X_25,X_27,X_29,X_9);
    end user.s8_1;
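The plan above follows a split/apply/pack pattern: the `sql.tid(...,k,4)` and `sql.bind(...,k,4)` calls appear to select partition k of 4, `batpyapimap.eval` runs the Python body on that partition, and the chain of `mat.packIncrement` calls concatenates the partial results in order. The pure-Python sketch below mimics that flow for the example body `return i * 2`. It runs the slices sequentially, whereas MonetDB hands each slice to its own Python process; the helper names are hypothetical, and plain lists stand in for the NumPy arrays MonetDB passes to embedded Python functions.

```python
def python_body(i: list[int]) -> list[int]:
    """Stand-in for the embedded body `{ return i * 2 };`, applied element-wise."""
    return [x * 2 for x in i]


def partition(column: list[int], nparts: int) -> list[list[int]]:
    """Cut the column into nparts contiguous slices (hypothetical even split),
    playing the role of sql.tid/sql.bind(..., k, nparts) in the plan."""
    size, extra = divmod(len(column), nparts)
    slices, start = [], 0
    for k in range(nparts):
        end = start + size + (1 if k < extra else 0)
        slices.append(column[start:end])
        start = end
    return slices


def run_mapped(column: list[int], nparts: int = 4) -> list[int]:
    # one "batpyapimap.eval" per slice
    partials = [python_body(s) for s in partition(column, nparts)]
    result: list[int] = []
    for part in partials:  # like the chain of mat.packIncrement calls
        result.extend(part)
    return result


print(run_mapped(list(range(10))))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

This only gives the same answer as a single call on the whole column because the body is row-wise; a function that needs the entire column at once (for example a global aggregate) would produce different per-slice results.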
----- Original Message -----
From: "Stefano Fioravanzo"
To: "users-list"
Sent: Tuesday, June 28, 2016 12:03:49 PM
Subject: python map

Hello,
I have a long and complex embedded Python function. Its result for each row depends only on that row of the input columns, so I would like to parallelize it.
I’ve read the post on embedded Python and python_map. I understand that using python_map should automatically split the input columns (in an unspecified way), launch several (how many?) Python processes, each executing on its own input slice, and finally pack all the partial results together.
Well, I have tried to use python_map, but nothing changed. Is there anything else I have to do apart from setting LANGUAGE PYTHON_MAP?
Regards,
Stefano

_______________________________________________
users-list mailing list
users-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/users-list