4. unique = numpy.unique(aggr_group)
Mark,
I changed my function to remove the dependence on aggr_group:
-- Diagnostic aggregate: returns the OS process ID that evaluated each call,
-- so the result shows which process handled each group.
-- With LANGUAGE PYTHON_MAP the body is invoked once per group and receives only
-- that group's rows; aggr_group is not provided to the body in this mode.
-- NOTE(review): the argument val is not referenced by the body.
CREATE AGGREGATE python_map_pid(val INTEGER)
RETURNS INTEGER
LANGUAGE PYTHON_MAP {
import os
# ID of the process running this call; identical values across groups mean
# those groups were evaluated in the same process.
pid = os.getpid()
return(pid)
};
But it isn't being executed in parallel across the distinct groups: every group reports the same process ID (3295). Do I need to do something else?
sql>select fctr,python_map_pid(v1) from mini group by fctr;
+------+------+
| fctr | L1 |
+======+======+
| 12 | 3295 |
| 15 | 3295 |
| 94 | 3295 |
| 66 | 3295 |
| 72 | 3295 |
| 52 | 3295 |
| 32 | 3295 |
| 64 | 3295 |
| 7 | 3295 |
| 56 | 3295 |
| 79 | 3295 |
| 35 | 3295 |
...
| 39 | 3295 |
| 96 | 3295 |
| 6 | 3295 |
+------+------+
99 tuples (148.171ms)
sql>explain select fctr,python_map_pid(v1) from mini group by fctr;
+------------------------------------------------------------------------------------------------------------------------------+
| mal |
+==============================================================================================================================+
| function user.s2_1():void; |
| X_45:void := querylog.define("select fctr,python_map_pid(v1) from mini group by fctr;","default_pipe",90); |
| barrier X_155 := language.dataflow(); |
| X_22 := bat.new(nil:oid,nil:str); |
| X_30 := bat.append(X_22,"sys.mini"); |
| X_38 := bat.append(X_30,"sys.L1"); |
| X_25 := bat.new(nil:oid,nil:str); |
| X_32 := bat.append(X_25,"fctr"); |
| X_40 := bat.append(X_32,"L1"); |
| X_26 := bat.new(nil:oid,nil:str); |
| X_33 := bat.append(X_26,"int"); |
| X_42 := bat.append(X_33,"int"); |
| X_27 := bat.new(nil:oid,nil:int); |
| X_35 := bat.append(X_27,32); |
| X_43 := bat.append(X_35,32); |
| X_29 := bat.new(nil:oid,nil:int); |
| X_37 := bat.append(X_29,0); |
| X_44 := bat.append(X_37,0); |
| X_1 := sql.mvc(); |
| C_63:bat[:oid] := sql.tid(X_1,"sys","mini",0,4); |
| X_68:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,0,4); |
| (C_72:bat[:oid],X_73:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,0,4); |
| X_93 := sql.delta(X_68,C_72,X_73); |
| X_97 := algebra.projection(C_63,X_93); |
| (X_101,X_102,X_103) := group.subgroupdone(X_97); |
| X_104 := algebra.projection(X_102,X_97); |
| C_64:bat[:oid] := sql.tid(X_1,"sys","mini",1,4); |
| X_69:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,1,4); |
| (C_74:bat[:oid],X_75:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,1,4); |
| X_94 := sql.delta(X_69,C_74,X_75); |
| X_98 := algebra.projection(C_64,X_94); |
| (X_105,X_106,X_107) := group.subgroupdone(X_98); |
| X_108 := algebra.projection(X_106,X_98); |
| C_65:bat[:oid] := sql.tid(X_1,"sys","mini",2,4); |
| X_70:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,2,4); |
| (C_76:bat[:oid],X_77:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,2,4); |
| X_95 := sql.delta(X_70,C_76,X_77); |
| X_99 := algebra.projection(C_65,X_95); |
| (X_109,X_110,X_111) := group.subgroupdone(X_99); |
| X_112 := algebra.projection(X_110,X_99); |
| C_67:bat[:oid] := sql.tid(X_1,"sys","mini",3,4); |
| X_71:bat[:int] := sql.bind(X_1,"sys","mini","fctr",0,3,4); |
| (C_78:bat[:oid],X_79:bat[:int]) := sql.bind(X_1,"sys","mini","fctr",2,3,4); |
| X_7:bat[:int] := sql.bind(X_1,"sys","mini","fctr",1); |
| X_96 := sql.delta(X_71,C_78,X_79,X_7); |
| X_100 := algebra.projection(C_67,X_96); |
| (X_113,X_114,X_115) := group.subgroupdone(X_100); |
| X_116 := algebra.projection(X_114,X_100); |
| X_135 := mat.packIncrement(X_104,4); |
| X_137 := mat.packIncrement(X_135,X_108); |
| X_138 := mat.packIncrement(X_137,X_112); |
| X_9 := mat.packIncrement(X_138,X_116); |
| (X_10,r1_15,X_117) := group.subgroupdone(X_9); |
| X_12 := algebra.projection(r1_15,X_9); |
| X_80:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,0,4); |
| (C_84:bat[:oid],X_85:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,0,4); |
| X_118 := sql.delta(X_80,C_84,X_85); |
| X_122 := algebra.projection(C_63,X_118); |
| X_81:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,1,4); |
| (C_86:bat[:oid],X_87:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,1,4); |
| X_119 := sql.delta(X_81,C_86,X_87); |
| X_123 := algebra.projection(C_64,X_119); |
| X_82:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,2,4); |
| (C_88:bat[:oid],X_89:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,2,4); |
| X_120 := sql.delta(X_82,C_88,X_89); |
| X_124 := algebra.projection(C_65,X_120); |
| X_83:bat[:int] := sql.bind(X_1,"sys","mini","v1",0,3,4); |
| (C_90:bat[:oid],X_91:bat[:int]) := sql.bind(X_1,"sys","mini","v1",2,3,4); |
| X_16:bat[:int] := sql.bind(X_1,"sys","mini","v1",1); |
| X_121 := sql.delta(X_83,C_90,X_91,X_16); |
| X_125 := algebra.projection(C_67,X_121); |
| X_140 := mat.packIncrement(X_122,4); |
| X_141 := mat.packIncrement(X_140,X_123); |
| X_142 := mat.packIncrement(X_141,X_124); |
| X_17 := mat.packIncrement(X_142,X_125); |
| X_18:bat[:int] := pyapimap.subeval_aggr(nil,"{\n import os\n pid = os.getpid()\n return(pid)\n};",X_17,X_10,r1_ |
: 15,true); :
| X_13 := bat.setKey(X_12,true); |
| language.pass(X_97); |
| language.pass(X_98); |
| language.pass(X_99); |
| language.pass(X_100); |
| language.pass(X_9); |
| language.pass(C_63); |
| language.pass(C_64); |
| language.pass(C_65); |
| language.pass(C_67); |
| language.pass(r1_15); |
| exit X_155; |
| sql.resultSet(X_38,X_40,X_42,X_43,X_44,X_13,X_18); |
| end user.s2_1; |
| #inline actions= 0 time=2 usec |
| #remap actions= 0 time=5 usec |
| #costmodel actions= 1 time=4 usec |
| #coercion actions= 0 time=2 usec |
| #evaluate actions= 0 time=4 usec |
| #aliases actions= 0 time=9 usec |
| #mitosis actions= 1 time=70 usec |
| #mergetable actions= 5 time=139 usec |
| #deadcode actions=10 time=18 usec |
| #aliases actions= 0 time=12 usec |
| #constants actions= 5 time=13 usec |
| #commonTerms actions= 5 time=29 usec |
| #projectionpath actions= 0 time=11 usec |
| #deadcode actions= 5 time=14 usec |
| #reorder actions= 1 time=53 usec |
| #reduce actions=40 time=44 usec |
| #matpack actions= 2 time=14 usec |
| #dataflow actions=26 time=49 usec |
| #multiplex actions= 0 time=4 usec |
| #profiler actions= 1 time=10 usec |
| #candidates actions= 1 time=2 usec |
| #garbagecollector actions= 1 time=42 usec |
| #total actions= 1 time=687 usec |
+------------------------------------------------------------------------------------------------------------------------------+
113 tuples (3.308ms)
sql>
-----Original Message-----
From: users-list [mailto:users-list-bounces+david.b.anderson=citi.com@monetdb.org] On Behalf Of Mark Raasveldt
Sent: Tuesday, July 05, 2016 4:57 PM
To: users-list
Subject: Re: R UDFs
Hey Anderson,
When PYTHON_MAP is enabled, the UDF is called once per group with only the elements of the group as input, as specified here: https://www.monetdb.org/wiki/Python_User_Defined_Functions#Parallel_Aggregat...
Consequently, `aggr_group` is not passed as input. It would be redundant: because only the elements of one group are passed in per call, it would contain the same group number for every element. Your function therefore fails when it tries to use `aggr_group`, because that variable does not exist in this mode.
Regards,
Mark
----- Original Message -----
From: "Anderson, David B"