All, quick question: how many R interpreters are running within MonetDB when you use R UDFs? Is there a single shared interpreter, or are there multiple copies running? Thanks, Dave
Hey Dave,
There's only a single R interpreter running. When an R function is called, a lock is acquired that prevents other R functions from running at the same time. There has been some work towards running multiple interpreters in parallel in separate processes for Python UDFs, but this has not been ported to R UDFs yet.
Regards,
Mark
Mark,
Thanks for the info. To confirm: if I write my UDFs in Python, will they be executed in parallel (if required)?
Thanks,
Dave
I just read your Embedded Python/NumPy blog post. Excellent!
Is there a preferred/required version of Python? I was looking at the config log and I see it checking for Python2 & Python3.
Hey Dave,
In the 'default' branch only Python 2.7 UDFs are supported; Python 3 UDF support is currently in the works, so you will be able to choose which version you want to use. If you want to use Python 3 UDFs right now, you can use the 'python3udf' branch instead.
Regards,
Mark
Mark,
Can I use Python 2.6, or does it have to be 2.7?
Dave
Hey Dave,
I have only tested it with Python 2.7, but I see no reason why it won't work with Python 2.6.
Regards,
Mark
Mark,
One more general question. I need to write a Python aggregate UDF which will compute a weighted quartile given a pair of columns. In my case, I will be performing this across a large number of groupings generated by a GROUP BY clause.
Will MonetDB pass the entire columns into the Python UDF, or will slices (corresponding to collections of sub-groups) be passed into the UDF? Trying to figure out how parallelized the computation will be.
All of the docs mention that computations like quartile() need the entire column, but they really only need the "entire" column for each grouping. I haven't sat down with a debugger and MonetDB to see exactly what is getting passed in.
Thanks,
Dave
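For reference, the per-group computation Dave describes can be sketched in plain NumPy outside MonetDB. Everything here (the `weighted_quantile` helper, the midpoint interpolation rule, the sample data) is illustrative and not part of MonetDB's API:

```python
import numpy

def weighted_quantile(values, weights, q):
    """Weighted quantile of `values` with `weights`, for q in [0, 1],
    using midpoint-rule interpolation over the cumulative weights."""
    order = numpy.argsort(values)
    v, w = values[order], weights[order]
    cum = numpy.cumsum(w) - 0.5 * w   # midpoint of each weight step
    cum /= numpy.sum(w)
    return numpy.interp(q, cum, v)

# One such call per GROUP BY group; MonetDB would hand each UDF
# invocation only that group's slice of the input columns.
vals = numpy.array([1.0, 2.0, 3.0, 4.0])
wts = numpy.array([1.0, 1.0, 1.0, 1.0])
print(weighted_quantile(vals, wts, 0.5))  # equal weights -> plain median, 2.5
```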
Hey Dave,
If you create the function with LANGUAGE PYTHON_MAP, the function will be called once for every group, with the input columns sliced to that group's elements. If you create the function with LANGUAGE PYTHON, the function will be called only once, and the group identifiers will be passed in as a parameter to the function.
See here for more information: https://www.monetdb.org/wiki/Python_User_Defined_Functions#User-Defined_Aggr...
I should note that parallel aggregates have not been tested as much as the rest of Python UDFs, so if you experience any problems with them please tell me so I can fix them (also please add a reproducible example when you do ;))
Regards,
Mark
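To make the single-call LANGUAGE PYTHON variant concrete, here is a minimal NumPy sketch of the per-group loop such an aggregate performs; the data and the use of a mean as the reduction are made up for illustration (inside MonetDB, `val` and `aggr_group` arrive as parameters of the UDF):

```python
import numpy

def python_aggregate(val, aggr_group):
    """Mimic a LANGUAGE PYTHON aggregate: called once, with one group
    id per input row; must return one value per group."""
    unique = numpy.unique(aggr_group)
    out = numpy.zeros(unique.size)
    for i, g in enumerate(unique):
        out[i] = val[aggr_group == g].mean()  # any per-group reduction
    return out

val = numpy.array([1.0, 2.0, 10.0, 20.0])
aggr_group = numpy.array([0, 0, 1, 1])
print(python_aggregate(val, aggr_group).tolist())  # -> [1.5, 15.0]
```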
Mark,
It looks like you are expecting numpy to be installed in the standard system directories. Are there plans to allow it to be installed in a non-standard location? Our build environment is atypical/old, and I need to build many libraries from source and install them in non-system directories. I'm trying not to hack the configure scripts.
Thanks,
Dave
Hey Anderson,
That should already be possible. The configure script performs the following steps:
- Get the Python link flags by calling 'python-config --ldflags'
- Get the Python include directory by calling 'python -c "from distutils.sysconfig import get_python_inc; print(get_python_inc())"'
- Get the NumPy include path by calling 'python -c "import numpy; print(numpy.get_include())"'
It is possible that the wrong python-config or the wrong python is being used in these steps if you are not using the system installation of Python. You can manually specify 'python-config' using --with-pyconfig="/path/to/python-config", and specify Python using --with-python2="/path/to/python2".
Hope that helps.
Mark
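Putting the flags Mark mentions together, a build against a Python installed outside the system directories might be configured roughly as follows. The prefix path is a placeholder; check `./configure --help` for the exact option spelling in your MonetDB version:

```shell
# Point configure at a non-system Python. NumPy must be importable
# by *that* interpreter, not by the system one.
PY_PREFIX=/opt/python27   # hypothetical install prefix

./configure \
    --with-python2="$PY_PREFIX/bin/python2" \
    --with-pyconfig="$PY_PREFIX/bin/python-config"
```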
Mark,
Thanks for the pointers. I was able to build MonetDB with integrated Python, but I haven't been able to get aggregate Python functions to execute in parallel. I was hoping that when I used a Python aggregate function in conjunction with a GROUP BY clause, multiple Python interpreters would be launched in parallel to work on subsets of the hidden parameter aggr_group, but it appears that the computation for all elements of aggr_group occurs within a single thread. Is this the expected behavior?
I tested using a modification of one of your aggregate examples, where I also had "--set gdk_nr_threads 4 --forcemito" passed into the mserver5 command line arguments.
CREATE AGGREGATE python_pid(val INTEGER)
RETURNS INTEGER
LANGUAGE PYTHON {
    import os
    import time
    pid = os.getpid()
    time.sleep(10)
    unique = numpy.unique(aggr_group)
    x = numpy.zeros(shape=(unique.size))
    for i in range(0, unique.size):
        x[i] = pid
    return(x)
};
This always returns the same PID, and I only see one thread running in top.
If I try to use the PYTHON_MAP API,
CREATE AGGREGATE python_map_pid(val INTEGER)
RETURNS INTEGER
LANGUAGE PYTHON_MAP {
    import os
    pid = os.getpid()
    unique = numpy.unique(aggr_group)
    x = numpy.zeros(shape=(unique.size))
    for i in range(0, unique.size):
        x[i] = pid
    return(x)
};
I get the following error:
sql>select fctr, python_map_pid(value) from mini group by fctr;
MALException:pyapi.eval:Python exception
2. import os
3. pid = os.getpid()
4. unique = numpy.unique(aggr_group)
5. x = numpy.zeros(shape=(unique.size))
6. for i in range(0, unique.size):
global name 'aggr_group' is not defined
So it looks like PYTHON_MAP isn't supposed to be used with aggregate functions?
Thanks,
Dave
Hey Anderson,
When PYTHON_MAP is enabled, the UDF is called once per group with only the elements of the group as input, as specified here: https://www.monetdb.org/wiki/Python_User_Defined_Functions#Parallel_Aggregat...
As such, `aggr_group` is not passed as input (because it is redundant; it would contain the same group number for every element because only the elements of one group are passed in). Thus your function fails when trying to use aggr_group because it does not exist.
Regards,
Mark
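The difference in what the two variants receive can be sketched as follows. This is purely illustrative slicing in NumPy, not MonetDB source; it only shows why `aggr_group` would be redundant in the per-group calls:

```python
import numpy

val = numpy.array([10, 11, 20, 21, 22])
aggr_group = numpy.array([0, 0, 1, 1, 1])

# LANGUAGE PYTHON: one call, full columns plus aggr_group.
python_inputs = (val, aggr_group)

# LANGUAGE PYTHON_MAP: one call per group, values only.
# aggr_group would be constant within each call, so it is omitted.
map_inputs = [val[aggr_group == g] for g in numpy.unique(aggr_group)]

print([chunk.tolist() for chunk in map_inputs])  # -> [[10, 11], [20, 21, 22]]
```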
Ahh, got it. Thanks!!
Mark,
I changed my function to remove the dependence on aggr_group:
CREATE AGGREGATE python_map_pid(val INTEGER)
RETURNS INTEGER
LANGUAGE PYTHON_MAP {
    import os
    pid = os.getpid()
    return(pid)
};
But it isn't getting executed in parallel for the distinct groups. Do I need to do something else?
sql>select fctr,python_map_pid(v1) from mini group by fctr;
+------+------+
| fctr | L1 |
+======+======+
| 12 | 3295 |
| 15 | 3295 |
| 94 | 3295 |
| 66 | 3295 |
| 72 | 3295 |
| 52 | 3295 |
| 32 | 3295 |
| 64 | 3295 |
| 7 | 3295 |
| 56 | 3295 |
| 79 | 3295 |
| 35 | 3295 |
...
| 39 | 3295 |
| 96 | 3295 |
| 6 | 3295 |
+------+------+
99 tuples (148.171ms)
sql>explain select fctr,python_map_pid(v1) from mini group by fctr;
+------------------------------------------------------------------------------------------------------------------------------+
| mal |
+==============================================================================================================================+
| function user.s2_1():void; |
| X_45:void := querylog.define("select fctr,python_map_pid(v1) from mini group by fctr;","default_pipe",90); |
| barrier X_155 := language.dataflow(); |
| X_22 := bat.new(nil:oid,nil:str); |
| X_30 := bat.append(X_22,"sys.mini"); |
| X_38 := bat.append(X_30,"sys.L1"); |
| X_25 := bat.new(nil:oid,nil:str); |
| X_32 := bat.append(X_25,"fctr"); |
| X_40 := bat.append(X_32,"L1"); |
| X_26 := bat.new(nil:oid,nil:str); |
| X_33 := bat.append(X_26,"int"); |
| X_42 := bat.append(X_33,"int"); |
| X_27 := bat.new(nil:oid,nil:int); |
| X_35 := bat.append(X_27,32); |
| X_43 := bat.append(X_35,32); |
| X_29 := bat.new(nil:oid,nil:int); |
| X_37 := bat.append(X_29,0); |
| X_44 := bat.append(X_37,0); |
| X_1 := sql.mvc(); |
| C_63:bat[:oid] := sql.tid(X_1,"sys","mini",0,4); |
| X_68:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,0,4); |
| (C_72:bat[:oid],X_73:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,0,4); |
| X_93 := sql.delta(X_68,C_72,X_73); |
| X_97 := algebra.projection(C_63,X_93); |
| (X_101,X_102,X_103) := group.subgroupdone(X_97); |
| X_104 := algebra.projection(X_102,X_97); |
| C_64:bat[:oid] := sql.tid(X_1,"sys","mini",1,4); |
| X_69:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,1,4); |
| (C_74:bat[:oid],X_75:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,1,4); |
| X_94 := sql.delta(X_69,C_74,X_75); |
| X_98 := algebra.projection(C_64,X_94); |
| (X_105,X_106,X_107) := group.subgroupdone(X_98); |
| X_108 := algebra.projection(X_106,X_98); |
| C_65:bat[:oid] := sql.tid(X_1,"sys","mini",2,4); |
| X_70:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,2,4); |
| (C_76:bat[:oid],X_77:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,2,4); |
| X_95 := sql.delta(X_70,C_76,X_77); |
| X_99 := algebra.projection(C_65,X_95); |
| (X_109,X_110,X_111) := group.subgroupdone(X_99); |
| X_112 := algebra.projection(X_110,X_99); |
| C_67:bat[:oid] := sql.tid(X_1,"sys","mini",3,4); |
| X_71:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,3,4); |
| (C_78:bat[:oid],X_79:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,3,4); |
| X_7:bat[:int] := sql.bind(X_1,"sys","mini","fctr",1); |
| X_96 := sql.delta(X_71,C_78,X_79,X_7); |
| X_100 := algebra.projection(C_67,X_96); |
| (X_113,X_114,X_115) := group.subgroupdone(X_100); |
| X_116 := algebra.projection(X_114,X_100); |
| X_135 := mat.packIncrement(X_104,4); |
| X_137 := mat.packIncrement(X_135,X_108); |
| X_138 := mat.packIncrement(X_137,X_112); |
| X_9 := mat.packIncrement(X_138,X_116); |
| (X_10,r1_15,X_117) := group.subgroupdone(X_9); |
| X_12 := algebra.projection(r1_15,X_9); |
| X_80:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,0,4); |
| (C_84:bat[:oid],X_85:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,0,4); |
| X_118 := sql.delta(X_80,C_84,X_85); |
| X_122 := algebra.projection(C_63,X_118); |
| X_81:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,1,4); |
| (C_86:bat[:oid],X_87:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,1,4); |
| X_119 := sql.delta(X_81,C_86,X_87); |
| X_123 := algebra.projection(C_64,X_119); |
| X_82:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,2,4); |
| (C_88:bat[:oid],X_89:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,2,4); |
| X_120 := sql.delta(X_82,C_88,X_89); |
| X_124 := algebra.projection(C_65,X_120); |
| X_83:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,3,4); |
| (C_90:bat[:oid],X_91:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,3,4); |
| X_16:bat[:int] := sql.bind(X_1,"sys","mini","v1",1); |
| X_121 := sql.delta(X_83,C_90,X_91,X_16); |
| X_125 := algebra.projection(C_67,X_121); |
| X_140 := mat.packIncrement(X_122,4); |
| X_141 := mat.packIncrement(X_140,X_123); |
| X_142 := mat.packIncrement(X_141,X_124); |
| X_17 := mat.packIncrement(X_142,X_125); |
| X_18:bat[:int] := pyapimap.subeval_aggr(nil,"{\n import os\n pid = os.getpid()\n return(pid)\n};",X_17,X_10,r1_ |
: 15,true); :
| X_13 := bat.setKey(X_12,true); |
| language.pass(X_97); |
| language.pass(X_98); |
| language.pass(X_99); |
| language.pass(X_100); |
| language.pass(X_9); |
| language.pass(C_63); |
| language.pass(C_64); |
| language.pass(C_65); |
| language.pass(C_67); |
| language.pass(r1_15); |
| exit X_155; |
| sql.resultSet(X_38,X_40,X_42,X_43,X_44,X_13,X_18); |
| end user.s2_1; |
| #inline actions= 0 time=2 usec |
| #remap actions= 0 time=5 usec |
| #costmodel actions= 1 time=4 usec |
| #coercion actions= 0 time=2 usec |
| #evaluate actions= 0 time=4 usec |
| #aliases actions= 0 time=9 usec |
| #mitosis actions= 1 time=70 usec |
| #mergetable actions= 5 time=139 usec |
| #deadcode actions=10 time=18 usec |
| #aliases actions= 0 time=12 usec |
| #constants actions= 5 time=13 usec |
| #commonTerms actions= 5 time=29 usec |
| #projectionpath actions= 0 time=11 usec |
| #deadcode actions= 5 time=14 usec |
| #reorder actions= 1 time=53 usec |
| #reduce actions=40 time=44 usec |
| #matpack actions= 2 time=14 usec |
| #dataflow actions=26 time=49 usec |
| #multiplex actions= 0 time=4 usec |
| #profiler actions= 1 time=10 usec |
| #candidates actions= 1 time=2 usec |
| #garbagecollector actions= 1 time=42 usec |
| #total actions= 1 time=687 usec |
+------------------------------------------------------------------------------------------------------------------------------+
113 tuples (3.308ms)
sql>
Hey Anderson,
It should be getting executed in parallel, but it indeed does not use multiprocessing; it only uses multithreaded parallelism. Parallel aggregations were added as something of a last-minute hack, so the parallelization also does not show up in the MAL plan: it happens inside the Python UDF.
Regards,
Mark
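This also explains why Dave's getpid() probe could not distinguish the workers: threads within one process all share a PID, while a thread identifier differs per worker. A standalone sketch with ordinary Python threading (not MonetDB's internals) shows the distinction:

```python
import os
import threading

pids, tids = set(), set()

def worker():
    pids.add(os.getpid())            # identical for every thread
    tids.add(threading.get_ident())  # distinct per live thread

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# One process; up to 4 thread ids (idents may be reused if a
# thread finishes before the next one starts).
print(len(pids), len(tids))
```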
Bingo. When I switch to using the thread id as the return value, I see multiple return values.
Thanks,
Dave
participants (2)
- Anderson, David B
- Mark Raasveldt