LCOV - code coverage report
Current view: top level - common/stream - mapi_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 117 136 86.0 %
Date: 2024-12-19 23:10:26 Functions: 11 11 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "stream.h"
      15             : #include "stream_internal.h"
      16             : #include "mapi_prompt.h"
      17             : 
      18             : static ssize_t
      19     1800170 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
      20             : {
      21     1800170 :         uint64_t *counter = (uint64_t*) s->stream_data.p;
      22     1800170 :         ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
      23     1800170 :         if (nwritten >= 0) {
      24     1800170 :                 *counter += elmsize * nwritten;
      25             :         }
      26     1800170 :         return nwritten;
      27             : }
      28             : 
      29             : 
      30             : stream *
      31         600 : byte_counting_stream(stream *wrapped, uint64_t *counter)
      32             : {
      33         600 :         stream *s = create_wrapper_stream(NULL, wrapped);
      34         600 :         if (!s)
      35             :                 return NULL;
      36         600 :         s->stream_data.p = counter;
      37         600 :         s->write = &byte_counting_write;
      38         600 :         s->destroy = &destroy_stream;
      39         600 :         return s;
      40             : }
      41             : 
      42             : 
      43             : 
      44             : static void
      45         213 : discard(stream *s)
      46             : {
      47         288 :         static char bitbucket[8192];
      48         288 :         while (1) {
      49         288 :                 ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
      50         288 :                 if (nread <= 0)
      51         213 :                         return;
      52             :                 assert(1);
      53             :         }
      54             : }
      55             : 
      56             : struct mapi_filetransfer {
      57             :         stream *from_client; // set to NULL after sending MAPI_PROMPT3
      58             :         stream *to_client; // set to NULL when client sends empty
      59             : };
      60             : 
      61             : static ssize_t
      62     5060262 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
      63             : {
      64     5060262 :         struct mapi_filetransfer *state = s->stream_data.p;
      65             : 
      66     5060262 :         if (state->from_client == NULL) {
      67           0 :                 assert(s->eof);
      68             :                 return 0;
      69             :         }
      70             : 
      71     5060262 :         ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
      72     5060262 :         if (nread != 0 || state->from_client->eof)
      73             :                 return nread;
      74             : 
      75             :         // before returning the 0 we send the prompt and make another attempt.
      76        1736 :         if (
      77        1736 :                         mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
      78        1736 :                 ||      mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
      79             :         ) {
      80           0 :                 mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
      81           0 :                 return -1;
      82             :         }
      83             : 
      84             :         // if it succeeds, return that to the client.
      85             :         // if it's still a block boundary, return that to the client.
      86             :         // if there's an error, return that to the client.
      87        1736 :         nread = mnstr_read(state->from_client, buf, elmsize, cnt);
      88        1736 :         if (nread > 0)
      89             :                 return nread;
      90         220 :         if (nread == 0) {
      91         220 :                 s->eof = true;
      92         220 :                 state->from_client = NULL;
      93         220 :                 return nread;
      94             :         } else {
      95           0 :                 mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
      96           0 :                 return -1;
      97             :         }
      98             : }
      99             : 
     100             : static void
     101         355 : upload_close(stream *s)
     102             : {
     103         355 :         struct mapi_filetransfer *state = s->stream_data.p;
     104             : 
     105         355 :         stream *from = state->from_client;
     106         355 :         if (from)
     107         135 :                 discard(from);
     108             : 
     109         355 :         stream *to = state->to_client;
     110         355 :         mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
     111         355 :         mnstr_flush(to, MNSTR_FLUSH_DATA);
     112         355 : }
     113             : 
     114             : static ssize_t
     115    15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     116             : {
     117    15000113 :         struct mapi_filetransfer *state = s->stream_data.p;
     118    15000113 :         stream *to = state->to_client;
     119    15000113 :         return to->write(to, buf, elmsize, cnt);
     120             : }
     121             : 
     122             : static void
     123          75 : download_close(stream *s)
     124             : {
     125          75 :         struct mapi_filetransfer *state = s->stream_data.p;
     126             : 
     127          75 :         stream *to = state->to_client;
     128          75 :         stream *from = state->from_client;
     129          75 :         if (to)
     130          75 :                 mnstr_flush(to, MNSTR_FLUSH_DATA);
     131          75 :         if (from)
     132          75 :                 discard(from);
     133          75 : }
     134             : 
     135             : static void
     136         430 : destroy(stream *s)
     137             : {
     138         430 :         struct mapi_filetransfer *state = s->stream_data.p;
     139         430 :         free(state);
     140         430 :         destroy_stream(s);
     141         430 : }
     142             : 
     143             : 
     144             : static stream*
     145         433 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
     146             : {
     147         433 :         const char *msg = NULL;
     148         433 :         stream *s = NULL;
     149         433 :         struct mapi_filetransfer *state = NULL;
     150         433 :         ssize_t nwritten;
     151         433 :         ssize_t nread;
     152         433 :         bool ok;
     153         433 :         int oob = 0;
     154             : 
     155         500 :         while (!bs->eof) {
     156          67 :                 if (bstream_next(bs) < 0) {
     157           0 :                         msg = mnstr_peek_error(ws);
     158           0 :                         goto end;
     159             :                 }
     160             :         }
     161         433 :         stream *rs = bs->s;
     162         433 :         assert(isa_block_stream(ws));
     163         433 :         assert(isa_block_stream(rs));
     164             : 
     165         433 :         nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
     166         433 :         if (nwritten <= 0) {
     167           0 :                 msg = mnstr_peek_error(ws);
     168           0 :                 goto end;
     169             :         }
     170         433 :         if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
     171           0 :                 msg = mnstr_peek_error(ws);
     172           0 :                 goto end;
     173             :         }
     174             : 
     175         433 :         char buf[256];
     176         433 :         nread = mnstr_readline(rs, buf, sizeof(buf));
     177         433 :         ok = ((nread == 0 || (nread == 1 && buf[0] == '\n')) && !(oob = mnstr_getoob(rs)));
     178         433 :         if (!ok) {
     179           3 :                 switch (oob) {
     180             :                 case 1:                                 /* client side interrupt */
     181             :                         msg = "Query aborted";
     182             :                         break;
     183           0 :                 case 2:
     184           0 :                         msg = "Read error on client";
     185           0 :                         break;
     186           3 :                 default:
     187           3 :                         msg = nread > 0 ? buf : "Unknown error";
     188             :                         break;
     189             :                 }
     190           3 :                 discard(rs);
     191           3 :                 goto end;
     192             :         }
     193             : 
     194             :         // Client accepted the request
     195         430 :         state = malloc(sizeof(*state));
     196         430 :         if (!state) {
     197           0 :                 msg = "malloc failed";
     198           0 :                 goto end;
     199             :         }
     200         430 :         s = create_stream("ONCLIENT");
     201         430 :         if (!s) {
     202           0 :                 free(state);                    /* no chance to free through destroy function */
     203           0 :                 msg = mnstr_peek_error(NULL);
     204           0 :                 goto end;
     205             :         }
     206         430 :         state->from_client = rs;
     207         430 :         state->to_client = ws;
     208         430 :         s->stream_data.p = state;
     209           3 : end:
     210         433 :         if (msg) {
     211           3 :                 mnstr_destroy(s);
     212           3 :                 mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
     213           3 :                 return NULL;
     214             :         } else {
     215         430 :                 return s;
     216             :         }
     217             : }
     218             : 
     219             : stream*
     220         358 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
     221             : {
     222         358 :         const char *req = binary ? "rb" : "r 0";
     223         358 :         stream *s = setup_transfer(req, filename, bs, ws);
     224         358 :         if (s == NULL)
     225             :                 return NULL;
     226             : 
     227         355 :         s->binary = binary;
     228         355 :         s->read = upload_read;
     229         355 :         s->close = upload_close;
     230         355 :         s->destroy = destroy;
     231             : 
     232         355 :         return s;
     233             : }
     234             : 
     235             : stream*
     236          75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
     237             : {
     238          75 :         const char *req = binary ? "wb" : "w";
     239          75 :         stream *s = setup_transfer(req, filename, bs, ws);
     240          75 :         if (s == NULL)
     241             :                 return NULL;
     242             : 
     243          75 :         s->binary = binary;
     244          75 :         s->readonly = false;
     245          75 :         s->write = download_write;
     246          75 :         s->close = download_close;
     247          75 :         s->destroy = destroy;
     248             : 
     249          75 :         return s;
     250             : }

Generated by: LCOV version 1.14