Mercurial > hg > monetdb-java
changeset 557:ce2b616ed22e onclient
Add a cancellation callback to UploadHandler
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Thu, 16 Sep 2021 15:12:26 +0200 (2021-09-16) |
parents | 87feb93330a6 |
children | ebf65f416da9 |
files | src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java tests/OnClientTester.java |
diffstat | 3 files changed, 41 insertions(+), 10 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java @@ -3236,7 +3236,7 @@ public class MonetConnection } long linesToSkip = offset >= 1 ? offset - 1 : 0; - Upload handle = new Upload(server); + Upload handle = new Upload(server, uploadHandler::uploadCancelled); boolean wasFaking = server.setInsertFakePrompts(false); try { uploadHandler.handleUpload(handle, path, textMode, linesToSkip); @@ -3290,6 +3290,13 @@ public class MonetConnection * where both 0 and 1 mean 'upload everything' */ void handleUpload(Upload handle, String name, boolean textMode, long linesToSkip) throws IOException; + + /** + * Called when the upload is cancelled halfway by the server. + * + * The default implementation does nothing. + */ + default void uploadCancelled() {} } /** @@ -3316,12 +3323,14 @@ public class MonetConnection */ public static class Upload { private final MapiSocket server; + private final Runnable cancellationCallback; private PrintStream print = null; private String error = null; private int customChunkSize = -1; - Upload(MapiSocket server) { + Upload(MapiSocket server, Runnable cancellationCallback) { this.server = server; + this.cancellationCallback = cancellationCallback; } /** @@ -3362,6 +3371,7 @@ public class MonetConnection if (print == null) { try { MapiSocket.UploadStream up = customChunkSize >= 0 ? server.uploadStream(customChunkSize) : server.uploadStream(); + up.setCancellationCallback(cancellationCallback); print = new PrintStream(up, false, "UTF-8"); up.write('\n'); } catch (UnsupportedEncodingException e) {
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1233,6 +1233,7 @@ public class MapiSocket { /* cannot (yet private boolean serverCancelled = false; private int chunkLeft; private byte[] promptBuffer; + private Runnable cancellationCallback = null; /** Create an UploadStream with the given chunk size */ UploadStream(int chunkSize) { @@ -1252,6 +1253,12 @@ public class MapiSocket { /* cannot (yet this(DEFAULT_CHUNK_SIZE); } + /** Set a callback to be invoked if the server cancels the upload + */ + public void setCancellationCallback(Runnable cancellationCallback) { + this.cancellationCallback = cancellationCallback; + } + @Override public void write(int b) throws IOException { if (serverCancelled) { @@ -1345,6 +1352,9 @@ public class MapiSocket { /* cannot (yet // Note, if the caller is calling print methods instead of write, the IO exception gets hidden. // This is unfortunate but there's nothing we can do about it. serverCancelled = true; + if (cancellationCallback != null) { + cancellationCallback.run(); + } throw new IOException("Server aborted the upload"); default: throw new IOException("Expected MORE/DONE from server, got " + lineType);
--- a/tests/OnClientTester.java +++ b/tests/OnClientTester.java @@ -63,7 +63,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY INTO foo FROM 'banana' ON CLIENT", 100); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); queryInt("SELECT COUNT(*) FROM foo", 100); } @@ -72,7 +72,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler("immediate error"); conn.setUploadHandler(handler); expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate error"); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); queryInt("SELECT COUNT(*) FROM foo", 0); } @@ -81,7 +81,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); queryInt("SELECT MIN(i) FROM foo", 1); queryInt("SELECT MAX(i) FROM foo", 100); } @@ -91,7 +91,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); queryInt("SELECT MIN(i) FROM foo", 1); queryInt("SELECT MAX(i) FROM foo", 100); } @@ -101,7 +101,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); queryInt("SELECT MIN(i) FROM foo", 5); queryInt("SELECT MAX(i) FROM foo", 100); } @@ -111,6 +111,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10); + assertEq("cancellation callback called", true, handler.isCancelled()); assertEq("handler encountered write error", true, handler.encounteredWriteError()); // Server stopped reading after 10 rows. Will we stay in sync? queryInt("SELECT COUNT(i) FROM foo", 10); @@ -149,7 +150,7 @@ public final class OnClientTester extend conn.setUploadHandler(handler); handler.setChunkSize(1024 * 1024); update("COPY INTO foo FROM 'banana' ON CLIENT", n); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); queryInt("SELECT COUNT(DISTINCT i) FROM foo", n); } @@ -213,7 +214,7 @@ public final class OnClientTester extend MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't like line 50"); conn.setUploadHandler(handler); expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't like"); - assertEq("handler encountered write error", false, handler.encounteredWriteError()); + assertEq("cancellation callback called", false, handler.isCancelled()); assertEq("connection is closed", true, conn.isClosed()); } @@ -254,7 +255,8 @@ public final class OnClientTester extend private final long rows; private final long errorAt; private final String errorMessage; - private boolean encounteredWriteError; + private boolean encounteredWriteError = false; + private boolean cancelled = false; private int chunkSize = 100; // small number to trigger more bugs @@ -277,6 +279,11 @@ public final class OnClientTester extend } @Override + public void uploadCancelled() { + cancelled = true; + } + + @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { if (errorAt == -1 && errorMessage != null) { handle.sendError(errorMessage); @@ -299,6 +306,10 @@ public final class OnClientTester extend public Object encounteredWriteError() { return encounteredWriteError; } + + public boolean isCancelled() { + return cancelled; + } } static class MyDownloadHandler implements DownloadHandler {