Hey Anderson,
It should be getting executed in parallel, but it indeed does not support multiprocessing; it only uses multithreaded parallelism. Parallel aggregations were added as something of a last-minute hack, so they also do not show up in the MAL plan, because the parallelization happens inside the Python UDF.
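A minimal sketch of how that thread-level parallelism could be made visible (the name python_map_tid, the BIGINT return type and the use of threading.current_thread().ident are illustrative assumptions, not something from this thread):
CREATE AGGREGATE python_map_tid(val INTEGER)
RETURNS BIGINT
LANGUAGE PYTHON_MAP {
import threading
# os.getpid() is constant because all groups are handled in one process,
# but with thread-based parallelism the thread ident may differ per group
return(threading.current_thread().ident)
};
With more than one worker thread configured, select fctr, python_map_tid(v1) from mini group by fctr should report varying idents across groups even though os.getpid() stays the same.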
Regards,
Mark
----- Original Message -----
From: "Anderson, David B"
To: "users-list"
Sent: Tuesday, July 5, 2016 11:33:11 PM
Subject: RE: R UDFs
Mark,
I changed my function to remove the dependence on aggr_group:
CREATE AGGREGATE python_map_pid(val INTEGER)
RETURNS INTEGER
LANGUAGE PYTHON_MAP {
import os
pid = os.getpid()
return(pid)
};
But it isn't getting executed in parallel for the distinct groups. Do I need to do something else?
sql>select fctr,python_map_pid(v1) from mini group by fctr;
+------+------+
| fctr | L1 |
+======+======+
| 12 | 3295 |
| 15 | 3295 |
| 94 | 3295 |
| 66 | 3295 |
| 72 | 3295 |
| 52 | 3295 |
| 32 | 3295 |
| 64 | 3295 |
| 7 | 3295 |
| 56 | 3295 |
| 79 | 3295 |
| 35 | 3295 |
...
| 39 | 3295 |
| 96 | 3295 |
| 6 | 3295 |
+------+------+
99 tuples (148.171ms)
sql>explain select fctr,python_map_pid(v1) from mini group by fctr;
+------------------------------------------------------------------------------------------------------------------------------+
| mal |
+==============================================================================================================================+
| function user.s2_1():void; |
| X_45:void := querylog.define("select fctr,python_map_pid(v1) from mini group by fctr;","default_pipe",90); |
| barrier X_155 := language.dataflow(); |
| X_22 := bat.new(nil:oid,nil:str); |
| X_30 := bat.append(X_22,"sys.mini"); |
| X_38 := bat.append(X_30,"sys.L1"); |
| X_25 := bat.new(nil:oid,nil:str); |
| X_32 := bat.append(X_25,"fctr"); |
| X_40 := bat.append(X_32,"L1"); |
| X_26 := bat.new(nil:oid,nil:str); |
| X_33 := bat.append(X_26,"int"); |
| X_42 := bat.append(X_33,"int"); |
| X_27 := bat.new(nil:oid,nil:int); |
| X_35 := bat.append(X_27,32); |
| X_43 := bat.append(X_35,32); |
| X_29 := bat.new(nil:oid,nil:int); |
| X_37 := bat.append(X_29,0); |
| X_44 := bat.append(X_37,0); |
| X_1 := sql.mvc(); |
| C_63:bat[:oid] := sql.tid(X_1,"sys","mini",0,4); |
| X_68:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,0,4); |
| (C_72:bat[:oid],X_73:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,0,4); |
| X_93 := sql.delta(X_68,C_72,X_73); |
| X_97 := algebra.projection(C_63,X_93); |
| (X_101,X_102,X_103) := group.subgroupdone(X_97); |
| X_104 := algebra.projection(X_102,X_97); |
| C_64:bat[:oid] := sql.tid(X_1,"sys","mini",1,4); |
| X_69:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,1,4); |
| (C_74:bat[:oid],X_75:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,1,4); |
| X_94 := sql.delta(X_69,C_74,X_75); |
| X_98 := algebra.projection(C_64,X_94); |
| (X_105,X_106,X_107) := group.subgroupdone(X_98); |
| X_108 := algebra.projection(X_106,X_98); |
| C_65:bat[:oid] := sql.tid(X_1,"sys","mini",2,4); |
| X_70:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,2,4); |
| (C_76:bat[:oid],X_77:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,2,4); |
| X_95 := sql.delta(X_70,C_76,X_77); |
| X_99 := algebra.projection(C_65,X_95); |
| (X_109,X_110,X_111) := group.subgroupdone(X_99); |
| X_112 := algebra.projection(X_110,X_99); |
| C_67:bat[:oid] := sql.tid(X_1,"sys","mini",3,4); |
| X_71:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,3,4); |
| (C_78:bat[:oid],X_79:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,3,4); |
| X_7:bat[:int] := sql.bind(X_1,"sys","mini","fctr",1); |
| X_96 := sql.delta(X_71,C_78,X_79,X_7); |
| X_100 := algebra.projection(C_67,X_96); |
| (X_113,X_114,X_115) := group.subgroupdone(X_100); |
| X_116 := algebra.projection(X_114,X_100); |
| X_135 := mat.packIncrement(X_104,4); |
| X_137 := mat.packIncrement(X_135,X_108); |
| X_138 := mat.packIncrement(X_137,X_112); |
| X_9 := mat.packIncrement(X_138,X_116); |
| (X_10,r1_15,X_117) := group.subgroupdone(X_9); |
| X_12 := algebra.projection(r1_15,X_9); |
| X_80:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,0,4); |
| (C_84:bat[:oid],X_85:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,0,4); |
| X_118 := sql.delta(X_80,C_84,X_85); |
| X_122 := algebra.projection(C_63,X_118); |
| X_81:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,1,4); |
| (C_86:bat[:oid],X_87:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,1,4); |
| X_119 := sql.delta(X_81,C_86,X_87); |
| X_123 := algebra.projection(C_64,X_119); |
| X_82:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,2,4); |
| (C_88:bat[:oid],X_89:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,2,4); |
| X_120 := sql.delta(X_82,C_88,X_89); |
| X_124 := algebra.projection(C_65,X_120); |
| X_83:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,3,4); |
| (C_90:bat[:oid],X_91:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,3,4); |
| X_16:bat[:int] := sql.bind(X_1,"sys","mini","v1",1); |
| X_121 := sql.delta(X_83,C_90,X_91,X_16); |
| X_125 := algebra.projection(C_67,X_121); |
| X_140 := mat.packIncrement(X_122,4); |
| X_141 := mat.packIncrement(X_140,X_123); |
| X_142 := mat.packIncrement(X_141,X_124); |
| X_17 := mat.packIncrement(X_142,X_125); |
| X_18:bat[:int] := pyapimap.subeval_aggr(nil,"{\n import os\n pid = os.getpid()\n return(pid)\n};",X_17,X_10,r1_ |
: 15,true); :
| X_13 := bat.setKey(X_12,true); |
| language.pass(X_97); |
| language.pass(X_98); |
| language.pass(X_99); |
| language.pass(X_100); |
| language.pass(X_9); |
| language.pass(C_63); |
| language.pass(C_64); |
| language.pass(C_65); |
| language.pass(C_67); |
| language.pass(r1_15); |
| exit X_155; |
| sql.resultSet(X_38,X_40,X_42,X_43,X_44,X_13,X_18); |
| end user.s2_1; |
| #inline actions= 0 time=2 usec |
| #remap actions= 0 time=5 usec |
| #costmodel actions= 1 time=4 usec |
| #coercion actions= 0 time=2 usec |
| #evaluate actions= 0 time=4 usec |
| #aliases actions= 0 time=9 usec |
| #mitosis actions= 1 time=70 usec |
| #mergetable actions= 5 time=139 usec |
| #deadcode actions=10 time=18 usec |
| #aliases actions= 0 time=12 usec |
| #constants actions= 5 time=13 usec |
| #commonTerms actions= 5 time=29 usec |
| #projectionpath actions= 0 time=11 usec |
| #deadcode actions= 5 time=14 usec |
| #reorder actions= 1 time=53 usec |
| #reduce actions=40 time=44 usec |
| #matpack actions= 2 time=14 usec |
| #dataflow actions=26 time=49 usec |
| #multiplex actions= 0 time=4 usec |
| #profiler actions= 1 time=10 usec |
| #candidates actions= 1 time=2 usec |
| #garbagecollector actions= 1 time=42 usec |
| #total actions= 1 time=687 usec |
+------------------------------------------------------------------------------------------------------------------------------+
113 tuples (3.308ms)
sql>
-----Original Message-----
From: users-list [mailto:users-list-bounces+david.b.anderson=citi.com@monetdb.org] On Behalf Of Mark Raasveldt
Sent: Tuesday, July 05, 2016 4:57 PM
To: users-list
Subject: Re: R UDFs
Hey Anderson,
When PYTHON_MAP is enabled, the UDF is called once per group with only the elements of the group as input, as specified here: https://www.monetdb.org/wiki/Python_User_Defined_Functions#Parallel_Aggregat...
As such, `aggr_group` is not passed as input (because it is redundant; it would contain the same group number for every element because only the elements of one group are passed in). Thus your function fails when trying to use aggr_group because it does not exist.
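A minimal sketch of that per-group calling convention (python_map_max is a made-up name; numpy is assumed to be preloaded, as in the examples further down this thread): an aggregate that only looks at val should work under PYTHON_MAP:
CREATE AGGREGATE python_map_max(val INTEGER)
RETURNS INTEGER
LANGUAGE PYTHON_MAP {
# val holds only the current group's values, so no aggr_group is needed
return(numpy.max(val))
};
Each invocation sees one group's values and returns one value for that group.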
Regards,
Mark
----- Original Message -----
From: "Anderson, David B"
To: "users-list"
Sent: Tuesday, July 5, 2016 9:37:57 PM
Subject: RE: R UDFs
Mark,
Thanks for the pointers. I was able to build MonetDB with integrated Python, but I haven't been able to get aggregate Python functions to execute in parallel. I was hoping that when I used a Python aggregate function in conjunction with a group by clause, multiple Python interpreters would be launched in parallel to work on subsets of the hidden parameter aggr_group, but it appears that the computation for all elements of aggr_group occurs within a single thread. Is this the expected behavior?
I tested using a modification of one of your aggregate examples, and I also passed "--set gdk_nr_threads 4 --forcemito" on the mserver5 command line.
CREATE AGGREGATE python_pid(val INTEGER) RETURNS INTEGER LANGUAGE PYTHON {
import os
import time
pid = os.getpid()
time.sleep(10)
unique = numpy.unique(aggr_group)
x = numpy.zeros(shape=(unique.size))
for i in range(0, unique.size):
    x[i] = pid
return(x)
};
This always returns the same PID, and I only see one thread running in top.
If I try to use the PYTHON_MAP API,
CREATE AGGREGATE python_map_pid(val INTEGER) RETURNS INTEGER LANGUAGE PYTHON_MAP {
import os
pid = os.getpid()
unique = numpy.unique(aggr_group)
x = numpy.zeros(shape=(unique.size))
for i in range(0, unique.size):
    x[i] = pid
return(x)
};
I get the following error:
sql>select fctr, python_map_pid(value) from mini group by fctr;
MALException:pyapi.eval:Python exception
2. import os
3. pid = os.getpid()
4. unique = numpy.unique(aggr_group)
5. x = numpy.zeros(shape=(unique.size))
6. for i in range(0, unique.size):
global name 'aggr_group' is not defined
So it looks like PYTHON_MAP isn't supposed to be used with aggregate functions?
Thanks,
Dave
-----Original Message-----
From: users-list [mailto:users-list-bounces+david.b.anderson=citi.com@monetdb.org] On Behalf Of Mark Raasveldt
Sent: Tuesday, June 28, 2016 5:35 PM
To: users-list
Subject: Re: R UDFs
Hey Anderson,
That should already be possible. The configure script performs the following steps:
- Get Python link flags by calling 'python-config --ldflags'
- Get Python include directory by calling 'python -c "from distutils.sysconfig import get_python_inc; get_python_inc();"'
- Get the NumPy include path by calling 'python -c "import numpy; numpy.get_include()"'
It is possible that the wrong python-config or the wrong python is being used in these steps if you are not using the system installation of Python. You can manually specify 'python-config' using --with-pyconfig="/path/to/python-config", and specify Python using --with-python2="/path/to/python2".
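For example (the paths below are placeholders, and a print is added so the probed values are visible when run by hand; this assumes the usual source-tree configure step), you could first check which Python the build would pick up and then point configure at a non-system installation:
python-config --ldflags
python -c "from distutils.sysconfig import get_python_inc; print(get_python_inc())"
python -c "import numpy; print(numpy.get_include())"
./configure --with-pyconfig=/opt/python2.7/bin/python-config --with-python2=/opt/python2.7/bin/python2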
Hope that helps.
Mark
----- Original Message -----
From: "Anderson, David B"
To: "users-list"
Sent: Tuesday, June 28, 2016 10:28:50 PM
Subject: RE: R UDFs
Mark,
It looks like you are expecting numpy to be installed in the standard system directories. Are there plans to allow it to be installed in a non-standard location? Our build environment is atypical/old and I need to build many libraries from source and install them in non-system directories. I'm trying not to hack the configure scripts.
Thanks,
Dave
-----Original Message-----
From: users-list [mailto:users-list-bounces+david.b.anderson=citi.com@monetdb.org] On Behalf Of Mark Raasveldt
Sent: Saturday, June 18, 2016 4:49 AM
To: users-list
Subject: Re: R UDFs
Hey Dave,
I have only tested it with Python 2.7, but I see no reason why it won't work with Python 2.6.
Regards,
Mark
----- Original Message -----
From: "Anderson, David B"
To: "users-list"
Sent: Friday, June 17, 2016 6:07:25 PM
Subject: RE: R UDFs
Mark,
Can I use Python 2.6, or does it have to be 2.7?
Dave
-----Original Message-----
From: users-list [mailto:users-list-bounces+david.b.anderson=citi.com@monetdb.org] On Behalf Of Mark Raasveldt
Sent: Friday, June 17, 2016 9:32 AM
To: users-list
Subject: Re: R UDFs
Hey Dave,
In the 'default' branch only Python 2.7 UDFs are supported; Python 3 UDFs are currently in the works, so that you can choose which version you want to use. If you want to use Python 3 UDFs right now, you can use the 'python3udf' branch instead.
Regards,
Mark
----- Original Message -----
From: "Anderson, David B"
To: "users-list"
Sent: Thursday, June 16, 2016 6:05:30 PM
Subject: RE: R UDFs
I just read your Embedded Python/NumPy blog post. Excellent!
Is there a preferred/required version of Python? I was looking at the config log and I see it checking for Python2 & Python3.
-----Original Message-----
From: Anderson, David B [ICG-MKTS]
Sent: Thursday, June 16, 2016 11:34 AM
To: users-list
Subject: RE: R UDFs
Mark,
Thanks for the info. To confirm: if I write my UDFs in Python, they will be executed in parallel (if required)?
Thanks,
Dave
-----Original Message-----
From: users-list [mailto:users-list-bounces+david.b.anderson=citi.com@monetdb.org] On Behalf Of Mark Raasveldt
Sent: Thursday, June 16, 2016 8:05 AM
To: users-list
Subject: Re: R UDFs
Hey Dave,
There's only a single R interpreter running. When an R function is called, a lock is acquired that prevents other R functions from running at the same time. There has been some work on running multiple interpreters in parallel in separate processes for Python UDFs, but this has not been ported to R UDFs yet.
Regards,
Mark
_______________________________________________
users-list mailing list
users-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/users-list