changeset 557:ce2b616ed22e onclient

Add a cancellation callback to UploadHandler
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Thu, 16 Sep 2021 15:12:26 +0200 (2021-09-16)
parents 87feb93330a6
children ebf65f416da9
files src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java tests/OnClientTester.java
diffstat 3 files changed, 41 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java
+++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java
@@ -3236,7 +3236,7 @@ public class MonetConnection
 		}
 
 		long linesToSkip = offset >= 1 ? offset - 1 : 0;
-		Upload handle = new Upload(server);
+		Upload handle = new Upload(server, uploadHandler::uploadCancelled);
 		boolean wasFaking = server.setInsertFakePrompts(false);
 		try {
 			uploadHandler.handleUpload(handle, path, textMode, linesToSkip);
@@ -3290,6 +3290,13 @@ public class MonetConnection
 		 *                    where both 0 and 1 mean 'upload everything'
 		 */
 		void handleUpload(Upload handle, String name, boolean textMode, long linesToSkip) throws IOException;
+
+		/**
+		 * Called when the upload is cancelled halfway by the server.
+		 *
+		 * The default implementation does nothing.
+		 */
+		default void uploadCancelled() {}
 	}
 
 	/**
@@ -3316,12 +3323,14 @@ public class MonetConnection
 	 */
 	public static class Upload {
 		private final MapiSocket server;
+		private final Runnable cancellationCallback;
 		private PrintStream print = null;
 		private String error = null;
 		private int customChunkSize = -1;
 
-		Upload(MapiSocket server) {
+		Upload(MapiSocket server, Runnable cancellationCallback) {
 			this.server = server;
+			this.cancellationCallback = cancellationCallback;
 		}
 
 		/**
@@ -3362,6 +3371,7 @@ public class MonetConnection
 			if (print == null) {
 				try {
 					MapiSocket.UploadStream up = customChunkSize >= 0 ? server.uploadStream(customChunkSize) : server.uploadStream();
+					up.setCancellationCallback(cancellationCallback);
 					print = new PrintStream(up, false, "UTF-8");
 					up.write('\n');
 				} catch (UnsupportedEncodingException e) {
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1233,6 +1233,7 @@ public class MapiSocket {	/* cannot (yet
 		private boolean serverCancelled = false;
 		private int chunkLeft;
 		private byte[] promptBuffer;
+		private Runnable cancellationCallback = null;
 
 		/** Create an UploadStream with the given chunk size */
 		UploadStream(int chunkSize) {
@@ -1252,6 +1253,12 @@ public class MapiSocket {	/* cannot (yet
 			this(DEFAULT_CHUNK_SIZE);
 		}
 
+		/** Set a callback to be invoked if the server cancels the upload
+		 */
+		public void setCancellationCallback(Runnable cancellationCallback) {
+			this.cancellationCallback = cancellationCallback;
+		}
+
 		@Override
 		public void write(int b) throws IOException {
 			if (serverCancelled) {
@@ -1345,6 +1352,9 @@ public class MapiSocket {	/* cannot (yet
 					// Note, if the caller is calling print methods instead of write, the IO exception gets hidden.
 					// This is unfortunate but there's nothing we can do about it.
 					serverCancelled = true;
+					if (cancellationCallback != null) {
+						cancellationCallback.run();
+					}
 					throw new IOException("Server aborted the upload");
 				default:
 					throw new IOException("Expected MORE/DONE from server, got " + lineType);
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -63,7 +63,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler(100);
 		conn.setUploadHandler(handler);
 		update("COPY INTO foo FROM 'banana' ON CLIENT", 100);
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		queryInt("SELECT COUNT(*) FROM foo", 100);
 	}
 
@@ -72,7 +72,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler("immediate error");
 		conn.setUploadHandler(handler);
 		expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate error");
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		queryInt("SELECT COUNT(*) FROM foo", 0);
 	}
 
@@ -81,7 +81,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler(100);
 		conn.setUploadHandler(handler);
 		update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100);
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		queryInt("SELECT MIN(i) FROM foo", 1);
 		queryInt("SELECT MAX(i) FROM foo", 100);
 	}
@@ -91,7 +91,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler(100);
 		conn.setUploadHandler(handler);
 		update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100);
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		queryInt("SELECT MIN(i) FROM foo", 1);
 		queryInt("SELECT MAX(i) FROM foo", 100);
 	}
@@ -101,7 +101,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler(100);
 		conn.setUploadHandler(handler);
 		update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96);
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		queryInt("SELECT MIN(i) FROM foo", 5);
 		queryInt("SELECT MAX(i) FROM foo", 100);
 	}
@@ -111,6 +111,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler(100);
 		conn.setUploadHandler(handler);
 		update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10);
+		assertEq("cancellation callback called", true, handler.isCancelled());
 		assertEq("handler encountered write error", true, handler.encounteredWriteError());
 		// Server stopped reading after 10 rows. Will we stay in sync?
 		queryInt("SELECT COUNT(i) FROM foo", 10);
@@ -149,7 +150,7 @@ public final class OnClientTester extend
 		conn.setUploadHandler(handler);
 		handler.setChunkSize(1024 * 1024);
 		update("COPY INTO foo FROM 'banana' ON CLIENT", n);
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		queryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
 	}
 
@@ -213,7 +214,7 @@ public final class OnClientTester extend
 		MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't like line 50");
 		conn.setUploadHandler(handler);
 		expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't like");
-		assertEq("handler encountered write error", false, handler.encounteredWriteError());
+		assertEq("cancellation callback called", false, handler.isCancelled());
 		assertEq("connection is closed", true, conn.isClosed());
 	}
 
@@ -254,7 +255,8 @@ public final class OnClientTester extend
 		private final long rows;
 		private final long errorAt;
 		private final String errorMessage;
-		private boolean encounteredWriteError;
+		private boolean encounteredWriteError = false;
+		private boolean cancelled = false;
 
 		private int chunkSize = 100; // small number to trigger more bugs
 
@@ -277,6 +279,11 @@ public final class OnClientTester extend
 		}
 
 		@Override
+		public void uploadCancelled() {
+			cancelled = true;
+		}
+
+		@Override
 		public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
 			if (errorAt == -1 && errorMessage != null) {
 				handle.sendError(errorMessage);
@@ -299,6 +306,10 @@ public final class OnClientTester extend
 		public Object encounteredWriteError() {
 			return encounteredWriteError;
 		}
+
+		public boolean isCancelled() {
+			return cancelled;
+		}
 	}
 
 	static class MyDownloadHandler implements DownloadHandler {