diff src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @ 0:a5a898f6886c

Copy of MonetDB java directory changeset e6e32756ad31.
author Sjoerd Mullender <sjoerd@acm.org>
date Wed, 21 Sep 2016 09:34:48 +0200 (2016-09-21)
parents
children 7e0d71a22677 6cc63d6cb224
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java
@@ -0,0 +1,1103 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0.  If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V.
+ */
+
+package nl.cwi.monetdb.mcl.net;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.FileWriter;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import nl.cwi.monetdb.mcl.MCLException;
+import nl.cwi.monetdb.mcl.io.BufferedMCLReader;
+import nl.cwi.monetdb.mcl.io.BufferedMCLWriter;
+import nl.cwi.monetdb.mcl.parser.MCLParseException;
+
+/**
+ * A Socket for communicating with the MonetDB database in MAPI block
+ * mode.
+ * 
+ * The MapiSocket implements the protocol specifics of the MAPI block
+ * mode protocol, and interfaces it as a socket that delivers a
+ * BufferedReader and a BufferedWriter.  Because logging in is an
+ * integral part of the MAPI protocol, the MapiSocket performs the login
+ * procedure.  Like the Socket class, various options can be set before
+ * calling the connect() method to influence the login process.  Only
+ * after a successful call to connect() the BufferedReader and
+ * BufferedWriter can be retrieved.
+ * <br />
+ * For each line read, it is determined what type of line it is
+ * according to the MonetDB MAPI protocol.  This results in a line to be
+ * PROMPT, HEADER, RESULT, ERROR or UNKNOWN.  Use the getLineType()
+ * method on the BufferedMCLReader to retrieve the type of the last
+ * line read.
+ * 
+ * For debugging purposes a socket level debugging is implemented where
+ * each and every interaction to and from the MonetDB server is logged
+ * to a file on disk.<br />
+ * Incoming messages are prefixed by "RX" (received by the driver),
+ * outgoing messages by "TX" (transmitted by the driver).  Special
+ * decoded non-human readable messages are prefixed with "RD" and "TD"
+ * instead.  Following this two char prefix, a timestamp follows as the
+ * number of milliseconds since the UNIX epoch.  The rest of the line is
+ * a String representation of the data sent or received.
+ * 
+ * The general use of this Socket must be seen only in the full context
+ * of a MAPI connection to a server.  It has the same ingredients as a
+ * normal Socket, allowing for seamless plugging.
+ * <pre>
+ *    Socket   \     /  InputStream  ----&gt; (BufferedMCL)Reader
+ *              &gt; o &lt;
+ *  MapiSocket /     \ OutputStream  ----&gt; (BufferedMCL)Writer
+ * </pre>
+ * The MapiSocket allows to retrieve Streams for communicating.  They
+ * are interfaced, so they can be chained in any way.  While the Socket
+ * transparently deals with how data is sent over the wire, the actual
+ * data read needs to be interpreted, for which a Reader/Writer
+ * interface is most sufficient.  In particular the BufferedMCL*
+ * implementations of those interfaces supply some extra functionality
+ * geared towards the format of the data.
+ *
+ * @author Fabian Groffen
+ * @version 4.1
+ * @see nl.cwi.monetdb.mcl.io.BufferedMCLReader
+ * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter
+ */
+public final class MapiSocket {
+	/** The TCP Socket to mserver */
+	private Socket con;
+	/** Stream from the Socket for reading */
+	private InputStream fromMonet;
+	/** Stream from the Socket for writing */
+	private OutputStream toMonet;
+	/** MCLReader on the InputStream */
+	private BufferedMCLReader reader;
+	/** MCLWriter on the OutputStream */
+	private BufferedMCLWriter writer;
+	/** protocol version of the connection */
+	private int version;
+
+	/** The database to connect to */
+	private String database = null;
+	/** The language to connect with */
+	private String language = "sql";
+	/** The hash methods to use (null = default) */
+	private String hash = null;
+	/** Whether we should follow redirects */
+	private boolean followRedirects = true;
+	/** How many redirections do we follow until we're fed up with it? */
+	private int ttl = 10;
+	/** Whether we are debugging or not */
+	private boolean debug = false;
+	/** The Writer for the debug log-file */
+	private Writer log;
+
+	/** The blocksize (hardcoded in compliance with stream.mx) */
+	public final static int BLOCK = 8 * 1024 - 2;
+
+	/** A short in two bytes for holding the block size in bytes */
+	private byte[] blklen = new byte[2];
+
+	/**
+	 * Constructs a new MapiSocket.
+	 */
+	public MapiSocket() {
+		con = null;
+	}
+
+	/**
+	 * Sets the database to connect to.  If database is null, a
+	 * connection is made to the default database of the server.  This
+	 * is also the default.
+	 *
+	 * @param db the database
+	 */
+	public void setDatabase(String db) {
+		this.database = db;
+	}
+
+	/**
+	 * Sets the language to use for this connection.
+	 *
+	 * @param lang the language
+	 */
+	public void setLanguage(String lang) {
+		this.language = lang;
+	}
+
+	/**
+	 * Sets the hash method to use.  Note that this method is intended
+	 * for debugging purposes.  Setting a hash method can yield in
+	 * connection failures.  Multiple hash methods can be given by
+	 * separating the hashes by commas.
+	 * DON'T USE THIS METHOD if you don't know what you're doing.
+	 *
+	 * @param hash the hash method to use
+	 */
+	public void setHash(String hash) {
+		this.hash = hash;
+	}
+
+	/**
+	 * Sets whether MCL redirections should be followed or not.  If set
+	 * to false, an MCLException will be thrown when a redirect is
+	 * encountered during connect.  The default bahaviour is to
+	 * automatically follow redirects.
+	 *
+	 * @param r whether to follow redirects (true) or not (false)
+	 */
+	public void setFollowRedirects(boolean r) {
+		this.followRedirects = r;
+	}
+
+	/**
+	 * Sets the number of redirects that are followed when
+	 * followRedirects is true.  In order to avoid going into an endless
+	 * loop due to some evil server, or another error, a maximum number
+	 * of redirects that may be followed can be set here.  Note that to
+	 * disable the following of redirects you should use
+	 * setFollowRedirects.
+	 *
+	 * @see #setFollowRedirects(boolean r)
+	 * @param t the number of redirects before an exception is thrown
+	 */
+	public void setTTL(int t) {
+		this.ttl = t;
+	}
+
+	/**
+	 * Set the SO_TIMEOUT on the underlying Socket.  When for some
+	 * reason the connection to the database hangs, this setting can be
+	 * useful to break out of this indefinite wait.
+	 * This option must be enabled prior to entering the blocking
+	 * operation to have effect.
+	 *
+	 * @param s The specified timeout, in milliseconds.  A timeout
+	 *        of zero is interpreted as an infinite timeout.
+	 * @throws SocketException Issue with the socket
+	 */
+	public void setSoTimeout(int s) throws SocketException {
+		// limit time to wait on blocking operations (0 = indefinite)
+		con.setSoTimeout(s);
+	}
+
+	/**
+	 * Gets the SO_TIMEOUT from the underlying Socket.
+	 *
+	 * @return the currently in use timeout in milliseconds
+	 * @throws SocketException Issue with the socket
+	 */
+	public int getSoTimeout() throws SocketException {
+		return con.getSoTimeout();
+	}
+	
+	/**
+	 * Enables/disables debug
+	 *
+	 * @param debug Value to set
+	 */
+	public void setDebug(boolean debug) {
+		this.debug = debug;
+	}
+
+	/**
+	 * Connects to the given host and port, logging in as the given
+	 * user.  If followRedirect is false, a RedirectionException is
+	 * thrown when a redirect is encountered.
+	 *
+	 * @param host the hostname, or null for the loopback address
+	 * @param port the port number
+	 * @param user the username
+	 * @param pass the password
+	 * @return A List with informational (warning) messages. If this 
+	 * 		list is empty; then there are no warnings.
+	 * @throws IOException if an I/O error occurs when creating the
+	 *         socket
+	 * @throws MCLParseException if bogus data is received
+	 * @throws MCLException if an MCL related error occurs
+	 */
+	public List<String> connect(String host, int port, String user, String pass)
+		throws IOException, MCLParseException, MCLException {
+		// Wrap around the internal connect that needs to know if it
+		// should really make a TCP connection or not.
+		return connect(host, port, user, pass, true);
+	}
+
+	private List<String> connect(String host, int port, String user, String pass,
+			boolean makeConnection)
+		throws IOException, MCLParseException, MCLException {
+		if (ttl-- <= 0)
+			throw new MCLException("Maximum number of redirects reached, aborting connection attempt.  Sorry.");
+
+		if (makeConnection) {
+			con = new Socket(host, port);
+			// set nodelay, as it greatly speeds up small messages (like we
+			// often do)
+			con.setTcpNoDelay(true);
+
+			fromMonet = new BlockInputStream(con.getInputStream());
+			toMonet = new BlockOutputStream(con.getOutputStream());
+			try {
+				reader = new BufferedMCLReader(fromMonet, "UTF-8");
+				writer = new BufferedMCLWriter(toMonet, "UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new AssertionError(e.toString());
+			}
+			writer.registerReader(reader);
+		}
+
+		String c = reader.readLine();
+		reader.waitForPrompt();
+		writer.writeLine(
+				getChallengeResponse(
+					c,
+					user,
+					pass,
+					language,
+					database,
+					hash
+					)
+				);
+
+		// read monet response till prompt
+		List<String> redirects = new ArrayList<String>();
+		List<String> warns = new ArrayList<String>();
+		String err = "", tmp;
+		int lineType;
+		do {
+			if ((tmp = reader.readLine()) == null)
+				throw new IOException("Read from " +
+						con.getInetAddress().getHostName() + ":" +
+						con.getPort() + ": End of stream reached");
+			if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) {
+				err += "\n" + tmp.substring(7);
+			} else if (lineType == BufferedMCLReader.INFO) {
+				warns.add(tmp.substring(1));
+			} else if (lineType == BufferedMCLReader.REDIRECT) {
+				redirects.add(tmp.substring(1));
+			}
+		} while (lineType != BufferedMCLReader.PROMPT);
+		if (err != "") {
+			close();
+			throw new MCLException(err.trim());
+		}
+		if (!redirects.isEmpty()) {
+			if (followRedirects) {
+				// Ok, server wants us to go somewhere else.  The list
+				// might have multiple clues on where to go.  For now we
+				// don't support anything intelligent but trying the
+				// first one.  URI should be in form of:
+				// "mapi:monetdb://host:port/database?arg=value&..."
+				// or
+				// "mapi:merovingian://proxy?arg=value&..."
+				// note that the extra arguments must be obeyed in both
+				// cases
+				String suri = redirects.get(0).toString();
+				if (!suri.startsWith("mapi:"))
+					throw new MCLException("unsupported redirect: " + suri);
+
+				URI u;
+				try {
+					u = new URI(suri.substring(5));
+				} catch (URISyntaxException e) {
+					throw new MCLParseException(e.toString());
+				}
+
+				tmp = u.getQuery();
+				if (tmp != null) {
+					String args[] = tmp.split("&");
+					for (int i = 0; i < args.length; i++) {
+						int pos = args[i].indexOf("=");
+						if (pos > 0) {
+							tmp = args[i].substring(0, pos);
+							if (tmp.equals("database")) {
+								tmp = args[i].substring(pos + 1);
+								if (!tmp.equals(database)) {
+									warns.add("redirect points to different " +
+											"database: " + tmp);
+									setDatabase(tmp);
+								}
+							} else if (tmp.equals("language")) {
+								tmp = args[i].substring(pos + 1);
+								warns.add("redirect specifies use of different language: " + tmp);
+								setLanguage(tmp);
+							} else if (tmp.equals("user")) {
+								tmp = args[i].substring(pos + 1);
+								if (!tmp.equals(user))
+									warns.add("ignoring different username '" + tmp + "' set by " +
+											"redirect, what are the security implications?");
+							} else if (tmp.equals("password")) {
+								warns.add("ignoring different password set by redirect, " +
+										"what are the security implications?");
+							} else {
+								warns.add("ignoring unknown argument '" + tmp + "' from redirect");
+							}
+						} else {
+							warns.add("ignoring illegal argument from redirect: " +
+									args[i]);
+						}
+					}
+				}
+
+				if (u.getScheme().equals("monetdb")) {
+					// this is a redirect to another (monetdb) server,
+					// which means a full reconnect
+					// avoid the debug log being closed
+					if (debug) {
+						debug = false;
+						close();
+						debug = true;
+					} else {
+						close();
+					}
+					tmp = u.getPath();
+					if (tmp != null && tmp.length() != 0) {
+						tmp = tmp.substring(1).trim();
+						if (!tmp.isEmpty() && !tmp.equals(database)) {
+							warns.add("redirect points to different " +
+								"database: " + tmp);
+							setDatabase(tmp);
+						}
+					}
+					int p = u.getPort();
+					warns.addAll(connect(u.getHost(), p == -1 ? port : p,
+							user, pass, true));
+					warns.add("Redirect by " + host + ":" + port + " to " + suri);
+				} else if (u.getScheme().equals("merovingian")) {
+					// reuse this connection to inline connect to the
+					// right database that Merovingian proxies for us
+					warns.addAll(connect(host, port, user, pass, false));
+				} else {
+					throw new MCLException("unsupported scheme in redirect: " + suri);
+				}
+			} else {
+				StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:");
+				for (String it : redirects) {
+					msg.append(" [" + it + "]");
+				}
+				throw new MCLException(msg.toString());
+			}
+		}
+		return warns;
+	}
+
+	/**
+	 * A little helper function that processes a challenge string, and
+	 * returns a response string for the server.  If the challenge
+	 * string is null, a challengeless response is returned.
+	 *
+	 * @param chalstr the challenge string
+	 * @param username the username to use
+	 * @param password the password to use
+	 * @param language the language to use
+	 * @param database the database to connect to
+	 * @param hash the hash method(s) to use, or NULL for all supported
+	 *             hashes
+	 */
+	private String getChallengeResponse(
+			String chalstr,
+			String username,
+			String password,
+			String language,
+			String database,
+			String hash
+	) throws MCLParseException, MCLException, IOException {
+		String response;
+		String algo;
+
+		// parse the challenge string, split it on ':'
+		String[] chaltok = chalstr.split(":");
+		if (chaltok.length <= 4) throw
+			new MCLParseException("Server challenge string unusable!  Challenge contains too few tokens: " + chalstr);
+
+		// challenge string to use as salt/key
+		String challenge = chaltok[0];
+		String servert = chaltok[1];
+		try {
+			version = Integer.parseInt(chaltok[2].trim());	// protocol version
+		} catch (NumberFormatException e) {
+			throw new MCLParseException("Protocol version unparseable: " + chaltok[3]);
+		}
+
+		// handle the challenge according to the version it is
+		switch (version) {
+			default:
+				throw new MCLException("Unsupported protocol version: " + version);
+			case 9:
+				// proto 9 is like 8, but uses a hash instead of the
+				// plain password, the server tells us which hash in the
+				// challenge after the byte-order
+
+				/* NOTE: Java doesn't support RIPEMD160 :( */
+				if (chaltok[5].equals("SHA512")) {
+					algo = "SHA-512";
+				} else if (chaltok[5].equals("SHA384")) {
+					algo = "SHA-384";
+				} else if (chaltok[5].equals("SHA256")) {
+					algo = "SHA-256";
+				/* NOTE: Java doesn't support SHA-224 */
+				} else if (chaltok[5].equals("SHA1")) {
+					algo = "SHA-1";
+				} else if (chaltok[5].equals("MD5")) {
+					algo = "MD5";
+				} else {
+					throw new MCLException("Unsupported password hash: " +
+							chaltok[5]);
+				}
+
+				try {
+					MessageDigest md = MessageDigest.getInstance(algo);
+					md.update(password.getBytes("UTF-8"));
+					byte[] digest = md.digest();
+					password = toHex(digest);
+				} catch (NoSuchAlgorithmException e) {
+					throw new AssertionError("internal error: " + e.toString());
+				} catch (UnsupportedEncodingException e) {
+					throw new AssertionError("internal error: " + e.toString());
+				}
+
+				// proto 7 (finally) used the challenge and works with a
+				// password hash.  The supported implementations come
+				// from the server challenge.  We chose the best hash
+				// we can find, in the order SHA1, MD5, plain.  Also,
+				// the byte-order is reported in the challenge string,
+				// which makes sense, since only blockmode is supported.
+				// proto 8 made this obsolete, but retained the
+				// byte-order report for future "binary" transports.  In
+				// proto 8, the byte-order of the blocks is always little
+				// endian because most machines today are.
+				String hashes = (hash == null ? chaltok[3] : hash);
+				Set<String> hashesSet = new HashSet<String>(Arrays.asList(hashes.toUpperCase().split("[, ]")));
+
+				// if we deal with merovingian, mask our credentials
+				if (servert.equals("merovingian") && !language.equals("control")) {
+					username = "merovingian";
+					password = "merovingian";
+				}
+				String pwhash;
+				algo = null;
+				
+				if (hashesSet.contains("SHA512")) {
+					algo = "SHA-512";
+					pwhash = "{SHA512}";
+				} else if (hashesSet.contains("SHA384")) {
+					algo = "SHA-384";
+					pwhash = "{SHA384}";
+				} else if (hashesSet.contains("SHA256")) {
+					algo = "SHA-256";
+					pwhash = "{SHA256}";
+				} else if (hashesSet.contains("SHA1")) {
+					algo = "SHA-1";
+					pwhash = "{SHA1}";
+				} else if (hashesSet.contains("MD5")) {
+					algo = "MD5";
+					pwhash = "{MD5}";
+				} else {
+					throw new MCLException("no supported password hashes in " + hashes);
+				}
+				if (algo != null) {
+					try {
+						MessageDigest md = MessageDigest.getInstance(algo);
+						md.update(password.getBytes("UTF-8"));
+						md.update(challenge.getBytes("UTF-8"));
+						byte[] digest = md.digest();
+						pwhash += toHex(digest);
+					} catch (NoSuchAlgorithmException e) {
+						throw new AssertionError("internal error: " + e.toString());
+					} catch (UnsupportedEncodingException e) {
+						throw new AssertionError("internal error: " + e.toString());
+					}
+				}
+				// TODO: some day when we need this, we should store
+				// this
+				if (chaltok[4].equals("BIG")) {
+					// byte-order of server is big-endian
+				} else if (chaltok[4].equals("LIT")) {
+					// byte-order of server is little-endian
+				} else {
+					throw new MCLParseException("Invalid byte-order: " + chaltok[5]);
+				}
+
+				// generate response
+				response = "BIG:";	// JVM byte-order is big-endian
+				response += username + ":" + pwhash + ":" + language;
+				response += ":" + (database == null ? "" : database) + ":";
+				
+				return response;
+		}
+	}
+	
+	private static char hexChar(int n) {
+		return (n > 9) 
+				? (char) ('a' + (n - 10))
+				: (char) ('0' + n);
+	}
+
+	/**
+	 * Small helper method to convert a byte string to a hexadecimal
+	 * string representation.
+	 *
+	 * @param digest the byte array to convert
+	 * @return the byte array as hexadecimal string
+	 */
+	private static String toHex(byte[] digest) {
+		char[] result = new char[digest.length * 2];
+		int pos = 0;
+		for (int i = 0; i < digest.length; i++) {
+			result[pos++] = hexChar((digest[i] & 0xf0) >> 4);
+			result[pos++] = hexChar(digest[i] & 0x0f);
+		}
+		return new String(result);
+	}
+
+	/**
+	 * Returns an InputStream that reads from this open connection on
+	 * the MapiSocket.
+	 *
+	 * @return an input stream that reads from this open connection
+	 */
+	public InputStream getInputStream() {
+		return fromMonet;
+	}
+
+	/**
+	 * Returns an output stream for this MapiSocket.
+	 *
+	 * @return an output stream for writing bytes to this MapiSocket
+	 */
+	public OutputStream getOutputStream() {
+		return toMonet;
+	}
+
+	/**
+	 * Returns a Reader for this MapiSocket.  The Reader is a
+	 * BufferedMCLReader which does protocol interpretation of the
+	 * BlockInputStream produced by this MapiSocket.
+	 *
+	 * @return a BufferedMCLReader connected to this MapiSocket
+	 */
+	public BufferedMCLReader getReader() {
+		return reader;
+	}
+
+	/**
+	 * Returns a Writer for this MapiSocket.  The Writer is a
+	 * BufferedMCLWriter which produces protocol compatible data blocks
+	 * that the BlockOutputStream can properly translate into blocks.
+	 *
+	 * @return a BufferedMCLWriter connected to this MapiSocket
+	 */
+	public BufferedMCLWriter getWriter() {
+		return writer;
+	}
+
+	/**
+	 * Returns the mapi protocol version used by this socket.  The
+	 * protocol version depends on the server being used.  Users of the
+	 * MapiSocket should check this version to act appropriately.
+	 *
+	 * @return the mapi protocol version
+	 */
+	public int getProtocolVersion() {
+		return version;
+	}
+
+	/**
+	 * Enables logging to a file what is read and written from and to
+	 * the server.  Logging can be enabled at any time.  However, it is
+	 * encouraged to start debugging before actually connecting the
+	 * socket.
+	 *
+	 * @param filename the name of the file to write to
+	 * @throws IOException if the file could not be opened for writing
+	 */
+	public void debug(String filename) throws IOException {
+		debug(new FileWriter(filename));
+	}
+	
+	/**
+	 * Enables logging to a stream what is read and written from and to
+	 * the server.  Logging can be enabled at any time.  However, it is
+	 * encouraged to start debugging before actually connecting the
+	 * socket.
+	 *
+	 * @param out to write the log to
+	 * @throws IOException if the file could not be opened for writing
+	 */
+	public void debug(PrintStream out) throws IOException {
+		debug(new PrintWriter(out));
+	}
+	
+	/**
+	 * Enables logging to a stream what is read and written from and to
+	 * the server.  Logging can be enabled at any time.  However, it is
+	 * encouraged to start debugging before actually connecting the
+	 * socket.
+	 *
+	 * @param out to write the log to
+	 * @throws IOException if the file could not be opened for writing
+	 */
+	public void debug(Writer out) throws IOException {
+		log = out;
+		debug = true;
+	}
+
+	/**
+	 * Inner class that is used to write data on a normal stream as a
+	 * blocked stream.  A call to the flush() method will write a
+	 * "final" block to the underlying stream.  Non-final blocks are
+	 * written as soon as one or more bytes would not fit in the
+	 * current block any more.  This allows to write to a block to it's
+	 * full size, and then flush it explicitly to have a final block
+	 * being written to the stream.
+	 */
+	class BlockOutputStream extends FilterOutputStream {
+		private int writePos = 0;
+		private byte[] block = new byte[BLOCK];
+		private int blocksize = 0;
+
+		/**
+		 * Constructs this BlockOutputStream, backed by the given
+		 * OutputStream.  A BufferedOutputStream is internally used.
+		 */
+		public BlockOutputStream(OutputStream out) {
+			// always use a buffered stream, even though we know how
+			// much bytes to write/read, since this is just faster for
+			// some reason
+			super(new BufferedOutputStream(out));
+		}
+
+		@Override
+		public void flush() throws IOException {
+			// write the block (as final) then flush.
+			writeBlock(true);
+			out.flush();
+
+			// it's a bit nasty if an exception is thrown from the log,
+			// but ignoring it can be nasty as well, so it is decided to
+			// let it go so there is feedback about something going wrong
+			// it's a bit nasty if an exception is thrown from the log,
+			// but ignoring it can be nasty as well, so it is decided to
+			// let it go so there is feedback about something going wrong
+			if (debug) {
+				log.flush();
+			}
+		}
+
+		/**
+		 * writeBlock puts the data in the block on the stream.  The
+		 * boolean last controls whether the block is sent with an
+		 * indicator to note it is the last block of a sequence or not.
+		 *
+		 * @param last whether this is the last block
+		 * @throws IOException if writing to the stream failed
+		 */
+		public void writeBlock(boolean last) throws IOException {
+			if (last) {
+				// always fits, because of BLOCK's size
+				blocksize = (short)writePos;
+				// this is the last block, so encode least
+				// significant bit in the first byte (little-endian)
+				blklen[0] = (byte)(blocksize << 1 & 0xFF | 1);
+				blklen[1] = (byte)(blocksize >> 7);
+			} else {
+				// always fits, because of BLOCK's size
+				blocksize = (short)BLOCK;
+				// another block will follow, encode least
+				// significant bit in the first byte (little-endian)
+				blklen[0] = (byte)(blocksize << 1 & 0xFF);
+				blklen[1] = (byte)(blocksize >> 7);
+			}
+
+			out.write(blklen);
+
+			// write the actual block
+			out.write(block, 0, writePos);
+
+			if (debug) {
+				if (last) {
+					logTd("write final block: " + writePos + " bytes");
+				} else {
+					logTd("write block: " + writePos + " bytes");
+				}
+				logTx(new String(block, 0, writePos, "UTF-8"));
+			}
+
+			writePos = 0;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			if (writePos == BLOCK) {
+				writeBlock(false);
+			}
+			block[writePos++] = (byte)b;
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			write(b, 0, b.length);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			int t = 0;
+			while (len > 0) {
+				t = BLOCK - writePos;
+				if (len > t) {
+					System.arraycopy(b, off, block, writePos, t);
+					off += t;
+					len -= t;
+					writePos += t;
+					writeBlock(false);
+				} else {
+					System.arraycopy(b, off, block, writePos, len);
+					writePos += len;
+					break;
+				}
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			// we don't want the flush() method to be called (default of
+			// the FilterOutputStream), so we close manually here
+			out.close();
+		}
+	}
+
+
+	/**
+	 * Inner class that is used to make the data on the blocked stream
+	 * available as a normal stream.
+	 */
+	class BlockInputStream extends FilterInputStream {
+		private int readPos = 0;
+		private int blockLen = 0;
+		private byte[] block = new byte[BLOCK + 3]; // \n.\n
+
+		/**
+		 * Constructs this BlockInputStream, backed by the given
+		 * InputStream.  A BufferedInputStream is internally used.
+		 */
+		public BlockInputStream(InputStream in) {
+			// always use a buffered stream, even though we know how
+			// much bytes to write/read, since this is just faster for
+			// some reason
+			super(new BufferedInputStream(in));
+		}
+
+		@Override
+		public int available() {
+			return blockLen - readPos;
+		}
+
+		@Override
+		public boolean markSupported() {
+			return false;
+		}
+
+		@Override
+		public void mark(int readlimit) {
+			throw new AssertionError("Not implemented!");
+		}
+
+		@Override
+		public void reset() {
+			throw new AssertionError("Not implemented!");
+		}
+
+		/**
+		 * Small wrapper to get a blocking variant of the read() method
+		 * on the BufferedInputStream.  We want to benefit from the
+		 * Buffered pre-fetching, but not dealing with half blocks.
+		 * Changing this class to be able to use the partially received
+		 * data will greatly complicate matters, while a performance
+		 * improvement is debatable given the relatively small size of
+		 * our blocks.  Maybe it does speed up on slower links, then
+		 * consider this method a quick bug fix/workaround.
+		 *
+		 * @return false if reading the block failed due to EOF
+		 */
+		private boolean _read(byte[] b, int len) throws IOException {
+			int s;
+			int off = 0;
+
+			while (len > 0) {
+				s = in.read(b, off, len);
+				if (s == -1) {
+					// if we have read something before, we should have been
+					// able to read the whole, so make this fatal
+					if (off > 0) {
+						if (debug) {
+							logRd("the following incomplete block was received:");
+							logRx(new String(b, 0, off, "UTF-8"));
+						}
+						throw new IOException("Read from " +
+								con.getInetAddress().getHostName() + ":" +
+								con.getPort() + ": Incomplete block read from stream");
+					}
+					if (debug)
+						logRd("server closed the connection (EOF)");
+					return false;
+				}
+				len -= s;
+				off += s;
+			}
+
+			return true;
+		}
+
+		/**
+		 * Reads the next block on the stream into the internal buffer,
+		 * or writes the prompt in the buffer.
+		 *
+		 * The blocked stream protocol consists of first a two byte
+		 * integer indicating the length of the block, then the
+		 * block, followed by another length + block.  The end of
+		 * such sequence is put in the last bit of the length, and
+		 * hence this length should be shifted to the right to
+		 * obtain the real length value first.  We simply fetch
+		 * blocks here as soon as they are needed for the stream's
+		 * read methods.
+		 *
+		 * The user-flush, which is an implicit effect of the end of
+		 * a block sequence, is communicated beyond the stream by
+		 * inserting a prompt sequence on the stream after the last
+		 * block.  This method makes sure that a final block ends with a
+		 * newline, if it doesn't already, in order to facilitate a
+		 * Reader that is possibly chained to this InputStream.
+		 *
+		 * If the stream is not positioned correctly, hell will break
+		 * loose.
+		 */
+		private int readBlock() throws IOException {
+			// read next two bytes (short)
+			if (!_read(blklen, 2))
+				return(-1);
+
+			// Get the short-value and store its value in blockLen.
+			blockLen = (short)(
+					(blklen[0] & 0xFF) >> 1 |
+					(blklen[1] & 0xFF) << 7
+					);
+			readPos = 0;
+
+			if (debug) {
+				if ((blklen[0] & 0x1) == 1) {
+					logRd("read final block: " + blockLen + " bytes");
+				} else {
+					logRd("read new block: " + blockLen + " bytes");
+				}
+			}
+
+			// sanity check to avoid bad servers make us do an ugly
+			// stack trace
+			if (blockLen > block.length)
+				throw new AssertionError("Server sent a block " +
+						"larger than BLOCKsize: " +
+						blockLen + " > " + block.length);
+			if (!_read(block, blockLen))
+				return(-1);
+
+			if (debug)
+				logRx(new String(block, 0, blockLen, "UTF-8"));
+
+			// if this is the last block, make it end with a newline and
+			// prompt
+			if ((blklen[0] & 0x1) == 1) {
+				if (blockLen > 0 && block[blockLen - 1] != '\n') {
+					// to terminate the block in a Reader
+					block[blockLen++] = '\n';
+				}
+				// insert 'fake' flush
+				block[blockLen++] = BufferedMCLReader.PROMPT;
+				block[blockLen++] = '\n';
+				if (debug)
+					logRd("inserting prompt");
+			}
+
+			return(blockLen);
+		}
+
+		@Override
+		public int read() throws IOException {
+			if (available() == 0) {
+				if (readBlock() == -1)
+					return(-1);
+			}
+				
+			if (debug)
+				logRx(new String(block, readPos, 1, "UTF-8"));
+			return (int)block[readPos++];
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			return read(b, 0, b.length);
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			int t;
+			int size = 0;
+			while (size < len) {
+				t = available();
+				if (t == 0) {
+					if (size != 0)
+						break;
+					if (readBlock() == -1) {
+						if (size == 0)
+							size = -1;
+						break;
+					}
+					t = available();
+				}
+				if (len > t) {
+					System.arraycopy(block, readPos, b, off, t);
+					off += t;
+					len -= t;
+					readPos += t;
+					size += t;
+				} else {
+					System.arraycopy(block, readPos, b, off, len);
+					readPos += len;
+					size += len;
+					break;
+				}
+			}
+			return size;
+		}
+
+		@Override
+		public long skip(long n) throws IOException {
+			long skip = n;
+			int t = 0;
+			while (skip > 0) {
+				t = available();
+				if (skip > t) {
+					skip -= t;
+					readPos += t;
+					readBlock();
+				} else {
+					readPos += skip;
+					break;
+				}
+			}
+			return n;
+		}
+	}
+
+	/**
+	 * Closes the streams and socket connected to the server if
+	 * possible.  If an error occurs during disconnecting it is ignored.
+	 */
+	public synchronized void close() {
+		try {
+			if (reader != null) reader.close();
+			if (writer != null) writer.close();
+			if (fromMonet != null) fromMonet.close();
+			if (toMonet != null) toMonet.close();
+			if (con != null) con.close();
+			if (debug && log instanceof FileWriter) log.close();
+		} catch (IOException e) {
+			// ignore it
+		}
+	}
+
+	/**
+	 * Destructor called by garbage collector before destroying this
+	 * object tries to disconnect the MonetDB connection if it has not
+	 * been disconnected already.
+	 */
+	@Override
+	protected void finalize() throws Throwable {
+		close();
+		super.finalize();
+	}
+
+
+	/**
+	 * Writes a logline tagged with a timestamp using the given string.
+	 * Used for debugging purposes only and represents a message that is
+	 * connected to writing to the socket.  A logline might look like:
+	 * TX 152545124: Hello MonetDB!
+	 *
+	 * @param message the message to log
+	 * @throws IOException if an IO error occurs while writing to the logfile
+	 */
+	private void logTx(String message) throws IOException {
+		log.write("TX " + System.currentTimeMillis() +
+			": " + message + "\n");
+	}
+
+	/**
+	 * Writes a logline tagged with a timestamp using the given string.
+	 * Lines written using this log method are tagged as "added
+	 * metadata" which is not strictly part of the data sent.
+	 *
+	 * @param message the message to log
+	 * @throws IOException if an IO error occurs while writing to the logfile
+	 */
+	private void logTd(String message) throws IOException {
+		log.write("TD " + System.currentTimeMillis() +
+			": " + message + "\n");
+	}
+
+	/**
+	 * Writes a logline tagged with a timestamp using the given string,
+	 * and flushes afterwards.  Used for debugging purposes only and
+	 * represents a message that is connected to reading from the
+	 * socket.  The log is flushed after writing the line.  A logline
+	 * might look like:
+	 * RX 152545124: Hi JDBC!
+	 *
+	 * @param message the message to log
+	 * @throws IOException if an IO error occurs while writing to the logfile
+	 */
+	private void logRx(String message) throws IOException {
+		log.write("RX " + System.currentTimeMillis() +
+			": " + message + "\n");
+		log.flush();
+	}
+
+	/**
+	 * Writes a logline tagged with a timestamp using the given string,
+	 * and flushes afterwards.  Lines written using this log method are
+	 * tagged as "added metadata" which is not strictly part of the data
+	 * received.
+	 *
+	 * @param message the message to log
+	 * @throws IOException if an IO error occurs while writing to the logfile
+	 */
+	private void logRd(String message) throws IOException {
+		log.write("RD " + System.currentTimeMillis() +
+			": " + message + "\n");
+		log.flush();
+	}
+}