view src/main/java/nl/cwi/monetdb/mcl/connection/mapi/OldMapiSocket.java @ 91:6f74e01c57da embedded

Made fixings regarding the null values retrieval. The JDBC embedded connection is working!!! :) Some more testing, optimizations and compilations fixes are still required.
author Pedro Ferreira <pedro.ferreira@monetdbsolutions.com>
date Thu, 05 Jan 2017 17:57:57 +0000 (2017-01-05)
parents 2b5e32efb1a4
children 1dcb51573c89
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
 */

package nl.cwi.monetdb.mcl.connection.mapi;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class OldMapiSocket extends AbstractSocket {

    /** The blocksize (hardcoded in compliance with stream.mx) */
    public final static int BLOCK = 8 * 1024 - 2;

    /**
     * A short in two bytes for holding the block size in bytes
     */
    private final byte[] blklen = new byte[2];

    private final OldMapiBlockInputStream inStream;

    private final OldMapiBlockOutputStream outStream;

    OldMapiSocket(String hostname, int port, MapiConnection connection) throws IOException {
        super(hostname, port, connection);
        this.inStream = new OldMapiBlockInputStream(socket.getInputStream());
        this.outStream = new OldMapiBlockOutputStream(socket.getOutputStream());
    }

    @Override
    public int getBlockSize() {
        return BLOCK;
    }

    @Override
    int readToBufferIn(ByteBuffer bufferIn) throws IOException {
        return this.inStream.read(bufferIn);
    }

    @Override
    int writeFromBufferOut(ByteBuffer bufferOut) throws IOException {
        return this.outStream.write(bufferOut);
    }

    @Override
    void flush() throws IOException {
        this.outStream.flush();
    }

    @Override
    public void close() throws IOException {
        this.socket.close();
    }

    private class OldMapiBlockInputStream {

        private final InputStream inStream;

        private int readPos = 0;

        private int blockLen = 0;

        private final byte[] block = new byte[BLOCK];

        /**
         * Constructs this BlockInputStream, backed by the given InputStream. A BufferedInputStream is internally used.
         */
        OldMapiBlockInputStream(InputStream in) {
            this.inStream = in;
        }

        public int available() {
            return blockLen - readPos;
        }

        /**
         * Small wrapper to get a blocking variant of the read() method
         * on the BufferedInputStream.  We want to benefit from the
         * Buffered pre-fetching, but not dealing with half blocks.
         * Changing this class to be able to use the partially received
         * data will greatly complicate matters, while a performance
         * improvement is debatable given the relatively small size of
         * our blocks.  Maybe it does speed up on slower links, then
         * consider this method a quick bug fix/workaround.
         *
         * @return false if reading the block failed due to EOF
         */
        private boolean _read(byte[] b, int len) throws IOException {
            int s;
            int off = 0;

            while (len > 0) {
                s = inStream.read(b, off, len);
                if (s == -1) {
                    // if we have read something before, we should have been able to read the whole, so make this fatal
                    if (off > 0) {
                        throw new IOException("Read from " + connection.getHostname() + ":" +
                                connection.getPort() + ": Incomplete block read from stream");
                    }
                    return false;
                }
                len -= s;
                off += s;
            }
            return true;
        }

        /**
         * Reads the next block on the stream into the internal buffer,
         * or writes the prompt in the buffer.
         * <p>
         * The blocked stream protocol consists of first a two byte
         * integer indicating the length of the block, then the
         * block, followed by another length + block.  The end of
         * such sequence is put in the last bit of the length, and
         * hence this length should be shifted to the right to
         * obtain the real length value first.  We simply fetch
         * blocks here as soon as they are needed for the stream's
         * read methods.
         * <p>
         * The user-flush, which is an implicit effect of the end of
         * a block sequence, is communicated beyond the stream by
         * inserting a prompt sequence on the stream after the last
         * block.  This method makes sure that a final block ends with a
         * newline, if it doesn't already, in order to facilitate a
         * Reader that is possibly chained to this InputStream.
         * <p>
         * If the stream is not positioned correctly, hell will break
         * loose.
         */
        private int readBlock() throws IOException {
            // read next two bytes (short)
            if (!_read(blklen, 2))
                return -1;

            // Get the short-value and store its value in blockLen.
            blockLen = (short) ((blklen[0] & 0xFF) >> 1 | (blklen[1] & 0xFF) << 7);
            readPos = 0;

            // sanity check to avoid bad servers make us do an ugly stack trace
            if (blockLen > block.length)
                throw new AssertionError("Server sent a block larger than BLOCKsize: " +
                        blockLen + " > " + block.length);
            if (!_read(block, blockLen))
                return -1;

            // if this is the last block, make it end with a newline and prompt
            if ((blklen[0] & 0x1) == 1) {
                if (blockLen > 0 && block[blockLen - 1] != '\n') {
                    // to terminate the block in a Reader
                    block[blockLen++] = '\n';
                }
                // insert 'fake' flush
                block[blockLen++] = MapiConnection.PROMPT_CHAR;
                block[blockLen++] = '\n';
            }

            return blockLen;
        }

        public int read() throws IOException {
            if (available() == 0) {
                if (readBlock() == -1)
                    return -1;
            }
            return (int) block[readPos++];
        }

        public int read(ByteBuffer b) throws IOException {
            return read(b, 0, b.capacity());
        }

        public int read(ByteBuffer b, int off, int len) throws IOException {
            b.clear();
            int t;
            int size = 0;
            while (size < len) {
                t = available();
                if (t == 0) {
                    if (size != 0)
                        break;
                    if (readBlock() == -1) {
                        size = -1;
                        break;
                    }
                    t = available();
                }
                if (len > t) {
                    System.arraycopy(block, readPos, b.array(), off, t);
                    off += t;
                    len -= t;
                    readPos += t;
                    size += t;
                } else {
                    System.arraycopy(block, readPos, b.array(), off, len);
                    readPos += len;
                    size += len;
                    break;
                }
            }
            b.position(size);
            b.flip();
            return size;
        }

        public long skip(long n) throws IOException {
            long skip = n;
            int t;
            while (skip > 0) {
                t = available();
                if (skip > t) {
                    skip -= t;
                    readPos += t;
                    readBlock();
                } else {
                    readPos += skip;
                    break;
                }
            }
            return n;
        }
    }

    class OldMapiBlockOutputStream {

        private final OutputStream outStream;

        private int writePos = 0;

        private byte[] block = new byte[BLOCK];

        private int blocksize = 0;

        /**
         * Constructs this BlockOutputStream, backed by the given OutputStream. A BufferedOutputStream is internally
         * used.
         */
        OldMapiBlockOutputStream(OutputStream out) {
            this.outStream = out;
        }

        void flush() throws IOException {
            // write the block (as final) then flush.
            writeBlock(true);
            outStream.flush();
        }

        /**
         * writeBlock puts the data in the block on the stream. The boolean last controls whether the block is sent with
         * an indicator to note it is the last block of a sequence or not.
         *
         * @param last whether this is the last block
         * @throws IOException if writing to the stream failed
         */
        void writeBlock(boolean last) throws IOException {
            if (last) {
                // always fits, because of BLOCK's size
                blocksize = (short) writePos;
                // this is the last block, so encode least significant bit in the first byte (little-endian)
                blklen[0] = (byte) (blocksize << 1 & 0xFF | 1);
                blklen[1] = (byte) (blocksize >> 7);
            } else {
                // always fits, because of BLOCK's size
                blocksize = (short) BLOCK;
                // another block will follow, encode least significant bit in the first byte (little-endian)
                blklen[0] = (byte) (blocksize << 1 & 0xFF);
                blklen[1] = (byte) (blocksize >> 7);
            }
            outStream.write(blklen);
            // write the actual block
            outStream.write(block, 0, writePos);
            writePos = 0;
        }

        void write(int b) throws IOException {
            if (writePos == BLOCK) {
                writeBlock(false);
            }
            block[writePos++] = (byte) b;
        }

        int write(ByteBuffer b) throws IOException {
            return write(b, 0, b.position());
        }

        int write(ByteBuffer b, int off, int len) throws IOException {
            int t, written = 0;
            b.flip();
            while (len > 0) {
                t = BLOCK - writePos;
                if (len > t) {
                    System.arraycopy(b.array(), off, block, writePos, t);
                    off += t;
                    len -= t;
                    writePos += t;
                    written += t;
                    writeBlock(false);
                } else {
                    System.arraycopy(b.array(), off, block, writePos, len);
                    writePos += len;
                    written += len;
                    break;
                }
            }
            b.clear();
            return written;
        }

        public void close() throws IOException {
            // we don't want the flush() method to be called (default of the FilterOutputStream), so we close manually
            // here
            outStream.close();
        }
    }
}