Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 :
17 : /* ------------------------------------------------------------------ */
18 :
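/* A block stream consists of a sequence of blocks.  Each block
 * consists of a two-byte header followed by the data in the block.
 * The header holds the number of data bytes shifted left by one bit;
 * the low-order bit is set on the block that ends a flushed sequence
 * (such a block may be empty, i.e. consist of just the header).
 */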
23 :
24 : static bs *
25 77760 : bs_create(void)
26 : {
27 : /* should be a binary stream */
28 77760 : bs *ns;
29 :
30 77760 : if ((ns = malloc(sizeof(*ns))) == NULL)
31 : return NULL;
32 77760 : *ns = (bs) {0};
33 77760 : return ns;
34 : }
35 :
36 : /* Collect data until the internal buffer is filled, then write the
37 : * filled buffer to the underlying stream.
38 : * Struct field usage:
39 : * s - the underlying stream;
40 : * buf - the buffer in which data is collected;
41 : * nr - how much of buf is already filled (if nr == sizeof(buf) the
42 : * data is written to the underlying stream, so upon entry nr <
43 : * sizeof(buf));
44 : * itotal - unused.
45 : */
46 : ssize_t
47 29074688 : bs_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
48 : {
49 29074688 : bs *s;
50 29074688 : size_t todo = cnt * elmsize;
51 29074688 : uint16_t blksize;
52 :
53 29074688 : s = (bs *) ss->stream_data.p;
54 29074688 : if (s == NULL)
55 : return -1;
56 29074688 : assert(!ss->readonly);
57 29074688 : assert(s->nr < sizeof(s->buf));
58 58533258 : while (todo > 0) {
59 29458547 : size_t n = sizeof(s->buf) - s->nr;
60 :
61 29458547 : if (todo < n)
62 : n = todo;
63 29458547 : memcpy(s->buf + s->nr, buf, n);
64 29458547 : s->nr += (unsigned) n;
65 29458547 : todo -= n;
66 29458547 : buf = ((const char *) buf + n);
67 29458547 : if (s->nr == sizeof(s->buf)) {
68 : /* block is full, write it to the stream */
69 : #ifdef BSTREAM_DEBUG
70 : {
71 : unsigned i;
72 :
73 : fprintf(stderr, "W %s %u \"", ss->name, s->nr);
74 : for (i = 0; i < s->nr; i++)
75 : if (' ' <= s->buf[i] && s->buf[i] < 127)
76 : putc(s->buf[i], stderr);
77 : else
78 : fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
79 : fprintf(stderr, "\"\n");
80 : }
81 : #endif
82 : /* since the block is at max BLOCK (8K) - 2 size we can
83 : * store it in a two byte integer */
84 397261 : blksize = (uint16_t) s->nr;
85 397261 : s->bytes += s->nr;
86 : /* the last bit tells whether a flush is in
87 : * there, it's not at this moment, so shift it
88 : * to the left */
89 397261 : blksize <<= 1;
90 397261 : if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
91 397284 : ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr) {
92 0 : mnstr_copy_error(ss, ss->inner);
93 0 : s->nr = 0; /* data is lost due to error */
94 0 : return -1;
95 : }
96 397284 : s->blks++;
97 397284 : s->nr = 0;
98 : }
99 : }
100 29074711 : return (ssize_t) cnt;
101 : }
102 :
103 : /* If the internal buffer is partially filled, write it to the
104 : * underlying stream. Then in any case write an empty buffer to the
105 : * underlying stream to indicate to the receiver that the data was
106 : * flushed.
107 : */
108 : static int
109 594508 : bs_flush(stream *ss, mnstr_flush_level flush_level)
110 : {
111 594508 : uint16_t blksize;
112 594508 : bs *s;
113 :
114 594508 : s = (bs *) ss->stream_data.p;
115 594508 : if (s == NULL)
116 : return -1;
117 594508 : assert(!ss->readonly);
118 594508 : assert(s->nr < sizeof(s->buf));
119 594508 : if (!ss->readonly) {
120 : /* flush the rest of buffer (if s->nr > 0), then set the
121 : * last bit to 1 to to indicate user-instigated flush */
122 : #ifdef BSTREAM_DEBUG
123 : if (s->nr > 0) {
124 : unsigned i;
125 :
126 : fprintf(stderr, "W %s %u \"", ss->name, s->nr);
127 : for (i = 0; i < s->nr; i++)
128 : if (' ' <= s->buf[i] && s->buf[i] < 127)
129 : putc(s->buf[i], stderr);
130 : else
131 : fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
132 : fprintf(stderr, "\"\n");
133 : fprintf(stderr, "W %s 0\n", ss->name);
134 : }
135 : #endif
136 594508 : blksize = (uint16_t) (s->nr << 1);
137 594508 : s->bytes += s->nr;
138 : /* indicate that this is the last buffer of a block by
139 : * setting the low-order bit */
140 594508 : blksize |= 1;
141 : /* always flush (even empty blocks) needed for the protocol) */
142 594508 : if ((!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
143 594366 : (s->nr > 0 &&
144 504007 : ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr))) {
145 148 : mnstr_copy_error(ss, ss->inner);
146 148 : s->nr = 0; /* data is lost due to error */
147 148 : return -1;
148 : }
149 : // shouldn't we flush ss->inner too?
150 594222 : (void) flush_level;
151 594222 : s->blks++;
152 594222 : s->nr = 0;
153 : }
154 594222 : return 0;
155 : }
156 :
157 : /* Read buffered data and return the number of items read. At the
158 : * flush boundary we will return 0 to indicate the end of a block,
159 : * unless prompt and pstream are set. In that case, only return 0
160 : * after the prompt has been written to pstream and another read
161 : * attempt immediately returns a block boundary.
162 : *
163 : * Structure field usage:
164 : * s - the underlying stream;
165 : * buf - not used;
166 : * itotal - the amount of data in the current block that hasn't yet
167 : * been read;
168 : * nr - indicates whether the flush marker has to be returned.
169 : */
170 : ssize_t
171 6622622 : bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
172 : {
173 6622622 : bs *s;
174 6622622 : size_t todo = cnt * elmsize;
175 6622622 : size_t n;
176 :
177 6622622 : s = (bs *) ss->stream_data.p;
178 6622622 : if (s == NULL)
179 : return -1;
180 6622622 : assert(ss->readonly);
181 6622622 : assert(s->nr <= 1);
182 :
183 6622622 : if (s->itotal == 0) {
184 1145862 : int16_t blksize = 0;
185 :
186 1145862 : if (s->nr) {
187 : /* We read the closing block but hadn't
188 : * returned that yet. Return it now, and note
189 : * that we did by setting s->nr to 0. */
190 550397 : assert(s->nr == 1);
191 550397 : s->nr = 0;
192 1138414 : return 0;
193 : }
194 :
195 595465 : assert(s->nr == 0);
196 :
197 : /* There is nothing more to read in the current block,
198 : * so read the count for the next block */
199 595465 : switch (mnstr_readSht(ss->inner, &blksize)) {
200 19 : case -1:
201 19 : mnstr_copy_error(ss, ss->inner);
202 19 : return -1;
203 37601 : case 0:
204 37601 : ss->eof |= ss->inner->eof;
205 37601 : return 0;
206 : case 1:
207 : break;
208 : }
209 558052 : if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
210 0 : mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
211 0 : return -1;
212 : }
213 : #ifdef BSTREAM_DEBUG
214 : fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
215 : fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
216 : #endif
217 558052 : s->itotal = (uint16_t) blksize >> 1; /* amount readable */
218 : /* store whether this was the last block or not */
219 558052 : s->nr = (uint16_t) blksize & 1;
220 558052 : s->bytes += s->itotal;
221 558052 : s->blks++;
222 : }
223 :
224 : /* Fill the caller's buffer. */
225 : cnt = 0; /* count how much we put into the buffer */
226 11736765 : while (todo > 0) {
227 : /* there is more data waiting in the current block, so
228 : * read it */
229 6260337 : n = todo < s->itotal ? todo : s->itotal;
230 12504866 : while (n > 0) {
231 6244893 : ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
232 :
233 6244529 : if (m <= 0) {
234 0 : ss->eof |= ss->inner->eof;
235 0 : mnstr_copy_error(ss, ss->inner);
236 0 : return -1;
237 : }
238 : #ifdef BSTREAM_DEBUG
239 : {
240 : ssize_t i;
241 :
242 : fprintf(stderr, "RD %s %zd \"", ss->name, m);
243 : for (i = 0; i < m; i++)
244 : if (' ' <= ((char *) buf)[i] &&
245 : ((char *) buf)[i] < 127)
246 : putc(((char *) buf)[i], stderr);
247 : else
248 : fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
249 : fprintf(stderr, "\"\n");
250 : }
251 : #endif
252 6244529 : buf = (void *) ((char *) buf + m);
253 6244529 : cnt += (size_t) m;
254 6244529 : n -= (size_t) m;
255 6244529 : s->itotal -= (unsigned) m;
256 6244529 : todo -= (size_t) m;
257 : }
258 :
259 6259973 : if (s->itotal == 0) {
260 976842 : int16_t blksize = 0;
261 :
262 : /* The current block has been completely read,
263 : * so read the count for the next block, only
264 : * if the previous was not the last one */
265 976842 : if (s->nr)
266 : break;
267 418822 : switch (mnstr_readSht(ss->inner, &blksize)) {
268 0 : case -1:
269 0 : mnstr_copy_error(ss, ss->inner);
270 0 : return -1;
271 0 : case 0:
272 0 : ss->eof |= ss->inner->eof;
273 0 : return 0;
274 : case 1:
275 : break;
276 : }
277 418822 : if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
278 0 : mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
279 0 : return -1;
280 : }
281 : #ifdef BSTREAM_DEBUG
282 : fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
283 : fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
284 : fprintf(stderr, "RC %s %d\n", ss->name, blksize);
285 : #endif
286 418822 : s->itotal = (uint16_t) blksize >> 1; /* amount readable */
287 : /* store whether this was the last block or not */
288 418822 : s->nr = (uint16_t) blksize & 1;
289 418822 : s->bytes += s->itotal;
290 418822 : s->blks++;
291 : }
292 : }
293 : /* if we got an empty block with the end-of-sequence marker
294 : * set (low-order bit) we must only return an empty read once,
295 : * so we must squash the flag that we still have to return an
296 : * empty read */
297 6034448 : if (todo > 0 && cnt == 0)
298 7524 : s->nr = 0;
299 6034448 : return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
300 : }
301 :
302 :
303 :
304 : static void
305 77746 : bs_close(stream *ss)
306 : {
307 77746 : bs *s;
308 :
309 77746 : s = (bs *) ss->stream_data.p;
310 77746 : assert(s);
311 77746 : if (s == NULL)
312 : return;
313 77746 : if (!ss->readonly && s->nr > 0)
314 14 : bs_flush(ss, MNSTR_FLUSH_DATA);
315 77746 : mnstr_close(ss->inner);
316 : }
317 :
318 : void
319 77742 : bs_destroy(stream *ss)
320 : {
321 77742 : bs *s;
322 :
323 77742 : s = (bs *) ss->stream_data.p;
324 77742 : assert(s);
325 77742 : if (s) {
326 77742 : if (ss->inner)
327 77742 : ss->inner->destroy(ss->inner);
328 77746 : free(s);
329 : }
330 77746 : destroy_stream(ss);
331 77745 : }
332 :
333 : void
334 0 : bs_clrerr(stream *s)
335 : {
336 0 : if (s->stream_data.p)
337 0 : mnstr_clearerr(s->inner);
338 0 : }
339 :
340 : stream *
341 0 : bs_stream(stream *s)
342 : {
343 0 : assert(isa_block_stream(s));
344 0 : return s->inner;
345 : }
346 :
347 : stream *
348 77760 : block_stream(stream *s)
349 : {
350 77760 : stream *ns;
351 77760 : bs *b;
352 :
353 77760 : if (s == NULL)
354 : return NULL;
355 : #ifdef STREAM_DEBUG
356 : fprintf(stderr, "block_stream %s\n", s->name ? s->name : "<unnamed>");
357 : #endif
358 77760 : if ((ns = create_wrapper_stream(NULL, s)) == NULL)
359 : return NULL;
360 77760 : if ((b = bs_create()) == NULL) {
361 0 : destroy_stream(ns);
362 0 : mnstr_set_open_error(s->name, 0, "bs_create failed");
363 0 : return NULL;
364 : }
365 : /* blocksizes have a fixed little endian byteorder */
366 : #ifdef WORDS_BIGENDIAN
367 : s->swapbytes = true;
368 : #endif
369 :
370 77760 : ns->flush = bs_flush;
371 77760 : ns->read = bs_read;
372 77760 : ns->write = bs_write;
373 77760 : ns->close = bs_close;
374 77760 : ns->destroy = bs_destroy;
375 77760 : ns->stream_data.p = (void *) b;
376 :
377 77760 : return ns;
378 : }
|