diff src/main/java/org/monetdb/mcl/net/MapiSocket.java @ 502:83354bd21320 onclient

Upload fake data when an upload request is received
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Tue, 17 Aug 2021 12:21:09 +0200 (2021-08-17)
parents eaad79c3235f
children 1db3398b78f7
line wrap: on
line diff
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1122,6 +1122,12 @@ public class MapiSocket {	/* cannot (yet
 		}
 	}
 
+
+	public UploadStream uploadStream() {
+		return new UploadStream();
+	}
+
+
 	/**
 	 * Destructor called by garbage collector before destroying this
 	 * object tries to disconnect the MonetDB connection if it has not
@@ -1135,4 +1141,98 @@ public class MapiSocket {	/* cannot (yet
 		close();
 		super.finalize();
 	}
+
+	public class UploadStream extends FilterOutputStream {
+		private final int CHUNK_SIZE = 100;
+		private boolean closed = false;
+		private int chunkLeft = CHUNK_SIZE;
+		private byte[] promptBuffer;
+
+		UploadStream() {
+			super(toMonet);
+			assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length;
+			int promptLen = LineType.MORE.bytes().length;
+			promptBuffer = new byte[promptLen + 1];
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			handleChunking();
+			super.write(b);
+			wrote(1);
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			write(b, 0, b.length);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			while (len > 0) {
+				handleChunking();
+				int toWrite = Integer.min(len, chunkLeft);
+				super.write(b, off, toWrite);
+				off += toWrite;
+				len -= toWrite;
+				wrote(toWrite);
+			}
+		}
+
+		@Override
+		public void flush() throws IOException {
+			// suppress flushes
+		}
+
+		@Override
+		public void close() throws IOException {
+			if (closed) {
+				return;
+			}
+			if (chunkLeft != CHUNK_SIZE) {
+				// flush pending data
+				flushAndReadPrompt();
+			}
+			// send empty block
+			out.flush();
+			LineType acknowledgement = readPrompt();
+			if (acknowledgement != LineType.FILETRANSFER) {
+				throw new IOException("Expected server to acknowledge end of file");
+			}
+		}
+
+		private void wrote(int i) {
+			chunkLeft -= i;
+		}
+
+		private void handleChunking() throws IOException {
+			if (chunkLeft > 0) {
+				return;
+			}
+			flushAndReadPrompt();
+		}
+
+		private void flushAndReadPrompt() throws IOException {
+			out.flush();
+			chunkLeft = CHUNK_SIZE;
+			LineType lineType = readPrompt();
+			switch (lineType) {
+				case MORE:
+					return;
+				case FILETRANSFER:
+					throw new IOException("Server aborted the upload");
+				default:
+					throw new IOException("Expected MORE/DONE from server, got " + lineType);
+			}
+		}
+
+		private LineType readPrompt() throws IOException {
+			int nread = fromMonet.read(promptBuffer);
+			if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') {
+				throw new IOException("server return incomplete prompt");
+			}
+			LineType lineType = LineType.classify(promptBuffer);
+			return lineType;
+		}
+	}
 }