LCOV - code coverage report
Current view: top level - common/stream - lz4_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 125 180 69.4 %
Date: 2024-04-26 00:35:57 Functions: 12 18 66.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /* streams working on an lz4-compressed disk file */
      14             : 
      15             : #include "monetdb_config.h"
      16             : #include "stream.h"
      17             : #include "stream_internal.h"
      18             : #include "pump.h"
      19             : 
      20             : 
      21             : #ifdef HAVE_LIBLZ4
      22             : 
      23             : #define READ_CHUNK (1024)
      24             : #define WRITE_CHUNK (1024)
      25             : 
      26             : struct inner_state {
      27             :         pump_buffer src_win;
      28             :         pump_buffer dst_win;
      29             :         pump_buffer buffer;
      30             :         union {
      31             :                 LZ4F_cctx *c;
      32             :                 LZ4F_dctx *d;
      33             :         } ctx;
      34             :         LZ4F_preferences_t compression_prefs;
      35             :         LZ4F_errorCode_t error_code;
      36             :         bool finished;
      37             : };
      38             : 
      39             : static pump_buffer
      40       14195 : get_src_win(inner_state_t *inner_state)
      41             : {
      42       14195 :         return inner_state->src_win;
      43             : }
      44             : 
      45             : static void
      46        1267 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
      47             : {
      48        1267 :         inner_state->src_win = buf;
      49        1267 : }
      50             : 
      51             : static pump_buffer
      52       16967 : get_dst_win(inner_state_t *inner_state)
      53             : {
      54       16967 :         return inner_state->dst_win;
      55             : }
      56             : 
      57             : static void
      58        2277 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
      59             : {
      60        2277 :         inner_state->dst_win = buf;
      61        2277 : }
      62             : 
      63             : static pump_buffer
      64        2779 : get_buffer(inner_state_t *inner_state)
      65             : {
      66        2779 :         return inner_state->buffer;
      67             : }
      68             : 
      69             : static pump_result
      70        2546 : decomp(inner_state_t *inner_state, pump_action action)
      71             : {
      72        2546 :         LZ4F_errorCode_t ret;
      73             : 
      74        2546 :         if (inner_state->src_win.count == 0 && action == PUMP_FINISH)
      75           4 :                 inner_state->finished = true;
      76        2546 :         if (inner_state->finished)
      77             :                 return PUMP_END;
      78             : 
      79        2542 :         LZ4F_decompressOptions_t opts = {0};
      80        2542 :         size_t nsrc = inner_state->src_win.count; // amount available
      81        2542 :         size_t ndst = inner_state->dst_win.count; // space available
      82        5084 :         ret = LZ4F_decompress(
      83             :                 inner_state->ctx.d,
      84        2542 :                 inner_state->dst_win.start, &ndst,
      85        2542 :                 inner_state->src_win.start, &nsrc,
      86             :                 &opts);
      87             :         // Now nsrc has become the amount consumed, ndst the amount produced!
      88        2542 :         inner_state->src_win.start += nsrc;
      89        2542 :         inner_state->src_win.count -= nsrc;
      90        2542 :         inner_state->dst_win.start += ndst;
      91        2542 :         inner_state->dst_win.count -= ndst;
      92             : 
      93        2542 :         if (LZ4F_isError(ret)) {
      94           0 :                 inner_state->error_code = ret;
      95           0 :                 return PUMP_ERROR;
      96             :         }
      97             :         return PUMP_OK;
      98             : }
      99             : 
     100             : static void
     101           2 : decomp_end(inner_state_t *inner_state)
     102             : {
     103           2 :         LZ4F_freeDecompressionContext(inner_state->ctx.d);
     104           2 :         free(inner_state->buffer.start);
     105           2 :         free(inner_state);
     106           2 : }
     107             : 
     108             : 
     109             : static pump_result
     110        4168 : compr(inner_state_t *inner_state, pump_action action)
     111             : {
     112        4168 :         LZ4F_compressOptions_t opts = {0};
     113        4168 :         size_t consumed;
     114        4168 :         LZ4F_errorCode_t produced;
     115        4168 :         pump_result intended_result;
     116             : 
     117        4168 :         if (inner_state->finished)
     118             :                 return PUMP_END;
     119             : 
     120        4168 :         size_t chunk = inner_state->src_win.count;
     121        4168 :         if (chunk > WRITE_CHUNK)
     122             :                 chunk = WRITE_CHUNK;
     123             : 
     124        4168 :         switch (action) {
     125             : 
     126        3964 :                 case PUMP_NO_FLUSH:
     127        3964 :                         produced = LZ4F_compressUpdate(
     128             :                                 inner_state->ctx.c,
     129        3964 :                                 inner_state->dst_win.start,
     130             :                                 inner_state->dst_win.count,
     131        3964 :                                 inner_state->src_win.start,
     132             :                                 chunk,
     133             :                                 &opts);
     134        3964 :                         consumed = chunk;
     135        3964 :                         intended_result = PUMP_OK;
     136        3964 :                         break;
     137             : 
     138         202 :                 case PUMP_FLUSH_ALL:
     139             :                 case PUMP_FLUSH_DATA:
     140             :                         // FLUSH_ALL not supported yet, just flush the data
     141         202 :                         produced = LZ4F_flush(
     142             :                                 inner_state->ctx.c,
     143         202 :                                 inner_state->dst_win.start,
     144             :                                 inner_state->dst_win.count,
     145             :                                 &opts);
     146         202 :                         consumed = 0;
     147         202 :                         intended_result = PUMP_END;
     148         202 :                         break;
     149             : 
     150           2 :                 case PUMP_FINISH:
     151           4 :                         produced = LZ4F_compressEnd(
     152             :                                 inner_state->ctx.c,
     153           2 :                                 inner_state->dst_win.start,
     154             :                                 inner_state->dst_win.count,
     155             :                                 &opts);
     156           2 :                         consumed = 0;
     157           2 :                         inner_state->finished = true;
     158           2 :                         intended_result = PUMP_END;
     159           2 :                         break;
     160             : 
     161             :                 default:
     162           0 :                         assert(0); // shut up, compiler!
     163             :                         return PUMP_ERROR;
     164             :         }
     165             : 
     166        4168 :         if (LZ4F_isError(produced)) {
     167           0 :                 inner_state->error_code = produced;
     168           0 :                 return PUMP_ERROR;
     169             :         }
     170             : 
     171        4168 :         inner_state->src_win.start += consumed;
     172        4168 :         inner_state->src_win.count -= consumed;
     173        4168 :         inner_state->dst_win.start += produced;
     174        4168 :         inner_state->dst_win.count -= produced;
     175             : 
     176        4168 :         return intended_result;
     177             : }
     178             : 
     179             : static void
     180           2 : compr_end(inner_state_t *inner_state)
     181             : {
     182           2 :         LZ4F_freeCompressionContext(inner_state->ctx.c);
     183           2 :         free(inner_state->buffer.start);
     184           2 :         free(inner_state);
     185           2 : }
     186             : 
     187             : static const char*
     188           0 : get_error(inner_state_t *inner_state)
     189             : {
     190           0 :         return LZ4F_getErrorName(inner_state->error_code);
     191             : }
     192             : 
     193             : static stream *
     194           2 : setup_decompression(stream *inner, pump_state *state)
     195             : {
     196           2 :         inner_state_t *inner_state = state->inner_state;
     197           2 :         void *buf = malloc(READ_CHUNK);
     198           2 :         if (buf == NULL)
     199             :                 return NULL;
     200             : 
     201           2 :         inner_state->buffer = (pump_buffer) { .start = buf, .count = READ_CHUNK };
     202           2 :         inner_state->src_win = inner_state->buffer;
     203           2 :         inner_state->src_win.count = 0;
     204             : 
     205           2 :         LZ4F_errorCode_t ret = LZ4F_createDecompressionContext(
     206             :                 &inner_state->ctx.d, LZ4F_VERSION);
     207           2 :         if (LZ4F_isError(ret)) {
     208           0 :                 free(buf);
     209           0 :                 mnstr_set_open_error(inner->name, 0, "failed to initialize lz4: %s", LZ4F_getErrorName(ret));
     210           0 :                 return NULL;
     211             :         }
     212             : 
     213           2 :         state->worker = decomp;
     214           2 :         state->finalizer = decomp_end;
     215             : 
     216           2 :         stream *s = pump_stream(inner, state);
     217           2 :         if (s == NULL) {
     218           0 :                 free(buf);
     219           0 :                 return NULL;
     220             :         }
     221             : 
     222             :         return s;
     223             : }
     224             : 
     225             : static stream *
     226           2 : setup_compression(stream *inner, pump_state *state, int level)
     227             : {
     228           2 :         inner_state_t *inner_state = state->inner_state;
     229           2 :         LZ4F_errorCode_t ret;
     230             : 
     231             :         // When pumping data into the compressor, the output buffer must be
     232             :         // sufficiently large to hold all output caused by the current input. We
     233             :         // will restrict our writes to be at most WRITE_CHUCK large and allocate
     234             :         // a buffer that can accomodate even the worst case amount of output
     235             :         // caused by input of that size.
     236             : 
     237             :         // The required size depends on the preferences so we set those first.
     238           2 :         memset(&inner_state->compression_prefs, 0, sizeof(inner_state->compression_prefs));
     239           2 :         inner_state->compression_prefs.compressionLevel = level;
     240             : 
     241             :         // Set up a buffer that can hold the largest output block plus the initial
     242             :         // header frame.
     243           2 :         size_t bound = LZ4F_compressBound(WRITE_CHUNK, &inner_state->compression_prefs);
     244           2 :         size_t bufsize = bound + LZ4F_HEADER_SIZE_MAX;
     245           2 :         char *buffer = malloc(bufsize);
     246           2 :         if (buffer == NULL)
     247             :                 return NULL;
     248           2 :         inner_state->buffer = (pump_buffer) { .start = buffer, .count = bufsize };
     249           2 :         inner_state->dst_win = inner_state->buffer;
     250           2 :         state->elbow_room = bound;
     251             : 
     252           2 :         ret = LZ4F_createCompressionContext(&inner_state->ctx.c, LZ4F_VERSION);
     253           2 :         if (LZ4F_isError(ret)) {
     254           0 :                 free(buffer);
     255           0 :                 return NULL;
     256             :         }
     257             : 
     258             :         // Write the header frame.
     259           4 :         size_t nwritten = LZ4F_compressBegin(
     260             :                 inner_state->ctx.c,
     261           2 :                 inner_state->dst_win.start,
     262             :                 inner_state->dst_win.count,
     263             :                 &inner_state->compression_prefs
     264             :         );
     265           2 :         if (LZ4F_isError(nwritten)) {
     266           0 :                 LZ4F_freeCompressionContext(inner_state->ctx.c);
     267           0 :                 free(buffer);
     268           0 :                 mnstr_set_open_error(inner->name, 0, "failed to initialize lz4: %s", LZ4F_getErrorName(ret));
     269           0 :                 return NULL;
     270             :         }
     271           2 :         inner_state->dst_win.start += nwritten;
     272           2 :         inner_state->dst_win.count -= nwritten;
     273             : 
     274           2 :         state->worker = compr;
     275           2 :         state->finalizer = compr_end;
     276             : 
     277           2 :         stream *s = pump_stream(inner, state);
     278           2 :         if (s == NULL) {
     279           0 :                 free(buffer);
     280           0 :                 return NULL;
     281             :         }
     282             : 
     283             :         return s;
     284             : }
     285             : 
     286             : stream *
     287           4 : lz4_stream(stream *inner, int level)
     288             : {
     289           4 :         inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
     290           4 :         pump_state *state = calloc(1, sizeof(pump_state));
     291           4 :         if (inner_state == NULL || state == NULL) {
     292           0 :                 free(inner_state);
     293           0 :                 free(state);
     294           0 :                 mnstr_set_open_error(inner->name, errno, "couldn't initialize lz4 stream");
     295           0 :                 return NULL;
     296             :         }
     297             : 
     298           4 :         state->inner_state = inner_state;
     299           4 :         state->get_src_win = get_src_win;
     300           4 :         state->set_src_win = set_src_win;
     301           4 :         state->get_dst_win = get_dst_win;
     302           4 :         state->set_dst_win = set_dst_win;
     303           4 :         state->get_buffer = get_buffer;
     304           4 :         state->get_error = get_error;
     305             : 
     306           4 :         stream *s;
     307           4 :         if (inner->readonly)
     308           2 :                 s = setup_decompression(inner, state);
     309             :         else
     310           2 :                 s = setup_compression(inner, state, level);
     311             : 
     312           4 :         if (s == NULL) {
     313           0 :                 free(inner_state);
     314           0 :                 free(state);
     315           0 :                 return NULL;
     316             :         }
     317             : 
     318             :         return s;
     319             : }
     320             : 
     321             : static stream *
     322           0 : open_lz4stream(const char *restrict filename, const char *restrict flags)
     323             : {
     324           0 :         stream *inner;
     325           0 :         int preset = 6;
     326             : 
     327           0 :         inner = open_stream(filename, flags);
     328           0 :         if (inner == NULL)
     329             :                 return NULL;
     330             : 
     331           0 :         return lz4_stream(inner, preset);
     332             : }
     333             : 
     334             : stream *
     335           0 : open_lz4rstream(const char *filename)
     336             : {
     337           0 :         stream *s = open_lz4stream(filename, "rb");
     338           0 :         if (s == NULL)
     339             :                 return NULL;
     340             : 
     341           0 :         assert(s->readonly == true);
     342           0 :         assert(s->binary == true);
     343             :         return s;
     344             : }
     345             : 
     346             : stream *
     347           0 : open_lz4wstream(const char *restrict filename, const char *restrict mode)
     348             : {
     349           0 :         stream *s = open_lz4stream(filename, mode);
     350           0 :         if (s == NULL)
     351             :                 return NULL;
     352             : 
     353           0 :         assert(s->readonly == false);
     354           0 :         assert(s->binary == true);
     355             :         return s;
     356             : }
     357             : 
     358             : stream *
     359           0 : open_lz4rastream(const char *filename)
     360             : {
     361           0 :         stream *s = open_lz4stream(filename, "r");
     362           0 :         s = create_text_stream(s);
     363           0 :         if (s == NULL)
     364             :                 return NULL;
     365             : 
     366           0 :         assert(s->readonly == true);
     367           0 :         assert(s->binary == false);
     368             :         return s;
     369             : }
     370             : 
     371             : stream *
     372           0 : open_lz4wastream(const char *restrict filename, const char *restrict mode)
     373             : {
     374           0 :         stream *s = open_lz4stream(filename, mode);
     375           0 :         s = create_text_stream(s);
     376           0 :         if (s == NULL)
     377             :                 return NULL;
     378           0 :         assert(s->readonly == false);
     379           0 :         assert(s->binary == false);
     380             :         return s;
     381             : }
     382             : #else
     383             : 
     384             : stream *
     385             : lz4_stream(stream *inner, int preset)
     386             : {
     387             :         (void) inner;
     388             :         (void) preset;
     389             :         mnstr_set_open_error(inner->name, 0, "LZ4 support has been left out of this MonetDB");
     390             :         return NULL;
     391             : }
     392             : 
     393             : stream *
     394             : open_lz4rstream(const char *filename)
     395             : {
     396             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     397             :         return NULL;
     398             : }
     399             : 
     400             : stream *
     401             : open_lz4wstream(const char *restrict filename, const char *restrict mode)
     402             : {
     403             :         (void) mode;
     404             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     405             :         return NULL;
     406             : }
     407             : 
     408             : stream *
     409             : open_lz4rastream(const char *filename)
     410             : {
     411             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     412             :         return NULL;
     413             : }
     414             : 
     415             : stream *
     416             : open_lz4wastream(const char *restrict filename, const char *restrict mode)
     417             : {
     418             :         (void) mode;
     419             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     420             :         return NULL;
     421             : }
     422             : 
     423             : #endif

Generated by: LCOV version 1.14