Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "mapi_prompt.h"
17 :
18 : static ssize_t
19 1800170 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
20 : {
21 1800170 : uint64_t *counter = (uint64_t*) s->stream_data.p;
22 1800170 : ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
23 1800170 : if (nwritten >= 0) {
24 1800170 : *counter += elmsize * nwritten;
25 : }
26 1800170 : return nwritten;
27 : }
28 :
29 :
30 : stream *
31 600 : byte_counting_stream(stream *wrapped, uint64_t *counter)
32 : {
33 600 : stream *s = create_wrapper_stream(NULL, wrapped);
34 600 : if (!s)
35 : return NULL;
36 600 : s->stream_data.p = counter;
37 600 : s->write = &byte_counting_write;
38 600 : s->destroy = &destroy_stream;
39 600 : return s;
40 : }
41 :
42 :
43 :
44 : static void
45 213 : discard(stream *s)
46 : {
47 288 : static char bitbucket[8192];
48 288 : while (1) {
49 288 : ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
50 288 : if (nread <= 0)
51 213 : return;
52 : assert(1);
53 : }
54 : }
55 :
56 : struct mapi_filetransfer {
57 : stream *from_client; // set to NULL after sending MAPI_PROMPT3
58 : stream *to_client; // set to NULL when client sends empty
59 : };
60 :
61 : static ssize_t
62 5060262 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
63 : {
64 5060262 : struct mapi_filetransfer *state = s->stream_data.p;
65 :
66 5060262 : if (state->from_client == NULL) {
67 0 : assert(s->eof);
68 : return 0;
69 : }
70 :
71 5060262 : ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
72 5060262 : if (nread != 0 || state->from_client->eof)
73 : return nread;
74 :
75 : // before returning the 0 we send the prompt and make another attempt.
76 1736 : if (
77 1736 : mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
78 1736 : || mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
79 : ) {
80 0 : mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
81 0 : return -1;
82 : }
83 :
84 : // if it succeeds, return that to the client.
85 : // if it's still a block boundary, return that to the client.
86 : // if there's an error, return that to the client.
87 1736 : nread = mnstr_read(state->from_client, buf, elmsize, cnt);
88 1736 : if (nread > 0)
89 : return nread;
90 220 : if (nread == 0) {
91 220 : s->eof = true;
92 220 : state->from_client = NULL;
93 220 : return nread;
94 : } else {
95 0 : mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
96 0 : return -1;
97 : }
98 : }
99 :
100 : static void
101 355 : upload_close(stream *s)
102 : {
103 355 : struct mapi_filetransfer *state = s->stream_data.p;
104 :
105 355 : stream *from = state->from_client;
106 355 : if (from)
107 135 : discard(from);
108 :
109 355 : stream *to = state->to_client;
110 355 : mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
111 355 : mnstr_flush(to, MNSTR_FLUSH_DATA);
112 355 : }
113 :
114 : static ssize_t
115 15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
116 : {
117 15000113 : struct mapi_filetransfer *state = s->stream_data.p;
118 15000113 : stream *to = state->to_client;
119 15000113 : return to->write(to, buf, elmsize, cnt);
120 : }
121 :
122 : static void
123 75 : download_close(stream *s)
124 : {
125 75 : struct mapi_filetransfer *state = s->stream_data.p;
126 :
127 75 : stream *to = state->to_client;
128 75 : stream *from = state->from_client;
129 75 : if (to)
130 75 : mnstr_flush(to, MNSTR_FLUSH_DATA);
131 75 : if (from)
132 75 : discard(from);
133 75 : }
134 :
135 : static void
136 430 : destroy(stream *s)
137 : {
138 430 : struct mapi_filetransfer *state = s->stream_data.p;
139 430 : free(state);
140 430 : destroy_stream(s);
141 430 : }
142 :
143 :
144 : static stream*
145 433 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
146 : {
147 433 : const char *msg = NULL;
148 433 : stream *s = NULL;
149 433 : struct mapi_filetransfer *state = NULL;
150 433 : ssize_t nwritten;
151 433 : ssize_t nread;
152 433 : bool ok;
153 433 : int oob = 0;
154 :
155 500 : while (!bs->eof) {
156 67 : if (bstream_next(bs) < 0) {
157 0 : msg = mnstr_peek_error(ws);
158 0 : goto end;
159 : }
160 : }
161 433 : stream *rs = bs->s;
162 433 : assert(isa_block_stream(ws));
163 433 : assert(isa_block_stream(rs));
164 :
165 433 : nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
166 433 : if (nwritten <= 0) {
167 0 : msg = mnstr_peek_error(ws);
168 0 : goto end;
169 : }
170 433 : if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
171 0 : msg = mnstr_peek_error(ws);
172 0 : goto end;
173 : }
174 :
175 433 : char buf[256];
176 433 : nread = mnstr_readline(rs, buf, sizeof(buf));
177 433 : ok = ((nread == 0 || (nread == 1 && buf[0] == '\n')) && !(oob = mnstr_getoob(rs)));
178 433 : if (!ok) {
179 3 : switch (oob) {
180 : case 1: /* client side interrupt */
181 : msg = "Query aborted";
182 : break;
183 0 : case 2:
184 0 : msg = "Read error on client";
185 0 : break;
186 3 : default:
187 3 : msg = nread > 0 ? buf : "Unknown error";
188 : break;
189 : }
190 3 : discard(rs);
191 3 : goto end;
192 : }
193 :
194 : // Client accepted the request
195 430 : state = malloc(sizeof(*state));
196 430 : if (!state) {
197 0 : msg = "malloc failed";
198 0 : goto end;
199 : }
200 430 : s = create_stream("ONCLIENT");
201 430 : if (!s) {
202 0 : free(state); /* no chance to free through destroy function */
203 0 : msg = mnstr_peek_error(NULL);
204 0 : goto end;
205 : }
206 430 : state->from_client = rs;
207 430 : state->to_client = ws;
208 430 : s->stream_data.p = state;
209 3 : end:
210 433 : if (msg) {
211 3 : mnstr_destroy(s);
212 3 : mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
213 3 : return NULL;
214 : } else {
215 430 : return s;
216 : }
217 : }
218 :
219 : stream*
220 358 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
221 : {
222 358 : const char *req = binary ? "rb" : "r 0";
223 358 : stream *s = setup_transfer(req, filename, bs, ws);
224 358 : if (s == NULL)
225 : return NULL;
226 :
227 355 : s->binary = binary;
228 355 : s->read = upload_read;
229 355 : s->close = upload_close;
230 355 : s->destroy = destroy;
231 :
232 355 : return s;
233 : }
234 :
235 : stream*
236 75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
237 : {
238 75 : const char *req = binary ? "wb" : "w";
239 75 : stream *s = setup_transfer(req, filename, bs, ws);
240 75 : if (s == NULL)
241 : return NULL;
242 :
243 75 : s->binary = binary;
244 75 : s->readonly = false;
245 75 : s->write = download_write;
246 75 : s->close = download_close;
247 75 : s->destroy = destroy;
248 :
249 75 : return s;
250 : }
|