Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * H. Muehleisen, M. Kersten
15 : * The R interface
16 : */
17 : #include "monetdb_config.h"
18 : #include "mal.h"
19 : #include "mal_stack.h"
20 : #include "mal_linker.h"
21 : #include "gdk.h"
22 : #include "sql_catalog.h"
23 : #include "sql_execute.h"
24 : #include "mutils.h"
25 :
26 : #define RAPI_MAX_TUPLES 2147483647L
27 :
28 : // R headers
29 : #define R_INTERFACE_PTRS 1
30 : #define CSTACK_DEFNS 1
31 :
32 : /* R redefines these */
33 : #undef SIZEOF_SIZE_T
34 : #undef ERROR
35 :
36 : #define USE_RINTERNALS 1
37 :
38 : #include <Rversion.h>
39 : #include <Rembedded.h>
40 : #include <Rdefines.h>
41 : #include <Rinternals.h>
42 : #include <R_ext/Parse.h>
43 :
44 : // other headers
45 : #include <string.h>
46 :
47 : //#define _RAPI_DEBUG_
48 :
49 : // this macro blows up mmath.h pragmas
50 : #ifdef warning
51 : # undef warning
52 : #endif
53 :
54 : #define RSTR(somestr) mkCharCE(somestr, CE_UTF8)
55 :
56 : //Element-wise conversion functions, use no-op as passthrough when no conversion required
57 : #define M_TO_R_NOOP(v) (v)
58 : #define R_TO_M_NOOP(v) (v)
59 : #define M_TO_R_DATE(v) mDate_to_rDate(v)
60 : #define R_TO_M_DATE(v) rDate_to_mDate(v)
61 :
// BAT_TO_SXP: copy the values visible through BAT iterator `bati` into a new R vector.
//   bat     - not referenced by the macro body (kept for call-site symmetry)
//   bati    - BATiter; the macro reads .base, .count, .nonil and .nil
//   tpe     - element type of the BAT storage (also selects is_<tpe>_nil)
//   retsxp  - lvalue receiving the new R vector; it is PROTECTed here and NOT
//             unprotected, so the code using the macro must balance the protect stack
//   newfun  - R allocator called with bati.count; ptrfun - accessor yielding ctype*
//   naval   - R NA value written for nil elements
//   memcopy - when true and the iterator reports no nils, values are memcpy'd
//             (count * sizeof(tpe) bytes) instead of converted one by one
//             NOTE(review): this presumably requires sizeof(tpe) == sizeof(ctype) - confirm at call sites
//   mapfun  - element-wise conversion applied to the (ctype)-cast value
// If the allocator yields NULL the macro breaks out of its own do/while and leaves retsxp NULL.
62 : #define BAT_TO_SXP(bat,bati,tpe,retsxp,newfun,ptrfun,ctype,naval,memcopy,mapfun) \
63 : do { \
64 : tpe v; size_t j; \
65 : ctype *valptr = NULL; \
66 : tpe* p = (tpe*) bati.base; \
67 : retsxp = PROTECT(newfun(bati.count)); \
68 : if (!retsxp) break; \
69 : valptr = ptrfun(retsxp); \
70 : if (bati.nonil && !bati.nil) { \
71 : if (memcopy) { \
72 : memcpy(valptr, p, \
73 : bati.count * sizeof(tpe)); \
74 : } else { \
75 : for (j = 0; j < bati.count; j++) { \
76 : valptr[j] = mapfun((ctype) p[j]); \
77 : } \
78 : } \
79 : } else { \
80 : for (j = 0; j < bati.count; j++) { \
81 : v = p[j]; \
82 : if ( is_##tpe##_nil(v)) \
83 : valptr[j] = naval; \
84 : else \
85 : valptr[j] = mapfun((ctype) v); \
86 : } \
87 : } \
88 : } while (0)
89 :
// Convert a BAT into an R integer vector; nil values become NA_INTEGER.
#define BAT_TO_INTSXP(bat,bati,tpe,retsxp,memcopy)						\
	BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_INTEGER,INTEGER_POINTER,int,NA_INTEGER,memcopy,M_TO_R_NOOP)

// Convert a BAT into an R numeric (double) vector; nil values become NA_REAL.
#define BAT_TO_REALSXP(bat,bati,tpe,retsxp,memcopy)						\
	BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_NUMERIC,NUMERIC_POINTER,double,NA_REAL,memcopy,M_TO_R_NOOP)

//DATE stored as integer in MonetDB with epoch 0, R uses double and epoch 1970
// The expansion is a single statement: the class vector lives in its own scope,
// is protected while classgets() runs, and is only attached when the vector
// allocation succeeded. retsxp itself stays PROTECTed exactly as with BAT_TO_SXP.
#define BAT_TO_DATESXP(bat,bati,tpe,retsxp,memcopy)						\
	do {											\
		BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_NUMERIC,NUMERIC_POINTER,double,NA_REAL,memcopy, M_TO_R_DATE); \
		if (retsxp) {									\
			SEXP klass = PROTECT(mkString("Date"));					\
			classgets(retsxp, klass);						\
			UNPROTECT(1);								\
		}										\
	} while (0)
101 :
// SXP_TO_BAT: build a new transient BAT of type TYPE_<tpe> from the R vector `s`.
// The macro relies on three names from the expansion site: `s` (the R vector),
// `cnt` (number of elements) and `b` (receives the new BAT, NULL if COLnew failed,
// in which case the macro breaks out of its own do/while).
//   access_fun - R accessor returning a pointer to the elements of s
//   na_check   - expression evaluated AFTER the converted value has been stored in *p;
//                when true the element is replaced by <tpe>_nil and tnil/tnonil are updated.
//                It may refer to `p`, `j` and `s`.
//   mapfun     - element-wise conversion applied to the (tpe)-cast R value
// While copying, the macro keeps tsorted/trevsorted up to date by comparing each
// element with its predecessor (nil handled explicitly), and finally sets the count.
102 : #define SXP_TO_BAT(tpe, access_fun, na_check, mapfun) \
103 : do { \
104 : tpe *p, prev = tpe##_nil; size_t j; \
105 : b = COLnew(0, TYPE_##tpe, cnt, TRANSIENT); \
106 : if (!b) break; \
107 : b->tnil = false; b->tnonil = true; b->tkey = false; \
108 : b->tsorted = true; b->trevsorted = true; \
109 : b->tseqbase = oid_nil; \
110 : p = (tpe*) Tloc(b, 0); \
111 : for( j = 0; j < cnt; j++, p++){ \
112 : *p = mapfun((tpe) access_fun(s)[j]); \
113 : if (na_check){ b->tnil = true; b->tnonil = false; *p= tpe##_nil;} \
114 : if (j > 0){ \
115 : if (b->trevsorted && !is_##tpe##_nil(*p) && (is_##tpe##_nil(prev) || *p > prev)){ \
116 : b->trevsorted = false; \
117 : } else \
118 : if (b->tsorted && !is_##tpe##_nil(prev) && (is_##tpe##_nil(*p) || *p < prev)){ \
119 : b->tsorted = false; \
120 : } \
121 : } \
122 : prev = *p; \
123 : } \
124 : BATsetcount(b, cnt); \
125 : } while (0)
126 :
127 : // DATE epoch differs between MonetDB (00-01-01) and R (1970-01-01)
128 : // no c API for R date handling so use fixed offset
129 : // >>`-as.double(as.Date(0, origin="0-1-1"))`
static const int days0To1970 = 719528;

/* Shift a day count based on MonetDB's epoch to one based on R's 1970 epoch. */
static int
mDate_to_rDate(int v)
{
	const int r_days = v - days0To1970;

	return r_days;
}
137 :
138 : static int
139 3 : rDate_to_mDate(int v)
140 : {
141 3 : return v+days0To1970;
142 : }
143 :
144 : static SEXP
145 88 : bat_to_sexp(BAT* b, int type)
146 : {
147 88 : SEXP varvalue = NULL;
148 88 : BATiter bi = bat_iterator(b);
149 : // TODO: deal with SQL types (DECIMAL/TIME/TIMESTAMP)
150 88 : switch (ATOMstorage(bi.type)) {
151 0 : case TYPE_void: {
152 0 : size_t i = 0;
153 0 : varvalue = PROTECT(NEW_LOGICAL(BATcount(b)));
154 0 : if (!varvalue) {
155 0 : bat_iterator_end(&bi);
156 0 : return NULL;
157 : }
158 0 : for (i = 0; i < BATcount(b); i++) {
159 0 : LOGICAL_POINTER(varvalue)[i] = NA_LOGICAL;
160 : }
161 : break;
162 : }
163 2 : case TYPE_bte:
164 13 : BAT_TO_INTSXP(b, bi, bte, varvalue, 0);
165 : break;
166 1 : case TYPE_sht:
167 7 : BAT_TO_INTSXP(b, bi, sht, varvalue, 0);
168 : break;
169 46 : case TYPE_int:
170 : //Storage is int but the actual defined type may be different
171 46 : switch (type) {
172 : case TYPE_int:
173 : //Storage is int but the actual defined type may be different
174 42 : switch (type) {
175 42 : case TYPE_int:
176 : // special case: memcpy for int-to-int conversion without NULLs
177 90 : BAT_TO_INTSXP(b, bi, int, varvalue, 1);
178 : break;
179 : default:
180 : if (type == ATOMindex("date")) {
181 : BAT_TO_DATESXP(b, bi, int, varvalue, 0);
182 : } else {
183 : //Type stored as int but no implementation to decode into native R type
184 : BAT_TO_INTSXP(b, bi, int, varvalue, 1);
185 : }
186 : }
187 : break;
188 4 : default:
189 4 : if (type == TYPE_date) {
190 6 : BAT_TO_DATESXP(b, bi, int, varvalue, 0);
191 : } else {
192 : //Type stored as int but no implementation to decode into native R type
193 2 : BAT_TO_INTSXP(b, bi, int, varvalue, 1);
194 : }
195 : break;
196 : }
197 : break;
198 : #ifdef HAVE_HGE
199 1 : case TYPE_hge: /* R's integers are stored as int, so we cannot be sure hge will fit */
200 5 : BAT_TO_REALSXP(b, bi, hge, varvalue, 0);
201 : break;
202 : #endif
203 2 : case TYPE_flt:
204 7 : BAT_TO_REALSXP(b, bi, flt, varvalue, 0);
205 : break;
206 18 : case TYPE_dbl:
207 : // special case: memcpy for double-to-double conversion without NULLs
208 492 : BAT_TO_REALSXP(b, bi, dbl, varvalue, 1);
209 : break;
210 11 : case TYPE_lng: /* R's integers are stored as int, so we cannot be sure long will fit */
211 201003 : BAT_TO_REALSXP(b, bi, lng, varvalue, 0);
212 : break;
213 7 : case TYPE_str: { // there is only one string type, thus no macro here
214 7 : BUN p, q, j = 0;
215 7 : varvalue = PROTECT(NEW_STRING(BATcount(b)));
216 7 : if (varvalue == NULL) {
217 0 : bat_iterator_end(&bi);
218 0 : return NULL;
219 : }
220 : /* special case where we exploit the duplicate-eliminated string heap */
221 7 : if (GDK_ELIMDOUBLES(b->tvheap)) {
222 7 : SEXP* sexp_ptrs = GDKzalloc(b->tvheap->free * sizeof(SEXP));
223 7 : if (!sexp_ptrs) {
224 0 : bat_iterator_end(&bi);
225 0 : return NULL;
226 : }
227 490 : BATloop(b, p, q) {
228 483 : const char *t = (const char *) BUNtvar(bi, p);
229 483 : ptrdiff_t offset = t - b->tvheap->base;
230 483 : if (!sexp_ptrs[offset]) {
231 21 : if (strNil(t)) {
232 1 : sexp_ptrs[offset] = NA_STRING;
233 : } else {
234 20 : sexp_ptrs[offset] = RSTR(t);
235 : }
236 : }
237 483 : SET_STRING_ELT(varvalue, j++, sexp_ptrs[offset]);
238 : }
239 7 : GDKfree(sexp_ptrs);
240 : }
241 : else {
242 0 : if (bi.nonil) {
243 0 : BATloop(b, p, q) {
244 0 : SET_STRING_ELT(varvalue, j++, RSTR(
245 : (const char *) BUNtvar(bi, p)));
246 : }
247 : }
248 : else {
249 0 : BATloop(b, p, q) {
250 0 : const char *t = (const char *) BUNtvar(bi, p);
251 0 : if (strNil(t)) {
252 0 : SET_STRING_ELT(varvalue, j++, NA_STRING);
253 : } else {
254 0 : SET_STRING_ELT(varvalue, j++, RSTR(t));
255 : }
256 : }
257 : }
258 : }
259 : break;
260 : }
261 : }
262 88 : bat_iterator_end(&bi);
263 88 : return varvalue;
264 : }
265 :
266 69 : static BAT* sexp_to_bat(SEXP s, int type) {
267 69 : BAT* b = NULL;
268 69 : BUN cnt = LENGTH(s);
269 69 : switch (type) {
270 32 : case TYPE_int:
271 32 : if (!IS_INTEGER(s)) {
272 : return NULL;
273 : }
274 402306 : SXP_TO_BAT(int, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
275 : break;
276 1 : case TYPE_lng:
277 1 : if (!IS_INTEGER(s)) {
278 : return NULL;
279 : }
280 6 : SXP_TO_BAT(lng, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
281 : break;
282 : #ifdef HAVE_HGE
283 1 : case TYPE_hge:
284 1 : if (!IS_INTEGER(s)) {
285 : return NULL;
286 : }
287 6 : SXP_TO_BAT(hge, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
288 : break;
289 : #endif
290 3 : case TYPE_bte:
291 : case TYPE_bit: // only R logical types fit into bit BATs
292 3 : if (!IS_LOGICAL(s)) {
293 : return NULL;
294 : }
295 1008 : SXP_TO_BAT(bit, LOGICAL_POINTER, *p==NA_LOGICAL, R_TO_M_NOOP);
296 : break;
297 25 : case TYPE_dbl:
298 25 : if (!IS_NUMERIC(s)) {
299 : return NULL;
300 : }
301 1057 : SXP_TO_BAT(dbl, NUMERIC_POINTER, (ISNA(*p) || isnan(*p) || isinf(*p)), R_TO_M_NOOP);
302 : break;
303 6 : case TYPE_str: {
304 6 : SEXP levels;
305 6 : size_t j;
306 6 : if (!IS_CHARACTER(s) && !isFactor(s)) {
307 : return NULL;
308 : }
309 6 : b = COLnew(0, TYPE_str, cnt, TRANSIENT);
310 6 : if (!b) return NULL;
311 6 : b->tnil = false;
312 6 : b->tnonil = true;
313 6 : b->tkey = false;
314 6 : b->tsorted = false;
315 6 : b->trevsorted = false;
316 : /* get levels once, since this is a function call */
317 6 : levels = GET_LEVELS(s);
318 :
319 38 : for (j = 0; j < cnt; j++) {
320 26 : SEXP rse;
321 26 : if (isFactor(s)) {
322 5 : int ii = INTEGER(s)[j];
323 5 : if (ii == NA_INTEGER) {
324 1 : rse = NA_STRING;
325 : } else {
326 4 : rse = STRING_ELT(levels, ii - 1);
327 : }
328 : } else {
329 21 : rse = STRING_ELT(s, j);
330 : }
331 26 : if (rse == NA_STRING) {
332 2 : b->tnil = true;
333 2 : b->tnonil = false;
334 2 : if (BUNappend(b, str_nil, false) != GDK_SUCCEED) {
335 0 : BBPreclaim(b);
336 0 : return NULL;
337 : }
338 : } else {
339 24 : if (BUNappend(b, CHAR(rse), false) != GDK_SUCCEED) {
340 0 : BBPreclaim(b);
341 0 : return NULL;
342 : }
343 : }
344 : }
345 6 : BATsetcount(b, cnt);
346 6 : break;
347 : }
348 1 : default:
349 1 : if (type == TYPE_date) {
350 1 : if (!IS_NUMERIC(s)) {
351 : return NULL;
352 : }
353 4 : SXP_TO_BAT(date, NUMERIC_POINTER, *p==NA_REAL, R_TO_M_DATE);
354 : }
355 : }
356 :
357 : return b;
358 : }
359 :
360 : const char rapi_enableflag[] = "embedded_r";
361 :
362 65 : static bool RAPIEnabled(void) {
363 65 : return (GDKgetenv_istrue(rapi_enableflag)
364 65 : || GDKgetenv_isyes(rapi_enableflag));
365 : }
366 :
367 : // The R-environment should be single threaded, calling for some protective measures.
368 : static MT_Lock rapiLock = MT_LOCK_INITIALIZER(rapiLock);
369 : static bool rapiInitialized = false;
370 : #if 0
371 : static char* rtypenames[] = { "NIL", "SYM", "LIST", "CLO", "ENV", "PROM",
372 : "LANG", "SPECIAL", "BUILTIN", "CHAR", "LGL", "unknown", "unknown",
373 : "INT", "REAL", "CPLX", "STR", "DOT", "ANY", "VEC", "EXPR", "BCODE",
374 : "EXTPTR", "WEAKREF", "RAW", "S4" };
375 : #endif
376 :
377 : static Client rapiClient = NULL;
378 :
379 :
380 : #if 0
381 : // helper function to translate R TYPEOF() return values to something readable
382 : char* rtypename(int rtypeid) {
383 : if (rtypeid < 0 || rtypeid > 25) {
384 : return "unknown";
385 : } else
386 : return rtypenames[rtypeid];
387 : }
388 : #endif
389 :
/* Console output hook for R: output is dropped unless _RAPI_DEBUG_ is defined. */
static void writeConsoleEx(const char * buf, int buflen, int foo) {
#ifdef _RAPI_DEBUG_
	printf("# %s", buf);
#else
	(void) buf; // silence compiler
#endif
	(void) buflen;
	(void) foo;
}
398 :
/* Console output hook without output-type argument; forwards to writeConsoleEx. */
static void writeConsole(const char * buf, int buflen) {
	const int otype = -42; /* placeholder, writeConsoleEx ignores it */

	writeConsoleEx(buf, buflen, otype);
}
402 :
/* Hook for R's "clear error console" callback; intentionally does nothing. */
static void clearRErrConsole(void) {
	return;
}
406 :
407 : static char *RAPIinstalladdons(void);
408 :
409 : /* UNIX-like initialization */
410 : #ifndef WIN32
411 :
412 : #define R_INTERFACE_PTRS 1
413 : #define CSTACK_DEFNS 1
414 : #include <Rinterface.h>
415 :
416 6 : static char *RAPIinitialize(void) {
417 : // TODO: check for header/library version mismatch?
418 6 : char *e;
419 :
420 : // set R_HOME for packages etc. We know this from our configure script
421 6 : putenv("R_HOME=" RHOME);
422 :
423 : // set some command line arguments
424 : {
425 6 : structRstart rp;
426 6 : char *rargv[] = { "R",
427 : #if R_VERSION >= R_Version(4,0,0)
428 : "--no-echo",
429 : #else
430 : "--slave",
431 : #endif
432 : "--vanilla" };
433 6 : int stat = 0;
434 :
435 6 : R_DefParams(&rp);
436 : #if R_VERSION >= R_Version(4,0,0)
437 6 : rp.R_NoEcho = (Rboolean) TRUE;
438 : #else
439 : rp.R_Slave = (Rboolean) TRUE;
440 : #endif
441 6 : rp.R_Quiet = (Rboolean) TRUE;
442 6 : rp.R_Interactive = (Rboolean) FALSE;
443 6 : rp.R_Verbose = (Rboolean) FALSE;
444 6 : rp.LoadSiteFile = (Rboolean) FALSE;
445 6 : rp.LoadInitFile = (Rboolean) FALSE;
446 6 : rp.RestoreAction = SA_NORESTORE;
447 6 : rp.SaveAction = SA_NOSAVE;
448 6 : rp.NoRenviron = TRUE;
449 6 : stat = Rf_initialize_R(2, rargv);
450 6 : if (stat < 0) {
451 0 : return "Rf_initialize failed";
452 : }
453 6 : R_SetParams(&rp);
454 : }
455 :
456 : /* disable stack checking, because threads will throw it off */
457 6 : R_CStackLimit = (uintptr_t) -1;
458 : /* redirect input/output and set error handler */
459 6 : R_Outputfile = NULL;
460 6 : R_Consolefile = NULL;
461 : /* we do not want R to handle any signal, will interfere with monetdbd */
462 6 : R_SignalHandlers = 0;
463 : /* we want control R's output and input */
464 6 : ptr_R_WriteConsoleEx = writeConsoleEx;
465 6 : ptr_R_WriteConsole = writeConsole;
466 6 : ptr_R_ReadConsole = NULL;
467 6 : ptr_R_ClearerrConsole = clearRErrConsole;
468 :
469 : // big boy here
470 6 : setup_Rmainloop();
471 :
472 6 : if ((e = RAPIinstalladdons()) != 0) {
473 : return e;
474 : }
475 : #if R_VERSION < R_Version(4,2,0)
476 : // patch R internals to disallow quit and system. Setting them to NULL produces an error.
477 : SET_INTERNAL(install("quit"), R_NilValue);
478 : // install.packages() uses system2 to call gcc etc., so we cannot disable it (perhaps store the pointer somewhere just for that?)
479 : //SET_INTERNAL(install("system"), R_NilValue);
480 : #endif
481 :
482 6 : rapiInitialized = true;
483 6 : return NULL;
484 : }
485 : #else
486 :
487 : #define S_IRWXU 0000700
488 :
/* Windows stub: embedded R is not available, so initialization always fails. */
static char *RAPIinitialize(void) {
	static char unsupported[] = "Sorry, no R API on Windows";

	return unsupported;
}
492 :
493 : #endif
494 :
495 :
496 6 : static char *RAPIinstalladdons(void) {
497 6 : int evalErr;
498 6 : ParseStatus status;
499 6 : char rlibs[FILENAME_MAX];
500 6 : char rapiinclude[BUFSIZ];
501 6 : SEXP librisexp;
502 6 : int len;
503 :
504 : // r library folder, create if not exists
505 6 : len = snprintf(rlibs, sizeof(rlibs), "%s%c%s", GDKgetenv("gdk_dbpath"), DIR_SEP, "rapi_packages");
506 6 : if (len == -1 || len >= FILENAME_MAX)
507 : return "cannot create rapi_packages directory because the path is too large";
508 :
509 6 : if (MT_mkdir(rlibs) != 0 && errno != EEXIST) {
510 : return "cannot create rapi_packages directory";
511 : }
512 : #ifdef _RAPI_DEBUG_
513 : printf("# R libraries installed in %s\n",rlibs);
514 : #endif
515 :
516 6 : PROTECT(librisexp = allocVector(STRSXP, 1));
517 6 : SET_STRING_ELT(librisexp, 0, mkChar(rlibs));
518 6 : Rf_defineVar(Rf_install(".rapi.libdir"), librisexp, R_GlobalEnv);
519 6 : UNPROTECT(1);
520 :
521 : // run rapi.R environment setup script
522 : {
523 6 : char *f = locate_file("rapi", ".R", 0);
524 6 : snprintf(rapiinclude, sizeof(rapiinclude), "source(\"%s\")", f);
525 6 : GDKfree(f);
526 : }
527 : #if DIR_SEP != '/'
528 : {
529 : char *p;
530 : for (p = rapiinclude; *p; p++)
531 : if (*p == DIR_SEP)
532 : *p = '/';
533 : }
534 : #endif
535 6 : R_tryEvalSilent(
536 : VECTOR_ELT(
537 : R_ParseVector(mkString(rapiinclude), 1, &status,
538 : R_NilValue), 0), R_GlobalEnv, &evalErr);
539 :
540 : // of course the script may contain errors as well
541 6 : if (evalErr != FALSE) {
542 0 : return "failure running R setup script";
543 : }
544 : return NULL;
545 : }
546 :
547 : static str
548 2 : empty_return(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, size_t retcols, oid seqbase)
549 : {
550 2 : str msg = MAL_SUCCEED;
551 2 : void **res = GDKzalloc(retcols * sizeof(void*));
552 :
553 2 : if (!res) {
554 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
555 0 : goto bailout;
556 : }
557 :
558 4 : for (size_t i = 0; i < retcols; i++) {
559 2 : if (isaBatType(getArgType(mb, pci, i))) {
560 1 : BAT *b = COLnew(seqbase, getBatType(getArgType(mb, pci, i)), 0, TRANSIENT);
561 1 : if (!b) {
562 0 : msg = createException(MAL, "rapi.eval", GDK_EXCEPTION);
563 0 : goto bailout;
564 : }
565 1 : ((BAT**)res)[i] = b;
566 : } else { // single value return, only for non-grouped aggregations
567 : // return NULL to conform to SQL aggregates
568 1 : int tpe = getArgType(mb, pci, i);
569 1 : if (!VALinit(&stk->stk[pci->argv[i]], tpe, ATOMnilptr(tpe))) {
570 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
571 0 : goto bailout;
572 : }
573 1 : ((ValPtr*)res)[i] = &stk->stk[pci->argv[i]];
574 : }
575 : }
576 :
577 2 : bailout:
578 2 : if (res) {
579 4 : for (size_t i = 0; i < retcols; i++) {
580 2 : if (isaBatType(getArgType(mb, pci, i))) {
581 1 : BAT *b = ((BAT**)res)[i];
582 :
583 1 : if (b && msg) {
584 0 : BBPreclaim(b);
585 1 : } else if (b) {
586 1 : *getArgReference_bat(stk, pci, i) = b->batCacheid;
587 1 : BBPkeepref(b);
588 : }
589 1 : } else if (msg) {
590 0 : ValPtr pt = ((ValPtr*)res)[i];
591 :
592 0 : if (pt)
593 0 : VALclear(pt);
594 : }
595 : }
596 2 : GDKfree(res);
597 : }
598 2 : return msg;
599 : }
600 :
601 59 : static str RAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bit grouped) {
602 59 : sql_func * sqlfun = NULL;
603 59 : SEXP x, env, retval;
604 59 : SEXP varname = R_NilValue;
605 59 : SEXP varvalue = R_NilValue;
606 59 : ParseStatus status;
607 59 : int i = 0;
608 59 : char argbuf[64];
609 59 : char *argnames = NULL;
610 59 : size_t argnameslen;
611 59 : size_t pos;
612 59 : char* rcall = NULL;
613 59 : size_t rcalllen;
614 59 : int ret_cols = 0; /* int because pci->retc is int, too*/
615 59 : str *args;
616 59 : int evalErr;
617 59 : char *msg = MAL_SUCCEED;
618 59 : BAT *b;
619 59 : node * argnode;
620 59 : int seengrp = FALSE;
621 :
622 59 : rapiClient = cntxt;
623 :
624 : // If the first input argument is of type lng, this is a cardinality-only bulk operation.
625 59 : int has_card_arg = 0;
626 59 : lng card; // cardinality of non-bat inputs
627 59 : if (getArgType(mb, pci, pci->retc) == TYPE_lng) {
628 1 : has_card_arg=1;
629 1 : card = *getArgReference_lng(stk, pci, pci->retc);
630 : }
631 : else {
632 : has_card_arg=0;
633 : card = 1;
634 : }
635 59 : str exprStr = *getArgReference_str(stk, pci, pci->retc + 1 + has_card_arg);
636 :
637 59 : if (!RAPIEnabled()) {
638 0 : throw(MAL, "rapi.eval",
639 : "Embedded R has not been enabled. Start server with --set %s=true",
640 : rapi_enableflag);
641 : }
642 59 : if (!rapiInitialized) {
643 0 : throw(MAL, "rapi.eval",
644 : "Embedded R initialization has failed");
645 : }
646 :
647 59 : sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc+has_card_arg);
648 59 : args = (str*) GDKzalloc(sizeof(str) * pci->argc);
649 59 : if (args == NULL) {
650 0 : throw(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
651 : }
652 :
653 : // get the lock even before initialization of the R interpreter, as this can take a second and must be done only once.
654 59 : MT_lock_set(&rapiLock);
655 :
656 59 : env = PROTECT(eval(lang1(install("new.env")), R_GlobalEnv));
657 59 : assert(env != NULL);
658 :
659 : // first argument after the return contains the pointer to the sql_func structure
660 : // NEW macro temporarily renamed to MNEW to allow including sql_catalog.h
661 :
662 59 : if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
663 23 : int carg = pci->retc + 2 + has_card_arg;
664 23 : argnode = sqlfun->ops->h;
665 70 : while (argnode) {
666 47 : char* argname = ((sql_arg*) argnode->data)->name;
667 47 : args[carg] = GDKstrdup(argname);
668 47 : carg++;
669 47 : argnode = argnode->next;
670 : }
671 : }
672 : // the first unknown argument is the group, we don't really care for the rest.
673 59 : argnameslen = 2;
674 148 : for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
675 89 : if (args[i] == NULL) {
676 42 : if (!seengrp && grouped) {
677 6 : args[i] = GDKstrdup("aggr_group");
678 6 : seengrp = TRUE;
679 : } else {
680 36 : snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - 1);
681 36 : args[i] = GDKstrdup(argbuf);
682 : }
683 : }
684 89 : argnameslen += strlen(args[i]) + 2; /* extra for ", " */
685 : }
686 :
687 : // install the MAL variables into the R environment
688 : // we can basically map values to int ("INTEGER") or double ("REAL")
689 144 : for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
690 87 : int bat_type = getBatType(getArgType(mb,pci,i));
691 : // check for BAT or scalar first, keep code left
692 87 : if (!isaBatType(getArgType(mb,pci,i))) {
693 18 : const ValRecord *v = &stk->stk[getArg(pci, i)];
694 18 : b = BATconstant(0, v->vtype, VALptr(v), card, TRANSIENT);
695 18 : if (b == NULL) {
696 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
697 0 : goto wrapup;
698 : }
699 : } else {
700 69 : b = BATdescriptor(*getArgReference_bat(stk, pci, i));
701 69 : if (b == NULL) {
702 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
703 0 : goto wrapup;
704 : }
705 69 : if (BATcount(b) == 0) { /* empty input, generate trivial return */
706 : /* I expect all inputs to have the same size, so this should be safe */
707 2 : msg = empty_return(mb, stk, pci, pci->retc, b->hseqbase);
708 2 : BBPunfix(b->batCacheid);
709 2 : goto wrapup;
710 : }
711 : }
712 :
713 : // check the BAT count, if it is bigger than RAPI_MAX_TUPLES, fail
714 85 : if (BATcount(b) > RAPI_MAX_TUPLES) {
715 0 : msg = createException(MAL, "rapi.eval",
716 : "Got "BUNFMT" rows, but can only handle "LLFMT". Sorry.",
717 : BATcount(b), (lng) RAPI_MAX_TUPLES);
718 0 : BBPunfix(b->batCacheid);
719 0 : goto wrapup;
720 : }
721 85 : varname = PROTECT(Rf_install(args[i]));
722 85 : varvalue = bat_to_sexp(b, bat_type);
723 85 : if (varvalue == NULL) {
724 0 : msg = createException(MAL, "rapi.eval", "unknown argument type ");
725 0 : goto wrapup;
726 : }
727 85 : BBPunfix(b->batCacheid);
728 :
729 : // install vector into R environment
730 85 : Rf_defineVar(varname, varvalue, env);
731 85 : UNPROTECT(2);
732 : }
733 :
734 : /* we are going to evaluate the user function within an anonymous function call:
735 : * ret <- (function(arg1){return(arg1*2)})(42)
736 : * the user code is put inside the {}, this keeps our environment clean (TM) and gives
737 : * a clear path for return values, namely using the builtin return() function
738 : * this is also compatible with PL/R
739 : */
740 57 : pos = 0;
741 57 : argnames = malloc(argnameslen);
742 57 : if (argnames == NULL) {
743 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
744 0 : goto wrapup;
745 : }
746 57 : argnames[0] = '\0';
747 142 : for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
748 170 : pos += snprintf(argnames + pos, argnameslen - pos, "%s%s",
749 128 : args[i], i < pci->argc - 1 ? ", " : "");
750 : }
751 57 : rcalllen = 2 * pos + strlen(exprStr) + 100;
752 57 : rcall = malloc(rcalllen);
753 57 : if (rcall == NULL) {
754 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
755 0 : goto wrapup;
756 : }
757 57 : snprintf(rcall, rcalllen,
758 : "ret <- as.data.frame((function(%s){%s})(%s), nm=NA, stringsAsFactors=F)\n",
759 : argnames, exprStr, argnames);
760 57 : free(argnames);
761 57 : argnames = NULL;
762 : #ifdef _RAPI_DEBUG_
763 : printf("# R call %s\n",rcall);
764 : #endif
765 :
766 57 : x = R_ParseVector(mkString(rcall), 1, &status, R_NilValue);
767 :
768 57 : if (LENGTH(x) != 1 || status != PARSE_OK) {
769 0 : msg = createException(MAL, "rapi.eval",
770 : "Error parsing R expression '%s'. ", exprStr);
771 0 : goto wrapup;
772 : }
773 :
774 57 : retval = R_tryEval(VECTOR_ELT(x, 0), env, &evalErr);
775 57 : if (evalErr != FALSE) {
776 3 : char* errormsg = strdup(R_curErrorBuf());
777 3 : size_t c;
778 3 : if (errormsg == NULL) {
779 0 : msg = createException(MAL, "rapi.eval", "Error running R expression.");
780 0 : goto wrapup;
781 : }
782 : // remove newlines from error message so it fits into a MAPI error (lol)
783 513 : for (c = 0; c < strlen(errormsg); c++) {
784 510 : if (errormsg[c] == '\r' || errormsg[c] == '\n') {
785 8 : errormsg[c] = ' ';
786 : }
787 : }
788 3 : msg = createException(MAL, "rapi.eval",
789 : "Error running R expression: %s", errormsg);
790 3 : free(errormsg);
791 3 : goto wrapup;
792 : }
793 :
794 : // ret should be a data frame with exactly as many columns as we need from retc
795 54 : ret_cols = LENGTH(retval);
796 54 : if (ret_cols != pci->retc) {
797 0 : msg = createException(MAL, "rapi.eval",
798 : "Expected result of %d columns, got %d", pci->retc, ret_cols);
799 0 : goto wrapup;
800 : }
801 :
802 : // collect the return values
803 122 : for (i = 0; i < pci->retc; i++) {
804 69 : SEXP ret_col = VECTOR_ELT(retval, i);
805 69 : int bat_type = getBatType(getArgType(mb,pci,i));
806 69 : if (bat_type == TYPE_any || bat_type == TYPE_void) {
807 0 : getArgType(mb,pci,i) = bat_type;
808 0 : msg = createException(MAL, "rapi.eval",
809 : "Unknown return value, possibly projecting with no parameters.");
810 0 : goto wrapup;
811 : }
812 :
813 : // hand over the vector into a BAT
814 69 : b = sexp_to_bat(ret_col, bat_type);
815 69 : if (b == NULL) {
816 1 : msg = createException(MAL, "rapi.eval",
817 : "Failed to convert column %i", i);
818 1 : goto wrapup;
819 : }
820 :
821 : // bat return
822 68 : if (isaBatType(getArgType(mb,pci,i))) {
823 59 : *getArgReference_bat(stk, pci, i) = b->batCacheid;
824 59 : BBPkeepref(b);
825 : } else { // single value return, only for non-grouped aggregations
826 9 : BATiter li = bat_iterator(b);
827 9 : if (VALinit(&stk->stk[pci->argv[i]], bat_type,
828 9 : BUNtail(li, 0)) == NULL) { // TODO BUNtail here
829 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
830 0 : bat_iterator_end(&li);
831 0 : goto wrapup;
832 : }
833 9 : bat_iterator_end(&li);
834 9 : BBPunfix(b->batCacheid);
835 : }
836 68 : msg = MAL_SUCCEED;
837 : }
838 53 : wrapup:
839 : /* unprotect environment, so it will be eaten by the GC. */
840 59 : UNPROTECT(1);
841 59 : MT_lock_unset(&rapiLock);
842 59 : if (argnames)
843 0 : free(argnames);
844 59 : if (rcall)
845 57 : free(rcall);
846 341 : for (i = 0; i < pci->argc; i++)
847 282 : GDKfree(args[i]);
848 59 : GDKfree(args);
849 :
850 59 : return msg;
851 : }
852 :
853 49 : static str RAPIevalStd(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
854 : InstrPtr pci) {
855 49 : return RAPIeval(cntxt, mb, stk, pci, 0);
856 : }
857 10 : static str RAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
858 : InstrPtr pci) {
859 10 : return RAPIeval(cntxt, mb, stk, pci, 1);
860 : }
861 :
862 : /* used for loopback queries from R
863 : * see test rapi10 in monetdb5/extras/rapi */
864 : extern
865 : #ifdef WIN32
866 : __declspec(dllexport)
867 : #endif
868 : void *RAPIloopback(void *query);
869 :
870 : void *
871 3 : RAPIloopback(void *query) {
872 3 : res_table* output = NULL;
873 3 : char* querystr = (char*)CHAR(STRING_ELT(query, 0));
874 3 : char* err = SQLstatementIntern(rapiClient, querystr, "name", 1, 0, &output);
875 :
876 3 : if (err) { // there was an error
877 0 : return ScalarString(RSTR(err));
878 : }
879 3 : if (output) {
880 3 : int ncols = output->nr_cols;
881 3 : if (ncols > 0) {
882 3 : int i;
883 3 : SEXP retlist, names, varvalue = R_NilValue;
884 3 : retlist = PROTECT(allocVector(VECSXP, ncols));
885 3 : names = PROTECT(NEW_STRING(ncols));
886 9 : for (i = 0; i < ncols; i++) {
887 3 : BAT *b = BATdescriptor(output->cols[i].b);
888 3 : if (b == NULL || !(varvalue = bat_to_sexp(b, TYPE_any))) {
889 0 : UNPROTECT(i + 3);
890 0 : BBPreclaim(b);
891 0 : return ScalarString(RSTR("Conversion error"));
892 : }
893 3 : BBPunfix(b->batCacheid);
894 3 : SET_STRING_ELT(names, i, RSTR(output->cols[i].name));
895 3 : SET_VECTOR_ELT(retlist, i, varvalue);
896 : }
897 3 : res_table_destroy(output);
898 3 : SET_NAMES(retlist, names);
899 3 : UNPROTECT(ncols + 2);
900 3 : return retlist;
901 : }
902 0 : res_table_destroy(output);
903 : }
904 0 : return ScalarLogical(1);
905 : }
906 :
907 6 : static str RAPIprelude(void) {
908 6 : if (RAPIEnabled()) {
909 6 : MT_lock_set(&rapiLock);
910 : /* startup internal R environment */
911 6 : if (!rapiInitialized) {
912 6 : char *initstatus;
913 6 : initstatus = RAPIinitialize();
914 6 : if (initstatus != 0) {
915 0 : MT_lock_unset(&rapiLock);
916 0 : throw(MAL, "rapi.eval",
917 : "failed to initialize R environment (%s)", initstatus);
918 : }
919 6 : Rf_defineVar(Rf_install("MONETDB_LIBDIR"), ScalarString(RSTR(LIBDIR)), R_GlobalEnv);
920 6 : Rf_defineVar(Rf_install("MONETDB_VERSION"), ScalarString(RSTR(MONETDB_VERSION)), R_GlobalEnv);
921 :
922 : }
923 6 : MT_lock_unset(&rapiLock);
924 6 : printf("# MonetDB/R module loaded\n");
925 6 : fflush(stdout);
926 : }
927 : return MAL_SUCCEED;
928 : }
929 :
930 : #include "mel.h"
/* Table of the MAL patterns implemented by this file, for the modules "rapi"
 * and "batrapi". Plain evaluation maps to RAPIevalStd, the (sub)aggregate
 * variants map to RAPIevalAggr. One "batrapi"."eval" entry takes a leading
 * "card" argument of type lng; RAPIeval recognizes it by the lng type of the
 * first input. The entry with .imp=NULL terminates the table. */
931 : static mel_func rapi_init_funcs[] = {
932 : pattern("rapi", "eval", RAPIevalStd, false, "Execute a simple R script returning a single value", args(1,3, argany("",0),arg("fptr",ptr),arg("expr",str))),
933 : pattern("rapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
934 : pattern("rapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
935 : pattern("rapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
936 : pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
937 : pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("card", lng), arg("fptr",ptr),arg("expr",str))),
938 : pattern("batrapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
939 : pattern("batrapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
940 : { .imp=NULL }
941 : };
942 : #include "mal_import.h"
943 : #ifdef _MSC_VER
944 : #undef read
945 : #pragma section(".CRT$XCU",read)
946 : #endif
/* Library start-up hook: hands the "rapi" function table and RAPIprelude to
 * mal_module2().
 * NOTE(review): presumably this registers the module with the MAL interpreter
 * and RAPIprelude is run as its prelude - confirm against mal_import.h. */
947 6 : LIB_STARTUP_FUNC(init_rapi_mal)
948 6 : { mal_module2("rapi", NULL, rapi_init_funcs, RAPIprelude, NULL); }
|