Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "mapi_prompt.h"
17 :
18 : static ssize_t
19 1800622 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
20 : {
21 1800622 : uint64_t *counter = (uint64_t*) s->stream_data.p;
22 1800622 : ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
23 1800622 : if (nwritten >= 0) {
24 1800622 : *counter += elmsize * nwritten;
25 : }
26 1800622 : return nwritten;
27 : }
28 :
29 :
30 : stream *
31 600 : byte_counting_stream(stream *wrapped, uint64_t *counter)
32 : {
33 600 : stream *s = create_wrapper_stream(NULL, wrapped);
34 600 : if (!s)
35 : return NULL;
36 600 : s->stream_data.p = counter;
37 600 : s->write = &byte_counting_write;
38 600 : s->destroy = &destroy_stream;
39 600 : return s;
40 : }
41 :
42 :
43 :
44 : static void
45 211 : discard(stream *s)
46 : {
47 286 : static char bitbucket[8192];
48 286 : while (1) {
49 286 : ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
50 286 : if (nread <= 0)
51 211 : return;
52 : assert(1);
53 : }
54 : }
55 :
56 : struct mapi_filetransfer {
57 : stream *from_client; // set to NULL after sending MAPI_PROMPT3
58 : stream *to_client; // set to NULL when client sends empty
59 : };
60 :
61 : static ssize_t
62 5060260 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
63 : {
64 5060260 : struct mapi_filetransfer *state = s->stream_data.p;
65 :
66 5060260 : if (state->from_client == NULL) {
67 0 : assert(s->eof);
68 : return 0;
69 : }
70 :
71 5060260 : ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
72 5060260 : if (nread != 0 || state->from_client->eof)
73 : return nread;
74 :
75 : // before returning the 0 we send the prompt and make another attempt.
76 1735 : if (
77 1735 : mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
78 1735 : || mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
79 : ) {
80 0 : mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
81 0 : return -1;
82 : }
83 :
84 : // if it succeeds, return that to the client.
85 : // if it's still a block boundary, return that to the client.
86 : // if there's an error, return that to the client.
87 1735 : nread = mnstr_read(state->from_client, buf, elmsize, cnt);
88 1735 : if (nread > 0)
89 : return nread;
90 220 : if (nread == 0) {
91 220 : s->eof = true;
92 220 : state->from_client = NULL;
93 220 : return nread;
94 : } else {
95 0 : mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
96 0 : return -1;
97 : }
98 : }
99 :
100 : static void
101 354 : upload_close(stream *s)
102 : {
103 354 : struct mapi_filetransfer *state = s->stream_data.p;
104 :
105 354 : stream *from = state->from_client;
106 354 : if (from)
107 134 : discard(from);
108 :
109 354 : stream *to = state->to_client;
110 354 : mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
111 354 : mnstr_flush(to, MNSTR_FLUSH_DATA);
112 354 : }
113 :
114 : static ssize_t
115 15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
116 : {
117 15000113 : struct mapi_filetransfer *state = s->stream_data.p;
118 15000113 : stream *to = state->to_client;
119 15000113 : return to->write(to, buf, elmsize, cnt);
120 : }
121 :
122 : static void
123 75 : download_close(stream *s)
124 : {
125 75 : struct mapi_filetransfer *state = s->stream_data.p;
126 :
127 75 : stream *to = state->to_client;
128 75 : stream *from = state->from_client;
129 75 : if (to)
130 75 : mnstr_flush(to, MNSTR_FLUSH_DATA);
131 75 : if (from)
132 75 : discard(from);
133 75 : }
134 :
135 : static void
136 429 : destroy(stream *s)
137 : {
138 429 : struct mapi_filetransfer *state = s->stream_data.p;
139 429 : free(state);
140 429 : destroy_stream(s);
141 429 : }
142 :
143 :
144 : static stream*
145 431 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
146 : {
147 431 : const char *msg = NULL;
148 431 : stream *s = NULL;
149 431 : struct mapi_filetransfer *state = NULL;
150 431 : ssize_t nwritten;
151 431 : ssize_t nread;
152 431 : bool ok;
153 431 : int oob = 0;
154 :
155 498 : while (!bs->eof) {
156 67 : if (bstream_next(bs) < 0) {
157 0 : msg = mnstr_peek_error(ws);
158 0 : goto end;
159 : }
160 : }
161 431 : stream *rs = bs->s;
162 431 : assert(isa_block_stream(ws));
163 431 : assert(isa_block_stream(rs));
164 :
165 431 : nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
166 431 : if (nwritten <= 0) {
167 0 : msg = mnstr_peek_error(ws);
168 0 : goto end;
169 : }
170 431 : if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
171 0 : msg = mnstr_peek_error(ws);
172 0 : goto end;
173 : }
174 :
175 431 : char buf[256];
176 431 : nread = mnstr_readline(rs, buf, sizeof(buf));
177 431 : ok = ((nread == 0 || (nread == 1 && buf[0] == '\n')) && !(oob = mnstr_getoob(rs)));
178 431 : if (!ok) {
179 2 : switch (oob) {
180 : case 1: /* client side interrupt */
181 : msg = "Query aborted";
182 : break;
183 0 : case 2:
184 0 : msg = "Read error on client";
185 0 : break;
186 2 : default:
187 2 : msg = nread > 0 ? buf : "Unknown error";
188 : break;
189 : }
190 2 : discard(rs);
191 2 : goto end;
192 : }
193 :
194 : // Client accepted the request
195 429 : state = malloc(sizeof(*state));
196 429 : if (!state) {
197 0 : msg = "malloc failed";
198 0 : goto end;
199 : }
200 429 : s = create_stream("ONCLIENT");
201 429 : if (!s) {
202 0 : free(state); /* no chance to free through destroy function */
203 0 : msg = mnstr_peek_error(NULL);
204 0 : goto end;
205 : }
206 429 : state->from_client = rs;
207 429 : state->to_client = ws;
208 429 : s->stream_data.p = state;
209 2 : end:
210 431 : if (msg) {
211 2 : mnstr_destroy(s);
212 2 : mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
213 2 : return NULL;
214 : } else {
215 429 : return s;
216 : }
217 : }
218 :
219 : stream*
220 356 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
221 : {
222 356 : const char *req = binary ? "rb" : "r 0";
223 356 : stream *s = setup_transfer(req, filename, bs, ws);
224 356 : if (s == NULL)
225 : return NULL;
226 :
227 354 : s->binary = binary;
228 354 : s->read = upload_read;
229 354 : s->close = upload_close;
230 354 : s->destroy = destroy;
231 :
232 354 : return s;
233 : }
234 :
235 : stream*
236 75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
237 : {
238 75 : const char *req = binary ? "wb" : "w";
239 75 : stream *s = setup_transfer(req, filename, bs, ws);
240 75 : if (s == NULL)
241 : return NULL;
242 :
243 75 : s->binary = binary;
244 75 : s->readonly = false;
245 75 : s->write = download_write;
246 75 : s->close = download_close;
247 75 : s->destroy = destroy;
248 :
249 75 : return s;
250 : }
|