Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /* stream wrapper that pumps data between an inner stream and a pluggable worker (e.g. a compressor or decompressor) */
14 :
15 : #include "monetdb_config.h"
16 : #include "stream.h"
17 : #include "stream_internal.h"
18 : #include "pump.h"
19 :
20 : #include <assert.h>
21 :
22 : static pump_result pump_in(stream *s);
23 : static pump_result pump_out(stream *s, pump_action action);
24 :
25 : static ssize_t pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
26 : static ssize_t pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
27 : static int pump_flush(stream *s, mnstr_flush_level flush_level);
28 : static void pump_close(stream *s);
29 : static void pump_destroy(stream *s);
30 :
31 :
32 : stream *
33 454 : pump_stream(stream *inner, pump_state *state)
34 : {
35 454 : assert(inner);
36 454 : assert(state);
37 454 : assert(state->set_src_win != NULL);
38 454 : assert(state->get_dst_win != NULL);
39 454 : assert(state->set_dst_win != NULL);
40 454 : assert(state->get_buffer != NULL);
41 454 : assert(state->worker != NULL);
42 454 : assert(state->get_error != NULL);
43 454 : assert(state->finalizer != NULL);
44 :
45 454 : inner_state_t *inner_state = state->inner_state;
46 :
47 454 : stream *s = create_wrapper_stream(NULL, inner);
48 454 : if (s == NULL)
49 : return NULL;
50 :
51 454 : pump_buffer buf = state->get_buffer(inner_state);
52 454 : if (s->readonly) {
53 : // Read from inner stream to src buffer through pumper to outbufs.
54 : // This means the src window starts empty
55 444 : buf.count = 0;
56 444 : state->set_src_win(inner_state, buf);
57 : } else {
58 : // from inbufs through pumper to dst buffer to inner stream.
59 : // This means the out window is our whole buffer.
60 : // Check for NULL in case caller has already initialized it
61 : // and written something
62 10 : if (state->get_dst_win(inner_state).start == NULL)
63 2 : state->set_dst_win(inner_state, buf);
64 : }
65 :
66 454 : s->stream_data.p = (void*) state;
67 454 : s->read = pump_read;
68 454 : s->write = pump_write;
69 454 : s->flush = pump_flush;
70 454 : s->close = pump_close;
71 454 : s->destroy = pump_destroy;
72 454 : return s;
73 : }
74 :
75 :
76 : static ssize_t
77 32603 : pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
78 : {
79 32603 : pump_state *state = (pump_state*) s->stream_data.p;
80 32603 : inner_state_t *inner_state = state->inner_state;
81 32603 : size_t size = elmsize * cnt;
82 :
83 32603 : state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
84 32603 : pump_result ret = pump_in(s);
85 32603 : if (ret == PUMP_ERROR) {
86 0 : const char *msg = state->get_error(inner_state);
87 0 : if (msg != NULL)
88 0 : msg = "processing failed without further error indication";
89 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
90 0 : return -1;
91 : }
92 :
93 32603 : char *free_space = state->get_dst_win(inner_state).start;
94 32603 : ssize_t nread = free_space - (char*) buf;
95 :
96 32603 : return nread / (ssize_t) elmsize;
97 : }
98 :
99 :
100 : static ssize_t
101 2275 : pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
102 : {
103 2275 : pump_state *state = (pump_state*) s->stream_data.p;
104 2275 : inner_state_t *inner_state = state->inner_state;
105 2275 : size_t size = elmsize * cnt;
106 :
107 2275 : if (size == 0)
108 0 : return cnt;
109 :
110 2275 : state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
111 2275 : pump_result ret = pump_out(s, PUMP_NO_FLUSH);
112 2275 : if (ret == PUMP_ERROR) {
113 0 : const char *msg = state->get_error(inner_state);
114 0 : if (msg != NULL)
115 0 : msg = "processing failed without further error indication";
116 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
117 0 : return -1;
118 : }
119 2275 : ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf;
120 2275 : return nwritten / (ssize_t) elmsize;
121 : }
122 :
123 :
124 807 : static int pump_flush(stream *s, mnstr_flush_level flush_level)
125 : {
126 807 : pump_state *state = (pump_state*) s->stream_data.p;
127 807 : inner_state_t *inner_state = state->inner_state;
128 807 : pump_action action;
129 :
130 807 : switch (flush_level) {
131 : case MNSTR_FLUSH_DATA:
132 : action = PUMP_FLUSH_DATA;
133 : break;
134 806 : case MNSTR_FLUSH_ALL:
135 806 : action = PUMP_FLUSH_ALL;
136 806 : break;
137 : default:
138 0 : assert(0 /* unknown flush_level */);
139 : action = PUMP_FLUSH_DATA;
140 : break;
141 : }
142 :
143 807 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
144 807 : ssize_t nwritten = pump_out(s, action);
145 807 : if (nwritten < 0)
146 : return -1;
147 : else
148 807 : return mnstr_flush(s->inner, flush_level);
149 : }
150 :
151 :
152 : static void
153 454 : pump_close(stream *s)
154 : {
155 454 : pump_state *state = (pump_state*) s->stream_data.p;
156 454 : inner_state_t *inner_state = state->inner_state;
157 :
158 454 : if (!s->readonly) {
159 10 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
160 10 : pump_out(s, PUMP_FINISH);
161 : }
162 454 : mnstr_close(s->inner);
163 454 : }
164 :
165 :
166 : static void
167 454 : pump_destroy(stream *s)
168 : {
169 454 : pump_state *state = (pump_state*) s->stream_data.p;
170 454 : inner_state_t *inner_state = state->inner_state;
171 :
172 454 : state->finalizer(inner_state);
173 454 : free(state);
174 454 : mnstr_destroy(s->inner);
175 454 : destroy_stream(s);
176 454 : }
177 :
178 : static pump_result
179 32603 : pump_in(stream *s)
180 : {
181 32603 : pump_state *state = (pump_state *) s->stream_data.p;
182 32603 : inner_state_t *inner_state = state->inner_state;
183 :
184 32603 : char *before = state->get_dst_win(inner_state).start;
185 32603 : (void) before; // nice while in the debugger
186 :
187 32603 : pump_buffer buffer = state->get_buffer(inner_state);
188 67272 : while (1) {
189 99875 : pump_buffer dst = state->get_dst_win(inner_state);
190 99875 : pump_buffer src = state->get_src_win(inner_state);
191 :
192 99875 : if (dst.count == 0)
193 : // Output buffer is full, we're done.
194 32603 : return PUMP_OK;
195 :
196 : // Handle input, if possible and necessary
197 67962 : if (src.start != NULL && src.count == 0) {
198 : // start != NULL means we haven't encountered EOF yet
199 :
200 35806 : ssize_t nread = mnstr_read(s->inner, buffer.start, 1, buffer.count);
201 :
202 35806 : if (nread < 0)
203 : // Error. Return directly, discarding any data lingering
204 : // in the internal state.
205 : return PUMP_ERROR;
206 35806 : if (nread == 0) {
207 : // Set to NULL so we'll remember next time.
208 : // Maybe there is some data in the internal state we don't
209 : // return immediately.
210 436 : src = (pump_buffer){.start=NULL, .count=0};
211 436 : s->eof |= s->inner->eof;
212 : } else
213 : // All good
214 35370 : src = (pump_buffer) { .start = buffer.start, .count = nread};
215 :
216 35806 : state->set_src_win(inner_state, src);
217 : }
218 :
219 67962 : pump_action action = (src.start != NULL) ? PUMP_NO_FLUSH : PUMP_FINISH;
220 :
221 : // Try to make some progress
222 67962 : assert(dst.count > 0);
223 67962 : assert(src.count > 0 || action == PUMP_FINISH);
224 67962 : pump_result ret = state->worker(inner_state, action);
225 67962 : if (ret == PUMP_ERROR)
226 : return PUMP_ERROR;
227 :
228 67962 : if (ret == PUMP_END)
229 : // If you say so
230 : return PUMP_END;
231 :
232 : // If we get here we made some progress so we're ready for a new iteration.
233 : }
234 : }
235 :
236 :
237 : static pump_result
238 3092 : pump_out(stream *s, pump_action action)
239 : {
240 3092 : pump_state *state = (pump_state *) s->stream_data.p;
241 3092 : inner_state_t *inner_state = state->inner_state;
242 :
243 3092 : void *before = state->get_src_win(inner_state).start;
244 3092 : (void) before; // nice while in the debugger
245 :
246 3092 : pump_buffer buffer = state->get_buffer(inner_state);
247 :
248 10376 : while (1) {
249 6734 : pump_buffer dst = state->get_dst_win(inner_state);
250 6734 : pump_buffer src = state->get_src_win(inner_state);
251 :
252 : // Make sure there is room in the output buffer
253 6734 : assert(state->elbow_room <= buffer.count);
254 6734 : if (dst.count == 0 || dst.count < state->elbow_room) {
255 50 : size_t amount = dst.start - buffer.start;
256 50 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
257 50 : if (nwritten != (ssize_t)amount)
258 3092 : return PUMP_ERROR;
259 50 : dst = buffer;
260 50 : state->set_dst_win(inner_state, dst); // reset output window
261 : }
262 :
263 : // Try to make progress
264 6734 : pump_result ret = state->worker(inner_state, action);
265 6734 : if (ret == PUMP_ERROR)
266 : return PUMP_ERROR;
267 :
268 : // src and dst have been invalidated by the call to worker
269 6533 : dst = state->get_dst_win(inner_state);
270 6533 : src = state->get_src_win(inner_state);
271 :
272 : // There was no error but if input is still available, we definitely
273 : // need another round
274 6533 : if (src.count > 0)
275 3439 : continue;
276 :
277 : // Though the input data has been consumed, some of it might still
278 : // linger in the internal state.
279 3094 : if (action == PUMP_NO_FLUSH) {
280 : // Let it linger, we'll combine it with the next batch
281 2275 : assert(ret == PUMP_OK); // worker would never PUMP_END, would it?
282 : return PUMP_OK;
283 : }
284 :
285 : // We are flushing or finishing or whatever.
286 : // We may need to do more iterations to fully flush the internal state.
287 : // Is there any internal state left?
288 819 : if (ret == PUMP_OK)
289 : // yes, there is
290 203 : continue;
291 :
292 : // All internal state has been drained.
293 : // Now drain the output buffer
294 616 : assert(ret == PUMP_END);
295 616 : size_t amount = dst.start - buffer.start;
296 616 : if (amount > 0) {
297 615 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
298 615 : if (nwritten != (ssize_t)amount)
299 : return PUMP_ERROR;
300 : }
301 616 : state->set_dst_win(inner_state, buffer); // reset output window
302 616 : return PUMP_END;
303 : }
304 :
305 :
306 : }
|