LCOV - code coverage report
Current view: top level - common/stream - pump.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 134 145 92.4 %
Date: 2024-10-03 20:03:20 Functions: 8 8 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /* generic pump stream: moves data between an inner stream and a worker supplied through pump_state callbacks (e.g. for lzma/xz-compressed files) */
      14             : 
      15             : #include "monetdb_config.h"
      16             : #include "stream.h"
      17             : #include "stream_internal.h"
      18             : #include "pump.h"
      19             : 
      20             : #include <assert.h>
      21             : 
      22             : static pump_result pump_in(stream *s); // read side: fills the dst window, reading from s->inner as needed
      23             : static pump_result pump_out(stream *s, pump_action action); // write side: pushes the src window through the worker to s->inner
      24             : 
      25             : static ssize_t pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt); // installed as s->read by pump_stream()
      26             : static ssize_t pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt); // installed as s->write
      27             : static int pump_flush(stream *s, mnstr_flush_level flush_level); // installed as s->flush
      28             : static void pump_close(stream *s); // installed as s->close
      29             : static void pump_destroy(stream *s); // installed as s->destroy
      30             : /* pump_stream: create a wrapper stream around `inner` whose read/write/flush/close/destroy callbacks are the pump_* functions of this file. */
      31             : /* Returns the new stream, or NULL when create_wrapper_stream() fails. `state` is stored in stream_data.p and is later free()d by pump_destroy(). */
      32             : stream *
      33         509 : pump_stream(stream *inner, pump_state *state)
      34             : {
      35         509 :         assert(inner);
      36         509 :         assert(state);
      37         509 :         assert(state->set_src_win != NULL); // NOTE(review): get_src_win is called by pump_write/pump_in/pump_out but is not asserted here
      38         509 :         assert(state->get_dst_win != NULL);
      39         509 :         assert(state->set_dst_win != NULL);
      40         509 :         assert(state->get_buffer != NULL);
      41         509 :         assert(state->worker != NULL);
      42         509 :         assert(state->get_error != NULL);
      43         509 :         assert(state->finalizer != NULL);
      44             : 
      45         509 :         inner_state_t *inner_state = state->inner_state; // opaque handle passed back to every state callback
      46             : 
      47         509 :         stream *s = create_wrapper_stream(NULL, inner);
      48         509 :         if (s == NULL)
      49             :                 return NULL; // NOTE(review): neither `state` nor `inner` is released on this path; presumably the caller cleans up - confirm
      50             : 
      51         509 :         pump_buffer buf = state->get_buffer(inner_state); // the internal staging buffer owned by the state
      52         509 :         if (s->readonly) {
      53             :                 // Read from inner stream to src buffer through pumper to outbufs.
      54             :                 // This means the src window starts empty
      55         477 :                 buf.count = 0; // keep buf.start non-NULL: pump_in() treats a NULL src start as "EOF seen"
      56         477 :                 state->set_src_win(inner_state, buf);
      57             :         } else {
      58             :                 // from inbufs through pumper to dst buffer to inner stream.
      59             :                 // This means the out window is our whole buffer.
      60             :                 // Check for NULL in case caller has already initialized it
      61             :                 // and written something
      62          32 :                 if (state->get_dst_win(inner_state).start == NULL)
      63           2 :                         state->set_dst_win(inner_state, buf);
      64             :         }
      65             : 
      66         509 :         s->stream_data.p = (void*) state; // retrieved again by every pump_* callback
      67         509 :         s->read = pump_read;
      68         509 :         s->write = pump_write;
      69         509 :         s->flush = pump_flush;
      70         509 :         s->close = pump_close;
      71         509 :         s->destroy = pump_destroy;
      72         509 :         return s;
      73             : }
      74             : /* pump_read: stream read callback. Points the dst window at `buf` (elmsize * cnt bytes), runs pump_in() and returns the number of */
      75             : /* complete elements now stored in `buf`. On PUMP_ERROR it records a MNSTR_READ_ERROR with the worker's message on `s` and returns -1. */
      76             : static ssize_t
      77       47207 : pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
      78             : {
      79       47207 :         pump_state *state = (pump_state*) s->stream_data.p;
      80       47207 :         inner_state_t *inner_state = state->inner_state;
      81       47207 :         size_t size = elmsize * cnt; // requested size in bytes
      82             : 
      83       47207 :         state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
      84       47207 :         pump_result ret = pump_in(s);
      85       47207 :         if (ret == PUMP_ERROR) {
      86           0 :                 const char *msg = state->get_error(inner_state);
      87           0 :                 if (msg == NULL) // use the fallback text only when the worker supplied no message; never pass NULL to "%s"
      88           0 :                         msg = "processing failed without further error indication";
      89           0 :                 mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
      90           0 :                 return -1;
      91             :         }
      92             : 
      93       47207 :         char *free_space = state->get_dst_win(inner_state).start; // dst window start after pumping; its distance from buf is the byte count delivered
      94       47207 :         ssize_t nread = free_space - (char*) buf;
      95             : 
      96       47207 :         return nread / (ssize_t) elmsize; // NOTE(review): elmsize == 0 would divide by zero; callers are assumed to pass elmsize > 0 - confirm
      97             : }
      98             : /* pump_write: stream write callback. Points the src window at `buf` (elmsize * cnt bytes), runs pump_out() with PUMP_NO_FLUSH and returns */
      99             : /* the number of complete elements consumed. On PUMP_ERROR it records a MNSTR_WRITE_ERROR with the worker's message on `s` and returns -1. */
     100             : static ssize_t
     101     1218328 : pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     102             : {
     103     1218328 :         pump_state *state = (pump_state*) s->stream_data.p;
     104     1218328 :         inner_state_t *inner_state = state->inner_state;
     105     1218328 :         size_t size = elmsize * cnt; // payload size in bytes
     106             : 
     107     1218328 :         if (size == 0) // nothing to do; this also keeps elmsize == 0 away from the division below
     108          70 :                 return cnt;
     109             : 
     110     1218258 :         state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
     111     1218258 :         pump_result ret = pump_out(s, PUMP_NO_FLUSH);
     112     1218258 :         if (ret == PUMP_ERROR) {
     113           0 :                 const char *msg = state->get_error(inner_state);
     114           0 :                 if (msg == NULL) // use the fallback text only when the worker supplied no message; never pass NULL to "%s"
     115           0 :                         msg = "processing failed without further error indication";
     116           0 :                 mnstr_set_error(s, MNSTR_WRITE_ERROR, "%s", msg); // this is the write path, so report a write error rather than a read error
     117           0 :                 return -1;
     118             :         }
     119     1218258 :         ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf; // src window start after pumping; its distance from buf is the byte count consumed
     120     1218258 :         return nwritten / (ssize_t) elmsize;
     121             : }
     122             : /* pump_flush: stream flush callback. Maps `flush_level` to a pump_action, runs pump_out() with an empty src window, */
     123             : /* then flushes the inner stream. Returns -1 when pump_out() reports PUMP_ERROR, otherwise the result of mnstr_flush() on s->inner. */
     124         841 : static int pump_flush(stream *s, mnstr_flush_level flush_level)
     125             : {
     126         841 :         pump_state *state = (pump_state*) s->stream_data.p;
     127         841 :         inner_state_t *inner_state = state->inner_state;
     128         841 :         pump_action action;
     129             : 
     130         841 :         switch (flush_level) {
     131             :                 case MNSTR_FLUSH_DATA:
     132             :                         action = PUMP_FLUSH_DATA;
     133             :                         break;
     134         833 :                 case MNSTR_FLUSH_ALL:
     135         833 :                         action = PUMP_FLUSH_ALL;
     136         833 :                         break;
     137             :                 default:
     138           0 :                         assert(0 /* unknown flush_level */);
     139             :                         action = PUMP_FLUSH_DATA; // fallback used when assertions are compiled out
     140             :                         break;
     141             :         }
     142             : 
     143         841 :         state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 }); // no new input, only drain what the worker still holds
     144         841 :         pump_result ret = pump_out(s, action); // pump_out() returns a pump_result, not a byte count
     145         841 :         if (ret == PUMP_ERROR) // compare against PUMP_ERROR like pump_read/pump_write do, instead of testing for a negative value
     146             :                 return -1;
     147             :         else
     148         841 :                 return mnstr_flush(s->inner, flush_level);
     149             : }
     150             : /* pump_close: stream close callback. For writable streams it first runs pump_out() with PUMP_FINISH and an empty src window, */
     151             : /* then closes the inner stream. The result of pump_out() is not checked, so a failure in this final step is not reported. */
     152             : static void
     153         509 : pump_close(stream *s)
     154             : {
     155         509 :         pump_state *state = (pump_state*) s->stream_data.p;
     156         509 :         inner_state_t *inner_state = state->inner_state;
     157             : 
     158         509 :         if (!s->readonly) {
     159          32 :                 state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 }); // no new input
     160          32 :                 pump_out(s, PUMP_FINISH); // loops until the worker returns PUMP_END, then writes the buffered output to s->inner
     161             :         }
     162         509 :         mnstr_close(s->inner);
     163         509 : }
     164             : /* pump_destroy: stream destroy callback. Runs the state's finalizer on the inner state, frees the pump_state itself, */
     165             : /* then destroys the inner stream and finally this wrapper stream. */
     166             : static void
     167         509 : pump_destroy(stream *s)
     168             : {
     169         509 :         pump_state *state = (pump_state*) s->stream_data.p;
     170         509 :         inner_state_t *inner_state = state->inner_state; // fetched before `state` is freed below
     171             : 
     172         509 :         state->finalizer(inner_state);
     173         509 :         free(state); // the pump_state handed to pump_stream() must therefore come from malloc() or a compatible allocator
     174         509 :         mnstr_destroy(s->inner);
     175         509 :         destroy_stream(s);
     176         509 : }
     177             : /* pump_in: read side. Loops, refilling the src window from s->inner when it is empty and calling the worker, until the dst window is full (PUMP_OK), the worker returns PUMP_END, or a read/worker error occurs (PUMP_ERROR). */
     178             : static pump_result
     179       47207 : pump_in(stream *s)
     180             : {
     181       47207 :         pump_state *state = (pump_state *) s->stream_data.p;
     182       47207 :         inner_state_t *inner_state = state->inner_state;
     183             : 
     184       47207 :         char *before = state->get_dst_win(inner_state).start;
     185       47207 :         (void) before; // nice while in the debugger
     186             : 
     187       47207 :         pump_buffer buffer = state->get_buffer(inner_state); // staging buffer that receives raw bytes from s->inner
     188       82424 :         while (1) {
     189      129631 :                 pump_buffer dst = state->get_dst_win(inner_state); // re-read both windows every iteration, the worker changes them
     190      129631 :                 pump_buffer src = state->get_src_win(inner_state);
     191             : 
     192      129631 :                 if (dst.count == 0)
     193             :                         // Output buffer is full, we're done.
     194       47207 :                         return PUMP_OK;
     195             : 
     196             :                 // Handle input, if possible and necessary
     197       83172 :                 if (src.start != NULL && src.count == 0) {
     198             :                         // start != NULL means we haven't encountered EOF yet
     199             : 
     200       36141 :                         ssize_t nread = mnstr_read(s->inner, buffer.start, 1, buffer.count); // element size 1, so nread is a byte count
     201             : 
     202       36141 :                         if (nread < 0)
     203             :                                 // Error. Return directly, discarding any data lingering
     204             :                                 // in the internal state.
     205             :                                 return PUMP_ERROR;
     206       36141 :                         if (nread == 0) {
     207             :                                 // Set to NULL so we'll remember next time.
     208             :                                 // Maybe there is some data in the internal state we don't
     209             :                                 // return immediately.
     210         469 :                                 src = (pump_buffer){.start=NULL, .count=0};
     211         469 :                                 s->eof |= s->inner->eof; // carry the inner stream's eof flag over to this stream
     212             :                         } else
     213             :                                 // All good
     214       35672 :                                 src = (pump_buffer) { .start = buffer.start, .count = nread};
     215             : 
     216       36141 :                         state->set_src_win(inner_state, src);
     217             :                 }
     218             : 
     219       83172 :                 pump_action action = (src.start != NULL) ? PUMP_NO_FLUSH : PUMP_FINISH; // after EOF, ask the worker to finish
     220             : 
     221             :                 // Try to make some progress
     222       83172 :                 assert(dst.count > 0);
     223       83172 :                 assert(src.count > 0 || action == PUMP_FINISH);
     224       83172 :                 pump_result ret = state->worker(inner_state, action);
     225       83172 :                 if (ret == PUMP_ERROR)
     226             :                         return PUMP_ERROR;
     227             : 
     228       83172 :                 if (ret == PUMP_END)
     229             :                         // If you say so
     230             :                         return PUMP_END;
     231             : 
     232             :                 // If we get here we made some progress so we're ready for a new iteration.
     233             :         }
     234             : }
     235             : /* pump_out: write side. Runs the worker on the current src window, first writing the staging buffer to s->inner whenever the dst window is empty or smaller than state->elbow_room. */
     236             : /* With PUMP_NO_FLUSH it returns PUMP_OK once the src window is consumed. With any other action it keeps calling the worker until it returns PUMP_END, writes out the buffered output and returns PUMP_END. Returns PUMP_ERROR on worker failure or a short write. */
     237             : static pump_result
     238     1219131 : pump_out(stream *s, pump_action action)
     239             : {
     240     1219131 :         pump_state *state = (pump_state *) s->stream_data.p;
     241     1219131 :         inner_state_t *inner_state = state->inner_state;
     242             : 
     243     1219131 :         void *before = state->get_src_win(inner_state).start;
     244     1219131 :         (void) before; // nice while in the debugger
     245             : 
     246     1219131 :         pump_buffer buffer = state->get_buffer(inner_state); // staging buffer that collects worker output before it goes to s->inner
     247             : 
     248     1227045 :         while (1) {
     249     1223088 :                 pump_buffer dst = state->get_dst_win(inner_state);
     250     1223088 :                 pump_buffer src = state->get_src_win(inner_state);
     251             : 
     252             :                 // Make sure there is room in the output buffer
     253     1223088 :                 assert(state->elbow_room <= buffer.count);
     254     1223088 :                 if (dst.count == 0 || dst.count < state->elbow_room) {
     255         319 :                         size_t amount = dst.start - buffer.start; // bytes between the buffer start and the dst window, i.e. output not yet written
     256         319 :                         ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
     257         319 :                         if (nwritten != (ssize_t)amount) // a short write is treated as an error
     258     1219131 :                                 return PUMP_ERROR;
     259         319 :                         dst = buffer;
     260         319 :                         state->set_dst_win(inner_state, dst); // reset output window
     261             :                 }
     262             : 
     263             :                 // Try to make progress
     264     1223088 :                 pump_result ret = state->worker(inner_state, action);
     265     1223088 :                 if (ret == PUMP_ERROR)
     266             :                         return PUMP_ERROR;
     267             : 
     268             :                 // src and dst have been invalidated by the call to worker
     269     1222877 :                 dst = state->get_dst_win(inner_state);
     270     1222877 :                 src = state->get_src_win(inner_state);
     271             : 
     272             :                 // There was no error but if input is still available, we definitely
     273             :                 // need another round
     274     1222877 :                 if (src.count > 0)
     275        3745 :                         continue;
     276             : 
     277             :                 // Though the input data has been consumed, some of it might still
     278             :                 // linger in the internal state.
     279     1219132 :                 if (action == PUMP_NO_FLUSH) {
     280             :                         // Let it linger, we'll combine it with the next batch
     281     1218258 :                         assert(ret == PUMP_OK); // worker would never PUMP_END, would it?
     282             :                         return PUMP_OK;
     283             :                 }
     284             : 
     285             :                 // We are flushing or finishing or whatever.
     286             :                 // We may need to do more iterations to fully flush the internal state.
     287             :                 // Is there any internal state left?
     288         874 :                 if (ret == PUMP_OK)
     289             :                         // yes, there is
     290         212 :                         continue;
     291             : 
     292             :                 // All internal state has been drained.
     293             :                 // Now drain the output buffer
     294         662 :                 assert(ret == PUMP_END);
     295         662 :                 size_t amount = dst.start - buffer.start; // output still sitting in the staging buffer
     296         662 :                 if (amount > 0) {
     297         657 :                         ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
     298         657 :                         if (nwritten != (ssize_t)amount)
     299             :                                 return PUMP_ERROR;
     300             :                 }
     301         662 :                 state->set_dst_win(inner_state, buffer); // reset output window
     302         662 :                 return PUMP_END;
     303             :         }
     304             : 
     305             : 
     306             : }

Generated by: LCOV version 1.14