Mercurial > hg > monetdb-java
changeset 521:72007c4f8f8a onclient
Allow MonetUploadHandler to configure the chunk size
(every chunk_size bytes, the client suspends the upload and
asks the server whether to continue)
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Thu, 26 Aug 2021 11:50:11 +0200 (2021-08-26) |
parents | b4c7816e3592 |
children | 279414178dc6 |
files | src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java |
diffstat | 2 files changed, 24 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java @@ -3267,6 +3267,7 @@ public class MonetConnection private final MapiSocket server; private PrintStream print = null; private String error = null; + private int customChunkSize = -1; Upload(MapiSocket server) { this.server = server; @@ -3279,13 +3280,17 @@ public class MonetConnection error = errorMessage; } + public void setChunkSize(int chunkSize) { + this.customChunkSize = chunkSize; + } + public PrintStream getStream() throws IOException { if (error != null) { throw new IOException("Cannot send data after an error has been sent"); } if (print == null) { try { - MapiSocket.UploadStream up = server.uploadStream(); + MapiSocket.UploadStream up = customChunkSize >= 0 ? server.uploadStream(customChunkSize) : server.uploadStream(); print = new PrintStream(up, false, "UTF-8"); up.write('\n'); } catch (UnsupportedEncodingException e) {
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1164,6 +1164,9 @@ public class MapiSocket { /* cannot (yet } } + public UploadStream uploadStream(int chunkSize) { + return new UploadStream(chunkSize); + } public UploadStream uploadStream() { return new UploadStream(); @@ -1188,16 +1191,26 @@ public class MapiSocket { /* cannot (yet } public class UploadStream extends FilterOutputStream { - private final int CHUNK_SIZE = 100; + public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024; + private final int chunkSize; private boolean closed = false; - private int chunkLeft = CHUNK_SIZE; + private int chunkLeft; private byte[] promptBuffer; - UploadStream() { + UploadStream(int chunkSize) { super(toMonet); + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunk size must be positive"); + } + this.chunkSize = chunkSize; assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length; int promptLen = LineType.MORE.bytes().length; promptBuffer = new byte[promptLen + 1]; + chunkLeft = this.chunkSize; + } + + UploadStream() { + this(DEFAULT_CHUNK_SIZE); } @Override @@ -1234,7 +1247,7 @@ public class MapiSocket { /* cannot (yet if (closed) { return; } - if (chunkLeft != CHUNK_SIZE) { + if (chunkLeft != chunkSize) { // flush pending data flushAndReadPrompt(); } @@ -1259,7 +1272,7 @@ public class MapiSocket { /* cannot (yet private void flushAndReadPrompt() throws IOException { out.flush(); - chunkLeft = CHUNK_SIZE; + chunkLeft = chunkSize; LineType lineType = readPrompt(); switch (lineType) { case MORE: