Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 :
17 : /* ------------------------------------------------------------------ */
18 :
19 : /* A block stream is a sequence of blocks, each a 16-bit header (data
20 : * length shifted left by one; low-order bit set on the last block of a
21 : * flush) followed by the data. A flush with no pending data is thus an
22 : * empty block whose header is just 1. */
23 :
24 : static bs *
25 80134 : bs_create(void)
26 : {
27 : /* should be a binary stream */
28 80134 : bs *ns;
29 :
30 80134 : if ((ns = malloc(sizeof(*ns))) == NULL)
31 : return NULL;
32 80134 : *ns = (bs) {0};
33 80134 : return ns;
34 : }
35 :
36 : /* Collect data until the internal buffer is filled, then write the
37 : * filled buffer to the underlying stream.
38 : * Struct field usage:
39 : * s - the underlying stream;
40 : * buf - the buffer in which data is collected;
41 : * nr - how much of buf is already filled (if nr == sizeof(buf) the
42 : * data is written to the underlying stream, so upon entry nr <
43 : * sizeof(buf));
44 : * itotal - unused.
45 : */
46 : ssize_t
47 30349318 : bs_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
48 : {
49 30349318 : bs *s;
50 30349318 : size_t todo = cnt * elmsize;
51 30349318 : uint16_t blksize;
52 :
53 30349318 : s = (bs *) ss->stream_data.p;
54 30349318 : if (s == NULL)
55 : return -1;
56 30349318 : assert(!ss->readonly);
57 30349318 : assert(s->nr < sizeof(s->buf));
58 61102058 : while (todo > 0) {
59 30752742 : size_t n = sizeof(s->buf) - s->nr;
60 :
61 30752742 : if (todo < n)
62 : n = todo;
63 30752742 : memcpy(s->buf + s->nr, buf, n);
64 30752742 : s->nr += (unsigned) n;
65 30752742 : todo -= n;
66 30752742 : buf = ((const char *) buf + n);
67 30752742 : if (s->nr == sizeof(s->buf)) {
68 : /* block is full, write it to the stream */
69 : #ifdef BSTREAM_DEBUG
70 : {
71 : unsigned i;
72 :
73 : fprintf(stderr, "W %s %u \"", ss->name, s->nr);
74 : for (i = 0; i < s->nr; i++)
75 : if (' ' <= s->buf[i] && s->buf[i] < 127)
76 : putc(s->buf[i], stderr);
77 : else
78 : fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
79 : fprintf(stderr, "\"\n");
80 : }
81 : #endif
82 : /* since the block is at max BLOCK (8K) - 2 size we can
83 : * store it in a two byte integer */
84 416367 : blksize = (uint16_t) s->nr;
85 416367 : s->bytes += s->nr;
86 : /* the last bit tells whether a flush is in
87 : * there, it's not at this moment, so shift it
88 : * to the left */
89 416367 : blksize <<= 1;
90 416367 : if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
91 416365 : ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr) {
92 0 : mnstr_copy_error(ss, ss->inner);
93 0 : s->nr = 0; /* data is lost due to error */
94 0 : return -1;
95 : }
96 416365 : s->blks++;
97 416365 : s->nr = 0;
98 : }
99 : }
100 30349316 : return (ssize_t) cnt;
101 : }
102 :
103 : /* If the internal buffer is partially filled, write it to the
104 : * underlying stream. Then in any case write an empty buffer to the
105 : * underlying stream to indicate to the receiver that the data was
106 : * flushed.
107 : */
108 : static int
109 608923 : bs_flush(stream *ss, mnstr_flush_level flush_level)
110 : {
111 608923 : uint16_t blksize;
112 608923 : bs *s;
113 :
114 608923 : s = (bs *) ss->stream_data.p;
115 608923 : if (s == NULL)
116 : return -1;
117 608923 : assert(!ss->readonly);
118 608923 : assert(s->nr < sizeof(s->buf));
119 608923 : if (!ss->readonly) {
120 : /* flush the rest of buffer (if s->nr > 0), then set the
121 : * last bit to 1 to to indicate user-instigated flush */
122 : #ifdef BSTREAM_DEBUG
123 : if (s->nr > 0) {
124 : unsigned i;
125 :
126 : fprintf(stderr, "W %s %u \"", ss->name, s->nr);
127 : for (i = 0; i < s->nr; i++)
128 : if (' ' <= s->buf[i] && s->buf[i] < 127)
129 : putc(s->buf[i], stderr);
130 : else
131 : fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
132 : fprintf(stderr, "\"\n");
133 : fprintf(stderr, "W %s 0\n", ss->name);
134 : }
135 : #endif
136 608923 : blksize = (uint16_t) (s->nr << 1);
137 608923 : s->bytes += s->nr;
138 : /* indicate that this is the last buffer of a block by
139 : * setting the low-order bit */
140 608923 : blksize |= 1;
141 : /* always flush (even empty blocks) needed for the protocol) */
142 608923 : if ((!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
143 608931 : (s->nr > 0 &&
144 515995 : ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr))) {
145 165 : mnstr_copy_error(ss, ss->inner);
146 165 : s->nr = 0; /* data is lost due to error */
147 165 : return -1;
148 : }
149 : // shouldn't we flush ss->inner too?
150 608763 : (void) flush_level;
151 608763 : s->blks++;
152 608763 : s->nr = 0;
153 : }
154 608763 : return 0;
155 : }
156 :
157 : /* Read buffered data and return the number of items read. At the
158 : * flush boundary we will return 0 to indicate the end of a block,
159 : * unless prompt and pstream are set. In that case, only return 0
160 : * after the prompt has been written to pstream and another read
161 : * attempt immediately returns a block boundary.
162 : *
163 : * Structure field usage:
164 : * s - the underlying stream;
165 : * buf - not used;
166 : * itotal - the amount of data in the current block that hasn't yet
167 : * been read;
168 : * nr - indicates whether the flush marker has to be returned.
169 : */
170 : ssize_t
171 6671716 : bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
172 : {
173 6671716 : bs *s;
174 6671716 : size_t todo = cnt * elmsize;
175 6671716 : size_t n;
176 :
177 6671716 : s = (bs *) ss->stream_data.p;
178 6671716 : if (s == NULL)
179 : return -1;
180 6671716 : assert(ss->readonly);
181 6671716 : assert(s->nr <= 1);
182 :
183 6671716 : if (s->itotal == 0) {
184 1173997 : int16_t blksize = 0;
185 :
186 1173997 : if (s->nr) {
187 : /* We read the closing block but hadn't
188 : * returned that yet. Return it now, and note
189 : * that we did by setting s->nr to 0. */
190 563713 : assert(s->nr == 1);
191 563713 : s->nr = 0;
192 1166217 : return 0;
193 : }
194 :
195 610284 : assert(s->nr == 0);
196 :
197 : /* There is nothing more to read in the current block,
198 : * so read the count for the next block */
199 610284 : switch (mnstr_readSht(ss->inner, &blksize)) {
200 19 : case -1:
201 19 : mnstr_copy_error(ss, ss->inner);
202 19 : return -1;
203 38772 : case 0:
204 38772 : ss->eof |= ss->inner->eof;
205 38772 : return 0;
206 : case 1:
207 : break;
208 : }
209 571497 : if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
210 0 : mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
211 0 : return -1;
212 : }
213 : #ifdef BSTREAM_DEBUG
214 : fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
215 : fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
216 : #endif
217 571497 : s->itotal = (uint16_t) blksize >> 1; /* amount readable */
218 : /* store whether this was the last block or not */
219 571497 : s->nr = (uint16_t) blksize & 1;
220 571497 : s->bytes += s->itotal;
221 571497 : s->blks++;
222 : }
223 :
224 : /* Fill the caller's buffer. */
225 : cnt = 0; /* count how much we put into the buffer */
226 11792450 : while (todo > 0) {
227 : /* there is more data waiting in the current block, so
228 : * read it */
229 6294735 : n = todo < s->itotal ? todo : s->itotal;
230 12569506 : while (n > 0) {
231 6274774 : ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
232 :
233 6274771 : if (m <= 0) {
234 0 : ss->eof |= ss->inner->eof;
235 0 : mnstr_copy_error(ss, ss->inner);
236 0 : return -1;
237 : }
238 : #ifdef BSTREAM_DEBUG
239 : {
240 : ssize_t i;
241 :
242 : fprintf(stderr, "RD %s %zd \"", ss->name, m);
243 : for (i = 0; i < m; i++)
244 : if (' ' <= ((char *) buf)[i] &&
245 : ((char *) buf)[i] < 127)
246 : putc(((char *) buf)[i], stderr);
247 : else
248 : fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
249 : fprintf(stderr, "\"\n");
250 : }
251 : #endif
252 6274771 : buf = (void *) ((char *) buf + m);
253 6274771 : cnt += (size_t) m;
254 6274771 : n -= (size_t) m;
255 6274771 : s->itotal -= (unsigned) m;
256 6274771 : todo -= (size_t) m;
257 : }
258 :
259 6294732 : if (s->itotal == 0) {
260 1009191 : int16_t blksize = 0;
261 :
262 : /* The current block has been completely read,
263 : * so read the count for the next block, only
264 : * if the previous was not the last one */
265 1009191 : if (s->nr)
266 : break;
267 437693 : switch (mnstr_readSht(ss->inner, &blksize)) {
268 0 : case -1:
269 0 : mnstr_copy_error(ss, ss->inner);
270 0 : return -1;
271 0 : case 0:
272 0 : ss->eof |= ss->inner->eof;
273 0 : return 0;
274 : case 1:
275 : break;
276 : }
277 437693 : if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
278 0 : mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
279 0 : return -1;
280 : }
281 : #ifdef BSTREAM_DEBUG
282 : fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
283 : fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
284 : fprintf(stderr, "RC %s %d\n", ss->name, blksize);
285 : #endif
286 437693 : s->itotal = (uint16_t) blksize >> 1; /* amount readable */
287 : /* store whether this was the last block or not */
288 437693 : s->nr = (uint16_t) blksize & 1;
289 437693 : s->bytes += s->itotal;
290 437693 : s->blks++;
291 : }
292 : }
293 : /* if we got an empty block with the end-of-sequence marker
294 : * set (low-order bit) we must only return an empty read once,
295 : * so we must squash the flag that we still have to return an
296 : * empty read */
297 6069213 : if (todo > 0 && cnt == 0)
298 7788 : s->nr = 0;
299 6069213 : return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
300 : }
301 :
302 :
303 :
304 : static void
305 80120 : bs_close(stream *ss)
306 : {
307 80120 : bs *s;
308 :
309 80120 : s = (bs *) ss->stream_data.p;
310 80120 : assert(s);
311 80120 : if (s == NULL)
312 : return;
313 80120 : if (!ss->readonly && s->nr > 0)
314 14 : bs_flush(ss, MNSTR_FLUSH_DATA);
315 80120 : mnstr_close(ss->inner);
316 : }
317 :
318 : void
319 80120 : bs_destroy(stream *ss)
320 : {
321 80120 : bs *s;
322 :
323 80120 : s = (bs *) ss->stream_data.p;
324 80120 : assert(s);
325 80120 : if (s) {
326 80120 : if (ss->inner)
327 80120 : ss->inner->destroy(ss->inner);
328 80120 : free(s);
329 : }
330 80120 : destroy_stream(ss);
331 80120 : }
332 :
333 : void
334 0 : bs_clrerr(stream *s)
335 : {
336 0 : if (s->stream_data.p)
337 0 : mnstr_clearerr(s->inner);
338 0 : }
339 :
340 : stream *
341 0 : bs_stream(stream *s)
342 : {
343 0 : assert(isa_block_stream(s));
344 0 : return s->inner;
345 : }
346 :
347 : stream *
348 80134 : block_stream(stream *s)
349 : {
350 80134 : stream *ns;
351 80134 : bs *b;
352 :
353 80134 : if (s == NULL)
354 : return NULL;
355 : #ifdef STREAM_DEBUG
356 : fprintf(stderr, "block_stream %s\n", s->name ? s->name : "<unnamed>");
357 : #endif
358 80134 : if ((ns = create_wrapper_stream(NULL, s)) == NULL)
359 : return NULL;
360 80134 : if ((b = bs_create()) == NULL) {
361 0 : destroy_stream(ns);
362 0 : mnstr_set_open_error(s->name, 0, "bs_create failed");
363 0 : return NULL;
364 : }
365 : /* blocksizes have a fixed little endian byteorder */
366 : #ifdef WORDS_BIGENDIAN
367 : s->swapbytes = true;
368 : #endif
369 :
370 80134 : ns->flush = bs_flush;
371 80134 : ns->read = bs_read;
372 80134 : ns->write = bs_write;
373 80134 : ns->close = bs_close;
374 80134 : ns->destroy = bs_destroy;
375 80134 : ns->stream_data.p = (void *) b;
376 :
377 80134 : return ns;
378 : }
|