Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "pump.h"
17 :
18 : /* When reading, text streams convert \r\n to \n regardless of operating system,
19 : * and they drop the leading UTF-8 BOM marker if found.
20 : * When writing on Windows, \n is translated back to \r\n.
21 : *
22 : * Currently, skipping the BOM happens when opening, not on the first read action.
23 : */
24 :
#define UTF8BOM "\357\273\277" /* the three bytes UTF-8 uses to encode the Unicode byte order mark */
#define UTF8BOMLENGTH 3 /* number of bytes in UTF8BOM */

/* Size in bytes of the conversion buffer embedded in every inner_state. */
#define BUFFER_SIZE (64 * 1024)
29 : struct inner_state {
30 : pump_buffer src_win;
31 : pump_buffer dst_win;
32 : pump_buffer putback_win;
33 : pump_state *outer_state;
34 : char putback_buf[UTF8BOMLENGTH];
35 : bool crlf_pending;
36 : char buffer[BUFFER_SIZE];
37 : };
38 :
39 :
40 : static pump_buffer
41 4991258 : get_src_win(inner_state_t *inner_state)
42 : {
43 4991258 : return inner_state->src_win;
44 : }
45 :
46 :
47 : static void
48 1251323 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
49 : {
50 1251323 : inner_state->src_win = buf;
51 1251323 : }
52 :
53 :
54 : static pump_buffer
55 2647625 : get_dst_win(inner_state_t *inner_state)
56 : {
57 2647625 : return inner_state->dst_win;
58 : }
59 :
60 :
61 : static void
62 45232 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
63 : {
64 45232 : inner_state->dst_win = buf;
65 45232 : }
66 :
67 :
68 : static pump_buffer
69 1261748 : get_buffer(inner_state_t *inner_state)
70 : {
71 1261748 : return (pump_buffer) { .start = inner_state->buffer, .count = BUFFER_SIZE };
72 : }
73 :
74 : inline static void
75 3 : put_byte(inner_state_t *ist, char byte)
76 : {
77 3 : *ist->dst_win.start++ = byte;
78 3 : assert(ist->dst_win.count > 0);
79 3 : ist->dst_win.count--;
80 3 : }
81 :
82 : inline static char
83 3 : take_byte(inner_state_t *ist)
84 : {
85 3 : ist->src_win.count--;
86 3 : return *ist->src_win.start++;
87 : }
88 :
89 : static pump_result
90 79281 : text_pump_in(inner_state_t *ist, pump_action action)
91 : {
92 79281 : assert(ist->dst_win.count > 0);
93 79281 : assert(ist->src_win.count > 0 || action == PUMP_FINISH);
94 :
95 79281 : if (ist->crlf_pending) {
96 0 : if (ist->src_win.count > 0) {
97 0 : if (ist->src_win.start[0] != '\n') {
98 : // CR not followed by a LF, emit it
99 0 : put_byte(ist, '\r');
100 : }
101 : } else {
102 0 : assert(action == PUMP_FINISH);
103 : // CR followed by end of file, not LF, so emit it
104 0 : put_byte(ist, '\r');
105 : }
106 : // in any case, the CR is no longer pending
107 0 : ist->crlf_pending = false;
108 : }
109 :
110 158379 : while (1) {
111 158379 : size_t span = ist->src_win.count < ist->dst_win.count
112 : ? ist->src_win.count
113 : : ist->dst_win.count;
114 158379 : if (span == 0)
115 : break;
116 :
117 79098 : if (ist->src_win.start[0] == '\r') {
118 : // Looking at a CR. We'll handle just that, then make another round of the while loop
119 3 : if (ist->src_win.count == 1) {
120 : // Don't know what will follow, move it to the flag.
121 : // Then stop, as all available input has been consumed
122 0 : take_byte(ist);
123 0 : ist->crlf_pending = true;
124 0 : break;
125 : }
126 3 : assert(ist->src_win.count > 1); // We can safely look ahead
127 3 : if (ist->src_win.start[1] == '\n') {
128 : // Drop the CR, move the LF
129 3 : take_byte(ist);
130 3 : put_byte(ist, take_byte(ist));
131 : } else {
132 : // Move the CR
133 0 : put_byte(ist, take_byte(ist));
134 : }
135 : // progress has been made, consider the situation anew
136 3 : continue;
137 79095 : } else {
138 : // The remaining input data does not start with a CR.
139 : // Move all non-CR data to the output buffer
140 79095 : char *cr = memchr(ist->src_win.start, '\r', span);
141 79095 : if (cr != NULL) {
142 3 : span = cr - ist->src_win.start;
143 : }
144 79095 : assert(span > 0);
145 79095 : memcpy(ist->dst_win.start, ist->src_win.start, span);
146 79095 : ist->src_win.start += span;
147 79095 : ist->src_win.count -= span;
148 79095 : ist->dst_win.start += span;
149 79095 : ist->dst_win.count -= span;
150 79095 : continue;
151 : }
152 : // Unreachable, all branches above explicitly break or continue
153 : assert(0 && "UNREACHABLE");
154 : }
155 :
156 79281 : if (action == PUMP_FINISH) {
157 188 : if (ist->src_win.count > 0)
158 : // More work to do
159 : return PUMP_OK;
160 188 : if (!ist->crlf_pending)
161 : // Completely done
162 : return PUMP_END;
163 0 : if (ist->dst_win.count > 0) {
164 0 : put_byte(ist, '\r');
165 0 : ist->crlf_pending = false; // not strictly necessary
166 : // Now we're completely done
167 0 : return PUMP_END;
168 : } else
169 : // Come back another time to flush the pending CR
170 : return PUMP_OK;
171 : } else
172 : // There is no error and we are not finishing so clearly we
173 : // must return PUMP_OK
174 : return PUMP_OK;
175 : }
176 :
177 :
178 : static pump_result
179 272 : text_pump_in_with_putback(inner_state_t *ist, pump_action action)
180 : {
181 272 : if (ist->putback_win.count == 0) {
182 : // no need for this function anymore
183 133 : assert(ist->outer_state->worker == text_pump_in_with_putback);
184 133 : ist->outer_state->worker = text_pump_in;
185 133 : return text_pump_in(ist, action);
186 : }
187 :
188 : // first empty the putback buffer
189 139 : pump_buffer tmp = ist->src_win;
190 139 : ist->src_win = ist->putback_win;
191 139 : pump_result ret = text_pump_in(ist, PUMP_NO_FLUSH);
192 139 : ist->putback_win = ist->src_win;
193 139 : ist->src_win = tmp;
194 139 : return ret;
195 : }
196 :
197 :
198 : static pump_result
199 1216989 : text_pump_out(inner_state_t *ist, pump_action action)
200 : {
201 1216989 : size_t src_count = ist->src_win.count;
202 1216989 : size_t dst_count = ist->dst_win.count;
203 1216989 : size_t ncopy = src_count < dst_count ? src_count : dst_count;
204 :
205 1216989 : if (ncopy > 0)
206 1216864 : memcpy(ist->dst_win.start, ist->src_win.start, ncopy);
207 1216989 : ist->dst_win.start += ncopy;
208 1216989 : ist->dst_win.count -= ncopy;
209 1216989 : ist->src_win.start += ncopy;
210 1216989 : ist->src_win.count -= ncopy;
211 :
212 1216989 : if (ist->src_win.count > 0)
213 : // definitely not done
214 : return PUMP_OK;
215 1216737 : if (action == PUMP_NO_FLUSH)
216 : // never return PUMP_END
217 : return PUMP_OK;
218 125 : if (ist->crlf_pending)
219 : // src win empty but cr still pending so not done
220 0 : return PUMP_OK;
221 : // src win empty and no cr pending and flush or finish requested
222 : return PUMP_END;
223 : }
224 :
225 :
226 : static pump_result
227 : text_pump_out_crlf(inner_state_t *ist, pump_action action)
228 : {
229 : if (ist->crlf_pending && ist->dst_win.count > 0) {
230 : put_byte(ist, '\n');
231 : ist->crlf_pending = false;
232 : }
233 :
234 : while (ist->src_win.count > 0 && ist->dst_win.count > 0) {
235 : char c = take_byte(ist);
236 : if (c != '\n') {
237 : put_byte(ist, c);
238 : continue;
239 : }
240 : put_byte(ist, '\r');
241 : if (ist->dst_win.count > 0)
242 : put_byte(ist, '\n');
243 : else {
244 : ist->crlf_pending = true;
245 : break;
246 : }
247 : }
248 :
249 : if (ist->src_win.count > 0)
250 : // definitely not done
251 : return PUMP_OK;
252 : if (action == PUMP_NO_FLUSH)
253 : // never return PUMP_END
254 : return PUMP_OK;
255 : if (ist->crlf_pending)
256 : // src win empty but cr still pending so not done
257 : return PUMP_OK;
258 : // src win empty and no cr pending and flush or finish requested
259 : return PUMP_END;
260 : }
261 :
262 :
263 : static void
264 159 : text_end(inner_state_t *s)
265 : {
266 159 : free(s);
267 159 : }
268 :
269 :
270 : static const char*
271 0 : get_error(inner_state_t *s)
272 : {
273 0 : (void)s;
274 0 : return "line ending conversion failure";
275 : }
276 :
277 : static ssize_t
278 133 : skip_bom(stream *s)
279 : {
280 133 : pump_state *state = (pump_state*) s->stream_data.p;
281 133 : stream *inner = s->inner;
282 133 : inner_state_t *ist = state->inner_state;
283 :
284 133 : ssize_t nread = mnstr_read(inner, ist->putback_buf, 1, UTF8BOMLENGTH);
285 133 : if (nread < 0) {
286 0 : mnstr_copy_error(s, inner);
287 0 : return nread;
288 : }
289 :
290 133 : if (nread == UTF8BOMLENGTH && memcmp(ist->putback_buf, UTF8BOM, nread) == 0) {
291 : // Bingo! Skip it!
292 1 : s->isutf8 = true;
293 1 : return 3;
294 : }
295 :
296 : // We have consumed some bytes that have to be unconsumed.
297 : // skip_bom left them in the putback_buf.
298 132 : ist->putback_win.start = ist->putback_buf;
299 132 : ist->putback_win.count = nread;
300 :
301 132 : return 0;
302 : }
303 :
304 :
305 : stream *
306 173 : create_text_stream(stream *inner)
307 : {
308 173 : inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
309 173 : if (inner_state == NULL) {
310 0 : mnstr_set_open_error(inner->name, errno, NULL);
311 0 : return NULL;
312 : }
313 :
314 173 : pump_state *state = calloc(1, sizeof(pump_state));
315 173 : if (inner_state == NULL || state == NULL) {
316 0 : free(inner_state);
317 0 : mnstr_set_open_error(inner->name, errno, NULL);
318 0 : return NULL;
319 : }
320 :
321 173 : state->inner_state = inner_state;
322 173 : state->get_src_win = get_src_win;
323 173 : state->set_src_win = set_src_win;
324 173 : state->get_dst_win = get_dst_win;
325 173 : state->set_dst_win = set_dst_win;
326 173 : state->get_buffer = get_buffer;
327 173 : state->finalizer = text_end;
328 173 : state->get_error = get_error;
329 :
330 173 : inner_state->outer_state = state;
331 173 : inner_state->putback_win.start = inner_state->putback_buf;
332 173 : inner_state->putback_win.count = 0;
333 173 : if (inner->readonly) {
334 133 : inner_state->src_win.start = inner_state->buffer;
335 133 : inner_state->src_win.count = 0;
336 133 : state->worker = text_pump_in_with_putback;
337 : } else {
338 40 : inner_state->dst_win.start = inner_state->buffer;
339 40 : inner_state->dst_win.count = BUFFER_SIZE;
340 : #ifdef _MSC_VER
341 : state->worker = text_pump_out_crlf;
342 : (void) text_pump_out;
343 : #else
344 40 : state->worker = text_pump_out;
345 173 : (void) text_pump_out_crlf;
346 : #endif
347 : }
348 :
349 173 : stream *s = pump_stream(inner, state);
350 173 : if (s == NULL) {
351 0 : free(inner_state);
352 0 : free(state);
353 0 : return NULL;
354 : }
355 :
356 173 : s->binary = false;
357 :
358 173 : if (s->readonly)
359 133 : if (skip_bom(s) < 0) {
360 0 : free(inner_state);
361 0 : free(state);
362 0 : char *err = mnstr_error(s);
363 0 : mnstr_set_open_error(inner->name, 0, "while looking for a byte order mark: %s", err);
364 0 : free(err);
365 0 : destroy_stream(s);
366 0 : return NULL;
367 : }
368 :
369 : return s;
370 : }
|