Mercurial > hg > monetdb-java
view src/main/java/nl/cwi/monetdb/mcl/connection/mapi/AbstractSocket.java @ 239:6d6e62ca590d embedded
Cast a ByteBuffer to a Buffer before calling flip. This fixes compiling to JDK 7 from JDK9+ compilers.
Details: https://github.com/plasma-umass/doppio/issues/497
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Tue, 12 Jun 2018 14:38:27 +0200 (2018-06-12) |
parents | 5b13ccaba741 |
children | 4face9f42efc |
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
 */

package nl.cwi.monetdb.mcl.connection.mapi;

import nl.cwi.monetdb.mcl.connection.helpers.BufferReallocator;

import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

/**
 * An abstract class to be extended by a JDBC socket connection. The base idea of this class is to allow easy
 * integrations with future versions of the MAPI protocol. With new versions of the protocol, the way the data is
 * fetched will be different hence this class should be sub-classed according to the protocol itself.
 * <br/>
 * Meanwhile the implementation of this class uses Java ByteBuffers which allows memory re-usage for more performance.
 * Also MonetDB uses UTF-8 as its character encoding, hence it is required to convert into UTF-16 (JVM encoding).
 * <br/>
 * Every state-changing {@link Buffer} call in this class ({@code flip}, {@code clear}, {@code position(int)}) is made
 * through a cast to {@link Buffer}. Since JDK 9 the concrete buffer classes override those methods with covariant
 * return types, so a non-cast call compiled by a JDK 9+ compiler fails with a NoSuchMethodError on older runtimes.
 *
 * @author Pedro Ferreira
 */
public abstract class AbstractSocket implements Closeable {

    /** The TCP Socket to mserver */
    protected final Socket socket;

    /** The MAPI connection this socket belongs to */
    protected final MapiConnection connection;

    /** ByteBuffer to read from the underlying socket InputStream */
    private final ByteBuffer bufferIn;

    /** ByteBuffer to write into the underlying socket OutputStream */
    private final ByteBuffer bufferOut;

    /** The bytes read from the bufferIn decoded into UTF-16 */
    private final CharBuffer stringsDecoded;

    /** The characters waiting to be encoded into UTF-8 and written into the bufferOut */
    private final CharBuffer stringsEncoded;

    /** UTF-8 encoder */
    private final CharsetEncoder utf8Encoder = StandardCharsets.UTF_8.newEncoder();

    /** UTF-8 decoder */
    private final CharsetDecoder utf8Decoder = StandardCharsets.UTF_8.newDecoder();

    /**
     * Opens the TCP socket and allocates the byte and character buffers, each with a capacity of
     * {@link #getFullBlockSize()}.
     *
     * @param hostname The host to connect to
     * @param port The port to connect to
     * @param connection The MAPI connection this socket belongs to
     * @throws IOException If the socket could not be opened
     */
    AbstractSocket(String hostname, int port, MapiConnection connection) throws IOException {
        this.socket = new Socket(hostname, port);
        this.connection = connection;
        // getFullBlockSize() is overridable and runs before the subclass constructor body, so it is queried
        // exactly once here; implementations must not depend on their own instance state.
        final int fullBlockSize = getFullBlockSize();
        this.bufferIn = ByteBuffer.wrap(new byte[fullBlockSize]);
        this.bufferOut = ByteBuffer.wrap(new byte[fullBlockSize]);
        this.stringsDecoded = CharBuffer.allocate(fullBlockSize);
        // start with an empty (position == limit == 0) decoded buffer so the first readLine triggers a read
        ((Buffer) this.stringsDecoded).flip();
        this.stringsEncoded = CharBuffer.allocate(fullBlockSize);
    }

    /**
     * Get the socket timeout in milliseconds.
     *
     * @return The currently in use socket timeout in milliseconds
     * @throws SocketException If an error in the underlying connection happened
     */
    int getSoTimeout() throws SocketException {
        return socket.getSoTimeout();
    }

    /**
     * Sets the socket timeout in milliseconds.
     *
     * @param s The socket timeout in milliseconds
     * @throws SocketException If an error in the underlying connection happened
     */
    void setSoTimeout(int s) throws SocketException {
        socket.setSoTimeout(s);
    }

    /**
     * Sets the TCP no delay feature in the underlying socket.
     *
     * @param on A true or false value
     * @throws SocketException If an error in the underlying connection happened
     */
    void setTcpNoDelay(boolean on) throws SocketException {
        socket.setTcpNoDelay(on);
    }

    /**
     * Sets the byte order of both the input and the output ByteBuffers.
     *
     * @param bo A ByteOrder order value either Little-endian or Big-endian
     */
    void setSocketChannelEndianness(ByteOrder bo) {
        this.bufferIn.order(bo);
        this.bufferOut.order(bo);
    }

    /**
     * Gets the underlying socket full block size. This value is used as the capacity of every buffer of this class.
     *
     * @return The underlying socket full block size
     */
    public abstract int getFullBlockSize();

    /**
     * Gets the underlying socket block size.
     *
     * @return The underlying socket block size
     */
    public abstract int getBlockSize();

    /**
     * Reads from the underlying socket into the bufferIn. This class decodes the bytes between the buffer's
     * position and limit right after this call, so the implementation has to leave the buffer ready for reading.
     *
     * @param bufferIn The ByteBuffer to fill with the bytes read
     * @return The number of bytes read
     * @throws IOException If an error in the underlying connection happened
     */
    abstract int readToBufferIn(ByteBuffer bufferIn) throws IOException;

    /**
     * Writes from bufferOut into the underlying socket. This class encodes more bytes into the same buffer after
     * this call, so the implementation has to leave the buffer ready to be filled again.
     *
     * @param bufferOut The ByteBuffer holding the bytes to write
     * @return The number of bytes written
     * @throws IOException If an error in the underlying connection happened
     */
    abstract int writeFromBufferOut(ByteBuffer bufferOut) throws IOException;

    /**
     * Flushes the output.
     *
     * @throws IOException If an error in the underlying connection happened
     */
    abstract void flush() throws IOException;

    /**
     * Helper method to read the next chunk of bytes and decode it from UTF-8 into the stringsDecoded buffer, which
     * is left flipped (ready to be read from index 0).
     *
     * NOTE(review): the decoder is reset and run with endOfInput == true on every chunk and its CoderResult is
     * ignored, so a multi-byte sequence split across two reads would not be decoded. Whether a read can end in the
     * middle of a sequence depends on the sub-class; confirm before relying on it.
     *
     * @throws IOException If an error in the underlying connection happened or no more data is available
     */
    private void readToInputBuffer() throws IOException {
        int read = this.readToBufferIn(this.bufferIn);
        // zero or a negative count (the usual stream EOF marker) both mean there is nothing to decode
        if (read <= 0) {
            throw new IOException("The server has reached EOF!");
        }
        ((Buffer) this.stringsDecoded).clear();
        this.utf8Decoder.reset();
        this.utf8Decoder.decode(this.bufferIn, this.stringsDecoded, true);
        this.utf8Decoder.flush(this.stringsDecoded);
        ((Buffer) this.stringsDecoded).flip();
    }

    /**
     * Reads a line into the input lineBuffer, reallocating it if necessary. The line terminator ('\n') is consumed
     * but not copied. The returned buffer is flipped, hence ready to be read by the caller.
     *
     * @param lineBuffer The buffer the data will be read into
     * @return The buffer holding the line; it is a different instance than the input one if a reallocation happened
     * @throws IOException If an error in the underlying connection happened
     */
    public CharBuffer readLine(CharBuffer lineBuffer) throws IOException {
        ((Buffer) lineBuffer).clear();

        // work directly on the backing arrays, the buffer positions are synchronized at the end
        char[] sourceArray = this.stringsDecoded.array();
        int sourcePosition = this.stringsDecoded.position();
        int sourceLimit = this.stringsDecoded.limit();
        char[] destinationArray = lineBuffer.array();
        int destinationPosition = 0;
        int destinationLimit = lineBuffer.limit();
        boolean found = false;

        while (!found) {
            // loop (instead of a single check) because a read may decode into zero characters, in which case
            // reading sourceArray[0] would return stale data from a previous chunk
            while (sourcePosition >= sourceLimit) {
                // record what was consumed, so the state stays consistent if the read below throws
                ((Buffer) this.stringsDecoded).position(sourcePosition);
                this.readToInputBuffer();
                sourceArray = this.stringsDecoded.array();
                sourcePosition = 0;
                sourceLimit = this.stringsDecoded.limit();
            }
            char c = sourceArray[sourcePosition++];
            if (c == '\n') {
                found = true;
            } else {
                if (destinationPosition + 1 >= destinationLimit) {
                    lineBuffer = BufferReallocator.reallocateBuffer(lineBuffer);
                    destinationArray = lineBuffer.array();
                    destinationLimit = lineBuffer.limit();
                }
                destinationArray[destinationPosition++] = c;
            }
        }
        ((Buffer) this.stringsDecoded).position(sourcePosition);
        ((Buffer) lineBuffer).position(destinationPosition);
        ((Buffer) lineBuffer).flip();
        return lineBuffer;
    }

    /**
     * Helper method to encode the pending characters into UTF-8, write them and optionally flush. Both the
     * stringsEncoded and the bufferOut buffers are cleared afterwards.
     *
     * @param toFlush A boolean indicating to flush the underlying stream or not
     * @throws IOException If an error in the underlying connection happened or nothing was written
     */
    private void writeToOutputBuffer(boolean toFlush) throws IOException {
        ((Buffer) this.stringsEncoded).flip();
        this.utf8Encoder.reset();
        CoderResult res;
        int written = 0;
        do {
            // to avoid overflow in the UTF-16 to UTF-8 conversion, has to do this cycle
            res = this.utf8Encoder.encode(this.stringsEncoded, this.bufferOut, false);
            written += this.writeFromBufferOut(this.bufferOut);
        } while (res.isOverflow());
        // signal the end of the input to the encoder and write whatever it still holds
        this.utf8Encoder.encode(this.stringsEncoded, this.bufferOut, true);
        this.utf8Encoder.flush(this.bufferOut);
        written += this.writeFromBufferOut(this.bufferOut);
        ((Buffer) this.stringsEncoded).clear();
        ((Buffer) this.bufferOut).clear();

        if (written == 0) {
            throw new IOException("The query could not be sent to the server!");
        }
        if (toFlush) {
            this.flush();
        }
    }

    /**
     * Appends a String to the pending characters, writing them out (without flushing) every time the character
     * buffer gets full.
     *
     * @param line The line to write in the socket
     * @throws IOException If an error in the underlying connection happened
     */
    private void writeNextBlock(String line) throws IOException {
        int limit = line.length();
        int destinationPosition = this.stringsEncoded.position();
        int destinationCapacity = this.stringsEncoded.capacity();
        char[] destinationArray = this.stringsEncoded.array();

        for (int i = 0; i < limit; i++) {
            char c = line.charAt(i);
            if (destinationPosition >= destinationCapacity) {
                // Never cut a surrogate pair in two: each block is encoded on its own, so a dangling high
                // surrogate at the end of a block (and the low one opening the next) would be rejected by the
                // encoder as malformed and lost. Keep the high surrogate back and emit it with its pair.
                boolean carry = destinationPosition > 1 && Character.isLowSurrogate(c)
                        && Character.isHighSurrogate(destinationArray[destinationPosition - 1]);
                char carried = 0;
                if (carry) {
                    destinationPosition--;
                    carried = destinationArray[destinationPosition];
                }
                ((Buffer) this.stringsEncoded).position(destinationPosition);
                this.writeToOutputBuffer(false);
                destinationArray = this.stringsEncoded.array();
                destinationPosition = 0;
                if (carry) {
                    destinationArray[destinationPosition++] = carried;
                }
            }
            destinationArray[destinationPosition++] = c;
        }
        ((Buffer) this.stringsEncoded).position(destinationPosition);
    }

    /**
     * Writes a String line as well a String prefix and suffix if supplied, then flushes the underlying stream.
     *
     * @param prefix The prefix to write before the line if provided
     * @param line The line to write into the socket
     * @param suffix The suffix to write after the line if provided
     * @throws IOException If an error in the underlying connection happened
     */
    public void writeNextLine(String prefix, String line, String suffix) throws IOException {
        if (prefix != null) {
            this.writeNextBlock(prefix);
        }
        this.writeNextBlock(line);
        if (suffix != null) {
            this.writeNextBlock(suffix);
        }
        this.writeToOutputBuffer(true);
    }
}