view src/main/java/nl/cwi/monetdb/mcl/connection/mapi/OldMapiSocket.java @ 99:1dcb51573c89 embedded

Added the remaining documentation.
author Pedro Ferreira <pedro.ferreira@monetdbsolutions.com>
date Thu, 12 Jan 2017 16:32:51 +0100 (2017-01-12)
parents 6f74e01c57da
children d4c6a01cc300
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
 */

package nl.cwi.monetdb.mcl.connection.mapi;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
 * A Socket for communicating with the MonetDB database in MAPI block mode. The OldMapiSocket implements the protocol
 * specifics of the MAPI block mode protocol,
 *
 * For each line read, it is determined what type of line it is according to the MonetDB MAPI protocol version 9. This
 * results in a line to be PROMPT, HEADER, RESULT, ERROR or UNKNOWN.
 *
 * The general use of this Socket must be seen only in the full context of a MAPI connection to a server. It has the
 * same ingredients as a normal Socket, allowing for seamless plugging.
 * <pre>
 *    Socket   \     /  InputStream  ----&gt;
 *              &gt; o &lt;
 *  MapiSocket /     \ OutputStream  ----&gt;
 * </pre>
 * The OldMapiSocket allows to retrieve Streams for communicating.  They are interfaced, so they can be chained in any
 * way.  While the Socket transparently deals with how data is sent over the wire, the actual data read needs to be
 * interpreted, for which a Reader/Writer interface is most sufficient.
 *
 * @author Fabian Groffen
 * @version 4.1
 */
public class OldMapiSocket extends AbstractSocket {

    /** The blocksize (hardcoded in compliance with stream.mx) */
    public final static int BLOCK = 8 * 1024 - 2;

    /**
     * A short in two bytes for holding the block size in bytes.
     */
    private final byte[] blklen = new byte[2];

    /**
     * The socket input stream read by blocks.
     */
    private final OldMapiBlockInputStream inStream;

    /**
     * The socket output stream written by blocks.
     */
    private final OldMapiBlockOutputStream outStream;

    OldMapiSocket(String hostname, int port, MapiConnection connection) throws IOException {
        super(hostname, port, connection);
        this.inStream = new OldMapiBlockInputStream(socket.getInputStream());
        this.outStream = new OldMapiBlockOutputStream(socket.getOutputStream());
    }

    /**
     * The block size will be the one hardcoded on the connection.
     */
    @Override
    public int getBlockSize() {
        return BLOCK;
    }

    @Override
    int readToBufferIn(ByteBuffer bufferIn) throws IOException {
        return this.inStream.read(bufferIn);
    }

    @Override
    int writeFromBufferOut(ByteBuffer bufferOut) throws IOException {
        return this.outStream.write(bufferOut);
    }

    @Override
    void flush() throws IOException {
        this.outStream.flush();
    }

    @Override
    public void close() throws IOException {
        this.socket.close();
    }

    /**
     * Inner class that is used to make the data on the blocked stream available as a normal stream.
     */
    private class OldMapiBlockInputStream {

        private final InputStream inStream;

        private int readPos = 0;

        private int blockLen = 0;

        private final byte[] block = new byte[BLOCK];

        /**
         * Constructs this BlockInputStream, backed by the given InputStream. A BufferedInputStream is internally used.
         */
        OldMapiBlockInputStream(InputStream in) {
            this.inStream = in;
        }

        public int available() {
            return blockLen - readPos;
        }

        /**
         * Small wrapper to get a blocking variant of the read() method on the BufferedInputStream. We want to benefit
         * from the Buffered pre-fetching, but not dealing with half blocks. Changing this class to be able to use the
         * partially received data will greatly complicate matters, while a performance improvement is debatable given
         * the relatively small size of our blocks. Maybe it does speed up on slower links, then consider this method a
         * quick bug fix/workaround.
         *
         * @return false if reading the block failed due to EOF
         */
        private boolean _read(byte[] b, int len) throws IOException {
            int s;
            int off = 0;

            while (len > 0) {
                s = inStream.read(b, off, len);
                if (s == -1) {
                    // if we have read something before, we should have been able to read the whole, so make this fatal
                    if (off > 0) {
                        throw new IOException("Read from " + connection.getHostname() + ":" +
                                connection.getPort() + ": Incomplete block read from stream");
                    }
                    return false;
                }
                len -= s;
                off += s;
            }
            return true;
        }

        /**
         * Reads the next block on the stream into the internal buffer, or writes the prompt in the buffer.
         * <p>
         * The blocked stream protocol consists of first a two byte integer indicating the length of the block, then the
         * block, followed by another length + block. The end of such sequence is put in the last bit of the length, and
         * hence this length should be shifted to the right to obtain the real length value first. We simply fetch
         * blocks here as soon as they are needed for the stream's read methods.
         * <p>
         * The user-flush, which is an implicit effect of the end of a block sequence, is communicated beyond the stream
         * by inserting a prompt sequence on the stream after the last block. This method makes sure that a final block
         * ends with a newline, if it doesn't already, in order to facilitate a Reader that is possibly chained to this
         * InputStream.
         * <p>
         * If the stream is not positioned correctly, hell will break loose.
         */
        private int readBlock() throws IOException {
            // read next two bytes (short)
            if (!_read(blklen, 2))
                return -1;

            // Get the short-value and store its value in blockLen.
            blockLen = (short) ((blklen[0] & 0xFF) >> 1 | (blklen[1] & 0xFF) << 7);
            readPos = 0;

            // sanity check to avoid bad servers make us do an ugly stack trace
            if (blockLen > block.length)
                throw new AssertionError("Server sent a block larger than BLOCKsize: " +
                        blockLen + " > " + block.length);
            if (!_read(block, blockLen))
                return -1;

            // if this is the last block, make it end with a newline and prompt
            if ((blklen[0] & 0x1) == 1) {
                if (blockLen > 0 && block[blockLen - 1] != '\n') {
                    // to terminate the block in a Reader
                    block[blockLen++] = '\n';
                }
                // insert 'fake' flush
                block[blockLen++] = MapiConnection.PROMPT_CHAR;
                block[blockLen++] = '\n';
            }

            return blockLen;
        }

        public int read() throws IOException {
            if (available() == 0) {
                if (readBlock() == -1)
                    return -1;
            }
            return (int) block[readPos++];
        }

        public int read(ByteBuffer b) throws IOException {
            return read(b, 0, b.capacity());
        }

        public int read(ByteBuffer b, int off, int len) throws IOException {
            b.clear();
            int t;
            int size = 0;
            while (size < len) {
                t = available();
                if (t == 0) {
                    if (size != 0)
                        break;
                    if (readBlock() == -1) {
                        size = -1;
                        break;
                    }
                    t = available();
                }
                if (len > t) {
                    System.arraycopy(block, readPos, b.array(), off, t);
                    off += t;
                    len -= t;
                    readPos += t;
                    size += t;
                } else {
                    System.arraycopy(block, readPos, b.array(), off, len);
                    readPos += len;
                    size += len;
                    break;
                }
            }
            b.position(size);
            b.flip();
            return size;
        }

        public long skip(long n) throws IOException {
            long skip = n;
            int t;
            while (skip > 0) {
                t = available();
                if (skip > t) {
                    skip -= t;
                    readPos += t;
                    readBlock();
                } else {
                    readPos += skip;
                    break;
                }
            }
            return n;
        }
    }

    /**
     * Inner class that is used to write data on a normal stream as a blocked stream. A call to the flush() method will
     * write a "final" block to the underlying stream. Non-final blocks are written as soon as one or more bytes would
     * not fit in the current block any more. This allows to write to a block to it's full size, and then flush it
     * explicitly to have a final block being written to the stream.
     */
    class OldMapiBlockOutputStream {

        private final OutputStream outStream;

        private int writePos = 0;

        private byte[] block = new byte[BLOCK];

        private int blocksize = 0;

        /**
         * Constructs this BlockOutputStream, backed by the given OutputStream. A BufferedOutputStream is internally
         * used.
         */
        OldMapiBlockOutputStream(OutputStream out) {
            this.outStream = out;
        }

        void flush() throws IOException {
            // write the block (as final) then flush.
            writeBlock(true);
            outStream.flush();
        }

        /**
         * writeBlock puts the data in the block on the stream. The boolean last controls whether the block is sent with
         * an indicator to note it is the last block of a sequence or not.
         *
         * @param last whether this is the last block
         * @throws IOException if writing to the stream failed
         */
        void writeBlock(boolean last) throws IOException {
            if (last) {
                // always fits, because of BLOCK's size
                blocksize = (short) writePos;
                // this is the last block, so encode least significant bit in the first byte (little-endian)
                blklen[0] = (byte) (blocksize << 1 & 0xFF | 1);
                blklen[1] = (byte) (blocksize >> 7);
            } else {
                // always fits, because of BLOCK's size
                blocksize = (short) BLOCK;
                // another block will follow, encode least significant bit in the first byte (little-endian)
                blklen[0] = (byte) (blocksize << 1 & 0xFF);
                blklen[1] = (byte) (blocksize >> 7);
            }
            outStream.write(blklen);
            // write the actual block
            outStream.write(block, 0, writePos);
            writePos = 0;
        }

        void write(int b) throws IOException {
            if (writePos == BLOCK) {
                writeBlock(false);
            }
            block[writePos++] = (byte) b;
        }

        int write(ByteBuffer b) throws IOException {
            return write(b, 0, b.position());
        }

        int write(ByteBuffer b, int off, int len) throws IOException {
            int t, written = 0;
            b.flip();
            while (len > 0) {
                t = BLOCK - writePos;
                if (len > t) {
                    System.arraycopy(b.array(), off, block, writePos, t);
                    off += t;
                    len -= t;
                    writePos += t;
                    written += t;
                    writeBlock(false);
                } else {
                    System.arraycopy(b.array(), off, block, writePos, len);
                    writePos += len;
                    written += len;
                    break;
                }
            }
            b.clear();
            return written;
        }

        public void close() throws IOException {
            // we don't want the flush() method to be called (default of the FilterOutputStream), so we close manually
            // here
            outStream.close();
        }
    }
}