Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /* streams working on a lzma/xz-compressed disk file */
14 :
15 : #include "monetdb_config.h"
16 : #include "stream.h"
17 : #include "stream_internal.h"
18 : #include "pump.h"
19 :
20 : #include <assert.h>
21 :
22 : static pump_result pump_in(stream *s);
23 : static pump_result pump_out(stream *s, pump_action action);
24 :
25 : static ssize_t pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
26 : static ssize_t pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
27 : static int pump_flush(stream *s, mnstr_flush_level flush_level);
28 : static void pump_close(stream *s);
29 : static void pump_destroy(stream *s);
30 :
31 :
32 : stream *
33 509 : pump_stream(stream *inner, pump_state *state)
34 : {
35 509 : assert(inner);
36 509 : assert(state);
37 509 : assert(state->set_src_win != NULL);
38 509 : assert(state->get_dst_win != NULL);
39 509 : assert(state->set_dst_win != NULL);
40 509 : assert(state->get_buffer != NULL);
41 509 : assert(state->worker != NULL);
42 509 : assert(state->get_error != NULL);
43 509 : assert(state->finalizer != NULL);
44 :
45 509 : inner_state_t *inner_state = state->inner_state;
46 :
47 509 : stream *s = create_wrapper_stream(NULL, inner);
48 509 : if (s == NULL)
49 : return NULL;
50 :
51 509 : pump_buffer buf = state->get_buffer(inner_state);
52 509 : if (s->readonly) {
53 : // Read from inner stream to src buffer through pumper to outbufs.
54 : // This means the src window starts empty
55 477 : buf.count = 0;
56 477 : state->set_src_win(inner_state, buf);
57 : } else {
58 : // from inbufs through pumper to dst buffer to inner stream.
59 : // This means the out window is our whole buffer.
60 : // Check for NULL in case caller has already initialized it
61 : // and written something
62 32 : if (state->get_dst_win(inner_state).start == NULL)
63 2 : state->set_dst_win(inner_state, buf);
64 : }
65 :
66 509 : s->stream_data.p = (void*) state;
67 509 : s->read = pump_read;
68 509 : s->write = pump_write;
69 509 : s->flush = pump_flush;
70 509 : s->close = pump_close;
71 509 : s->destroy = pump_destroy;
72 509 : return s;
73 : }
74 :
75 :
76 : static ssize_t
77 47207 : pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
78 : {
79 47207 : pump_state *state = (pump_state*) s->stream_data.p;
80 47207 : inner_state_t *inner_state = state->inner_state;
81 47207 : size_t size = elmsize * cnt;
82 :
83 47207 : state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
84 47207 : pump_result ret = pump_in(s);
85 47207 : if (ret == PUMP_ERROR) {
86 0 : const char *msg = state->get_error(inner_state);
87 0 : if (msg != NULL)
88 0 : msg = "processing failed without further error indication";
89 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
90 0 : return -1;
91 : }
92 :
93 47207 : char *free_space = state->get_dst_win(inner_state).start;
94 47207 : ssize_t nread = free_space - (char*) buf;
95 :
96 47207 : return nread / (ssize_t) elmsize;
97 : }
98 :
99 :
100 : static ssize_t
101 1218328 : pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
102 : {
103 1218328 : pump_state *state = (pump_state*) s->stream_data.p;
104 1218328 : inner_state_t *inner_state = state->inner_state;
105 1218328 : size_t size = elmsize * cnt;
106 :
107 1218328 : if (size == 0)
108 70 : return cnt;
109 :
110 1218258 : state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
111 1218258 : pump_result ret = pump_out(s, PUMP_NO_FLUSH);
112 1218258 : if (ret == PUMP_ERROR) {
113 0 : const char *msg = state->get_error(inner_state);
114 0 : if (msg != NULL)
115 0 : msg = "processing failed without further error indication";
116 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
117 0 : return -1;
118 : }
119 1218258 : ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf;
120 1218258 : return nwritten / (ssize_t) elmsize;
121 : }
122 :
123 :
124 841 : static int pump_flush(stream *s, mnstr_flush_level flush_level)
125 : {
126 841 : pump_state *state = (pump_state*) s->stream_data.p;
127 841 : inner_state_t *inner_state = state->inner_state;
128 841 : pump_action action;
129 :
130 841 : switch (flush_level) {
131 : case MNSTR_FLUSH_DATA:
132 : action = PUMP_FLUSH_DATA;
133 : break;
134 833 : case MNSTR_FLUSH_ALL:
135 833 : action = PUMP_FLUSH_ALL;
136 833 : break;
137 : default:
138 0 : assert(0 /* unknown flush_level */);
139 : action = PUMP_FLUSH_DATA;
140 : break;
141 : }
142 :
143 841 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
144 841 : ssize_t nwritten = pump_out(s, action);
145 841 : if (nwritten < 0)
146 : return -1;
147 : else
148 841 : return mnstr_flush(s->inner, flush_level);
149 : }
150 :
151 :
152 : static void
153 509 : pump_close(stream *s)
154 : {
155 509 : pump_state *state = (pump_state*) s->stream_data.p;
156 509 : inner_state_t *inner_state = state->inner_state;
157 :
158 509 : if (!s->readonly) {
159 32 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
160 32 : pump_out(s, PUMP_FINISH);
161 : }
162 509 : mnstr_close(s->inner);
163 509 : }
164 :
165 :
166 : static void
167 509 : pump_destroy(stream *s)
168 : {
169 509 : pump_state *state = (pump_state*) s->stream_data.p;
170 509 : inner_state_t *inner_state = state->inner_state;
171 :
172 509 : state->finalizer(inner_state);
173 509 : free(state);
174 509 : mnstr_destroy(s->inner);
175 509 : destroy_stream(s);
176 509 : }
177 :
178 : static pump_result
179 47207 : pump_in(stream *s)
180 : {
181 47207 : pump_state *state = (pump_state *) s->stream_data.p;
182 47207 : inner_state_t *inner_state = state->inner_state;
183 :
184 47207 : char *before = state->get_dst_win(inner_state).start;
185 47207 : (void) before; // nice while in the debugger
186 :
187 47207 : pump_buffer buffer = state->get_buffer(inner_state);
188 82424 : while (1) {
189 129631 : pump_buffer dst = state->get_dst_win(inner_state);
190 129631 : pump_buffer src = state->get_src_win(inner_state);
191 :
192 129631 : if (dst.count == 0)
193 : // Output buffer is full, we're done.
194 47207 : return PUMP_OK;
195 :
196 : // Handle input, if possible and necessary
197 83172 : if (src.start != NULL && src.count == 0) {
198 : // start != NULL means we haven't encountered EOF yet
199 :
200 36141 : ssize_t nread = mnstr_read(s->inner, buffer.start, 1, buffer.count);
201 :
202 36141 : if (nread < 0)
203 : // Error. Return directly, discarding any data lingering
204 : // in the internal state.
205 : return PUMP_ERROR;
206 36141 : if (nread == 0) {
207 : // Set to NULL so we'll remember next time.
208 : // Maybe there is some data in the internal state we don't
209 : // return immediately.
210 469 : src = (pump_buffer){.start=NULL, .count=0};
211 469 : s->eof |= s->inner->eof;
212 : } else
213 : // All good
214 35672 : src = (pump_buffer) { .start = buffer.start, .count = nread};
215 :
216 36141 : state->set_src_win(inner_state, src);
217 : }
218 :
219 83172 : pump_action action = (src.start != NULL) ? PUMP_NO_FLUSH : PUMP_FINISH;
220 :
221 : // Try to make some progress
222 83172 : assert(dst.count > 0);
223 83172 : assert(src.count > 0 || action == PUMP_FINISH);
224 83172 : pump_result ret = state->worker(inner_state, action);
225 83172 : if (ret == PUMP_ERROR)
226 : return PUMP_ERROR;
227 :
228 83172 : if (ret == PUMP_END)
229 : // If you say so
230 : return PUMP_END;
231 :
232 : // If we get here we made some progress so we're ready for a new iteration.
233 : }
234 : }
235 :
236 :
237 : static pump_result
238 1219131 : pump_out(stream *s, pump_action action)
239 : {
240 1219131 : pump_state *state = (pump_state *) s->stream_data.p;
241 1219131 : inner_state_t *inner_state = state->inner_state;
242 :
243 1219131 : void *before = state->get_src_win(inner_state).start;
244 1219131 : (void) before; // nice while in the debugger
245 :
246 1219131 : pump_buffer buffer = state->get_buffer(inner_state);
247 :
248 1227045 : while (1) {
249 1223088 : pump_buffer dst = state->get_dst_win(inner_state);
250 1223088 : pump_buffer src = state->get_src_win(inner_state);
251 :
252 : // Make sure there is room in the output buffer
253 1223088 : assert(state->elbow_room <= buffer.count);
254 1223088 : if (dst.count == 0 || dst.count < state->elbow_room) {
255 319 : size_t amount = dst.start - buffer.start;
256 319 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
257 319 : if (nwritten != (ssize_t)amount)
258 1219131 : return PUMP_ERROR;
259 319 : dst = buffer;
260 319 : state->set_dst_win(inner_state, dst); // reset output window
261 : }
262 :
263 : // Try to make progress
264 1223088 : pump_result ret = state->worker(inner_state, action);
265 1223088 : if (ret == PUMP_ERROR)
266 : return PUMP_ERROR;
267 :
268 : // src and dst have been invalidated by the call to worker
269 1222877 : dst = state->get_dst_win(inner_state);
270 1222877 : src = state->get_src_win(inner_state);
271 :
272 : // There was no error but if input is still available, we definitely
273 : // need another round
274 1222877 : if (src.count > 0)
275 3745 : continue;
276 :
277 : // Though the input data has been consumed, some of it might still
278 : // linger in the internal state.
279 1219132 : if (action == PUMP_NO_FLUSH) {
280 : // Let it linger, we'll combine it with the next batch
281 1218258 : assert(ret == PUMP_OK); // worker would never PUMP_END, would it?
282 : return PUMP_OK;
283 : }
284 :
285 : // We are flushing or finishing or whatever.
286 : // We may need to do more iterations to fully flush the internal state.
287 : // Is there any internal state left?
288 874 : if (ret == PUMP_OK)
289 : // yes, there is
290 212 : continue;
291 :
292 : // All internal state has been drained.
293 : // Now drain the output buffer
294 662 : assert(ret == PUMP_END);
295 662 : size_t amount = dst.start - buffer.start;
296 662 : if (amount > 0) {
297 657 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
298 657 : if (nwritten != (ssize_t)amount)
299 : return PUMP_ERROR;
300 : }
301 662 : state->set_dst_win(inner_state, buffer); // reset output window
302 662 : return PUMP_END;
303 : }
304 :
305 :
306 : }
|