LCOV - code coverage report
Current view: top level - common/stream - mapi_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 113 128 88.3 %
Date: 2024-04-25 20:03:45 Functions: 11 11 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "stream.h"
      15             : #include "stream_internal.h"
      16             : #include "mapi_prompt.h"
      17             : /* Write hook of the byte-counting wrapper: passes the call to the inner stream and tallies what was written. */
      18             : static ssize_t
      19     1899406 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
      20             : {
      21     1899406 :         uint64_t *counter = (uint64_t*) s->stream_data.p; /* pointer installed by byte_counting_stream() */
      22     1899406 :         ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
      23     1899406 :         if (nwritten >= 0) { /* a negative result is returned as-is and leaves the counter untouched */
      24     1899406 :                 *counter += elmsize * nwritten; /* NOTE(review): assumes nwritten counts elements, not bytes -- confirm */
      25             :         }
      26     1899406 :         return nwritten;
      27             : }
      28             : 
      29             : /* Wrap `wrapped` in a stream whose write hook adds to *counter; returns NULL if the wrapper cannot be created. */
      30             : stream *
      31         601 : byte_counting_stream(stream *wrapped, uint64_t *counter)
      32             : {
      33         601 :         stream *s = create_wrapper_stream(NULL, wrapped);
      34         601 :         if (!s)
      35             :                 return NULL;
      36         601 :         s->stream_data.p = counter; /* stored as-is, not copied; NOTE(review): presumably must outlive the stream -- confirm */
      37         601 :         s->write = &byte_counting_write; /* only the write hook is replaced here */
      38         601 :         s->destroy = &destroy_stream; /* NOTE(review): presumably leaves the counter alone -- verify destroy_stream */
      39         601 :         return s;
      40             : }
      41             : 
      42             : 
      43             : /* Read from s and throw the data away until mnstr_read() returns 0 or a negative value. */
      44             : static void
      45          83 : discard(stream *s)
      46             : {
      47         158 :         static char bitbucket[8192]; /* contents are never inspected; NOTE(review): static, so shared by all callers -- confirm concurrent use is acceptable */
      48         158 :         while (1) {
      49         158 :                 ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
      50         158 :                 if (nread <= 0) /* a 0 result and an error both end the loop; they are not told apart */
      51          83 :                         return;
      52             :                 assert(1); /* NOTE(review): can never fail; looks like a placeholder */
      53             :         }
      54             : }
      55             : /* State kept in stream_data.p of the "ONCLIENT" streams created by setup_transfer(); freed by destroy(). */
      56             : struct mapi_filetransfer {
      57             :         stream *from_client; // upload_read sets this to NULL once the read that follows PROMPT2 returns 0
      58             :         stream *to_client; // documented as set to NULL when client sends empty; NOTE(review): nothing in this file clears it
      59             : };
      60             : /* Read hook for upload streams: reads from the client and sends PROMPT2 when a read returns 0 without eof. */
      61             : static ssize_t
      62     4999853 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
      63             : {
      64     4999853 :         struct mapi_filetransfer *state = s->stream_data.p;
      65             : 
      66     4999853 :         if (state->from_client == NULL) { /* cleared further down once the upload has ended */
      67           0 :                 assert(s->eof);
      68             :                 return 0;
      69             :         }
      70             : 
      71     4999853 :         ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
      72     4999853 :         if (nread != 0 || state->from_client->eof) /* NOTE(review): a negative nread is returned without copying the error onto s */
      73             :                 return nread;
      74             : 
      75             :         // before returning the 0 we send the prompt and make another attempt.
      76        1606 :         if (
      77        1606 :                         mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
      78        1606 :                 ||      mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
      79             :         ) {
      80           0 :                 mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client)); /* copy the client stream's error onto s */
      81           0 :                 return -1;
      82             :         }
      83             : 
      84             :         // if it succeeds, return that to the client.
      85             :         // if it's still a block boundary, return that to the client.
      86             :         // if there's an error, return that to the client.
      87        1606 :         nread = mnstr_read(state->from_client, buf, elmsize, cnt);
      88        1606 :         if (nread > 0)
      89             :                 return nread;
      90         129 :         if (nread == 0) { /* still nothing after prompting: treat the upload as finished */
      91         129 :                 s->eof = true;
      92         129 :                 state->from_client = NULL; /* later calls take the early return at the top */
      93         129 :                 return nread;
      94             :         } else {
      95           0 :                 mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
      96           0 :                 return -1;
      97             :         }
      98             : }
      99             : /* Close hook for upload streams: drains any unread client data, then sends PROMPT3 and flushes. */
     100             : static void
     101         135 : upload_close(stream *s)
     102             : {
     103         135 :         struct mapi_filetransfer *state = s->stream_data.p;
     104             : 
     105         135 :         stream *from = state->from_client;
     106         135 :         if (from) /* still set means upload_read never reached the end of the upload */
     107           6 :                 discard(from);
     108             : 
     109         135 :         stream *to = state->to_client;
     110         135 :         mnstr_write(to, PROMPT3, strlen(PROMPT3), 1); /* NOTE(review): result ignored; to is not NULL-checked, unlike download_close */
     111         135 :         mnstr_flush(to, MNSTR_FLUSH_DATA); /* NOTE(review): result ignored as well */
     112         135 : }
     113             : /* Write hook for download streams: hands the arguments unchanged to the to_client stream. */
     114             : static ssize_t
     115    15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     116             : {
     117    15000113 :         struct mapi_filetransfer *state = s->stream_data.p;
     118    15000113 :         stream *to = state->to_client;
     119    15000113 :         return to->write(to, buf, elmsize, cnt); /* calls the write hook directly instead of going through mnstr_write() */
     120             : }
     121             : /* Close hook for download streams: flushes to_client, then drains from_client; each step is skipped if its pointer is NULL. */
     122             : static void
     123          75 : download_close(stream *s)
     124             : {
     125          75 :         struct mapi_filetransfer *state = s->stream_data.p;
     126             : 
     127          75 :         stream *to = state->to_client;
     128          75 :         stream *from = state->from_client;
     129          75 :         if (to)
     130          75 :                 mnstr_flush(to, MNSTR_FLUSH_DATA); /* NOTE(review): flush result is ignored */
     131          75 :         if (from)
     132          75 :                 discard(from); /* read and drop whatever the client still sends */
     133          75 : }
     134             : /* Destroy hook installed by mapi_request_upload() and mapi_request_download(): frees the transfer state, then the stream. */
     135             : static void
     136         210 : destroy(stream *s)
     137             : {
     138         210 :         struct mapi_filetransfer *state = s->stream_data.p;
     139         210 :         free(state); /* allocated with malloc() in setup_transfer() */
     140         210 :         destroy_stream(s);
     141         210 : }
     142             : 
     143             : /* Send a PROMPT3 "<req> <filename>" request over ws and read the reply from bs->s; returns a new "ONCLIENT" stream holding both streams, or NULL with the open error set. */
     144             : static stream*
     145         212 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
     146             : {
     147         212 :         const char *msg = NULL; /* non-NULL at the end label means failure */
     148         212 :         stream *s = NULL;
     149         212 :         struct mapi_filetransfer *state = NULL;
     150         212 :         ssize_t nwritten;
     151         212 :         ssize_t nread;
     152         212 :         bool ok;
     153             : 
     154         260 :         while (!bs->eof) /* NOTE(review): presumably consumes the rest of the current client message first -- confirm */
     155          48 :                 bstream_next(bs);
     156         212 :         stream *rs = bs->s;
     157         212 :         assert(isa_block_stream(ws));
     158         212 :         assert(isa_block_stream(rs));
     159             : 
     160         212 :         nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
     161         212 :         if (nwritten <= 0) {
     162           0 :                 msg = mnstr_peek_error(ws);
     163           0 :                 goto end;
     164             :         }
     165         212 :         if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
     166           0 :                 msg = mnstr_peek_error(ws);
     167           0 :                 goto end;
     168             :         }
     169             : 
     170         212 :         char buf[256];
     171         212 :         nread = mnstr_readline(rs, buf, sizeof(buf));
     172         212 :         ok = (nread == 0 || (nread == 1 && buf[0] == '\n')); /* an empty reply or a lone newline counts as acceptance */
     173         212 :         if (!ok) {
     174           2 :                 msg = buf; /* reply text becomes the error; NOTE(review): if nread < 0 buf may not hold a valid string -- confirm */
     175           2 :                 discard(rs); /* drop the rest of the client's reply */
     176           2 :                 goto end;
     177             :         }
     178             : 
     179             :         // Client accepted the request
     180         210 :         state = malloc(sizeof(*state));
     181         210 :         if (!state) {
     182           0 :                 msg = "malloc failed";
     183           0 :                 goto end;
     184             :         }
     185         210 :         s = create_stream("ONCLIENT");
     186         210 :         if (!s) {
     187           0 :                 free(state);                    /* no chance to free through destroy function */
     188           0 :                 msg = mnstr_peek_error(NULL);
     189           0 :                 goto end;
     190             :         }
     191         210 :         state->from_client = rs;
     192         210 :         state->to_client = ws;
     193         210 :         s->stream_data.p = state; /* read/write/close/destroy hooks are installed by the caller */
     194           0 : end:
     195         212 :         if (msg) {
     196           2 :                 mnstr_destroy(s); /* NOTE(review): s is still NULL on every path that sets msg */
     197           2 :                 mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
     198           2 :                 return NULL;
     199             :         } else {
     200         210 :                 return s;
     201             :         }
     202             : }
     203             : /* Ask the client to upload `filename`; returns a stream with upload_read/upload_close installed, or NULL if setup_transfer() failed. */
     204             : stream*
     205         137 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
     206             : {
     207         137 :         const char *req = binary ? "rb" : "r 0"; /* NOTE(review): the 0 is presumably a starting offset -- confirm against the MAPI protocol */
     208         137 :         stream *s = setup_transfer(req, filename, bs, ws);
     209         137 :         if (s == NULL)
     210             :                 return NULL;
     211             : 
     212         135 :         s->binary = binary;
     213         135 :         s->read = upload_read;
     214         135 :         s->close = upload_close;
     215         135 :         s->destroy = destroy; /* frees the transfer state together with the stream */
     216             : 
     217         135 :         return s;
     218             : }
     219             : /* Ask the client to accept a download into `filename`; returns a stream with download_write/download_close installed, or NULL if setup_transfer() failed. */
     220             : stream*
     221          75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
     222             : {
     223          75 :         const char *req = binary ? "wb" : "w"; /* request word sent after PROMPT3 */
     224          75 :         stream *s = setup_transfer(req, filename, bs, ws);
     225          75 :         if (s == NULL)
     226             :                 return NULL;
     227             : 
     228          75 :         s->binary = binary;
     229          75 :         s->readonly = false; /* NOTE(review): presumably create_stream() defaults to read-only -- confirm */
     230          75 :         s->write = download_write;
     231          75 :         s->close = download_close;
     232          75 :         s->destroy = destroy; /* frees the transfer state together with the stream */
     233             : 
     234          75 :         return s;
     235             : }

Generated by: LCOV version 1.14