LCOV - code coverage report
Current view: top level - common/stream - mapi_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 117 134 87.3 %
Date: 2024-04-26 00:35:57 Functions: 11 11 100.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "stream.h"
      15             : #include "stream_internal.h"
      16             : #include "mapi_prompt.h"
      17             : 
      18             : static ssize_t
      19     1898807 : byte_counting_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
      20             : {
      21     1898807 :         uint64_t *counter = (uint64_t*) s->stream_data.p;
      22     1898807 :         ssize_t nwritten = s->inner->write(s->inner, buf, elmsize, cnt);
      23     1898807 :         if (nwritten >= 0) {
      24     1898807 :                 *counter += elmsize * nwritten;
      25             :         }
      26     1898807 :         return nwritten;
      27             : }
      28             : 
      29             : 
      30             : stream *
      31         600 : byte_counting_stream(stream *wrapped, uint64_t *counter)
      32             : {
      33         600 :         stream *s = create_wrapper_stream(NULL, wrapped);
      34         600 :         if (!s)
      35             :                 return NULL;
      36         600 :         s->stream_data.p = counter;
      37         600 :         s->write = &byte_counting_write;
      38         600 :         s->destroy = &destroy_stream;
      39         600 :         return s;
      40             : }
      41             : 
      42             : 
      43             : 
      44             : static void
      45         101 : discard(stream *s)
      46             : {
      47         176 :         static char bitbucket[8192];
      48         176 :         while (1) {
      49         176 :                 ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
      50         176 :                 if (nread <= 0)
      51         101 :                         return;
      52             :                 assert(1);
      53             :         }
      54             : }
      55             : 
/* Private data of a file-transfer stream: the two client streams it forwards to. */
struct mapi_filetransfer {
	stream *from_client; // upload_read sets this to NULL once its retry read after PROMPT2 returns 0
	stream *to_client; // not reset by any function shown here; download_close still guards against NULL
};
      60             : 
      61             : static ssize_t
      62     4999887 : upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
      63             : {
      64     4999887 :         struct mapi_filetransfer *state = s->stream_data.p;
      65             : 
      66     4999887 :         if (state->from_client == NULL) {
      67           0 :                 assert(s->eof);
      68             :                 return 0;
      69             :         }
      70             : 
      71     4999887 :         ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
      72     4999887 :         if (nread != 0 || state->from_client->eof)
      73             :                 return nread;
      74             : 
      75             :         // before returning the 0 we send the prompt and make another attempt.
      76        1621 :         if (
      77        1621 :                         mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
      78        1621 :                 ||      mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
      79             :         ) {
      80           0 :                 mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
      81           0 :                 return -1;
      82             :         }
      83             : 
      84             :         // if it succeeds, return that to the client.
      85             :         // if it's still a block boundary, return that to the client.
      86             :         // if there's an error, return that to the client.
      87        1621 :         nread = mnstr_read(state->from_client, buf, elmsize, cnt);
      88        1621 :         if (nread > 0)
      89             :                 return nread;
      90         128 :         if (nread == 0) {
      91         128 :                 s->eof = true;
      92         128 :                 state->from_client = NULL;
      93         128 :                 return nread;
      94             :         } else {
      95           0 :                 mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
      96           0 :                 return -1;
      97             :         }
      98             : }
      99             : 
     100             : static void
     101         152 : upload_close(stream *s)
     102             : {
     103         152 :         struct mapi_filetransfer *state = s->stream_data.p;
     104             : 
     105         152 :         stream *from = state->from_client;
     106         152 :         if (from)
     107          24 :                 discard(from);
     108             : 
     109         152 :         stream *to = state->to_client;
     110         152 :         mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
     111         152 :         mnstr_flush(to, MNSTR_FLUSH_DATA);
     112         152 : }
     113             : 
     114             : static ssize_t
     115    15000113 : download_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     116             : {
     117    15000113 :         struct mapi_filetransfer *state = s->stream_data.p;
     118    15000113 :         stream *to = state->to_client;
     119    15000113 :         return to->write(to, buf, elmsize, cnt);
     120             : }
     121             : 
     122             : static void
     123          75 : download_close(stream *s)
     124             : {
     125          75 :         struct mapi_filetransfer *state = s->stream_data.p;
     126             : 
     127          75 :         stream *to = state->to_client;
     128          75 :         stream *from = state->from_client;
     129          75 :         if (to)
     130          75 :                 mnstr_flush(to, MNSTR_FLUSH_DATA);
     131          75 :         if (from)
     132          75 :                 discard(from);
     133          75 : }
     134             : 
     135             : static void
     136         227 : destroy(stream *s)
     137             : {
     138         227 :         struct mapi_filetransfer *state = s->stream_data.p;
     139         227 :         free(state);
     140         227 :         destroy_stream(s);
     141         227 : }
     142             : 
     143             : 
     144             : static stream*
     145         229 : setup_transfer(const char *req, const char *filename, bstream *bs, stream *ws)
     146             : {
     147         229 :         const char *msg = NULL;
     148         229 :         stream *s = NULL;
     149         229 :         struct mapi_filetransfer *state = NULL;
     150         229 :         ssize_t nwritten;
     151         229 :         ssize_t nread;
     152         229 :         bool ok;
     153         229 :         int oob = 0;
     154             : 
     155         295 :         while (!bs->eof)
     156          66 :                 bstream_next(bs);
     157         229 :         stream *rs = bs->s;
     158         229 :         assert(isa_block_stream(ws));
     159         229 :         assert(isa_block_stream(rs));
     160             : 
     161         229 :         nwritten = mnstr_printf(ws, PROMPT3 "%s %s\n", req, filename);
     162         229 :         if (nwritten <= 0) {
     163           0 :                 msg = mnstr_peek_error(ws);
     164           0 :                 goto end;
     165             :         }
     166         229 :         if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
     167           0 :                 msg = mnstr_peek_error(ws);
     168           0 :                 goto end;
     169             :         }
     170             : 
     171         229 :         char buf[256];
     172         229 :         nread = mnstr_readline(rs, buf, sizeof(buf));
     173         229 :         ok = ((nread == 0 || (nread == 1 && buf[0] == '\n')) && !(oob = mnstr_getoob(rs)));
     174         229 :         if (!ok) {
     175           2 :                 switch (oob) {
     176             :                 case 1:                                 /* client side interrupt */
     177             :                         msg = "Query aborted";
     178             :                         break;
     179           0 :                 case 2:
     180           0 :                         msg = "Read error on client";
     181           0 :                         break;
     182           2 :                 default:
     183           2 :                         msg = nread > 0 ? buf : "Unknown error";
     184             :                         break;
     185             :                 }
     186           2 :                 discard(rs);
     187           2 :                 goto end;
     188             :         }
     189             : 
     190             :         // Client accepted the request
     191         227 :         state = malloc(sizeof(*state));
     192         227 :         if (!state) {
     193           0 :                 msg = "malloc failed";
     194           0 :                 goto end;
     195             :         }
     196         227 :         s = create_stream("ONCLIENT");
     197         227 :         if (!s) {
     198           0 :                 free(state);                    /* no chance to free through destroy function */
     199           0 :                 msg = mnstr_peek_error(NULL);
     200           0 :                 goto end;
     201             :         }
     202         227 :         state->from_client = rs;
     203         227 :         state->to_client = ws;
     204         227 :         s->stream_data.p = state;
     205           2 : end:
     206         229 :         if (msg) {
     207           2 :                 mnstr_destroy(s);
     208           2 :                 mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
     209           2 :                 return NULL;
     210             :         } else {
     211         227 :                 return s;
     212             :         }
     213             : }
     214             : 
     215             : stream*
     216         154 : mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
     217             : {
     218         154 :         const char *req = binary ? "rb" : "r 0";
     219         154 :         stream *s = setup_transfer(req, filename, bs, ws);
     220         154 :         if (s == NULL)
     221             :                 return NULL;
     222             : 
     223         152 :         s->binary = binary;
     224         152 :         s->read = upload_read;
     225         152 :         s->close = upload_close;
     226         152 :         s->destroy = destroy;
     227             : 
     228         152 :         return s;
     229             : }
     230             : 
     231             : stream*
     232          75 : mapi_request_download(const char *filename, bool binary, bstream *bs, stream *ws)
     233             : {
     234          75 :         const char *req = binary ? "wb" : "w";
     235          75 :         stream *s = setup_transfer(req, filename, bs, ws);
     236          75 :         if (s == NULL)
     237             :                 return NULL;
     238             : 
     239          75 :         s->binary = binary;
     240          75 :         s->readonly = false;
     241          75 :         s->write = download_write;
     242          75 :         s->close = download_close;
     243          75 :         s->destroy = destroy;
     244             : 
     245          75 :         return s;
     246             : }

Generated by: LCOV version 1.14