Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /* stream
14 : * ======
15 : * Niels Nes
16 : * An simple interface to streams
17 : *
18 : * Processing files, streams, and sockets is quite different on Linux
19 : * and Windows platforms. To improve portability between both, we advise
20 : * to replace the stdio actions with the stream functionality provided
21 : * here.
22 : *
23 : * This interface can also be used to open 'non compressed, gzipped,
24 : * bz2zipped' data files and sockets. Using this interface one could
25 : * easily switch between the various underlying storage types.
26 : *
27 : * buffered streams
28 : * ----------------
29 : *
30 : * The bstream (or buffered_stream) can be used for efficient reading of
31 : * a stream. Reading can be done in large chunks and access can be done
32 : * in smaller bits, by directly accessing the underlying buffer.
33 : *
34 : * Beware that a flush on a buffered stream emits an empty block to
35 : * synchronize with the other side, telling it has reached the end of
36 : * the sequence and can close its descriptors.
37 : *
38 : * bstream functions
39 : * -----------------
40 : *
41 : * The bstream_create gets a read stream (rs) as input and the initial
42 : * chunk size and creates a buffered stream from this. A spare byte is
43 : * kept at the end of the buffer. The bstream_read will at least read
44 : * the next 'size' bytes. If the not read data (aka pos < len) together
45 : * with the new data will not fit in the current buffer it is resized.
46 : * The spare byte is kept.
47 : *
48 : * tee streams
49 : * -----------
50 : *
51 : * A tee stream is a write stream that duplicates all output to two
52 : * write streams of the same type (txt/bin).
53 : */
54 :
55 : /* Generic stream handling code such as init and close */
56 :
57 : #include "monetdb_config.h"
58 : #include "stream.h"
59 : #include "stream_internal.h"
60 : #include <stdio.h>
61 :
62 :
63 : #ifdef HAVE_PTHREAD_H
64 : #include <pthread.h>
65 : #endif
66 :
67 : struct tl_error_buf {
68 : char msg[1024];
69 : };
70 :
71 : static int tl_error_init(void);
72 : static struct tl_error_buf *get_tl_error_buf(void);
73 :
74 : #ifdef HAVE_PTHREAD_H
75 :
76 : static pthread_key_t tl_error_key;
77 :
78 : static void
79 536 : clear_main_tl_error_buf(void)
80 : {
81 536 : void *p = pthread_getspecific(tl_error_key);
82 536 : if (p != NULL) {
83 536 : pthread_setspecific(tl_error_key, NULL);
84 536 : free(p);
85 : }
86 536 : }
87 :
88 : static int
89 536 : tl_error_init(void)
90 : {
91 536 : if (pthread_key_create(&tl_error_key, free) != 0)
92 : return -1;
93 : // Turns out the destructor registered with pthread_key_create() does not
94 : // always run for the main thread. This atexit hook clears the main thread's
95 : // error buffer to avoid this being reported as a memory leak.
96 536 : atexit(clear_main_tl_error_buf);
97 536 : return 0;
98 : }
99 :
100 : static struct tl_error_buf*
101 388725 : get_tl_error_buf(void)
102 : {
103 388725 : struct tl_error_buf *p = pthread_getspecific(tl_error_key);
104 388725 : if (p == NULL) {
105 3785 : p = malloc(sizeof(*p));
106 3785 : if (p == NULL)
107 : return NULL;
108 3785 : *p = (struct tl_error_buf) { .msg = {0} };
109 3785 : pthread_setspecific(tl_error_key, p);
110 3785 : struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
111 3785 : assert(p == second_attempt && "maybe mnstr_init has not been called?");
112 : (void) second_attempt; // suppress warning if asserts disabled
113 : }
114 : return p;
115 : }
116 :
117 : #elif defined(WIN32)
118 :
119 : static DWORD tl_error_key = 0;
120 :
121 : static int
122 : tl_error_init(void)
123 : {
124 : DWORD key = TlsAlloc();
125 : if (key == TLS_OUT_OF_INDEXES)
126 : return -1;
127 : else {
128 : tl_error_key = key;
129 : return 0;
130 : }
131 : }
132 :
133 : static struct tl_error_buf*
134 : get_tl_error_buf(void)
135 : {
136 : struct tl_error_buf *p = TlsGetValue(tl_error_key);
137 :
138 : if (p == NULL) {
139 : if (GetLastError() != ERROR_SUCCESS)
140 : return NULL; // something went terribly wrong
141 :
142 : // otherwise, initialize
143 : p = malloc(sizeof(*p));
144 : if (p == NULL)
145 : return NULL;
146 : *p = (struct tl_error_buf) { .msg = 0 };
147 : if (!TlsSetValue(tl_error_key, p)) {
148 : free(p);
149 : return NULL;
150 : }
151 :
152 : struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
153 : assert(p == second_attempt /* maybe mnstr_init has not been called? */);
154 : (void) second_attempt; // suppress warning if asserts disabled
155 : }
156 : return p;
157 : }
158 :
159 : #else
160 :
161 : #error "no pthreads and no Windows, don't know what to do"
162 :
163 : #endif
164 :
165 : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
166 :
167 : int
168 728 : mnstr_init(void)
169 : {
170 728 : static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
171 :
172 728 : if (ATOMIC_TAS(&inited))
173 : return 0;
174 :
175 536 : if (tl_error_init()< 0)
176 : return -1;
177 :
178 : #ifdef NATIVE_WIN32
179 : WSADATA w;
180 : if (WSAStartup(0x0101, &w) != 0)
181 : return -1;
182 : #endif
183 :
184 : return 0;
185 : }
186 :
187 : const char *
188 0 : mnstr_version(void)
189 : {
190 0 : return STREAM_VERSION;
191 : }
192 :
193 : /* Read at most cnt elements of size elmsize from the stream. Returns
194 : * the number of elements actually read or < 0 on failure. */
195 : ssize_t
196 17199170 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
197 : {
198 17199170 : if (s == NULL || buf == NULL)
199 : return -1;
200 : #ifdef STREAM_DEBUG
201 : fprintf(stderr, "read %s %zu %zu\n",
202 : s->name ? s->name : "<unnamed>", elmsize, cnt);
203 : #endif
204 17199170 : assert(s->readonly);
205 17199170 : if (s->errkind != MNSTR_NO__ERROR)
206 : return -1;
207 17199170 : return s->read(s, buf, elmsize, cnt);
208 : }
209 :
210 :
211 : /* Write cnt elements of size elmsize to the stream. Returns the
212 : * number of elements actually written. If elmsize or cnt equals zero,
213 : * returns cnt. */
214 : ssize_t
215 34775875 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
216 : {
217 34775875 : if (s == NULL || buf == NULL)
218 : return -1;
219 : #ifdef STREAM_DEBUG
220 : fprintf(stderr, "write %s %zu %zu\n",
221 : s->name ? s->name : "<unnamed>", elmsize, cnt);
222 : #endif
223 34775873 : assert(!s->readonly);
224 34775873 : if (s->errkind != MNSTR_NO__ERROR)
225 : return -1;
226 34775863 : return s->write(s, buf, elmsize, cnt);
227 : }
228 :
229 :
230 : /* Read one line (seperated by \n) of at most maxcnt-1 characters from
231 : * the stream. Returns the number of characters actually read,
232 : * includes the trailing \n; terminated by a NULL byte. */
233 : ssize_t
234 109188 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
235 : {
236 109188 : char *b = buf, *start = buf;
237 :
238 109188 : if (s == NULL || buf == NULL)
239 : return -1;
240 : #ifdef STREAM_DEBUG
241 : fprintf(stderr, "readline %s %zu\n",
242 : s->name ? s->name : "<unnamed>", maxcnt);
243 : #endif
244 109188 : assert(s->readonly);
245 109188 : if (s->errkind != MNSTR_NO__ERROR)
246 : return -1;
247 109188 : if (maxcnt == 0)
248 : return 0;
249 109188 : if (maxcnt == 1) {
250 0 : *start = 0;
251 0 : return 0;
252 : }
253 18260978 : for (;;) {
254 18260978 : switch (s->read(s, start, 1, 1)) {
255 18260717 : case 1:
256 : /* successfully read a character,
257 : * check whether it is the line
258 : * separator and whether we have space
259 : * left for more */
260 18260717 : if (*start++ == '\n' || --maxcnt == 1) {
261 108927 : *start = 0;
262 108927 : return (ssize_t) (start - b);
263 : }
264 : break;
265 0 : case -1:
266 : /* error: if we didn't read anything yet,
267 : * return the error, otherwise return what we
268 : * have */
269 0 : if (start == b)
270 : return -1;
271 : /* fall through */
272 : case 0:
273 : /* end of file: return what we have */
274 261 : *start = 0;
275 261 : return (ssize_t) (start - b);
276 : }
277 : }
278 : }
279 :
280 :
281 : void
282 37977 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
283 : {
284 37977 : if (s) {
285 37977 : s->timeout = ms;
286 37977 : s->timeout_func = func;
287 37977 : s->timeout_data = data;
288 37977 : if (s->update_timeout)
289 37977 : s->update_timeout(s);
290 : }
291 37976 : }
292 :
293 :
294 : void
295 79552 : mnstr_close(stream *s)
296 : {
297 79552 : if (s) {
298 : #ifdef STREAM_DEBUG
299 : fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
300 : #endif
301 79552 : s->close(s);
302 : }
303 79550 : }
304 :
305 :
306 : void
307 1394 : mnstr_destroy(stream *s)
308 : {
309 1394 : if (s) {
310 : #ifdef STREAM_DEBUG
311 : fprintf(stderr, "destroy %s\n",
312 : s->name ? s->name : "<unnamed>");
313 : #endif
314 1388 : s->destroy(s);
315 : }
316 1394 : }
317 :
318 : void
319 234 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
320 : {
321 234 : if (s == NULL)
322 : return;
323 :
324 234 : s->errkind = kind;
325 :
326 234 : if (kind == MNSTR_NO__ERROR) {
327 0 : s->errmsg[0] = '\0';
328 0 : return;
329 : }
330 :
331 234 : char *start = &s->errmsg[0];
332 234 : char *end = start + sizeof(s->errmsg);
333 234 : if (s->name != NULL)
334 234 : start += snprintf(start, end - start, "stream %s: ", s->name);
335 :
336 234 : if (start >= end - 1)
337 : return;
338 :
339 234 : if (fmt == NULL)
340 19 : fmt = mnstr_error_kind_description(kind);
341 :
342 : // Complicated pointer dance in order to shut up 'might be a candidate
343 : // for gnu_printf format attribute' warning from gcc.
344 : // It's really eager to trace where the vsnprintf ends up, we need
345 : // the ? : to throw it off its scent.
346 : // Similarly, the parentheses around the 1 serve to suppress a Clang
347 : // warning about dead code (the atoi).
348 234 : void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
349 234 : int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
350 234 : f(start, end - start, fmt, ap);
351 : }
352 :
353 : void
354 20 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
355 : {
356 20 : va_list ap;
357 20 : va_start(ap, fmt);
358 20 : mnstr_va_set_error(s, kind, fmt, ap);
359 20 : va_end(ap);
360 20 : }
361 :
362 : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
363 :
364 : void
365 214 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
366 : {
367 214 : va_list ap;
368 214 : va_start(ap, fmt);
369 214 : mnstr_va_set_error(s, kind, fmt, ap);
370 214 : va_end(ap);
371 :
372 : /* append as much as fits of the system error message */
373 214 : char *start = &s->errmsg[0] + strlen(s->errmsg);
374 214 : char *end = &s->errmsg[0] + sizeof(s->errmsg);
375 214 : if (end - start >= 3) {
376 214 : start = stpcpy(start, ": ");
377 214 : start += my_strerror_r(errno, start, end - start);
378 : }
379 214 : }
380 :
381 :
382 : void
383 388722 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
384 : {
385 388722 : va_list ap;
386 :
387 388722 : struct tl_error_buf *buf = get_tl_error_buf();
388 388722 : if (buf == NULL)
389 388568 : return; // hopeless
390 :
391 388722 : if (errnr == 0 && fmt == NULL) {
392 388568 : buf->msg[0] = '\0';
393 388568 : return;
394 : }
395 :
396 154 : char *start = &buf->msg[0];
397 154 : char *end = start + sizeof(buf->msg);
398 :
399 154 : if (name != NULL)
400 154 : start += snprintf(start, end - start, "when opening %s: ", name);
401 154 : if (start >= end - 1)
402 : return;
403 :
404 154 : if (fmt != NULL) {
405 154 : va_start(ap, fmt);
406 154 : start += vsnprintf(start, end - start, fmt, ap);
407 154 : va_end(ap);
408 : }
409 154 : if (start >= end - 1)
410 : return;
411 :
412 154 : if (errnr != 0 && end - start >= 3) {
413 152 : start = stpcpy(start, ": ");
414 152 : start += my_strerror_r(errno, start, end - start);
415 : }
416 154 : if (start >= end - 1)
417 : return;
418 : }
419 :
420 : static size_t
421 366 : my_strerror_r(int error_nr, char *buf, size_t buflen)
422 : {
423 : // Three cases:
424 : // 1. no strerror_r
425 : // 2. gnu strerror_r (returns char* and does not always fill buffer)
426 : // 3. xsi strerror_r (returns int and always fills the buffer)
427 366 : char *to_move;
428 : #ifndef HAVE_STRERROR_R
429 : // Hope for the best
430 : to_move = strerror(error_nr);
431 : #elif !defined(_GNU_SOURCE) || !_GNU_SOURCE
432 : // standard strerror_r always writes to buf
433 : int result_code = strerror_r(error_nr, buf, buflen);
434 : if (result_code == 0)
435 : to_move = NULL;
436 : else
437 : to_move = "<failed to retrieve error message>";
438 : #else
439 : // gnu strerror_r sometimes only returns static string, needs copy
440 366 : to_move = strerror_r(error_nr, buf, buflen);
441 : #endif
442 366 : if (to_move != NULL) {
443 : // move to buffer
444 366 : size_t size = strlen(to_move) + 1;
445 366 : assert(size <= buflen);
446 : // strerror_r may have return a pointer to/into the buffer
447 366 : memmove(buf, to_move, size);
448 366 : return size - 1;
449 : } else {
450 0 : return strlen(buf);
451 : }
452 : }
453 :
454 :
455 :
456 232 : void mnstr_copy_error(stream *dst, stream *src)
457 : {
458 232 : dst->errkind = src->errkind;
459 232 : memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
460 232 : }
461 :
462 : char *
463 0 : mnstr_error(const stream *s)
464 : {
465 0 : const char *msg = mnstr_peek_error(s);
466 0 : if (msg != NULL)
467 0 : return strdup(msg);
468 : else
469 : return NULL;
470 : }
471 :
472 : const char*
473 7 : mnstr_peek_error(const stream *s)
474 : {
475 7 : if (s == NULL) {
476 3 : struct tl_error_buf *b = get_tl_error_buf();
477 3 : if (b != NULL)
478 3 : return b->msg;
479 : else
480 : return "unknown error";
481 : }
482 :
483 4 : if (s->errkind == MNSTR_NO__ERROR)
484 : return "no error";
485 :
486 4 : if (s->errmsg[0] != '\0')
487 4 : return s->errmsg;
488 :
489 0 : return mnstr_error_kind_description(s->errkind);
490 : }
491 :
492 : static const char *
493 19 : mnstr_error_kind_description(mnstr_error_kind kind)
494 : {
495 19 : switch (kind) {
496 : case MNSTR_NO__ERROR:
497 : /* unreachable */
498 0 : assert(0);
499 : return NULL;
500 : case MNSTR_OPEN_ERROR:
501 : return "error could not open";
502 1 : case MNSTR_READ_ERROR:
503 1 : return "error reading";
504 0 : case MNSTR_WRITE_ERROR:
505 0 : return "error writing";
506 0 : case MNSTR_INTERRUPT:
507 0 : return "interrupted";
508 : case MNSTR_TIMEOUT:
509 : return "timeout";
510 : case MNSTR_UNEXPECTED_EOF:
511 : return "timeout";
512 : }
513 :
514 0 : return "Unknown error";
515 : }
516 :
517 : /* flush buffer, return 0 on success, non-zero on failure */
518 : int
519 729657 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
520 : {
521 729657 : if (s == NULL)
522 : return -1;
523 : #ifdef STREAM_DEBUG
524 : fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
525 : #endif
526 729656 : assert(!s->readonly);
527 729656 : if (s->errkind != MNSTR_NO__ERROR)
528 : return -1;
529 729656 : if (s->flush)
530 728202 : return s->flush(s, flush_level);
531 : return 0;
532 : }
533 :
534 :
535 : /* sync file to disk, return 0 on success, non-zero on failure */
536 : int
537 8 : mnstr_fsync(stream *s)
538 : {
539 8 : if (s == NULL)
540 : return -1;
541 : #ifdef STREAM_DEBUG
542 : fprintf(stderr, "fsync %s (%d)\n",
543 : s->name ? s->name : "<unnamed>", s->errnr);
544 : #endif
545 8 : assert(!s->readonly);
546 8 : if (s->errkind != MNSTR_NO__ERROR)
547 : return -1;
548 8 : if (s->fsync)
549 8 : return s->fsync(s);
550 : return 0;
551 : }
552 :
553 :
554 : int
555 0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
556 : {
557 0 : if (s == NULL || p == NULL)
558 : return -1;
559 : #ifdef STREAM_DEBUG
560 : fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
561 : #endif
562 0 : if (s->errkind != MNSTR_NO__ERROR)
563 : return -1;
564 0 : if (s->fgetpos)
565 0 : return s->fgetpos(s, p);
566 : return 0;
567 : }
568 :
569 :
570 : int
571 0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
572 : {
573 0 : if (s == NULL)
574 : return -1;
575 : #ifdef STREAM_DEBUG
576 : fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
577 : #endif
578 0 : if (s->errkind != MNSTR_NO__ERROR)
579 : return -1;
580 0 : if (s->fsetpos)
581 0 : return s->fsetpos(s, p);
582 : return 0;
583 : }
584 :
585 :
586 : int
587 12291487 : mnstr_isalive(const stream *s)
588 : {
589 12291487 : if (s == NULL)
590 : return 0;
591 12291487 : if (s->errkind != MNSTR_NO__ERROR)
592 : return -1;
593 12291487 : if (s->isalive)
594 11341471 : return s->isalive(s);
595 : return 1;
596 : }
597 :
598 : int
599 9403842 : mnstr_getoob(const stream *s)
600 : {
601 9403842 : if (s->getoob)
602 9397055 : return s->getoob(s);
603 : return 0;
604 : }
605 :
606 : int
607 0 : mnstr_putoob(const stream *s, char val)
608 : {
609 0 : if (s->putoob)
610 0 : return s->putoob(s, val);
611 : return -1;
612 : }
613 :
614 :
615 : bool
616 190197 : mnstr_eof(const stream *s)
617 : {
618 190197 : return s->eof;
619 : }
620 :
621 : const char *
622 24652 : mnstr_name(const stream *s)
623 : {
624 24652 : if (s == NULL)
625 : return "connection terminated";
626 24652 : return s->name;
627 : }
628 :
629 :
630 : mnstr_error_kind
631 5198543 : mnstr_errnr(const stream *s)
632 : {
633 5198543 : if (s == NULL)
634 : return MNSTR_READ_ERROR;
635 5198543 : return s->errkind;
636 : }
637 :
638 : const char *
639 0 : mnstr_error_kind_name(mnstr_error_kind k)
640 : {
641 0 : switch (k) {
642 : case MNSTR_NO__ERROR:
643 : return "MNSTR_NO__ERROR";
644 0 : case MNSTR_OPEN_ERROR:
645 0 : return "MNSTR_OPEN_ERROR";
646 0 : case MNSTR_READ_ERROR:
647 0 : return "MNSTR_READ_ERROR";
648 0 : case MNSTR_WRITE_ERROR:
649 0 : return "MNSTR_WRITE_ERROR";
650 0 : case MNSTR_INTERRUPT:
651 0 : return "MNSTR_INTERRUPT";
652 0 : case MNSTR_TIMEOUT:
653 0 : return "MNSTR_TIMEOUT";
654 0 : default:
655 0 : return "<UNKNOWN_ERROR>";
656 : }
657 :
658 : }
659 :
660 : static void
661 4 : clearerror(stream *s)
662 : {
663 2 : if (s != NULL) {
664 4 : s->errkind = MNSTR_NO__ERROR;
665 2 : s->errmsg[0] = '\0';
666 : }
667 2 : }
668 :
669 : void
670 2 : mnstr_clearerr(stream *s)
671 : {
672 2 : clearerror(s);
673 2 : if (s != NULL && s->clrerr)
674 2 : s->clrerr(s);
675 2 : }
676 :
677 :
678 : bool
679 605 : mnstr_isbinary(const stream *s)
680 : {
681 605 : if (s == NULL)
682 : return false;
683 605 : return s->binary;
684 : }
685 :
686 :
687 : bool
688 0 : mnstr_get_swapbytes(const stream *s)
689 : {
690 0 : if (s == NULL)
691 : return 0;
692 0 : return s->swapbytes;
693 : }
694 :
695 :
696 : /* set stream to big-endian/little-endian byte order; the default is
697 : * native byte order */
698 : void
699 39335 : mnstr_set_bigendian(stream *s, bool bigendian)
700 : {
701 39335 : if (s == NULL)
702 : return;
703 : #ifdef STREAM_DEBUG
704 : fprintf(stderr, "mnstr_set_bigendian %s %s\n",
705 : s->name ? s->name : "<unnamed>",
706 : swapbytes ? "true" : "false");
707 : #endif
708 39335 : assert(s->readonly);
709 39335 : s->binary = true;
710 : #ifdef WORDS_BIGENDIAN
711 : s->swapbytes = !bigendian;
712 : #else
713 39335 : s->swapbytes = bigendian;
714 : #endif
715 : }
716 :
717 :
718 : void
719 66733 : close_stream(stream *s)
720 : {
721 66733 : if (s) {
722 66730 : if (s->close)
723 66730 : s->close(s);
724 66731 : if (s->destroy)
725 66731 : s->destroy(s);
726 : }
727 66734 : }
728 :
729 :
730 : void
731 388481 : destroy_stream(stream *s)
732 : {
733 388481 : if (s->name)
734 388481 : free(s->name);
735 388481 : free(s);
736 388481 : }
737 :
738 :
739 : stream *
740 388568 : create_stream(const char *name)
741 : {
742 388568 : stream *s;
743 :
744 388568 : if (name == NULL) {
745 0 : mnstr_set_open_error(NULL, 0, "internal error: name not set");
746 0 : return NULL;
747 : }
748 388568 : if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
749 0 : mnstr_set_open_error(name, errno, "malloc failed");
750 0 : return NULL;
751 : }
752 388568 : *s = (stream) {
753 : .swapbytes = false,
754 : .readonly = true,
755 : .isutf8 = false, /* not known for sure */
756 : .binary = false,
757 : .eof = false,
758 388568 : .name = strdup(name),
759 : .errkind = MNSTR_NO__ERROR,
760 : .errmsg = {0},
761 : .destroy = destroy_stream,
762 : .clrerr = clearerror,
763 : };
764 388568 : if(s->name == NULL) {
765 0 : free(s);
766 0 : mnstr_set_open_error(name, errno, "malloc failed");
767 0 : return NULL;
768 : }
769 : #ifdef STREAM_DEBUG
770 : fprintf(stderr, "create_stream %s -> %p\n",
771 : name ? name : "<unnamed>", s);
772 : #endif
773 388568 : mnstr_set_open_error(NULL, 0, NULL); // clear the error
774 388568 : return s;
775 : }
776 :
777 :
778 : static ssize_t
779 0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
780 : {
781 0 : ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
782 0 : s->eof |= s->inner->eof;
783 0 : return ret;
784 : }
785 :
786 :
787 : static ssize_t
788 0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
789 : {
790 0 : return s->inner->write(s->inner, buf, elmsize, cnt);
791 : }
792 :
793 :
794 : static void
795 0 : wrapper_close(stream *s)
796 : {
797 0 : s->inner->close(s->inner);
798 0 : }
799 :
800 :
801 : static void
802 2 : wrapper_clrerr(stream *s)
803 : {
804 2 : s->inner->clrerr(s->inner);
805 2 : }
806 :
807 :
808 : static void
809 0 : wrapper_destroy(stream *s)
810 : {
811 0 : s->inner->destroy(s->inner);
812 0 : destroy_stream(s);
813 0 : }
814 :
815 :
816 : static int
817 0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
818 : {
819 0 : return s->inner->flush(s->inner, flush_level);
820 : }
821 :
822 :
823 : static int
824 4 : wrapper_fsync(stream *s)
825 : {
826 4 : return s->inner->fsync(s->inner);
827 : }
828 :
829 :
830 : static int
831 0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
832 : {
833 0 : return s->inner->fgetpos(s->inner, p);
834 : }
835 :
836 :
837 : static int
838 0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
839 : {
840 0 : return s->inner->fsetpos(s->inner, p);
841 : }
842 :
843 :
844 : static void
845 37977 : wrapper_update_timeout(stream *s)
846 : {
847 37977 : s->inner->timeout = s->timeout;
848 37977 : s->inner->timeout_func = s->timeout_func;
849 37977 : s->inner->timeout_data = s->timeout_data;
850 37977 : s->inner->update_timeout(s->inner);
851 37976 : }
852 :
853 :
854 : static int
855 11328527 : wrapper_isalive(const stream *s)
856 : {
857 11328527 : return s->inner->isalive(s->inner);
858 : }
859 :
860 :
861 : static int
862 9395029 : wrapper_getoob(const stream *s)
863 : {
864 9395029 : return s->inner->getoob(s->inner);
865 : }
866 :
867 :
868 : static int
869 0 : wrapper_putoob(const stream *s, char val)
870 : {
871 0 : return s->inner->putoob(s->inner, val);
872 : }
873 :
874 :
875 : stream *
876 80161 : create_wrapper_stream(const char *name, stream *inner)
877 : {
878 80161 : if (inner == NULL)
879 : return NULL;
880 80161 : if (name == NULL)
881 80157 : name = inner->name;
882 80161 : stream *s = create_stream(name);
883 80161 : if (s == NULL)
884 : return NULL;
885 :
886 :
887 80161 : s->swapbytes = inner->swapbytes;
888 80161 : s->readonly = inner->readonly;
889 80161 : s->isutf8 = inner->isutf8;
890 80161 : s->binary = inner->binary;
891 80161 : s->timeout = inner->timeout;
892 80161 : s->inner = inner;
893 :
894 80161 : s->read = inner->read == NULL ? NULL : wrapper_read;
895 80161 : s->write = inner->write == NULL ? NULL : wrapper_write;
896 80161 : s->close = inner->close == NULL ? NULL : wrapper_close;
897 80161 : s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
898 80161 : s->destroy = wrapper_destroy;
899 80161 : s->flush = inner->flush == NULL ? NULL : wrapper_flush;
900 80161 : s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
901 80161 : s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
902 80161 : s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
903 80161 : s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
904 80161 : s->getoob = inner->getoob == NULL ? NULL : wrapper_getoob;
905 80161 : s->putoob = inner->putoob == NULL ? NULL : wrapper_putoob;
906 80161 : s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
907 :
908 80161 : return s;
909 : }
910 :
911 : /* ------------------------------------------------------------------ */
912 : /* streams working on a disk file, compressed or not */
913 :
914 : stream *
915 13022 : open_rstream(const char *filename)
916 : {
917 13022 : if (filename == NULL)
918 : return NULL;
919 : #ifdef STREAM_DEBUG
920 : fprintf(stderr, "open_rstream %s\n", filename);
921 : #endif
922 :
923 13022 : stream *s = open_stream(filename, "rb");
924 13022 : if (s == NULL)
925 : return NULL;
926 :
927 12870 : stream *c = compressed_stream(s, 0);
928 12870 : if (c == NULL)
929 0 : close_stream(s);
930 :
931 : return c;
932 : }
933 :
934 : stream *
935 11781 : open_wstream(const char *filename)
936 : {
937 11781 : if (filename == NULL)
938 : return NULL;
939 : #ifdef STREAM_DEBUG
940 : fprintf(stderr, "open_wstream %s\n", filename);
941 : #endif
942 :
943 11781 : stream *s = open_stream(filename, "wb");
944 11781 : if (s == NULL)
945 : return NULL;
946 :
947 11781 : stream *c = compressed_stream(s, 0);
948 11781 : if (c == NULL) {
949 0 : close_stream(s);
950 0 : (void) file_remove(filename);
951 : }
952 :
953 : return c;
954 : }
955 :
956 : stream *
957 348 : open_rastream(const char *filename)
958 : {
959 348 : if (filename == NULL)
960 : return NULL;
961 : #ifdef STREAM_DEBUG
962 : fprintf(stderr, "open_rastream %s\n", filename);
963 : #endif
964 348 : stream *s = open_rstream(filename);
965 348 : if (s == NULL)
966 : return NULL;
967 :
968 315 : stream *t = create_text_stream(s);
969 315 : if (t == NULL)
970 0 : close_stream(s);
971 :
972 : return t;
973 : }
974 :
975 : stream *
976 24 : open_wastream(const char *filename)
977 : {
978 24 : if (filename == NULL)
979 : return NULL;
980 : #ifdef STREAM_DEBUG
981 : fprintf(stderr, "open_wastream %s\n", filename);
982 : #endif
983 24 : stream *s = open_wstream(filename);
984 24 : if (s == NULL)
985 : return NULL;
986 :
987 24 : stream *t = create_text_stream(s);
988 24 : if (t == NULL) {
989 0 : close_stream(s);
990 0 : (void) file_remove(filename);
991 : }
992 :
993 : return t;
994 : }
995 :
996 :
997 : /* put here because it depends on both bs_read AND bs2_read */
998 : bool
999 462322 : isa_block_stream(const stream *s)
1000 : {
1001 462322 : assert(s != NULL);
1002 924644 : return s &&
1003 462322 : ((s->read == bs_read ||
1004 3313 : s->write == bs_write));
1005 : }
1006 :
1007 :
1008 : /* Put here because I need to think very carefully about this
1009 : * mnstr_read(,, 0, 0). What would that mean?
1010 : */
1011 : ssize_t
1012 39523 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
1013 : {
1014 39523 : ssize_t len = 0;
1015 39523 : char x = 0;
1016 :
1017 39523 : if (s == NULL || buf == NULL)
1018 : return -1;
1019 39523 : assert(s->read == bs_read || s->write == bs_write);
1020 79039 : if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
1021 39522 : mnstr_read(s, &x, 0, 0) < 0 /* read prompt */ ||
1022 39516 : x > 0)
1023 1 : return -1;
1024 : return len;
1025 : }
|