LCOV - code coverage report
Current view: top level - common/stream - stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 287 371 77.4 %
Date: 2024-04-26 00:35:57 Functions: 44 59 74.6 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /* stream
      14             :  * ======
      15             :  * Niels Nes
      16             :  * An simple interface to streams
      17             :  *
      18             :  * Processing files, streams, and sockets is quite different on Linux
      19             :  * and Windows platforms. To improve portability between both, we advise
      20             :  * to replace the stdio actions with the stream functionality provided
      21             :  * here.
      22             :  *
      23             :  * This interface can also be used to open 'non compressed, gzipped,
      24             :  * bz2zipped' data files and sockets. Using this interface one could
      25             :  * easily switch between the various underlying storage types.
      26             :  *
      27             :  * buffered streams
      28             :  * ----------------
      29             :  *
      30             :  * The bstream (or buffered_stream) can be used for efficient reading of
      31             :  * a stream. Reading can be done in large chunks and access can be done
      32             :  * in smaller bits, by directly accessing the underlying buffer.
      33             :  *
      34             :  * Beware that a flush on a buffered stream emits an empty block to
      35             :  * synchronize with the other side, telling it has reached the end of
      36             :  * the sequence and can close its descriptors.
      37             :  *
      38             :  * bstream functions
      39             :  * -----------------
      40             :  *
      41             :  * The bstream_create gets a read stream (rs) as input and the initial
      42             :  * chunk size and creates a buffered stream from this. A spare byte is
      43             :  * kept at the end of the buffer.  The bstream_read will at least read
      44             :  * the next 'size' bytes. If the not read data (aka pos < len) together
      45             :  * with the new data will not fit in the current buffer it is resized.
      46             :  * The spare byte is kept.
      47             :  *
      48             :  * tee streams
      49             :  * -----------
      50             :  *
      51             :  * A tee stream is a write stream that duplicates all output to two
      52             :  * write streams of the same type (txt/bin).
      53             :  */
      54             : 
      55             : /* Generic stream handling code such as init and close */
      56             : 
      57             : #include "monetdb_config.h"
      58             : #include "stream.h"
      59             : #include "stream_internal.h"
      60             : #include <stdio.h>
      61             : 
      62             : 
      63             : #ifdef HAVE_PTHREAD_H
      64             : #include <pthread.h>
      65             : #endif
      66             : 
      67             : struct tl_error_buf {
      68             :         char msg[1024];
      69             : };
      70             : 
      71             : static int tl_error_init(void);
      72             : static struct tl_error_buf *get_tl_error_buf(void);
      73             : 
      74             : #ifdef HAVE_PTHREAD_H
      75             : 
      76             : static pthread_key_t tl_error_key;
      77             : 
      78             : static void
      79         536 : clear_main_tl_error_buf(void)
      80             : {
      81         536 :         void *p = pthread_getspecific(tl_error_key);
      82         536 :         if (p != NULL) {
      83         536 :                 pthread_setspecific(tl_error_key, NULL);
      84         536 :                 free(p);
      85             :         }
      86         536 : }
      87             : 
      88             : static int
      89         536 : tl_error_init(void)
      90             : {
      91         536 :         if (pthread_key_create(&tl_error_key, free) != 0)
      92             :                 return -1;
      93             :         // Turns out the destructor registered with pthread_key_create() does not
      94             :         // always run for the main thread. This atexit hook clears the main thread's
      95             :         // error buffer to avoid this being reported as a memory leak.
      96         536 :         atexit(clear_main_tl_error_buf);
      97         536 :         return 0;
      98             : }
      99             : 
     100             : static struct tl_error_buf*
     101      388725 : get_tl_error_buf(void)
     102             : {
     103      388725 :         struct tl_error_buf *p = pthread_getspecific(tl_error_key);
     104      388725 :         if (p == NULL) {
     105        3785 :                 p = malloc(sizeof(*p));
     106        3785 :                 if (p == NULL)
     107             :                         return NULL;
     108        3785 :                 *p = (struct tl_error_buf) { .msg = {0} };
     109        3785 :                 pthread_setspecific(tl_error_key, p);
     110        3785 :                 struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
     111        3785 :                 assert(p == second_attempt && "maybe mnstr_init has not been called?");
     112             :                 (void) second_attempt; // suppress warning if asserts disabled
     113             :         }
     114             :         return p;
     115             : }
     116             : 
     117             : #elif defined(WIN32)
     118             : 
     119             : static DWORD tl_error_key = 0;
     120             : 
     121             : static int
     122             : tl_error_init(void)
     123             : {
     124             :         DWORD key = TlsAlloc();
     125             :         if (key == TLS_OUT_OF_INDEXES)
     126             :                 return -1;
     127             :         else {
     128             :                 tl_error_key = key;
     129             :                 return 0;
     130             :         }
     131             : }
     132             : 
     133             : static struct tl_error_buf*
     134             : get_tl_error_buf(void)
     135             : {
     136             :         struct tl_error_buf *p = TlsGetValue(tl_error_key);
     137             : 
     138             :         if (p == NULL) {
     139             :                 if (GetLastError() != ERROR_SUCCESS)
     140             :                         return NULL; // something went terribly wrong
     141             : 
     142             :                 // otherwise, initialize
     143             :                 p = malloc(sizeof(*p));
     144             :                 if (p == NULL)
     145             :                         return NULL;
     146             :                 *p = (struct tl_error_buf) { .msg = 0 };
     147             :                 if (!TlsSetValue(tl_error_key, p)) {
     148             :                         free(p);
     149             :                         return NULL;
     150             :                 }
     151             : 
     152             :                 struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
     153             :                 assert(p == second_attempt /* maybe mnstr_init has not been called? */);
     154             :                 (void) second_attempt; // suppress warning if asserts disabled
     155             :         }
     156             :         return p;
     157             : }
     158             : 
     159             : #else
     160             : 
     161             : #error "no pthreads and no Windows, don't know what to do"
     162             : 
     163             : #endif
     164             : 
     165             : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
     166             : 
     167             : int
     168         728 : mnstr_init(void)
     169             : {
     170         728 :         static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
     171             : 
     172         728 :         if (ATOMIC_TAS(&inited))
     173             :                 return 0;
     174             : 
     175         536 :         if (tl_error_init()< 0)
     176             :                 return -1;
     177             : 
     178             : #ifdef NATIVE_WIN32
     179             :         WSADATA w;
     180             :         if (WSAStartup(0x0101, &w) != 0)
     181             :                 return -1;
     182             : #endif
     183             : 
     184             :         return 0;
     185             : }
     186             : 
     187             : const char *
     188           0 : mnstr_version(void)
     189             : {
     190           0 :         return STREAM_VERSION;
     191             : }
     192             : 
     193             : /* Read at most cnt elements of size elmsize from the stream.  Returns
     194             :  * the number of elements actually read or < 0 on failure. */
     195             : ssize_t
     196    17199170 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     197             : {
     198    17199170 :         if (s == NULL || buf == NULL)
     199             :                 return -1;
     200             : #ifdef STREAM_DEBUG
     201             :         fprintf(stderr, "read %s %zu %zu\n",
     202             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     203             : #endif
     204    17199170 :         assert(s->readonly);
     205    17199170 :         if (s->errkind != MNSTR_NO__ERROR)
     206             :                 return -1;
     207    17199170 :         return s->read(s, buf, elmsize, cnt);
     208             : }
     209             : 
     210             : 
     211             : /* Write cnt elements of size elmsize to the stream.  Returns the
     212             :  * number of elements actually written.  If elmsize or cnt equals zero,
     213             :  * returns cnt. */
     214             : ssize_t
     215    34775875 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     216             : {
     217    34775875 :         if (s == NULL || buf == NULL)
     218             :                 return -1;
     219             : #ifdef STREAM_DEBUG
     220             :         fprintf(stderr, "write %s %zu %zu\n",
     221             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     222             : #endif
     223    34775873 :         assert(!s->readonly);
     224    34775873 :         if (s->errkind != MNSTR_NO__ERROR)
     225             :                 return -1;
     226    34775863 :         return s->write(s, buf, elmsize, cnt);
     227             : }
     228             : 
     229             : 
     230             : /* Read one line (seperated by \n) of at most maxcnt-1 characters from
     231             :  * the stream.  Returns the number of characters actually read,
     232             :  * includes the trailing \n; terminated by a NULL byte. */
     233             : ssize_t
     234      109188 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
     235             : {
     236      109188 :         char *b = buf, *start = buf;
     237             : 
     238      109188 :         if (s == NULL || buf == NULL)
     239             :                 return -1;
     240             : #ifdef STREAM_DEBUG
     241             :         fprintf(stderr, "readline %s %zu\n",
     242             :                 s->name ? s->name : "<unnamed>", maxcnt);
     243             : #endif
     244      109188 :         assert(s->readonly);
     245      109188 :         if (s->errkind != MNSTR_NO__ERROR)
     246             :                 return -1;
     247      109188 :         if (maxcnt == 0)
     248             :                 return 0;
     249      109188 :         if (maxcnt == 1) {
     250           0 :                 *start = 0;
     251           0 :                 return 0;
     252             :         }
     253    18260978 :         for (;;) {
     254    18260978 :                 switch (s->read(s, start, 1, 1)) {
     255    18260717 :                 case 1:
     256             :                         /* successfully read a character,
     257             :                          * check whether it is the line
     258             :                          * separator and whether we have space
     259             :                          * left for more */
     260    18260717 :                         if (*start++ == '\n' || --maxcnt == 1) {
     261      108927 :                                 *start = 0;
     262      108927 :                                 return (ssize_t) (start - b);
     263             :                         }
     264             :                         break;
     265           0 :                 case -1:
     266             :                         /* error: if we didn't read anything yet,
     267             :                          * return the error, otherwise return what we
     268             :                          * have */
     269           0 :                         if (start == b)
     270             :                                 return -1;
     271             :                         /* fall through */
     272             :                 case 0:
     273             :                         /* end of file: return what we have */
     274         261 :                         *start = 0;
     275         261 :                         return (ssize_t) (start - b);
     276             :                 }
     277             :         }
     278             : }
     279             : 
     280             : 
     281             : void
     282       37977 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
     283             : {
     284       37977 :         if (s) {
     285       37977 :                 s->timeout = ms;
     286       37977 :                 s->timeout_func = func;
     287       37977 :                 s->timeout_data = data;
     288       37977 :                 if (s->update_timeout)
     289       37977 :                         s->update_timeout(s);
     290             :         }
     291       37976 : }
     292             : 
     293             : 
     294             : void
     295       79552 : mnstr_close(stream *s)
     296             : {
     297       79552 :         if (s) {
     298             : #ifdef STREAM_DEBUG
     299             :                 fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
     300             : #endif
     301       79552 :                 s->close(s);
     302             :         }
     303       79550 : }
     304             : 
     305             : 
     306             : void
     307        1394 : mnstr_destroy(stream *s)
     308             : {
     309        1394 :         if (s) {
     310             : #ifdef STREAM_DEBUG
     311             :                 fprintf(stderr, "destroy %s\n",
     312             :                         s->name ? s->name : "<unnamed>");
     313             : #endif
     314        1388 :                 s->destroy(s);
     315             :         }
     316        1394 : }
     317             : 
     318             : void
     319         234 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
     320             : {
     321         234 :         if (s == NULL)
     322             :                 return;
     323             : 
     324         234 :         s->errkind = kind;
     325             : 
     326         234 :         if (kind == MNSTR_NO__ERROR) {
     327           0 :                 s->errmsg[0] = '\0';
     328           0 :                 return;
     329             :         }
     330             : 
     331         234 :         char *start = &s->errmsg[0];
     332         234 :         char *end = start + sizeof(s->errmsg);
     333         234 :         if (s->name != NULL)
     334         234 :                 start += snprintf(start, end - start, "stream %s: ", s->name);
     335             : 
     336         234 :         if (start >= end - 1)
     337             :                 return;
     338             : 
     339         234 :         if (fmt == NULL)
     340          19 :                 fmt = mnstr_error_kind_description(kind);
     341             : 
     342             :         // Complicated pointer dance in order to shut up 'might be a candidate
     343             :         // for gnu_printf format attribute' warning from gcc.
     344             :         // It's really eager to trace where the vsnprintf ends up, we need
     345             :         // the ? : to throw it off its scent.
     346             :         // Similarly, the parentheses around the 1 serve to suppress a Clang
     347             :         // warning about dead code (the atoi).
     348         234 :         void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
     349         234 :         int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
     350         234 :         f(start, end - start, fmt, ap);
     351             : }
     352             : 
     353             : void
     354          20 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     355             : {
     356          20 :         va_list ap;
     357          20 :         va_start(ap, fmt);
     358          20 :         mnstr_va_set_error(s, kind, fmt, ap);
     359          20 :         va_end(ap);
     360          20 : }
     361             : 
     362             : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
     363             : 
     364             : void
     365         214 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     366             : {
     367         214 :         va_list ap;
     368         214 :         va_start(ap, fmt);
     369         214 :         mnstr_va_set_error(s, kind, fmt, ap);
     370         214 :         va_end(ap);
     371             : 
     372             :         /* append as much as fits of the system error message */
     373         214 :         char *start = &s->errmsg[0] + strlen(s->errmsg);
     374         214 :         char *end = &s->errmsg[0] + sizeof(s->errmsg);
     375         214 :         if (end - start >= 3) {
     376         214 :                 start = stpcpy(start, ": ");
     377         214 :                 start += my_strerror_r(errno, start, end - start);
     378             :         }
     379         214 : }
     380             : 
     381             : 
     382             : void
     383      388722 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
     384             : {
     385      388722 :         va_list ap;
     386             : 
     387      388722 :         struct tl_error_buf *buf = get_tl_error_buf();
     388      388722 :         if (buf == NULL)
     389      388568 :                 return; // hopeless
     390             : 
     391      388722 :         if (errnr == 0 && fmt == NULL) {
     392      388568 :                 buf->msg[0] = '\0';
     393      388568 :                 return;
     394             :         }
     395             : 
     396         154 :         char *start = &buf->msg[0];
     397         154 :         char *end = start + sizeof(buf->msg);
     398             : 
     399         154 :         if (name != NULL)
     400         154 :                 start += snprintf(start, end - start, "when opening %s: ", name);
     401         154 :         if (start >= end - 1)
     402             :                 return;
     403             : 
     404         154 :         if (fmt != NULL) {
     405         154 :                 va_start(ap, fmt);
     406         154 :                 start += vsnprintf(start, end - start, fmt, ap);
     407         154 :                 va_end(ap);
     408             :         }
     409         154 :         if (start >= end - 1)
     410             :                 return;
     411             : 
     412         154 :         if (errnr != 0 && end - start >= 3) {
     413         152 :                 start = stpcpy(start, ": ");
     414         152 :                 start += my_strerror_r(errno, start, end - start);
     415             :         }
     416         154 :         if (start >= end - 1)
     417             :                 return;
     418             : }
     419             : 
     420             : static size_t
     421         366 : my_strerror_r(int error_nr, char *buf, size_t buflen)
     422             : {
     423             :         // Three cases:
     424             :         // 1. no strerror_r
     425             :         // 2. gnu strerror_r (returns char* and does not always fill buffer)
     426             :         // 3. xsi strerror_r (returns int and always fills the buffer)
     427         366 :         char *to_move;
     428             : #ifndef HAVE_STRERROR_R
     429             :         // Hope for the best
     430             :         to_move = strerror(error_nr);
     431             : #elif !defined(_GNU_SOURCE) || !_GNU_SOURCE
     432             :         // standard strerror_r always writes to buf
     433             :         int result_code = strerror_r(error_nr, buf, buflen);
     434             :         if (result_code == 0)
     435             :                 to_move = NULL;
     436             :         else
     437             :                 to_move = "<failed to retrieve error message>";
     438             : #else
     439             :         // gnu strerror_r sometimes only returns static string, needs copy
     440         366 :         to_move = strerror_r(error_nr, buf, buflen);
     441             : #endif
     442         366 :         if (to_move != NULL) {
     443             :                 // move to buffer
     444         366 :                 size_t size = strlen(to_move) + 1;
     445         366 :                 assert(size <= buflen);
     446             :                 // strerror_r may have return a pointer to/into the buffer
     447         366 :                 memmove(buf, to_move, size);
     448         366 :                 return size - 1;
     449             :         } else {
     450           0 :                 return strlen(buf);
     451             :         }
     452             : }
     453             : 
     454             : 
     455             : 
     456         232 : void mnstr_copy_error(stream *dst, stream *src)
     457             : {
     458         232 :         dst->errkind = src->errkind;
     459         232 :         memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
     460         232 : }
     461             : 
     462             : char *
     463           0 : mnstr_error(const stream *s)
     464             : {
     465           0 :         const char *msg = mnstr_peek_error(s);
     466           0 :         if (msg != NULL)
     467           0 :                 return strdup(msg);
     468             :         else
     469             :                 return NULL;
     470             : }
     471             : 
     472             : const char*
     473           7 : mnstr_peek_error(const stream *s)
     474             : {
     475           7 :         if (s == NULL) {
     476           3 :                 struct tl_error_buf *b = get_tl_error_buf();
     477           3 :                 if (b != NULL)
     478           3 :                         return b->msg;
     479             :                 else
     480             :                         return "unknown error";
     481             :         }
     482             : 
     483           4 :         if (s->errkind == MNSTR_NO__ERROR)
     484             :                 return "no error";
     485             : 
     486           4 :         if (s->errmsg[0] != '\0')
     487           4 :                 return s->errmsg;
     488             : 
     489           0 :         return mnstr_error_kind_description(s->errkind);
     490             : }
     491             : 
     492             : static const char *
     493          19 : mnstr_error_kind_description(mnstr_error_kind kind)
     494             : {
     495          19 :         switch (kind) {
     496             :         case MNSTR_NO__ERROR:
     497             :                 /* unreachable */
     498           0 :                 assert(0);
     499             :                 return NULL;
     500             :         case MNSTR_OPEN_ERROR:
     501             :                 return "error could not open";
     502           1 :         case MNSTR_READ_ERROR:
     503           1 :                 return "error reading";
     504           0 :         case MNSTR_WRITE_ERROR:
     505           0 :                 return "error writing";
     506           0 :         case MNSTR_INTERRUPT:
     507           0 :                 return "interrupted";
     508             :         case MNSTR_TIMEOUT:
     509             :                 return "timeout";
     510             :         case MNSTR_UNEXPECTED_EOF:
     511             :                 return "timeout";
     512             :         }
     513             : 
     514           0 :         return "Unknown error";
     515             : }
     516             : 
     517             : /* flush buffer, return 0 on success, non-zero on failure */
     518             : int
     519      729657 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
     520             : {
     521      729657 :         if (s == NULL)
     522             :                 return -1;
     523             : #ifdef STREAM_DEBUG
     524             :         fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
     525             : #endif
     526      729656 :         assert(!s->readonly);
     527      729656 :         if (s->errkind != MNSTR_NO__ERROR)
     528             :                 return -1;
     529      729656 :         if (s->flush)
     530      728202 :                 return s->flush(s, flush_level);
     531             :         return 0;
     532             : }
     533             : 
     534             : 
     535             : /* sync file to disk, return 0 on success, non-zero on failure */
     536             : int
     537           8 : mnstr_fsync(stream *s)
     538             : {
     539           8 :         if (s == NULL)
     540             :                 return -1;
     541             : #ifdef STREAM_DEBUG
     542             :         fprintf(stderr, "fsync %s (%d)\n",
     543             :                 s->name ? s->name : "<unnamed>", s->errnr);
     544             : #endif
     545           8 :         assert(!s->readonly);
     546           8 :         if (s->errkind != MNSTR_NO__ERROR)
     547             :                 return -1;
     548           8 :         if (s->fsync)
     549           8 :                 return s->fsync(s);
     550             :         return 0;
     551             : }
     552             : 
     553             : 
     554             : int
     555           0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
     556             : {
     557           0 :         if (s == NULL || p == NULL)
     558             :                 return -1;
     559             : #ifdef STREAM_DEBUG
     560             :         fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
     561             : #endif
     562           0 :         if (s->errkind != MNSTR_NO__ERROR)
     563             :                 return -1;
     564           0 :         if (s->fgetpos)
     565           0 :                 return s->fgetpos(s, p);
     566             :         return 0;
     567             : }
     568             : 
     569             : 
     570             : int
     571           0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
     572             : {
     573           0 :         if (s == NULL)
     574             :                 return -1;
     575             : #ifdef STREAM_DEBUG
     576             :         fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
     577             : #endif
     578           0 :         if (s->errkind != MNSTR_NO__ERROR)
     579             :                 return -1;
     580           0 :         if (s->fsetpos)
     581           0 :                 return s->fsetpos(s, p);
     582             :         return 0;
     583             : }
     584             : 
     585             : 
     586             : int
     587    12291487 : mnstr_isalive(const stream *s)
     588             : {
     589    12291487 :         if (s == NULL)
     590             :                 return 0;
     591    12291487 :         if (s->errkind != MNSTR_NO__ERROR)
     592             :                 return -1;
     593    12291487 :         if (s->isalive)
     594    11341471 :                 return s->isalive(s);
     595             :         return 1;
     596             : }
     597             : 
     598             : int
     599     9403842 : mnstr_getoob(const stream *s)
     600             : {
     601     9403842 :         if (s->getoob)
     602     9397055 :                 return s->getoob(s);
     603             :         return 0;
     604             : }
     605             : 
     606             : int
     607           0 : mnstr_putoob(const stream *s, char val)
     608             : {
     609           0 :         if (s->putoob)
     610           0 :                 return s->putoob(s, val);
     611             :         return -1;
     612             : }
     613             : 
     614             : 
     615             : bool
     616      190197 : mnstr_eof(const stream *s)
     617             : {
     618      190197 :         return s->eof;
     619             : }
     620             : 
     621             : const char *
     622       24652 : mnstr_name(const stream *s)
     623             : {
     624       24652 :         if (s == NULL)
     625             :                 return "connection terminated";
     626       24652 :         return s->name;
     627             : }
     628             : 
     629             : 
     630             : mnstr_error_kind
     631     5198543 : mnstr_errnr(const stream *s)
     632             : {
     633     5198543 :         if (s == NULL)
     634             :                 return MNSTR_READ_ERROR;
     635     5198543 :         return s->errkind;
     636             : }
     637             : 
     638             : const char *
     639           0 : mnstr_error_kind_name(mnstr_error_kind k)
     640             : {
     641           0 :         switch (k) {
     642             :         case MNSTR_NO__ERROR:
     643             :                 return "MNSTR_NO__ERROR";
     644           0 :         case MNSTR_OPEN_ERROR:
     645           0 :                 return "MNSTR_OPEN_ERROR";
     646           0 :         case MNSTR_READ_ERROR:
     647           0 :                 return "MNSTR_READ_ERROR";
     648           0 :         case MNSTR_WRITE_ERROR:
     649           0 :                 return "MNSTR_WRITE_ERROR";
     650           0 :         case MNSTR_INTERRUPT:
     651           0 :                 return "MNSTR_INTERRUPT";
     652           0 :         case MNSTR_TIMEOUT:
     653           0 :                 return "MNSTR_TIMEOUT";
     654           0 :         default:
     655           0 :                 return "<UNKNOWN_ERROR>";
     656             :         }
     657             : 
     658             : }
     659             : 
     660             : static void
     661           4 : clearerror(stream *s)
     662             : {
     663           2 :         if (s != NULL) {
     664           4 :                 s->errkind = MNSTR_NO__ERROR;
     665           2 :                 s->errmsg[0] = '\0';
     666             :         }
     667           2 : }
     668             : 
     669             : void
     670           2 : mnstr_clearerr(stream *s)
     671             : {
     672           2 :         clearerror(s);
     673           2 :         if (s != NULL && s->clrerr)
     674           2 :                 s->clrerr(s);
     675           2 : }
     676             : 
     677             : 
     678             : bool
     679         605 : mnstr_isbinary(const stream *s)
     680             : {
     681         605 :         if (s == NULL)
     682             :                 return false;
     683         605 :         return s->binary;
     684             : }
     685             : 
     686             : 
     687             : bool
     688           0 : mnstr_get_swapbytes(const stream *s)
     689             : {
     690           0 :         if (s == NULL)
     691             :                 return 0;
     692           0 :         return s->swapbytes;
     693             : }
     694             : 
     695             : 
     696             : /* set stream to big-endian/little-endian byte order; the default is
     697             :  * native byte order */
     698             : void
     699       39335 : mnstr_set_bigendian(stream *s, bool bigendian)
     700             : {
     701       39335 :         if (s == NULL)
     702             :                 return;
     703             : #ifdef STREAM_DEBUG
     704             :         fprintf(stderr, "mnstr_set_bigendian %s %s\n",
     705             :                 s->name ? s->name : "<unnamed>",
     706             :                 swapbytes ? "true" : "false");
     707             : #endif
     708       39335 :         assert(s->readonly);
     709       39335 :         s->binary = true;
     710             : #ifdef WORDS_BIGENDIAN
     711             :         s->swapbytes = !bigendian;
     712             : #else
     713       39335 :         s->swapbytes = bigendian;
     714             : #endif
     715             : }
     716             : 
     717             : 
     718             : void
     719       66733 : close_stream(stream *s)
     720             : {
     721       66733 :         if (s) {
     722       66730 :                 if (s->close)
     723       66730 :                         s->close(s);
     724       66731 :                 if (s->destroy)
     725       66731 :                         s->destroy(s);
     726             :         }
     727       66734 : }
     728             : 
     729             : 
     730             : void
     731      388481 : destroy_stream(stream *s)
     732             : {
     733      388481 :         if (s->name)
     734      388481 :                 free(s->name);
     735      388481 :         free(s);
     736      388481 : }
     737             : 
     738             : 
     739             : stream *
     740      388568 : create_stream(const char *name)
     741             : {
     742      388568 :         stream *s;
     743             : 
     744      388568 :         if (name == NULL) {
     745           0 :                 mnstr_set_open_error(NULL, 0, "internal error: name not set");
     746           0 :                 return NULL;
     747             :         }
     748      388568 :         if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
     749           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     750           0 :                 return NULL;
     751             :         }
     752      388568 :         *s = (stream) {
     753             :                 .swapbytes = false,
     754             :                 .readonly = true,
     755             :                 .isutf8 = false,        /* not known for sure */
     756             :                 .binary = false,
     757             :                 .eof = false,
     758      388568 :                 .name = strdup(name),
     759             :                 .errkind = MNSTR_NO__ERROR,
     760             :                 .errmsg = {0},
     761             :                 .destroy = destroy_stream,
     762             :                 .clrerr = clearerror,
     763             :         };
     764      388568 :         if(s->name == NULL) {
     765           0 :                 free(s);
     766           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     767           0 :                 return NULL;
     768             :         }
     769             : #ifdef STREAM_DEBUG
     770             :         fprintf(stderr, "create_stream %s -> %p\n",
     771             :                 name ? name : "<unnamed>", s);
     772             : #endif
     773      388568 :         mnstr_set_open_error(NULL, 0, NULL); // clear the error
     774      388568 :         return s;
     775             : }
     776             : 
     777             : 
     778             : static ssize_t
     779           0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     780             : {
     781           0 :         ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
     782           0 :         s->eof |= s->inner->eof;
     783           0 :         return ret;
     784             : }
     785             : 
     786             : 
     787             : static ssize_t
     788           0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     789             : {
     790           0 :         return s->inner->write(s->inner, buf, elmsize, cnt);
     791             : }
     792             : 
     793             : 
     794             : static void
     795           0 : wrapper_close(stream *s)
     796             : {
     797           0 :         s->inner->close(s->inner);
     798           0 : }
     799             : 
     800             : 
     801             : static void
     802           2 : wrapper_clrerr(stream *s)
     803             : {
     804           2 :         s->inner->clrerr(s->inner);
     805           2 : }
     806             : 
     807             : 
     808             : static void
     809           0 : wrapper_destroy(stream *s)
     810             : {
     811           0 :         s->inner->destroy(s->inner);
     812           0 :         destroy_stream(s);
     813           0 : }
     814             : 
     815             : 
     816             : static int
     817           0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
     818             : {
     819           0 :         return s->inner->flush(s->inner, flush_level);
     820             : }
     821             : 
     822             : 
     823             : static int
     824           4 : wrapper_fsync(stream *s)
     825             : {
     826           4 :         return s->inner->fsync(s->inner);
     827             : }
     828             : 
     829             : 
     830             : static int
     831           0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
     832             : {
     833           0 :         return s->inner->fgetpos(s->inner, p);
     834             : }
     835             : 
     836             : 
     837             : static int
     838           0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
     839             : {
     840           0 :         return s->inner->fsetpos(s->inner, p);
     841             : }
     842             : 
     843             : 
     844             : static void
     845       37977 : wrapper_update_timeout(stream *s)
     846             : {
     847       37977 :         s->inner->timeout = s->timeout;
     848       37977 :         s->inner->timeout_func = s->timeout_func;
     849       37977 :         s->inner->timeout_data = s->timeout_data;
     850       37977 :         s->inner->update_timeout(s->inner);
     851       37976 : }
     852             : 
     853             : 
     854             : static int
     855    11328527 : wrapper_isalive(const stream *s)
     856             : {
     857    11328527 :         return s->inner->isalive(s->inner);
     858             : }
     859             : 
     860             : 
     861             : static int
     862     9395029 : wrapper_getoob(const stream *s)
     863             : {
     864     9395029 :         return s->inner->getoob(s->inner);
     865             : }
     866             : 
     867             : 
     868             : static int
     869           0 : wrapper_putoob(const stream *s, char val)
     870             : {
     871           0 :         return s->inner->putoob(s->inner, val);
     872             : }
     873             : 
     874             : 
     875             : stream *
     876       80161 : create_wrapper_stream(const char *name, stream *inner)
     877             : {
     878       80161 :         if (inner == NULL)
     879             :                 return NULL;
     880       80161 :         if (name == NULL)
     881       80157 :                 name = inner->name;
     882       80161 :         stream *s = create_stream(name);
     883       80161 :         if (s == NULL)
     884             :                 return NULL;
     885             : 
     886             : 
     887       80161 :         s->swapbytes = inner->swapbytes;
     888       80161 :         s->readonly = inner->readonly;
     889       80161 :         s->isutf8 = inner->isutf8;
     890       80161 :         s->binary = inner->binary;
     891       80161 :         s->timeout = inner->timeout;
     892       80161 :         s->inner = inner;
     893             : 
     894       80161 :         s->read = inner->read == NULL ? NULL : wrapper_read;
     895       80161 :         s->write = inner->write == NULL ? NULL : wrapper_write;
     896       80161 :         s->close = inner->close == NULL ? NULL : wrapper_close;
     897       80161 :         s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
     898       80161 :         s->destroy = wrapper_destroy;
     899       80161 :         s->flush = inner->flush == NULL ? NULL : wrapper_flush;
     900       80161 :         s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
     901       80161 :         s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
     902       80161 :         s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
     903       80161 :         s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
     904       80161 :         s->getoob = inner->getoob == NULL ? NULL : wrapper_getoob;
     905       80161 :         s->putoob = inner->putoob == NULL ? NULL : wrapper_putoob;
     906       80161 :         s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
     907             : 
     908       80161 :         return s;
     909             : }
     910             : 
     911             : /* ------------------------------------------------------------------ */
     912             : /* streams working on a disk file, compressed or not */
     913             : 
     914             : stream *
     915       13022 : open_rstream(const char *filename)
     916             : {
     917       13022 :         if (filename == NULL)
     918             :                 return NULL;
     919             : #ifdef STREAM_DEBUG
     920             :         fprintf(stderr, "open_rstream %s\n", filename);
     921             : #endif
     922             : 
     923       13022 :         stream *s = open_stream(filename, "rb");
     924       13022 :         if (s == NULL)
     925             :                 return NULL;
     926             : 
     927       12870 :         stream *c = compressed_stream(s, 0);
     928       12870 :         if (c == NULL)
     929           0 :                 close_stream(s);
     930             : 
     931             :         return c;
     932             : }
     933             : 
     934             : stream *
     935       11781 : open_wstream(const char *filename)
     936             : {
     937       11781 :         if (filename == NULL)
     938             :                 return NULL;
     939             : #ifdef STREAM_DEBUG
     940             :         fprintf(stderr, "open_wstream %s\n", filename);
     941             : #endif
     942             : 
     943       11781 :         stream *s = open_stream(filename, "wb");
     944       11781 :         if (s == NULL)
     945             :                 return NULL;
     946             : 
     947       11781 :         stream *c = compressed_stream(s, 0);
     948       11781 :         if (c == NULL) {
     949           0 :                 close_stream(s);
     950           0 :                 (void) file_remove(filename);
     951             :         }
     952             : 
     953             :         return c;
     954             : }
     955             : 
     956             : stream *
     957         348 : open_rastream(const char *filename)
     958             : {
     959         348 :         if (filename == NULL)
     960             :                 return NULL;
     961             : #ifdef STREAM_DEBUG
     962             :         fprintf(stderr, "open_rastream %s\n", filename);
     963             : #endif
     964         348 :         stream *s = open_rstream(filename);
     965         348 :         if (s == NULL)
     966             :                 return NULL;
     967             : 
     968         315 :         stream *t = create_text_stream(s);
     969         315 :         if (t == NULL)
     970           0 :                 close_stream(s);
     971             : 
     972             :         return t;
     973             : }
     974             : 
     975             : stream *
     976          24 : open_wastream(const char *filename)
     977             : {
     978          24 :         if (filename == NULL)
     979             :                 return NULL;
     980             : #ifdef STREAM_DEBUG
     981             :         fprintf(stderr, "open_wastream %s\n", filename);
     982             : #endif
     983          24 :         stream *s = open_wstream(filename);
     984          24 :         if (s == NULL)
     985             :                 return NULL;
     986             : 
     987          24 :         stream *t = create_text_stream(s);
     988          24 :         if (t == NULL) {
     989           0 :                 close_stream(s);
     990           0 :                 (void) file_remove(filename);
     991             :         }
     992             : 
     993             :         return t;
     994             : }
     995             : 
     996             : 
     997             : /* put here because it depends on both bs_read AND bs2_read */
     998             : bool
     999      462322 : isa_block_stream(const stream *s)
    1000             : {
    1001      462322 :         assert(s != NULL);
    1002      924644 :         return s &&
    1003      462322 :                 ((s->read == bs_read ||
    1004        3313 :                   s->write == bs_write));
    1005             : }
    1006             : 
    1007             : 
    1008             : /* Put here because I need to think very carefully about this
    1009             :  * mnstr_read(,, 0, 0). What would that mean?
    1010             :  */
    1011             : ssize_t
    1012       39523 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
    1013             : {
    1014       39523 :         ssize_t len = 0;
    1015       39523 :         char x = 0;
    1016             : 
    1017       39523 :         if (s == NULL || buf == NULL)
    1018             :                 return -1;
    1019       39523 :         assert(s->read == bs_read || s->write == bs_write);
    1020       79039 :         if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
    1021       39522 :             mnstr_read(s, &x, 0, 0) < 0 /* read prompt */  ||
    1022       39516 :             x > 0)
    1023           1 :                 return -1;
    1024             :         return len;
    1025             : }

Generated by: LCOV version 1.14