changeset 509:455497783026 onclient

Add MapiSocket.DownloadStream
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Thu, 19 Aug 2021 15:22:29 +0200 (2021-08-19)
parents 7730d65dfd55
children 13b48891ac54
files src/main/java/org/monetdb/mcl/net/MapiSocket.java
diffstat 1 files changed, 78 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1105,7 +1105,12 @@ public class MapiSocket {	/* cannot (yet
 			}
 
 			int readBlock() throws IOException {
-				return BlockInputStream.this.readBlock();
+				boolean wasFaking = setInsertFakeFlush(false);
+				try {
+					return BlockInputStream.this.readBlock();
+				} finally {
+					setInsertFakeFlush(wasFaking);
+				}
 			}
 
 			boolean wasEndBlock() {
@@ -1163,6 +1168,9 @@ public class MapiSocket {	/* cannot (yet
 		return new UploadStream();
 	}
 
+	public DownloadStream downloadStream() {
+		return new DownloadStream(fromMonet.getRaw(), toMonet);
+	}
 
 	/**
 	 * Destructor called by garbage collector before destroying this
@@ -1271,4 +1279,73 @@ public class MapiSocket {	/* cannot (yet
 			return lineType;
 		}
 	}
+
+	public static class DownloadStream extends InputStream {
+
+		private final BlockInputStream.Raw rawIn;
+		private final OutputStream out;
+		private boolean endBlockSeen = false;
+
+		DownloadStream(BlockInputStream.Raw rawIn, OutputStream out) {
+			this.rawIn = rawIn;
+			this.out = out;
+		}
+
+		void nextBlock() throws IOException {
+			if (endBlockSeen)
+				return;
+			int ret = rawIn.readBlock();
+			if (ret < 0 || rawIn.wasEndBlock()) {
+				endBlockSeen = true;
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			while (!endBlockSeen) {
+				nextBlock();
+			}
+			// Send acknowledgement to server
+			out.write('c');
+			out.flush();
+			// Do whatever super has to do
+			super.close();
+		}
+
+		@Override
+		public int read() throws IOException {
+			byte[] buf = { 0 };
+			int nread = read(buf, 0, 1);
+			if (nread == 1)
+				return buf[0];
+			else
+				return -1;
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			int origOff = off;
+			while (len > 0) {
+				int chunk = Integer.min(len, rawIn.getLength() - rawIn.getPosition());
+				if (chunk > 0) {
+					// make progress copying some bytes
+					System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), b, off, chunk);
+					off += chunk;
+					rawIn.consume(chunk);
+					len -= chunk;
+				} else {
+					// make progress fetching data
+					if (endBlockSeen)
+						break;
+					nextBlock();
+				}
+			}
+			if (off == origOff && endBlockSeen)
+				return -1;
+			else
+				return off - origOff;
+		}
+	}
+
+
 }