Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "stream.h"
15 : #include "stream_internal.h"
16 : #include "mapi_prompt.h"
17 :
18 : static ssize_t
19 1899406 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
20 : {
21 1899406 : uint64_t *counter = (uint64_t*) s->stream_data.p;
22 1899406 : ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
23 1899406 : if (nwritten >= 0) {
24 1899406 : *counter += elmsize * nwritten;
25 : }
26 1899406 : return nwritten;
27 : }
28 :
29 :
30 : stream *
31 601 : byte_counting_stream(stream *wrapped, uint64_t *counter)
32 : {
33 601 : stream *s = create_wrapper_stream(NULL, wrapped);
34 601 : if (!s)
35 : return NULL;
36 601 : s->stream_data.p = counter;
37 601 : s->write = &byte_counting_write;
38 601 : s->destroy = &destroy_stream;
39 601 : return s;
40 : }
41 :
42 :
43 :
44 : static void
45 83 : discard(stream *s)
46 : {
47 158 : static char bitbucket[8192];
48 158 : while (1) {
49 158 : ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
50 158 : if (nread <= 0)
51 83 : return;
52 : assert(1);
53 : }
54 : }
55 :
56 : struct mapi_filetransfer {
57 : stream *from_client; // set to NULL after sending MAPI_PROMPT3
58 : stream *to_client; // set to NULL when client sends empty
59 : };
60 :
61 : static ssize_t
62 4999853 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
63 : {
64 4999853 : struct mapi_filetransfer *state = s->stream_data.p;
65 :
66 4999853 : if (state->from_client == NULL) {
67 0 : assert(s->eof);
68 : return 0;
69 : }
70 :
71 4999853 : ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
72 4999853 : if (nread != 0 || state->from_client->eof)
73 : return nread;
74 :
75 : // before returning the 0 we send the prompt and make another attempt.
76 1606 : if (
77 1606 : mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
78 1606 : || mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
79 : ) {
80 0 : mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
81 0 : return -1;
82 : }
83 :
84 : // if it succeeds, return that to the client.
85 : // if it's still a block boundary, return that to the client.
86 : // if there's an error, return that to the client.
87 1606 : nread = mnstr_read(state->from_client, buf, elmsize, cnt);
88 1606 : if (nread > 0)
89 : return nread;
90 129 : if (nread == 0) {
91 129 : s->eof = true;
92 129 : state->from_client = NULL;
93 129 : return nread;
94 : } else {
95 0 : mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
96 0 : return -1;
97 : }
98 : }
99 :
100 : static void
101 135 : upload_close(stream *s)
102 : {
103 135 : struct mapi_filetransfer *state = s->stream_data.p;
104 :
105 135 : stream *from = state->from_client;
106 135 : if (from)
107 6 : discard(from);
108 :
109 135 : stream *to = state->to_client;
110 135 : mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
111 135 : mnstr_flush(to, MNSTR_FLUSH_DATA);
112 135 : }
113 :
114 : static ssize_t
115 15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
116 : {
117 15000113 : struct mapi_filetransfer *state = s->stream_data.p;
118 15000113 : stream *to = state->to_client;
119 15000113 : return to->write(to, buf, elmsize, cnt);
120 : }
121 :
122 : static void
123 75 : download_close(stream *s)
124 : {
125 75 : struct mapi_filetransfer *state = s->stream_data.p;
126 :
127 75 : stream *to = state->to_client;
128 75 : stream *from = state->from_client;
129 75 : if (to)
130 75 : mnstr_flush(to, MNSTR_FLUSH_DATA);
131 75 : if (from)
132 75 : discard(from);
133 75 : }
134 :
135 : static void
136 210 : destroy(stream *s)
137 : {
138 210 : struct mapi_filetransfer *state = s->stream_data.p;
139 210 : free(state);
140 210 : destroy_stream(s);
141 210 : }
142 :
143 :
144 : static stream*
145 212 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
146 : {
147 212 : const char *msg = NULL;
148 212 : stream *s = NULL;
149 212 : struct mapi_filetransfer *state = NULL;
150 212 : ssize_t nwritten;
151 212 : ssize_t nread;
152 212 : bool ok;
153 :
154 260 : while (!bs->eof)
155 48 : bstream_next(bs);
156 212 : stream *rs = bs->s;
157 212 : assert(isa_block_stream(ws));
158 212 : assert(isa_block_stream(rs));
159 :
160 212 : nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
161 212 : if (nwritten <= 0) {
162 0 : msg = mnstr_peek_error(ws);
163 0 : goto end;
164 : }
165 212 : if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
166 0 : msg = mnstr_peek_error(ws);
167 0 : goto end;
168 : }
169 :
170 212 : char buf[256];
171 212 : nread = mnstr_readline(rs, buf, sizeof(buf));
172 212 : ok = (nread == 0 || (nread == 1 && buf[0] == '\n'));
173 212 : if (!ok) {
174 2 : msg = buf;
175 2 : discard(rs);
176 2 : goto end;
177 : }
178 :
179 : // Client accepted the request
180 210 : state = malloc(sizeof(*state));
181 210 : if (!state) {
182 0 : msg = "malloc failed";
183 0 : goto end;
184 : }
185 210 : s = create_stream("ONCLIENT");
186 210 : if (!s) {
187 0 : free(state); /* no chance to free through destroy function */
188 0 : msg = mnstr_peek_error(NULL);
189 0 : goto end;
190 : }
191 210 : state->from_client = rs;
192 210 : state->to_client = ws;
193 210 : s->stream_data.p = state;
194 0 : end:
195 212 : if (msg) {
196 2 : mnstr_destroy(s);
197 2 : mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
198 2 : return NULL;
199 : } else {
200 210 : return s;
201 : }
202 : }
203 :
204 : stream*
205 137 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
206 : {
207 137 : const char *req = binary ? "rb" : "r 0";
208 137 : stream *s = setup_transfer(req, filename, bs, ws);
209 137 : if (s == NULL)
210 : return NULL;
211 :
212 135 : s->binary = binary;
213 135 : s->read = upload_read;
214 135 : s->close = upload_close;
215 135 : s->destroy = destroy;
216 :
217 135 : return s;
218 : }
219 :
220 : stream*
221 75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
222 : {
223 75 : const char *req = binary ? "wb" : "w";
224 75 : stream *s = setup_transfer(req, filename, bs, ws);
225 75 : if (s == NULL)
226 : return NULL;
227 :
228 75 : s->binary = binary;
229 75 : s->readonly = false;
230 75 : s->write = download_write;
231 75 : s->close = download_close;
232 75 : s->destroy = destroy;
233 :
234 75 : return s;
235 : }
|