Line data Source code
1 : /* 2 : * SPDX-License-Identifier: MPL-2.0 3 : * 4 : * This Source Code Form is subject to the terms of the Mozilla Public 5 : * License, v. 2.0. If a copy of the MPL was not distributed with this 6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. 7 : * 8 : * Copyright 2024 MonetDB Foundation; 9 : * Copyright August 2008 - 2023 MonetDB B.V.; 10 : * Copyright 1997 - July 2008 CWI. 11 : */ 12 : 13 : #include "monetdb_config.h" 14 : #include "stream.h" 15 : #include "stream_internal.h" 16 : 17 : 18 : 19 : /* fixed-width format streams */ 20 : #define STREAM_FWF_NAME "fwf_ftw" 21 : 22 : typedef struct { 23 : stream *s; 24 : bool eof; 25 : /* config */ 26 : size_t num_fields; 27 : size_t *widths; 28 : char filler; 29 : /* state */ 30 : size_t line_len; 31 : char *in_buf; 32 : char *out_buf; 33 : size_t out_buf_start; 34 : size_t out_buf_remaining; 35 : } stream_fwf_data; 36 : 37 : 38 : static ssize_t 39 2 : stream_fwf_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) 40 : { 41 2 : stream_fwf_data *fsd; 42 2 : size_t to_write = cnt; 43 2 : size_t buf_written = 0; 44 2 : char nl_buf; 45 : 46 2 : fsd = (stream_fwf_data *) s->stream_data.p; 47 2 : if (fsd == NULL || elmsize != 1) { 48 : return -1; 49 : } 50 2 : if (fsd->eof) 51 : return 0; 52 : 53 33 : while (to_write > 0) { 54 : /* input conversion */ 55 33 : if (fsd->out_buf_remaining == 0) { /* need to convert next line */ 56 33 : size_t field_idx, in_buf_pos = 0, out_buf_pos = 0; 57 33 : ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len); 58 33 : if (actually_read < (ssize_t) fsd->line_len) { /* incomplete last line */ 59 1 : if (actually_read < 0) { 60 : return actually_read; /* this is an error */ 61 : } 62 1 : fsd->eof |= fsd->s->eof; 63 1 : return (ssize_t) buf_written; /* skip last line */ 64 : } 65 : /* consume to next newline */ 66 32 : while (fsd->s->read(fsd->s, &nl_buf, 1, 1) == 1 && 67 32 : nl_buf != '\n') 68 : ; 69 32 : fsd->eof |= fsd->s->eof; 70 : 71 384 : for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) { 72 352 : char *val_start, *val_end; 73 352 : val_start = fsd->in_buf + in_buf_pos; 74 352 : in_buf_pos += fsd->widths[field_idx]; 75 352 : val_end = fsd->in_buf + in_buf_pos - 1; 76 686 : while (*val_start == fsd->filler) 77 334 : val_start++; 78 352 : while (*val_end == fsd->filler) 79 0 : val_end--; 80 1330 : while (val_start <= val_end) { 81 978 : if (*val_start == STREAM_FWF_FIELD_SEP) { 82 0 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE; 83 : } 84 978 : fsd->out_buf[out_buf_pos++] = *val_start++; 85 : } 86 352 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP; 87 : } 88 32 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP; 89 32 : fsd->out_buf_remaining = out_buf_pos; 90 32 : fsd->out_buf_start = 0; 91 : } 92 : /* now we know something is in output_buf so deliver it */ 93 32 : if (fsd->out_buf_remaining <= to_write) { 94 32 : memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, fsd->out_buf_remaining); 95 32 : to_write -= fsd->out_buf_remaining; 96 32 : buf_written += fsd->out_buf_remaining; 97 32 : fsd->out_buf_remaining = 0; 98 : } else { 99 0 : memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, to_write); 100 0 : fsd->out_buf_start += to_write; 101 0 : fsd->out_buf_remaining -= to_write; 102 0 : buf_written += to_write; 103 0 : to_write = 0; 104 : } 105 : } 106 0 : return (ssize_t) buf_written; 107 : } 108 : 109 : 110 : static void 111 2 : stream_fwf_close(stream *s) 112 : { 113 2 : stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p; 114 : 115 2 : if (fsd != NULL) { 116 1 : stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p; 117 1 : close_stream(fsd->s); 118 1 : free(fsd->widths); 119 1 : free(fsd->in_buf); 120 1 : free(fsd->out_buf); 121 1 : free(fsd); 122 1 : s->stream_data.p = NULL; 123 : } 124 2 : } 125 : 126 : static void 127 1 : stream_fwf_destroy(stream *s) 128 : { 129 1 : stream_fwf_close(s); 130 1 : destroy_stream(s); 131 1 : } 132 : 133 : stream * 134 1 : stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler) 135 : { 136 1 : stream *ns; 137 1 : stream_fwf_data *fsd = malloc(sizeof(stream_fwf_data)); 138 : 139 1 : if (fsd == NULL) { 140 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 141 0 : return NULL; 142 : } 143 1 : *fsd = (stream_fwf_data) { 144 : .s = s, 145 : .num_fields = num_fields, 146 : .widths = widths, 147 : .filler = filler, 148 : .line_len = 0, 149 : .eof = false, 150 : }; 151 12 : for (size_t i = 0; i < num_fields; i++) { 152 11 : fsd->line_len += widths[i]; 153 : } 154 1 : fsd->in_buf = malloc(fsd->line_len); 155 1 : if (fsd->in_buf == NULL) { 156 0 : free(fsd); 157 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 158 0 : return NULL; 159 : } 160 1 : fsd->out_buf = malloc(fsd->line_len * 3); 161 1 : if (fsd->out_buf == NULL) { 162 0 : free(fsd->in_buf); 163 0 : free(fsd); 164 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 165 0 : return NULL; 166 : } 167 1 : if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) { 168 0 : free(fsd->in_buf); 169 0 : free(fsd->out_buf); 170 0 : free(fsd); 171 0 : return NULL; 172 : } 173 1 : ns->read = stream_fwf_read; 174 1 : ns->close = stream_fwf_close; 175 1 : ns->destroy = stream_fwf_destroy; 176 1 : ns->write = NULL; 177 1 : ns->flush = NULL; 178 1 : ns->readonly = true; 179 1 : ns->stream_data.p = fsd; 180 1 : return ns; 181 : }