Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "rel_file_loader.h"
15 : #include "rel_exp.h"
16 :
17 : #include "mal_instruction.h"
18 : #include "mal_interpreter.h"
19 : #include "mal_parser.h"
20 : #include "mal_builder.h"
21 : #include "mal_namespace.h"
22 : #include "mal_exception.h"
23 : #include "mal_linker.h"
24 : #include "mal_backend.h"
25 : #include "sql_types.h"
26 : #include "rel_bin.h"
27 : #include "json.h"
28 : #include "mutils.h"
29 :
30 : #ifdef HAVE_FCNTL_H
31 : #include <fcntl.h>
32 : #endif
33 :
34 : #include <unistd.h>
35 : // #include <glob.h> // not available on Windows
36 :
37 :
38 : typedef struct JSONFileHandle {
39 : allocator *sa;
40 : char *filename;
41 : int fd;
42 : size_t size;
43 : } JSONFileHandle;
44 :
45 :
46 : static JSONFileHandle *
47 15 : json_open(const char *fname, allocator *sa)
48 : {
49 15 : if (!sa)
50 : return NULL;
51 15 : int fd = MT_open(fname, O_RDONLY);
52 15 : if (fd < 0){
53 : // TODO add relevant trace component
54 0 : TRC_ERROR(SQL_EXECUTION, "Error opening file %s: %s", fname, strerror(errno));
55 0 : return NULL;
56 : }
57 15 : struct stat stb;
58 15 : if (MT_stat(fname, &stb) != 0) {
59 0 : TRC_ERROR(SQL_EXECUTION, "Error stat file %s: %s", fname, strerror(errno));
60 0 : close(fd);
61 0 : return NULL;
62 : }
63 15 : if (stb.st_size > 2000000000) {
64 0 : close(fd);
65 0 : TRC_ERROR(SQL_EXECUTION, "Error file %s: Too large", fname);
66 0 : return NULL;
67 : }
68 :
69 15 : JSONFileHandle *res = sa_alloc(sa, sizeof(JSONFileHandle));
70 15 : res->sa = sa;
71 15 : res->filename = sa_strdup(sa, fname);
72 15 : res->fd = fd;
73 15 : res->size = (size_t)stb.st_size;
74 15 : return res;
75 : }
76 :
77 :
78 : static void
79 15 : json_close(JSONFileHandle *jfh)
80 : {
81 15 : if (jfh && jfh->fd)
82 15 : close(jfh->fd);
83 15 : }
84 :
85 :
86 : static char *
87 15 : read_json_file(JSONFileHandle *jfh)
88 : {
89 15 : char *content = NULL;
90 15 : if (jfh) {
91 15 : unsigned int length = (unsigned int)jfh->size;
92 15 : content = sa_zalloc(jfh->sa, length + 1);
93 15 : if (content) {
94 15 : ssize_t nbytes = read(jfh->fd, content, length);
95 15 : if (nbytes < 0)
96 : return NULL;
97 15 : content[length + 1] = '\0';
98 : }
99 : }
100 : return content;
101 : }
102 :
103 :
104 : static str
105 13 : append_terms(allocator *sa, JSON *jt, BAT *b)
106 : {
107 13 : str error = MAL_SUCCEED;
108 13 : size_t offset = 0;
109 13 : JSONterm *root = jt->elm, *t;
110 13 : char *v = NULL;
111 13 : int depth = 0;
112 :
113 2010 : while(offset < (size_t) jt->free && !error) {
114 1997 : t = jt->elm + offset;
115 1997 : JSONterm *prev = offset > 0 ? jt->elm + (offset - 1) : NULL;
116 1997 : switch(t->kind) {
117 535 : case JSON_ARRAY:
118 : case JSON_OBJECT:
119 535 : if ((root->kind == JSON_ARRAY && depth == 1) || root->kind == JSON_OBJECT) {
120 26 : v = sa_strndup(sa, t->value, t->valuelen);
121 26 : if (v) {
122 26 : if (BUNappend(b, v, false) != GDK_SUCCEED) {
123 0 : error = createException(SQL, "json.append_terms", "BUNappend failed!");
124 : }
125 : }
126 : }
127 535 : if ((prev && (prev->kind == JSON_ARRAY || prev->kind == JSON_VALUE)) || (prev==NULL && root->kind ==
128 : JSON_ARRAY))
129 223 : depth ++;
130 : break;
131 : case JSON_ELEMENT:
132 : case JSON_STRING:
133 : case JSON_NUMBER:
134 : case JSON_NULL:
135 : break;
136 210 : case JSON_VALUE:
137 210 : depth --;
138 210 : break;
139 0 : default:
140 0 : error = createException(SQL, "json.append_terms", "unknown json term");
141 0 : break;
142 : }
143 1997 : offset +=1;
144 : }
145 13 : return error;
146 : }
147 :
148 :
149 : static str
150 13 : json_relation(mvc *sql, sql_subfunc *f, char *filename, list *res_exps, char *tname)
151 : {
152 13 : (void) filename;
153 13 : char *res = MAL_SUCCEED;
154 13 : list *types = sa_list(sql->sa);
155 13 : list *names = sa_list(sql->sa);
156 : // use file name as columnn name ?
157 13 : char *cname = sa_strdup(sql->sa, "json");
158 13 : list_append(names, cname);
159 13 : sql_schema *jsons = mvc_bind_schema(sql, "sys");
160 13 : if (!jsons)
161 : return NULL;
162 13 : sql_subtype *st = SA_ZNEW(sql->sa, sql_subtype);
163 13 : st->digits = st->scale = 0;
164 13 : st->multiset = 0;
165 13 : st->type = schema_bind_type(sql, jsons, "json");
166 13 : list_append(types, st);
167 13 : sql_exp *ne = exp_column(sql->sa, a_create(sql->sa, tname), cname, st, CARD_MULTI, 1, 0, 0);
168 13 : set_basecol(ne);
169 13 : ne->alias.label = -(sql->nid++);
170 13 : list_append(res_exps, ne);
171 13 : f->tname = tname;
172 13 : f->res = types;
173 13 : f->coltypes = types;
174 13 : f->colnames = names;
175 13 : return res;
176 : }
177 :
178 :
179 : static void *
180 13 : json_load(void *BE, sql_subfunc *f, char *filename, sql_exp *topn)
181 : {
182 13 : (void) topn; // TODO include topn
183 13 : backend *be = BE;
184 13 : allocator *sa = be->mvc->sa;
185 13 : sql_subtype *tpe = f->res->h->data;
186 13 : const char *tname = f->tname;
187 13 : const char *cname = f->colnames->h->data;
188 :
189 13 : stmt *s = stmt_none(be);
190 13 : InstrPtr q = newStmt(be->mb, "json", "read_json");
191 13 : q = pushStr(be->mb, q, filename);
192 13 : pushInstruction(be->mb, q);
193 13 : s->nr = getDestVar(q);
194 13 : s->q = q;
195 13 : s->nrcols = 1;
196 13 : s->subtype = *tpe;
197 : // is alias essential here?
198 13 : s = stmt_alias(be, s, 1, a_create(sa, tname), cname);
199 13 : return s;
200 : }
201 :
202 :
203 : static int TYPE_json;
204 :
205 :
206 : static str
207 346 : JSONprelude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
208 : {
209 346 : (void)cntxt; (void)mb; (void)stk; (void)pci;
210 346 : TYPE_json = ATOMindex("json");
211 :
212 346 : fl_register("json", &json_relation, &json_load);
213 346 : return MAL_SUCCEED;
214 : }
215 :
216 :
217 : static str
218 345 : JSONepilogue(void *ret)
219 : {
220 345 : fl_unregister("json");
221 345 : (void)ret;
222 345 : return MAL_SUCCEED;
223 : }
224 :
225 :
226 : static str
227 13 : JSONread_json(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
228 : {
229 13 : (void) cntxt; (void) mb;
230 13 : char *msg = MAL_SUCCEED;
231 13 : char *fname = *(str*)getArgReference(stk, pci, pci->retc);
232 13 : allocator *sa = sa_create(NULL);
233 13 : JSONFileHandle *jfh = json_open(fname, sa);
234 13 : const char* json_str = NULL;
235 13 : JSON *jt = NULL;
236 13 : BAT *b = NULL;
237 13 : if (!jfh) {
238 0 : sa_destroy(sa);
239 0 : msg = createException(SQL, "json.read_json", "Failed to open file %s", fname);
240 0 : return msg;
241 : }
242 13 : json_str = read_json_file(jfh);
243 13 : json_close(jfh);
244 13 : if (json_str)
245 13 : jt = JSONparse(json_str);
246 13 : if (jt) {
247 13 : if (jt->error == NULL) {
248 13 : b = COLnew(0, TYPE_json, 0, TRANSIENT);
249 13 : if ((msg = append_terms(sa, jt, b)) == MAL_SUCCEED) {
250 13 : bat *res = getArgReference_bat(stk, pci, 0);
251 13 : *res = b->batCacheid;
252 13 : BBPkeepref(b);
253 : } else
254 0 : BBPreclaim(b);
255 : } else {
256 0 : msg = GDKstrdup(jt->error);
257 : }
258 13 : JSONfree(jt);
259 : } else {
260 0 : msg = createException(SQL, "json.read_json", "JSONparse error");
261 : }
262 13 : sa_destroy(sa);
263 13 : return msg;
264 : }
265 :
266 :
267 : static str
268 2 : JSONread_nd_json(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
269 : {
270 2 : (void) cntxt; (void) mb;
271 2 : char *msg = MAL_SUCCEED;
272 2 : char *fname = *(str*)getArgReference(stk, pci, pci->retc);
273 2 : allocator *sa = sa_create(NULL);
274 2 : JSONFileHandle *jfh = json_open(fname, sa);
275 2 : if (!jfh) {
276 0 : sa_destroy(sa);
277 0 : msg = createException(SQL, "json.read_nd_json", "Failed to open file %s", fname);
278 0 : return msg;
279 : }
280 2 : char *content = read_json_file(jfh);
281 2 : json_close(jfh);
282 2 : BAT *b = COLnew(0, TYPE_json, 0, TRANSIENT);
283 2 : if (content) {
284 2 : if (b) {
285 : size_t cnt = 0;
286 : char *head = content;
287 : char *tail = content;
288 11180 : while (cnt < (jfh->size + 1)) {
289 11178 : if (head[0] == '\n' || (head[0] == '\r' && head[1] == '\n')) {
290 20 : int skip = 1;
291 20 : if (head[0] == '\r' && head[1] == '\n')
292 20 : skip = 2;
293 20 : head[0] = '\0';
294 20 : JSON *jt = JSONparse(tail);
295 20 : if (jt) {
296 : // must be valid json obj str
297 20 : if (BUNappend(b, tail, false) != GDK_SUCCEED) {
298 0 : msg = createException(SQL, "json.read_nd_json", "BUNappend failed!");
299 0 : break;
300 : }
301 : } else {
302 0 : msg = createException(SQL, "json.read_nd_json", "Invalid json object, JSONparse failed!");
303 0 : break;
304 : }
305 20 : JSONfree(jt);
306 20 : tail = head + skip;
307 20 : while (tail[0] == '\n') // multiple newlines e.g. \n\n
308 0 : tail ++;
309 : head = tail;
310 : }
311 11178 : head ++;
312 11178 : cnt ++;
313 : }
314 : } else {
315 0 : msg = createException(SQL, "json.read_nd_json", "Failed to allocate bat");
316 : }
317 : } else {
318 0 : msg = createException(SQL, "json.read_nd_json", "Failed to read file %s", fname);
319 : }
320 0 : if (msg == MAL_SUCCEED) {
321 2 : bat *res = getArgReference_bat(stk, pci, 0);
322 2 : *res = b->batCacheid;
323 2 : BBPkeepref(b);
324 : } else {
325 0 : BBPreclaim(b);
326 : }
327 :
328 2 : sa_destroy(sa);
329 2 : return msg;
330 : }
331 :
332 :
333 : #include "mel.h"
334 :
335 : unsigned char _json_sql[106] = {
336 : "create function sys.read_nd_json(fname string)\n"
337 : "returns table(json JSON)\n"
338 : "external name json.read_nd_json;\n"
339 : };
340 : #include "monetdb_config.h"
341 : #include "sql_import.h"
342 :
343 : static mel_func json_init_funcs[] = {
344 : pattern("json", "prelude", JSONprelude, false, "", noargs),
345 : command("json", "epilogue", JSONepilogue, false, "", noargs),
346 : pattern("json", "read_json", JSONread_json, false, "Reads json file", args(1,2, batarg("", json), arg("filename", str))),
347 : pattern("json", "read_nd_json", JSONread_nd_json, false, "Reads new line delimited json objects", args(1,2, batarg("", json), arg("filename", str))),
348 : { .imp=NULL }
349 : };
350 :
351 : #ifdef _MSC_VER
352 : #undef read
353 : #pragma section(".CRT$XCU",read)
354 : #endif
355 346 : LIB_STARTUP_FUNC(init_json_mal)
356 346 : { mal_module("json", NULL, json_init_funcs);
357 346 : sql_register("json", _json_sql); }
358 :
|