Mercurial > hg > monetdb-java
changeset 509:455497783026 onclient
Add MapiSocket.DownloadStream
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Thu, 19 Aug 2021 15:22:29 +0200 (2021-08-19) |
parents | 7730d65dfd55 |
children | 13b48891ac54 |
files | src/main/java/org/monetdb/mcl/net/MapiSocket.java |
diffstat | 1 files changed, 78 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1105,7 +1105,12 @@ public class MapiSocket { /* cannot (yet } int readBlock() throws IOException { - return BlockInputStream.this.readBlock(); + boolean wasFaking = setInsertFakeFlush(false); + try { + return BlockInputStream.this.readBlock(); + } finally { + setInsertFakeFlush(wasFaking); + } } boolean wasEndBlock() { @@ -1163,6 +1168,9 @@ public class MapiSocket { /* cannot (yet return new UploadStream(); } + public DownloadStream downloadStream() { + return new DownloadStream(fromMonet.getRaw(), toMonet); + } /** * Destructor called by garbage collector before destroying this @@ -1271,4 +1279,73 @@ public class MapiSocket { /* cannot (yet return lineType; } } + + public static class DownloadStream extends InputStream { + + private final BlockInputStream.Raw rawIn; + private final OutputStream out; + private boolean endBlockSeen = false; + + DownloadStream(BlockInputStream.Raw rawIn, OutputStream out) { + this.rawIn = rawIn; + this.out = out; + } + + void nextBlock() throws IOException { + if (endBlockSeen) + return; + int ret = rawIn.readBlock(); + if (ret < 0 || rawIn.wasEndBlock()) { + endBlockSeen = true; + } + } + + @Override + public void close() throws IOException { + while (!endBlockSeen) { + nextBlock(); + } + // Send acknowledgement to server + out.write('c'); + out.flush(); + // Do whatever super has to do + super.close(); + } + + @Override + public int read() throws IOException { + byte[] buf = { 0 }; + int nread = read(buf, 0, 1); + if (nread == 1) + return buf[0]; + else + return -1; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int origOff = off; + while (len > 0) { + int chunk = Integer.min(len, rawIn.getLength() - rawIn.getPosition()); + if (chunk > 0) { + // make progress copying some bytes + System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), b, off, chunk); + off += chunk; + rawIn.consume(chunk); + len -= chunk; + } else { + // make progress fetching data + if (endBlockSeen) + break; + nextBlock(); + } + } + if (off == origOff && endBlockSeen) + return -1; + else + return off - origOff; + } + } + + }