LCOV - code coverage report
Current view: top level - common/stream - stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 270 374 72.2 %
Date: 2024-12-19 23:10:26 Functions: 39 59 66.1 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /* stream
      14             :  * ======
      15             :  * Niels Nes
      16             :  * An simple interface to streams
      17             :  *
      18             :  * Processing files, streams, and sockets is quite different on Linux
      19             :  * and Windows platforms. To improve portability between both, we advise
      20             :  * to replace the stdio actions with the stream functionality provided
      21             :  * here.
      22             :  *
      23             :  * This interface can also be used to open 'non compressed, gzipped,
      24             :  * bz2zipped' data files and sockets. Using this interface one could
      25             :  * easily switch between the various underlying storage types.
      26             :  *
      27             :  * buffered streams
      28             :  * ----------------
      29             :  *
      30             :  * The bstream (or buffered_stream) can be used for efficient reading of
      31             :  * a stream. Reading can be done in large chunks and access can be done
      32             :  * in smaller bits, by directly accessing the underlying buffer.
      33             :  *
      34             :  * Beware that a flush on a buffered stream emits an empty block to
      35             :  * synchronize with the other side, telling it has reached the end of
      36             :  * the sequence and can close its descriptors.
      37             :  *
      38             :  * bstream functions
      39             :  * -----------------
      40             :  *
      41             :  * The bstream_create gets a read stream (rs) as input and the initial
      42             :  * chunk size and creates a buffered stream from this. A spare byte is
      43             :  * kept at the end of the buffer.  The bstream_read will at least read
      44             :  * the next 'size' bytes. If the not read data (aka pos < len) together
      45             :  * with the new data will not fit in the current buffer it is resized.
      46             :  * The spare byte is kept.
      47             :  *
      48             :  * tee streams
      49             :  * -----------
      50             :  *
      51             :  * A tee stream is a write stream that duplicates all output to two
      52             :  * write streams of the same type (txt/bin).
      53             :  */
      54             : 
      55             : /* Generic stream handling code such as init and close */
      56             : 
      57             : #include "monetdb_config.h"
      58             : #include "stream.h"
      59             : #include "stream_internal.h"
      60             : #include <stdio.h>
      61             : 
      62             : 
      63             : #ifdef HAVE_PTHREAD_H
      64             : #include <pthread.h>
      65             : #endif
      66             : 
      67             : struct tl_error_buf {
      68             :         char msg[1024];
      69             : };
      70             : 
      71             : static int tl_error_init(void);
      72             : static struct tl_error_buf *get_tl_error_buf(void);
      73             : 
      74             : #ifdef HAVE_PTHREAD_H
      75             : 
      76             : static pthread_key_t tl_error_key;
      77             : 
      78             : static void
      79         587 : clear_main_tl_error_buf(void)
      80             : {
      81         587 :         void *p = pthread_getspecific(tl_error_key);
      82         587 :         if (p != NULL) {
      83         583 :                 pthread_setspecific(tl_error_key, NULL);
      84         583 :                 free(p);
      85             :         }
      86         587 : }
      87             : 
      88             : static int
      89         587 : tl_error_init(void)
      90             : {
      91         587 :         if (pthread_key_create(&tl_error_key, free) != 0)
      92             :                 return -1;
      93             :         // Turns out the destructor registered with pthread_key_create() does not
      94             :         // always run for the main thread. This atexit hook clears the main thread's
      95             :         // error buffer to avoid this being reported as a memory leak.
      96         587 :         atexit(clear_main_tl_error_buf);
      97         587 :         return 0;
      98             : }
      99             : 
     100             : static struct tl_error_buf*
     101      396844 : get_tl_error_buf(void)
     102             : {
     103      396844 :         struct tl_error_buf *p = pthread_getspecific(tl_error_key);
     104      396844 :         if (p == NULL) {
     105        4150 :                 p = malloc(sizeof(*p));
     106        4150 :                 if (p == NULL)
     107             :                         return NULL;
     108        4150 :                 *p = (struct tl_error_buf) { .msg = {0} };
     109        4150 :                 pthread_setspecific(tl_error_key, p);
     110        4150 :                 struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
     111        4150 :                 assert(p == second_attempt && "maybe mnstr_init has not been called?");
     112             :                 (void) second_attempt; // suppress warning if asserts disabled
     113             :         }
     114             :         return p;
     115             : }
     116             : 
     117             : #elif defined(WIN32)
     118             : 
     119             : static DWORD tl_error_key = 0;
     120             : 
     121             : static int
     122             : tl_error_init(void)
     123             : {
     124             :         DWORD key = TlsAlloc();
     125             :         if (key == TLS_OUT_OF_INDEXES)
     126             :                 return -1;
     127             :         else {
     128             :                 tl_error_key = key;
     129             :                 return 0;
     130             :         }
     131             : }
     132             : 
     133             : static struct tl_error_buf*
     134             : get_tl_error_buf(void)
     135             : {
     136             :         struct tl_error_buf *p = TlsGetValue(tl_error_key);
     137             : 
     138             :         if (p == NULL) {
     139             :                 if (GetLastError() != ERROR_SUCCESS)
     140             :                         return NULL; // something went terribly wrong
     141             : 
     142             :                 // otherwise, initialize
     143             :                 p = malloc(sizeof(*p));
     144             :                 if (p == NULL)
     145             :                         return NULL;
     146             :                 *p = (struct tl_error_buf) { .msg = 0 };
     147             :                 if (!TlsSetValue(tl_error_key, p)) {
     148             :                         free(p);
     149             :                         return NULL;
     150             :                 }
     151             : 
     152             :                 struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
     153             :                 assert(p == second_attempt /* maybe mnstr_init has not been called? */);
     154             :                 (void) second_attempt; // suppress warning if asserts disabled
     155             :         }
     156             :         return p;
     157             : }
     158             : 
     159             : #else
     160             : 
     161             : #error "no pthreads and no Windows, don't know what to do"
     162             : 
     163             : #endif
     164             : 
     165             : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
     166             : 
     167             : int
     168         788 : mnstr_init(void)
     169             : {
     170         788 :         static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
     171             : 
     172         788 :         if (ATOMIC_TAS(&inited))
     173             :                 return 0;
     174             : 
     175         587 :         if (tl_error_init()< 0)
     176             :                 return -1;
     177             : 
     178             : #ifdef NATIVE_WIN32
     179             :         WSADATA w;
     180             :         if (WSAStartup(0x0101, &w) != 0)
     181             :                 return -1;
     182             : #endif
     183             : 
     184             :         return 0;
     185             : }
     186             : 
     187             : const char *
     188           0 : mnstr_version(void)
     189             : {
     190           0 :         return STREAM_VERSION;
     191             : }
     192             : 
     193             : /* Read at most cnt elements of size elmsize from the stream.  Returns
     194             :  * the number of elements actually read or < 0 on failure. */
     195             : ssize_t
     196    17275829 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     197             : {
     198    17275829 :         if (s == NULL || buf == NULL)
     199             :                 return -1;
     200             : #ifdef STREAM_DEBUG
     201             :         fprintf(stderr, "read %s %zu %zu\n",
     202             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     203             : #endif
     204    17275829 :         assert(s->readonly);
     205    17275829 :         if (s->errkind != MNSTR_NO__ERROR)
     206             :                 return -1;
     207    17275829 :         return s->read(s, buf, elmsize, cnt);
     208             : }
     209             : 
     210             : 
     211             : /* Write cnt elements of size elmsize to the stream.  Returns the
     212             :  * number of elements actually written.  If elmsize or cnt equals zero,
     213             :  * returns cnt. */
     214             : ssize_t
     215    37147119 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     216             : {
     217    37147119 :         if (s == NULL || buf == NULL)
     218             :                 return -1;
     219             : #ifdef STREAM_DEBUG
     220             :         fprintf(stderr, "write %s %zu %zu\n",
     221             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     222             : #endif
     223    37147117 :         assert(!s->readonly);
     224    37147117 :         if (s->errkind != MNSTR_NO__ERROR)
     225             :                 return -1;
     226    37147117 :         return s->write(s, buf, elmsize, cnt);
     227             : }
     228             : 
     229             : 
     230             : /* Read one line (separated by \n) of at most maxcnt-1 characters from
     231             :  * the stream.  Returns the number of characters actually read,
     232             :  * includes the trailing \n; terminated by a NULL byte. */
     233             : ssize_t
     234      111191 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
     235             : {
     236      111191 :         char *b = buf, *start = buf;
     237             : 
     238      111191 :         if (s == NULL || buf == NULL)
     239             :                 return -1;
     240             : #ifdef STREAM_DEBUG
     241             :         fprintf(stderr, "readline %s %zu\n",
     242             :                 s->name ? s->name : "<unnamed>", maxcnt);
     243             : #endif
     244      111191 :         assert(s->readonly);
     245      111191 :         if (s->errkind != MNSTR_NO__ERROR)
     246             :                 return -1;
     247      111191 :         if (maxcnt == 0)
     248             :                 return 0;
     249      111191 :         if (maxcnt == 1) {
     250           0 :                 *start = 0;
     251           0 :                 return 0;
     252             :         }
     253    18508446 :         for (;;) {
     254    18508446 :                 switch (s->read(s, start, 1, 1)) {
     255    18508180 :                 case 1:
     256             :                         /* successfully read a character,
     257             :                          * check whether it is the line
     258             :                          * separator and whether we have space
     259             :                          * left for more */
     260    18508180 :                         if (*start++ == '\n' || --maxcnt == 1) {
     261      110925 :                                 *start = 0;
     262      110925 :                                 return (ssize_t) (start - b);
     263             :                         }
     264             :                         break;
     265           0 :                 case -1:
     266             :                         /* error: if we didn't read anything yet,
     267             :                          * return the error, otherwise return what we
     268             :                          * have */
     269           0 :                         if (start == b)
     270             :                                 return -1;
     271             :                         /* fall through */
     272             :                 case 0:
     273             :                         /* end of file: return what we have */
     274         266 :                         *start = 0;
     275         266 :                         return (ssize_t) (start - b);
     276             :                 }
     277             :         }
     278             : }
     279             : 
     280             : 
     281             : void
     282       37503 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
     283             : {
     284       37503 :         if (s) {
     285       37503 :                 s->timeout = ms;
     286       37503 :                 s->timeout_func = func;
     287       37503 :                 s->timeout_data = data;
     288       37503 :                 if (s->update_timeout)
     289       37503 :                         s->update_timeout(s);
     290             :         }
     291       37498 : }
     292             : 
     293             : 
     294             : void
     295       78433 : mnstr_close(stream *s)
     296             : {
     297       78433 :         if (s) {
     298             : #ifdef STREAM_DEBUG
     299             :                 fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
     300             : #endif
     301       78433 :                 s->close(s);
     302             :         }
     303       78428 : }
     304             : 
     305             : 
     306             : void
     307        1100 : mnstr_destroy(stream *s)
     308             : {
     309        1100 :         if (s) {
     310             : #ifdef STREAM_DEBUG
     311             :                 fprintf(stderr, "destroy %s\n",
     312             :                         s->name ? s->name : "<unnamed>");
     313             : #endif
     314        1097 :                 s->destroy(s);
     315             :         }
     316        1100 : }
     317             : 
     318             : void
     319         204 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
     320             : {
     321         204 :         if (s == NULL)
     322             :                 return;
     323             : 
     324         204 :         if (s->errkind != MNSTR_NO__ERROR && kind != MNSTR_NO__ERROR) {
     325             :                 /* keep the first error */
     326             :                 return;
     327             :         }
     328             : 
     329         204 :         s->errkind = kind;
     330             : 
     331         204 :         if (kind == MNSTR_NO__ERROR) {
     332           0 :                 s->errmsg[0] = '\0';
     333           0 :                 return;
     334             :         }
     335             : 
     336         204 :         char *start = &s->errmsg[0];
     337         204 :         char *end = start + sizeof(s->errmsg);
     338         204 :         if (s->name != NULL)
     339         204 :                 start += snprintf(start, end - start, "stream %s: ", s->name);
     340             : 
     341         204 :         if (start >= end - 1)
     342             :                 return;
     343             : 
     344         204 :         if (fmt == NULL)
     345          19 :                 fmt = mnstr_error_kind_description(kind);
     346             : 
     347             :         // Complicated pointer dance in order to shut up 'might be a candidate
     348             :         // for gnu_printf format attribute' warning from gcc.
     349             :         // It's really eager to trace where the vsnprintf ends up, we need
     350             :         // the ? : to throw it off its scent.
     351             :         // Similarly, the parentheses around the 1 serve to suppress a Clang
     352             :         // warning about dead code (the atoi).
     353         204 :         void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
     354         204 :         int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
     355         204 :         f(start, end - start, fmt, ap);
     356             : }
     357             : 
     358             : void
     359          18 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     360             : {
     361          18 :         va_list ap;
     362          18 :         va_start(ap, fmt);
     363          18 :         mnstr_va_set_error(s, kind, fmt, ap);
     364          18 :         va_end(ap);
     365          18 : }
     366             : 
     367             : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
     368             : 
     369             : void
     370         186 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     371             : {
     372         186 :         int e = errno;
     373         186 :         va_list ap;
     374         186 :         va_start(ap, fmt);
     375         186 :         mnstr_va_set_error(s, kind, fmt, ap);
     376         186 :         va_end(ap);
     377             : 
     378             :         /* append as much as fits of the system error message */
     379         186 :         char *start = &s->errmsg[0] + strlen(s->errmsg);
     380         186 :         char *end = &s->errmsg[0] + sizeof(s->errmsg);
     381         186 :         if (end - start >= 3) {
     382         186 :                 start = stpcpy(start, ": ");
     383         186 :                 start += my_strerror_r(e, start, end - start);
     384             :         }
     385         186 : }
     386             : 
     387             : 
     388             : void
     389      396840 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
     390             : {
     391      396840 :         va_list ap;
     392             : 
     393      396840 :         struct tl_error_buf *buf = get_tl_error_buf();
     394      396840 :         if (buf == NULL)
     395      396678 :                 return; // hopeless
     396             : 
     397      396840 :         if (errnr == 0 && fmt == NULL) {
     398      396678 :                 buf->msg[0] = '\0';
     399      396678 :                 return;
     400             :         }
     401             : 
     402         162 :         char *start = &buf->msg[0];
     403         162 :         char *end = start + sizeof(buf->msg);
     404             : 
     405         162 :         if (name != NULL)
     406         162 :                 start += snprintf(start, end - start, "when opening %s: ", name);
     407         162 :         if (start >= end - 1)
     408             :                 return;
     409             : 
     410         162 :         if (fmt != NULL) {
     411         162 :                 va_start(ap, fmt);
     412         162 :                 start += vsnprintf(start, end - start, fmt, ap);
     413         162 :                 va_end(ap);
     414             :         }
     415         162 :         if (start >= end - 1)
     416             :                 return;
     417             : 
     418         162 :         if (errnr != 0 && end - start >= 3) {
     419         159 :                 start = stpcpy(start, ": ");
     420         159 :                 start += my_strerror_r(errno, start, end - start);
     421             :         }
     422         162 :         if (start >= end - 1)
     423             :                 return;
     424             : }
     425             : 
     426             : static size_t
     427         345 : my_strerror_r(int error_nr, char *buf, size_t buflen)
     428             : {
     429             :         // Four cases:
     430             :         // 1. strerror_s
     431             :         // 2. gnu strerror_r (returns char* and does not always fill buffer)
     432             :         // 3. xsi strerror_r (returns int and always fills the buffer)
     433             :         // 4. no strerror_r and no strerror_s
     434         345 :         char *to_move;
     435             : #ifdef HAVE_STRERROR_S
     436             :         int result_code = strerror_s(buf, buflen, error_nr);
     437             :         if (result_code == 0)
     438             :                 to_move = NULL;
     439             :         else
     440             :                 to_move = "<failed to retrieve error message>";
     441             : #elif defined(HAVE_STRERROR_R)
     442             : #ifdef STRERROR_R_CHARP
     443             :         // gnu strerror_r sometimes only returns static string, needs copy
     444         345 :         to_move = strerror_r(error_nr, buf, buflen);
     445             : #else
     446             :         int result_code = strerror_r(error_nr, buf, buflen);
     447             :         if (result_code == 0)
     448             :                 to_move = NULL;
     449             :         else
     450             :                 to_move = "<failed to retrieve error message>";
     451             : #endif
     452             : #else
     453             :         // Hope for the best
     454             :         to_move = strerror(error_nr);
     455             : #endif
     456         345 :         if (to_move != NULL) {
     457             :                 // move to buffer
     458         345 :                 size_t size = strlen(to_move) + 1;
     459         345 :                 assert(size <= buflen);
     460             :                 // strerror_r may have return a pointer to/into the buffer
     461         345 :                 memmove(buf, to_move, size);
     462         345 :                 return size - 1;
     463             :         } else {
     464           0 :                 return strlen(buf);
     465             :         }
     466             : }
     467             : 
     468             : 
     469             : 
     470         204 : void mnstr_copy_error(stream *dst, stream *src)
     471             : {
     472         204 :         if (dst->errkind == MNSTR_NO__ERROR) {
     473         204 :                 dst->errkind = src->errkind;
     474         204 :                 memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
     475             :         }
     476         204 : }
     477             : 
     478             : char *
     479           0 : mnstr_error(const stream *s)
     480             : {
     481           0 :         const char *msg = mnstr_peek_error(s);
     482           0 :         if (msg != NULL)
     483           0 :                 return strdup(msg);
     484             :         else
     485             :                 return NULL;
     486             : }
     487             : 
     488             : const char*
     489           6 : mnstr_peek_error(const stream *s)
     490             : {
     491           6 :         if (s == NULL) {
     492           4 :                 struct tl_error_buf *b = get_tl_error_buf();
     493           4 :                 if (b != NULL)
     494           4 :                         return b->msg;
     495             :                 else
     496             :                         return "unknown error";
     497             :         }
     498             : 
     499           2 :         if (s->errkind == MNSTR_NO__ERROR)
     500             :                 return "no error";
     501             : 
     502           2 :         if (s->errmsg[0] != '\0')
     503           2 :                 return s->errmsg;
     504             : 
     505           0 :         return mnstr_error_kind_description(s->errkind);
     506             : }
     507             : 
     508             : static const char *
     509          19 : mnstr_error_kind_description(mnstr_error_kind kind)
     510             : {
     511          19 :         switch (kind) {
     512             :         case MNSTR_NO__ERROR:
     513             :                 /* unreachable */
     514           0 :                 assert(0);
     515             :                 return NULL;
     516             :         case MNSTR_OPEN_ERROR:
     517             :                 return "error could not open";
     518           1 :         case MNSTR_READ_ERROR:
     519           1 :                 return "error reading";
     520           0 :         case MNSTR_WRITE_ERROR:
     521           0 :                 return "error writing";
     522           0 :         case MNSTR_INTERRUPT:
     523           0 :                 return "interrupted";
     524             :         case MNSTR_TIMEOUT:
     525             :                 return "timeout";
     526             :         case MNSTR_UNEXPECTED_EOF:
     527             :                 return "timeout";
     528             :         }
     529             : 
     530           0 :         return "Unknown error";
     531             : }
     532             : 
     533             : /* flush buffer, return 0 on success, non-zero on failure */
     534             : int
     535      783189 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
     536             : {
     537      783189 :         if (s == NULL)
     538             :                 return -1;
     539             : #ifdef STREAM_DEBUG
     540             :         fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
     541             : #endif
     542      783188 :         assert(!s->readonly);
     543      783188 :         if (s->errkind != MNSTR_NO__ERROR)
     544             :                 return -1;
     545      783188 :         if (s->flush)
     546      781688 :                 return s->flush(s, flush_level);
     547             :         return 0;
     548             : }
     549             : 
     550             : 
     551             : /* sync file to disk, return 0 on success, non-zero on failure */
     552             : int
     553          33 : mnstr_fsync(stream *s)
     554             : {
     555          33 :         if (s == NULL)
     556             :                 return -1;
     557             : #ifdef STREAM_DEBUG
     558             :         fprintf(stderr, "fsync %s (%d)\n",
     559             :                 s->name ? s->name : "<unnamed>", s->errnr);
     560             : #endif
     561          33 :         assert(!s->readonly);
     562          33 :         if (s->errkind != MNSTR_NO__ERROR)
     563             :                 return -1;
     564          33 :         if (s->fsync)
     565          33 :                 return s->fsync(s);
     566             :         return 0;
     567             : }
     568             : 
     569             : 
     570             : int
     571           0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
     572             : {
     573           0 :         if (s == NULL || p == NULL)
     574             :                 return -1;
     575             : #ifdef STREAM_DEBUG
     576             :         fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
     577             : #endif
     578           0 :         if (s->errkind != MNSTR_NO__ERROR)
     579             :                 return -1;
     580           0 :         if (s->fgetpos)
     581           0 :                 return s->fgetpos(s, p);
     582             :         return 0;
     583             : }
     584             : 
     585             : 
     586             : int
     587           0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
     588             : {
     589           0 :         if (s == NULL)
     590             :                 return -1;
     591             : #ifdef STREAM_DEBUG
     592             :         fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
     593             : #endif
     594           0 :         if (s->errkind != MNSTR_NO__ERROR)
     595             :                 return -1;
     596           0 :         if (s->fsetpos)
     597           0 :                 return s->fsetpos(s, p);
     598             :         return 0;
     599             : }
     600             : 
     601             : 
     602             : int
     603           0 : mnstr_isalive(const stream *s)
     604             : {
     605           0 :         if (s == NULL)
     606             :                 return 0;
     607           0 :         if (s->errkind != MNSTR_NO__ERROR)
     608             :                 return -1;
     609           0 :         if (s->isalive)
     610           0 :                 return s->isalive(s);
     611             :         return 1;
     612             : }
     613             : 
     614             : int
     615    16453163 : mnstr_getoob(const stream *s)
     616             : {
     617    16453163 :         if (s->getoob)
     618    15299420 :                 return s->getoob(s);
     619             :         return 0;
     620             : }
     621             : 
     622             : int
     623           0 : mnstr_putoob(const stream *s, char val)
     624             : {
     625           0 :         if (s->putoob)
     626           0 :                 return s->putoob(s, val);
     627             :         return -1;
     628             : }
     629             : 
     630             : 
     631             : bool
     632      232188 : mnstr_eof(const stream *s)
     633             : {
     634      232188 :         return s->eof;
     635             : }
     636             : 
     637             : const char *
     638       26382 : mnstr_name(const stream *s)
     639             : {
     640       26382 :         if (s == NULL)
     641             :                 return "connection terminated";
     642       26382 :         return s->name;
     643             : }
     644             : 
     645             : 
     646             : mnstr_error_kind
     647     5230110 : mnstr_errnr(const stream *s)
     648             : {
     649     5230110 :         if (s == NULL)
     650             :                 return MNSTR_READ_ERROR;
     651     5230110 :         return s->errkind;
     652             : }
     653             : 
     654             : const char *
     655           0 : mnstr_error_kind_name(mnstr_error_kind k)
     656             : {
     657           0 :         switch (k) {
     658             :         case MNSTR_NO__ERROR:
     659             :                 return "MNSTR_NO__ERROR";
     660           0 :         case MNSTR_OPEN_ERROR:
     661           0 :                 return "MNSTR_OPEN_ERROR";
     662           0 :         case MNSTR_READ_ERROR:
     663           0 :                 return "MNSTR_READ_ERROR";
     664           0 :         case MNSTR_WRITE_ERROR:
     665           0 :                 return "MNSTR_WRITE_ERROR";
     666           0 :         case MNSTR_INTERRUPT:
     667           0 :                 return "MNSTR_INTERRUPT";
     668           0 :         case MNSTR_TIMEOUT:
     669           0 :                 return "MNSTR_TIMEOUT";
     670           0 :         default:
     671           0 :                 return "<UNKNOWN_ERROR>";
     672             :         }
     673             : 
     674             : }
     675             : 
     676             : static void
     677           0 : clearerror(stream *s)
     678             : {
     679           0 :         if (s != NULL) {
     680           0 :                 s->errkind = MNSTR_NO__ERROR;
     681           0 :                 s->errmsg[0] = '\0';
     682             :         }
     683           0 : }
     684             : 
     685             : void
     686           0 : mnstr_clearerr(stream *s)
     687             : {
     688           0 :         clearerror(s);
     689           0 :         if (s != NULL && s->clrerr)
     690           0 :                 s->clrerr(s);
     691           0 : }
     692             : 
     693             : 
     694             : bool
     695         602 : mnstr_isbinary(const stream *s)
     696             : {
     697         602 :         if (s == NULL)
     698             :                 return false;
     699         602 :         return s->binary;
     700             : }
     701             : 
     702             : 
     703             : bool
     704           0 : mnstr_get_swapbytes(const stream *s)
     705             : {
     706           0 :         if (s == NULL)
     707             :                 return 0;
     708           0 :         return s->swapbytes;
     709             : }
     710             : 
     711             : 
     712             : /* set stream to big-endian/little-endian byte order; the default is
     713             :  * native byte order */
     714             : void
     715       38910 : mnstr_set_bigendian(stream *s, bool bigendian)
     716             : {
     717       38910 :         if (s == NULL)
     718             :                 return;
     719             : #ifdef STREAM_DEBUG
     720             :         fprintf(stderr, "mnstr_set_bigendian %s %s\n",
     721             :                 s->name ? s->name : "<unnamed>",
     722             :                 swapbytes ? "true" : "false");
     723             : #endif
     724       38910 :         assert(s->readonly);
     725       38910 :         s->binary = true;
     726             : #ifdef WORDS_BIGENDIAN
     727             :         s->swapbytes = !bigendian;
     728             : #else
     729       38910 :         s->swapbytes = bigendian;
     730             : #endif
     731             : }
     732             : 
     733             : 
     734             : void
     735       68382 : close_stream(stream *s)
     736             : {
     737       68382 :         if (s) {
     738       68378 :                 if (s->close)
     739       68378 :                         s->close(s);
     740       68378 :                 if (s->destroy)
     741       68378 :                         s->destroy(s);
     742             :         }
     743       68383 : }
     744             : 
     745             : 
     746             : void
     747      396561 : destroy_stream(stream *s)
     748             : {
     749      396561 :         if (s->name)
     750      396561 :                 free(s->name);
     751      396561 :         free(s);
     752      396561 : }
     753             : 
     754             : 
     755             : stream *
     756      396678 : create_stream(const char *name)
     757             : {
     758      396678 :         stream *s;
     759             : 
     760      396678 :         if (name == NULL) {
     761           0 :                 mnstr_set_open_error(NULL, 0, "internal error: name not set");
     762           0 :                 return NULL;
     763             :         }
     764      396678 :         if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
     765           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     766           0 :                 return NULL;
     767             :         }
     768      396678 :         *s = (stream) {
     769             :                 .swapbytes = false,
     770             :                 .readonly = true,
     771             :                 .isutf8 = false,        /* not known for sure */
     772             :                 .binary = false,
     773             :                 .eof = false,
     774      396678 :                 .name = strdup(name),
     775             :                 .errkind = MNSTR_NO__ERROR,
     776             :                 .errmsg = {0},
     777             :                 .destroy = destroy_stream,
     778             :                 .clrerr = clearerror,
     779             :         };
     780      396678 :         if(s->name == NULL) {
     781           0 :                 free(s);
     782           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     783           0 :                 return NULL;
     784             :         }
     785             : #ifdef STREAM_DEBUG
     786             :         fprintf(stderr, "create_stream %s -> %p\n",
     787             :                 name ? name : "<unnamed>", s);
     788             : #endif
     789      396678 :         mnstr_set_open_error(NULL, 0, NULL); // clear the error
     790      396678 :         return s;
     791             : }
     792             : 
     793             : 
     794             : static ssize_t
     795           0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     796             : {
     797           0 :         ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
     798           0 :         s->eof |= s->inner->eof;
     799           0 :         return ret;
     800             : }
     801             : 
     802             : 
     803             : static ssize_t
     804           0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     805             : {
     806           0 :         return s->inner->write(s->inner, buf, elmsize, cnt);
     807             : }
     808             : 
     809             : 
     810             : static void
     811           0 : wrapper_close(stream *s)
     812             : {
     813           0 :         s->inner->close(s->inner);
     814           0 : }
     815             : 
     816             : 
     817             : static void
     818           0 : wrapper_clrerr(stream *s)
     819             : {
     820           0 :         s->inner->clrerr(s->inner);
     821           0 : }
     822             : 
     823             : 
     824             : static void
     825           0 : wrapper_destroy(stream *s)
     826             : {
     827           0 :         s->inner->destroy(s->inner);
     828           0 :         destroy_stream(s);
     829           0 : }
     830             : 
     831             : 
     832             : static int
     833           0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
     834             : {
     835           0 :         return s->inner->flush(s->inner, flush_level);
     836             : }
     837             : 
     838             : 
     839             : static int
     840           4 : wrapper_fsync(stream *s)
     841             : {
     842           4 :         return s->inner->fsync(s->inner);
     843             : }
     844             : 
     845             : 
     846             : static int
     847           0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
     848             : {
     849           0 :         return s->inner->fgetpos(s->inner, p);
     850             : }
     851             : 
     852             : 
     853             : static int
     854           0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
     855             : {
     856           0 :         return s->inner->fsetpos(s->inner, p);
     857             : }
     858             : 
     859             : 
     860             : static void
     861       37504 : wrapper_update_timeout(stream *s)
     862             : {
     863       37504 :         s->inner->timeout = s->timeout;
     864       37504 :         s->inner->timeout_func = s->timeout_func;
     865       37504 :         s->inner->timeout_data = s->timeout_data;
     866       37504 :         s->inner->update_timeout(s->inner);
     867       37498 : }
     868             : 
     869             : 
     870             : static int
     871           0 : wrapper_isalive(const stream *s)
     872             : {
     873           0 :         return s->inner->isalive(s->inner);
     874             : }
     875             : 
     876             : 
     877             : static int
     878    15291546 : wrapper_getoob(const stream *s)
     879             : {
     880    15291546 :         return s->inner->getoob(s->inner);
     881             : }
     882             : 
     883             : 
     884             : static int
     885           0 : wrapper_putoob(const stream *s, char val)
     886             : {
     887           0 :         return s->inner->putoob(s->inner, val);
     888             : }
     889             : 
     890             : 
     891             : stream *
     892       79041 : create_wrapper_stream(const char *name, stream *inner)
     893             : {
     894       79041 :         if (inner == NULL)
     895             :                 return NULL;
     896       79041 :         if (name == NULL)
     897       79041 :                 name = inner->name;
     898       79041 :         stream *s = create_stream(name);
     899       79041 :         if (s == NULL)
     900             :                 return NULL;
     901             : 
     902             : 
     903       79041 :         s->swapbytes = inner->swapbytes;
     904       79041 :         s->readonly = inner->readonly;
     905       79041 :         s->isutf8 = inner->isutf8;
     906       79041 :         s->binary = inner->binary;
     907       79041 :         s->timeout = inner->timeout;
     908       79041 :         s->inner = inner;
     909             : 
     910       79041 :         s->read = inner->read == NULL ? NULL : wrapper_read;
     911       79041 :         s->write = inner->write == NULL ? NULL : wrapper_write;
     912       79041 :         s->close = inner->close == NULL ? NULL : wrapper_close;
     913       79041 :         s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
     914       79041 :         s->destroy = wrapper_destroy;
     915       79041 :         s->flush = inner->flush == NULL ? NULL : wrapper_flush;
     916       79041 :         s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
     917       79041 :         s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
     918       79041 :         s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
     919       79041 :         s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
     920       79041 :         s->getoob = inner->getoob == NULL ? NULL : wrapper_getoob;
     921       79041 :         s->putoob = inner->putoob == NULL ? NULL : wrapper_putoob;
     922       79041 :         s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
     923             : 
     924       79041 :         return s;
     925             : }
     926             : 
     927             : /* ------------------------------------------------------------------ */
     928             : /* streams working on a disk file, compressed or not */
     929             : 
     930             : stream *
     931       13815 : open_rstream(const char *filename)
     932             : {
     933       13815 :         if (filename == NULL)
     934             :                 return NULL;
     935             : #ifdef STREAM_DEBUG
     936             :         fprintf(stderr, "open_rstream %s\n", filename);
     937             : #endif
     938             : 
     939       13815 :         stream *s = open_stream(filename, "rb");
     940       13815 :         if (s == NULL)
     941             :                 return NULL;
     942             : 
     943       13656 :         stream *c = compressed_stream(s, 0);
     944       13656 :         if (c == NULL)
     945           0 :                 close_stream(s);
     946             : 
     947             :         return c;
     948             : }
     949             : 
     950             : stream *
     951       12725 : open_wstream(const char *filename)
     952             : {
     953       12725 :         if (filename == NULL)
     954             :                 return NULL;
     955             : #ifdef STREAM_DEBUG
     956             :         fprintf(stderr, "open_wstream %s\n", filename);
     957             : #endif
     958             : 
     959       12725 :         stream *s = open_stream(filename, "wb");
     960       12725 :         if (s == NULL)
     961             :                 return NULL;
     962             : 
     963       12725 :         stream *c = compressed_stream(s, 0);
     964       12725 :         if (c == NULL) {
     965           0 :                 close_stream(s);
     966           0 :                 (void) file_remove(filename);
     967             :         }
     968             : 
     969             :         return c;
     970             : }
     971             : 
     972             : stream *
     973         171 : open_rastream(const char *filename)
     974             : {
     975         171 :         if (filename == NULL)
     976             :                 return NULL;
     977             : #ifdef STREAM_DEBUG
     978             :         fprintf(stderr, "open_rastream %s\n", filename);
     979             : #endif
     980         171 :         stream *s = open_rstream(filename);
     981         171 :         if (s == NULL)
     982             :                 return NULL;
     983             : 
     984         137 :         stream *t = create_text_stream(s);
     985         137 :         if (t == NULL)
     986           0 :                 close_stream(s);
     987             : 
     988             :         return t;
     989             : }
     990             : 
     991             : stream *
     992          40 : open_wastream(const char *filename)
     993             : {
     994          40 :         if (filename == NULL)
     995             :                 return NULL;
     996             : #ifdef STREAM_DEBUG
     997             :         fprintf(stderr, "open_wastream %s\n", filename);
     998             : #endif
     999          40 :         stream *s = open_wstream(filename);
    1000          40 :         if (s == NULL)
    1001             :                 return NULL;
    1002             : 
    1003          40 :         stream *t = create_text_stream(s);
    1004          40 :         if (t == NULL) {
    1005           0 :                 close_stream(s);
    1006           0 :                 (void) file_remove(filename);
    1007             :         }
    1008             : 
    1009             :         return t;
    1010             : }
    1011             : 
    1012             : 
    1013             : /* put here because it depends on both bs_read AND bs2_read */
    1014             : bool
    1015      508745 : isa_block_stream(const stream *s)
    1016             : {
    1017      508745 :         assert(s != NULL);
    1018     1017490 :         return s &&
    1019      508745 :                 ((s->read == bs_read ||
    1020        3415 :                   s->write == bs_write));
    1021             : }
    1022             : 
    1023             : 
    1024             : /* Put here because I need to think very carefully about this
    1025             :  * mnstr_read(,, 0, 0). What would that mean?
    1026             :  */
    1027             : ssize_t
    1028       39122 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
    1029             : {
    1030       39122 :         ssize_t len = 0;
    1031       39122 :         char x = 0;
    1032             : 
    1033       39122 :         if (s == NULL || buf == NULL)
    1034             :                 return -1;
    1035       39122 :         assert(s->read == bs_read || s->write == bs_write);
    1036       78231 :         if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
    1037       39117 :             mnstr_read(s, &x, 0, 0) < 0 /* read prompt */  ||
    1038       39109 :             x > 0)
    1039           1 :                 return -1;
    1040             :         return len;
    1041             : }

Generated by: LCOV version 1.14