Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "pump.h"
17 :
18 : /* When reading, text streams convert \r\n to \n regardless of operating system,
19 : * and they drop the leading UTF-8 BOM marker if found.
20 : * When writing on Windows, \n is translated back to \r\n.
21 : *
22 : * Currently, skipping the BOM happens when opening, not on the first read action.
23 : */
24 :
25 : #define UTF8BOM "\xEF\xBB\xBF" /* UTF-8 encoding of Unicode BOM */
26 : #define UTF8BOMLENGTH 3 /* length of above */
27 :
28 : #define BUFFER_SIZE (65536)
29 : struct inner_state {
30 : pump_buffer src_win;
31 : pump_buffer dst_win;
32 : pump_buffer putback_win;
33 : pump_state *outer_state;
34 : char putback_buf[UTF8BOMLENGTH];
35 : bool crlf_pending;
36 : char buffer[BUFFER_SIZE];
37 : };
38 :
39 :
40 : static pump_buffer
41 4991272 : get_src_win(inner_state_t *inner_state)
42 : {
43 4991272 : return inner_state->src_win;
44 : }
45 :
46 :
47 : static void
48 1251335 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
49 : {
50 1251335 : inner_state->src_win = buf;
51 1251335 : }
52 :
53 :
54 : static pump_buffer
55 2647651 : get_dst_win(inner_state_t *inner_state)
56 : {
57 2647651 : return inner_state->dst_win;
58 : }
59 :
60 :
61 : static void
62 45238 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
63 : {
64 45238 : inner_state->dst_win = buf;
65 45238 : }
66 :
67 :
68 : static pump_buffer
69 1261758 : get_buffer(inner_state_t *inner_state)
70 : {
71 1261758 : return (pump_buffer) { .start = inner_state->buffer, .count = BUFFER_SIZE };
72 : }
73 :
74 : inline static void
75 3 : put_byte(inner_state_t *ist, char byte)
76 : {
77 3 : *ist->dst_win.start++ = byte;
78 3 : assert(ist->dst_win.count > 0);
79 3 : ist->dst_win.count--;
80 3 : }
81 :
82 : inline static char
83 3 : take_byte(inner_state_t *ist)
84 : {
85 3 : ist->src_win.count--;
86 3 : return *ist->src_win.start++;
87 : }
88 :
89 : static pump_result
90 79295 : text_pump_in(inner_state_t *ist, pump_action action)
91 : {
92 79295 : assert(ist->dst_win.count > 0);
93 79295 : assert(ist->src_win.count > 0 || action == PUMP_FINISH);
94 :
95 79295 : if (ist->crlf_pending) {
96 0 : if (ist->src_win.count > 0) {
97 0 : if (ist->src_win.start[0] != '\n') {
98 : // CR not followed by a LF, emit it
99 0 : put_byte(ist, '\r');
100 : }
101 : } else {
102 0 : assert(action == PUMP_FINISH);
103 : // CR followed by end of file, not LF, so emit it
104 0 : put_byte(ist, '\r');
105 : }
106 : // in any case, the CR is no longer pending
107 0 : ist->crlf_pending = false;
108 : }
109 :
110 158401 : while (1) {
111 158401 : size_t span = ist->src_win.count < ist->dst_win.count
112 : ? ist->src_win.count
113 : : ist->dst_win.count;
114 158401 : if (span == 0)
115 : break;
116 :
117 79106 : if (ist->src_win.start[0] == '\r') {
118 : // Looking at a CR. We'll handle just that, then make another round of the while loop
119 3 : if (ist->src_win.count == 1) {
120 : // Don't know what will follow, move it to the flag.
121 : // Then stop, as all available input has been consumed
122 0 : take_byte(ist);
123 0 : ist->crlf_pending = true;
124 0 : break;
125 : }
126 3 : assert(ist->src_win.count > 1); // We can safely look ahead
127 3 : if (ist->src_win.start[1] == '\n') {
128 : // Drop the CR, move the LF
129 3 : take_byte(ist);
130 3 : put_byte(ist, take_byte(ist));
131 : } else {
132 : // Move the CR
133 0 : put_byte(ist, take_byte(ist));
134 : }
135 : // progress has been made, consider the situation anew
136 3 : continue;
137 79103 : } else {
138 : // The remaining input data does not start with a CR.
139 : // Move all non-CR data to the output buffer
140 79103 : char *cr = memchr(ist->src_win.start, '\r', span);
141 79103 : if (cr != NULL) {
142 3 : span = cr - ist->src_win.start;
143 : }
144 79103 : assert(span > 0);
145 79103 : memcpy(ist->dst_win.start, ist->src_win.start, span);
146 79103 : ist->src_win.start += span;
147 79103 : ist->src_win.count -= span;
148 79103 : ist->dst_win.start += span;
149 79103 : ist->dst_win.count -= span;
150 79103 : continue;
151 : }
152 : // Unreachable, all branches above explicitly break or continue
153 : assert(0 && "UNREACHABLE");
154 : }
155 :
156 79295 : if (action == PUMP_FINISH) {
157 194 : if (ist->src_win.count > 0)
158 : // More work to do
159 : return PUMP_OK;
160 194 : if (!ist->crlf_pending)
161 : // Completely done
162 : return PUMP_END;
163 0 : if (ist->dst_win.count > 0) {
164 0 : put_byte(ist, '\r');
165 0 : ist->crlf_pending = false; // not strictly necessary
166 : // Now we're completely done
167 0 : return PUMP_END;
168 : } else
169 : // Come back another time to flush the pending CR
170 : return PUMP_OK;
171 : } else
172 : // There is no error and we are not finishing so clearly we
173 : // must return PUMP_OK
174 : return PUMP_OK;
175 : }
176 :
177 :
178 : static pump_result
179 280 : text_pump_in_with_putback(inner_state_t *ist, pump_action action)
180 : {
181 280 : if (ist->putback_win.count == 0) {
182 : // no need for this function anymore
183 137 : assert(ist->outer_state->worker == text_pump_in_with_putback);
184 137 : ist->outer_state->worker = text_pump_in;
185 137 : return text_pump_in(ist, action);
186 : }
187 :
188 : // first empty the putback buffer
189 143 : pump_buffer tmp = ist->src_win;
190 143 : ist->src_win = ist->putback_win;
191 143 : pump_result ret = text_pump_in(ist, PUMP_NO_FLUSH);
192 143 : ist->putback_win = ist->src_win;
193 143 : ist->src_win = tmp;
194 143 : return ret;
195 : }
196 :
197 :
198 : static pump_result
199 1216989 : text_pump_out(inner_state_t *ist, pump_action action)
200 : {
201 1216989 : size_t src_count = ist->src_win.count;
202 1216989 : size_t dst_count = ist->dst_win.count;
203 1216989 : size_t ncopy = src_count < dst_count ? src_count : dst_count;
204 :
205 1216989 : if (ncopy > 0)
206 1216864 : memcpy(ist->dst_win.start, ist->src_win.start, ncopy);
207 1216989 : ist->dst_win.start += ncopy;
208 1216989 : ist->dst_win.count -= ncopy;
209 1216989 : ist->src_win.start += ncopy;
210 1216989 : ist->src_win.count -= ncopy;
211 :
212 1216989 : if (ist->src_win.count > 0)
213 : // definitely not done
214 : return PUMP_OK;
215 1216737 : if (action == PUMP_NO_FLUSH)
216 : // never return PUMP_END
217 : return PUMP_OK;
218 125 : if (ist->crlf_pending)
219 : // src win empty but cr still pending so not done
220 0 : return PUMP_OK;
221 : // src win empty and no cr pending and flush or finish requested
222 : return PUMP_END;
223 : }
224 :
225 :
226 : static pump_result
227 : text_pump_out_crlf(inner_state_t *ist, pump_action action)
228 : {
229 : if (ist->crlf_pending && ist->dst_win.count > 0) {
230 : put_byte(ist, '\n');
231 : ist->crlf_pending = false;
232 : }
233 :
234 : while (ist->src_win.count > 0 && ist->dst_win.count > 0) {
235 : char c = take_byte(ist);
236 : if (c != '\n') {
237 : put_byte(ist, c);
238 : continue;
239 : }
240 : put_byte(ist, '\r');
241 : if (ist->dst_win.count > 0)
242 : put_byte(ist, '\n');
243 : else {
244 : ist->crlf_pending = true;
245 : break;
246 : }
247 : }
248 :
249 : if (ist->src_win.count > 0)
250 : // definitely not done
251 : return PUMP_OK;
252 : if (action == PUMP_NO_FLUSH)
253 : // never return PUMP_END
254 : return PUMP_OK;
255 : if (ist->crlf_pending)
256 : // src win empty but cr still pending so not done
257 : return PUMP_OK;
258 : // src win empty and no cr pending and flush or finish requested
259 : return PUMP_END;
260 : }
261 :
262 :
263 : static void
264 163 : text_end(inner_state_t *s)
265 : {
266 163 : free(s);
267 163 : }
268 :
269 :
270 : static const char*
271 0 : get_error(inner_state_t *s)
272 : {
273 0 : (void)s;
274 0 : return "line ending conversion failure";
275 : }
276 :
277 : static ssize_t
278 137 : skip_bom(stream *s)
279 : {
280 137 : pump_state *state = (pump_state*) s->stream_data.p;
281 137 : stream *inner = s->inner;
282 137 : inner_state_t *ist = state->inner_state;
283 :
284 137 : ssize_t nread = mnstr_read(inner, ist->putback_buf, 1, UTF8BOMLENGTH);
285 137 : if (nread < 0) {
286 0 : mnstr_copy_error(s, inner);
287 0 : return nread;
288 : }
289 :
290 137 : if (nread == UTF8BOMLENGTH && memcmp(ist->putback_buf, UTF8BOM, nread) == 0) {
291 : // Bingo! Skip it!
292 1 : s->isutf8 = true;
293 1 : return 3;
294 : }
295 :
296 : // We have consumed some bytes that have to be unconsumed.
297 : // skip_bom left them in the putback_buf.
298 136 : ist->putback_win.start = ist->putback_buf;
299 136 : ist->putback_win.count = nread;
300 :
301 136 : return 0;
302 : }
303 :
304 :
305 : stream *
306 177 : create_text_stream(stream *inner)
307 : {
308 177 : inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
309 177 : if (inner_state == NULL) {
310 0 : mnstr_set_open_error(inner->name, errno, NULL);
311 0 : return NULL;
312 : }
313 :
314 177 : pump_state *state = calloc(1, sizeof(pump_state));
315 177 : if (inner_state == NULL || state == NULL) {
316 0 : free(inner_state);
317 0 : mnstr_set_open_error(inner->name, errno, NULL);
318 0 : return NULL;
319 : }
320 :
321 177 : state->inner_state = inner_state;
322 177 : state->get_src_win = get_src_win;
323 177 : state->set_src_win = set_src_win;
324 177 : state->get_dst_win = get_dst_win;
325 177 : state->set_dst_win = set_dst_win;
326 177 : state->get_buffer = get_buffer;
327 177 : state->finalizer = text_end;
328 177 : state->get_error = get_error;
329 :
330 177 : inner_state->outer_state = state;
331 177 : inner_state->putback_win.start = inner_state->putback_buf;
332 177 : inner_state->putback_win.count = 0;
333 177 : if (inner->readonly) {
334 137 : inner_state->src_win.start = inner_state->buffer;
335 137 : inner_state->src_win.count = 0;
336 137 : state->worker = text_pump_in_with_putback;
337 : } else {
338 40 : inner_state->dst_win.start = inner_state->buffer;
339 40 : inner_state->dst_win.count = BUFFER_SIZE;
340 : #ifdef _MSC_VER
341 : state->worker = text_pump_out_crlf;
342 : (void) text_pump_out;
343 : #else
344 40 : state->worker = text_pump_out;
345 177 : (void) text_pump_out_crlf;
346 : #endif
347 : }
348 :
349 177 : stream *s = pump_stream(inner, state);
350 177 : if (s == NULL) {
351 0 : free(inner_state);
352 0 : free(state);
353 0 : return NULL;
354 : }
355 :
356 177 : s->binary = false;
357 :
358 177 : if (s->readonly)
359 137 : if (skip_bom(s) < 0) {
360 0 : free(inner_state);
361 0 : free(state);
362 0 : char *err = mnstr_error(s);
363 0 : mnstr_set_open_error(inner->name, 0, "while looking for a byte order mark: %s", err);
364 0 : free(err);
365 0 : destroy_stream(s);
366 0 : return NULL;
367 : }
368 :
369 : return s;
370 : }
|