Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
/* generic "pump" wrapper streams: data moves between the caller and an inner
 * stream through a pluggable worker (e.g. a compressor or decompressor) */
14 :
15 : #include "monetdb_config.h"
16 : #include "stream.h"
17 : #include "stream_internal.h"
18 : #include "pump.h"
19 :
20 : #include <assert.h>
21 :
22 : static pump_result pump_in(stream *s);
23 : static pump_result pump_out(stream *s, pump_action action);
24 :
25 : static ssize_t pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
26 : static ssize_t pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
27 : static int pump_flush(stream *s, mnstr_flush_level flush_level);
28 : static void pump_close(stream *s);
29 : static void pump_destroy(stream *s);
30 :
31 :
32 : stream *
33 490 : pump_stream(stream *inner, pump_state *state)
34 : {
35 490 : assert(inner);
36 490 : assert(state);
37 490 : assert(state->set_src_win != NULL);
38 490 : assert(state->get_dst_win != NULL);
39 490 : assert(state->set_dst_win != NULL);
40 490 : assert(state->get_buffer != NULL);
41 490 : assert(state->worker != NULL);
42 490 : assert(state->get_error != NULL);
43 490 : assert(state->finalizer != NULL);
44 :
45 490 : inner_state_t *inner_state = state->inner_state;
46 :
47 490 : stream *s = create_wrapper_stream(NULL, inner);
48 490 : if (s == NULL)
49 : return NULL;
50 :
51 490 : pump_buffer buf = state->get_buffer(inner_state);
52 490 : if (s->readonly) {
53 : // Read from inner stream to src buffer through pumper to outbufs.
54 : // This means the src window starts empty
55 461 : buf.count = 0;
56 461 : state->set_src_win(inner_state, buf);
57 : } else {
58 : // from inbufs through pumper to dst buffer to inner stream.
59 : // This means the out window is our whole buffer.
60 : // Check for NULL in case caller has already initialized it
61 : // and written something
62 29 : if (state->get_dst_win(inner_state).start == NULL)
63 2 : state->set_dst_win(inner_state, buf);
64 : }
65 :
66 490 : s->stream_data.p = (void*) state;
67 490 : s->read = pump_read;
68 490 : s->write = pump_write;
69 490 : s->flush = pump_flush;
70 490 : s->close = pump_close;
71 490 : s->destroy = pump_destroy;
72 490 : return s;
73 : }
74 :
75 :
76 : static ssize_t
77 46390 : pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
78 : {
79 46390 : pump_state *state = (pump_state*) s->stream_data.p;
80 46390 : inner_state_t *inner_state = state->inner_state;
81 46390 : size_t size = elmsize * cnt;
82 :
83 46390 : state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
84 46390 : pump_result ret = pump_in(s);
85 46390 : if (ret == PUMP_ERROR) {
86 0 : const char *msg = state->get_error(inner_state);
87 0 : if (msg != NULL)
88 0 : msg = "processing failed without further error indication";
89 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
90 0 : return -1;
91 : }
92 :
93 46390 : char *free_space = state->get_dst_win(inner_state).start;
94 46390 : ssize_t nread = free_space - (char*) buf;
95 :
96 46390 : return nread / (ssize_t) elmsize;
97 : }
98 :
99 :
100 : static ssize_t
101 1218251 : pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
102 : {
103 1218251 : pump_state *state = (pump_state*) s->stream_data.p;
104 1218251 : inner_state_t *inner_state = state->inner_state;
105 1218251 : size_t size = elmsize * cnt;
106 :
107 1218251 : if (size == 0)
108 68 : return cnt;
109 :
110 1218183 : state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
111 1218183 : pump_result ret = pump_out(s, PUMP_NO_FLUSH);
112 1218183 : if (ret == PUMP_ERROR) {
113 0 : const char *msg = state->get_error(inner_state);
114 0 : if (msg != NULL)
115 0 : msg = "processing failed without further error indication";
116 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
117 0 : return -1;
118 : }
119 1218183 : ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf;
120 1218183 : return nwritten / (ssize_t) elmsize;
121 : }
122 :
123 :
124 810 : static int pump_flush(stream *s, mnstr_flush_level flush_level)
125 : {
126 810 : pump_state *state = (pump_state*) s->stream_data.p;
127 810 : inner_state_t *inner_state = state->inner_state;
128 810 : pump_action action;
129 :
130 810 : switch (flush_level) {
131 : case MNSTR_FLUSH_DATA:
132 : action = PUMP_FLUSH_DATA;
133 : break;
134 808 : case MNSTR_FLUSH_ALL:
135 808 : action = PUMP_FLUSH_ALL;
136 808 : break;
137 : default:
138 0 : assert(0 /* unknown flush_level */);
139 : action = PUMP_FLUSH_DATA;
140 : break;
141 : }
142 :
143 810 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
144 810 : ssize_t nwritten = pump_out(s, action);
145 810 : if (nwritten < 0)
146 : return -1;
147 : else
148 810 : return mnstr_flush(s->inner, flush_level);
149 : }
150 :
151 :
152 : static void
153 490 : pump_close(stream *s)
154 : {
155 490 : pump_state *state = (pump_state*) s->stream_data.p;
156 490 : inner_state_t *inner_state = state->inner_state;
157 :
158 490 : if (!s->readonly) {
159 29 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
160 29 : pump_out(s, PUMP_FINISH);
161 : }
162 490 : mnstr_close(s->inner);
163 490 : }
164 :
165 :
166 : static void
167 490 : pump_destroy(stream *s)
168 : {
169 490 : pump_state *state = (pump_state*) s->stream_data.p;
170 490 : inner_state_t *inner_state = state->inner_state;
171 :
172 490 : state->finalizer(inner_state);
173 490 : free(state);
174 490 : mnstr_destroy(s->inner);
175 490 : destroy_stream(s);
176 490 : }
177 :
178 : static pump_result
179 46390 : pump_in(stream *s)
180 : {
181 46390 : pump_state *state = (pump_state *) s->stream_data.p;
182 46390 : inner_state_t *inner_state = state->inner_state;
183 :
184 46390 : char *before = state->get_dst_win(inner_state).start;
185 46390 : (void) before; // nice while in the debugger
186 :
187 46390 : pump_buffer buffer = state->get_buffer(inner_state);
188 81182 : while (1) {
189 127572 : pump_buffer dst = state->get_dst_win(inner_state);
190 127572 : pump_buffer src = state->get_src_win(inner_state);
191 :
192 127572 : if (dst.count == 0)
193 : // Output buffer is full, we're done.
194 46390 : return PUMP_OK;
195 :
196 : // Handle input, if possible and necessary
197 81905 : if (src.start != NULL && src.count == 0) {
198 : // start != NULL means we haven't encountered EOF yet
199 :
200 35980 : ssize_t nread = mnstr_read(s->inner, buffer.start, 1, buffer.count);
201 :
202 35980 : if (nread < 0)
203 : // Error. Return directly, discarding any data lingering
204 : // in the internal state.
205 : return PUMP_ERROR;
206 35980 : if (nread == 0) {
207 : // Set to NULL so we'll remember next time.
208 : // Maybe there is some data in the internal state we don't
209 : // return immediately.
210 453 : src = (pump_buffer){.start=NULL, .count=0};
211 453 : s->eof |= s->inner->eof;
212 : } else
213 : // All good
214 35527 : src = (pump_buffer) { .start = buffer.start, .count = nread};
215 :
216 35980 : state->set_src_win(inner_state, src);
217 : }
218 :
219 81905 : pump_action action = (src.start != NULL) ? PUMP_NO_FLUSH : PUMP_FINISH;
220 :
221 : // Try to make some progress
222 81905 : assert(dst.count > 0);
223 81905 : assert(src.count > 0 || action == PUMP_FINISH);
224 81905 : pump_result ret = state->worker(inner_state, action);
225 81905 : if (ret == PUMP_ERROR)
226 : return PUMP_ERROR;
227 :
228 81905 : if (ret == PUMP_END)
229 : // If you say so
230 : return PUMP_END;
231 :
232 : // If we get here we made some progress so we're ready for a new iteration.
233 : }
234 : }
235 :
236 :
237 : static pump_result
238 1219022 : pump_out(stream *s, pump_action action)
239 : {
240 1219022 : pump_state *state = (pump_state *) s->stream_data.p;
241 1219022 : inner_state_t *inner_state = state->inner_state;
242 :
243 1219022 : void *before = state->get_src_win(inner_state).start;
244 1219022 : (void) before; // nice while in the debugger
245 :
246 1219022 : pump_buffer buffer = state->get_buffer(inner_state);
247 :
248 1226786 : while (1) {
249 1222904 : pump_buffer dst = state->get_dst_win(inner_state);
250 1222904 : pump_buffer src = state->get_src_win(inner_state);
251 :
252 : // Make sure there is room in the output buffer
253 1222904 : assert(state->elbow_room <= buffer.count);
254 1222904 : if (dst.count == 0 || dst.count < state->elbow_room) {
255 319 : size_t amount = dst.start - buffer.start;
256 319 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
257 319 : if (nwritten != (ssize_t)amount)
258 1219022 : return PUMP_ERROR;
259 319 : dst = buffer;
260 319 : state->set_dst_win(inner_state, dst); // reset output window
261 : }
262 :
263 : // Try to make progress
264 1222904 : pump_result ret = state->worker(inner_state, action);
265 1222904 : if (ret == PUMP_ERROR)
266 : return PUMP_ERROR;
267 :
268 : // src and dst have been invalidated by the call to worker
269 1222702 : dst = state->get_dst_win(inner_state);
270 1222702 : src = state->get_src_win(inner_state);
271 :
272 : // There was no error but if input is still available, we definitely
273 : // need another round
274 1222702 : if (src.count > 0)
275 3678 : continue;
276 :
277 : // Though the input data has been consumed, some of it might still
278 : // linger in the internal state.
279 1219024 : if (action == PUMP_NO_FLUSH) {
280 : // Let it linger, we'll combine it with the next batch
281 1218183 : assert(ret == PUMP_OK); // worker would never PUMP_END, would it?
282 : return PUMP_OK;
283 : }
284 :
285 : // We are flushing or finishing or whatever.
286 : // We may need to do more iterations to fully flush the internal state.
287 : // Is there any internal state left?
288 841 : if (ret == PUMP_OK)
289 : // yes, there is
290 204 : continue;
291 :
292 : // All internal state has been drained.
293 : // Now drain the output buffer
294 637 : assert(ret == PUMP_END);
295 637 : size_t amount = dst.start - buffer.start;
296 637 : if (amount > 0) {
297 635 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
298 635 : if (nwritten != (ssize_t)amount)
299 : return PUMP_ERROR;
300 : }
301 637 : state->set_dst_win(inner_state, buffer); // reset output window
302 637 : return PUMP_END;
303 : }
304 :
305 :
306 : }
|