Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 :
17 : /* ------------------------------------------------------------------ */
18 :
19 : /* A buffered stream consists of a sequence of blocks. Each block is
20 : * a 16-bit header (data length << 1, low-order bit set when the block
21 : * ends a flush) followed by the data; a flush may carry no data at all.
22 : */
23 :
24 : static bs *
25 78178 : bs_create(void)
26 : {
27 : /* should be a binary stream */
28 78178 : bs *ns;
29 :
30 78178 : if ((ns = malloc(sizeof(*ns))) == NULL)
31 : return NULL;
32 78178 : *ns = (bs) {0};
33 78178 : return ns;
34 : }
35 :
36 : /* Collect data until the internal buffer is filled, then write the
37 : * filled buffer to the underlying stream.
38 : * Struct field usage:
39 : * s - the underlying stream;
40 : * buf - the buffer in which data is collected;
41 : * nr - how much of buf is already filled (if nr == sizeof(buf) the
42 : * data is written to the underlying stream, so upon entry nr <
43 : * sizeof(buf));
44 : * itotal - unused.
45 : */
46 : ssize_t
47 30372667 : bs_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
48 : {
49 30372667 : bs *s;
50 30372667 : size_t todo = cnt * elmsize;
51 30372667 : uint16_t blksize;
52 :
53 30372667 : s = (bs *) ss->stream_data.p;
54 30372667 : if (s == NULL)
55 : return -1;
56 30372667 : assert(!ss->readonly);
57 30372667 : assert(s->nr < sizeof(s->buf));
58 61148733 : while (todo > 0) {
59 30776050 : size_t n = sizeof(s->buf) - s->nr;
60 :
61 30776050 : if (todo < n)
62 : n = todo;
63 30776050 : memcpy(s->buf + s->nr, buf, n);
64 30776050 : s->nr += (unsigned) n;
65 30776050 : todo -= n;
66 30776050 : buf = ((const char *) buf + n);
67 30776050 : if (s->nr == sizeof(s->buf)) {
68 : /* block is full, write it to the stream */
69 : #ifdef BSTREAM_DEBUG
70 : {
71 : unsigned i;
72 :
73 : fprintf(stderr, "W %s %u \"", ss->name, s->nr);
74 : for (i = 0; i < s->nr; i++)
75 : if (' ' <= s->buf[i] && s->buf[i] < 127)
76 : putc(s->buf[i], stderr);
77 : else
78 : fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
79 : fprintf(stderr, "\"\n");
80 : }
81 : #endif
82 : /* since the block is at max BLOCK (8K) - 2 size we can
83 : * store it in a two byte integer */
84 416340 : blksize = (uint16_t) s->nr;
85 416340 : s->bytes += s->nr;
86 : /* the last bit tells whether a flush is in
87 : * there, it's not at this moment, so shift it
88 : * to the left */
89 416340 : blksize <<= 1;
90 416340 : if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
91 416356 : ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr) {
92 0 : mnstr_copy_error(ss, ss->inner);
93 0 : s->nr = 0; /* data is lost due to error */
94 0 : return -1;
95 : }
96 416356 : s->blks++;
97 416356 : s->nr = 0;
98 : }
99 : }
100 30372683 : return (ssize_t) cnt;
101 : }
102 :
103 : /* If the internal buffer is partially filled, write it to the
104 : * underlying stream. Then in any case write an empty buffer to the
105 : * underlying stream to indicate to the receiver that the data was
106 : * flushed.
107 : */
108 : static int
109 607388 : bs_flush(stream *ss, mnstr_flush_level flush_level)
110 : {
111 607388 : uint16_t blksize;
112 607388 : bs *s;
113 :
114 607388 : s = (bs *) ss->stream_data.p;
115 607388 : if (s == NULL)
116 : return -1;
117 607388 : assert(!ss->readonly);
118 607388 : assert(s->nr < sizeof(s->buf));
119 607388 : if (!ss->readonly) {
120 : /* flush the rest of buffer (if s->nr > 0), then set the
121 : * last bit to 1 to to indicate user-instigated flush */
122 : #ifdef BSTREAM_DEBUG
123 : if (s->nr > 0) {
124 : unsigned i;
125 :
126 : fprintf(stderr, "W %s %u \"", ss->name, s->nr);
127 : for (i = 0; i < s->nr; i++)
128 : if (' ' <= s->buf[i] && s->buf[i] < 127)
129 : putc(s->buf[i], stderr);
130 : else
131 : fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
132 : fprintf(stderr, "\"\n");
133 : fprintf(stderr, "W %s 0\n", ss->name);
134 : }
135 : #endif
136 607388 : blksize = (uint16_t) (s->nr << 1);
137 607388 : s->bytes += s->nr;
138 : /* indicate that this is the last buffer of a block by
139 : * setting the low-order bit */
140 607388 : blksize |= 1;
141 : /* always flush (even empty blocks) needed for the protocol) */
142 607388 : if ((!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
143 607389 : (s->nr > 0 &&
144 516271 : ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr))) {
145 166 : mnstr_copy_error(ss, ss->inner);
146 166 : s->nr = 0; /* data is lost due to error */
147 166 : return -1;
148 : }
149 : // shouldn't we flush ss->inner too?
150 607224 : (void) flush_level;
151 607224 : s->blks++;
152 607224 : s->nr = 0;
153 : }
154 607224 : return 0;
155 : }
156 :
157 : /* Read buffered data and return the number of items read. At the
158 : * flush boundary we will return 0 to indicate the end of a block,
159 : * unless prompt and pstream are set. In that case, only return 0
160 : * after the prompt has been written to pstream and another read
161 : * attempt immediately returns a block boundary.
162 : *
163 : * Structure field usage:
164 : * s - the underlying stream;
165 : * buf - not used;
166 : * itotal - the amount of data in the current block that hasn't yet
167 : * been read;
168 : * nr - indicates whether the flush marker has to be returned.
169 : */
170 : ssize_t
171 6669494 : bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
172 : {
173 6669494 : bs *s;
174 6669494 : size_t todo = cnt * elmsize;
175 6669494 : size_t n;
176 :
177 6669494 : s = (bs *) ss->stream_data.p;
178 6669494 : if (s == NULL)
179 : return -1;
180 6669494 : assert(ss->readonly);
181 6669494 : assert(s->nr <= 1);
182 :
183 6669494 : if (s->itotal == 0) {
184 1171398 : int16_t blksize = 0;
185 :
186 1171398 : if (s->nr) {
187 : /* We read the closing block but hadn't
188 : * returned that yet. Return it now, and note
189 : * that we did by setting s->nr to 0. */
190 562846 : assert(s->nr == 1);
191 562846 : s->nr = 0;
192 1163506 : return 0;
193 : }
194 :
195 608552 : assert(s->nr == 0);
196 :
197 : /* There is nothing more to read in the current block,
198 : * so read the count for the next block */
199 608552 : switch (mnstr_readSht(ss->inner, &blksize)) {
200 19 : case -1:
201 19 : mnstr_copy_error(ss, ss->inner);
202 19 : return -1;
203 37795 : case 0:
204 37795 : ss->eof |= ss->inner->eof;
205 37795 : return 0;
206 : case 1:
207 : break;
208 : }
209 570732 : if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
210 0 : mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
211 0 : return -1;
212 : }
213 : #ifdef BSTREAM_DEBUG
214 : fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
215 : fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
216 : #endif
217 570732 : s->itotal = (uint16_t) blksize >> 1; /* amount readable */
218 : /* store whether this was the last block or not */
219 570732 : s->nr = (uint16_t) blksize & 1;
220 570732 : s->bytes += s->itotal;
221 570732 : s->blks++;
222 : }
223 :
224 : /* Fill the caller's buffer. */
225 : cnt = 0; /* count how much we put into the buffer */
226 11792735 : while (todo > 0) {
227 : /* there is more data waiting in the current block, so
228 : * read it */
229 6294622 : n = todo < s->itotal ? todo : s->itotal;
230 12573043 : while (n > 0) {
231 6278413 : ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
232 :
233 6278421 : if (m <= 0) {
234 0 : ss->eof |= ss->inner->eof;
235 0 : mnstr_copy_error(ss, ss->inner);
236 0 : return -1;
237 : }
238 : #ifdef BSTREAM_DEBUG
239 : {
240 : ssize_t i;
241 :
242 : fprintf(stderr, "RD %s %zd \"", ss->name, m);
243 : for (i = 0; i < m; i++)
244 : if (' ' <= ((char *) buf)[i] &&
245 : ((char *) buf)[i] < 127)
246 : putc(((char *) buf)[i], stderr);
247 : else
248 : fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
249 : fprintf(stderr, "\"\n");
250 : }
251 : #endif
252 6278421 : buf = (void *) ((char *) buf + m);
253 6278421 : cnt += (size_t) m;
254 6278421 : n -= (size_t) m;
255 6278421 : s->itotal -= (unsigned) m;
256 6278421 : todo -= (size_t) m;
257 : }
258 :
259 6294630 : if (s->itotal == 0) {
260 1008881 : int16_t blksize = 0;
261 :
262 : /* The current block has been completely read,
263 : * so read the count for the next block, only
264 : * if the previous was not the last one */
265 1008881 : if (s->nr)
266 : break;
267 438158 : switch (mnstr_readSht(ss->inner, &blksize)) {
268 0 : case -1:
269 0 : mnstr_copy_error(ss, ss->inner);
270 0 : return -1;
271 0 : case 0:
272 0 : ss->eof |= ss->inner->eof;
273 0 : return 0;
274 : case 1:
275 : break;
276 : }
277 438158 : if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
278 0 : mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
279 0 : return -1;
280 : }
281 : #ifdef BSTREAM_DEBUG
282 : fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
283 : fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
284 : fprintf(stderr, "RC %s %d\n", ss->name, blksize);
285 : #endif
286 438158 : s->itotal = (uint16_t) blksize >> 1; /* amount readable */
287 : /* store whether this was the last block or not */
288 438158 : s->nr = (uint16_t) blksize & 1;
289 438158 : s->bytes += s->itotal;
290 438158 : s->blks++;
291 : }
292 : }
293 : /* if we got an empty block with the end-of-sequence marker
294 : * set (low-order bit) we must only return an empty read once,
295 : * so we must squash the flag that we still have to return an
296 : * empty read */
297 6068836 : if (todo > 0 && cnt == 0)
298 7893 : s->nr = 0;
299 6068836 : return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
300 : }
301 :
302 :
303 :
304 : static void
305 78164 : bs_close(stream *ss)
306 : {
307 78164 : bs *s;
308 :
309 78164 : s = (bs *) ss->stream_data.p;
310 78164 : assert(s);
311 78164 : if (s == NULL)
312 : return;
313 78164 : if (!ss->readonly && s->nr > 0)
314 14 : bs_flush(ss, MNSTR_FLUSH_DATA);
315 78164 : mnstr_close(ss->inner);
316 : }
317 :
318 : void
319 78164 : bs_destroy(stream *ss)
320 : {
321 78164 : bs *s;
322 :
323 78164 : s = (bs *) ss->stream_data.p;
324 78164 : assert(s);
325 78164 : if (s) {
326 78164 : if (ss->inner)
327 78164 : ss->inner->destroy(ss->inner);
328 78164 : free(s);
329 : }
330 78164 : destroy_stream(ss);
331 78164 : }
332 :
333 : void
334 0 : bs_clrerr(stream *s)
335 : {
336 0 : if (s->stream_data.p)
337 0 : mnstr_clearerr(s->inner);
338 0 : }
339 :
340 : stream *
341 0 : bs_stream(stream *s)
342 : {
343 0 : assert(isa_block_stream(s));
344 0 : return s->inner;
345 : }
346 :
347 : stream *
348 78177 : block_stream(stream *s)
349 : {
350 78177 : stream *ns;
351 78177 : bs *b;
352 :
353 78177 : if (s == NULL)
354 : return NULL;
355 : #ifdef STREAM_DEBUG
356 : fprintf(stderr, "block_stream %s\n", s->name ? s->name : "<unnamed>");
357 : #endif
358 78177 : if ((ns = create_wrapper_stream(NULL, s)) == NULL)
359 : return NULL;
360 78178 : if ((b = bs_create()) == NULL) {
361 0 : destroy_stream(ns);
362 0 : mnstr_set_open_error(s->name, 0, "bs_create failed");
363 0 : return NULL;
364 : }
365 : /* blocksizes have a fixed little endian byteorder */
366 : #ifdef WORDS_BIGENDIAN
367 : s->swapbytes = true;
368 : #endif
369 :
370 78178 : ns->flush = bs_flush;
371 78178 : ns->read = bs_read;
372 78178 : ns->write = bs_write;
373 78178 : ns->close = bs_close;
374 78178 : ns->destroy = bs_destroy;
375 78178 : ns->stream_data.p = (void *) b;
376 :
377 78178 : return ns;
378 : }
|