changeset 521:72007c4f8f8a onclient

Allow MonetUploadHandler to configure the chunk size (every chunk_size bytes, the client suspends the upload and asks the server whether to continue)
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Thu, 26 Aug 2021 11:50:11 +0200 (2021-08-26)
parents b4c7816e3592
children 279414178dc6
files src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java
diffstat 2 files changed, 24 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java
+++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java
@@ -3267,6 +3267,7 @@ public class MonetConnection
 		private final MapiSocket server;
 		private PrintStream print = null;
 		private String error = null;
+		private int customChunkSize = -1;
 
 		Upload(MapiSocket server) {
 			this.server = server;
@@ -3279,13 +3280,17 @@ public class MonetConnection
 			error = errorMessage;
 		}
 
+		public void setChunkSize(int chunkSize) {
+			this.customChunkSize = chunkSize;
+		}
+
 		public PrintStream getStream() throws IOException {
 			if (error != null) {
 				throw new IOException("Cannot send data after an error has been sent");
 			}
 			if (print == null) {
 				try {
-					MapiSocket.UploadStream up = server.uploadStream();
+					MapiSocket.UploadStream up = customChunkSize >= 0 ? server.uploadStream(customChunkSize) : server.uploadStream();
 					print = new PrintStream(up, false, "UTF-8");
 					up.write('\n');
 				} catch (UnsupportedEncodingException e) {
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1164,6 +1164,9 @@ public class MapiSocket {	/* cannot (yet
 		}
 	}
 
+	public UploadStream uploadStream(int chunkSize) {
+		return new UploadStream(chunkSize);
+	}
 
 	public UploadStream uploadStream() {
 		return new UploadStream();
@@ -1188,16 +1191,26 @@ public class MapiSocket {	/* cannot (yet
 	}
 
 	public class UploadStream extends FilterOutputStream {
-		private final int CHUNK_SIZE = 100;
+		public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024;
+		private final int chunkSize;
 		private boolean closed = false;
-		private int chunkLeft = CHUNK_SIZE;
+		private int chunkLeft;
 		private byte[] promptBuffer;
 
-		UploadStream() {
+		UploadStream(int chunkSize) {
 			super(toMonet);
+			if (chunkSize <= 0) {
+				throw new IllegalArgumentException("chunk size must be positive");
+			}
+			this.chunkSize = chunkSize;
 			assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length;
 			int promptLen = LineType.MORE.bytes().length;
 			promptBuffer = new byte[promptLen + 1];
+			chunkLeft = this.chunkSize;
+		}
+
+		UploadStream() {
+			this(DEFAULT_CHUNK_SIZE);
 		}
 
 		@Override
@@ -1234,7 +1247,7 @@ public class MapiSocket {	/* cannot (yet
 			if (closed) {
 				return;
 			}
-			if (chunkLeft != CHUNK_SIZE) {
+			if (chunkLeft != chunkSize) {
 				// flush pending data
 				flushAndReadPrompt();
 			}
@@ -1259,7 +1272,7 @@ public class MapiSocket {	/* cannot (yet
 
 		private void flushAndReadPrompt() throws IOException {
 			out.flush();
-			chunkLeft = CHUNK_SIZE;
+			chunkLeft = chunkSize;
 			LineType lineType = readPrompt();
 			switch (lineType) {
 				case MORE: