Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * Niels Nes
15 : * A simple interface to IO streams
16 : * All file IO is tunneled through the stream library, which guarantees
17 : * cross-platform capabilities. Several protocols are provided, e.g. it
18 : * can be used to open 'non compressed, gzipped, bzip2ed' data files. It
19 : * encapsulates the corresponding library managed in common/stream.
20 : */
21 :
22 : #include "monetdb_config.h"
23 : #include "streams.h"
24 : #include "mal_exception.h"
25 :
26 : static str
27 1 : mnstr_open_rstreamwrap(Stream *S, str *filename)
28 : {
29 1 : stream *s;
30 :
31 1 : if ((s = open_rstream(*filename)) == NULL
32 1 : || mnstr_errnr(s) != MNSTR_NO__ERROR) {
33 0 : if (s)
34 0 : close_stream(s);
35 0 : throw(IO, "streams.open", "could not open file '%s': %s",
36 : *filename, mnstr_peek_error(NULL));
37 : } else {
38 1 : *(stream **) S = s;
39 : }
40 :
41 1 : return MAL_SUCCEED;
42 : }
43 :
44 : static str
45 1 : mnstr_open_wstreamwrap(Stream *S, str *filename)
46 : {
47 1 : stream *s;
48 :
49 1 : if ((s = open_wstream(*filename)) == NULL
50 1 : || mnstr_errnr(s) != MNSTR_NO__ERROR) {
51 0 : if (s)
52 0 : close_stream(s);
53 0 : throw(IO, "streams.open", "could not open file '%s': %s",
54 : *filename, mnstr_peek_error(NULL));
55 : } else {
56 1 : *(stream **) S = s;
57 : }
58 :
59 1 : return MAL_SUCCEED;
60 : }
61 :
62 : static str
63 1 : mnstr_open_rastreamwrap(Stream *S, str *filename)
64 : {
65 1 : stream *s;
66 :
67 1 : if ((s = open_rastream(*filename)) == NULL
68 1 : || mnstr_errnr(s) != MNSTR_NO__ERROR) {
69 0 : if (s)
70 0 : close_stream(s);
71 0 : throw(IO, "streams.open", "could not open file '%s': %s",
72 : *filename, mnstr_peek_error(NULL));
73 : } else {
74 1 : *(stream **) S = s;
75 : }
76 :
77 1 : return MAL_SUCCEED;
78 : }
79 :
80 : static str
81 1 : mnstr_open_wastreamwrap(Stream *S, str *filename)
82 : {
83 1 : stream *s;
84 :
85 1 : if ((s = open_wastream(*filename)) == NULL
86 1 : || mnstr_errnr(s) != MNSTR_NO__ERROR) {
87 0 : if (s)
88 0 : close_stream(s);
89 0 : throw(IO, "streams.open", "could not open file '%s': %s",
90 : *filename, mnstr_peek_error(NULL));
91 : } else {
92 1 : *(stream **) S = s;
93 : }
94 :
95 1 : return MAL_SUCCEED;
96 : }
97 :
98 : static str
99 1 : mnstr_write_stringwrap(void *ret, Stream *S, str *data)
100 : {
101 1 : stream *s = *(stream **) S;
102 1 : (void) ret;
103 :
104 1 : if (mnstr_write(s, *data, 1, strlen(*data)) < 0)
105 0 : throw(IO, "streams.writeStr", "failed to write string");
106 :
107 : return MAL_SUCCEED;
108 : }
109 :
110 : static str
111 1 : mnstr_writeIntwrap(void *ret, Stream *S, int *data)
112 : {
113 1 : stream *s = *(stream **) S;
114 1 : (void) ret;
115 :
116 1 : if (!mnstr_writeInt(s, *data))
117 0 : throw(IO, "streams.writeInt", "failed to write int");
118 :
119 : return MAL_SUCCEED;
120 : }
121 :
122 : static str
123 1 : mnstr_readIntwrap(int *ret, Stream *S)
124 : {
125 1 : stream *s = *(stream **) S;
126 :
127 1 : if (mnstr_readInt(s, ret) != 1)
128 0 : throw(IO, "streams.readInt", "failed to read int");
129 :
130 : return MAL_SUCCEED;
131 : }
132 :
133 : #define CHUNK (64 * 1024)
134 : static str
135 1 : mnstr_read_stringwrap(str *res, Stream *S)
136 : {
137 1 : stream *s = *(stream **) S;
138 1 : ssize_t len = 0;
139 1 : size_t size = CHUNK +1;
140 1 : char *buf = GDKmalloc(size), *start = buf, *tmp;
141 :
142 1 : if (buf == NULL)
143 0 : throw(MAL, "mnstr_read_stringwrap", SQLSTATE(HY013) MAL_MALLOC_FAIL);
144 2 : while ((len = mnstr_read(s, start, 1, CHUNK)) > 0) {
145 1 : size += len;
146 1 : tmp = GDKrealloc(buf, size);
147 1 : if (tmp == NULL) {
148 0 : GDKfree(buf);
149 0 : throw(MAL, "mnstr_read_stringwrap",
150 : SQLSTATE(HY013) MAL_MALLOC_FAIL);
151 : }
152 1 : buf = tmp;
153 1 : start = buf + size - CHUNK -1;
154 :
155 1 : *start = '\0';
156 : }
157 1 : if (len < 0)
158 0 : throw(IO, "streams.readStr", "failed to read string");
159 1 : start += len;
160 1 : *start = '\0';
161 1 : *res = buf;
162 :
163 1 : return MAL_SUCCEED;
164 : }
165 :
166 : static str
167 1 : mnstr_flush_streamwrap(void *ret, Stream *S)
168 : {
169 1 : stream *s = *(stream **) S;
170 1 : (void) ret;
171 :
172 1 : if (mnstr_flush(s, MNSTR_FLUSH_DATA))
173 0 : throw(IO, "streams.flush", "failed to flush stream");
174 :
175 : return MAL_SUCCEED;
176 : }
177 :
178 : static str
179 4 : mnstr_close_streamwrap(void *ret, Stream *S)
180 : {
181 4 : (void) ret;
182 :
183 4 : close_stream(*(stream **) S);
184 :
185 4 : return MAL_SUCCEED;
186 : }
187 :
188 : static str
189 0 : open_block_streamwrap(Stream *S, Stream *is)
190 : {
191 0 : if ((*(stream **) S = block_stream(*(stream **) is)) == NULL)
192 0 : throw(IO, "bstreams.open", "failed to open block stream");
193 :
194 : return MAL_SUCCEED;
195 : }
196 :
197 : static str
198 0 : bstream_create_wrapwrap(Bstream *Bs, Stream *S, int *bufsize)
199 : {
200 0 : if ((*(bstream **) Bs = bstream_create(*(stream **) S,
201 0 : (size_t) *bufsize)) == NULL)
202 0 : throw(IO, "bstreams.create", "failed to create block stream");
203 :
204 : return MAL_SUCCEED;
205 : }
206 :
207 : static str
208 0 : bstream_destroy_wrapwrap(void *ret, Bstream *BS)
209 : {
210 0 : (void) ret;
211 :
212 0 : bstream_destroy(*(bstream **) BS);
213 :
214 0 : return MAL_SUCCEED;
215 : }
216 :
217 : static str
218 0 : bstream_read_wrapwrap(int *res, Bstream *BS, int *size)
219 : {
220 0 : *res = (int) bstream_read(*(bstream **) BS, (size_t) *size);
221 :
222 0 : return MAL_SUCCEED;
223 : }
224 :
225 : #include "mel.h"
226 : mel_atom streams_init_atoms[] = {
227 : { .name="streams", .basetype="ptr", },
228 : { .name="bstream", .basetype="ptr", }, { .cmp=NULL }
229 : };
230 : mel_func streams_init_funcs[] = {
231 : command("streams", "openReadBytes", mnstr_open_rstreamwrap, true, "open a file stream for reading", args(1,2, arg("",streams),arg("filename",str))),
232 : command("streams", "openWriteBytes", mnstr_open_wstreamwrap, true, "open a file stream for writing", args(1,2, arg("",streams),arg("filename",str))),
233 : command("streams", "openRead", mnstr_open_rastreamwrap, true, "open ascii file stream for reading", args(1,2, arg("",streams),arg("filename",str))),
234 : command("streams", "openWrite", mnstr_open_wastreamwrap, true, "open ascii file stream for writing", args(1,2, arg("",streams),arg("filename",str))),
235 : command("streams", "blocked", open_block_streamwrap, true, "open a block based stream", args(1,2, arg("",streams),arg("s",streams))),
236 : command("streams", "writeStr", mnstr_write_stringwrap, true, "write data on the stream", args(1,3, arg("",void),arg("s",streams),arg("data",str))),
237 : command("streams", "writeInt", mnstr_writeIntwrap, true, "write data on the stream", args(1,3, arg("",void),arg("s",streams),arg("data",int))),
238 : command("streams", "readStr", mnstr_read_stringwrap, true, "read string data from the stream", args(1,2, arg("",str),arg("s",streams))),
239 : command("streams", "readInt", mnstr_readIntwrap, true, "read integer data from the stream", args(1,2, arg("",int),arg("s",streams))),
240 : command("streams", "flush", mnstr_flush_streamwrap, true, "flush the stream", args(0,1, arg("s",streams))),
241 : command("streams", "close", mnstr_close_streamwrap, true, "close and destroy the stream s", args(0,1, arg("s",streams))),
242 : command("streams", "create", bstream_create_wrapwrap, true, "create a buffered stream", args(1,3, arg("",bstream),arg("s",streams),arg("bufsize",int))),
243 : command("streams", "destroy", bstream_destroy_wrapwrap, true, "destroy bstream", args(0,1, arg("s",bstream))),
244 : command("streams", "read", bstream_read_wrapwrap, true, "read at least size bytes into the buffer of s", args(1,3, arg("",int),arg("s",bstream),arg("size",int))),
245 : { .imp=NULL }
246 : };
247 : #include "mal_import.h"
248 : #ifdef _MSC_VER
249 : #undef read
250 : #pragma section(".CRT$XCU",read)
251 : #endif
252 334 : LIB_STARTUP_FUNC(init_streams_mal)
253 334 : { mal_module("streams", streams_init_atoms, streams_init_funcs); }
|