view src/main/java/nl/cwi/monetdb/mcl/connection/mapi/AbstractSocket.java @ 91:6f74e01c57da embedded

Made fixings regarding the null values retrieval. The JDBC embedded connection is working!!! :) Some more testing, optimizations and compilations fixes are still required.
author Pedro Ferreira <pedro.ferreira@monetdbsolutions.com>
date Thu, 05 Jan 2017 17:57:57 +0000 (2017-01-05)
parents 2b5e32efb1a4
children 1dcb51573c89
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
 */

package nl.cwi.monetdb.mcl.connection.mapi;

import nl.cwi.monetdb.mcl.connection.helpers.BufferReallocator;

import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

public abstract class AbstractSocket implements Closeable {

    protected final Socket socket;

    protected final MapiConnection connection;

    private final ByteBuffer bufferIn;

    private final ByteBuffer bufferOut;

    private final CharBuffer stringsEncoded;

    private final CharBuffer stringsDecoded;

    private final CharsetEncoder asciiEncoder = StandardCharsets.UTF_8.newEncoder();

    private final CharsetDecoder asciiDecoder = StandardCharsets.UTF_8.newDecoder();

    AbstractSocket(String hostname, int port, MapiConnection connection) throws IOException {
        this.socket = new Socket(hostname, port);
        this.connection = connection;
        this.bufferIn = ByteBuffer.wrap(new byte[getBlockSize()]);
        this.bufferOut = ByteBuffer.wrap(new byte[getBlockSize()]);
        this.stringsEncoded = CharBuffer.allocate(getBlockSize());
        this.stringsDecoded = CharBuffer.allocate(getBlockSize());
        this.stringsDecoded.flip();
    }

    int getSoTimeout() throws SocketException {
        return socket.getSoTimeout();
    }

    void setSoTimeout(int s) throws SocketException {
        socket.setSoTimeout(s);
    }

    void setTcpNoDelay(boolean on) throws SocketException {
        socket.setTcpNoDelay(on);
    }

    void setSocketChannelEndianness(ByteOrder bo) {
        this.bufferIn.order(bo);
        this.bufferOut.order(bo);
    }

    public abstract int getBlockSize();

    abstract int readToBufferIn(ByteBuffer bufferIn) throws IOException;

    abstract int writeFromBufferOut(ByteBuffer bufferOut) throws IOException;

    abstract void flush() throws IOException;

    private void readToBuffer() throws IOException {
        int read = this.readToBufferIn(this.bufferIn);
        if(read == 0) {
            throw new IOException("The server has reached EOF!");
        }
        this.stringsDecoded.clear();
        this.asciiDecoder.reset();
        this.asciiDecoder.decode(this.bufferIn, this.stringsDecoded,true);
        this.asciiDecoder.flush(this.stringsDecoded);
        this.stringsDecoded.flip();
    }

    public CharBuffer readLine(CharBuffer lineBuffer) throws IOException {
        lineBuffer.clear();
        boolean found = false;
        char[] sourceArray = this.stringsDecoded.array();
        int sourcePosition = this.stringsDecoded.position();
        char[] destinationArray = lineBuffer.array();
        int destinationPosition = 0;
        int destinationLimit = lineBuffer.limit();

        while(!found) {
            if(!this.stringsDecoded.hasRemaining()) {
                this.readToBuffer();
                sourceArray = this.stringsDecoded.array();
                sourcePosition = 0;
            }
            char c = sourceArray[sourcePosition++];
            if(c == '\n') {
                found = true;
            } else {
                if(destinationPosition + 1 >= destinationLimit) {
                    lineBuffer = BufferReallocator.ReallocateBuffer(lineBuffer);
                    destinationArray = lineBuffer.array();
                    destinationLimit = lineBuffer.limit();
                }
                destinationArray[destinationPosition++] = c;
            }
        }
        this.stringsDecoded.position(sourcePosition);
        lineBuffer.position(destinationPosition);
        lineBuffer.flip();
        return lineBuffer;
    }

    private void flushOutputCharBuffer() throws IOException {
        this.stringsEncoded.flip();
        this.asciiEncoder.reset();
        this.asciiEncoder.encode(this.stringsEncoded, this.bufferOut, true);
        this.asciiEncoder.flush(this.bufferOut);
        this.stringsEncoded.clear();
        int written = this.writeFromBufferOut(this.bufferOut);
        if(written == 0) {
            throw new IOException("The query could not be sent to the server!");
        } else {
            this.flush();
        }
    }

    private void writeNextBlock(String line) throws IOException {
        int limit = line.length();
        int destinationPosition = this.stringsEncoded.position();
        char[] destinationArray = this.stringsEncoded.array();

        for (int i = 0; i < limit; i++) {
            if (!this.stringsEncoded.hasRemaining()) {
                this.flushOutputCharBuffer();
                destinationArray = this.stringsEncoded.array();
                destinationPosition = 0;
            }
            destinationArray[destinationPosition++] = line.charAt(i);
        }
        this.stringsEncoded.position(destinationPosition);
    }

    public void writeNextLine(String prefix, String line, String suffix) throws IOException {
        if(prefix != null) {
            this.writeNextBlock(prefix);
        }
        this.writeNextBlock(line);
        if(suffix != null) {
            this.writeNextBlock(suffix);
        }
        this.writeNextBlock("\n");
        if (this.stringsEncoded.hasRemaining()) {
            this.flushOutputCharBuffer();
        }
    }
}