Mercurial > hg > monetdb-java
changeset 502:83354bd21320 onclient
Upload fake data when an upload request is received
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Tue, 17 Aug 2021 12:21:09 +0200 (2021-08-17) |
parents | eaad79c3235f |
children | 7e3987c16cde |
files | src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java |
diffstat | 2 files changed, 163 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java @@ -10,6 +10,8 @@ package org.monetdb.jdbc; import java.io.File; import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; import java.net.SocketException; import java.net.SocketTimeoutException; import java.sql.CallableStatement; @@ -3110,11 +3112,14 @@ public class MonetConnection break; case FILETRANSFER: // Consume the command - String dummy = in.readLine(); + String transferCommand = in.readLine(); // Consume the fake prompt inserted by MapiSocket. - dummy = in.readLine(); - // Complain - out.writeLine("!HY000!JDBC driver does not support file transfer yet\n"); + String dummy = in.readLine(); + // Handle the request + error = handleTransfer(transferCommand); + if (error != null) { + out.writeLine("!HY000!" + error + "\n"); + } // Then prepare for the next iteration tmpLine = in.readLine(); linetype = in.getLineType(); @@ -3167,4 +3172,58 @@ public class MonetConnection } } // }}} + + private String handleTransfer(String transferCommand) throws IOException { + String[] parts = transferCommand.split(" " , 3); + if (parts.length == 3) { + if (parts[0].equals("r")) { + int offset; + try { + offset = Integer.parseInt(parts[1]); + } catch (NumberFormatException e) { + return e.toString(); + } + return handleUpload(parts[2], true, offset); + } + if (parts[0].equals("r")) { + int offset; + try { + offset = Integer.parseInt(parts[1]); + } catch (NumberFormatException e) { + return e.toString(); + } + return handleUpload(parts[2], false, offset); + } + } else if (parts.length == 2) { + if (parts[0].equals("w")) { + return handleDownload(parts[1]); + } + } + return "JDBC does not support this file transfer yet: " + transferCommand; + } + + private String handleUpload(String path, boolean textMode, int offset) throws IOException { + boolean wasFaking = server.setInsertFakeFlushes(false); + try { + MapiSocket.UploadStream us = server.uploadStream(); + us.write('\n'); + PrintStream ps = null; + try { + ps = new PrintStream(us, false, "UTF-8"); + } catch (UnsupportedEncodingException e) { + return e.toString(); + } + for (int i = 0; i < 1200; i++) { + ps.println("banana " + i); + } + ps.close(); + return null; + } finally { + server.setInsertFakeFlushes(wasFaking); + } + } + + private String handleDownload(String path) { + return "JDBC driver does not support downloads yet"; + } }
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1122,6 +1122,12 @@ public class MapiSocket { /* cannot (yet } } + + public UploadStream uploadStream() { + return new UploadStream(); + } + + /** * Destructor called by garbage collector before destroying this * object tries to disconnect the MonetDB connection if it has not @@ -1135,4 +1141,98 @@ public class MapiSocket { /* cannot (yet close(); super.finalize(); } + + public class UploadStream extends FilterOutputStream { + private final int CHUNK_SIZE = 100; + private boolean closed = false; + private int chunkLeft = CHUNK_SIZE; + private byte[] promptBuffer; + + UploadStream() { + super(toMonet); + assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length; + int promptLen = LineType.MORE.bytes().length; + promptBuffer = new byte[promptLen + 1]; + } + + @Override + public void write(int b) throws IOException { + handleChunking(); + super.write(b); + wrote(1); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + while (len > 0) { + handleChunking(); + int toWrite = Integer.min(len, chunkLeft); + super.write(b, off, toWrite); + off += toWrite; + len -= toWrite; + wrote(toWrite); + } + } + + @Override + public void flush() throws IOException { + // suppress flushes + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + if (chunkLeft != CHUNK_SIZE) { + // flush pending data + flushAndReadPrompt(); + } + // send empty block + out.flush(); + LineType acknowledgement = readPrompt(); + if (acknowledgement != LineType.FILETRANSFER) { + throw new IOException("Expected server to acknowledge end of file"); + } + } + + private void wrote(int i) { + chunkLeft -= i; + } + + private void handleChunking() throws IOException { + if (chunkLeft > 0) { + return; + } + flushAndReadPrompt(); + } + + private void flushAndReadPrompt() throws IOException { + out.flush(); + chunkLeft = CHUNK_SIZE; + LineType lineType = readPrompt(); + switch (lineType) { + case MORE: + return; + case FILETRANSFER: + throw new IOException("Server aborted the upload"); + default: + throw new IOException("Expected MORE/DONE from server, got " + lineType); + } + } + + private LineType readPrompt() throws IOException { + int nread = fromMonet.read(promptBuffer); + if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') { + throw new IOException("server return incomplete prompt"); + } + LineType lineType = LineType.classify(promptBuffer); + return lineType; + } + } }