Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /* streams working on an lz4-compressed disk file */
14 :
15 : #include "monetdb_config.h"
16 : #include "stream.h"
17 : #include "stream_internal.h"
18 : #include "pump.h"
19 :
20 :
21 : #ifdef HAVE_LIBLZ4
22 :
23 : #define READ_CHUNK (1024)
24 : #define WRITE_CHUNK (1024)
25 :
/* Per-stream LZ4 state shared by the pump callbacks defined in this file. */
struct inner_state {
	pump_buffer src_win;	// input window; count is the amount of input still available
	pump_buffer dst_win;	// output window; count is the amount of space still available
	pump_buffer buffer;	// malloc'ed scratch buffer; its start is freed by decomp_end/compr_end
	union {
		LZ4F_cctx *c;	// compression context, used by compr/compr_end
		LZ4F_dctx *d;	// decompression context, used by decomp/decomp_end
	} ctx;
	LZ4F_preferences_t compression_prefs;	// filled in by setup_compression only
	LZ4F_errorCode_t error_code;	// last LZ4F error recorded by a worker; read by get_error
	bool finished;	// once set, the workers return PUMP_END without doing any work
};
38 :
39 : static pump_buffer
40 14195 : get_src_win(inner_state_t *inner_state)
41 : {
42 14195 : return inner_state->src_win;
43 : }
44 :
45 : static void
46 1267 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
47 : {
48 1267 : inner_state->src_win = buf;
49 1267 : }
50 :
51 : static pump_buffer
52 16967 : get_dst_win(inner_state_t *inner_state)
53 : {
54 16967 : return inner_state->dst_win;
55 : }
56 :
57 : static void
58 2277 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
59 : {
60 2277 : inner_state->dst_win = buf;
61 2277 : }
62 :
63 : static pump_buffer
64 2779 : get_buffer(inner_state_t *inner_state)
65 : {
66 2779 : return inner_state->buffer;
67 : }
68 :
69 : static pump_result
70 2546 : decomp(inner_state_t *inner_state, pump_action action)
71 : {
72 2546 : LZ4F_errorCode_t ret;
73 :
74 2546 : if (inner_state->src_win.count == 0 && action == PUMP_FINISH)
75 4 : inner_state->finished = true;
76 2546 : if (inner_state->finished)
77 : return PUMP_END;
78 :
79 2542 : LZ4F_decompressOptions_t opts = {0};
80 2542 : size_t nsrc = inner_state->src_win.count; // amount available
81 2542 : size_t ndst = inner_state->dst_win.count; // space available
82 5084 : ret = LZ4F_decompress(
83 : inner_state->ctx.d,
84 2542 : inner_state->dst_win.start, &ndst,
85 2542 : inner_state->src_win.start, &nsrc,
86 : &opts);
87 : // Now nsrc has become the amount consumed, ndst the amount produced!
88 2542 : inner_state->src_win.start += nsrc;
89 2542 : inner_state->src_win.count -= nsrc;
90 2542 : inner_state->dst_win.start += ndst;
91 2542 : inner_state->dst_win.count -= ndst;
92 :
93 2542 : if (LZ4F_isError(ret)) {
94 0 : inner_state->error_code = ret;
95 0 : return PUMP_ERROR;
96 : }
97 : return PUMP_OK;
98 : }
99 :
100 : static void
101 2 : decomp_end(inner_state_t *inner_state)
102 : {
103 2 : LZ4F_freeDecompressionContext(inner_state->ctx.d);
104 2 : free(inner_state->buffer.start);
105 2 : free(inner_state);
106 2 : }
107 :
108 :
109 : static pump_result
110 4168 : compr(inner_state_t *inner_state, pump_action action)
111 : {
112 4168 : LZ4F_compressOptions_t opts = {0};
113 4168 : size_t consumed;
114 4168 : LZ4F_errorCode_t produced;
115 4168 : pump_result intended_result;
116 :
117 4168 : if (inner_state->finished)
118 : return PUMP_END;
119 :
120 4168 : size_t chunk = inner_state->src_win.count;
121 4168 : if (chunk > WRITE_CHUNK)
122 : chunk = WRITE_CHUNK;
123 :
124 4168 : switch (action) {
125 :
126 3964 : case PUMP_NO_FLUSH:
127 3964 : produced = LZ4F_compressUpdate(
128 : inner_state->ctx.c,
129 3964 : inner_state->dst_win.start,
130 : inner_state->dst_win.count,
131 3964 : inner_state->src_win.start,
132 : chunk,
133 : &opts);
134 3964 : consumed = chunk;
135 3964 : intended_result = PUMP_OK;
136 3964 : break;
137 :
138 202 : case PUMP_FLUSH_ALL:
139 : case PUMP_FLUSH_DATA:
140 : // FLUSH_ALL not supported yet, just flush the data
141 202 : produced = LZ4F_flush(
142 : inner_state->ctx.c,
143 202 : inner_state->dst_win.start,
144 : inner_state->dst_win.count,
145 : &opts);
146 202 : consumed = 0;
147 202 : intended_result = PUMP_END;
148 202 : break;
149 :
150 2 : case PUMP_FINISH:
151 4 : produced = LZ4F_compressEnd(
152 : inner_state->ctx.c,
153 2 : inner_state->dst_win.start,
154 : inner_state->dst_win.count,
155 : &opts);
156 2 : consumed = 0;
157 2 : inner_state->finished = true;
158 2 : intended_result = PUMP_END;
159 2 : break;
160 :
161 : default:
162 0 : assert(0); // shut up, compiler!
163 : return PUMP_ERROR;
164 : }
165 :
166 4168 : if (LZ4F_isError(produced)) {
167 0 : inner_state->error_code = produced;
168 0 : return PUMP_ERROR;
169 : }
170 :
171 4168 : inner_state->src_win.start += consumed;
172 4168 : inner_state->src_win.count -= consumed;
173 4168 : inner_state->dst_win.start += produced;
174 4168 : inner_state->dst_win.count -= produced;
175 :
176 4168 : return intended_result;
177 : }
178 :
179 : static void
180 2 : compr_end(inner_state_t *inner_state)
181 : {
182 2 : LZ4F_freeCompressionContext(inner_state->ctx.c);
183 2 : free(inner_state->buffer.start);
184 2 : free(inner_state);
185 2 : }
186 :
187 : static const char*
188 0 : get_error(inner_state_t *inner_state)
189 : {
190 0 : return LZ4F_getErrorName(inner_state->error_code);
191 : }
192 :
193 : static stream *
194 2 : setup_decompression(stream *inner, pump_state *state)
195 : {
196 2 : inner_state_t *inner_state = state->inner_state;
197 2 : void *buf = malloc(READ_CHUNK);
198 2 : if (buf == NULL)
199 : return NULL;
200 :
201 2 : inner_state->buffer = (pump_buffer) { .start = buf, .count = READ_CHUNK };
202 2 : inner_state->src_win = inner_state->buffer;
203 2 : inner_state->src_win.count = 0;
204 :
205 2 : LZ4F_errorCode_t ret = LZ4F_createDecompressionContext(
206 : &inner_state->ctx.d, LZ4F_VERSION);
207 2 : if (LZ4F_isError(ret)) {
208 0 : free(buf);
209 0 : mnstr_set_open_error(inner->name, 0, "failed to initialize lz4: %s", LZ4F_getErrorName(ret));
210 0 : return NULL;
211 : }
212 :
213 2 : state->worker = decomp;
214 2 : state->finalizer = decomp_end;
215 :
216 2 : stream *s = pump_stream(inner, state);
217 2 : if (s == NULL) {
218 0 : free(buf);
219 0 : return NULL;
220 : }
221 :
222 : return s;
223 : }
224 :
225 : static stream *
226 2 : setup_compression(stream *inner, pump_state *state, int level)
227 : {
228 2 : inner_state_t *inner_state = state->inner_state;
229 2 : LZ4F_errorCode_t ret;
230 :
231 : // When pumping data into the compressor, the output buffer must be
232 : // sufficiently large to hold all output caused by the current input. We
233 : // will restrict our writes to be at most WRITE_CHUCK large and allocate
234 : // a buffer that can accomodate even the worst case amount of output
235 : // caused by input of that size.
236 :
237 : // The required size depends on the preferences so we set those first.
238 2 : memset(&inner_state->compression_prefs, 0, sizeof(inner_state->compression_prefs));
239 2 : inner_state->compression_prefs.compressionLevel = level;
240 :
241 : // Set up a buffer that can hold the largest output block plus the initial
242 : // header frame.
243 2 : size_t bound = LZ4F_compressBound(WRITE_CHUNK, &inner_state->compression_prefs);
244 2 : size_t bufsize = bound + LZ4F_HEADER_SIZE_MAX;
245 2 : char *buffer = malloc(bufsize);
246 2 : if (buffer == NULL)
247 : return NULL;
248 2 : inner_state->buffer = (pump_buffer) { .start = buffer, .count = bufsize };
249 2 : inner_state->dst_win = inner_state->buffer;
250 2 : state->elbow_room = bound;
251 :
252 2 : ret = LZ4F_createCompressionContext(&inner_state->ctx.c, LZ4F_VERSION);
253 2 : if (LZ4F_isError(ret)) {
254 0 : free(buffer);
255 0 : return NULL;
256 : }
257 :
258 : // Write the header frame.
259 4 : size_t nwritten = LZ4F_compressBegin(
260 : inner_state->ctx.c,
261 2 : inner_state->dst_win.start,
262 : inner_state->dst_win.count,
263 : &inner_state->compression_prefs
264 : );
265 2 : if (LZ4F_isError(nwritten)) {
266 0 : LZ4F_freeCompressionContext(inner_state->ctx.c);
267 0 : free(buffer);
268 0 : mnstr_set_open_error(inner->name, 0, "failed to initialize lz4: %s", LZ4F_getErrorName(ret));
269 0 : return NULL;
270 : }
271 2 : inner_state->dst_win.start += nwritten;
272 2 : inner_state->dst_win.count -= nwritten;
273 :
274 2 : state->worker = compr;
275 2 : state->finalizer = compr_end;
276 :
277 2 : stream *s = pump_stream(inner, state);
278 2 : if (s == NULL) {
279 0 : free(buffer);
280 0 : return NULL;
281 : }
282 :
283 : return s;
284 : }
285 :
286 : stream *
287 4 : lz4_stream(stream *inner, int level)
288 : {
289 4 : inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
290 4 : pump_state *state = calloc(1, sizeof(pump_state));
291 4 : if (inner_state == NULL || state == NULL) {
292 0 : free(inner_state);
293 0 : free(state);
294 0 : mnstr_set_open_error(inner->name, errno, "couldn't initialize lz4 stream");
295 0 : return NULL;
296 : }
297 :
298 4 : state->inner_state = inner_state;
299 4 : state->get_src_win = get_src_win;
300 4 : state->set_src_win = set_src_win;
301 4 : state->get_dst_win = get_dst_win;
302 4 : state->set_dst_win = set_dst_win;
303 4 : state->get_buffer = get_buffer;
304 4 : state->get_error = get_error;
305 :
306 4 : stream *s;
307 4 : if (inner->readonly)
308 2 : s = setup_decompression(inner, state);
309 : else
310 2 : s = setup_compression(inner, state, level);
311 :
312 4 : if (s == NULL) {
313 0 : free(inner_state);
314 0 : free(state);
315 0 : return NULL;
316 : }
317 :
318 : return s;
319 : }
320 :
321 : static stream *
322 0 : open_lz4stream(const char *restrict filename, const char *restrict flags)
323 : {
324 0 : stream *inner;
325 0 : int preset = 6;
326 :
327 0 : inner = open_stream(filename, flags);
328 0 : if (inner == NULL)
329 : return NULL;
330 :
331 0 : return lz4_stream(inner, preset);
332 : }
333 :
334 : stream *
335 0 : open_lz4rstream(const char *filename)
336 : {
337 0 : stream *s = open_lz4stream(filename, "rb");
338 0 : if (s == NULL)
339 : return NULL;
340 :
341 0 : assert(s->readonly == true);
342 0 : assert(s->binary == true);
343 : return s;
344 : }
345 :
346 : stream *
347 0 : open_lz4wstream(const char *restrict filename, const char *restrict mode)
348 : {
349 0 : stream *s = open_lz4stream(filename, mode);
350 0 : if (s == NULL)
351 : return NULL;
352 :
353 0 : assert(s->readonly == false);
354 0 : assert(s->binary == true);
355 : return s;
356 : }
357 :
358 : stream *
359 0 : open_lz4rastream(const char *filename)
360 : {
361 0 : stream *s = open_lz4stream(filename, "r");
362 0 : s = create_text_stream(s);
363 0 : if (s == NULL)
364 : return NULL;
365 :
366 0 : assert(s->readonly == true);
367 0 : assert(s->binary == false);
368 : return s;
369 : }
370 :
371 : stream *
372 0 : open_lz4wastream(const char *restrict filename, const char *restrict mode)
373 : {
374 0 : stream *s = open_lz4stream(filename, mode);
375 0 : s = create_text_stream(s);
376 0 : if (s == NULL)
377 : return NULL;
378 0 : assert(s->readonly == false);
379 0 : assert(s->binary == false);
380 : return s;
381 : }
382 : #else
383 :
384 : stream *
385 : lz4_stream(stream *inner, int preset)
386 : {
387 : (void) inner;
388 : (void) preset;
389 : mnstr_set_open_error(inner->name, 0, "LZ4 support has been left out of this MonetDB");
390 : return NULL;
391 : }
392 :
393 : stream *
394 : open_lz4rstream(const char *filename)
395 : {
396 : mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
397 : return NULL;
398 : }
399 :
400 : stream *
401 : open_lz4wstream(const char *restrict filename, const char *restrict mode)
402 : {
403 : (void) mode;
404 : mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
405 : return NULL;
406 : }
407 :
408 : stream *
409 : open_lz4rastream(const char *filename)
410 : {
411 : mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
412 : return NULL;
413 : }
414 :
415 : stream *
416 : open_lz4wastream(const char *restrict filename, const char *restrict mode)
417 : {
418 : (void) mode;
419 : mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
420 : return NULL;
421 : }
422 :
423 : #endif
|