changeset 502:83354bd21320 onclient

Upload fake data when an upload request is received
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Tue, 17 Aug 2021 12:21:09 +0200 (2021-08-17)
parents eaad79c3235f
children 7e3987c16cde
files src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java
diffstat 2 files changed, 163 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java
+++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java
@@ -10,6 +10,8 @@ package org.monetdb.jdbc;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.sql.CallableStatement;
@@ -3110,11 +3112,14 @@ public class MonetConnection
 							break;
 						case FILETRANSFER:
 							// Consume the command
-							String dummy = in.readLine();
+							String transferCommand = in.readLine();
 							// Consume the fake prompt inserted by MapiSocket.
-							dummy = in.readLine();
-							// Complain
-							out.writeLine("!HY000!JDBC driver does not support file transfer yet\n");
+							String dummy = in.readLine();
+							// Handle the request
+							error = handleTransfer(transferCommand);
+							if (error != null) {
+								out.writeLine("!HY000!" + error + "\n");
+							}
 							// Then prepare for the next iteration
 							tmpLine = in.readLine();
 							linetype = in.getLineType();
@@ -3167,4 +3172,58 @@ public class MonetConnection
 		}
 	}
 	// }}}
+
+	private String handleTransfer(String transferCommand) throws IOException {
+		String[] parts = transferCommand.split(" " , 3);
+		if (parts.length == 3) {
+			if (parts[0].equals("r")) {
+				int offset;
+				try {
+					offset = Integer.parseInt(parts[1]);
+				} catch (NumberFormatException e) {
+					return e.toString();
+				}
+				return handleUpload(parts[2], true, offset);
+			}
+			if (parts[0].equals("r")) {
+				int offset;
+				try {
+					offset = Integer.parseInt(parts[1]);
+				} catch (NumberFormatException e) {
+					return e.toString();
+				}
+				return handleUpload(parts[2], false, offset);
+			}
+		} else if (parts.length == 2) {
+			if (parts[0].equals("w")) {
+				return handleDownload(parts[1]);
+			}
+		}
+		return "JDBC does not support this file transfer yet: " + transferCommand;
+	}
+
+	private String handleUpload(String path, boolean textMode, int offset) throws IOException {
+		boolean wasFaking = server.setInsertFakeFlushes(false);
+		try {
+			MapiSocket.UploadStream us = server.uploadStream();
+			us.write('\n');
+			PrintStream ps = null;
+			try {
+				ps = new PrintStream(us, false, "UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				return e.toString();
+			}
+			for (int i = 0; i < 1200; i++) {
+				ps.println("banana " + i);
+			}
+			ps.close();
+			return null;
+		} finally {
+			server.setInsertFakeFlushes(wasFaking);
+		}
+	}
+
+	private String handleDownload(String path) {
+		return "JDBC driver does not support downloads yet";
+	}
 }
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java
+++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java
@@ -1122,6 +1122,12 @@ public class MapiSocket {	/* cannot (yet
 		}
 	}
 
+
+	public UploadStream uploadStream() {
+		return new UploadStream();
+	}
+
+
 	/**
 	 * Destructor called by garbage collector before destroying this
 	 * object tries to disconnect the MonetDB connection if it has not
@@ -1135,4 +1141,98 @@ public class MapiSocket {	/* cannot (yet
 		close();
 		super.finalize();
 	}
+
+	public class UploadStream extends FilterOutputStream {
+		private final int CHUNK_SIZE = 100;
+		private boolean closed = false;
+		private int chunkLeft = CHUNK_SIZE;
+		private byte[] promptBuffer;
+
+		UploadStream() {
+			super(toMonet);
+			assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length;
+			int promptLen = LineType.MORE.bytes().length;
+			promptBuffer = new byte[promptLen + 1];
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			handleChunking();
+			super.write(b);
+			wrote(1);
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			write(b, 0, b.length);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			while (len > 0) {
+				handleChunking();
+				int toWrite = Integer.min(len, chunkLeft);
+				super.write(b, off, toWrite);
+				off += toWrite;
+				len -= toWrite;
+				wrote(toWrite);
+			}
+		}
+
+		@Override
+		public void flush() throws IOException {
+			// suppress flushes
+		}
+
+		@Override
+		public void close() throws IOException {
+			if (closed) {
+				return;
+			}
+			if (chunkLeft != CHUNK_SIZE) {
+				// flush pending data
+				flushAndReadPrompt();
+			}
+			// send empty block
+			out.flush();
+			LineType acknowledgement = readPrompt();
+			if (acknowledgement != LineType.FILETRANSFER) {
+				throw new IOException("Expected server to acknowledge end of file");
+			}
+		}
+
+		private void wrote(int i) {
+			chunkLeft -= i;
+		}
+
+		private void handleChunking() throws IOException {
+			if (chunkLeft > 0) {
+				return;
+			}
+			flushAndReadPrompt();
+		}
+
+		private void flushAndReadPrompt() throws IOException {
+			out.flush();
+			chunkLeft = CHUNK_SIZE;
+			LineType lineType = readPrompt();
+			switch (lineType) {
+				case MORE:
+					return;
+				case FILETRANSFER:
+					throw new IOException("Server aborted the upload");
+				default:
+					throw new IOException("Expected MORE/DONE from server, got " + lineType);
+			}
+		}
+
+		private LineType readPrompt() throws IOException {
+			int nread = fromMonet.read(promptBuffer);
+			if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') {
+				throw new IOException("server return incomplete prompt");
+			}
+			LineType lineType = LineType.classify(promptBuffer);
+			return lineType;
+		}
+	}
 }