Mercurial > hg > monetdb-java
changeset 533:b75464874130 onclient
Keep better track of whether the server has cancelled the upload
IOExceptions are suppressed by PrintStreams print/println methods.
This means the client may not realize the server cancelled
and we must suppress all further attempts to write.
Also, the end of upload handshake is different if a cancellation occurred.
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Fri, 27 Aug 2021 13:45:38 +0200 (2021-08-27) |
parents | 41a28ec7d1c1 |
children | b437529144f1 |
files | src/main/java/org/monetdb/mcl/net/MapiSocket.java tests/OnClientTester.java |
diffstat | 2 files changed, 60 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1194,6 +1194,7 @@ public class MapiSocket { /* cannot (yet public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024; private final int chunkSize; private boolean closed = false; + private boolean serverCancelled = false; private int chunkLeft; private byte[] promptBuffer; @@ -1215,6 +1216,12 @@ public class MapiSocket { /* cannot (yet @Override public void write(int b) throws IOException { + if (serverCancelled) { + // We have already thrown an exception and apparently that has been ignored. + // Probably because they're calling print methods instead of write. + // Throw another one, maybe they'll catch this one. + throw new IOException("Server aborted the upload"); + } handleChunking(); super.write(b); wrote(1); @@ -1227,6 +1234,12 @@ public class MapiSocket { /* cannot (yet @Override public void write(byte[] b, int off, int len) throws IOException { + if (serverCancelled) { + // We have already thrown an exception and apparently that has been ignored. + // Probably because they're calling print methods instead of write. + // Throw another one, maybe they'll catch this one. + throw new IOException("Server aborted the upload"); + } while (len > 0) { handleChunking(); int toWrite = Integer.min(len, chunkLeft); @@ -1247,6 +1260,15 @@ public class MapiSocket { /* cannot (yet if (closed) { return; } + closed = true; + + if (serverCancelled) + closeAfterServerCancelled(); + else + closeAfterSuccesfulUpload(); + } + + private void closeAfterSuccesfulUpload() throws IOException { if (chunkLeft != chunkSize) { // flush pending data flushAndReadPrompt(); @@ -1259,6 +1281,10 @@ public class MapiSocket { /* cannot (yet } } + private void closeAfterServerCancelled() throws IOException { + // nothing to do here, we have already read the error prompt. + } + private void wrote(int i) { chunkLeft -= i; } @@ -1278,6 +1304,9 @@ public class MapiSocket { /* cannot (yet case MORE: return; case FILETRANSFER: + // Note, if the caller is calling print methods instead of write, the IO exception gets hidden. + // This is unfortunate but there's nothing we can do about it. + serverCancelled = true; throw new IOException("Server aborted the upload"); default: throw new IOException("Expected MORE/DONE from server, got " + lineType);
--- a/tests/OnClientTester.java +++ b/tests/OnClientTester.java @@ -49,46 +49,58 @@ public final class OnClientTester extend public void test_Upload() throws Exception { prepare(); - conn.setUploadHandler(new MyUploadHandler(100)); + MyUploadHandler handler = new MyUploadHandler(100); + conn.setUploadHandler(handler); update("COPY INTO foo FROM 'banana' ON CLIENT", 100); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); queryInt("SELECT COUNT(*) FROM foo", 100); } public void test_ClientRefusesUpload() throws Exception { prepare(); - conn.setUploadHandler(new MyUploadHandler("immediate error")); + MyUploadHandler handler = new MyUploadHandler("immediate error"); + conn.setUploadHandler(handler); expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate error"); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); queryInt("SELECT COUNT(*) FROM foo", 0); } public void test_Offset0() throws SQLException, Failure { prepare(); - conn.setUploadHandler(new MyUploadHandler(100)); + MyUploadHandler handler = new MyUploadHandler(100); + conn.setUploadHandler(handler); update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); queryInt("SELECT MIN(i) FROM foo", 1); queryInt("SELECT MAX(i) FROM foo", 100); } public void test_Offset1() throws SQLException, Failure { prepare(); - conn.setUploadHandler(new MyUploadHandler(100)); + MyUploadHandler handler = new MyUploadHandler(100); + conn.setUploadHandler(handler); update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); queryInt("SELECT MIN(i) FROM foo", 1); queryInt("SELECT MAX(i) FROM foo", 100); } public void test_Offset5() throws SQLException, Failure { prepare(); - conn.setUploadHandler(new MyUploadHandler(100)); + MyUploadHandler handler = new MyUploadHandler(100); + conn.setUploadHandler(handler); update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); queryInt("SELECT MIN(i) FROM foo", 5); queryInt("SELECT MAX(i) FROM foo", 100); } public void test_ServerStopsReading() throws SQLException, Failure { prepare(); - conn.setUploadHandler(new MyUploadHandler(100)); - update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 96); + MyUploadHandler handler = new MyUploadHandler(100); + conn.setUploadHandler(handler); + update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10); + assertEq("handler encountered write error", true, handler.encounteredWriteError()); // Server stopped reading after 10 rows. Will we stay in sync? queryInt("SELECT COUNT(i) FROM foo", 10); } @@ -125,6 +137,7 @@ public final class OnClientTester extend conn.setUploadHandler(handler); handler.setChunkSize(1024 * 1024); update("COPY INTO foo FROM 'banana' ON CLIENT", n); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); queryInt("SELECT COUNT(DISTINCT i) FROM foo", n); } @@ -183,8 +196,10 @@ public final class OnClientTester extend public void test_FailUploadLate() throws SQLException, Failure { prepare(); - conn.setUploadHandler(new MyUploadHandler(100, 50, "i don't like line 50")); + MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't like line 50"); + conn.setUploadHandler(handler); expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't like"); + assertEq("handler encountered write error", false, handler.encounteredWriteError()); assertEq("connection is closed", true, conn.isClosed()); } @@ -202,6 +217,7 @@ public final class OnClientTester extend private final int rows; private final int errorAt; private final String errorMessage; + private boolean encounteredWriteError; private int chunkSize = 100; // small number to trigger more bugs @@ -237,9 +253,16 @@ public final class OnClientTester extend throw new IOException(errorMessage); } stream.printf("%d|%d%n", i + 1, i + 1); + if (i % 25 == 0 && stream.checkError()) { + encounteredWriteError = true; + break; + } } } + public Object encounteredWriteError() { + return encounteredWriteError; + } } static class MyDownloadHandler implements MonetDownloadHandler {