Mercurial > hg > monetdb-java
diff src/main/java/org/monetdb/mcl/net/MapiSocket.java @ 502:83354bd21320 onclient
Upload fake data when an upload request is received
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Tue, 17 Aug 2021 12:21:09 +0200 (2021-08-17) |
parents | eaad79c3235f |
children | 1db3398b78f7 |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1122,6 +1122,12 @@ public class MapiSocket { /* cannot (yet } } + + public UploadStream uploadStream() { + return new UploadStream(); + } + + /** * Destructor called by garbage collector before destroying this * object tries to disconnect the MonetDB connection if it has not @@ -1135,4 +1141,98 @@ public class MapiSocket { /* cannot (yet close(); super.finalize(); } + + public class UploadStream extends FilterOutputStream { + private final int CHUNK_SIZE = 100; + private boolean closed = false; + private int chunkLeft = CHUNK_SIZE; + private byte[] promptBuffer; + + UploadStream() { + super(toMonet); + assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length; + int promptLen = LineType.MORE.bytes().length; + promptBuffer = new byte[promptLen + 1]; + } + + @Override + public void write(int b) throws IOException { + handleChunking(); + super.write(b); + wrote(1); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + while (len > 0) { + handleChunking(); + int toWrite = Integer.min(len, chunkLeft); + super.write(b, off, toWrite); + off += toWrite; + len -= toWrite; + wrote(toWrite); + } + } + + @Override + public void flush() throws IOException { + // suppress flushes + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + if (chunkLeft != CHUNK_SIZE) { + // flush pending data + flushAndReadPrompt(); + } + // send empty block + out.flush(); + LineType acknowledgement = readPrompt(); + if (acknowledgement != LineType.FILETRANSFER) { + throw new IOException("Expected server to acknowledge end of file"); + } + } + + private void wrote(int i) { + chunkLeft -= i; + } + + private void handleChunking() throws IOException { + if (chunkLeft > 0) { + return; + } + flushAndReadPrompt(); + } + + private void flushAndReadPrompt() throws IOException { + out.flush(); + chunkLeft = CHUNK_SIZE; + LineType lineType = readPrompt(); + switch (lineType) { + case MORE: + return; + case FILETRANSFER: + throw new IOException("Server aborted the upload"); + default: + throw new IOException("Expected MORE/DONE from server, got " + lineType); + } + } + + private LineType readPrompt() throws IOException { + int nread = fromMonet.read(promptBuffer); + if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') { + throw new IOException("server return incomplete prompt"); + } + LineType lineType = LineType.classify(promptBuffer); + return lineType; + } + } }