view src/main/java/nl/cwi/monetdb/mcl/connection/mapi/AbstractSocket.java @ 73:953422c41194 embedded

The data retrieval in ResultSets is now Column wise. Ready to start the embedded integrate, but it has to perform extra tests for the more rare types.
author Pedro Ferreira <pedro.ferreira@monetdbsolutions.com>
date Tue, 13 Dec 2016 18:35:30 +0100 (2016-12-13)
parents src/main/java/nl/cwi/monetdb/mcl/connection/socket/AbstractSocket.java@4e2a2a81cc6a
children 17365ed26611
line wrap: on
line source
package nl.cwi.monetdb.mcl.connection.mapi;

import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Created by ferreira on 12/9/16.
 */
public abstract class AbstractSocket implements Closeable {

    protected final Socket socket;

    protected final MapiConnection connection;

    private final ByteBuffer bufferIn;

    private final ByteBuffer bufferOut;

    private final CharBuffer stringsEncoded;

    private final CharBuffer stringsDecoded;

    private final CharsetEncoder asciiEncoder = StandardCharsets.UTF_8.newEncoder();

    private final CharsetDecoder asciiDecoder = StandardCharsets.UTF_8.newDecoder();

    private boolean hasFinished;

    public AbstractSocket(String hostname, int port, MapiConnection connection) throws IOException {
        this.socket = new Socket(hostname, port);
        this.connection = connection;
        this.bufferIn = ByteBuffer.wrap(new byte[getBlockSize()]);
        this.bufferOut = ByteBuffer.wrap(new byte[getBlockSize()]);
        this.stringsEncoded = CharBuffer.allocate(getBlockSize());
        this.stringsDecoded = CharBuffer.allocate(getBlockSize());
        this.stringsDecoded.flip();
    }

    public int getSoTimeout() throws SocketException {
        return socket.getSoTimeout();
    }

    public void setSoTimeout(int s) throws SocketException {
        socket.setSoTimeout(s);
    }

    public void setTcpNoDelay(boolean on) throws SocketException {
        socket.setTcpNoDelay(on);
    }

    public void setSocketChannelEndianness(ByteOrder bo) {
        this.bufferIn.order(bo);
        this.bufferOut.order(bo);
    }

    public abstract int getBlockSize();

    abstract int readToBufferIn(ByteBuffer bufferIn) throws IOException;

    abstract int writeFromBufferOut(ByteBuffer bufferOut) throws IOException;

    abstract void flush() throws IOException;

    private void readToBuffer() throws IOException {
        int read = this.readToBufferIn(this.bufferIn);
        if(read == 0) {
            this.hasFinished = true;
            throw new IOException("Done!");
        }
        this.stringsDecoded.clear();
        this.asciiDecoder.reset();
        this.asciiDecoder.decode(this.bufferIn, this.stringsDecoded,true);
        this.asciiDecoder.flush(this.stringsDecoded);
        this.stringsDecoded.flip();
    }

    public int readLine(StringBuilder builder) throws IOException {
        builder.setLength(0);
        boolean found = false;
        char[] array = this.stringsDecoded.array();
        int position = this.stringsDecoded.position();

        while(!found) {
            if(!this.stringsDecoded.hasRemaining()) {
                this.readToBuffer();
                array = this.stringsDecoded.array();
                position = 0;
            }
            char c = array[position++];
            if(c == '\n') {
                found = true;
            } else {
                builder.append(c);
            }
        }
        this.stringsDecoded.position(position);
        return builder.length();
    }

    private void flushOutputCharBuffer() throws IOException {
        this.stringsEncoded.flip();
        this.asciiEncoder.reset();
        this.asciiEncoder.encode(this.stringsEncoded, this.bufferOut, true);
        this.asciiEncoder.flush(this.bufferOut);
        this.stringsEncoded.clear();
        int written = this.writeFromBufferOut(this.bufferOut);
        if(written == 0) {
            this.hasFinished = true;
            throw new IOException("Done!");
        } else {
            this.flush();
        }
    }

    private void writeNextBlock(String line) throws IOException {
        int limit = line.length();
        for (int i = 0; i < limit; i++) {
            if (!this.stringsEncoded.hasRemaining()) {
                this.flushOutputCharBuffer();
            }
            this.stringsEncoded.put(line.charAt(i));
        }
    }

    public void writeNextLine(String prefix, String line, String suffix) throws IOException {
        if(prefix != null) {
            this.writeNextBlock(prefix);
        }
        this.writeNextBlock(line);
        if(suffix != null) {
            this.writeNextBlock(suffix);
        }
        this.writeNextBlock("\n");
        if (this.stringsEncoded.hasRemaining()) {
            this.flushOutputCharBuffer();
        }
    }
}