LCOV - code coverage report
Current view: top level - common/stream - bs.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 130 156 83.3 %
Date: 2024-04-26 00:35:57 Functions: 7 9 77.8 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "stream.h"
      15             : #include "stream_internal.h"
      16             : 
      17             : /* ------------------------------------------------------------------ */
      18             : 
      19             : /* A buffered stream consists of a sequence of blocks.  Each block
      20             :  * consists of a count followed by the data in the block.  A flush is
      21             :  * indicated by an empty block (i.e. just a count of 0).
      22             :  */
      23             : 
      24             : static bs *
      25       79068 : bs_create(void)
      26             : {
      27             :         /* should be a binary stream */
      28       79068 :         bs *ns;
      29             : 
      30       79068 :         if ((ns = malloc(sizeof(*ns))) == NULL)
      31             :                 return NULL;
      32       79068 :         *ns = (bs) {0};
      33       79068 :         return ns;
      34             : }
      35             : 
      36             : /* Collect data until the internal buffer is filled, then write the
      37             :  * filled buffer to the underlying stream.
      38             :  * Struct field usage:
      39             :  * s - the underlying stream;
      40             :  * buf - the buffer in which data is collected;
      41             :  * nr - how much of buf is already filled (if nr == sizeof(buf) the
      42             :  *      data is written to the underlying stream, so upon entry nr <
      43             :  *      sizeof(buf));
      44             :  * itotal - unused.
      45             :  */
      46             : ssize_t
      47    30111720 : bs_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
      48             : {
      49    30111720 :         bs *s;
      50    30111720 :         size_t todo = cnt * elmsize;
      51    30111720 :         uint16_t blksize;
      52             : 
      53    30111720 :         s = (bs *) ss->stream_data.p;
      54    30111720 :         if (s == NULL)
      55             :                 return -1;
      56    30111720 :         assert(!ss->readonly);
      57    30111720 :         assert(s->nr < sizeof(s->buf));
      58    60625575 :         while (todo > 0) {
      59    30513748 :                 size_t n = sizeof(s->buf) - s->nr;
      60             : 
      61    30513748 :                 if (todo < n)
      62             :                         n = todo;
      63    30513748 :                 memcpy(s->buf + s->nr, buf, n);
      64    30513748 :                 s->nr += (unsigned) n;
      65    30513748 :                 todo -= n;
      66    30513748 :                 buf = ((const char *) buf + n);
      67    30513748 :                 if (s->nr == sizeof(s->buf)) {
      68             :                         /* block is full, write it to the stream */
      69             : #ifdef BSTREAM_DEBUG
      70             :                         {
      71             :                                 unsigned i;
      72             : 
      73             :                                 fprintf(stderr, "W %s %u \"", ss->name, s->nr);
      74             :                                 for (i = 0; i < s->nr; i++)
      75             :                                         if (' ' <= s->buf[i] && s->buf[i] < 127)
      76             :                                                 putc(s->buf[i], stderr);
      77             :                                         else
      78             :                                                 fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
      79             :                                 fprintf(stderr, "\"\n");
      80             :                         }
      81             : #endif
      82             :                         /* since the block is at max BLOCK (8K) - 2 size we can
      83             :                          * store it in a two byte integer */
      84      415726 :                         blksize = (uint16_t) s->nr;
      85      415726 :                         s->bytes += s->nr;
      86             :                         /* the last bit tells whether a flush is in
      87             :                          * there, it's not at this moment, so shift it
      88             :                          * to the left */
      89      415726 :                         blksize <<= 1;
      90      415726 :                         if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
      91      415833 :                             ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr) {
      92           0 :                                 mnstr_copy_error(ss, ss->inner);
      93           0 :                                 s->nr = 0; /* data is lost due to error */
      94           0 :                                 return -1;
      95             :                         }
      96      415833 :                         s->blks++;
      97      415833 :                         s->nr = 0;
      98             :                 }
      99             :         }
     100    30111827 :         return (ssize_t) cnt;
     101             : }
     102             : 
     103             : /* If the internal buffer is partially filled, write it to the
     104             :  * underlying stream.  Then in any case write an empty buffer to the
     105             :  * underlying stream to indicate to the receiver that the data was
     106             :  * flushed.
     107             :  */
     108             : static int
     109      559015 : bs_flush(stream *ss, mnstr_flush_level flush_level)
     110             : {
     111      559015 :         uint16_t blksize;
     112      559015 :         bs *s;
     113             : 
     114      559015 :         s = (bs *) ss->stream_data.p;
     115      559015 :         if (s == NULL)
     116             :                 return -1;
     117      559015 :         assert(!ss->readonly);
     118      559015 :         assert(s->nr < sizeof(s->buf));
     119      559015 :         if (!ss->readonly) {
     120             :                 /* flush the rest of buffer (if s->nr > 0), then set the
     121             :                  * last bit to 1 to to indicate user-instigated flush */
     122             : #ifdef BSTREAM_DEBUG
     123             :                 if (s->nr > 0) {
     124             :                         unsigned i;
     125             : 
     126             :                         fprintf(stderr, "W %s %u \"", ss->name, s->nr);
     127             :                         for (i = 0; i < s->nr; i++)
     128             :                                 if (' ' <= s->buf[i] && s->buf[i] < 127)
     129             :                                         putc(s->buf[i], stderr);
     130             :                                 else
     131             :                                         fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
     132             :                         fprintf(stderr, "\"\n");
     133             :                         fprintf(stderr, "W %s 0\n", ss->name);
     134             :                 }
     135             : #endif
     136      559015 :                 blksize = (uint16_t) (s->nr << 1);
     137      559015 :                 s->bytes += s->nr;
     138             :                 /* indicate that this is the last buffer of a block by
     139             :                  * setting the low-order bit */
     140      559015 :                 blksize |= 1;
     141             :                 /* always flush (even empty blocks) needed for the protocol) */
     142      559015 :                 if ((!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
     143      558900 :                      (s->nr > 0 &&
     144      504985 :                       ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr))) {
     145         176 :                         mnstr_copy_error(ss, ss->inner);
     146         176 :                         s->nr = 0; /* data is lost due to error */
     147         176 :                         return -1;
     148             :                 }
     149             :                 // shouldn't we flush ss->inner too?
     150      558770 :                 (void) flush_level;
     151      558770 :                 s->blks++;
     152      558770 :                 s->nr = 0;
     153             :         }
     154      558770 :         return 0;
     155             : }
     156             : 
     157             : /* Read buffered data and return the number of items read.  At the
     158             :  * flush boundary we will return 0 to indicate the end of a block,
     159             :  * unless prompt and pstream are set. In that case, only return 0
     160             :  * after the prompt has been written to pstream and another read
     161             :  * attempt immediately returns a block boundary.
     162             :  *
     163             :  * Structure field usage:
     164             :  * s - the underlying stream;
     165             :  * buf - not used;
     166             :  * itotal - the amount of data in the current block that hasn't yet
     167             :  *          been read;
     168             :  * nr - indicates whether the flush marker has to be returned.
     169             :  */
     170             : ssize_t
     171     6509956 : bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
     172             : {
     173     6509956 :         bs *s;
     174     6509956 :         size_t todo = cnt * elmsize;
     175     6509956 :         size_t n;
     176             : 
     177     6509956 :         s = (bs *) ss->stream_data.p;
     178     6509956 :         if (s == NULL)
     179             :                 return -1;
     180     6509956 :         assert(ss->readonly);
     181     6509956 :         assert(s->nr <= 1);
     182             : 
     183     6509956 :         if (s->itotal == 0) {
     184     1075847 :                 int16_t blksize = 0;
     185             : 
     186     1075847 :                 if (s->nr) {
     187             :                         /* We read the closing block but hadn't
     188             :                          * returned that yet. Return it now, and note
     189             :                          * that we did by setting s->nr to 0. */
     190      515659 :                         assert(s->nr == 1);
     191      515659 :                         s->nr = 0;
     192     1069634 :                         return 0;
     193             :                 }
     194             : 
     195      560188 :                 assert(s->nr == 0);
     196             : 
     197             :                 /* There is nothing more to read in the current block,
     198             :                  * so read the count for the next block */
     199      560188 :                 switch (mnstr_readSht(ss->inner, &blksize)) {
     200          56 :                 case -1:
     201          56 :                         mnstr_copy_error(ss, ss->inner);
     202          56 :                         return -1;
     203       38260 :                 case 0:
     204       38260 :                         ss->eof |= ss->inner->eof;
     205       38260 :                         return 0;
     206             :                 case 1:
     207             :                         break;
     208             :                 }
     209      522073 :                 if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
     210           0 :                         mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
     211           0 :                         return -1;
     212             :                 }
     213             : #ifdef BSTREAM_DEBUG
     214             :                 fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
     215             :                 fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
     216             : #endif
     217      522073 :                 s->itotal = (uint16_t) blksize >> 1;   /* amount readable */
     218             :                 /* store whether this was the last block or not */
     219      522073 :                 s->nr = (uint16_t) blksize & 1;
     220      522073 :                 s->bytes += s->itotal;
     221      522073 :                 s->blks++;
     222             :         }
     223             : 
     224             :         /* Fill the caller's buffer. */
     225             :         cnt = 0;                /* count how much we put into the buffer */
     226    11614488 :         while (todo > 0) {
     227             :                 /* there is more data waiting in the current block, so
     228             :                  * read it */
     229     6180476 :                 n = todo < s->itotal ? todo : s->itotal;
     230    12343076 :                 while (n > 0) {
     231     6162710 :                         ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
     232             : 
     233     6162600 :                         if (m <= 0) {
     234           0 :                                 ss->eof |= ss->inner->eof;
     235           0 :                                 mnstr_copy_error(ss, ss->inner);
     236           0 :                                 return -1;
     237             :                         }
     238             : #ifdef BSTREAM_DEBUG
     239             :                         {
     240             :                                 ssize_t i;
     241             : 
     242             :                                 fprintf(stderr, "RD %s %zd \"", ss->name, m);
     243             :                                 for (i = 0; i < m; i++)
     244             :                                         if (' ' <= ((char *) buf)[i] &&
     245             :                                             ((char *) buf)[i] < 127)
     246             :                                                 putc(((char *) buf)[i], stderr);
     247             :                                         else
     248             :                                                 fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
     249             :                                 fprintf(stderr, "\"\n");
     250             :                         }
     251             : #endif
     252     6162600 :                         buf = (void *) ((char *) buf + m);
     253     6162600 :                         cnt += (size_t) m;
     254     6162600 :                         n -= (size_t) m;
     255     6162600 :                         s->itotal -= (unsigned) m;
     256     6162600 :                         todo -= (size_t) m;
     257             :                 }
     258             : 
     259     6180366 :                 if (s->itotal == 0) {
     260      957929 :                         int16_t blksize = 0;
     261             : 
     262             :                         /* The current block has been completely read,
     263             :                          * so read the count for the next block, only
     264             :                          * if the previous was not the last one */
     265      957929 :                         if (s->nr)
     266             :                                 break;
     267      435869 :                         switch (mnstr_readSht(ss->inner, &blksize)) {
     268           0 :                         case -1:
     269           0 :                                 mnstr_copy_error(ss, ss->inner);
     270           0 :                                 return -1;
     271           0 :                         case 0:
     272           0 :                                 ss->eof |= ss->inner->eof;
     273           0 :                                 return 0;
     274             :                         case 1:
     275             :                                 break;
     276             :                         }
     277      435869 :                         if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
     278           0 :                                 mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
     279           0 :                                 return -1;
     280             :                         }
     281             : #ifdef BSTREAM_DEBUG
     282             :                         fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
     283             :                         fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
     284             :                         fprintf(stderr, "RC %s %d\n", ss->name, blksize);
     285             : #endif
     286      435869 :                         s->itotal = (uint16_t) blksize >> 1;   /* amount readable */
     287             :                         /* store whether this was the last block or not */
     288      435869 :                         s->nr = (uint16_t) blksize & 1;
     289      435869 :                         s->bytes += s->itotal;
     290      435869 :                         s->blks++;
     291             :                 }
     292             :         }
     293             :         /* if we got an empty block with the end-of-sequence marker
     294             :          * set (low-order bit) we must only return an empty read once,
     295             :          * so we must squash the flag that we still have to return an
     296             :          * empty read */
     297     5956072 :         if (todo > 0 && cnt == 0)
     298        6320 :                 s->nr = 0;
     299     5956072 :         return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
     300             : }
     301             : 
     302             : 
     303             : 
     304             : static void
     305       79053 : bs_close(stream *ss)
     306             : {
     307       79053 :         bs *s;
     308             : 
     309       79053 :         s = (bs *) ss->stream_data.p;
     310       79053 :         assert(s);
     311       79053 :         if (s == NULL)
     312             :                 return;
     313       79053 :         if (!ss->readonly && s->nr > 0)
     314          11 :                 bs_flush(ss, MNSTR_FLUSH_DATA);
     315       79053 :         mnstr_close(ss->inner);
     316             : }
     317             : 
     318             : void
     319       79053 : bs_destroy(stream *ss)
     320             : {
     321       79053 :         bs *s;
     322             : 
     323       79053 :         s = (bs *) ss->stream_data.p;
     324       79053 :         assert(s);
     325       79053 :         if (s) {
     326       79053 :                 if (ss->inner)
     327       79053 :                         ss->inner->destroy(ss->inner);
     328       79053 :                 free(s);
     329             :         }
     330       79053 :         destroy_stream(ss);
     331       79054 : }
     332             : 
     333             : void
     334           0 : bs_clrerr(stream *s)
     335             : {
     336           0 :         if (s->stream_data.p)
     337           0 :                 mnstr_clearerr(s->inner);
     338           0 : }
     339             : 
     340             : stream *
     341           0 : bs_stream(stream *s)
     342             : {
     343           0 :         assert(isa_block_stream(s));
     344           0 :         return s->inner;
     345             : }
     346             : 
     347             : stream *
     348       79067 : block_stream(stream *s)
     349             : {
     350       79067 :         stream *ns;
     351       79067 :         bs *b;
     352             : 
     353       79067 :         if (s == NULL)
     354             :                 return NULL;
     355             : #ifdef STREAM_DEBUG
     356             :         fprintf(stderr, "block_stream %s\n", s->name ? s->name : "<unnamed>");
     357             : #endif
     358       79067 :         if ((ns = create_wrapper_stream(NULL, s)) == NULL)
     359             :                 return NULL;
     360       79067 :         if ((b = bs_create()) == NULL) {
     361           0 :                 destroy_stream(ns);
     362           0 :                 mnstr_set_open_error(s->name, 0, "bs_create failed");
     363           0 :                 return NULL;
     364             :         }
     365             :         /* blocksizes have a fixed little endian byteorder */
     366             : #ifdef WORDS_BIGENDIAN
     367             :         s->swapbytes = true;
     368             : #endif
     369             : 
     370       79067 :         ns->flush = bs_flush;
     371       79067 :         ns->read = bs_read;
     372       79067 :         ns->write = bs_write;
     373       79067 :         ns->close = bs_close;
     374       79067 :         ns->destroy = bs_destroy;
     375       79067 :         ns->stream_data.p = (void *) b;
     376             : 
     377       79067 :         return ns;
     378             : }

Generated by: LCOV version 1.14