view src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @ 0:a5a898f6886c

Copy of MonetDB java directory changeset e6e32756ad31.
author Sjoerd Mullender <sjoerd@acm.org>
date Wed, 21 Sep 2016 09:34:48 +0200 (2016-09-21)
parents
children 7e0d71a22677 6cc63d6cb224
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V.
 */

package nl.cwi.monetdb.mcl.net;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.FileWriter;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import nl.cwi.monetdb.mcl.MCLException;
import nl.cwi.monetdb.mcl.io.BufferedMCLReader;
import nl.cwi.monetdb.mcl.io.BufferedMCLWriter;
import nl.cwi.monetdb.mcl.parser.MCLParseException;

/**
 * A Socket for communicating with the MonetDB database in MAPI block
 * mode.
 * 
 * The MapiSocket implements the protocol specifics of the MAPI block
 * mode protocol, and interfaces it as a socket that delivers a
 * BufferedReader and a BufferedWriter.  Because logging in is an
 * integral part of the MAPI protocol, the MapiSocket performs the login
 * procedure.  Like the Socket class, various options can be set before
 * calling the connect() method to influence the login process.  Only
 * after a successful call to connect() the BufferedReader and
 * BufferedWriter can be retrieved.
 * <br />
 * For each line read, it is determined what type of line it is
 * according to the MonetDB MAPI protocol.  This results in a line to be
 * PROMPT, HEADER, RESULT, ERROR or UNKNOWN.  Use the getLineType()
 * method on the BufferedMCLReader to retrieve the type of the last
 * line read.
 * 
 * For debugging purposes a socket level debugging is implemented where
 * each and every interaction to and from the MonetDB server is logged
 * to a file on disk.<br />
 * Incoming messages are prefixed by "RX" (received by the driver),
 * outgoing messages by "TX" (transmitted by the driver).  Special
 * decoded non-human readable messages are prefixed with "RD" and "TD"
 * instead.  Following this two char prefix, a timestamp follows as the
 * number of milliseconds since the UNIX epoch.  The rest of the line is
 * a String representation of the data sent or received.
 * 
 * The general use of this Socket must be seen only in the full context
 * of a MAPI connection to a server.  It has the same ingredients as a
 * normal Socket, allowing for seamless plugging.
 * <pre>
 *    Socket   \     /  InputStream  ----&gt; (BufferedMCL)Reader
 *              &gt; o &lt;
 *  MapiSocket /     \ OutputStream  ----&gt; (BufferedMCL)Writer
 * </pre>
 * The MapiSocket allows to retrieve Streams for communicating.  They
 * are interfaced, so they can be chained in any way.  While the Socket
 * transparently deals with how data is sent over the wire, the actual
 * data read needs to be interpreted, for which a Reader/Writer
 * interface is most sufficient.  In particular the BufferedMCL*
 * implementations of those interfaces supply some extra functionality
 * geared towards the format of the data.
 *
 * @author Fabian Groffen
 * @version 4.1
 * @see nl.cwi.monetdb.mcl.io.BufferedMCLReader
 * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter
 */
public final class MapiSocket {
	/** The TCP Socket to mserver */
	private Socket con;
	/** Stream from the Socket for reading */
	private InputStream fromMonet;
	/** Stream from the Socket for writing */
	private OutputStream toMonet;
	/** MCLReader on the InputStream */
	private BufferedMCLReader reader;
	/** MCLWriter on the OutputStream */
	private BufferedMCLWriter writer;
	/** protocol version of the connection */
	private int version;

	/** The database to connect to */
	private String database = null;
	/** The language to connect with */
	private String language = "sql";
	/** The hash methods to use (null = default) */
	private String hash = null;
	/** Whether we should follow redirects */
	private boolean followRedirects = true;
	/** How many redirections do we follow until we're fed up with it? */
	private int ttl = 10;
	/** Whether we are debugging or not */
	private boolean debug = false;
	/** The Writer for the debug log-file */
	private Writer log;

	/** The blocksize (hardcoded in compliance with stream.mx) */
	public final static int BLOCK = 8 * 1024 - 2;

	/** A short in two bytes for holding the block size in bytes */
	private byte[] blklen = new byte[2];

	/**
	 * Constructs a new MapiSocket.
	 */
	public MapiSocket() {
		con = null;
	}

	/**
	 * Sets the database to connect to.  If database is null, a
	 * connection is made to the default database of the server.  This
	 * is also the default.
	 *
	 * @param db the database
	 */
	public void setDatabase(String db) {
		this.database = db;
	}

	/**
	 * Sets the language to use for this connection.
	 *
	 * @param lang the language
	 */
	public void setLanguage(String lang) {
		this.language = lang;
	}

	/**
	 * Sets the hash method to use.  Note that this method is intended
	 * for debugging purposes.  Setting a hash method can yield in
	 * connection failures.  Multiple hash methods can be given by
	 * separating the hashes by commas.
	 * DON'T USE THIS METHOD if you don't know what you're doing.
	 *
	 * @param hash the hash method to use
	 */
	public void setHash(String hash) {
		this.hash = hash;
	}

	/**
	 * Sets whether MCL redirections should be followed or not.  If set
	 * to false, an MCLException will be thrown when a redirect is
	 * encountered during connect.  The default bahaviour is to
	 * automatically follow redirects.
	 *
	 * @param r whether to follow redirects (true) or not (false)
	 */
	public void setFollowRedirects(boolean r) {
		this.followRedirects = r;
	}

	/**
	 * Sets the number of redirects that are followed when
	 * followRedirects is true.  In order to avoid going into an endless
	 * loop due to some evil server, or another error, a maximum number
	 * of redirects that may be followed can be set here.  Note that to
	 * disable the following of redirects you should use
	 * setFollowRedirects.
	 *
	 * @see #setFollowRedirects(boolean r)
	 * @param t the number of redirects before an exception is thrown
	 */
	public void setTTL(int t) {
		this.ttl = t;
	}

	/**
	 * Set the SO_TIMEOUT on the underlying Socket.  When for some
	 * reason the connection to the database hangs, this setting can be
	 * useful to break out of this indefinite wait.
	 * This option must be enabled prior to entering the blocking
	 * operation to have effect.
	 *
	 * @param s The specified timeout, in milliseconds.  A timeout
	 *        of zero is interpreted as an infinite timeout.
	 * @throws SocketException Issue with the socket
	 */
	public void setSoTimeout(int s) throws SocketException {
		// limit time to wait on blocking operations (0 = indefinite)
		con.setSoTimeout(s);
	}

	/**
	 * Gets the SO_TIMEOUT from the underlying Socket.
	 *
	 * @return the currently in use timeout in milliseconds
	 * @throws SocketException Issue with the socket
	 */
	public int getSoTimeout() throws SocketException {
		return con.getSoTimeout();
	}
	
	/**
	 * Enables/disables debug
	 *
	 * @param debug Value to set
	 */
	public void setDebug(boolean debug) {
		this.debug = debug;
	}

	/**
	 * Connects to the given host and port, logging in as the given
	 * user.  If followRedirect is false, a RedirectionException is
	 * thrown when a redirect is encountered.
	 *
	 * @param host the hostname, or null for the loopback address
	 * @param port the port number
	 * @param user the username
	 * @param pass the password
	 * @return A List with informational (warning) messages. If this 
	 * 		list is empty; then there are no warnings.
	 * @throws IOException if an I/O error occurs when creating the
	 *         socket
	 * @throws MCLParseException if bogus data is received
	 * @throws MCLException if an MCL related error occurs
	 */
	public List<String> connect(String host, int port, String user, String pass)
		throws IOException, MCLParseException, MCLException {
		// Wrap around the internal connect that needs to know if it
		// should really make a TCP connection or not.
		return connect(host, port, user, pass, true);
	}

	private List<String> connect(String host, int port, String user, String pass,
			boolean makeConnection)
		throws IOException, MCLParseException, MCLException {
		if (ttl-- <= 0)
			throw new MCLException("Maximum number of redirects reached, aborting connection attempt.  Sorry.");

		if (makeConnection) {
			con = new Socket(host, port);
			// set nodelay, as it greatly speeds up small messages (like we
			// often do)
			con.setTcpNoDelay(true);

			fromMonet = new BlockInputStream(con.getInputStream());
			toMonet = new BlockOutputStream(con.getOutputStream());
			try {
				reader = new BufferedMCLReader(fromMonet, "UTF-8");
				writer = new BufferedMCLWriter(toMonet, "UTF-8");
			} catch (UnsupportedEncodingException e) {
				throw new AssertionError(e.toString());
			}
			writer.registerReader(reader);
		}

		String c = reader.readLine();
		reader.waitForPrompt();
		writer.writeLine(
				getChallengeResponse(
					c,
					user,
					pass,
					language,
					database,
					hash
					)
				);

		// read monet response till prompt
		List<String> redirects = new ArrayList<String>();
		List<String> warns = new ArrayList<String>();
		String err = "", tmp;
		int lineType;
		do {
			if ((tmp = reader.readLine()) == null)
				throw new IOException("Read from " +
						con.getInetAddress().getHostName() + ":" +
						con.getPort() + ": End of stream reached");
			if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) {
				err += "\n" + tmp.substring(7);
			} else if (lineType == BufferedMCLReader.INFO) {
				warns.add(tmp.substring(1));
			} else if (lineType == BufferedMCLReader.REDIRECT) {
				redirects.add(tmp.substring(1));
			}
		} while (lineType != BufferedMCLReader.PROMPT);
		if (err != "") {
			close();
			throw new MCLException(err.trim());
		}
		if (!redirects.isEmpty()) {
			if (followRedirects) {
				// Ok, server wants us to go somewhere else.  The list
				// might have multiple clues on where to go.  For now we
				// don't support anything intelligent but trying the
				// first one.  URI should be in form of:
				// "mapi:monetdb://host:port/database?arg=value&..."
				// or
				// "mapi:merovingian://proxy?arg=value&..."
				// note that the extra arguments must be obeyed in both
				// cases
				String suri = redirects.get(0).toString();
				if (!suri.startsWith("mapi:"))
					throw new MCLException("unsupported redirect: " + suri);

				URI u;
				try {
					u = new URI(suri.substring(5));
				} catch (URISyntaxException e) {
					throw new MCLParseException(e.toString());
				}

				tmp = u.getQuery();
				if (tmp != null) {
					String args[] = tmp.split("&");
					for (int i = 0; i < args.length; i++) {
						int pos = args[i].indexOf("=");
						if (pos > 0) {
							tmp = args[i].substring(0, pos);
							if (tmp.equals("database")) {
								tmp = args[i].substring(pos + 1);
								if (!tmp.equals(database)) {
									warns.add("redirect points to different " +
											"database: " + tmp);
									setDatabase(tmp);
								}
							} else if (tmp.equals("language")) {
								tmp = args[i].substring(pos + 1);
								warns.add("redirect specifies use of different language: " + tmp);
								setLanguage(tmp);
							} else if (tmp.equals("user")) {
								tmp = args[i].substring(pos + 1);
								if (!tmp.equals(user))
									warns.add("ignoring different username '" + tmp + "' set by " +
											"redirect, what are the security implications?");
							} else if (tmp.equals("password")) {
								warns.add("ignoring different password set by redirect, " +
										"what are the security implications?");
							} else {
								warns.add("ignoring unknown argument '" + tmp + "' from redirect");
							}
						} else {
							warns.add("ignoring illegal argument from redirect: " +
									args[i]);
						}
					}
				}

				if (u.getScheme().equals("monetdb")) {
					// this is a redirect to another (monetdb) server,
					// which means a full reconnect
					// avoid the debug log being closed
					if (debug) {
						debug = false;
						close();
						debug = true;
					} else {
						close();
					}
					tmp = u.getPath();
					if (tmp != null && tmp.length() != 0) {
						tmp = tmp.substring(1).trim();
						if (!tmp.isEmpty() && !tmp.equals(database)) {
							warns.add("redirect points to different " +
								"database: " + tmp);
							setDatabase(tmp);
						}
					}
					int p = u.getPort();
					warns.addAll(connect(u.getHost(), p == -1 ? port : p,
							user, pass, true));
					warns.add("Redirect by " + host + ":" + port + " to " + suri);
				} else if (u.getScheme().equals("merovingian")) {
					// reuse this connection to inline connect to the
					// right database that Merovingian proxies for us
					warns.addAll(connect(host, port, user, pass, false));
				} else {
					throw new MCLException("unsupported scheme in redirect: " + suri);
				}
			} else {
				StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:");
				for (String it : redirects) {
					msg.append(" [" + it + "]");
				}
				throw new MCLException(msg.toString());
			}
		}
		return warns;
	}

	/**
	 * A little helper function that processes a challenge string, and
	 * returns a response string for the server.  If the challenge
	 * string is null, a challengeless response is returned.
	 *
	 * @param chalstr the challenge string
	 * @param username the username to use
	 * @param password the password to use
	 * @param language the language to use
	 * @param database the database to connect to
	 * @param hash the hash method(s) to use, or NULL for all supported
	 *             hashes
	 */
	private String getChallengeResponse(
			String chalstr,
			String username,
			String password,
			String language,
			String database,
			String hash
	) throws MCLParseException, MCLException, IOException {
		String response;
		String algo;

		// parse the challenge string, split it on ':'
		String[] chaltok = chalstr.split(":");
		if (chaltok.length <= 4) throw
			new MCLParseException("Server challenge string unusable!  Challenge contains too few tokens: " + chalstr);

		// challenge string to use as salt/key
		String challenge = chaltok[0];
		String servert = chaltok[1];
		try {
			version = Integer.parseInt(chaltok[2].trim());	// protocol version
		} catch (NumberFormatException e) {
			throw new MCLParseException("Protocol version unparseable: " + chaltok[3]);
		}

		// handle the challenge according to the version it is
		switch (version) {
			default:
				throw new MCLException("Unsupported protocol version: " + version);
			case 9:
				// proto 9 is like 8, but uses a hash instead of the
				// plain password, the server tells us which hash in the
				// challenge after the byte-order

				/* NOTE: Java doesn't support RIPEMD160 :( */
				if (chaltok[5].equals("SHA512")) {
					algo = "SHA-512";
				} else if (chaltok[5].equals("SHA384")) {
					algo = "SHA-384";
				} else if (chaltok[5].equals("SHA256")) {
					algo = "SHA-256";
				/* NOTE: Java doesn't support SHA-224 */
				} else if (chaltok[5].equals("SHA1")) {
					algo = "SHA-1";
				} else if (chaltok[5].equals("MD5")) {
					algo = "MD5";
				} else {
					throw new MCLException("Unsupported password hash: " +
							chaltok[5]);
				}

				try {
					MessageDigest md = MessageDigest.getInstance(algo);
					md.update(password.getBytes("UTF-8"));
					byte[] digest = md.digest();
					password = toHex(digest);
				} catch (NoSuchAlgorithmException e) {
					throw new AssertionError("internal error: " + e.toString());
				} catch (UnsupportedEncodingException e) {
					throw new AssertionError("internal error: " + e.toString());
				}

				// proto 7 (finally) used the challenge and works with a
				// password hash.  The supported implementations come
				// from the server challenge.  We chose the best hash
				// we can find, in the order SHA1, MD5, plain.  Also,
				// the byte-order is reported in the challenge string,
				// which makes sense, since only blockmode is supported.
				// proto 8 made this obsolete, but retained the
				// byte-order report for future "binary" transports.  In
				// proto 8, the byte-order of the blocks is always little
				// endian because most machines today are.
				String hashes = (hash == null ? chaltok[3] : hash);
				Set<String> hashesSet = new HashSet<String>(Arrays.asList(hashes.toUpperCase().split("[, ]")));

				// if we deal with merovingian, mask our credentials
				if (servert.equals("merovingian") && !language.equals("control")) {
					username = "merovingian";
					password = "merovingian";
				}
				String pwhash;
				algo = null;
				
				if (hashesSet.contains("SHA512")) {
					algo = "SHA-512";
					pwhash = "{SHA512}";
				} else if (hashesSet.contains("SHA384")) {
					algo = "SHA-384";
					pwhash = "{SHA384}";
				} else if (hashesSet.contains("SHA256")) {
					algo = "SHA-256";
					pwhash = "{SHA256}";
				} else if (hashesSet.contains("SHA1")) {
					algo = "SHA-1";
					pwhash = "{SHA1}";
				} else if (hashesSet.contains("MD5")) {
					algo = "MD5";
					pwhash = "{MD5}";
				} else {
					throw new MCLException("no supported password hashes in " + hashes);
				}
				if (algo != null) {
					try {
						MessageDigest md = MessageDigest.getInstance(algo);
						md.update(password.getBytes("UTF-8"));
						md.update(challenge.getBytes("UTF-8"));
						byte[] digest = md.digest();
						pwhash += toHex(digest);
					} catch (NoSuchAlgorithmException e) {
						throw new AssertionError("internal error: " + e.toString());
					} catch (UnsupportedEncodingException e) {
						throw new AssertionError("internal error: " + e.toString());
					}
				}
				// TODO: some day when we need this, we should store
				// this
				if (chaltok[4].equals("BIG")) {
					// byte-order of server is big-endian
				} else if (chaltok[4].equals("LIT")) {
					// byte-order of server is little-endian
				} else {
					throw new MCLParseException("Invalid byte-order: " + chaltok[5]);
				}

				// generate response
				response = "BIG:";	// JVM byte-order is big-endian
				response += username + ":" + pwhash + ":" + language;
				response += ":" + (database == null ? "" : database) + ":";
				
				return response;
		}
	}
	
	private static char hexChar(int n) {
		return (n > 9) 
				? (char) ('a' + (n - 10))
				: (char) ('0' + n);
	}

	/**
	 * Small helper method to convert a byte string to a hexadecimal
	 * string representation.
	 *
	 * @param digest the byte array to convert
	 * @return the byte array as hexadecimal string
	 */
	private static String toHex(byte[] digest) {
		char[] result = new char[digest.length * 2];
		int pos = 0;
		for (int i = 0; i < digest.length; i++) {
			result[pos++] = hexChar((digest[i] & 0xf0) >> 4);
			result[pos++] = hexChar(digest[i] & 0x0f);
		}
		return new String(result);
	}

	/**
	 * Returns an InputStream that reads from this open connection on
	 * the MapiSocket.
	 *
	 * @return an input stream that reads from this open connection
	 */
	public InputStream getInputStream() {
		return fromMonet;
	}

	/**
	 * Returns an output stream for this MapiSocket.
	 *
	 * @return an output stream for writing bytes to this MapiSocket
	 */
	public OutputStream getOutputStream() {
		return toMonet;
	}

	/**
	 * Returns a Reader for this MapiSocket.  The Reader is a
	 * BufferedMCLReader which does protocol interpretation of the
	 * BlockInputStream produced by this MapiSocket.
	 *
	 * @return a BufferedMCLReader connected to this MapiSocket
	 */
	public BufferedMCLReader getReader() {
		return reader;
	}

	/**
	 * Returns a Writer for this MapiSocket.  The Writer is a
	 * BufferedMCLWriter which produces protocol compatible data blocks
	 * that the BlockOutputStream can properly translate into blocks.
	 *
	 * @return a BufferedMCLWriter connected to this MapiSocket
	 */
	public BufferedMCLWriter getWriter() {
		return writer;
	}

	/**
	 * Returns the mapi protocol version used by this socket.  The
	 * protocol version depends on the server being used.  Users of the
	 * MapiSocket should check this version to act appropriately.
	 *
	 * @return the mapi protocol version
	 */
	public int getProtocolVersion() {
		return version;
	}

	/**
	 * Enables logging to a file what is read and written from and to
	 * the server.  Logging can be enabled at any time.  However, it is
	 * encouraged to start debugging before actually connecting the
	 * socket.
	 *
	 * @param filename the name of the file to write to
	 * @throws IOException if the file could not be opened for writing
	 */
	public void debug(String filename) throws IOException {
		debug(new FileWriter(filename));
	}
	
	/**
	 * Enables logging to a stream what is read and written from and to
	 * the server.  Logging can be enabled at any time.  However, it is
	 * encouraged to start debugging before actually connecting the
	 * socket.
	 *
	 * @param out to write the log to
	 * @throws IOException if the file could not be opened for writing
	 */
	public void debug(PrintStream out) throws IOException {
		debug(new PrintWriter(out));
	}
	
	/**
	 * Enables logging to a stream what is read and written from and to
	 * the server.  Logging can be enabled at any time.  However, it is
	 * encouraged to start debugging before actually connecting the
	 * socket.
	 *
	 * @param out to write the log to
	 * @throws IOException if the file could not be opened for writing
	 */
	public void debug(Writer out) throws IOException {
		log = out;
		debug = true;
	}

	/**
	 * Inner class that is used to write data on a normal stream as a
	 * blocked stream.  A call to the flush() method will write a
	 * "final" block to the underlying stream.  Non-final blocks are
	 * written as soon as one or more bytes would not fit in the
	 * current block any more.  This allows to write to a block to it's
	 * full size, and then flush it explicitly to have a final block
	 * being written to the stream.
	 */
	class BlockOutputStream extends FilterOutputStream {
		private int writePos = 0;
		private byte[] block = new byte[BLOCK];
		private int blocksize = 0;

		/**
		 * Constructs this BlockOutputStream, backed by the given
		 * OutputStream.  A BufferedOutputStream is internally used.
		 */
		public BlockOutputStream(OutputStream out) {
			// always use a buffered stream, even though we know how
			// much bytes to write/read, since this is just faster for
			// some reason
			super(new BufferedOutputStream(out));
		}

		@Override
		public void flush() throws IOException {
			// write the block (as final) then flush.
			writeBlock(true);
			out.flush();

			// it's a bit nasty if an exception is thrown from the log,
			// but ignoring it can be nasty as well, so it is decided to
			// let it go so there is feedback about something going wrong
			// it's a bit nasty if an exception is thrown from the log,
			// but ignoring it can be nasty as well, so it is decided to
			// let it go so there is feedback about something going wrong
			if (debug) {
				log.flush();
			}
		}

		/**
		 * writeBlock puts the data in the block on the stream.  The
		 * boolean last controls whether the block is sent with an
		 * indicator to note it is the last block of a sequence or not.
		 *
		 * @param last whether this is the last block
		 * @throws IOException if writing to the stream failed
		 */
		public void writeBlock(boolean last) throws IOException {
			if (last) {
				// always fits, because of BLOCK's size
				blocksize = (short)writePos;
				// this is the last block, so encode least
				// significant bit in the first byte (little-endian)
				blklen[0] = (byte)(blocksize << 1 & 0xFF | 1);
				blklen[1] = (byte)(blocksize >> 7);
			} else {
				// always fits, because of BLOCK's size
				blocksize = (short)BLOCK;
				// another block will follow, encode least
				// significant bit in the first byte (little-endian)
				blklen[0] = (byte)(blocksize << 1 & 0xFF);
				blklen[1] = (byte)(blocksize >> 7);
			}

			out.write(blklen);

			// write the actual block
			out.write(block, 0, writePos);

			if (debug) {
				if (last) {
					logTd("write final block: " + writePos + " bytes");
				} else {
					logTd("write block: " + writePos + " bytes");
				}
				logTx(new String(block, 0, writePos, "UTF-8"));
			}

			writePos = 0;
		}

		@Override
		public void write(int b) throws IOException {
			if (writePos == BLOCK) {
				writeBlock(false);
			}
			block[writePos++] = (byte)b;
		}

		@Override
		public void write(byte[] b) throws IOException {
			write(b, 0, b.length);
		}

		@Override
		public void write(byte[] b, int off, int len) throws IOException {
			int t = 0;
			while (len > 0) {
				t = BLOCK - writePos;
				if (len > t) {
					System.arraycopy(b, off, block, writePos, t);
					off += t;
					len -= t;
					writePos += t;
					writeBlock(false);
				} else {
					System.arraycopy(b, off, block, writePos, len);
					writePos += len;
					break;
				}
			}
		}

		@Override
		public void close() throws IOException {
			// we don't want the flush() method to be called (default of
			// the FilterOutputStream), so we close manually here
			out.close();
		}
	}


	/**
	 * Inner class that is used to make the data on the blocked stream
	 * available as a normal stream.
	 */
	class BlockInputStream extends FilterInputStream {
		private int readPos = 0;
		private int blockLen = 0;
		private byte[] block = new byte[BLOCK + 3]; // \n.\n

		/**
		 * Constructs this BlockInputStream, backed by the given
		 * InputStream.  A BufferedInputStream is internally used.
		 */
		public BlockInputStream(InputStream in) {
			// always use a buffered stream, even though we know how
			// much bytes to write/read, since this is just faster for
			// some reason
			super(new BufferedInputStream(in));
		}

		@Override
		public int available() {
			return blockLen - readPos;
		}

		@Override
		public boolean markSupported() {
			return false;
		}

		@Override
		public void mark(int readlimit) {
			throw new AssertionError("Not implemented!");
		}

		@Override
		public void reset() {
			throw new AssertionError("Not implemented!");
		}

		/**
		 * Small wrapper to get a blocking variant of the read() method
		 * on the BufferedInputStream.  We want to benefit from the
		 * Buffered pre-fetching, but not dealing with half blocks.
		 * Changing this class to be able to use the partially received
		 * data will greatly complicate matters, while a performance
		 * improvement is debatable given the relatively small size of
		 * our blocks.  Maybe it does speed up on slower links, then
		 * consider this method a quick bug fix/workaround.
		 *
		 * @return false if reading the block failed due to EOF
		 */
		private boolean _read(byte[] b, int len) throws IOException {
			int s;
			int off = 0;

			while (len > 0) {
				s = in.read(b, off, len);
				if (s == -1) {
					// if we have read something before, we should have been
					// able to read the whole, so make this fatal
					if (off > 0) {
						if (debug) {
							logRd("the following incomplete block was received:");
							logRx(new String(b, 0, off, "UTF-8"));
						}
						throw new IOException("Read from " +
								con.getInetAddress().getHostName() + ":" +
								con.getPort() + ": Incomplete block read from stream");
					}
					if (debug)
						logRd("server closed the connection (EOF)");
					return false;
				}
				len -= s;
				off += s;
			}

			return true;
		}

		/**
		 * Reads the next block on the stream into the internal buffer,
		 * or writes the prompt in the buffer.
		 *
		 * The blocked stream protocol consists of first a two byte
		 * integer indicating the length of the block, then the
		 * block, followed by another length + block.  The end of
		 * such sequence is put in the last bit of the length, and
		 * hence this length should be shifted to the right to
		 * obtain the real length value first.  We simply fetch
		 * blocks here as soon as they are needed for the stream's
		 * read methods.
		 *
		 * The user-flush, which is an implicit effect of the end of
		 * a block sequence, is communicated beyond the stream by
		 * inserting a prompt sequence on the stream after the last
		 * block.  This method makes sure that a final block ends with a
		 * newline, if it doesn't already, in order to facilitate a
		 * Reader that is possibly chained to this InputStream.
		 *
		 * If the stream is not positioned correctly, hell will break
		 * loose.
		 */
		private int readBlock() throws IOException {
			// read next two bytes (short)
			if (!_read(blklen, 2))
				return(-1);

			// Get the short-value and store its value in blockLen.
			blockLen = (short)(
					(blklen[0] & 0xFF) >> 1 |
					(blklen[1] & 0xFF) << 7
					);
			readPos = 0;

			if (debug) {
				if ((blklen[0] & 0x1) == 1) {
					logRd("read final block: " + blockLen + " bytes");
				} else {
					logRd("read new block: " + blockLen + " bytes");
				}
			}

			// sanity check to avoid bad servers make us do an ugly
			// stack trace
			if (blockLen > block.length)
				throw new AssertionError("Server sent a block " +
						"larger than BLOCKsize: " +
						blockLen + " > " + block.length);
			if (!_read(block, blockLen))
				return(-1);

			if (debug)
				logRx(new String(block, 0, blockLen, "UTF-8"));

			// if this is the last block, make it end with a newline and
			// prompt
			if ((blklen[0] & 0x1) == 1) {
				if (blockLen > 0 && block[blockLen - 1] != '\n') {
					// to terminate the block in a Reader
					block[blockLen++] = '\n';
				}
				// insert 'fake' flush
				block[blockLen++] = BufferedMCLReader.PROMPT;
				block[blockLen++] = '\n';
				if (debug)
					logRd("inserting prompt");
			}

			return(blockLen);
		}

		@Override
		public int read() throws IOException {
			if (available() == 0) {
				if (readBlock() == -1)
					return(-1);
			}
				
			if (debug)
				logRx(new String(block, readPos, 1, "UTF-8"));
			return (int)block[readPos++];
		}

		@Override
		public int read(byte[] b) throws IOException {
			return read(b, 0, b.length);
		}

		@Override
		public int read(byte[] b, int off, int len) throws IOException {
			int t;
			int size = 0;
			while (size < len) {
				t = available();
				if (t == 0) {
					if (size != 0)
						break;
					if (readBlock() == -1) {
						if (size == 0)
							size = -1;
						break;
					}
					t = available();
				}
				if (len > t) {
					System.arraycopy(block, readPos, b, off, t);
					off += t;
					len -= t;
					readPos += t;
					size += t;
				} else {
					System.arraycopy(block, readPos, b, off, len);
					readPos += len;
					size += len;
					break;
				}
			}
			return size;
		}

		@Override
		public long skip(long n) throws IOException {
			long skip = n;
			int t = 0;
			while (skip > 0) {
				t = available();
				if (skip > t) {
					skip -= t;
					readPos += t;
					readBlock();
				} else {
					readPos += skip;
					break;
				}
			}
			return n;
		}
	}

	/**
	 * Closes the streams and socket connected to the server if
	 * possible.  If an error occurs during disconnecting it is ignored.
	 */
	public synchronized void close() {
		try {
			if (reader != null) reader.close();
			if (writer != null) writer.close();
			if (fromMonet != null) fromMonet.close();
			if (toMonet != null) toMonet.close();
			if (con != null) con.close();
			if (debug && log instanceof FileWriter) log.close();
		} catch (IOException e) {
			// ignore it
		}
	}

	/**
	 * Destructor called by garbage collector before destroying this
	 * object tries to disconnect the MonetDB connection if it has not
	 * been disconnected already.
	 */
	@Override
	protected void finalize() throws Throwable {
		close();
		super.finalize();
	}


	/**
	 * Writes a logline tagged with a timestamp using the given string.
	 * Used for debugging purposes only and represents a message that is
	 * connected to writing to the socket.  A logline might look like:
	 * TX 152545124: Hello MonetDB!
	 *
	 * @param message the message to log
	 * @throws IOException if an IO error occurs while writing to the logfile
	 */
	private void logTx(String message) throws IOException {
		log.write("TX " + System.currentTimeMillis() +
			": " + message + "\n");
	}

	/**
	 * Writes a logline tagged with a timestamp using the given string.
	 * Lines written using this log method are tagged as "added
	 * metadata" which is not strictly part of the data sent.
	 *
	 * @param message the message to log
	 * @throws IOException if an IO error occurs while writing to the logfile
	 */
	private void logTd(String message) throws IOException {
		log.write("TD " + System.currentTimeMillis() +
			": " + message + "\n");
	}

	/**
	 * Writes a logline tagged with a timestamp using the given string,
	 * and flushes afterwards.  Used for debugging purposes only and
	 * represents a message that is connected to reading from the
	 * socket.  The log is flushed after writing the line.  A logline
	 * might look like:
	 * RX 152545124: Hi JDBC!
	 *
	 * @param message the message to log
	 * @throws IOException if an IO error occurs while writing to the logfile
	 */
	private void logRx(String message) throws IOException {
		log.write("RX " + System.currentTimeMillis() +
			": " + message + "\n");
		log.flush();
	}

	/**
	 * Writes a logline tagged with a timestamp using the given string,
	 * and flushes afterwards.  Lines written using this log method are
	 * tagged as "added metadata" which is not strictly part of the data
	 * received.
	 *
	 * @param message the message to log
	 * @throws IOException if an IO error occurs while writing to the logfile
	 */
	private void logRd(String message) throws IOException {
		log.write("RD " + System.currentTimeMillis() +
			": " + message + "\n");
		log.flush();
	}
}