Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /* stream
14 : * ======
15 : * Niels Nes
16 : * An simple interface to streams
17 : *
18 : * Processing files, streams, and sockets is quite different on Linux
19 : * and Windows platforms. To improve portability between both, we advise
20 : * to replace the stdio actions with the stream functionality provided
21 : * here.
22 : *
23 : * This interface can also be used to open 'non compressed, gzipped,
24 : * bz2zipped' data files and sockets. Using this interface one could
25 : * easily switch between the various underlying storage types.
26 : *
27 : * buffered streams
28 : * ----------------
29 : *
30 : * The bstream (or buffered_stream) can be used for efficient reading of
31 : * a stream. Reading can be done in large chunks and access can be done
32 : * in smaller bits, by directly accessing the underlying buffer.
33 : *
34 : * Beware that a flush on a buffered stream emits an empty block to
35 : * synchronize with the other side, telling it has reached the end of
36 : * the sequence and can close its descriptors.
37 : *
38 : * bstream functions
39 : * -----------------
40 : *
41 : * The bstream_create gets a read stream (rs) as input and the initial
42 : * chunk size and creates a buffered stream from this. A spare byte is
43 : * kept at the end of the buffer. The bstream_read will at least read
44 : * the next 'size' bytes. If the not read data (aka pos < len) together
45 : * with the new data will not fit in the current buffer it is resized.
46 : * The spare byte is kept.
47 : *
48 : * tee streams
49 : * -----------
50 : *
51 : * A tee stream is a write stream that duplicates all output to two
52 : * write streams of the same type (txt/bin).
53 : */
54 :
55 : /* Generic stream handling code such as init and close */
56 :
57 : #include "monetdb_config.h"
58 : #include "stream.h"
59 : #include "stream_internal.h"
60 : #include <stdio.h>
61 :
62 :
63 : #ifdef HAVE_PTHREAD_H
64 : #include <pthread.h>
65 : #endif
66 :
67 : struct tl_error_buf {
68 : char msg[1024];
69 : };
70 :
71 : static int tl_error_init(void);
72 : static struct tl_error_buf *get_tl_error_buf(void);
73 :
74 : #ifdef HAVE_PTHREAD_H
75 :
76 : static pthread_key_t tl_error_key;
77 :
78 : static void
79 565 : clear_main_tl_error_buf(void)
80 : {
81 565 : void *p = pthread_getspecific(tl_error_key);
82 565 : if (p != NULL) {
83 561 : pthread_setspecific(tl_error_key, NULL);
84 561 : free(p);
85 : }
86 565 : }
87 :
88 : static int
89 565 : tl_error_init(void)
90 : {
91 565 : if (pthread_key_create(&tl_error_key, free) != 0)
92 : return -1;
93 : // Turns out the destructor registered with pthread_key_create() does not
94 : // always run for the main thread. This atexit hook clears the main thread's
95 : // error buffer to avoid this being reported as a memory leak.
96 565 : atexit(clear_main_tl_error_buf);
97 565 : return 0;
98 : }
99 :
100 : static struct tl_error_buf*
101 392548 : get_tl_error_buf(void)
102 : {
103 392548 : struct tl_error_buf *p = pthread_getspecific(tl_error_key);
104 392549 : if (p == NULL) {
105 4065 : p = malloc(sizeof(*p));
106 4065 : if (p == NULL)
107 : return NULL;
108 4065 : *p = (struct tl_error_buf) { .msg = {0} };
109 4065 : pthread_setspecific(tl_error_key, p);
110 4065 : struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
111 4065 : assert(p == second_attempt && "maybe mnstr_init has not been called?");
112 : (void) second_attempt; // suppress warning if asserts disabled
113 : }
114 : return p;
115 : }
116 :
117 : #elif defined(WIN32)
118 :
119 : static DWORD tl_error_key = 0;
120 :
121 : static int
122 : tl_error_init(void)
123 : {
124 : DWORD key = TlsAlloc();
125 : if (key == TLS_OUT_OF_INDEXES)
126 : return -1;
127 : else {
128 : tl_error_key = key;
129 : return 0;
130 : }
131 : }
132 :
133 : static struct tl_error_buf*
134 : get_tl_error_buf(void)
135 : {
136 : struct tl_error_buf *p = TlsGetValue(tl_error_key);
137 :
138 : if (p == NULL) {
139 : if (GetLastError() != ERROR_SUCCESS)
140 : return NULL; // something went terribly wrong
141 :
142 : // otherwise, initialize
143 : p = malloc(sizeof(*p));
144 : if (p == NULL)
145 : return NULL;
146 : *p = (struct tl_error_buf) { .msg = 0 };
147 : if (!TlsSetValue(tl_error_key, p)) {
148 : free(p);
149 : return NULL;
150 : }
151 :
152 : struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
153 : assert(p == second_attempt /* maybe mnstr_init has not been called? */);
154 : (void) second_attempt; // suppress warning if asserts disabled
155 : }
156 : return p;
157 : }
158 :
159 : #else
160 :
161 : #error "no pthreads and no Windows, don't know what to do"
162 :
163 : #endif
164 :
165 : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
166 :
167 : int
168 768 : mnstr_init(void)
169 : {
170 768 : static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
171 :
172 768 : if (ATOMIC_TAS(&inited))
173 : return 0;
174 :
175 565 : if (tl_error_init()< 0)
176 : return -1;
177 :
178 : #ifdef NATIVE_WIN32
179 : WSADATA w;
180 : if (WSAStartup(0x0101, &w) != 0)
181 : return -1;
182 : #endif
183 :
184 : return 0;
185 : }
186 :
187 : const char *
188 0 : mnstr_version(void)
189 : {
190 0 : return STREAM_VERSION;
191 : }
192 :
193 : /* Read at most cnt elements of size elmsize from the stream. Returns
194 : * the number of elements actually read or < 0 on failure. */
195 : ssize_t
196 17268465 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
197 : {
198 17268465 : if (s == NULL || buf == NULL)
199 : return -1;
200 : #ifdef STREAM_DEBUG
201 : fprintf(stderr, "read %s %zu %zu\n",
202 : s->name ? s->name : "<unnamed>", elmsize, cnt);
203 : #endif
204 17268465 : assert(s->readonly);
205 17268465 : if (s->errkind != MNSTR_NO__ERROR)
206 : return -1;
207 17268465 : return s->read(s, buf, elmsize, cnt);
208 : }
209 :
210 :
211 : /* Write cnt elements of size elmsize to the stream. Returns the
212 : * number of elements actually written. If elmsize or cnt equals zero,
213 : * returns cnt. */
214 : ssize_t
215 37134145 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
216 : {
217 37134145 : if (s == NULL || buf == NULL)
218 : return -1;
219 : #ifdef STREAM_DEBUG
220 : fprintf(stderr, "write %s %zu %zu\n",
221 : s->name ? s->name : "<unnamed>", elmsize, cnt);
222 : #endif
223 37134143 : assert(!s->readonly);
224 37134143 : if (s->errkind != MNSTR_NO__ERROR)
225 : return -1;
226 37134143 : return s->write(s, buf, elmsize, cnt);
227 : }
228 :
229 :
230 : /* Read one line (separated by \n) of at most maxcnt-1 characters from
231 : * the stream. Returns the number of characters actually read,
232 : * includes the trailing \n; terminated by a NULL byte. */
233 : ssize_t
234 111190 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
235 : {
236 111190 : char *b = buf, *start = buf;
237 :
238 111190 : if (s == NULL || buf == NULL)
239 : return -1;
240 : #ifdef STREAM_DEBUG
241 : fprintf(stderr, "readline %s %zu\n",
242 : s->name ? s->name : "<unnamed>", maxcnt);
243 : #endif
244 111190 : assert(s->readonly);
245 111190 : if (s->errkind != MNSTR_NO__ERROR)
246 : return -1;
247 111190 : if (maxcnt == 0)
248 : return 0;
249 111190 : if (maxcnt == 1) {
250 0 : *start = 0;
251 0 : return 0;
252 : }
253 18508309 : for (;;) {
254 18508309 : switch (s->read(s, start, 1, 1)) {
255 18508043 : case 1:
256 : /* successfully read a character,
257 : * check whether it is the line
258 : * separator and whether we have space
259 : * left for more */
260 18508043 : if (*start++ == '\n' || --maxcnt == 1) {
261 110924 : *start = 0;
262 110924 : return (ssize_t) (start - b);
263 : }
264 : break;
265 0 : case -1:
266 : /* error: if we didn't read anything yet,
267 : * return the error, otherwise return what we
268 : * have */
269 0 : if (start == b)
270 : return -1;
271 : /* fall through */
272 : case 0:
273 : /* end of file: return what we have */
274 266 : *start = 0;
275 266 : return (ssize_t) (start - b);
276 : }
277 : }
278 : }
279 :
280 :
281 : void
282 37353 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
283 : {
284 37353 : if (s) {
285 37353 : s->timeout = ms;
286 37353 : s->timeout_func = func;
287 37353 : s->timeout_data = data;
288 37353 : if (s->update_timeout)
289 37353 : s->update_timeout(s);
290 : }
291 37353 : }
292 :
293 :
294 : void
295 78083 : mnstr_close(stream *s)
296 : {
297 78083 : if (s) {
298 : #ifdef STREAM_DEBUG
299 : fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
300 : #endif
301 78083 : s->close(s);
302 : }
303 78080 : }
304 :
305 :
306 : void
307 1095 : mnstr_destroy(stream *s)
308 : {
309 1095 : if (s) {
310 : #ifdef STREAM_DEBUG
311 : fprintf(stderr, "destroy %s\n",
312 : s->name ? s->name : "<unnamed>");
313 : #endif
314 1093 : s->destroy(s);
315 : }
316 1095 : }
317 :
318 : void
319 181 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
320 : {
321 181 : if (s == NULL)
322 : return;
323 :
324 181 : if (s->errkind != MNSTR_NO__ERROR && kind != MNSTR_NO__ERROR) {
325 : /* keep the first error */
326 : return;
327 : }
328 :
329 181 : s->errkind = kind;
330 :
331 181 : if (kind == MNSTR_NO__ERROR) {
332 0 : s->errmsg[0] = '\0';
333 0 : return;
334 : }
335 :
336 181 : char *start = &s->errmsg[0];
337 181 : char *end = start + sizeof(s->errmsg);
338 181 : if (s->name != NULL)
339 181 : start += snprintf(start, end - start, "stream %s: ", s->name);
340 :
341 181 : if (start >= end - 1)
342 : return;
343 :
344 181 : if (fmt == NULL)
345 19 : fmt = mnstr_error_kind_description(kind);
346 :
347 : // Complicated pointer dance in order to shut up 'might be a candidate
348 : // for gnu_printf format attribute' warning from gcc.
349 : // It's really eager to trace where the vsnprintf ends up, we need
350 : // the ? : to throw it off its scent.
351 : // Similarly, the parentheses around the 1 serve to suppress a Clang
352 : // warning about dead code (the atoi).
353 181 : void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
354 181 : int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
355 181 : f(start, end - start, fmt, ap);
356 : }
357 :
358 : void
359 18 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
360 : {
361 18 : va_list ap;
362 18 : va_start(ap, fmt);
363 18 : mnstr_va_set_error(s, kind, fmt, ap);
364 18 : va_end(ap);
365 18 : }
366 :
367 : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
368 :
369 : void
370 163 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
371 : {
372 163 : int e = errno;
373 163 : va_list ap;
374 163 : va_start(ap, fmt);
375 163 : mnstr_va_set_error(s, kind, fmt, ap);
376 163 : va_end(ap);
377 :
378 : /* append as much as fits of the system error message */
379 163 : char *start = &s->errmsg[0] + strlen(s->errmsg);
380 163 : char *end = &s->errmsg[0] + sizeof(s->errmsg);
381 163 : if (end - start >= 3) {
382 163 : start = stpcpy(start, ": ");
383 163 : start += my_strerror_r(e, start, end - start);
384 : }
385 163 : }
386 :
387 :
388 : void
389 392545 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
390 : {
391 392545 : va_list ap;
392 :
393 392545 : struct tl_error_buf *buf = get_tl_error_buf();
394 392546 : if (buf == NULL)
395 392408 : return; // hopeless
396 :
397 392546 : if (errnr == 0 && fmt == NULL) {
398 392408 : buf->msg[0] = '\0';
399 392408 : return;
400 : }
401 :
402 138 : char *start = &buf->msg[0];
403 138 : char *end = start + sizeof(buf->msg);
404 :
405 138 : if (name != NULL)
406 138 : start += snprintf(start, end - start, "when opening %s: ", name);
407 138 : if (start >= end - 1)
408 : return;
409 :
410 138 : if (fmt != NULL) {
411 138 : va_start(ap, fmt);
412 138 : start += vsnprintf(start, end - start, fmt, ap);
413 138 : va_end(ap);
414 : }
415 138 : if (start >= end - 1)
416 : return;
417 :
418 138 : if (errnr != 0 && end - start >= 3) {
419 136 : start = stpcpy(start, ": ");
420 136 : start += my_strerror_r(errno, start, end - start);
421 : }
422 138 : if (start >= end - 1)
423 : return;
424 : }
425 :
426 : static size_t
427 299 : my_strerror_r(int error_nr, char *buf, size_t buflen)
428 : {
429 : // Four cases:
430 : // 1. strerror_s
431 : // 2. gnu strerror_r (returns char* and does not always fill buffer)
432 : // 3. xsi strerror_r (returns int and always fills the buffer)
433 : // 4. no strerror_r and no strerror_s
434 299 : char *to_move;
435 : #ifdef HAVE_STRERROR_S
436 : int result_code = strerror_s(buf, buflen, error_nr);
437 : if (result_code == 0)
438 : to_move = NULL;
439 : else
440 : to_move = "<failed to retrieve error message>";
441 : #elif defined(HAVE_STRERROR_R)
442 : #ifdef STRERROR_R_CHARP
443 : // gnu strerror_r sometimes only returns static string, needs copy
444 299 : to_move = strerror_r(error_nr, buf, buflen);
445 : #else
446 : int result_code = strerror_r(error_nr, buf, buflen);
447 : if (result_code == 0)
448 : to_move = NULL;
449 : else
450 : to_move = "<failed to retrieve error message>";
451 : #endif
452 : #else
453 : // Hope for the best
454 : to_move = strerror(error_nr);
455 : #endif
456 299 : if (to_move != NULL) {
457 : // move to buffer
458 299 : size_t size = strlen(to_move) + 1;
459 299 : assert(size <= buflen);
460 : // strerror_r may have return a pointer to/into the buffer
461 299 : memmove(buf, to_move, size);
462 299 : return size - 1;
463 : } else {
464 0 : return strlen(buf);
465 : }
466 : }
467 :
468 :
469 :
470 181 : void mnstr_copy_error(stream *dst, stream *src)
471 : {
472 181 : if (dst->errkind == MNSTR_NO__ERROR) {
473 181 : dst->errkind = src->errkind;
474 181 : memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
475 : }
476 181 : }
477 :
478 : char *
479 0 : mnstr_error(const stream *s)
480 : {
481 0 : const char *msg = mnstr_peek_error(s);
482 0 : if (msg != NULL)
483 0 : return strdup(msg);
484 : else
485 : return NULL;
486 : }
487 :
488 : const char*
489 5 : mnstr_peek_error(const stream *s)
490 : {
491 5 : if (s == NULL) {
492 3 : struct tl_error_buf *b = get_tl_error_buf();
493 3 : if (b != NULL)
494 3 : return b->msg;
495 : else
496 : return "unknown error";
497 : }
498 :
499 2 : if (s->errkind == MNSTR_NO__ERROR)
500 : return "no error";
501 :
502 2 : if (s->errmsg[0] != '\0')
503 2 : return s->errmsg;
504 :
505 0 : return mnstr_error_kind_description(s->errkind);
506 : }
507 :
508 : static const char *
509 19 : mnstr_error_kind_description(mnstr_error_kind kind)
510 : {
511 19 : switch (kind) {
512 : case MNSTR_NO__ERROR:
513 : /* unreachable */
514 0 : assert(0);
515 : return NULL;
516 : case MNSTR_OPEN_ERROR:
517 : return "error could not open";
518 1 : case MNSTR_READ_ERROR:
519 1 : return "error reading";
520 0 : case MNSTR_WRITE_ERROR:
521 0 : return "error writing";
522 0 : case MNSTR_INTERRUPT:
523 0 : return "interrupted";
524 : case MNSTR_TIMEOUT:
525 : return "timeout";
526 : case MNSTR_UNEXPECTED_EOF:
527 : return "timeout";
528 : }
529 :
530 0 : return "Unknown error";
531 : }
532 :
533 : /* flush buffer, return 0 on success, non-zero on failure */
534 : int
535 781569 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
536 : {
537 781569 : if (s == NULL)
538 : return -1;
539 : #ifdef STREAM_DEBUG
540 : fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
541 : #endif
542 781568 : assert(!s->readonly);
543 781568 : if (s->errkind != MNSTR_NO__ERROR)
544 : return -1;
545 781568 : if (s->flush)
546 780066 : return s->flush(s, flush_level);
547 : return 0;
548 : }
549 :
550 :
551 : /* sync file to disk, return 0 on success, non-zero on failure */
552 : int
553 36 : mnstr_fsync(stream *s)
554 : {
555 36 : if (s == NULL)
556 : return -1;
557 : #ifdef STREAM_DEBUG
558 : fprintf(stderr, "fsync %s (%d)\n",
559 : s->name ? s->name : "<unnamed>", s->errnr);
560 : #endif
561 36 : assert(!s->readonly);
562 36 : if (s->errkind != MNSTR_NO__ERROR)
563 : return -1;
564 36 : if (s->fsync)
565 36 : return s->fsync(s);
566 : return 0;
567 : }
568 :
569 :
570 : int
571 0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
572 : {
573 0 : if (s == NULL || p == NULL)
574 : return -1;
575 : #ifdef STREAM_DEBUG
576 : fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
577 : #endif
578 0 : if (s->errkind != MNSTR_NO__ERROR)
579 : return -1;
580 0 : if (s->fgetpos)
581 0 : return s->fgetpos(s, p);
582 : return 0;
583 : }
584 :
585 :
586 : int
587 0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
588 : {
589 0 : if (s == NULL)
590 : return -1;
591 : #ifdef STREAM_DEBUG
592 : fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
593 : #endif
594 0 : if (s->errkind != MNSTR_NO__ERROR)
595 : return -1;
596 0 : if (s->fsetpos)
597 0 : return s->fsetpos(s, p);
598 : return 0;
599 : }
600 :
601 :
602 : int
603 0 : mnstr_isalive(const stream *s)
604 : {
605 0 : if (s == NULL)
606 : return 0;
607 0 : if (s->errkind != MNSTR_NO__ERROR)
608 : return -1;
609 0 : if (s->isalive)
610 0 : return s->isalive(s);
611 : return 1;
612 : }
613 :
614 : int
615 16112219 : mnstr_getoob(const stream *s)
616 : {
617 16112219 : if (s->getoob)
618 15166116 : return s->getoob(s);
619 : return 0;
620 : }
621 :
622 : int
623 0 : mnstr_putoob(const stream *s, char val)
624 : {
625 0 : if (s->putoob)
626 0 : return s->putoob(s, val);
627 : return -1;
628 : }
629 :
630 :
631 : bool
632 231963 : mnstr_eof(const stream *s)
633 : {
634 231963 : return s->eof;
635 : }
636 :
637 : const char *
638 26261 : mnstr_name(const stream *s)
639 : {
640 26261 : if (s == NULL)
641 : return "connection terminated";
642 26261 : return s->name;
643 : }
644 :
645 :
646 : mnstr_error_kind
647 5233538 : mnstr_errnr(const stream *s)
648 : {
649 5233538 : if (s == NULL)
650 : return MNSTR_READ_ERROR;
651 5233538 : return s->errkind;
652 : }
653 :
654 : const char *
655 0 : mnstr_error_kind_name(mnstr_error_kind k)
656 : {
657 0 : switch (k) {
658 : case MNSTR_NO__ERROR:
659 : return "MNSTR_NO__ERROR";
660 0 : case MNSTR_OPEN_ERROR:
661 0 : return "MNSTR_OPEN_ERROR";
662 0 : case MNSTR_READ_ERROR:
663 0 : return "MNSTR_READ_ERROR";
664 0 : case MNSTR_WRITE_ERROR:
665 0 : return "MNSTR_WRITE_ERROR";
666 0 : case MNSTR_INTERRUPT:
667 0 : return "MNSTR_INTERRUPT";
668 0 : case MNSTR_TIMEOUT:
669 0 : return "MNSTR_TIMEOUT";
670 0 : default:
671 0 : return "<UNKNOWN_ERROR>";
672 : }
673 :
674 : }
675 :
676 : static void
677 0 : clearerror(stream *s)
678 : {
679 0 : if (s != NULL) {
680 0 : s->errkind = MNSTR_NO__ERROR;
681 0 : s->errmsg[0] = '\0';
682 : }
683 0 : }
684 :
685 : void
686 0 : mnstr_clearerr(stream *s)
687 : {
688 0 : clearerror(s);
689 0 : if (s != NULL && s->clrerr)
690 0 : s->clrerr(s);
691 0 : }
692 :
693 :
694 : bool
695 602 : mnstr_isbinary(const stream *s)
696 : {
697 602 : if (s == NULL)
698 : return false;
699 602 : return s->binary;
700 : }
701 :
702 :
703 : bool
704 0 : mnstr_get_swapbytes(const stream *s)
705 : {
706 0 : if (s == NULL)
707 : return 0;
708 0 : return s->swapbytes;
709 : }
710 :
711 :
712 : /* set stream to big-endian/little-endian byte order; the default is
713 : * native byte order */
714 : void
715 38771 : mnstr_set_bigendian(stream *s, bool bigendian)
716 : {
717 38771 : if (s == NULL)
718 : return;
719 : #ifdef STREAM_DEBUG
720 : fprintf(stderr, "mnstr_set_bigendian %s %s\n",
721 : s->name ? s->name : "<unnamed>",
722 : swapbytes ? "true" : "false");
723 : #endif
724 38771 : assert(s->readonly);
725 38771 : s->binary = true;
726 : #ifdef WORDS_BIGENDIAN
727 : s->swapbytes = !bigendian;
728 : #else
729 38771 : s->swapbytes = bigendian;
730 : #endif
731 : }
732 :
733 :
734 : void
735 68042 : close_stream(stream *s)
736 : {
737 68042 : if (s) {
738 68039 : if (s->close)
739 68039 : s->close(s);
740 68039 : if (s->destroy)
741 68039 : s->destroy(s);
742 : }
743 68043 : }
744 :
745 :
746 : void
747 392316 : destroy_stream(stream *s)
748 : {
749 392316 : if (s->name)
750 392316 : free(s->name);
751 392316 : free(s);
752 392316 : }
753 :
754 :
755 : stream *
756 392407 : create_stream(const char *name)
757 : {
758 392407 : stream *s;
759 :
760 392407 : if (name == NULL) {
761 0 : mnstr_set_open_error(NULL, 0, "internal error: name not set");
762 0 : return NULL;
763 : }
764 392407 : if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
765 0 : mnstr_set_open_error(name, errno, "malloc failed");
766 0 : return NULL;
767 : }
768 392407 : *s = (stream) {
769 : .swapbytes = false,
770 : .readonly = true,
771 : .isutf8 = false, /* not known for sure */
772 : .binary = false,
773 : .eof = false,
774 392407 : .name = strdup(name),
775 : .errkind = MNSTR_NO__ERROR,
776 : .errmsg = {0},
777 : .destroy = destroy_stream,
778 : .clrerr = clearerror,
779 : };
780 392407 : if(s->name == NULL) {
781 0 : free(s);
782 0 : mnstr_set_open_error(name, errno, "malloc failed");
783 0 : return NULL;
784 : }
785 : #ifdef STREAM_DEBUG
786 : fprintf(stderr, "create_stream %s -> %p\n",
787 : name ? name : "<unnamed>", s);
788 : #endif
789 392407 : mnstr_set_open_error(NULL, 0, NULL); // clear the error
790 392407 : return s;
791 : }
792 :
793 :
794 : static ssize_t
795 0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
796 : {
797 0 : ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
798 0 : s->eof |= s->inner->eof;
799 0 : return ret;
800 : }
801 :
802 :
803 : static ssize_t
804 0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
805 : {
806 0 : return s->inner->write(s->inner, buf, elmsize, cnt);
807 : }
808 :
809 :
810 : static void
811 0 : wrapper_close(stream *s)
812 : {
813 0 : s->inner->close(s->inner);
814 0 : }
815 :
816 :
817 : static void
818 0 : wrapper_clrerr(stream *s)
819 : {
820 0 : s->inner->clrerr(s->inner);
821 0 : }
822 :
823 :
824 : static void
825 0 : wrapper_destroy(stream *s)
826 : {
827 0 : s->inner->destroy(s->inner);
828 0 : destroy_stream(s);
829 0 : }
830 :
831 :
832 : static int
833 0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
834 : {
835 0 : return s->inner->flush(s->inner, flush_level);
836 : }
837 :
838 :
839 : static int
840 4 : wrapper_fsync(stream *s)
841 : {
842 4 : return s->inner->fsync(s->inner);
843 : }
844 :
845 :
846 : static int
847 0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
848 : {
849 0 : return s->inner->fgetpos(s->inner, p);
850 : }
851 :
852 :
853 : static int
854 0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
855 : {
856 0 : return s->inner->fsetpos(s->inner, p);
857 : }
858 :
859 :
860 : static void
861 37354 : wrapper_update_timeout(stream *s)
862 : {
863 37354 : s->inner->timeout = s->timeout;
864 37354 : s->inner->timeout_func = s->timeout_func;
865 37354 : s->inner->timeout_data = s->timeout_data;
866 37354 : s->inner->update_timeout(s->inner);
867 37351 : }
868 :
869 :
870 : static int
871 0 : wrapper_isalive(const stream *s)
872 : {
873 0 : return s->inner->isalive(s->inner);
874 : }
875 :
876 :
877 : static int
878 15156084 : wrapper_getoob(const stream *s)
879 : {
880 15156084 : return s->inner->getoob(s->inner);
881 : }
882 :
883 :
884 : static int
885 0 : wrapper_putoob(const stream *s, char val)
886 : {
887 0 : return s->inner->putoob(s->inner, val);
888 : }
889 :
890 :
891 : stream *
892 78683 : create_wrapper_stream(const char *name, stream *inner)
893 : {
894 78683 : if (inner == NULL)
895 : return NULL;
896 78683 : if (name == NULL)
897 78683 : name = inner->name;
898 78683 : stream *s = create_stream(name);
899 78683 : if (s == NULL)
900 : return NULL;
901 :
902 :
903 78683 : s->swapbytes = inner->swapbytes;
904 78683 : s->readonly = inner->readonly;
905 78683 : s->isutf8 = inner->isutf8;
906 78683 : s->binary = inner->binary;
907 78683 : s->timeout = inner->timeout;
908 78683 : s->inner = inner;
909 :
910 78683 : s->read = inner->read == NULL ? NULL : wrapper_read;
911 78683 : s->write = inner->write == NULL ? NULL : wrapper_write;
912 78683 : s->close = inner->close == NULL ? NULL : wrapper_close;
913 78683 : s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
914 78683 : s->destroy = wrapper_destroy;
915 78683 : s->flush = inner->flush == NULL ? NULL : wrapper_flush;
916 78683 : s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
917 78683 : s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
918 78683 : s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
919 78683 : s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
920 78683 : s->getoob = inner->getoob == NULL ? NULL : wrapper_getoob;
921 78683 : s->putoob = inner->putoob == NULL ? NULL : wrapper_putoob;
922 78683 : s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
923 :
924 78683 : return s;
925 : }
926 :
927 : /* ------------------------------------------------------------------ */
928 : /* streams working on a disk file, compressed or not */
929 :
930 : stream *
931 13736 : open_rstream(const char *filename)
932 : {
933 13736 : if (filename == NULL)
934 : return NULL;
935 : #ifdef STREAM_DEBUG
936 : fprintf(stderr, "open_rstream %s\n", filename);
937 : #endif
938 :
939 13736 : stream *s = open_stream(filename, "rb");
940 13736 : if (s == NULL)
941 : return NULL;
942 :
943 13600 : stream *c = compressed_stream(s, 0);
944 13600 : if (c == NULL)
945 0 : close_stream(s);
946 :
947 : return c;
948 : }
949 :
950 : stream *
951 12660 : open_wstream(const char *filename)
952 : {
953 12660 : if (filename == NULL)
954 : return NULL;
955 : #ifdef STREAM_DEBUG
956 : fprintf(stderr, "open_wstream %s\n", filename);
957 : #endif
958 :
959 12660 : stream *s = open_stream(filename, "wb");
960 12660 : if (s == NULL)
961 : return NULL;
962 :
963 12660 : stream *c = compressed_stream(s, 0);
964 12660 : if (c == NULL) {
965 0 : close_stream(s);
966 0 : (void) file_remove(filename);
967 : }
968 :
969 : return c;
970 : }
971 :
972 : stream *
973 167 : open_rastream(const char *filename)
974 : {
975 167 : if (filename == NULL)
976 : return NULL;
977 : #ifdef STREAM_DEBUG
978 : fprintf(stderr, "open_rastream %s\n", filename);
979 : #endif
980 167 : stream *s = open_rstream(filename);
981 167 : if (s == NULL)
982 : return NULL;
983 :
984 133 : stream *t = create_text_stream(s);
985 133 : if (t == NULL)
986 0 : close_stream(s);
987 :
988 : return t;
989 : }
990 :
991 : stream *
992 26 : open_wastream(const char *filename)
993 : {
994 26 : if (filename == NULL)
995 : return NULL;
996 : #ifdef STREAM_DEBUG
997 : fprintf(stderr, "open_wastream %s\n", filename);
998 : #endif
999 26 : stream *s = open_wstream(filename);
1000 26 : if (s == NULL)
1001 : return NULL;
1002 :
1003 26 : stream *t = create_text_stream(s);
1004 26 : if (t == NULL) {
1005 0 : close_stream(s);
1006 0 : (void) file_remove(filename);
1007 : }
1008 :
1009 : return t;
1010 : }
1011 :
1012 :
1013 : /* put here because it depends on both bs_read AND bs2_read */
1014 : bool
1015 507789 : isa_block_stream(const stream *s)
1016 : {
1017 507789 : assert(s != NULL);
1018 1015578 : return s &&
1019 507789 : ((s->read == bs_read ||
1020 3415 : s->write == bs_write));
1021 : }
1022 :
1023 :
1024 : /* Put here because I need to think very carefully about this
1025 : * mnstr_read(,, 0, 0). What would that mean?
1026 : */
1027 : ssize_t
1028 38953 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
1029 : {
1030 38953 : ssize_t len = 0;
1031 38953 : char x = 0;
1032 :
1033 38953 : if (s == NULL || buf == NULL)
1034 : return -1;
1035 38953 : assert(s->read == bs_read || s->write == bs_write);
1036 77900 : if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
1037 38949 : mnstr_read(s, &x, 0, 0) < 0 /* read prompt */ ||
1038 38947 : x > 0)
1039 1 : return -1;
1040 : return len;
1041 : }
|