Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /* stream
14 : * ======
15 : * Niels Nes
16 : * An simple interface to streams
17 : *
18 : * Processing files, streams, and sockets is quite different on Linux
19 : * and Windows platforms. To improve portability between both, we advise
20 : * to replace the stdio actions with the stream functionality provided
21 : * here.
22 : *
23 : * This interface can also be used to open 'non compressed, gzipped,
24 : * bz2zipped' data files and sockets. Using this interface one could
25 : * easily switch between the various underlying storage types.
26 : *
27 : * buffered streams
28 : * ----------------
29 : *
30 : * The bstream (or buffered_stream) can be used for efficient reading of
31 : * a stream. Reading can be done in large chunks and access can be done
32 : * in smaller bits, by directly accessing the underlying buffer.
33 : *
34 : * Beware that a flush on a buffered stream emits an empty block to
35 : * synchronize with the other side, telling it has reached the end of
36 : * the sequence and can close its descriptors.
37 : *
38 : * bstream functions
39 : * -----------------
40 : *
41 : * The bstream_create gets a read stream (rs) as input and the initial
42 : * chunk size and creates a buffered stream from this. A spare byte is
43 : * kept at the end of the buffer. The bstream_read will at least read
44 : * the next 'size' bytes. If the not read data (aka pos < len) together
45 : * with the new data will not fit in the current buffer it is resized.
46 : * The spare byte is kept.
47 : *
48 : * tee streams
49 : * -----------
50 : *
51 : * A tee stream is a write stream that duplicates all output to two
52 : * write streams of the same type (txt/bin).
53 : */
54 :
55 : /* Generic stream handling code such as init and close */
56 :
57 : #include "monetdb_config.h"
58 : #include "stream.h"
59 : #include "stream_internal.h"
60 : #include <stdio.h>
61 :
62 :
63 : #ifdef HAVE_PTHREAD_H
64 : #include <pthread.h>
65 : #endif
66 :
67 : struct tl_error_buf {
68 : char msg[1024];
69 : };
70 :
71 : static int tl_error_init(void);
72 : static struct tl_error_buf *get_tl_error_buf(void);
73 :
74 : #ifdef HAVE_PTHREAD_H
75 :
76 : static pthread_key_t tl_error_key;
77 :
78 : static void
79 528 : clear_main_tl_error_buf(void)
80 : {
81 528 : void *p = pthread_getspecific(tl_error_key);
82 528 : if (p != NULL) {
83 528 : pthread_setspecific(tl_error_key, NULL);
84 528 : free(p);
85 : }
86 528 : }
87 :
88 : static int
89 528 : tl_error_init(void)
90 : {
91 528 : if (pthread_key_create(&tl_error_key, free) != 0)
92 : return -1;
93 : // Turns out the destructor registered with pthread_key_create() does not
94 : // always run for the main thread. This atexit hook clears the main thread's
95 : // error buffer to avoid this being reported as a memory leak.
96 528 : atexit(clear_main_tl_error_buf);
97 528 : return 0;
98 : }
99 :
100 : static struct tl_error_buf*
101 387320 : get_tl_error_buf(void)
102 : {
103 387320 : struct tl_error_buf *p = pthread_getspecific(tl_error_key);
104 387319 : if (p == NULL) {
105 3966 : p = malloc(sizeof(*p));
106 3966 : if (p == NULL)
107 : return NULL;
108 3966 : *p = (struct tl_error_buf) { .msg = {0} };
109 3966 : pthread_setspecific(tl_error_key, p);
110 3966 : struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
111 3966 : assert(p == second_attempt && "maybe mnstr_init has not been called?");
112 : (void) second_attempt; // suppress warning if asserts disabled
113 : }
114 : return p;
115 : }
116 :
117 : #elif defined(WIN32)
118 :
119 : static DWORD tl_error_key = 0;
120 :
121 : static int
122 : tl_error_init(void)
123 : {
124 : DWORD key = TlsAlloc();
125 : if (key == TLS_OUT_OF_INDEXES)
126 : return -1;
127 : else {
128 : tl_error_key = key;
129 : return 0;
130 : }
131 : }
132 :
133 : static struct tl_error_buf*
134 : get_tl_error_buf(void)
135 : {
136 : struct tl_error_buf *p = TlsGetValue(tl_error_key);
137 :
138 : if (p == NULL) {
139 : if (GetLastError() != ERROR_SUCCESS)
140 : return NULL; // something went terribly wrong
141 :
142 : // otherwise, initialize
143 : p = malloc(sizeof(*p));
144 : if (p == NULL)
145 : return NULL;
146 : *p = (struct tl_error_buf) { .msg = 0 };
147 : if (!TlsSetValue(tl_error_key, p)) {
148 : free(p);
149 : return NULL;
150 : }
151 :
152 : struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
153 : assert(p == second_attempt /* maybe mnstr_init has not been called? */);
154 : (void) second_attempt; // suppress warning if asserts disabled
155 : }
156 : return p;
157 : }
158 :
159 : #else
160 :
161 : #error "no pthreads and no Windows, don't know what to do"
162 :
163 : #endif
164 :
165 : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
166 :
167 : int
168 720 : mnstr_init(void)
169 : {
170 720 : static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
171 :
172 720 : if (ATOMIC_TAS(&inited))
173 : return 0;
174 :
175 528 : if (tl_error_init()< 0)
176 : return -1;
177 :
178 : #ifdef NATIVE_WIN32
179 : WSADATA w;
180 : if (WSAStartup(0x0101, &w) != 0)
181 : return -1;
182 : #endif
183 :
184 : return 0;
185 : }
186 :
187 : const char *
188 0 : mnstr_version(void)
189 : {
190 0 : return STREAM_VERSION;
191 : }
192 :
193 : /* Read at most cnt elements of size elmsize from the stream. Returns
194 : * the number of elements actually read or < 0 on failure. */
195 : ssize_t
196 17184422 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
197 : {
198 17184422 : if (s == NULL || buf == NULL)
199 : return -1;
200 : #ifdef STREAM_DEBUG
201 : fprintf(stderr, "read %s %zu %zu\n",
202 : s->name ? s->name : "<unnamed>", elmsize, cnt);
203 : #endif
204 17184422 : assert(s->readonly);
205 17184422 : if (s->errkind != MNSTR_NO__ERROR)
206 : return -1;
207 17184422 : return s->read(s, buf, elmsize, cnt);
208 : }
209 :
210 :
211 : /* Write cnt elements of size elmsize to the stream. Returns the
212 : * number of elements actually written. If elmsize or cnt equals zero,
213 : * returns cnt. */
214 : ssize_t
215 33459996 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
216 : {
217 33459996 : if (s == NULL || buf == NULL)
218 : return -1;
219 : #ifdef STREAM_DEBUG
220 : fprintf(stderr, "write %s %zu %zu\n",
221 : s->name ? s->name : "<unnamed>", elmsize, cnt);
222 : #endif
223 33459994 : assert(!s->readonly);
224 33459994 : if (s->errkind != MNSTR_NO__ERROR)
225 : return -1;
226 33459984 : return s->write(s, buf, elmsize, cnt);
227 : }
228 :
229 :
230 : /* Read one line (seperated by \n) of at most maxcnt-1 characters from
231 : * the stream. Returns the number of characters actually read,
232 : * includes the trailing \n; terminated by a NULL byte. */
233 : ssize_t
234 108847 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
235 : {
236 108847 : char *b = buf, *start = buf;
237 :
238 108847 : if (s == NULL || buf == NULL)
239 : return -1;
240 : #ifdef STREAM_DEBUG
241 : fprintf(stderr, "readline %s %zu\n",
242 : s->name ? s->name : "<unnamed>", maxcnt);
243 : #endif
244 108847 : assert(s->readonly);
245 108847 : if (s->errkind != MNSTR_NO__ERROR)
246 : return -1;
247 108847 : if (maxcnt == 0)
248 : return 0;
249 108847 : if (maxcnt == 1) {
250 0 : *start = 0;
251 0 : return 0;
252 : }
253 18247315 : for (;;) {
254 18247315 : switch (s->read(s, start, 1, 1)) {
255 18247055 : case 1:
256 : /* successfully read a character,
257 : * check whether it is the line
258 : * separator and whether we have space
259 : * left for more */
260 18247055 : if (*start++ == '\n' || --maxcnt == 1) {
261 108587 : *start = 0;
262 108587 : return (ssize_t) (start - b);
263 : }
264 : break;
265 0 : case -1:
266 : /* error: if we didn't read anything yet,
267 : * return the error, otherwise return what we
268 : * have */
269 0 : if (start == b)
270 : return -1;
271 : /* fall through */
272 : case 0:
273 : /* end of file: return what we have */
274 260 : *start = 0;
275 260 : return (ssize_t) (start - b);
276 : }
277 : }
278 : }
279 :
280 :
281 : void
282 38206 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
283 : {
284 38206 : if (s) {
285 38206 : s->timeout = ms;
286 38206 : s->timeout_func = func;
287 38206 : s->timeout_data = data;
288 38206 : if (s->update_timeout)
289 38206 : s->update_timeout(s);
290 : }
291 38205 : }
292 :
293 :
294 : void
295 79968 : mnstr_close(stream *s)
296 : {
297 79968 : if (s) {
298 : #ifdef STREAM_DEBUG
299 : fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
300 : #endif
301 79968 : s->close(s);
302 : }
303 79968 : }
304 :
305 :
306 : void
307 1379 : mnstr_destroy(stream *s)
308 : {
309 1379 : if (s) {
310 : #ifdef STREAM_DEBUG
311 : fprintf(stderr, "destroy %s\n",
312 : s->name ? s->name : "<unnamed>");
313 : #endif
314 1373 : s->destroy(s);
315 : }
316 1379 : }
317 :
318 : void
319 317 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
320 : {
321 317 : if (s == NULL)
322 : return;
323 :
324 317 : s->errkind = kind;
325 :
326 317 : if (kind == MNSTR_NO__ERROR) {
327 0 : s->errmsg[0] = '\0';
328 0 : return;
329 : }
330 :
331 317 : char *start = &s->errmsg[0];
332 317 : char *end = start + sizeof(s->errmsg);
333 317 : if (s->name != NULL)
334 317 : start += snprintf(start, end - start, "stream %s: ", s->name);
335 :
336 317 : if (start >= end - 1)
337 : return;
338 :
339 317 : if (fmt == NULL)
340 19 : fmt = mnstr_error_kind_description(kind);
341 :
342 : // Complicated pointer dance in order to shut up 'might be a candidate
343 : // for gnu_printf format attribute' warning from gcc.
344 : // It's really eager to trace where the vsnprintf ends up, we need
345 : // the ? : to throw it off its scent.
346 : // Similarly, the parentheses around the 1 serve to suppress a Clang
347 : // warning about dead code (the atoi).
348 317 : void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
349 317 : int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
350 317 : f(start, end - start, fmt, ap);
351 : }
352 :
353 : void
354 20 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
355 : {
356 20 : va_list ap;
357 20 : va_start(ap, fmt);
358 20 : mnstr_va_set_error(s, kind, fmt, ap);
359 20 : va_end(ap);
360 20 : }
361 :
362 : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
363 :
364 : void
365 297 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
366 : {
367 297 : va_list ap;
368 297 : va_start(ap, fmt);
369 297 : mnstr_va_set_error(s, kind, fmt, ap);
370 297 : va_end(ap);
371 :
372 : /* append as much as fits of the system error message */
373 297 : char *start = &s->errmsg[0] + strlen(s->errmsg);
374 297 : char *end = &s->errmsg[0] + sizeof(s->errmsg);
375 297 : if (end - start >= 3) {
376 297 : start = stpcpy(start, ": ");
377 297 : start += my_strerror_r(errno, start, end - start);
378 : }
379 297 : }
380 :
381 :
382 : void
383 387317 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
384 : {
385 387317 : va_list ap;
386 :
387 387317 : struct tl_error_buf *buf = get_tl_error_buf();
388 387315 : if (buf == NULL)
389 387186 : return; // hopeless
390 :
391 387315 : if (errnr == 0 && fmt == NULL) {
392 387186 : buf->msg[0] = '\0';
393 387186 : return;
394 : }
395 :
396 129 : char *start = &buf->msg[0];
397 129 : char *end = start + sizeof(buf->msg);
398 :
399 129 : if (name != NULL)
400 129 : start += snprintf(start, end - start, "when opening %s: ", name);
401 129 : if (start >= end - 1)
402 : return;
403 :
404 129 : if (fmt != NULL) {
405 129 : va_start(ap, fmt);
406 129 : start += vsnprintf(start, end - start, fmt, ap);
407 129 : va_end(ap);
408 : }
409 129 : if (start >= end - 1)
410 : return;
411 :
412 129 : if (errnr != 0 && end - start >= 3) {
413 127 : start = stpcpy(start, ": ");
414 127 : start += my_strerror_r(errno, start, end - start);
415 : }
416 129 : if (start >= end - 1)
417 : return;
418 : }
419 :
420 : static size_t
421 424 : my_strerror_r(int error_nr, char *buf, size_t buflen)
422 : {
423 : // Three cases:
424 : // 1. no strerror_r
425 : // 2. gnu strerror_r (returns char* and does not always fill buffer)
426 : // 3. xsi strerror_r (returns int and always fills the buffer)
427 424 : char *to_move;
428 : #ifndef HAVE_STRERROR_R
429 : // Hope for the best
430 : to_move = strerror(error_nr);
431 : #elif !defined(_GNU_SOURCE) || !_GNU_SOURCE
432 : // standard strerror_r always writes to buf
433 : int result_code = strerror_r(error_nr, buf, buflen);
434 : if (result_code == 0)
435 : to_move = NULL;
436 : else
437 : to_move = "<failed to retrieve error message>";
438 : #else
439 : // gnu strerror_r sometimes only returns static string, needs copy
440 424 : to_move = strerror_r(error_nr, buf, buflen);
441 : #endif
442 424 : if (to_move != NULL) {
443 : // move to buffer
444 424 : size_t size = strlen(to_move) + 1;
445 424 : assert(size <= buflen);
446 : // strerror_r may have return a pointer to/into the buffer
447 424 : memmove(buf, to_move, size);
448 424 : return size - 1;
449 : } else {
450 0 : return strlen(buf);
451 : }
452 : }
453 :
454 :
455 :
456 315 : void mnstr_copy_error(stream *dst, stream *src)
457 : {
458 315 : dst->errkind = src->errkind;
459 315 : memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
460 315 : }
461 :
462 : char *
463 0 : mnstr_error(const stream *s)
464 : {
465 0 : const char *msg = mnstr_peek_error(s);
466 0 : if (msg != NULL)
467 0 : return strdup(msg);
468 : else
469 : return NULL;
470 : }
471 :
472 : const char*
473 7 : mnstr_peek_error(const stream *s)
474 : {
475 7 : if (s == NULL) {
476 3 : struct tl_error_buf *b = get_tl_error_buf();
477 3 : if (b != NULL)
478 3 : return b->msg;
479 : else
480 : return "unknown error";
481 : }
482 :
483 4 : if (s->errkind == MNSTR_NO__ERROR)
484 : return "no error";
485 :
486 4 : if (s->errmsg[0] != '\0')
487 4 : return s->errmsg;
488 :
489 0 : return mnstr_error_kind_description(s->errkind);
490 : }
491 :
492 : static const char *
493 19 : mnstr_error_kind_description(mnstr_error_kind kind)
494 : {
495 19 : switch (kind) {
496 : case MNSTR_NO__ERROR:
497 : /* unreachable */
498 0 : assert(0);
499 : return NULL;
500 : case MNSTR_OPEN_ERROR:
501 : return "error could not open";
502 1 : case MNSTR_READ_ERROR:
503 1 : return "error reading";
504 0 : case MNSTR_WRITE_ERROR:
505 0 : return "error writing";
506 : case MNSTR_TIMEOUT:
507 : return "timeout";
508 : case MNSTR_UNEXPECTED_EOF:
509 : return "timeout";
510 : }
511 :
512 0 : return "Unknown error";
513 : }
514 :
515 : /* flush buffer, return 0 on success, non-zero on failure */
516 : int
517 730083 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
518 : {
519 730083 : if (s == NULL)
520 : return -1;
521 : #ifdef STREAM_DEBUG
522 : fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
523 : #endif
524 730082 : assert(!s->readonly);
525 730082 : if (s->errkind != MNSTR_NO__ERROR)
526 : return -1;
527 730082 : if (s->flush)
528 728627 : return s->flush(s, flush_level);
529 : return 0;
530 : }
531 :
532 :
533 : /* sync file to disk, return 0 on success, non-zero on failure */
534 : int
535 36 : mnstr_fsync(stream *s)
536 : {
537 36 : if (s == NULL)
538 : return -1;
539 : #ifdef STREAM_DEBUG
540 : fprintf(stderr, "fsync %s (%d)\n",
541 : s->name ? s->name : "<unnamed>", s->errnr);
542 : #endif
543 36 : assert(!s->readonly);
544 36 : if (s->errkind != MNSTR_NO__ERROR)
545 : return -1;
546 36 : if (s->fsync)
547 36 : return s->fsync(s);
548 : return 0;
549 : }
550 :
551 :
552 : int
553 0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
554 : {
555 0 : if (s == NULL || p == NULL)
556 : return -1;
557 : #ifdef STREAM_DEBUG
558 : fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
559 : #endif
560 0 : if (s->errkind != MNSTR_NO__ERROR)
561 : return -1;
562 0 : if (s->fgetpos)
563 0 : return s->fgetpos(s, p);
564 : return 0;
565 : }
566 :
567 :
568 : int
569 0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
570 : {
571 0 : if (s == NULL)
572 : return -1;
573 : #ifdef STREAM_DEBUG
574 : fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
575 : #endif
576 0 : if (s->errkind != MNSTR_NO__ERROR)
577 : return -1;
578 0 : if (s->fsetpos)
579 0 : return s->fsetpos(s, p);
580 : return 0;
581 : }
582 :
583 :
584 : int
585 8616075 : mnstr_isalive(const stream *s)
586 : {
587 8616075 : if (s == NULL)
588 : return 0;
589 8616075 : if (s->errkind != MNSTR_NO__ERROR)
590 : return -1;
591 8616075 : if (s->isalive)
592 7771597 : return s->isalive(s);
593 : return 1;
594 : }
595 :
596 :
597 : bool
598 189222 : mnstr_eof(const stream *s)
599 : {
600 189222 : return s->eof;
601 : }
602 :
603 : const char *
604 25402 : mnstr_name(const stream *s)
605 : {
606 25402 : if (s == NULL)
607 : return "connection terminated";
608 25402 : return s->name;
609 : }
610 :
611 :
612 : mnstr_error_kind
613 4946526 : mnstr_errnr(const stream *s)
614 : {
615 4946526 : if (s == NULL)
616 : return MNSTR_READ_ERROR;
617 4946526 : return s->errkind;
618 : }
619 :
620 : const char *
621 0 : mnstr_error_kind_name(mnstr_error_kind k)
622 : {
623 0 : switch (k) {
624 : case MNSTR_NO__ERROR:
625 : return "MNSTR_NO__ERROR";
626 0 : case MNSTR_OPEN_ERROR:
627 0 : return "MNSTR_OPEN_ERROR";
628 0 : case MNSTR_READ_ERROR:
629 0 : return "MNSTR_READ_ERROR";
630 0 : case MNSTR_WRITE_ERROR:
631 0 : return "MNSTR_WRITE_ERROR";
632 0 : case MNSTR_TIMEOUT:
633 0 : return "MNSTR_TIMEOUT";
634 0 : default:
635 0 : return "<UNKNOWN_ERROR>";
636 : }
637 :
638 : }
639 : void
640 2 : mnstr_clearerr(stream *s)
641 : {
642 2 : if (s != NULL) {
643 2 : s->errkind = MNSTR_NO__ERROR;
644 2 : s->errmsg[0] = '\0';
645 2 : if (s->clrerr)
646 0 : s->clrerr(s);
647 : }
648 2 : }
649 :
650 :
651 : bool
652 605 : mnstr_isbinary(const stream *s)
653 : {
654 605 : if (s == NULL)
655 : return false;
656 605 : return s->binary;
657 : }
658 :
659 :
660 : bool
661 0 : mnstr_get_swapbytes(const stream *s)
662 : {
663 0 : if (s == NULL)
664 : return 0;
665 0 : return s->swapbytes;
666 : }
667 :
668 :
669 : /* set stream to big-endian/little-endian byte order; the default is
670 : * native byte order */
671 : void
672 39586 : mnstr_set_bigendian(stream *s, bool bigendian)
673 : {
674 39586 : if (s == NULL)
675 : return;
676 : #ifdef STREAM_DEBUG
677 : fprintf(stderr, "mnstr_set_bigendian %s %s\n",
678 : s->name ? s->name : "<unnamed>",
679 : swapbytes ? "true" : "false");
680 : #endif
681 39586 : assert(s->readonly);
682 39586 : s->binary = true;
683 : #ifdef WORDS_BIGENDIAN
684 : s->swapbytes = !bigendian;
685 : #else
686 39586 : s->swapbytes = bigendian;
687 : #endif
688 : }
689 :
690 :
691 : void
692 67710 : close_stream(stream *s)
693 : {
694 67710 : if (s) {
695 67707 : if (s->close)
696 67707 : s->close(s);
697 67707 : if (s->destroy)
698 67707 : s->destroy(s);
699 : }
700 67710 : }
701 :
702 :
703 : void
704 387108 : destroy_stream(stream *s)
705 : {
706 387108 : if (s->name)
707 387108 : free(s->name);
708 387108 : free(s);
709 387108 : }
710 :
711 :
712 : stream *
713 387188 : create_stream(const char *name)
714 : {
715 387188 : stream *s;
716 :
717 387188 : if (name == NULL) {
718 0 : mnstr_set_open_error(NULL, 0, "internal error: name not set");
719 0 : return NULL;
720 : }
721 387188 : if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
722 0 : mnstr_set_open_error(name, errno, "malloc failed");
723 0 : return NULL;
724 : }
725 387188 : *s = (stream) {
726 : .swapbytes = false,
727 : .readonly = true,
728 : .isutf8 = false, /* not known for sure */
729 : .binary = false,
730 : .eof = false,
731 387188 : .name = strdup(name),
732 : .errkind = MNSTR_NO__ERROR,
733 : .errmsg = {0},
734 : .destroy = destroy_stream,
735 : };
736 387188 : if(s->name == NULL) {
737 0 : free(s);
738 0 : mnstr_set_open_error(name, errno, "malloc failed");
739 0 : return NULL;
740 : }
741 : #ifdef STREAM_DEBUG
742 : fprintf(stderr, "create_stream %s -> %p\n",
743 : name ? name : "<unnamed>", s);
744 : #endif
745 387188 : mnstr_set_open_error(NULL, 0, NULL); // clear the error
746 387188 : return s;
747 : }
748 :
749 :
750 : static ssize_t
751 0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
752 : {
753 0 : ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
754 0 : s->eof |= s->inner->eof;
755 0 : return ret;
756 : }
757 :
758 :
759 : static ssize_t
760 0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
761 : {
762 0 : return s->inner->write(s->inner, buf, elmsize, cnt);
763 : }
764 :
765 :
766 : static void
767 0 : wrapper_close(stream *s)
768 : {
769 0 : s->inner->close(s->inner);
770 0 : }
771 :
772 :
773 : static void
774 0 : wrapper_clrerr(stream *s)
775 : {
776 0 : s->inner->clrerr(s->inner);
777 0 : }
778 :
779 :
780 : static void
781 0 : wrapper_destroy(stream *s)
782 : {
783 0 : s->inner->destroy(s->inner);
784 0 : destroy_stream(s);
785 0 : }
786 :
787 :
788 : static int
789 0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
790 : {
791 0 : return s->inner->flush(s->inner, flush_level);
792 : }
793 :
794 :
795 : static int
796 4 : wrapper_fsync(stream *s)
797 : {
798 4 : return s->inner->fsync(s->inner);
799 : }
800 :
801 :
802 : static int
803 0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
804 : {
805 0 : return s->inner->fgetpos(s->inner, p);
806 : }
807 :
808 :
809 : static int
810 0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
811 : {
812 0 : return s->inner->fsetpos(s->inner, p);
813 : }
814 :
815 :
816 : static void
817 38206 : wrapper_update_timeout(stream *s)
818 : {
819 38206 : s->inner->timeout = s->timeout;
820 38206 : s->inner->timeout_func = s->timeout_func;
821 38206 : s->inner->timeout_data = s->timeout_data;
822 38206 : s->inner->update_timeout(s->inner);
823 38205 : }
824 :
825 :
826 : static int
827 7771766 : wrapper_isalive(const stream *s)
828 : {
829 7771766 : return s->inner->isalive(s->inner);
830 : }
831 :
832 :
833 : stream *
834 80579 : create_wrapper_stream(const char *name, stream *inner)
835 : {
836 80579 : if (inner == NULL)
837 : return NULL;
838 80579 : if (name == NULL)
839 80575 : name = inner->name;
840 80579 : stream *s = create_stream(name);
841 80579 : if (s == NULL)
842 : return NULL;
843 :
844 :
845 80579 : s->swapbytes = inner->swapbytes;
846 80579 : s->readonly = inner->readonly;
847 80579 : s->isutf8 = inner->isutf8;
848 80579 : s->binary = inner->binary;
849 80579 : s->timeout = inner->timeout;
850 80579 : s->inner = inner;
851 :
852 80579 : s->read = inner->read == NULL ? NULL : wrapper_read;
853 80579 : s->write = inner->write == NULL ? NULL : wrapper_write;
854 80579 : s->close = inner->close == NULL ? NULL : wrapper_close;
855 80579 : s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
856 80579 : s->destroy = wrapper_destroy;
857 80579 : s->flush = inner->flush == NULL ? NULL : wrapper_flush;
858 80579 : s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
859 80579 : s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
860 80579 : s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
861 80579 : s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
862 80579 : s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
863 :
864 80579 : return s;
865 : }
866 :
867 : /* ------------------------------------------------------------------ */
868 : /* streams working on a disk file, compressed or not */
869 :
870 : stream *
871 13374 : open_rstream(const char *filename)
872 : {
873 13374 : if (filename == NULL)
874 : return NULL;
875 : #ifdef STREAM_DEBUG
876 : fprintf(stderr, "open_rstream %s\n", filename);
877 : #endif
878 :
879 13374 : stream *s = open_stream(filename, "rb");
880 13374 : if (s == NULL)
881 : return NULL;
882 :
883 13247 : stream *c = compressed_stream(s, 0);
884 13247 : if (c == NULL)
885 0 : close_stream(s);
886 :
887 : return c;
888 : }
889 :
890 : stream *
891 12154 : open_wstream(const char *filename)
892 : {
893 12154 : if (filename == NULL)
894 : return NULL;
895 : #ifdef STREAM_DEBUG
896 : fprintf(stderr, "open_wstream %s\n", filename);
897 : #endif
898 :
899 12154 : stream *s = open_stream(filename, "wb");
900 12154 : if (s == NULL)
901 : return NULL;
902 :
903 12154 : stream *c = compressed_stream(s, 0);
904 12154 : if (c == NULL) {
905 0 : close_stream(s);
906 0 : (void) file_remove(filename);
907 : }
908 :
909 : return c;
910 : }
911 :
912 : stream *
913 312 : open_rastream(const char *filename)
914 : {
915 312 : if (filename == NULL)
916 : return NULL;
917 : #ifdef STREAM_DEBUG
918 : fprintf(stderr, "open_rastream %s\n", filename);
919 : #endif
920 312 : stream *s = open_rstream(filename);
921 312 : if (s == NULL)
922 : return NULL;
923 :
924 297 : stream *t = create_text_stream(s);
925 297 : if (t == NULL)
926 0 : close_stream(s);
927 :
928 : return t;
929 : }
930 :
931 : stream *
932 5 : open_wastream(const char *filename)
933 : {
934 5 : if (filename == NULL)
935 : return NULL;
936 : #ifdef STREAM_DEBUG
937 : fprintf(stderr, "open_wastream %s\n", filename);
938 : #endif
939 5 : stream *s = open_wstream(filename);
940 5 : if (s == NULL)
941 : return NULL;
942 :
943 5 : stream *t = create_text_stream(s);
944 5 : if (t == NULL) {
945 0 : close_stream(s);
946 0 : (void) file_remove(filename);
947 : }
948 :
949 : return t;
950 : }
951 :
952 :
953 : /* put here because it depends on both bs_read AND bs2_read */
954 : bool
955 465942 : isa_block_stream(const stream *s)
956 : {
957 465942 : assert(s != NULL);
958 931884 : return s &&
959 465942 : ((s->read == bs_read ||
960 3300 : s->write == bs_write));
961 : }
962 :
963 :
964 : /* Put here because I need to think very carefully about this
965 : * mnstr_read(,, 0, 0). What would that mean?
966 : */
967 : ssize_t
968 39759 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
969 : {
970 39759 : ssize_t len = 0;
971 39759 : char x = 0;
972 :
973 39759 : if (s == NULL || buf == NULL)
974 : return -1;
975 39759 : assert(s->read == bs_read || s->write == bs_write);
976 79518 : if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
977 39759 : mnstr_read(s, &x, 0, 0) < 0 /* read prompt */ ||
978 39759 : x > 0)
979 1 : return -1;
980 : return len;
981 : }
|