Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "pump.h"
17 :
18 : /* When reading, text streams convert \r\n to \n regardless of operating system,
19 : * and they drop the leading UTF-8 BOM marker if found.
20 : * When writing on Windows, \n is translated back to \r\n.
21 : *
22 : * Currently, skipping the BOM happens when opening, not on the first read action.
23 : */
24 :
25 : #define UTF8BOM "\xEF\xBB\xBF" /* UTF-8 encoding of Unicode BOM */
26 : #define UTF8BOMLENGTH 3 /* length of above */
27 :
28 : #define BUFFER_SIZE (65536)
29 : struct inner_state {
30 : pump_buffer src_win;
31 : pump_buffer dst_win;
32 : pump_buffer putback_win;
33 : char putback_buf[UTF8BOMLENGTH];
34 : bool crlf_pending;
35 : char buffer[BUFFER_SIZE];
36 : };
37 :
38 :
39 : static pump_buffer
40 4986750 : get_src_win(inner_state_t *inner_state)
41 : {
42 4986750 : return inner_state->src_win;
43 : }
44 :
45 :
46 : static void
47 1251517 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
48 : {
49 1251517 : inner_state->src_win = buf;
50 1251517 : }
51 :
52 :
53 : static pump_buffer
54 2642400 : get_dst_win(inner_state_t *inner_state)
55 : {
56 2642400 : return inner_state->dst_win;
57 : }
58 :
59 :
60 : static void
61 44139 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
62 : {
63 44139 : inner_state->dst_win = buf;
64 44139 : }
65 :
66 :
67 : static pump_buffer
68 1260227 : get_buffer(inner_state_t *inner_state)
69 : {
70 1260227 : return (pump_buffer) { .start = inner_state->buffer, .count = BUFFER_SIZE };
71 : }
72 :
73 : inline static void
74 2269006905 : put_byte(inner_state_t *ist, char byte)
75 : {
76 2269006905 : *ist->dst_win.start++ = byte;
77 2269006905 : assert(ist->dst_win.count > 0);
78 2269006905 : ist->dst_win.count--;
79 2269006905 : }
80 :
81 : inline static char
82 2269037770 : take_byte(inner_state_t *ist)
83 : {
84 2269037770 : ist->src_win.count--;
85 2269037770 : return *ist->src_win.start++;
86 : }
87 :
88 : static pump_result
89 78984 : text_pump_in(inner_state_t *ist, pump_action action)
90 : {
91 78984 : bool crlf_pending = ist->crlf_pending;
92 :
93 2269116754 : while (ist->src_win.count > 0 && ist->dst_win.count > 0) {
94 2269037770 : char c = take_byte(ist);
95 2269037770 : switch (c) {
96 31465 : case '\r':
97 31465 : if (crlf_pending) {
98 : // put the previous one, which is clearly not followed by an \n
99 435 : put_byte(ist, '\r');
100 : }
101 31465 : crlf_pending = true;
102 31465 : continue;
103 140349388 : case '\n':
104 140349388 : put_byte(ist, c);
105 140349388 : crlf_pending = false;
106 140349388 : continue;
107 2128656917 : default:
108 2128656917 : if (crlf_pending) {
109 165 : put_byte(ist, '\r');
110 165 : crlf_pending = false;
111 : // if dst_win.count was 1, there is no room for another put_byte().
112 165 : if (ist->dst_win.count > 0) {
113 165 : put_byte(ist, c);
114 : } else {
115 : // no room anymore for char c, put it back!
116 0 : ist->src_win.start--;
117 0 : ist->src_win.count++;
118 : }
119 : } else {
120 2128656752 : put_byte(ist, c);
121 : }
122 2128656917 : continue;
123 : }
124 : }
125 :
126 78984 : ist->crlf_pending = crlf_pending;
127 :
128 78984 : if (action == PUMP_FINISH) {
129 431 : if (ist->src_win.count > 0)
130 : // More work to do
131 : return PUMP_OK;
132 431 : if (!ist->crlf_pending)
133 : // Completely done
134 : return PUMP_END;
135 0 : if (ist->dst_win.count > 0) {
136 0 : put_byte(ist, '\r');
137 0 : ist->crlf_pending = false; // not strictly necessary
138 : // Now we're completely done
139 0 : return PUMP_END;
140 : } else
141 : // Come back another time to flush the pending CR
142 : return PUMP_OK;
143 : } else
144 : // There is no error and we are not finishing so clearly we
145 : // must return PUMP_OK
146 : return PUMP_OK;
147 : }
148 :
149 :
150 : static pump_result
151 78683 : text_pump_in_with_putback(inner_state_t *ist, pump_action action)
152 : {
153 78683 : if (ist->putback_win.count > 0) {
154 301 : pump_buffer tmp = ist->src_win;
155 301 : ist->src_win = ist->putback_win;
156 301 : pump_result ret = text_pump_in(ist, PUMP_NO_FLUSH);
157 301 : ist->putback_win = ist->src_win;
158 301 : ist->src_win = tmp;
159 301 : if (ret == PUMP_ERROR)
160 78683 : return PUMP_ERROR;
161 : }
162 78683 : return text_pump_in(ist, action);
163 : }
164 :
165 :
166 : static pump_result
167 1216296 : text_pump_out(inner_state_t *ist, pump_action action)
168 : {
169 1216296 : size_t src_count = ist->src_win.count;
170 1216296 : size_t dst_count = ist->dst_win.count;
171 1216296 : size_t ncopy = src_count < dst_count ? src_count : dst_count;
172 :
173 1216296 : if (ncopy > 0)
174 1216270 : memcpy(ist->dst_win.start, ist->src_win.start, ncopy);
175 1216296 : ist->dst_win.start += ncopy;
176 1216296 : ist->dst_win.count -= ncopy;
177 1216296 : ist->src_win.start += ncopy;
178 1216296 : ist->src_win.count -= ncopy;
179 :
180 1216296 : if (ist->src_win.count > 0)
181 : // definitely not done
182 : return PUMP_OK;
183 1216044 : if (action == PUMP_NO_FLUSH)
184 : // never return PUMP_END
185 : return PUMP_OK;
186 26 : if (ist->crlf_pending)
187 : // src win empty but cr still pending so not done
188 0 : return PUMP_OK;
189 : // src win empty and no cr pending and flush or finish requested
190 : return PUMP_END;
191 : }
192 :
193 :
194 : static pump_result
195 : text_pump_out_crlf(inner_state_t *ist, pump_action action)
196 : {
197 : if (ist->crlf_pending && ist->dst_win.count > 0) {
198 : put_byte(ist, '\n');
199 : ist->crlf_pending = false;
200 : }
201 :
202 : while (ist->src_win.count > 0 && ist->dst_win.count > 0) {
203 : char c = take_byte(ist);
204 : if (c != '\n') {
205 : put_byte(ist, c);
206 : continue;
207 : }
208 : put_byte(ist, '\r');
209 : if (ist->dst_win.count > 0)
210 : put_byte(ist, '\n');
211 : else {
212 : ist->crlf_pending = true;
213 : break;
214 : }
215 : }
216 :
217 : if (ist->src_win.count > 0)
218 : // definitely not done
219 : return PUMP_OK;
220 : if (action == PUMP_NO_FLUSH)
221 : // never return PUMP_END
222 : return PUMP_OK;
223 : if (ist->crlf_pending)
224 : // src win empty but cr still pending so not done
225 : return PUMP_OK;
226 : // src win empty and no cr pending and flush or finish requested
227 : return PUMP_END;
228 : }
229 :
230 :
231 : static void
232 339 : text_end(inner_state_t *s)
233 : {
234 339 : free(s);
235 339 : }
236 :
237 :
238 : static const char*
239 0 : get_error(inner_state_t *s)
240 : {
241 0 : (void)s;
242 0 : return "line ending conversion failure";
243 : }
244 :
245 : static ssize_t
246 315 : skip_bom(stream *s)
247 : {
248 315 : pump_state *state = (pump_state*) s->stream_data.p;
249 315 : stream *inner = s->inner;
250 315 : inner_state_t *ist = state->inner_state;
251 :
252 315 : ssize_t nread = mnstr_read(inner, ist->putback_buf, 1, UTF8BOMLENGTH);
253 315 : if (nread < 0) {
254 0 : mnstr_copy_error(s, inner);
255 0 : return nread;
256 : }
257 :
258 315 : if (nread == UTF8BOMLENGTH && memcmp(ist->putback_buf, UTF8BOM, nread) == 0) {
259 : // Bingo! Skip it!
260 1 : s->isutf8 = true;
261 1 : return 3;
262 : }
263 :
264 : // We have consumed some bytes that have to be unconsumed.
265 : // skip_bom left them in the putback_buf.
266 314 : ist->putback_win.start = ist->putback_buf;
267 314 : ist->putback_win.count = nread;
268 :
269 314 : return 0;
270 : }
271 :
272 :
273 : stream *
274 339 : create_text_stream(stream *inner)
275 : {
276 339 : inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
277 339 : if (inner_state == NULL) {
278 0 : mnstr_set_open_error(inner->name, errno, NULL);
279 0 : return NULL;
280 : }
281 :
282 339 : pump_state *state = calloc(1, sizeof(pump_state));
283 339 : if (inner_state == NULL || state == NULL) {
284 0 : free(inner_state);
285 0 : mnstr_set_open_error(inner->name, errno, NULL);
286 0 : return NULL;
287 : }
288 :
289 339 : state->inner_state = inner_state;
290 339 : state->get_src_win = get_src_win;
291 339 : state->set_src_win = set_src_win;
292 339 : state->get_dst_win = get_dst_win;
293 339 : state->set_dst_win = set_dst_win;
294 339 : state->get_buffer = get_buffer;
295 339 : state->finalizer = text_end;
296 339 : state->get_error = get_error;
297 :
298 339 : inner_state->putback_win.start = inner_state->putback_buf;
299 339 : inner_state->putback_win.count = 0;
300 339 : if (inner->readonly) {
301 315 : inner_state->src_win.start = inner_state->buffer;
302 315 : inner_state->src_win.count = 0;
303 315 : state->worker = text_pump_in_with_putback;
304 : } else {
305 24 : inner_state->dst_win.start = inner_state->buffer;
306 24 : inner_state->dst_win.count = BUFFER_SIZE;
307 : #ifdef _MSC_VER
308 : state->worker = text_pump_out_crlf;
309 : (void) text_pump_out;
310 : #else
311 24 : state->worker = text_pump_out;
312 339 : (void) text_pump_out_crlf;
313 : #endif
314 : }
315 :
316 339 : stream *s = pump_stream(inner, state);
317 339 : if (s == NULL) {
318 0 : free(inner_state);
319 0 : free(state);
320 0 : return NULL;
321 : }
322 :
323 339 : s->binary = false;
324 :
325 339 : if (s->readonly)
326 315 : if (skip_bom(s) < 0) {
327 0 : free(inner_state);
328 0 : free(state);
329 0 : char *err = mnstr_error(s);
330 0 : mnstr_set_open_error(inner->name, 0, "while looking for a byte order mark: %s", err);
331 0 : free(err);
332 0 : destroy_stream(s);
333 0 : return NULL;
334 : }
335 :
336 : return s;
337 : }
|