LCOV - code coverage report
Current view: top level - monetdb5/modules/atoms - streams.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 63 97 64.9 %
Date: 2024-04-25 20:03:45 Functions: 11 15 73.3 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  *  Niels Nes
      15             :  *  A simple interface to IO streams
      16             :  * All file IO is tunneled through the stream library, which guarantees
      17             :  * cross-platform capabilities.  Several protocols are provided, e.g. it
      18             :  * can be used to open 'non compressed, gzipped, bzip2ed' data files.  It
      19             :  * encapsulates the corresponding library managed in common/stream.
      20             :  */
      21             : 
      22             : #include "monetdb_config.h"
      23             : #include "streams.h"
      24             : #include "mal_exception.h"
      25             : 
      26             : static str
      27           1 : mnstr_open_rstreamwrap(Stream *S, str *filename)
      28             : {
      29           1 :         stream *s;
      30             : 
      31           1 :         if ((s = open_rstream(*filename)) == NULL
      32           1 :                 || mnstr_errnr(s) != MNSTR_NO__ERROR) {
      33           0 :                 if (s)
      34           0 :                         close_stream(s);
      35           0 :                 throw(IO, "streams.open", "could not open file '%s': %s",
      36             :                           *filename, mnstr_peek_error(NULL));
      37             :         } else {
      38           1 :                 *(stream **) S = s;
      39             :         }
      40             : 
      41           1 :         return MAL_SUCCEED;
      42             : }
      43             : 
      44             : static str
      45           1 : mnstr_open_wstreamwrap(Stream *S, str *filename)
      46             : {
      47           1 :         stream *s;
      48             : 
      49           1 :         if ((s = open_wstream(*filename)) == NULL
      50           1 :                 || mnstr_errnr(s) != MNSTR_NO__ERROR) {
      51           0 :                 if (s)
      52           0 :                         close_stream(s);
      53           0 :                 throw(IO, "streams.open", "could not open file '%s': %s",
      54             :                           *filename, mnstr_peek_error(NULL));
      55             :         } else {
      56           1 :                 *(stream **) S = s;
      57             :         }
      58             : 
      59           1 :         return MAL_SUCCEED;
      60             : }
      61             : 
      62             : static str
      63           1 : mnstr_open_rastreamwrap(Stream *S, str *filename)
      64             : {
      65           1 :         stream *s;
      66             : 
      67           1 :         if ((s = open_rastream(*filename)) == NULL
      68           1 :                 || mnstr_errnr(s) != MNSTR_NO__ERROR) {
      69           0 :                 if (s)
      70           0 :                         close_stream(s);
      71           0 :                 throw(IO, "streams.open", "could not open file '%s': %s",
      72             :                           *filename, mnstr_peek_error(NULL));
      73             :         } else {
      74           1 :                 *(stream **) S = s;
      75             :         }
      76             : 
      77           1 :         return MAL_SUCCEED;
      78             : }
      79             : 
      80             : static str
      81           1 : mnstr_open_wastreamwrap(Stream *S, str *filename)
      82             : {
      83           1 :         stream *s;
      84             : 
      85           1 :         if ((s = open_wastream(*filename)) == NULL
      86           1 :                 || mnstr_errnr(s) != MNSTR_NO__ERROR) {
      87           0 :                 if (s)
      88           0 :                         close_stream(s);
      89           0 :                 throw(IO, "streams.open", "could not open file '%s': %s",
      90             :                           *filename, mnstr_peek_error(NULL));
      91             :         } else {
      92           1 :                 *(stream **) S = s;
      93             :         }
      94             : 
      95           1 :         return MAL_SUCCEED;
      96             : }
      97             : 
      98             : static str
      99           1 : mnstr_write_stringwrap(void *ret, Stream *S, str *data)
     100             : {
     101           1 :         stream *s = *(stream **) S;
     102           1 :         (void) ret;
     103             : 
     104           1 :         if (mnstr_write(s, *data, 1, strlen(*data)) < 0)
     105           0 :                 throw(IO, "streams.writeStr", "failed to write string");
     106             : 
     107             :         return MAL_SUCCEED;
     108             : }
     109             : 
     110             : static str
     111           1 : mnstr_writeIntwrap(void *ret, Stream *S, int *data)
     112             : {
     113           1 :         stream *s = *(stream **) S;
     114           1 :         (void) ret;
     115             : 
     116           1 :         if (!mnstr_writeInt(s, *data))
     117           0 :                 throw(IO, "streams.writeInt", "failed to write int");
     118             : 
     119             :         return MAL_SUCCEED;
     120             : }
     121             : 
     122             : static str
     123           1 : mnstr_readIntwrap(int *ret, Stream *S)
     124             : {
     125           1 :         stream *s = *(stream **) S;
     126             : 
     127           1 :         if (mnstr_readInt(s, ret) != 1)
     128           0 :                 throw(IO, "streams.readInt", "failed to read int");
     129             : 
     130             :         return MAL_SUCCEED;
     131             : }
     132             : 
     133             : #define CHUNK (64 * 1024)
     134             : static str
     135           1 : mnstr_read_stringwrap(str *res, Stream *S)
     136             : {
     137           1 :         stream *s = *(stream **) S;
     138           1 :         ssize_t len = 0;
     139           1 :         size_t size = CHUNK +1;
     140           1 :         char *buf = GDKmalloc(size), *start = buf, *tmp;
     141             : 
     142           1 :         if (buf == NULL)
     143           0 :                 throw(MAL, "mnstr_read_stringwrap", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     144           2 :         while ((len = mnstr_read(s, start, 1, CHUNK)) > 0) {
     145           1 :                 size += len;
     146           1 :                 tmp = GDKrealloc(buf, size);
     147           1 :                 if (tmp == NULL) {
     148           0 :                         GDKfree(buf);
     149           0 :                         throw(MAL, "mnstr_read_stringwrap",
     150             :                                   SQLSTATE(HY013) MAL_MALLOC_FAIL);
     151             :                 }
     152           1 :                 buf = tmp;
     153           1 :                 start = buf + size - CHUNK -1;
     154             : 
     155           1 :                 *start = '\0';
     156             :         }
     157           1 :         if (len < 0)
     158           0 :                 throw(IO, "streams.readStr", "failed to read string");
     159           1 :         start += len;
     160           1 :         *start = '\0';
     161           1 :         *res = buf;
     162             : 
     163           1 :         return MAL_SUCCEED;
     164             : }
     165             : 
     166             : static str
     167           1 : mnstr_flush_streamwrap(void *ret, Stream *S)
     168             : {
     169           1 :         stream *s = *(stream **) S;
     170           1 :         (void) ret;
     171             : 
     172           1 :         if (mnstr_flush(s, MNSTR_FLUSH_DATA))
     173           0 :                 throw(IO, "streams.flush", "failed to flush stream");
     174             : 
     175             :         return MAL_SUCCEED;
     176             : }
     177             : 
     178             : static str
     179           4 : mnstr_close_streamwrap(void *ret, Stream *S)
     180             : {
     181           4 :         (void) ret;
     182             : 
     183           4 :         close_stream(*(stream **) S);
     184             : 
     185           4 :         return MAL_SUCCEED;
     186             : }
     187             : 
     188             : static str
     189           0 : open_block_streamwrap(Stream *S, Stream *is)
     190             : {
     191           0 :         if ((*(stream **) S = block_stream(*(stream **) is)) == NULL)
     192           0 :                 throw(IO, "bstreams.open", "failed to open block stream");
     193             : 
     194             :         return MAL_SUCCEED;
     195             : }
     196             : 
     197             : static str
     198           0 : bstream_create_wrapwrap(Bstream *Bs, Stream *S, int *bufsize)
     199             : {
     200           0 :         if ((*(bstream **) Bs = bstream_create(*(stream **) S,
     201           0 :                                                                                    (size_t) *bufsize)) == NULL)
     202           0 :                 throw(IO, "bstreams.create", "failed to create block stream");
     203             : 
     204             :         return MAL_SUCCEED;
     205             : }
     206             : 
     207             : static str
     208           0 : bstream_destroy_wrapwrap(void *ret, Bstream *BS)
     209             : {
     210           0 :         (void) ret;
     211             : 
     212           0 :         bstream_destroy(*(bstream **) BS);
     213             : 
     214           0 :         return MAL_SUCCEED;
     215             : }
     216             : 
     217             : static str
     218           0 : bstream_read_wrapwrap(int *res, Bstream *BS, int *size)
     219             : {
     220           0 :         *res = (int) bstream_read(*(bstream **) BS, (size_t) *size);
     221             : 
     222           0 :         return MAL_SUCCEED;
     223             : }
     224             : 
     225             : #include "mel.h"
     226             : mel_atom streams_init_atoms[] = {
     227             :  { .name="streams", .basetype="ptr", },
     228             :  { .name="bstream", .basetype="ptr", },  { .cmp=NULL }
     229             : };
     230             : mel_func streams_init_funcs[] = {
     231             :  command("streams", "openReadBytes", mnstr_open_rstreamwrap, true, "open a file stream for reading", args(1,2, arg("",streams),arg("filename",str))),
     232             :  command("streams", "openWriteBytes", mnstr_open_wstreamwrap, true, "open a file stream for writing", args(1,2, arg("",streams),arg("filename",str))),
     233             :  command("streams", "openRead", mnstr_open_rastreamwrap, true, "open ascii file stream for reading", args(1,2, arg("",streams),arg("filename",str))),
     234             :  command("streams", "openWrite", mnstr_open_wastreamwrap, true, "open ascii file stream for writing", args(1,2, arg("",streams),arg("filename",str))),
     235             :  command("streams", "blocked", open_block_streamwrap, true, "open a block based stream", args(1,2, arg("",streams),arg("s",streams))),
     236             :  command("streams", "writeStr", mnstr_write_stringwrap, true, "write data on the stream", args(1,3, arg("",void),arg("s",streams),arg("data",str))),
     237             :  command("streams", "writeInt", mnstr_writeIntwrap, true, "write data on the stream", args(1,3, arg("",void),arg("s",streams),arg("data",int))),
     238             :  command("streams", "readStr", mnstr_read_stringwrap, true, "read string data from the stream", args(1,2, arg("",str),arg("s",streams))),
     239             :  command("streams", "readInt", mnstr_readIntwrap, true, "read integer data from the stream", args(1,2, arg("",int),arg("s",streams))),
     240             :  command("streams", "flush", mnstr_flush_streamwrap, true, "flush the stream", args(0,1, arg("s",streams))),
     241             :  command("streams", "close", mnstr_close_streamwrap, true, "close and destroy the stream s", args(0,1, arg("s",streams))),
     242             :  command("streams", "create", bstream_create_wrapwrap, true, "create a buffered stream", args(1,3, arg("",bstream),arg("s",streams),arg("bufsize",int))),
     243             :  command("streams", "destroy", bstream_destroy_wrapwrap, true, "destroy bstream", args(0,1, arg("s",bstream))),
     244             :  command("streams", "read", bstream_read_wrapwrap, true, "read at least size bytes into the buffer of s", args(1,3, arg("",int),arg("s",bstream),arg("size",int))),
     245             :  { .imp=NULL }
     246             : };
     247             : #include "mal_import.h"
     248             : #ifdef _MSC_VER
     249             : #undef read
     250             : #pragma section(".CRT$XCU",read)
     251             : #endif
     252         329 : LIB_STARTUP_FUNC(init_streams_mal)
     253         329 : { mal_module("streams", streams_init_atoms, streams_init_funcs); }

Generated by: LCOV version 1.14