LCOV - code coverage report
Current view: top level - common/stream - text_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 116 151 76.8 %
Date: 2024-11-11 20:03:36 Functions: 12 13 92.3 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "stream.h"
      15             : #include "stream_internal.h"
      16             : #include "pump.h"
      17             : 
      18             : /* When reading, text streams convert \r\n to \n regardless of operating system,
      19             :  * and they drop the leading UTF-8 BOM marker if found.
      20             :  * When writing on Windows, \n is translated back to \r\n.
      21             :  *
      22             :  * Currently, skipping the BOM happens when opening, not on the first read action.
      23             :  */
      24             : 
      25             : #define UTF8BOM         "\xEF\xBB\xBF"        /* UTF-8 encoding of Unicode BOM */
      26             : #define UTF8BOMLENGTH   3       /* length of above */
      27             : 
      28             : #define BUFFER_SIZE (65536)
      29             : struct inner_state {
      30             :         pump_buffer src_win;
      31             :         pump_buffer dst_win;
      32             :         pump_buffer putback_win;
      33             :         pump_state *outer_state;
      34             :         char putback_buf[UTF8BOMLENGTH];
      35             :         bool crlf_pending;
      36             :         char buffer[BUFFER_SIZE];
      37             : };
      38             : 
      39             : 
      40             : static pump_buffer
      41     4991258 : get_src_win(inner_state_t *inner_state)
      42             : {
      43     4991258 :         return inner_state->src_win;
      44             : }
      45             : 
      46             : 
      47             : static void
      48     1251323 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
      49             : {
      50     1251323 :         inner_state->src_win = buf;
      51     1251323 : }
      52             : 
      53             : 
      54             : static pump_buffer
      55     2647625 : get_dst_win(inner_state_t *inner_state)
      56             : {
      57     2647625 :         return inner_state->dst_win;
      58             : }
      59             : 
      60             : 
      61             : static void
      62       45232 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
      63             : {
      64       45232 :         inner_state->dst_win = buf;
      65       45232 : }
      66             : 
      67             : 
      68             : static pump_buffer
      69     1261748 : get_buffer(inner_state_t *inner_state)
      70             : {
      71     1261748 :         return (pump_buffer) { .start = inner_state->buffer, .count = BUFFER_SIZE };
      72             : }
      73             : 
      74             : inline static void
      75           3 : put_byte(inner_state_t *ist, char byte)
      76             : {
      77           3 :         *ist->dst_win.start++ = byte;
      78           3 :         assert(ist->dst_win.count > 0);
      79           3 :         ist->dst_win.count--;
      80           3 : }
      81             : 
      82             : inline static char
      83           3 : take_byte(inner_state_t *ist)
      84             : {
      85           3 :         ist->src_win.count--;
      86           3 :         return *ist->src_win.start++;
      87             : }
      88             : 
      89             : static pump_result
      90       79281 : text_pump_in(inner_state_t *ist, pump_action action)
      91             : {
      92       79281 :         assert(ist->dst_win.count > 0);
      93       79281 :         assert(ist->src_win.count > 0 || action == PUMP_FINISH);
      94             : 
      95       79281 :         if (ist->crlf_pending) {
      96           0 :                 if (ist->src_win.count > 0) {
      97           0 :                         if (ist->src_win.start[0] != '\n') {
      98             :                                 // CR not followed by a LF, emit it
      99           0 :                                 put_byte(ist, '\r');
     100             :                         }
     101             :                 } else {
     102           0 :                         assert(action == PUMP_FINISH);
     103             :                         // CR followed by end of file, not LF, so emit it
     104           0 :                         put_byte(ist, '\r');
     105             :                 }
     106             :                 // in any case, the CR is no longer pending
     107           0 :                 ist->crlf_pending = false;
     108             :         }
     109             : 
     110      158379 :         while (1) {
     111      158379 :                 size_t span = ist->src_win.count < ist->dst_win.count
     112             :                                         ? ist->src_win.count
     113             :                                         : ist->dst_win.count;
     114      158379 :                 if (span == 0)
     115             :                         break;
     116             : 
     117       79098 :                 if (ist->src_win.start[0] == '\r') {
     118             :                         // Looking at a CR. We'll handle just that, then make another round of the while loop
     119           3 :                         if (ist->src_win.count == 1) {
     120             :                                 // Don't know what will follow, move it to the flag.
     121             :                                 // Then stop, as all available input has been consumed
     122           0 :                                 take_byte(ist);
     123           0 :                                 ist->crlf_pending = true;
     124           0 :                                 break;
     125             :                         }
     126           3 :                         assert(ist->src_win.count > 1); // We can safely look ahead
     127           3 :                         if (ist->src_win.start[1] == '\n') {
     128             :                                 // Drop the CR, move the LF
     129           3 :                                 take_byte(ist);
     130           3 :                                 put_byte(ist, take_byte(ist));
     131             :                         } else {
     132             :                                 // Move the CR
     133           0 :                                 put_byte(ist, take_byte(ist));
     134             :                         }
     135             :                         // progress has been made, consider the situation anew
     136           3 :                         continue;
     137       79095 :                 } else {
     138             :                         // The remaining input data does not start with a CR.
     139             :                         // Move all non-CR data to the output buffer
     140       79095 :                         char *cr = memchr(ist->src_win.start, '\r', span);
     141       79095 :                         if (cr != NULL) {
     142           3 :                                 span = cr - ist->src_win.start;
     143             :                         }
     144       79095 :                         assert(span > 0);
     145       79095 :                         memcpy(ist->dst_win.start, ist->src_win.start, span);
     146       79095 :                         ist->src_win.start += span;
     147       79095 :                         ist->src_win.count -= span;
     148       79095 :                         ist->dst_win.start += span;
     149       79095 :                         ist->dst_win.count -= span;
     150       79095 :                         continue;
     151             :                 }
     152             :                 // Unreachable, all branches above explicitly break or continue
     153             :                 assert(0 && "UNREACHABLE");
     154             :         }
     155             : 
     156       79281 :         if (action == PUMP_FINISH) {
     157         188 :                 if (ist->src_win.count > 0)
     158             :                         // More work to do
     159             :                         return PUMP_OK;
     160         188 :                 if (!ist->crlf_pending)
     161             :                         // Completely done
     162             :                         return PUMP_END;
     163           0 :                 if (ist->dst_win.count > 0) {
     164           0 :                         put_byte(ist, '\r');
     165           0 :                         ist->crlf_pending = false; // not strictly necessary
     166             :                         // Now we're completely done
     167           0 :                         return PUMP_END;
     168             :                 } else
     169             :                         // Come back another time to flush the pending CR
     170             :                         return PUMP_OK;
     171             :         } else
     172             :                 // There is no error and we are not finishing so clearly we
     173             :                 // must return PUMP_OK
     174             :                 return PUMP_OK;
     175             : }
     176             : 
     177             : 
     178             : static pump_result
     179         272 : text_pump_in_with_putback(inner_state_t *ist, pump_action action)
     180             : {
     181         272 :         if (ist->putback_win.count == 0) {
     182             :                 // no need for this function anymore
     183         133 :                 assert(ist->outer_state->worker == text_pump_in_with_putback);
     184         133 :                 ist->outer_state->worker = text_pump_in;
     185         133 :                 return text_pump_in(ist, action);
     186             :         }
     187             : 
     188             :         // first empty the putback buffer
     189         139 :         pump_buffer tmp = ist->src_win;
     190         139 :         ist->src_win = ist->putback_win;
     191         139 :         pump_result ret = text_pump_in(ist, PUMP_NO_FLUSH);
     192         139 :         ist->putback_win = ist->src_win;
     193         139 :         ist->src_win = tmp;
     194         139 :         return ret;
     195             : }
     196             : 
     197             : 
     198             : static pump_result
     199     1216989 : text_pump_out(inner_state_t *ist, pump_action action)
     200             : {
     201     1216989 :         size_t src_count = ist->src_win.count;
     202     1216989 :         size_t dst_count = ist->dst_win.count;
     203     1216989 :         size_t ncopy = src_count < dst_count ? src_count : dst_count;
     204             : 
     205     1216989 :         if (ncopy > 0)
     206     1216864 :                 memcpy(ist->dst_win.start, ist->src_win.start, ncopy);
     207     1216989 :         ist->dst_win.start += ncopy;
     208     1216989 :         ist->dst_win.count -= ncopy;
     209     1216989 :         ist->src_win.start += ncopy;
     210     1216989 :         ist->src_win.count -= ncopy;
     211             : 
     212     1216989 :         if (ist->src_win.count > 0)
     213             :                 // definitely not done
     214             :                 return PUMP_OK;
     215     1216737 :         if (action == PUMP_NO_FLUSH)
     216             :                 // never return PUMP_END
     217             :                 return PUMP_OK;
     218         125 :         if (ist->crlf_pending)
     219             :                 // src win empty but cr still pending so not done
     220           0 :                 return PUMP_OK;
     221             :         // src win empty and no cr pending and flush or finish requested
     222             :         return PUMP_END;
     223             : }
     224             : 
     225             : 
     226             : static pump_result
     227             : text_pump_out_crlf(inner_state_t *ist, pump_action action)
     228             : {
     229             :         if (ist->crlf_pending && ist->dst_win.count > 0) {
     230             :                 put_byte(ist, '\n');
     231             :                 ist->crlf_pending = false;
     232             :         }
     233             : 
     234             :         while (ist->src_win.count > 0 && ist->dst_win.count > 0) {
     235             :                 char c = take_byte(ist);
     236             :                 if (c != '\n') {
     237             :                         put_byte(ist, c);
     238             :                         continue;
     239             :                 }
     240             :                 put_byte(ist, '\r');
     241             :                 if (ist->dst_win.count > 0)
     242             :                         put_byte(ist, '\n');
     243             :                 else {
     244             :                         ist->crlf_pending = true;
     245             :                         break;
     246             :                 }
     247             :         }
     248             : 
     249             :         if (ist->src_win.count > 0)
     250             :                 // definitely not done
     251             :                 return PUMP_OK;
     252             :         if (action == PUMP_NO_FLUSH)
     253             :                 // never return PUMP_END
     254             :                 return PUMP_OK;
     255             :         if (ist->crlf_pending)
     256             :                 // src win empty but cr still pending so not done
     257             :                 return PUMP_OK;
     258             :         // src win empty and no cr pending and flush or finish requested
     259             :         return PUMP_END;
     260             : }
     261             : 
     262             : 
     263             : static void
     264         159 : text_end(inner_state_t *s)
     265             : {
     266         159 :         free(s);
     267         159 : }
     268             : 
     269             : 
     270             : static const char*
     271           0 : get_error(inner_state_t *s)
     272             : {
     273           0 :         (void)s;
     274           0 :         return "line ending conversion failure";
     275             : }
     276             : 
     277             : static ssize_t
     278         133 : skip_bom(stream *s)
     279             : {
     280         133 :         pump_state *state = (pump_state*) s->stream_data.p;
     281         133 :         stream *inner = s->inner;
     282         133 :         inner_state_t *ist = state->inner_state;
     283             : 
     284         133 :         ssize_t nread = mnstr_read(inner, ist->putback_buf, 1, UTF8BOMLENGTH);
     285         133 :         if (nread < 0) {
     286           0 :                 mnstr_copy_error(s, inner);
     287           0 :                 return nread;
     288             :         }
     289             : 
     290         133 :         if (nread == UTF8BOMLENGTH &&  memcmp(ist->putback_buf, UTF8BOM, nread) == 0) {
     291             :                 // Bingo! Skip it!
     292           1 :                 s->isutf8 = true;
     293           1 :                 return 3;
     294             :         }
     295             : 
     296             :         // We have consumed some bytes that have to be unconsumed.
     297             :         // skip_bom left them in the putback_buf.
     298         132 :         ist->putback_win.start = ist->putback_buf;
     299         132 :         ist->putback_win.count = nread;
     300             : 
     301         132 :         return 0;
     302             : }
     303             : 
     304             : 
     305             : stream *
     306         173 : create_text_stream(stream *inner)
     307             : {
     308         173 :         inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
     309         173 :         if (inner_state == NULL) {
     310           0 :                 mnstr_set_open_error(inner->name, errno, NULL);
     311           0 :                 return NULL;
     312             :         }
     313             : 
     314         173 :         pump_state *state = calloc(1, sizeof(pump_state));
     315         173 :         if (inner_state == NULL || state == NULL) {
     316           0 :                 free(inner_state);
     317           0 :                 mnstr_set_open_error(inner->name, errno, NULL);
     318           0 :                 return NULL;
     319             :         }
     320             : 
     321         173 :         state->inner_state = inner_state;
     322         173 :         state->get_src_win = get_src_win;
     323         173 :         state->set_src_win = set_src_win;
     324         173 :         state->get_dst_win = get_dst_win;
     325         173 :         state->set_dst_win = set_dst_win;
     326         173 :         state->get_buffer = get_buffer;
     327         173 :         state->finalizer = text_end;
     328         173 :         state->get_error = get_error;
     329             : 
     330         173 :         inner_state->outer_state = state;
     331         173 :         inner_state->putback_win.start = inner_state->putback_buf;
     332         173 :         inner_state->putback_win.count = 0;
     333         173 :         if (inner->readonly) {
     334         133 :                 inner_state->src_win.start = inner_state->buffer;
     335         133 :                 inner_state->src_win.count = 0;
     336         133 :                 state->worker = text_pump_in_with_putback;
     337             :         } else {
     338          40 :                 inner_state->dst_win.start = inner_state->buffer;
     339          40 :                 inner_state->dst_win.count = BUFFER_SIZE;
     340             : #ifdef _MSC_VER
     341             :                 state->worker = text_pump_out_crlf;
     342             :                 (void) text_pump_out;
     343             : #else
     344          40 :                 state->worker = text_pump_out;
     345         173 :                 (void) text_pump_out_crlf;
     346             : #endif
     347             :         }
     348             : 
     349         173 :         stream *s = pump_stream(inner, state);
     350         173 :         if (s == NULL) {
     351           0 :                 free(inner_state);
     352           0 :                 free(state);
     353           0 :                 return NULL;
     354             :         }
     355             : 
     356         173 :         s->binary = false;
     357             : 
     358         173 :         if (s->readonly)
     359         133 :                 if (skip_bom(s) < 0) {
     360           0 :                         free(inner_state);
     361           0 :                         free(state);
     362           0 :                         char *err = mnstr_error(s);
     363           0 :                         mnstr_set_open_error(inner->name, 0, "while looking for a byte order mark: %s", err);
     364           0 :                         free(err);
     365           0 :                         destroy_stream(s);
     366           0 :                         return NULL;
     367             :                 }
     368             : 
     369             :         return s;
     370             : }

Generated by: LCOV version 1.14