Line data Source code
1 : /* 2 : * SPDX-License-Identifier: MPL-2.0 3 : * 4 : * This Source Code Form is subject to the terms of the Mozilla Public 5 : * License, v. 2.0. If a copy of the MPL was not distributed with this 6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. 7 : * 8 : * Copyright 2024 MonetDB Foundation; 9 : * Copyright August 2008 - 2023 MonetDB B.V.; 10 : * Copyright 1997 - July 2008 CWI. 11 : */ 12 : 13 : #include "monetdb_config.h" 14 : #include "stream.h" 15 : #include "stream_internal.h" 16 : 17 : 18 : 19 : /* fixed-width format streams */ 20 : #define STREAM_FWF_NAME "fwf_ftw" 21 : 22 : typedef struct { 23 : stream *s; 24 : bool eof; 25 : /* config */ 26 : size_t num_fields; 27 : size_t *widths; 28 : char filler; 29 : /* state */ 30 : size_t line_len; 31 : char *in_buf; 32 : char *out_buf; 33 : size_t out_buf_start; 34 : size_t out_buf_remaining; 35 : } stream_fwf_data; 36 : 37 : 38 : static ssize_t 39 4 : stream_fwf_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) 40 : { 41 4 : stream_fwf_data *fsd; 42 4 : size_t to_write = cnt; 43 4 : size_t buf_written = 0; 44 4 : char nl_buf; 45 : 46 4 : fsd = (stream_fwf_data *) s->stream_data.p; 47 4 : if (fsd == NULL || elmsize != 1) { 48 : return -1; 49 : } 50 4 : if (fsd->eof) { 51 2 : s->eof = 1; 52 2 : return 0; 53 : } 54 : 55 30034 : while (to_write > 0) { 56 : /* input conversion */ 57 30034 : if (fsd->out_buf_remaining == 0) { /* need to convert next line */ 58 30034 : size_t field_idx, in_buf_pos = 0, out_buf_pos = 0; 59 30034 : ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len); 60 30034 : if (actually_read < (ssize_t) fsd->line_len) { /* incomplete last line */ 61 3 : if (actually_read < 0) { 62 0 : return actually_read; /* this is an error */ 63 : } 64 3 : if (actually_read == 0) { 65 2 : fsd->eof |= fsd->s->eof; 66 2 : s->eof = fsd->eof; 67 2 : return (ssize_t) buf_written; /* skip last line */ 68 : } 69 : } 70 : /* consume to next newline */ 71 30046 : while (fsd->s->read(fsd->s, &nl_buf, 1, 1) == 1 && 72 30046 : nl_buf != '\n') 73 : ; 74 30032 : fsd->eof |= fsd->s->eof; 75 30032 : s->eof = fsd->eof; 76 : 77 720384 : for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) { 78 690352 : char *val_start, *val_end; 79 690352 : val_start = fsd->in_buf + in_buf_pos; 80 690352 : in_buf_pos += fsd->widths[field_idx]; 81 690352 : val_end = fsd->in_buf + in_buf_pos - 1; 82 1890686 : while (*val_start == fsd->filler) 83 1200334 : val_start++; 84 1230352 : while (*val_end == fsd->filler) 85 540000 : val_end--; 86 1951330 : while (val_start <= val_end) { 87 1260978 : if (*val_start == STREAM_FWF_FIELD_SEP) { 88 0 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE; 89 : } 90 1260978 : fsd->out_buf[out_buf_pos++] = *val_start++; 91 : } 92 690352 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP; 93 : } 94 30032 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP; 95 30032 : fsd->out_buf_remaining = out_buf_pos; 96 30032 : fsd->out_buf_start = 0; 97 : } 98 : /* now we know something is in output_buf so deliver it */ 99 30032 : if (fsd->out_buf_remaining <= to_write) { 100 30032 : memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, fsd->out_buf_remaining); 101 30032 : to_write -= fsd->out_buf_remaining; 102 30032 : buf_written += fsd->out_buf_remaining; 103 30032 : fsd->out_buf_remaining = 0; 104 : } else { 105 0 : memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, to_write); 106 0 : fsd->out_buf_start += to_write; 107 0 : fsd->out_buf_remaining -= to_write; 108 0 : buf_written += to_write; 109 0 : to_write = 0; 110 : } 111 : } 112 0 : return (ssize_t) buf_written; 113 : } 114 : 115 : 116 : static void 117 4 : stream_fwf_close(stream *s) 118 : { 119 4 : stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p; 120 : 121 4 : if (fsd != NULL) { 122 2 : stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p; 123 2 : close_stream(fsd->s); 124 2 : free(fsd->widths); 125 2 : free(fsd->in_buf); 126 2 : free(fsd->out_buf); 127 2 : free(fsd); 128 2 : s->stream_data.p = NULL; 129 : } 130 4 : } 131 : 132 : static void 133 2 : stream_fwf_destroy(stream *s) 134 : { 135 2 : stream_fwf_close(s); 136 2 : destroy_stream(s); 137 2 : } 138 : 139 : stream * 140 2 : stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler) 141 : { 142 2 : stream *ns; 143 2 : stream_fwf_data *fsd = malloc(sizeof(stream_fwf_data)); 144 : 145 2 : if (fsd == NULL) { 146 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 147 0 : return NULL; 148 : } 149 2 : *fsd = (stream_fwf_data) { 150 : .s = s, 151 : .num_fields = num_fields, 152 : .widths = widths, 153 : .filler = filler, 154 : .line_len = 0, 155 : .eof = false, 156 : }; 157 36 : for (size_t i = 0; i < num_fields; i++) { 158 34 : fsd->line_len += widths[i]; 159 : } 160 2 : fsd->in_buf = malloc(fsd->line_len); 161 2 : if (fsd->in_buf == NULL) { 162 0 : free(fsd); 163 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 164 0 : return NULL; 165 : } 166 2 : fsd->out_buf = malloc(fsd->line_len * 3); 167 2 : if (fsd->out_buf == NULL) { 168 0 : free(fsd->in_buf); 169 0 : free(fsd); 170 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 171 0 : return NULL; 172 : } 173 2 : if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) { 174 0 : free(fsd->in_buf); 175 0 : free(fsd->out_buf); 176 0 : free(fsd); 177 0 : return NULL; 178 : } 179 2 : ns->read = stream_fwf_read; 180 2 : ns->close = stream_fwf_close; 181 2 : ns->destroy = stream_fwf_destroy; 182 2 : ns->write = NULL; 183 2 : ns->flush = NULL; 184 2 : ns->readonly = true; 185 2 : ns->stream_data.p = fsd; 186 2 : return ns; 187 : }