Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * H. Muehleisen, M. Kersten
15 : * The R interface
16 : */
17 : #include "monetdb_config.h"
18 : #include "mal.h"
19 : #include "mal_stack.h"
20 : #include "mal_linker.h"
21 : #include "gdk_utils.h"
22 : #include "gdk.h"
23 : #include "sql_catalog.h"
24 : #include "sql_execute.h"
25 : #include "mutils.h"
26 :
27 : #define RAPI_MAX_TUPLES 2147483647L
28 :
29 : // R headers
30 : #define R_INTERFACE_PTRS 1
31 : #define CSTACK_DEFNS 1
32 :
33 : /* R redefines these */
34 : #undef SIZEOF_SIZE_T
35 : #undef ERROR
36 :
37 : #define USE_RINTERNALS 1
38 :
39 : #include <Rversion.h>
40 : #include <Rembedded.h>
41 : #include <Rdefines.h>
42 : #include <Rinternals.h>
43 : #include <R_ext/Parse.h>
44 :
45 : // other headers
46 : #include <string.h>
47 :
48 : //#define _RAPI_DEBUG_
49 :
50 : // this macro blows up mmath.h pragmas
51 : #ifdef warning
52 : # undef warning
53 : #endif
54 :
/* Build an R CHARSXP from a C string, tagging it as UTF-8 (CE_UTF8). */
55 : #define RSTR(somestr) mkCharCE(somestr, CE_UTF8)
56 :
57 : // Element-wise conversion hooks passed as `mapfun` to the macros below; the NOOP variants pass the value through unchanged
58 : #define M_TO_R_NOOP(v) (v)
59 : #define R_TO_M_NOOP(v) (v)
60 : #define M_TO_R_DATE(v) mDate_to_rDate(v)
61 : #define R_TO_M_DATE(v) rDate_to_mDate(v)
62 :
/*
 * BAT_TO_SXP: copy the tail values reachable through BAT iterator `bati`
 * into a new R vector stored in `retsxp`.
 *   tpe     - C element type of the BAT (also used to build is_<tpe>_nil)
 *   newfun  - R allocator called with bati.count
 *   ptrfun  - accessor returning the R vector's data pointer (ctype *)
 *   naval   - R value written for nil elements
 *   memcopy - when true and the iterator reports nonil && !nil, the data is
 *             memcpy'd (count * sizeof(tpe) bytes) instead of converted
 *             element by element
 *   mapfun  - per-element conversion applied to non-nil values
 * The `bat` argument is not referenced by the expansion.
 * The vector is PROTECTed and the macro never UNPROTECTs it, so the
 * expansion leaves one extra entry on R's protect stack.
 */
63 : #define BAT_TO_SXP(bat,bati,tpe,retsxp,newfun,ptrfun,ctype,naval,memcopy,mapfun) \
64 : do { \
65 : tpe v; size_t j; \
66 : ctype *valptr = NULL; \
67 : tpe* p = (tpe*) bati.base; \
68 : retsxp = PROTECT(newfun(bati.count)); \
69 : if (!retsxp) break; \
70 : valptr = ptrfun(retsxp); \
71 : if (bati.nonil && !bati.nil) { \
72 : if (memcopy) { \
73 : memcpy(valptr, p, \
74 : bati.count * sizeof(tpe)); \
75 : } else { \
76 : for (j = 0; j < bati.count; j++) { \
77 : valptr[j] = mapfun((ctype) p[j]); \
78 : } \
79 : } \
80 : } else { \
81 : for (j = 0; j < bati.count; j++) { \
82 : v = p[j]; \
83 : if ( is_##tpe##_nil(v)) \
84 : valptr[j] = naval; \
85 : else \
86 : valptr[j] = mapfun((ctype) v); \
87 : } \
88 : } \
89 : } while (0)
90 :
/* BAT_TO_SXP specialised for an R integer vector (nil becomes NA_INTEGER). */
91 : #define BAT_TO_INTSXP(bat,bati,tpe,retsxp,memcopy) \
92 : BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_INTEGER,INTEGER_POINTER,int,NA_INTEGER,memcopy,M_TO_R_NOOP)\
93 :
/* BAT_TO_SXP specialised for an R numeric (double) vector (nil becomes NA_REAL). */
94 : #define BAT_TO_REALSXP(bat,bati,tpe,retsxp,memcopy) \
95 : BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_NUMERIC,NUMERIC_POINTER,double,NA_REAL,memcopy,M_TO_R_NOOP)\
96 :
97 : // DATE is stored as an integer in MonetDB with epoch 0; R uses a double and epoch 1970
/*
 * NOTE: this expands to several statements, including a declaration of
 * `klass`, and is not wrapped in do { } while (0). It must therefore be
 * used inside its own braced block, at most once per scope.
 */
98 : #define BAT_TO_DATESXP(bat,bati,tpe,retsxp,memcopy) \
99 : BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_NUMERIC,NUMERIC_POINTER,double,NA_REAL,memcopy, M_TO_R_DATE); \
100 : SEXP klass = mkString("Date"); \
101 : classgets(retsxp, klass);
102 :
/*
 * SXP_TO_BAT: fill a new TRANSIENT BAT of type TYPE_<tpe> from an R vector.
 * Relies on names from the expansion site: `s` (the R vector), `cnt` (its
 * length) and `b` (receives the BAT; left NULL when COLnew fails).
 *   access_fun - accessor returning the R vector's data pointer
 *   na_check   - expression evaluated AFTER `*p` has been assigned the
 *                converted element; when true the element becomes <tpe>_nil
 *                and the BAT is flagged tnil / !tnonil
 *   mapfun     - per-element conversion applied to the cast R value
 * The loop also maintains the tsorted / trevsorted flags.
 * NOTE(review): because na_check runs after the cast to `tpe`, a check
 * written against `*p` only sees the converted value, not the original R
 * element; the macro-local index `j` is also visible to na_check.
 */
103 : #define SXP_TO_BAT(tpe, access_fun, na_check, mapfun) \
104 : do { \
105 : tpe *p, prev = tpe##_nil; size_t j; \
106 : b = COLnew(0, TYPE_##tpe, cnt, TRANSIENT); \
107 : if (!b) break; \
108 : b->tnil = false; b->tnonil = true; b->tkey = false; \
109 : b->tsorted = true; b->trevsorted = true; \
110 : b->tseqbase = oid_nil; \
111 : p = (tpe*) Tloc(b, 0); \
112 : for( j = 0; j < cnt; j++, p++){ \
113 : *p = mapfun((tpe) access_fun(s)[j]); \
114 : if (na_check){ b->tnil = true; b->tnonil = false; *p= tpe##_nil;} \
115 : if (j > 0){ \
116 : if (b->trevsorted && !is_##tpe##_nil(*p) && (is_##tpe##_nil(prev) || *p > prev)){ \
117 : b->trevsorted = false; \
118 : } else \
119 : if (b->tsorted && !is_##tpe##_nil(prev) && (is_##tpe##_nil(*p) || *p < prev)){ \
120 : b->tsorted = false; \
121 : } \
122 : } \
123 : prev = *p; \
124 : } \
125 : BATsetcount(b, cnt); \
126 : } while (0)
127 :
128 : // DATE epoch differs between MonetDB (00-01-01) and R (1970-01-01)
129 : // no c API for R date handling so use fixed offset
130 : // >>`-as.double(as.Date(0, origin="0-1-1"))`
/* Number of days between the MonetDB date epoch and the R date epoch. */
static const int days0To1970 = 719528;

/* Shift a MonetDB day number so that it counts from R's epoch (1970-01-01). */
static int
mDate_to_rDate(int v)
{
	const int r_days = v - days0To1970;

	return r_days;
}

/* Shift an R day number (counted from 1970-01-01) onto the MonetDB epoch. */
static int
rDate_to_mDate(int v)
{
	const int m_days = v + days0To1970;

	return m_days;
}
144 :
145 : static SEXP
146 88 : bat_to_sexp(BAT* b, int type)
147 : {
148 88 : SEXP varvalue = NULL;
149 88 : BATiter bi = bat_iterator(b);
150 : // TODO: deal with SQL types (DECIMAL/TIME/TIMESTAMP)
151 88 : switch (ATOMstorage(bi.type)) {
152 0 : case TYPE_void: {
153 0 : size_t i = 0;
154 0 : varvalue = PROTECT(NEW_LOGICAL(BATcount(b)));
155 0 : if (!varvalue) {
156 0 : bat_iterator_end(&bi);
157 0 : return NULL;
158 : }
159 0 : for (i = 0; i < BATcount(b); i++) {
160 0 : LOGICAL_POINTER(varvalue)[i] = NA_LOGICAL;
161 : }
162 : break;
163 : }
164 2 : case TYPE_bte:
165 13 : BAT_TO_INTSXP(b, bi, bte, varvalue, 0);
166 : break;
167 1 : case TYPE_sht:
168 7 : BAT_TO_INTSXP(b, bi, sht, varvalue, 0);
169 : break;
170 46 : case TYPE_int:
171 : //Storage is int but the actual defined type may be different
172 46 : switch (type) {
173 : case TYPE_int:
174 : //Storage is int but the actual defined type may be different
175 42 : switch (type) {
176 42 : case TYPE_int:
177 : // special case: memcpy for int-to-int conversion without NULLs
178 1190 : BAT_TO_INTSXP(b, bi, int, varvalue, 1);
179 : break;
180 : default:
181 : if (type == ATOMindex("date")) {
182 : BAT_TO_DATESXP(b, bi, int, varvalue, 0);
183 : } else {
184 : //Type stored as int but no implementation to decode into native R type
185 : BAT_TO_INTSXP(b, bi, int, varvalue, 1);
186 : }
187 : }
188 : break;
189 4 : default:
190 4 : if (type == TYPE_date) {
191 7 : BAT_TO_DATESXP(b, bi, int, varvalue, 0);
192 : } else {
193 : //Type stored as int but no implementation to decode into native R type
194 7 : BAT_TO_INTSXP(b, bi, int, varvalue, 1);
195 : }
196 : break;
197 : }
198 : break;
199 : #ifdef HAVE_HGE
200 1 : case TYPE_hge: /* R's integers are stored as int, so we cannot be sure hge will fit */
201 6 : BAT_TO_REALSXP(b, bi, hge, varvalue, 0);
202 : break;
203 : #endif
204 2 : case TYPE_flt:
205 8 : BAT_TO_REALSXP(b, bi, flt, varvalue, 0);
206 : break;
207 18 : case TYPE_dbl:
208 : // special case: memcpy for double-to-double conversion without NULLs
209 2610 : BAT_TO_REALSXP(b, bi, dbl, varvalue, 1);
210 : break;
211 11 : case TYPE_lng: /* R's integers are stored as int, so we cannot be sure long will fit */
212 201004 : BAT_TO_REALSXP(b, bi, lng, varvalue, 0);
213 : break;
214 7 : case TYPE_str: { // there is only one string type, thus no macro here
215 7 : BUN p, q, j = 0;
216 7 : varvalue = PROTECT(NEW_STRING(BATcount(b)));
217 7 : if (varvalue == NULL) {
218 0 : bat_iterator_end(&bi);
219 0 : return NULL;
220 : }
221 : /* special case where we exploit the duplicate-eliminated string heap */
222 7 : if (GDK_ELIMDOUBLES(b->tvheap)) {
223 7 : SEXP* sexp_ptrs = GDKzalloc(b->tvheap->free * sizeof(SEXP));
224 7 : if (!sexp_ptrs) {
225 0 : bat_iterator_end(&bi);
226 0 : return NULL;
227 : }
228 490 : BATloop(b, p, q) {
229 483 : const char *t = (const char *) BUNtvar(bi, p);
230 483 : ptrdiff_t offset = t - b->tvheap->base;
231 483 : if (!sexp_ptrs[offset]) {
232 21 : if (strNil(t)) {
233 1 : sexp_ptrs[offset] = NA_STRING;
234 : } else {
235 20 : sexp_ptrs[offset] = RSTR(t);
236 : }
237 : }
238 483 : SET_STRING_ELT(varvalue, j++, sexp_ptrs[offset]);
239 : }
240 7 : GDKfree(sexp_ptrs);
241 : }
242 : else {
243 0 : if (bi.nonil) {
244 0 : BATloop(b, p, q) {
245 0 : SET_STRING_ELT(varvalue, j++, RSTR(
246 : (const char *) BUNtvar(bi, p)));
247 : }
248 : }
249 : else {
250 0 : BATloop(b, p, q) {
251 0 : const char *t = (const char *) BUNtvar(bi, p);
252 0 : if (strNil(t)) {
253 0 : SET_STRING_ELT(varvalue, j++, NA_STRING);
254 : } else {
255 0 : SET_STRING_ELT(varvalue, j++, RSTR(t));
256 : }
257 : }
258 : }
259 : }
260 : break;
261 : }
262 : }
263 88 : bat_iterator_end(&bi);
264 88 : return varvalue;
265 : }
266 :
267 69 : static BAT* sexp_to_bat(SEXP s, int type) {
268 69 : BAT* b = NULL;
269 69 : BUN cnt = LENGTH(s);
270 69 : switch (type) {
271 32 : case TYPE_int:
272 32 : if (!IS_INTEGER(s)) {
273 : return NULL;
274 : }
275 402306 : SXP_TO_BAT(int, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
276 : break;
277 1 : case TYPE_lng:
278 1 : if (!IS_INTEGER(s)) {
279 : return NULL;
280 : }
281 6 : SXP_TO_BAT(lng, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
282 : break;
283 : #ifdef HAVE_HGE
284 1 : case TYPE_hge:
285 1 : if (!IS_INTEGER(s)) {
286 : return NULL;
287 : }
288 6 : SXP_TO_BAT(hge, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
289 : break;
290 : #endif
291 3 : case TYPE_bte:
292 : case TYPE_bit: // only R logical types fit into bit BATs
293 3 : if (!IS_LOGICAL(s)) {
294 : return NULL;
295 : }
296 1008 : SXP_TO_BAT(bit, LOGICAL_POINTER, *p==NA_LOGICAL, R_TO_M_NOOP);
297 : break;
298 25 : case TYPE_dbl:
299 25 : if (!IS_NUMERIC(s)) {
300 : return NULL;
301 : }
302 1057 : SXP_TO_BAT(dbl, NUMERIC_POINTER, (ISNA(*p) || isnan(*p) || isinf(*p)), R_TO_M_NOOP);
303 : break;
304 6 : case TYPE_str: {
305 6 : SEXP levels;
306 6 : size_t j;
307 6 : if (!IS_CHARACTER(s) && !isFactor(s)) {
308 : return NULL;
309 : }
310 6 : b = COLnew(0, TYPE_str, cnt, TRANSIENT);
311 6 : if (!b) return NULL;
312 6 : b->tnil = false;
313 6 : b->tnonil = true;
314 6 : b->tkey = false;
315 6 : b->tsorted = false;
316 6 : b->trevsorted = false;
317 : /* get levels once, since this is a function call */
318 6 : levels = GET_LEVELS(s);
319 :
320 38 : for (j = 0; j < cnt; j++) {
321 26 : SEXP rse;
322 26 : if (isFactor(s)) {
323 5 : int ii = INTEGER(s)[j];
324 5 : if (ii == NA_INTEGER) {
325 1 : rse = NA_STRING;
326 : } else {
327 4 : rse = STRING_ELT(levels, ii - 1);
328 : }
329 : } else {
330 21 : rse = STRING_ELT(s, j);
331 : }
332 26 : if (rse == NA_STRING) {
333 2 : b->tnil = true;
334 2 : b->tnonil = false;
335 2 : if (BUNappend(b, str_nil, false) != GDK_SUCCEED) {
336 0 : BBPreclaim(b);
337 0 : return NULL;
338 : }
339 : } else {
340 24 : if (BUNappend(b, CHAR(rse), false) != GDK_SUCCEED) {
341 0 : BBPreclaim(b);
342 0 : return NULL;
343 : }
344 : }
345 : }
346 6 : BATsetcount(b, cnt);
347 6 : break;
348 : }
349 1 : default:
350 1 : if (type == TYPE_date) {
351 1 : if (!IS_NUMERIC(s)) {
352 : return NULL;
353 : }
354 4 : SXP_TO_BAT(date, NUMERIC_POINTER, *p==NA_REAL, R_TO_M_DATE);
355 : }
356 : }
357 :
358 : return b;
359 : }
360 :
361 : const char* rapi_enableflag = "embedded_r";
362 :
363 65 : static bool RAPIEnabled(void) {
364 65 : return (GDKgetenv_istrue(rapi_enableflag)
365 65 : || GDKgetenv_isyes(rapi_enableflag));
366 : }
367 :
368 : // The R-environment should be single threaded, calling for some protective measures.
369 : static MT_Lock rapiLock = MT_LOCK_INITIALIZER(rapiLock);
370 : static bool rapiInitialized = false;
371 : #if 0
372 : static char* rtypenames[] = { "NIL", "SYM", "LIST", "CLO", "ENV", "PROM",
373 : "LANG", "SPECIAL", "BUILTIN", "CHAR", "LGL", "unknown", "unknown",
374 : "INT", "REAL", "CPLX", "STR", "DOT", "ANY", "VEC", "EXPR", "BCODE",
375 : "EXTPTR", "WEAKREF", "RAW", "S4" };
376 : #endif
377 :
378 : static Client rapiClient = NULL;
379 :
380 :
381 : #if 0
382 : // helper function to translate R TYPEOF() return values to something readable
383 : char* rtypename(int rtypeid) {
384 : if (rtypeid < 0 || rtypeid > 25) {
385 : return "unknown";
386 : } else
387 : return rtypenames[rtypeid];
388 : }
389 : #endif
390 :
/*
 * Console output callback installed into R (ptr_R_WriteConsoleEx).
 * Output is discarded unless _RAPI_DEBUG_ is defined, in which case it is
 * echoed to stdout with a "# " prefix.
 */
static void writeConsoleEx(const char * buf, int buflen, int foo) {
	/* the arguments are unused in non-debug builds */
	(void) buf;
	(void) foo;
	(void) buflen;
#ifdef _RAPI_DEBUG_
	printf("# %s", buf);
#endif
}
399 :
/* Console output callback (ptr_R_WriteConsole); forwards to writeConsoleEx. */
static void writeConsole(const char * buf, int buflen) {
	const int ignored_otype = -42; /* third argument is unused by writeConsoleEx */

	writeConsoleEx(buf, buflen, ignored_otype);
}
403 :
/* Callback installed as ptr_R_ClearerrConsole; deliberately does nothing. */
static void clearRErrConsole(void) {
	return;
}
407 :
408 : static char *RAPIinstalladdons(void);
409 :
410 : /* UNIX-like initialization */
411 : #ifndef WIN32
412 :
413 : #define R_INTERFACE_PTRS 1
414 : #define CSTACK_DEFNS 1
415 : #include <Rinterface.h>
416 :
417 6 : static char *RAPIinitialize(void) {
418 : // TODO: check for header/library version mismatch?
419 6 : char *e;
420 :
421 : // set R_HOME for packages etc. We know this from our configure script
422 6 : putenv("R_HOME=" RHOME);
423 :
424 : // set some command line arguments
425 : {
426 6 : structRstart rp;
427 6 : char *rargv[] = { "R",
428 : #if R_VERSION >= R_Version(4,0,0)
429 : "--no-echo",
430 : #else
431 : "--slave",
432 : #endif
433 : "--vanilla" };
434 6 : int stat = 0;
435 :
436 6 : R_DefParams(&rp);
437 : #if R_VERSION >= R_Version(4,0,0)
438 6 : rp.R_NoEcho = (Rboolean) TRUE;
439 : #else
440 : rp.R_Slave = (Rboolean) TRUE;
441 : #endif
442 6 : rp.R_Quiet = (Rboolean) TRUE;
443 6 : rp.R_Interactive = (Rboolean) FALSE;
444 6 : rp.R_Verbose = (Rboolean) FALSE;
445 6 : rp.LoadSiteFile = (Rboolean) FALSE;
446 6 : rp.LoadInitFile = (Rboolean) FALSE;
447 6 : rp.RestoreAction = SA_NORESTORE;
448 6 : rp.SaveAction = SA_NOSAVE;
449 6 : rp.NoRenviron = TRUE;
450 6 : stat = Rf_initialize_R(2, rargv);
451 6 : if (stat < 0) {
452 0 : return "Rf_initialize failed";
453 : }
454 6 : R_SetParams(&rp);
455 : }
456 :
457 : /* disable stack checking, because threads will throw it off */
458 6 : R_CStackLimit = (uintptr_t) -1;
459 : /* redirect input/output and set error handler */
460 6 : R_Outputfile = NULL;
461 6 : R_Consolefile = NULL;
462 : /* we do not want R to handle any signal, will interfere with monetdbd */
463 6 : R_SignalHandlers = 0;
464 : /* we want control R's output and input */
465 6 : ptr_R_WriteConsoleEx = writeConsoleEx;
466 6 : ptr_R_WriteConsole = writeConsole;
467 6 : ptr_R_ReadConsole = NULL;
468 6 : ptr_R_ClearerrConsole = clearRErrConsole;
469 :
470 : // big boy here
471 6 : setup_Rmainloop();
472 :
473 6 : if ((e = RAPIinstalladdons()) != 0) {
474 : return e;
475 : }
476 : #if R_VERSION < R_Version(4,2,0)
477 : // patch R internals to disallow quit and system. Setting them to NULL produces an error.
478 : SET_INTERNAL(install("quit"), R_NilValue);
479 : // install.packages() uses system2 to call gcc etc., so we cannot disable it (perhaps store the pointer somewhere just for that?)
480 : //SET_INTERNAL(install("system"), R_NilValue);
481 : #endif
482 :
483 6 : rapiInitialized = true;
484 6 : return NULL;
485 : }
486 : #else
487 :
488 : #define S_IRWXU 0000700
489 :
/* Windows stub: embedded R is not supported, always reports failure. */
static char *RAPIinitialize(void) {
	char *reason = "Sorry, no R API on Windows";

	return reason;
}
493 :
494 : #endif
495 :
496 :
497 6 : static char *RAPIinstalladdons(void) {
498 6 : int evalErr;
499 6 : ParseStatus status;
500 6 : char rlibs[FILENAME_MAX];
501 6 : char rapiinclude[BUFSIZ];
502 6 : SEXP librisexp;
503 6 : int len;
504 :
505 : // r library folder, create if not exists
506 6 : len = snprintf(rlibs, sizeof(rlibs), "%s%c%s", GDKgetenv("gdk_dbpath"), DIR_SEP, "rapi_packages");
507 6 : if (len == -1 || len >= FILENAME_MAX)
508 : return "cannot create rapi_packages directory because the path is too large";
509 :
510 6 : if (MT_mkdir(rlibs) != 0 && errno != EEXIST) {
511 : return "cannot create rapi_packages directory";
512 : }
513 : #ifdef _RAPI_DEBUG_
514 : printf("# R libraries installed in %s\n",rlibs);
515 : #endif
516 :
517 6 : PROTECT(librisexp = allocVector(STRSXP, 1));
518 6 : SET_STRING_ELT(librisexp, 0, mkChar(rlibs));
519 6 : Rf_defineVar(Rf_install(".rapi.libdir"), librisexp, R_GlobalEnv);
520 6 : UNPROTECT(1);
521 :
522 : // run rapi.R environment setup script
523 : {
524 6 : char *f = locate_file("rapi", ".R", 0);
525 6 : snprintf(rapiinclude, sizeof(rapiinclude), "source(\"%s\")", f);
526 6 : GDKfree(f);
527 : }
528 : #if DIR_SEP != '/'
529 : {
530 : char *p;
531 : for (p = rapiinclude; *p; p++)
532 : if (*p == DIR_SEP)
533 : *p = '/';
534 : }
535 : #endif
536 6 : R_tryEvalSilent(
537 : VECTOR_ELT(
538 : R_ParseVector(mkString(rapiinclude), 1, &status,
539 : R_NilValue), 0), R_GlobalEnv, &evalErr);
540 :
541 : // of course the script may contain errors as well
542 6 : if (evalErr != FALSE) {
543 0 : return "failure running R setup script";
544 : }
545 : return NULL;
546 : }
547 :
548 : static str
549 2 : empty_return(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, size_t retcols, oid seqbase)
550 : {
551 2 : str msg = MAL_SUCCEED;
552 2 : void **res = GDKzalloc(retcols * sizeof(void*));
553 :
554 2 : if (!res) {
555 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
556 0 : goto bailout;
557 : }
558 :
559 4 : for (size_t i = 0; i < retcols; i++) {
560 2 : if (isaBatType(getArgType(mb, pci, i))) {
561 1 : BAT *b = COLnew(seqbase, getBatType(getArgType(mb, pci, i)), 0, TRANSIENT);
562 1 : if (!b) {
563 0 : msg = createException(MAL, "rapi.eval", GDK_EXCEPTION);
564 0 : goto bailout;
565 : }
566 1 : ((BAT**)res)[i] = b;
567 : } else { // single value return, only for non-grouped aggregations
568 : // return NULL to conform to SQL aggregates
569 1 : int tpe = getArgType(mb, pci, i);
570 1 : if (!VALinit(&stk->stk[pci->argv[i]], tpe, ATOMnilptr(tpe))) {
571 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
572 0 : goto bailout;
573 : }
574 1 : ((ValPtr*)res)[i] = &stk->stk[pci->argv[i]];
575 : }
576 : }
577 :
578 2 : bailout:
579 2 : if (res) {
580 4 : for (size_t i = 0; i < retcols; i++) {
581 2 : if (isaBatType(getArgType(mb, pci, i))) {
582 1 : BAT *b = ((BAT**)res)[i];
583 :
584 1 : if (b && msg) {
585 0 : BBPreclaim(b);
586 1 : } else if (b) {
587 1 : *getArgReference_bat(stk, pci, i) = b->batCacheid;
588 1 : BBPkeepref(b);
589 : }
590 1 : } else if (msg) {
591 0 : ValPtr pt = ((ValPtr*)res)[i];
592 :
593 0 : if (pt)
594 0 : VALclear(pt);
595 : }
596 : }
597 2 : GDKfree(res);
598 : }
599 2 : return msg;
600 : }
601 :
602 59 : static str RAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bit grouped) {
603 59 : sql_func * sqlfun = NULL;
604 59 : SEXP x, env, retval;
605 59 : SEXP varname = R_NilValue;
606 59 : SEXP varvalue = R_NilValue;
607 59 : ParseStatus status;
608 59 : int i = 0;
609 59 : char argbuf[64];
610 59 : char *argnames = NULL;
611 59 : size_t argnameslen;
612 59 : size_t pos;
613 59 : char* rcall = NULL;
614 59 : size_t rcalllen;
615 59 : int ret_cols = 0; /* int because pci->retc is int, too*/
616 59 : str *args;
617 59 : int evalErr;
618 59 : char *msg = MAL_SUCCEED;
619 59 : BAT *b;
620 59 : node * argnode;
621 59 : int seengrp = FALSE;
622 :
623 59 : rapiClient = cntxt;
624 :
625 : // If the first input argument is of type lng, this is a cardinality-only bulk operation.
626 59 : int has_card_arg = 0;
627 59 : lng card; // cardinality of non-bat inputs
628 59 : if (getArgType(mb, pci, pci->retc) == TYPE_lng) {
629 1 : has_card_arg=1;
630 1 : card = *getArgReference_lng(stk, pci, pci->retc);
631 : }
632 : else {
633 : has_card_arg=0;
634 : card = 1;
635 : }
636 59 : str exprStr = *getArgReference_str(stk, pci, pci->retc + 1 + has_card_arg);
637 :
638 59 : if (!RAPIEnabled()) {
639 0 : throw(MAL, "rapi.eval",
640 : "Embedded R has not been enabled. Start server with --set %s=true",
641 : rapi_enableflag);
642 : }
643 59 : if (!rapiInitialized) {
644 0 : throw(MAL, "rapi.eval",
645 : "Embedded R initialization has failed");
646 : }
647 :
648 59 : sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc+has_card_arg);
649 59 : args = (str*) GDKzalloc(sizeof(str) * pci->argc);
650 59 : if (args == NULL) {
651 0 : throw(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
652 : }
653 :
654 : // get the lock even before initialization of the R interpreter, as this can take a second and must be done only once.
655 59 : MT_lock_set(&rapiLock);
656 :
657 59 : env = PROTECT(eval(lang1(install("new.env")), R_GlobalEnv));
658 59 : assert(env != NULL);
659 :
660 : // first argument after the return contains the pointer to the sql_func structure
661 : // NEW macro temporarily renamed to MNEW to allow including sql_catalog.h
662 :
663 59 : if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
664 23 : int carg = pci->retc + 2 + has_card_arg;
665 23 : argnode = sqlfun->ops->h;
666 70 : while (argnode) {
667 47 : char* argname = ((sql_arg*) argnode->data)->name;
668 47 : args[carg] = GDKstrdup(argname);
669 47 : carg++;
670 47 : argnode = argnode->next;
671 : }
672 : }
673 : // the first unknown argument is the group, we don't really care for the rest.
674 59 : argnameslen = 2;
675 148 : for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
676 89 : if (args[i] == NULL) {
677 42 : if (!seengrp && grouped) {
678 6 : args[i] = GDKstrdup("aggr_group");
679 6 : seengrp = TRUE;
680 : } else {
681 36 : snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - 1);
682 36 : args[i] = GDKstrdup(argbuf);
683 : }
684 : }
685 89 : argnameslen += strlen(args[i]) + 2; /* extra for ", " */
686 : }
687 :
688 : // install the MAL variables into the R environment
689 : // we can basically map values to int ("INTEGER") or double ("REAL")
690 144 : for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
691 87 : int bat_type = getBatType(getArgType(mb,pci,i));
692 : // check for BAT or scalar first, keep code left
693 87 : if (!isaBatType(getArgType(mb,pci,i))) {
694 18 : const ValRecord *v = &stk->stk[getArg(pci, i)];
695 18 : b = BATconstant(0, v->vtype, VALptr(v), card, TRANSIENT);
696 18 : if (b == NULL) {
697 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
698 0 : goto wrapup;
699 : }
700 : } else {
701 69 : b = BATdescriptor(*getArgReference_bat(stk, pci, i));
702 69 : if (b == NULL) {
703 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
704 0 : goto wrapup;
705 : }
706 69 : if (BATcount(b) == 0) { /* empty input, generate trivial return */
707 : /* I expect all inputs to have the same size, so this should be safe */
708 2 : msg = empty_return(mb, stk, pci, pci->retc, b->hseqbase);
709 2 : BBPunfix(b->batCacheid);
710 2 : goto wrapup;
711 : }
712 : }
713 :
714 : // check the BAT count, if it is bigger than RAPI_MAX_TUPLES, fail
715 85 : if (BATcount(b) > RAPI_MAX_TUPLES) {
716 0 : msg = createException(MAL, "rapi.eval",
717 : "Got "BUNFMT" rows, but can only handle "LLFMT". Sorry.",
718 : BATcount(b), (lng) RAPI_MAX_TUPLES);
719 0 : BBPunfix(b->batCacheid);
720 0 : goto wrapup;
721 : }
722 85 : varname = PROTECT(Rf_install(args[i]));
723 85 : varvalue = bat_to_sexp(b, bat_type);
724 85 : if (varvalue == NULL) {
725 0 : msg = createException(MAL, "rapi.eval", "unknown argument type ");
726 0 : goto wrapup;
727 : }
728 85 : BBPunfix(b->batCacheid);
729 :
730 : // install vector into R environment
731 85 : Rf_defineVar(varname, varvalue, env);
732 85 : UNPROTECT(2);
733 : }
734 :
735 : /* we are going to evaluate the user function within an anonymous function call:
736 : * ret <- (function(arg1){return(arg1*2)})(42)
737 : * the user code is put inside the {}, this keeps our environment clean (TM) and gives
738 : * a clear path for return values, namely using the builtin return() function
739 : * this is also compatible with PL/R
740 : */
741 57 : pos = 0;
742 57 : argnames = malloc(argnameslen);
743 57 : if (argnames == NULL) {
744 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
745 0 : goto wrapup;
746 : }
747 57 : argnames[0] = '\0';
748 142 : for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
749 170 : pos += snprintf(argnames + pos, argnameslen - pos, "%s%s",
750 128 : args[i], i < pci->argc - 1 ? ", " : "");
751 : }
752 57 : rcalllen = 2 * pos + strlen(exprStr) + 100;
753 57 : rcall = malloc(rcalllen);
754 57 : if (rcall == NULL) {
755 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
756 0 : goto wrapup;
757 : }
758 57 : snprintf(rcall, rcalllen,
759 : "ret <- as.data.frame((function(%s){%s})(%s), nm=NA, stringsAsFactors=F)\n",
760 : argnames, exprStr, argnames);
761 57 : free(argnames);
762 57 : argnames = NULL;
763 : #ifdef _RAPI_DEBUG_
764 : printf("# R call %s\n",rcall);
765 : #endif
766 :
767 57 : x = R_ParseVector(mkString(rcall), 1, &status, R_NilValue);
768 :
769 57 : if (LENGTH(x) != 1 || status != PARSE_OK) {
770 0 : msg = createException(MAL, "rapi.eval",
771 : "Error parsing R expression '%s'. ", exprStr);
772 0 : goto wrapup;
773 : }
774 :
775 57 : retval = R_tryEval(VECTOR_ELT(x, 0), env, &evalErr);
776 57 : if (evalErr != FALSE) {
777 3 : char* errormsg = strdup(R_curErrorBuf());
778 3 : size_t c;
779 3 : if (errormsg == NULL) {
780 0 : msg = createException(MAL, "rapi.eval", "Error running R expression.");
781 0 : goto wrapup;
782 : }
783 : // remove newlines from error message so it fits into a MAPI error (lol)
784 513 : for (c = 0; c < strlen(errormsg); c++) {
785 510 : if (errormsg[c] == '\r' || errormsg[c] == '\n') {
786 8 : errormsg[c] = ' ';
787 : }
788 : }
789 3 : msg = createException(MAL, "rapi.eval",
790 : "Error running R expression: %s", errormsg);
791 3 : free(errormsg);
792 3 : goto wrapup;
793 : }
794 :
795 : // ret should be a data frame with exactly as many columns as we need from retc
796 54 : ret_cols = LENGTH(retval);
797 54 : if (ret_cols != pci->retc) {
798 0 : msg = createException(MAL, "rapi.eval",
799 : "Expected result of %d columns, got %d", pci->retc, ret_cols);
800 0 : goto wrapup;
801 : }
802 :
803 : // collect the return values
804 122 : for (i = 0; i < pci->retc; i++) {
805 69 : SEXP ret_col = VECTOR_ELT(retval, i);
806 69 : int bat_type = getBatType(getArgType(mb,pci,i));
807 69 : if (bat_type == TYPE_any || bat_type == TYPE_void) {
808 0 : getArgType(mb,pci,i) = bat_type;
809 0 : msg = createException(MAL, "rapi.eval",
810 : "Unknown return value, possibly projecting with no parameters.");
811 0 : goto wrapup;
812 : }
813 :
814 : // hand over the vector into a BAT
815 69 : b = sexp_to_bat(ret_col, bat_type);
816 69 : if (b == NULL) {
817 1 : msg = createException(MAL, "rapi.eval",
818 : "Failed to convert column %i", i);
819 1 : goto wrapup;
820 : }
821 :
822 : // bat return
823 68 : if (isaBatType(getArgType(mb,pci,i))) {
824 59 : *getArgReference_bat(stk, pci, i) = b->batCacheid;
825 59 : BBPkeepref(b);
826 : } else { // single value return, only for non-grouped aggregations
827 9 : BATiter li = bat_iterator(b);
828 9 : if (VALinit(&stk->stk[pci->argv[i]], bat_type,
829 9 : BUNtail(li, 0)) == NULL) { // TODO BUNtail here
830 0 : msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
831 0 : bat_iterator_end(&li);
832 0 : goto wrapup;
833 : }
834 9 : bat_iterator_end(&li);
835 9 : BBPunfix(b->batCacheid);
836 : }
837 68 : msg = MAL_SUCCEED;
838 : }
839 53 : wrapup:
840 : /* unprotect environment, so it will be eaten by the GC. */
841 59 : UNPROTECT(1);
842 59 : MT_lock_unset(&rapiLock);
843 59 : if (argnames)
844 0 : free(argnames);
845 59 : if (rcall)
846 57 : free(rcall);
847 341 : for (i = 0; i < pci->argc; i++)
848 282 : GDKfree(args[i]);
849 59 : GDKfree(args);
850 :
851 59 : return msg;
852 : }
853 :
854 49 : static str RAPIevalStd(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
855 : InstrPtr pci) {
856 49 : return RAPIeval(cntxt, mb, stk, pci, 0);
857 : }
858 10 : static str RAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
859 : InstrPtr pci) {
860 10 : return RAPIeval(cntxt, mb, stk, pci, 1);
861 : }
862 :
863 : /* used for loopback queries from R
864 : * see test rapi10 in monetdb5/extras/rapi */
865 : extern
866 : #ifdef WIN32
867 : __declspec(dllexport)
868 : #endif
869 : void *RAPIloopback(void *query);
870 :
871 : void *
872 3 : RAPIloopback(void *query) {
873 3 : res_table* output = NULL;
874 3 : char* querystr = (char*)CHAR(STRING_ELT(query, 0));
875 3 : char* err = SQLstatementIntern(rapiClient, querystr, "name", 1, 0, &output);
876 :
877 3 : if (err) { // there was an error
878 0 : return ScalarString(RSTR(err));
879 : }
880 3 : if (output) {
881 3 : int ncols = output->nr_cols;
882 3 : if (ncols > 0) {
883 3 : int i;
884 3 : SEXP retlist, names, varvalue = R_NilValue;
885 3 : retlist = PROTECT(allocVector(VECSXP, ncols));
886 3 : names = PROTECT(NEW_STRING(ncols));
887 9 : for (i = 0; i < ncols; i++) {
888 3 : BAT *b = BATdescriptor(output->cols[i].b);
889 3 : if (b == NULL || !(varvalue = bat_to_sexp(b, TYPE_any))) {
890 0 : UNPROTECT(i + 3);
891 0 : BBPreclaim(b);
892 0 : return ScalarString(RSTR("Conversion error"));
893 : }
894 3 : BBPunfix(b->batCacheid);
895 3 : SET_STRING_ELT(names, i, RSTR(output->cols[i].name));
896 3 : SET_VECTOR_ELT(retlist, i, varvalue);
897 : }
898 3 : res_table_destroy(output);
899 3 : SET_NAMES(retlist, names);
900 3 : UNPROTECT(ncols + 2);
901 3 : return retlist;
902 : }
903 0 : res_table_destroy(output);
904 : }
905 0 : return ScalarLogical(1);
906 : }
907 :
908 6 : static str RAPIprelude(void) {
909 6 : if (RAPIEnabled()) {
910 6 : MT_lock_set(&rapiLock);
911 : /* startup internal R environment */
912 6 : if (!rapiInitialized) {
913 6 : char *initstatus;
914 6 : initstatus = RAPIinitialize();
915 6 : if (initstatus != 0) {
916 0 : MT_lock_unset(&rapiLock);
917 0 : throw(MAL, "rapi.eval",
918 : "failed to initialize R environment (%s)", initstatus);
919 : }
920 6 : Rf_defineVar(Rf_install("MONETDB_LIBDIR"), ScalarString(RSTR(LIBDIR)), R_GlobalEnv);
921 :
922 : }
923 6 : MT_lock_unset(&rapiLock);
924 6 : printf("# MonetDB/R module loaded\n");
925 6 : fflush(stdout);
926 : }
927 : return MAL_SUCCEED;
928 : }
929 :
930 : #include "mel.h"
/*
 * Registration table for the "rapi" and "batrapi" MAL modules.
 * Every entry binds a module/function name to one of the two C entry
 * points: RAPIevalStd for the eval variants and RAPIevalAggr for the
 * (sub)eval_aggr variants. The list is terminated by an entry whose .imp
 * is NULL.
 * One batrapi.eval variant takes a leading "card" argument of type lng;
 * RAPIeval recognises it by checking whether the first input argument has
 * type lng.
 * NOTE(review): the two numbers in args(...) are presumably the return and
 * total argument counts of the signature - confirm against mel.h.
 */
931 : static mel_func rapi_init_funcs[] = {
932 : pattern("rapi", "eval", RAPIevalStd, false, "Execute a simple R script returning a single value", args(1,3, argany("",0),arg("fptr",ptr),arg("expr",str))),
933 : pattern("rapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
934 : pattern("rapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
935 : pattern("rapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
936 : pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
937 : pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("card", lng), arg("fptr",ptr),arg("expr",str))),
938 : pattern("batrapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
939 : pattern("batrapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
940 : { .imp=NULL }
941 : };
942 : #include "mal_import.h"
943 : #ifdef _MSC_VER
944 : #undef read
945 : #pragma section(".CRT$XCU",read)
946 : #endif
/*
 * Start-up hook: registers the function table above under the module name
 * "rapi" and passes RAPIprelude as the module's prelude function.
 * NOTE(review): LIB_STARTUP_FUNC presumably arranges for this to run when
 * the shared library is loaded - confirm against mal_import.h.
 */
947 6 : LIB_STARTUP_FUNC(init_rapi_mal)
948 6 : { mal_module2("rapi", NULL, rapi_init_funcs, RAPIprelude, NULL); }
|