Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "mapi_prompt.h"
17 :
18 : static ssize_t
19 1898807 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
20 : {
21 1898807 : uint64_t *counter = (uint64_t*) s->stream_data.p;
22 1898807 : ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
23 1898807 : if (nwritten >= 0) {
24 1898807 : *counter += elmsize * nwritten;
25 : }
26 1898807 : return nwritten;
27 : }
28 :
29 :
30 : stream *
31 600 : byte_counting_stream(stream *wrapped, uint64_t *counter)
32 : {
33 600 : stream *s = create_wrapper_stream(NULL, wrapped);
34 600 : if (!s)
35 : return NULL;
36 600 : s->stream_data.p = counter;
37 600 : s->write = &byte_counting_write;
38 600 : s->destroy = &destroy_stream;
39 600 : return s;
40 : }
41 :
42 :
43 :
44 : static void
45 101 : discard(stream *s)
46 : {
47 176 : static char bitbucket[8192];
48 176 : while (1) {
49 176 : ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
50 176 : if (nread <= 0)
51 101 : return;
52 : assert(1);
53 : }
54 : }
55 :
56 : struct mapi_filetransfer {
57 : stream *from_client; // set to NULL after sending MAPI_PROMPT3
58 : stream *to_client; // set to NULL when client sends empty
59 : };
60 :
61 : static ssize_t
62 4999887 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
63 : {
64 4999887 : struct mapi_filetransfer *state = s->stream_data.p;
65 :
66 4999887 : if (state->from_client == NULL) {
67 0 : assert(s->eof);
68 : return 0;
69 : }
70 :
71 4999887 : ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
72 4999887 : if (nread != 0 || state->from_client->eof)
73 : return nread;
74 :
75 : // before returning the 0 we send the prompt and make another attempt.
76 1621 : if (
77 1621 : mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
78 1621 : || mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
79 : ) {
80 0 : mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
81 0 : return -1;
82 : }
83 :
84 : // if it succeeds, return that to the client.
85 : // if it's still a block boundary, return that to the client.
86 : // if there's an error, return that to the client.
87 1621 : nread = mnstr_read(state->from_client, buf, elmsize, cnt);
88 1621 : if (nread > 0)
89 : return nread;
90 128 : if (nread == 0) {
91 128 : s->eof = true;
92 128 : state->from_client = NULL;
93 128 : return nread;
94 : } else {
95 0 : mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
96 0 : return -1;
97 : }
98 : }
99 :
100 : static void
101 152 : upload_close(stream *s)
102 : {
103 152 : struct mapi_filetransfer *state = s->stream_data.p;
104 :
105 152 : stream *from = state->from_client;
106 152 : if (from)
107 24 : discard(from);
108 :
109 152 : stream *to = state->to_client;
110 152 : mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
111 152 : mnstr_flush(to, MNSTR_FLUSH_DATA);
112 152 : }
113 :
114 : static ssize_t
115 15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
116 : {
117 15000113 : struct mapi_filetransfer *state = s->stream_data.p;
118 15000113 : stream *to = state->to_client;
119 15000113 : return to->write(to, buf, elmsize, cnt);
120 : }
121 :
122 : static void
123 75 : download_close(stream *s)
124 : {
125 75 : struct mapi_filetransfer *state = s->stream_data.p;
126 :
127 75 : stream *to = state->to_client;
128 75 : stream *from = state->from_client;
129 75 : if (to)
130 75 : mnstr_flush(to, MNSTR_FLUSH_DATA);
131 75 : if (from)
132 75 : discard(from);
133 75 : }
134 :
135 : static void
136 227 : destroy(stream *s)
137 : {
138 227 : struct mapi_filetransfer *state = s->stream_data.p;
139 227 : free(state);
140 227 : destroy_stream(s);
141 227 : }
142 :
143 :
144 : static stream*
145 229 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
146 : {
147 229 : const char *msg = NULL;
148 229 : stream *s = NULL;
149 229 : struct mapi_filetransfer *state = NULL;
150 229 : ssize_t nwritten;
151 229 : ssize_t nread;
152 229 : bool ok;
153 229 : int oob = 0;
154 :
155 295 : while (!bs->eof)
156 66 : bstream_next(bs);
157 229 : stream *rs = bs->s;
158 229 : assert(isa_block_stream(ws));
159 229 : assert(isa_block_stream(rs));
160 :
161 229 : nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
162 229 : if (nwritten <= 0) {
163 0 : msg = mnstr_peek_error(ws);
164 0 : goto end;
165 : }
166 229 : if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
167 0 : msg = mnstr_peek_error(ws);
168 0 : goto end;
169 : }
170 :
171 229 : char buf[256];
172 229 : nread = mnstr_readline(rs, buf, sizeof(buf));
173 229 : ok = ((nread == 0 || (nread == 1 && buf[0] == '\n')) && !(oob = mnstr_getoob(rs)));
174 229 : if (!ok) {
175 2 : switch (oob) {
176 : case 1: /* client side interrupt */
177 : msg = "Query aborted";
178 : break;
179 0 : case 2:
180 0 : msg = "Read error on client";
181 0 : break;
182 2 : default:
183 2 : msg = nread > 0 ? buf : "Unknown error";
184 : break;
185 : }
186 2 : discard(rs);
187 2 : goto end;
188 : }
189 :
190 : // Client accepted the request
191 227 : state = malloc(sizeof(*state));
192 227 : if (!state) {
193 0 : msg = "malloc failed";
194 0 : goto end;
195 : }
196 227 : s = create_stream("ONCLIENT");
197 227 : if (!s) {
198 0 : free(state); /* no chance to free through destroy function */
199 0 : msg = mnstr_peek_error(NULL);
200 0 : goto end;
201 : }
202 227 : state->from_client = rs;
203 227 : state->to_client = ws;
204 227 : s->stream_data.p = state;
205 2 : end:
206 229 : if (msg) {
207 2 : mnstr_destroy(s);
208 2 : mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
209 2 : return NULL;
210 : } else {
211 227 : return s;
212 : }
213 : }
214 :
215 : stream*
216 154 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
217 : {
218 154 : const char *req = binary ? "rb" : "r 0";
219 154 : stream *s = setup_transfer(req, filename, bs, ws);
220 154 : if (s == NULL)
221 : return NULL;
222 :
223 152 : s->binary = binary;
224 152 : s->read = upload_read;
225 152 : s->close = upload_close;
226 152 : s->destroy = destroy;
227 :
228 152 : return s;
229 : }
230 :
231 : stream*
232 75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
233 : {
234 75 : const char *req = binary ? "wb" : "w";
235 75 : stream *s = setup_transfer(req, filename, bs, ws);
236 75 : if (s == NULL)
237 : return NULL;
238 :
239 75 : s->binary = binary;
240 75 : s->readonly = false;
241 75 : s->write = download_write;
242 75 : s->close = download_close;
243 75 : s->destroy = destroy;
244 :
245 75 : return s;
246 : }
|