Mercurial > hg > monetdb-java
changeset 105:d4c6a01cc300 embedded
Theere was a bug happening when the user's query was larger than the block size. The JDBC client was flushing the stream to the server more than once, so it was fixed to flush only when the query ends.
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Fri, 20 Jan 2017 09:52:53 +0100 (2017-01-20) |
parents | a00241382675 |
children | d39f656b6614 |
files | src/main/java/nl/cwi/monetdb/mcl/connection/mapi/AbstractSocket.java src/main/java/nl/cwi/monetdb/mcl/connection/mapi/OldMapiSocket.java src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiProtocol.java |
diffstat | 3 files changed, 43 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/nl/cwi/monetdb/mcl/connection/mapi/AbstractSocket.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/mapi/AbstractSocket.java @@ -52,11 +52,11 @@ public abstract class AbstractSocket imp AbstractSocket(String hostname, int port, MapiConnection connection) throws IOException { this.socket = new Socket(hostname, port); this.connection = connection; - this.bufferIn = ByteBuffer.wrap(new byte[getBlockSize()]); - this.bufferOut = ByteBuffer.wrap(new byte[getBlockSize()]); - this.stringsDecoded = CharBuffer.allocate(getBlockSize()); + this.bufferIn = ByteBuffer.wrap(new byte[getFullBlockSize()]); + this.bufferOut = ByteBuffer.wrap(new byte[getFullBlockSize()]); + this.stringsDecoded = CharBuffer.allocate(getFullBlockSize()); this.stringsDecoded.flip(); - this.stringsEncoded = CharBuffer.allocate(getBlockSize()); + this.stringsEncoded = CharBuffer.allocate(getFullBlockSize()); } /** @@ -100,6 +100,13 @@ public abstract class AbstractSocket imp } /** + * Gets the underlying socket full block size. + * + * @return The underlying socket full block size + */ + public abstract int getFullBlockSize(); + + /** * Gets the underlying socket block size. * * @return The underlying socket block size @@ -134,7 +141,7 @@ public abstract class AbstractSocket imp * * @throws IOException If an error in the underlying connection happened */ - private void readToBuffer() throws IOException { + private void readToInputBuffer() throws IOException { int read = this.readToBufferIn(this.bufferIn); if(read == 0) { throw new IOException("The server has reached EOF!"); @@ -158,15 +165,18 @@ public abstract class AbstractSocket imp boolean found = false; char[] sourceArray = this.stringsDecoded.array(); int sourcePosition = this.stringsDecoded.position(); + int sourceLimit = this.stringsDecoded.limit(); char[] destinationArray = lineBuffer.array(); int destinationPosition = 0; int destinationLimit = lineBuffer.limit(); while(!found) { - if(!this.stringsDecoded.hasRemaining()) { - this.readToBuffer(); + if(sourcePosition >= sourceLimit) { + this.stringsDecoded.position(sourcePosition); + this.readToInputBuffer(); sourceArray = this.stringsDecoded.array(); sourcePosition = 0; + sourceLimit = this.stringsDecoded.limit(); } char c = sourceArray[sourcePosition++]; if(c == '\n') { @@ -189,9 +199,10 @@ public abstract class AbstractSocket imp /** * Helper method to write, encode into UTF-8 and flush. * + * @param toFlush A boolean indicating to flush the underlying stream or not * @throws IOException If an error in the underlying connection happened */ - private void flushOutputCharBuffer() throws IOException { + private void writeToOutputBuffer(boolean toFlush) throws IOException { this.stringsEncoded.flip(); this.utf8Encoder.reset(); this.utf8Encoder.encode(this.stringsEncoded, this.bufferOut, true); @@ -201,23 +212,28 @@ public abstract class AbstractSocket imp if(written == 0) { throw new IOException("The query could not be sent to the server!"); } else { - this.flush(); + if(toFlush) { + this.flush(); + } } } /** * Writes a String line into the underlying socket. * + * @param line The line to write in the socket * @throws IOException If an error in the underlying connection happened */ private void writeNextBlock(String line) throws IOException { int limit = line.length(); int destinationPosition = this.stringsEncoded.position(); + int destinationCapacity = this.stringsEncoded.capacity(); char[] destinationArray = this.stringsEncoded.array(); for (int i = 0; i < limit; i++) { - if (!this.stringsEncoded.hasRemaining()) { - this.flushOutputCharBuffer(); + if (destinationPosition >= destinationCapacity) { + this.stringsEncoded.position(destinationPosition); + this.writeToOutputBuffer(false); destinationArray = this.stringsEncoded.array(); destinationPosition = 0; } @@ -242,9 +258,6 @@ public abstract class AbstractSocket imp if(suffix != null) { this.writeNextBlock(suffix); } - this.writeNextBlock("\n"); - if (this.stringsEncoded.hasRemaining()) { - this.flushOutputCharBuffer(); - } + this.writeToOutputBuffer(true); } }
--- a/src/main/java/nl/cwi/monetdb/mcl/connection/mapi/OldMapiSocket.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/mapi/OldMapiSocket.java @@ -36,8 +36,11 @@ import java.nio.ByteBuffer; */ public class OldMapiSocket extends AbstractSocket { + /** The full blocksize to use in the upper layer buffers */ + public final static int FULL_BLOCK = 8 * 1024; + /** The blocksize (hardcoded in compliance with stream.mx) */ - public final static int BLOCK = 8 * 1024 - 2; + private static final int BLOCK = FULL_BLOCK - 2; /** * A short in two bytes for holding the block size in bytes. @@ -61,6 +64,14 @@ public class OldMapiSocket extends Abstr } /** + * The block size to be used in the upper layer buffers + */ + @Override + public int getFullBlockSize() { + return FULL_BLOCK; + } + + /** * The block size will be the one hardcoded on the connection. */ @Override @@ -99,7 +110,7 @@ public class OldMapiSocket extends Abstr private int blockLen = 0; - private final byte[] block = new byte[BLOCK]; + private final byte[] block = new byte[BLOCK + 3]; //\n.\n /** * Constructs this BlockInputStream, backed by the given InputStream. A BufferedInputStream is internally used. @@ -182,7 +193,6 @@ public class OldMapiSocket extends Abstr block[blockLen++] = MapiConnection.PROMPT_CHAR; block[blockLen++] = '\n'; } - return blockLen; } @@ -319,7 +329,6 @@ public class OldMapiSocket extends Abstr int write(ByteBuffer b, int off, int len) throws IOException { int t, written = 0; - b.flip(); while (len > 0) { t = BLOCK - writePos; if (len > t) {
--- a/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiProtocol.java +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiProtocol.java @@ -59,7 +59,7 @@ public class OldMapiProtocol extends Abs public OldMapiProtocol(OldMapiSocket socket) { this.socket = socket; - this.lineBuffer = CharBuffer.wrap(new char[OldMapiSocket.BLOCK]); + this.lineBuffer = CharBuffer.wrap(new char[OldMapiSocket.FULL_BLOCK]); this.tupleLineBuffer = CharBuffer.wrap(new char[TUPLE_LINE_BUFFER_DEFAULT_SIZE]); } @@ -267,7 +267,7 @@ public class OldMapiProtocol extends Abs * @throws IOException If an error in the underlying connection happened. */ @Override - public synchronized void writeNextQuery(String prefix, String query, String suffix) throws IOException { + public void writeNextQuery(String prefix, String query, String suffix) throws IOException { this.socket.writeNextLine(prefix, query, suffix); // reset reader state, last line isn't valid any more now this.currentServerResponseHeader = ServerResponses.UNKNOWN;