changeset 533:b75464874130 onclient

Keep better track of whether the server has cancelled the upload IOExceptions are suppressed by PrintStreams print/println methods. This means the client may not realize the server cancelled and we must suppress all further attempts to write. Also, the end of upload handshake is different if a cancellation occurred.
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Fri, 27 Aug 2021 13:45:38 +0200 (2021-08-27)
parents 41a28ec7d1c1
children b437529144f1
files src/main/java/org/monetdb/mcl/net/MapiSocket.java tests/OnClientTester.java
diffstat 2 files changed, 60 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1194,6 +1194,7 @@ public class MapiSocket {	/* cannot (yet
 		public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024;
 		private final int chunkSize;
 		private boolean closed = false;
+		private boolean serverCancelled = false;
 		private int chunkLeft;
 		private byte[] promptBuffer;
 
@@ -1215,6 +1216,12 @@ public class MapiSocket {	/* cannot (yet
 
 		@Override
 		public void write(int b) throws IOException {
+			if (serverCancelled) {
+				// We have already thrown an exception and apparently that has been ignored.
+				// Probably because they're calling print methods instead of write.
+				// Throw another one, maybe they'll catch this one.
+				throw new IOException("Server aborted the upload");
+			}
 			handleChunking();
 			super.write(b);
 			wrote(1);
@@ -1227,6 +1234,12 @@ public class MapiSocket {	/* cannot (yet
 
 		@Override
 		public void write(byte[] b, int off, int len) throws IOException {
+			if (serverCancelled) {
+				// We have already thrown an exception and apparently that has been ignored.
+				// Probably because they're calling print methods instead of write.
+				// Throw another one, maybe they'll catch this one.
+				throw new IOException("Server aborted the upload");
+			}
 			while (len > 0) {
 				handleChunking();
 				int toWrite = Integer.min(len, chunkLeft);
@@ -1247,6 +1260,15 @@ public class MapiSocket {	/* cannot (yet
 			if (closed) {
 				return;
 			}
+			closed = true;
+
+			if (serverCancelled)
+				closeAfterServerCancelled();
+			else
+				closeAfterSuccesfulUpload();
+		}
+
+		private void closeAfterSuccesfulUpload() throws IOException {
 			if (chunkLeft != chunkSize) {
 				// flush pending data
 				flushAndReadPrompt();
@@ -1259,6 +1281,10 @@ public class MapiSocket {	/* cannot (yet
 			}
 		}
 
+		private void closeAfterServerCancelled() throws IOException {
+			// nothing to do here, we have already read the error prompt.
+		}
+
 		private void wrote(int i) {
 			chunkLeft -= i;
 		}
@@ -1278,6 +1304,9 @@ public class MapiSocket {	/* cannot (yet
 				case MORE:
 					return;
 				case FILETRANSFER:
+					// Note, if the caller is calling print methods instead of write, the IO exception gets hidden.
+					// This is unfortunate but there's nothing we can do about it.
+					serverCancelled = true;
 					throw new IOException("Server aborted the upload");
 				default:
 					throw new IOException("Expected MORE/DONE from server, got " + lineType);
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -49,46 +49,58 @@ public final class OnClientTester extend
 
 	public void test_Upload() throws Exception {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler(100));
+		MyUploadHandler handler = new MyUploadHandler(100);
+		conn.setUploadHandler(handler);
 		update("COPY INTO foo FROM 'banana' ON CLIENT", 100);
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		queryInt("SELECT COUNT(*) FROM foo", 100);
 	}
 
 	public void test_ClientRefusesUpload() throws Exception {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler("immediate error"));
+		MyUploadHandler handler = new MyUploadHandler("immediate error");
+		conn.setUploadHandler(handler);
 		expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate error");
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		queryInt("SELECT COUNT(*) FROM foo", 0);
 	}
 
 	public void test_Offset0() throws SQLException, Failure {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler(100));
+		MyUploadHandler handler = new MyUploadHandler(100);
+		conn.setUploadHandler(handler);
 		update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT", 100);
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		queryInt("SELECT MIN(i) FROM foo", 1);
 		queryInt("SELECT MAX(i) FROM foo", 100);
 	}
 
 	public void test_Offset1() throws SQLException, Failure {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler(100));
+		MyUploadHandler handler = new MyUploadHandler(100);
+		conn.setUploadHandler(handler);
 		update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT", 100);
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		queryInt("SELECT MIN(i) FROM foo", 1);
 		queryInt("SELECT MAX(i) FROM foo", 100);
 	}
 
 	public void test_Offset5() throws SQLException, Failure {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler(100));
+		MyUploadHandler handler = new MyUploadHandler(100);
+		conn.setUploadHandler(handler);
 		update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT", 96);
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		queryInt("SELECT MIN(i) FROM foo", 5);
 		queryInt("SELECT MAX(i) FROM foo", 100);
 	}
 
 	public void test_ServerStopsReading() throws SQLException, Failure {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler(100));
-		update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 96);
+		MyUploadHandler handler = new MyUploadHandler(100);
+		conn.setUploadHandler(handler);
+		update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT", 10);
+		assertEq("handler encountered write error", true, handler.encounteredWriteError());
 		// Server stopped reading after 10 rows. Will we stay in sync?
 		queryInt("SELECT COUNT(i) FROM foo", 10);
 	}
@@ -125,6 +137,7 @@ public final class OnClientTester extend
 		conn.setUploadHandler(handler);
 		handler.setChunkSize(1024 * 1024);
 		update("COPY INTO foo FROM 'banana' ON CLIENT", n);
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		queryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
 	}
 
@@ -183,8 +196,10 @@ public final class OnClientTester extend
 
 	public void test_FailUploadLate() throws SQLException, Failure {
 		prepare();
-		conn.setUploadHandler(new MyUploadHandler(100, 50, "i don't like line 50"));
+		MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't like line 50");
+		conn.setUploadHandler(handler);
 		expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't like");
+		assertEq("handler encountered write error", false, handler.encounteredWriteError());
 		assertEq("connection is closed", true, conn.isClosed());
 	}
 
@@ -202,6 +217,7 @@ public final class OnClientTester extend
 		private final int rows;
 		private final int errorAt;
 		private final String errorMessage;
+		private boolean encounteredWriteError;
 
 		private int chunkSize = 100; // small number to trigger more bugs
 
@@ -237,9 +253,16 @@ public final class OnClientTester extend
 					throw new IOException(errorMessage);
 				}
 				stream.printf("%d|%d%n", i + 1, i + 1);
+				if (i % 25 == 0 && stream.checkError()) {
+					encounteredWriteError = true;
+					break;
+				}
 			}
 		}
 
+		public Object encounteredWriteError() {
+			return encounteredWriteError;
+		}
 	}
 
 	static class MyDownloadHandler implements MonetDownloadHandler {