view src/main/java/org/monetdb/mcl/net/MapiSocket.java @ 943:ff075ed5ce81

Spell check.
author Sjoerd Mullender <sjoerd@acm.org>
date Thu, 09 Jan 2025 10:56:14 +0100 (2 months ago)
parents d416e9b6b3d0
children
line wrap: on
line source
/*
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 2024, 2025 MonetDB Foundation;
 * Copyright August 2008 - 2023 MonetDB B.V.;
 * Copyright 1997 - July 2008 CWI.
 */

package org.monetdb.mcl.net;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileWriter;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

import javax.net.ssl.SSLException;

import org.monetdb.mcl.MCLException;
import org.monetdb.mcl.io.BufferedMCLReader;
import org.monetdb.mcl.io.BufferedMCLWriter;
import org.monetdb.mcl.io.LineType;
import org.monetdb.mcl.parser.MCLParseException;

/**
 * A Socket for communicating with the MonetDB database in MAPI block
 * mode.
 *
 * The MapiSocket implements the protocol specifics of the MAPI block
 * mode protocol, and interfaces it as a socket that delivers a
 * BufferedReader and a BufferedWriter.  Because logging in is an
 * integral part of the MAPI protocol, the MapiSocket performs the login
 * procedure.  Like the Socket class, various options can be set before
 * calling the connect() method to influence the login process.  Only
 * after a successful call to connect() the BufferedReader and
 * BufferedWriter can be retrieved.
 *
 * For each line read, it is determined what type of line it is
 * according to the MonetDB MAPI protocol.  This results in a line to be
 * PROMPT, HEADER, RESULT, ERROR or UNKNOWN.  Use the getLineType()
 * method on the BufferedMCLReader to retrieve the type of the last
 * line read.
 *
 * For debugging purposes a socket level debugging is implemented where
 * each and every interaction to and from the MonetDB server is logged
 * to a file on disk.
 * Incoming messages are prefixed by "RX" (received by the driver),
 * outgoing messages by "TX" (transmitted by the driver).  Special
 * decoded non-human readable messages are prefixed with "RD" and "TD"
 * instead.  Following this two char prefix, a timestamp follows as the
 * number of milliseconds since the UNIX epoch.  The rest of the line is
 * a String representation of the data sent or received.
 *
 * The general use of this Socket must be seen only in the full context
 * of a MAPI connection to a server.  It has the same ingredients as a
 * normal Socket, allowing for seamless plugging.
 * <pre>
 *    Socket   \     /  InputStream  ----&gt; (BufferedMCL)Reader
 *              &gt; o &lt;
 *  MapiSocket /     \ OutputStream  ----&gt; (BufferedMCL)Writer
 * </pre>
 * The MapiSocket allows to retrieve Streams for communicating.  They
 * are interfaced, so they can be chained in any way.  While the Socket
 * transparently deals with how data is sent over the wire, the actual
 * data read needs to be interpreted, for which a Reader/Writer
 * interface is most sufficient.  In particular the BufferedMCL*
 * implementations of those interfaces supply some extra functionality
 * geared towards the format of the data.
 *
 * @author Fabian Groffen
 * @version 4.4
 * @see org.monetdb.mcl.io.BufferedMCLReader
 * @see org.monetdb.mcl.io.BufferedMCLWriter
 */
public final class MapiSocket {
	/* an even number of NUL bytes used during the handshake */
	private static final byte[] NUL_BYTES = new byte[]{ 0, 0, 0, 0, 0, 0, 0, 0 };

	/* A mapping between hash algorithm names as used in the MAPI
	 * protocol, and the names by which the Java runtime knows them.
	 */
	private static final String[][] KNOWN_ALGORITHMS = new String[][] {
			{"SHA512", "SHA-512"},
			{"SHA384", "SHA-384"},
			{"SHA256", "SHA-256"},
			// should we deprecate this by now?
			{"SHA1", "SHA-1"},
	};

	// MUST be lowercase!
	private static final char[] HEXDIGITS = "0123456789abcdef".toCharArray();

	/** Connection parameters */
	private Target target;
	/** The TCP Socket to mserver */
	private Socket con;
	/** Stream from the Socket for reading */
	private BlockInputStream fromMonet;
	/** Stream from the Socket for writing */
	private OutputStream toMonet;
	/** MCLReader on the InputStream */
	private BufferedMCLReader reader;
	/** MCLWriter on the OutputStream */
	private BufferedMCLWriter writer;
	/** protocol version of the connection */
	private int version;
	private boolean supportsClientInfo;

	/** Whether we should follow redirects.
	 * Not sure why this needs to be separate
	 * from 'ttl' but someone someday explicitly documented setTtl
	 * with 'to disable completely, use followRedirects' so
	 * apparently there is a use case.
	 */
	private boolean followRedirects = true;
	/** How many redirections do we follow until we're fed up with it? */
	private int ttl = 10;

	/** The Writer for the debug log-file */
	private Writer log;

	/** The blocksize (hardcoded in compliance with MonetDB common/stream/stream.h) */
	public final static int BLOCK = 8190;

	/** A short in two bytes for holding the block size in bytes */
	private final byte[] blklen = new byte[2];

	/**
	 * Constructs a new MapiSocket.
	 */
	public MapiSocket() {
		target = new Target();
		con = null;
	}

	/**
	 * Sets the database to connect to.  If database is null, a
	 * connection is made to the default database of the server.  This
	 * is also the default.
	 *
	 * @param db the database
	 */
	public void setDatabase(final String db) {
		target.setDatabase(db);
	}

	/**
	 * Sets the language to use for this connection.
	 *
	 * @param lang the language
	 */
	public void setLanguage(final String lang) {
		target.setLanguage(lang);
	}

	/**
	 * Sets the hash method to use.  Note that this method is intended
	 * for debugging purposes.  Setting a hash method can yield in
	 * connection failures.  Multiple hash methods can be given by
	 * separating the hashes by commas.
	 * DON'T USE THIS METHOD if you don't know what you're doing.
	 *
	 * @param hash the hash method to use
	 */
	public void setHash(final String hash) {
		target.setHash(hash);
	}

	/**
	 * Sets whether MCL redirections should be followed or not.  If set
	 * to false, an MCLException will be thrown when a redirect is
	 * encountered during connect.  The default behavior is to
	 * automatically follow redirects.
	 *
	 * @param r whether to follow redirects (true) or not (false)
	 */
	public void setFollowRedirects(final boolean r) {
		this.followRedirects = r;
	}

	/**
	 * Sets the number of redirects that are followed when
	 * followRedirects is true.  In order to avoid going into an endless
	 * loop due to some evil server, or another error, a maximum number
	 * of redirects that may be followed can be set here.  Note that to
	 * disable the following of redirects you should use
	 * setFollowRedirects.
	 *
	 * @see #setFollowRedirects(boolean r)
	 * @param t the number of redirects before an exception is thrown
	 */
	public void setTTL(final int t) {
		this.ttl = t;
	}

	/**
	 * Set the SO_TIMEOUT on the underlying Socket.  When for some
	 * reason the connection to the database hangs, this setting can be
	 * useful to break out of this indefinite wait.
	 * This option must be enabled prior to entering the blocking
	 * operation to have effect.
	 *
	 * @param s The specified timeout, in milliseconds.  A timeout
	 *        of zero will disable timeout (i.e., timeout of infinity).
	 * @throws SocketException Issue with the socket
	 */
	public void setSoTimeout(final int s) throws SocketException {
		if (s < 0) {
			throw new IllegalArgumentException("timeout can't be negative");
		}
		target.setSoTimeout(s);
		// limit time to wait on blocking operations
		if (con != null) {
			con.setSoTimeout(s);
		}
	}

	/**
	 * Gets the SO_TIMEOUT from the underlying Socket.
	 *
	 * @return the currently in use timeout in milliseconds
	 * @throws SocketException Issue with the socket
	 */
	public int getSoTimeout() throws SocketException {
		return target.getSoTimeout();
	}

	/**
	 * Enables/disables debug mode with logging to file
	 *
	 * @param debug Value to set
	 */
	public void setDebug(final boolean debug) {
		target.setDebug(debug);
	}

	/**
	 * Connects to the given host and port, logging in as the given
	 * user.  If followRedirect is false, a RedirectionException is
	 * thrown when a redirect is encountered.
	 *
	 * @param host the hostname, or null for the loopback address
	 * @param port the port number (must be between 0 and 65535, inclusive)
	 * @param user the username
	 * @param pass the password
	 * @return A List with informational (warning) messages. If this
	 *		list is empty; then there are no warnings.
	 * @throws IOException if an I/O error occurs when creating the socket
	 * @throws SocketException - if there is an error in the underlying protocol, such as a TCP error.
	 * @throws UnknownHostException if the IP address of the host could not be determined
	 * @throws MCLParseException if bogus data is received
	 * @throws MCLException if an MCL related error occurs
	 */
	public List<String> connect(final String host, final int port, final String user, final String pass)
		throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException
	{
		target.setHost(host);
		target.setPort(port);
		target.setUser(user);
		target.setPassword(pass);
		return connect(target, null);
	}

	public List<String> connect(String url, Properties props) throws URISyntaxException, ValidationError, MCLException, MCLParseException, IOException {
		return connect(new Target(url, props), null);
	}

	/**
	 * Connect according to the settings in the 'target' parameter.
	 * If followRedirect is false, a RedirectionException is
	 * thrown when a redirect is encountered.
	 *
	 * Some settings, such as the initial reply size, can already be configured
	 * during the handshake, saving a command round-trip later on.
	 * To do so, create and pass a subclass of {@link MapiSocket.OptionsCallback}.
	 *
	 * @param target the connection settings
	 * @param callback will be called if the server allows options to be set during the
	 * initial handshake
	 * @return A List with informational (warning) messages. If this
	 *		list is empty; then there are no warnings.
	 * @throws IOException if an I/O error occurs when creating the socket
	 * @throws SocketException - if there is an error in the underlying protocol, such as a TCP error.
	 * @throws UnknownHostException if the IP address of the host could not be determined
	 * @throws MCLParseException if bogus data is received
	 * @throws MCLException if an MCL related error occurs
	 */
	public List<String> connect(Target target, OptionsCallback callback) throws MCLException, MCLParseException, IOException {
		// get rid of any earlier connection state, including the existing target
		close();
		this.target = target;

		Target.Validated validated;
		try {
			validated = target.validate();
		} catch (ValidationError e) {
			throw new MCLException(e.getMessage());
		}

		if (validated.connectScan()) {
			return scanUnixSockets(callback);
		}

		ArrayList<String> warnings = new ArrayList<>();
		int attempts = 0;
		do {
			boolean ok = false;
			try {
				boolean done = tryConnect(callback, warnings);
				ok = true;
				if (done) {
					return warnings;
				}
			} finally {
				if (!ok)
					close();
			}
		} while (followRedirects && attempts++ < this.ttl);
		throw new MCLException("max redirect count exceeded");
	}

	private List<String> scanUnixSockets(OptionsCallback callback) throws MCLException, MCLParseException, IOException {
		// Because we do not support Unix Domain sockets, we just go back to connect().
		// target.connectScan() will now return false;
		target.setHost("localhost");
		return connect(target, callback);
	}

	private boolean tryConnect(OptionsCallback callback, ArrayList<String> warningBuffer) throws MCLException, IOException {
		try {
			// We need a valid target
			Target.Validated validated = target.validate();
			// con will be non-null if the previous attempt ended in a redirect to mapi:monetdb://proxy
			if (con == null)
				connectSocket(validated);
			return handshake(validated, callback, warningBuffer);
		} catch (IOException | MCLException e) {
			close();
			throw e;
		} catch (ValidationError e) {
			close();
			throw new MCLException(e.getMessage());
		}
	}

	private void connectSocket(Target.Validated validated) throws MCLException, IOException {
		// This method performs steps 2-6 of the procedure outlined in the URL spec
		String tcpHost = validated.connectTcp();
		if (tcpHost.isEmpty()) {
			throw new MCLException("Unix domain sockets are not supported, only TCP");
		}
		int port = validated.connectPort();
		Socket sock = null;
		try {
			sock = new Socket(tcpHost, port);
			sock.setSoTimeout(validated.getSoTimeout());
			sock.setTcpNoDelay(true);
			sock.setKeepAlive(true);

			sock = wrapTLS(sock, validated);

			fromMonet = new BlockInputStream(sock.getInputStream());
			toMonet = new BlockOutputStream(sock.getOutputStream());
			reader = new BufferedMCLReader(fromMonet, StandardCharsets.UTF_8);
			writer = new BufferedMCLWriter(toMonet, StandardCharsets.UTF_8);
			writer.registerReader(reader);
			reader.advance();

			// Only assign to sock when everything went ok so far
			con = sock;
			sock = null;
		} catch (SSLException e) {
			throw new MCLException("SSL error: " + e.getMessage(), e);
		} catch (IOException e) {
			throw new MCLException("Could not connect to " + tcpHost + ":" + port + ": " + e.getMessage(), e);
		} finally {
			if (sock != null)
				try {
					sock.close();
				} catch (IOException e) {
					// ignore
				}
		}
	}

	private Socket wrapTLS(Socket sock, Target.Validated validated) throws IOException {
		if (validated.getTls())
			return SecureSocket.wrap(validated, sock);
		else {
			// Send an even number of NUL bytes to avoid a deadlock if
			// we're accidentally connecting to a TLS-protected server.
			// The cause of the deadlock is that we speak MAPI and we wait
			// for the server to send a MAPI challenge.
			// However, if the server is trying to set up TLS, it will be
			// waiting for us to send a TLS 'Client Hello' packet.
			// Hence, deadlock.
			// NUL NUL is a no-op in MAPI and will hopefully force an error
			// in the TLS server. This does not always work, some
			// TLS implementations abort on the first NUL, some need more NULs
			// than we are prepared to send here. 8 seems to be a good number.
			sock.getOutputStream().write(NUL_BYTES);
		}
		return sock;
	}

	private boolean handshake(Target.Validated validated, OptionsCallback callback, ArrayList<String> warnings) throws IOException, MCLException {
		String challenge = reader.getLine();
		reader.advance();
		if (reader.getLineType() != LineType.PROMPT)
			throw new MCLException("Garbage after server challenge: " + reader.getLine());
		String response = challengeResponse(validated, challenge, callback);
		writer.writeLine(response);
		reader.advance();

		// Process the response lines.
		String redirect = null;
		StringBuilder errors = new StringBuilder();
		while (reader.getLineType() != LineType.PROMPT) {
			switch (reader.getLineType()) {
				case REDIRECT:
					if (redirect == null)
						redirect = reader.getLine(1);
					break;
				case ERROR:
					if (errors.length() > 0)
						errors.append("\n");
					errors.append(reader.getLine(7));  // 7 not 1!
					break;
				case INFO:
					warnings.add(reader.getLine(1));
					break;
				default:
					// ignore??!!
					break;
			}
			reader.advance();
		}
		if (errors.length() > 0)
			throw new MCLException(errors.toString());

		if (redirect == null)
			return true;   // we're happy

		// process redirect
		try {
			MonetUrlParser.parse(target, redirect);
		} catch (URISyntaxException | ValidationError e) {
			throw new MCLException("While processing redirect " + redirect + ": " + e.getMessage(), e);
		}
		if (redirect.startsWith("mapi:merovingian://proxy")) {
			// The reader is stuck at LineType.PROMPT but actually the
			// next challenge is already there.
			reader.resetLineType();
			reader.advance();
		} else {
			close();
		}

		return false;   // we need another go
	}

	private String challengeResponse(Target.Validated validated, final String challengeLine, OptionsCallback callback) throws MCLException {
		// The challengeLine looks like this:
		//
		// 45IYyVyRnbgEnK92ad:merovingian:9:RIPEMD160,SHA512,SHA384,SHA256,SHA224,SHA1:LIT:SHA512:
		// WgHIibSyH:mserver:9:RIPEMD160,SHA512,SHA384,SHA256,SHA224,SHA1:LIT:SHA512:sql=6:BINARY=1:
		// 0         1       2 3                                          4   5      6     7

		String[] parts = challengeLine.split(":");
		if (parts.length < 3)
			throw new MCLException("Invalid challenge: expect at least 3 fields");
		String saltPart = parts[0];
		String serverTypePart = parts[1];
		String versionPart = parts[2];
		int version;
		if (versionPart.equals("9"))
			version = 9;
		else
			throw new MCLException("Protocol versions other than 9 are note supported: " + versionPart);
		if (parts.length < 6)
			throw new MCLException("Protocol version " + version + " requires at least 6 fields, found " + parts.length + ": " + challengeLine);
		String serverHashesPart = parts[3];
//		String endianPart = parts[4];
		String passwordHashPart = parts[5];
		String optionsPart = parts.length > 6 ? parts[6] : null;
//		String binaryPart = parts.length > 7 ? parts[7] : null;

		if (parts.length > 9)
			supportsClientInfo = true;

		String userResponse;
		String password = target.getPassword();
		if (serverTypePart.equals("merovingian") && !target.getLanguage().equals("control")) {
			userResponse = "merovingian";
			password = "merovingian";
		} else {
			userResponse = target.getUser();
		}
		String optionsResponse = handleOptions(callback, optionsPart);

		// Response looks like this:
		//
		// LIT:monetdb:{RIPEMD160}f2236256e5a9b20a5ecab4396e36c14f66c3e3c5:sql:demo
		// :FILETRANS:auto_commit=1,reply_size=1000,size_header=0,columnar_protocol=0,time_zone=3600:
		StringBuilder response = new StringBuilder(80);
		response.append("BIG:");
		response.append(userResponse).append(":");
		hashPassword(response, saltPart, password, passwordHashPart, validated.getHash(), serverHashesPart);
		response.append(":");
		response.append(validated.getLanguage()).append(":");
		response.append(validated.getDatabase()).append(":");
		response.append("FILETRANS:");
		response.append(optionsResponse).append(":");

		return response.toString();
	}

	private String hashPassword(StringBuilder responseBuffer, String salt, String password, String passwordAlgo, String configuredHashes, String serverSupportedAlgos) throws MCLException {
		// First determine which hash algorithms we can choose from for the challenge response.
		// This defaults to whatever the server offers but may be restricted by the user.
		Set<String> algoSet = new HashSet<>(Arrays.asList(serverSupportedAlgos.split(",")));
		if (!configuredHashes.isEmpty()) {
			String[] allowedList = configuredHashes.toUpperCase().split("[, ]");
			Set<String> allowedSet = new HashSet<>(Arrays.asList(allowedList));
			algoSet.retainAll(allowedSet);
			if (algoSet.isEmpty()) {
				throw new MCLException("None of the hash algorithms in <" + configuredHashes + "> are supported, server only supports <" + serverSupportedAlgos + ">");
			}
		}

		int maxHashDigits = 512 / 4;

		// We'll collect the result in the responseBuffer.
		// It will start with '{' HASHNAME '}' followed by hexdigits

		// This is where we accumulate what will eventually be hashed into the hexdigits above.
		// It consists of the hexadecimal pre-hash of the password,
		// followed by the salt from the server
		StringBuilder intermediate = new StringBuilder(maxHashDigits + salt.length());

		MessageDigest passwordDigest = pickBestAlgorithm(Collections.singleton(passwordAlgo), null);
		// Here's the password..
		hexhash(intermediate, passwordDigest, password);
		// .. and here's the salt
		intermediate.append(salt);

		responseBuffer.append('{');
		MessageDigest responseDigest = pickBestAlgorithm(algoSet, responseBuffer);
		// the call above has appended the HASHNAME, now add '}'
		responseBuffer.append('}');
		// pickBestAlgorithm has appended HASHNAME, buffer now contains '{' HASHNAME '}'
		hexhash(responseBuffer, responseDigest, intermediate.toString());
		// response buffer now contains '{' HASHNAME '}' HEX_DIGITS_OF_INTERMEDIATE_BUFFER

		return responseBuffer.toString();
	}

	/**
	 * Pick the most preferred digest algorithm and return a MessageDigest instance for that.
	 *
	 * @param algos          the MAPI names of permitted algorithms
	 * @param appendMapiName if not null, append MAPI name of chose algorithm to this buffer
	 * @return instance of the chosen digester
	 * @throws MCLException if none of the options is supported
	 */
	private MessageDigest pickBestAlgorithm(Set<String> algos, StringBuilder appendMapiName) throws MCLException {
		for (String[] choice : KNOWN_ALGORITHMS) {
			String mapiName = choice[0];
			String algoName = choice[1];
			MessageDigest digest;
			if (!algos.contains(mapiName))
				continue;
			try {
				digest = MessageDigest.getInstance(algoName);
			} catch (NoSuchAlgorithmException e) {
				continue;
			}
			// we found a match
			if (appendMapiName != null) {
				appendMapiName.append(mapiName);
			}
			return digest;
		}
		String algoNames = String.join(",", algos);
		throw new MCLException("No supported hash algorithm: " + algoNames);
	}

	/**
	 * Hash the text into the MessageDigest and append the hexadecimal form of the
	 * resulting digest to buffer.
	 *
	 * @param buffer where the hex digits are appended
	 * @param digest where the hex digits come from after the text has been digested
	 * @param text   text to digest
	 */
	private void hexhash(StringBuilder buffer, MessageDigest digest, String text) {
		byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
		digest.update(bytes);
		byte[] output = digest.digest();
		for (byte b : output) {
			int hi = (b & 0xF0) >> 4;
			int lo = b & 0x0F;
			buffer.append(HEXDIGITS[hi]);
			buffer.append(HEXDIGITS[lo]);
		}
	}

	private String handleOptions(OptionsCallback callback, String optionsPart) throws MCLException {
		if (callback == null || optionsPart == null || optionsPart.isEmpty())
			return "";

		StringBuilder buffer = new StringBuilder();
		callback.setBuffer(buffer);
		for (String optlevel : optionsPart.split(",")) {
			int eqindex = optlevel.indexOf('=');
			if (eqindex < 0)
				throw new MCLException("Invalid options part in server challenge: " + optionsPart);
			String lang = optlevel.substring(0, eqindex);
			int level;
			try {
				level = Integer.parseInt(optlevel.substring(eqindex + 1));
			} catch (NumberFormatException e) {
				throw new MCLException("Invalid option level in server challenge: " + optlevel);
			}
			callback.addOptions(lang, level);
		}

		return buffer.toString();
	}

	/**
	 * Returns an InputStream that reads from this open connection on
	 * the MapiSocket.
	 *
	 * @return an input stream that reads from this open connection
	 */
	public InputStream getInputStream() {
		return fromMonet;
	}

	/**
	 * Returns an output stream for this MapiSocket.
	 *
	 * @return an output stream for writing bytes to this MapiSocket
	 */
	public OutputStream getOutputStream() {
		return toMonet;
	}

	/**
	 * Returns a Reader for this MapiSocket.  The Reader is a
	 * BufferedMCLReader which does protocol interpretation of the
	 * BlockInputStream produced by this MapiSocket.
	 *
	 * @return a BufferedMCLReader connected to this MapiSocket
	 */
	public BufferedMCLReader getReader() {
		return reader;
	}

	/**
	 * Returns a Writer for this MapiSocket.  The Writer is a
	 * BufferedMCLWriter which produces protocol compatible data blocks
	 * that the BlockOutputStream can properly translate into blocks.
	 *
	 * @return a BufferedMCLWriter connected to this MapiSocket
	 */
	public BufferedMCLWriter getWriter() {
		return writer;
	}

	/**
	 * Returns the mapi protocol version used by this socket.  The
	 * protocol version depends on the server being used.  Users of the
	 * MapiSocket should check this version to act appropriately.
	 *
	 * @return the mapi protocol version
	 */
	public int getProtocolVersion() {
		return version;
	}

	/**
	 * Enables logging to a file what is read and written from and to
	 * the server.  Logging can be enabled at any time.  However, it is
	 * encouraged to start debugging before actually connecting the
	 * socket.
	 *
	 * @param filename the name of the file to write to
	 * @throws IOException if the file could not be opened for writing
	 */
	public void debug(final String filename) throws IOException {
		debug(new FileWriter(filename));
	}

	/**
	 * Enables logging to a stream what is read and written from and to
	 * the server.  Logging can be enabled at any time.  However, it is
	 * encouraged to start debugging before actually connecting the
	 * socket.
	 *
	 * @param out to write the log to a print stream
	 * @throws IOException if the file could not be opened for writing
	 */
// disabled as it is not used by JDBC driver code
//	public void debug(PrintStream out) throws IOException {
//		debug(new PrintWriter(out));
//	}

	/**
	 * Enables logging to a stream what is read and written from and to
	 * the server.  Logging can be enabled at any time.  However, it is
	 * encouraged to start debugging before actually connecting the
	 * socket.
	 *
	 * @param out to write the log to
	 */
	public void debug(final Writer out) {
		log = out;
		setDebug(true);
	}

	/**
	 * Get the log Writer.
	 *
	 * @return the log writer
	 */
	public Writer getLogWriter() {
		return log;
	}

	/**
	 * Writes a logline tagged with a timestamp using the given type and message
	 * and optionally flushes afterwards.
	 *
	 * Used for debugging purposes only and represents a message data that is
	 * connected to reading (RD or RX) or writing (TD or TX) to the socket.
	 * R=Receive, T=Transmit, D=Data, X=??
	 *
	 * @param type  message type: either RD, RX, TD or TX
	 * @param message  the message to log
	 * @param flush  whether we need to flush buffered data to the logfile.
	 * @throws IOException if an IO error occurs while writing to the logfile
	 */
	private final void log(final String type, final String message, final boolean flush) throws IOException {
		log.write(type + System.currentTimeMillis() + ": " + message + "\n");
		if (flush)
			log.flush();
	}

	/**
	 * For internal use
	 *
	 * @param b to enable/disable insert 'fake' newline and prompt
	 * @return previous setting
	 */
	public boolean setInsertFakePrompts(boolean b) {
		return fromMonet.setInsertFakePrompts(b);
	}

	public boolean isDebug() {
		return target.isDebug();
	}

	public boolean canClientInfo() {
		return supportsClientInfo;
	}


	/**
	 * Inner class that is used to write data on a normal stream as a
	 * blocked stream.  A call to the flush() method will write a
	 * "final" block to the underlying stream.  Non-final blocks are
	 * written as soon as one or more bytes would not fit in the
	 * current block any more.  This allows to write to a block to it's
	 * full size, and then flush it explicitly to have a final block
	 * being written to the stream.
	 */
	final class BlockOutputStream extends FilterOutputStream {
		private int writePos = 0;
		private int blocksize = 0;
		private final byte[] block = new byte[BLOCK];

		/**
		 * Constructs this BlockOutputStream, backed by the given
		 * OutputStream.  A BufferedOutputStream is internally used.
		 * @param out an OutputStream
		 */
		public BlockOutputStream(final OutputStream out) {
			// always use a buffered stream, even though we know how
			// much bytes to write/read, since this is just faster for
			// some reason
			super(new BufferedOutputStream(out));
		}

		@Override
		public void flush() throws IOException {
			// write the block (as final) then flush.
			writeBlock(true);
			out.flush();

			// it's a bit nasty if an exception is thrown from the log,
			// but ignoring it can be nasty as well, so it is decided to
			// let it go so there is feedback about something going wrong
			// it's a bit nasty if an exception is thrown from the log,
			// but ignoring it can be nasty as well, so it is decided to
			// let it go so there is feedback about something going wrong
			if (isDebug()) {
				log.flush();
			}
		}

		/**
		 * writeBlock puts the data in the block on the stream.  The
		 * boolean last controls whether the block is sent with an
		 * indicator to note it is the last block of a sequence or not.
		 *
		 * @param last whether this is the last block
		 * @throws IOException if writing to the stream failed
		 */
		public void writeBlock(final boolean last) throws IOException {
			if (last) {
				// always fits, because of BLOCK's size
				blocksize = (short)writePos;
				// this is the last block, so encode least
				// significant bit in the first byte (little-endian)
				blklen[0] = (byte)(blocksize << 1 & 0xFF | 1);
				blklen[1] = (byte)(blocksize >> 7);
			} else {
				// always fits, because of BLOCK's size
				blocksize = (short)BLOCK;
				// another block will follow, encode least
				// significant bit in the first byte (little-endian)
				blklen[0] = (byte)(blocksize << 1 & 0xFF);
				blklen[1] = (byte)(blocksize >> 7);
			}

			out.write(blklen);
			// write the actual block
			out.write(block, 0, writePos);

			if (isDebug()) {
				if (last) {
					log("TD ", "write final block: " + writePos + " bytes", false);
				} else {
					log("TD ", "write block: " + writePos + " bytes", false);
				}
				log("TX ", new String(block, 0, writePos, StandardCharsets.UTF_8), true);
			}

			writePos = 0;
		}

		@Override
		public void write(final int b) throws IOException {
			if (writePos == BLOCK) {
				writeBlock(false);
			}
			block[writePos++] = (byte)b;
		}

		@Override
		public void write(final byte[] b) throws IOException {
			write(b, 0, b.length);
		}

		@Override
		public void write(final byte[] b, int off, int len) throws IOException {
			while (len > 0) {
				int t = BLOCK - writePos;
				if (len > t) {
					System.arraycopy(b, off, block, writePos, t);
					off += t;
					len -= t;
					writePos += t;
					writeBlock(false);
				} else {
					System.arraycopy(b, off, block, writePos, len);
					writePos += len;
					break;
				}
			}
		}

		@Override
		public void close() throws IOException {
			// we don't want the flush() method to be called (default of
			// the FilterOutputStream), so we close manually here
			out.close();
		}
	}


	/**
	 * Inner class that is used to make the data on the blocked stream
	 * available as a normal stream.
	 */
	final class BlockInputStream extends FilterInputStream {
		private int readPos = 0;
		private int blockLen = 0;
		private boolean wasEndBlock = false;
		private final byte[] block = new byte[BLOCK + 3]; // \n.\n
		private boolean insertFakePrompts = true;

		/**
		 * Constructs this BlockInputStream, backed by the given
		 * InputStream.  A BufferedInputStream is internally used.
		 * @param in an InputStream
		 */
		public BlockInputStream(final InputStream in) {
			// always use a buffered stream, even though we know how
			// much bytes to write/read, since this is just faster for
			// some reason
			super(new BufferedInputStream(in));
		}

		public boolean setInsertFakePrompts(boolean doFake) {
			boolean old = insertFakePrompts;
			insertFakePrompts = doFake;
			return old;
		}

		@Override
		public int available() {
			return blockLen - readPos;
		}

		@Override
		public boolean markSupported() {
			return false;
		}

		@Override
		public void mark(final int readlimit) {
			throw new AssertionError("Not implemented!");
		}

		@Override
		public void reset() {
			throw new AssertionError("Not implemented!");
		}

		/**
		 * Small wrapper to get a blocking variant of the read() method
		 * on the BufferedInputStream.  We want to benefit from the
		 * Buffered pre-fetching, but not dealing with half blocks.
		 * Changing this class to be able to use the partially received
		 * data will greatly complicate matters, while a performance
		 * improvement is debatable given the relatively small size of
		 * our blocks.  Maybe it does speed up on slower links, then
		 * consider this method a quick bug fix/workaround.
		 *
		 * @param b a byte array to store read bytes
		 * @param len number of bytes to read
		 * @return false if reading the block failed due to EOF
		 * @throws IOException if an IO error occurs while reading
		 */
		private boolean _read(final byte[] b, int len) throws IOException {
			int s;
			int off = 0;
			while (len > 0) {
				s = in.read(b, off, len);
				if (s == -1) {
					// if we have read something before, we should have been
					// able to read the whole, so make this fatal
					if (off > 0) {
						if (isDebug()) {
							log("RD ", "the following incomplete block was received:", false);
							log("RX ", new String(b, 0, off, StandardCharsets.UTF_8), true);
						}
						throw new IOException("Read from " +
								con.getInetAddress().getHostName() + ":" +
								con.getPort() + ": Incomplete block read from stream");
					}
					if (isDebug())
						log("RD ", "server closed the connection (EOF)", true);
					return false;
				}
				len -= s;
				off += s;
			}

			return true;
		}

		/**
		 * Reads the next block on the stream into the internal buffer,
		 * or writes the prompt in the buffer.
		 *
		 * The blocked stream protocol consists of first a two byte
		 * integer indicating the length of the block, then the
		 * block, followed by another length + block.  The end of
		 * such sequence is put in the last bit of the length, and
		 * hence this length should be shifted to the right to
		 * obtain the real length value first.  We simply fetch
		 * blocks here as soon as they are needed for the stream's
		 * read methods.
		 *
		 * The user-flush, which is an implicit effect of the end of
		 * a block sequence, is communicated beyond the stream by
		 * inserting a prompt sequence on the stream after the last
		 * block.  This method makes sure that a final block ends with a
		 * newline, if it doesn't already, in order to facilitate a
		 * Reader that is possibly chained to this InputStream.
		 *
		 * If the stream is not positioned correctly, hell will break
		 * loose.
		 *
		 * @return blockLen
		 * @throws IOException if an IO error occurs while reading
		 */
		private int readBlock() throws IOException {
			// read next two bytes (short)
			if (!_read(blklen, 2))
				return(-1);

			// Get the short-value and store its value in blockLen.
			blockLen = (short)(
					(blklen[0] & 0xFF) >> 1 |
					(blklen[1] & 0xFF) << 7
					);
			wasEndBlock = (blklen[0] & 0x1) == 1;

			readPos = 0;

			if (isDebug()) {
				if (wasEndBlock) {
					log("RD ", "read final block: " + blockLen + " bytes", false);
				} else {
					log("RD ", "read new block: " + blockLen + " bytes", false);
				}
			}

			// sanity check to avoid bad servers make us do an ugly
			// stack trace
			if (blockLen > block.length)
				throw new IOException("Server sent a block larger than BLOCKsize: " +
						blockLen + " > " + block.length);
			if (!_read(block, blockLen))
				return -1;

			if (isDebug())
				log("RX ", new String(block, 0, blockLen, StandardCharsets.UTF_8), true);

			// if this is the last block, make it end with a newline and prompt
			if (wasEndBlock) {
				// insert 'fake' newline and prompt
				if (insertFakePrompts) {
					if (blockLen > 0 && block[blockLen - 1] != '\n') {
						// to terminate the block in a Reader
						block[blockLen++] = '\n';
					}
					for (byte b : LineType.PROMPT.bytes()) {
						block[blockLen++] = b;
					}
					block[blockLen++] = '\n';
					if (isDebug()) {
						log("RD ", "inserting prompt", true);
					}
				}
			}

			return blockLen;
		}

		@Override
		public int read() throws IOException {
			if (available() == 0) {
				if (readBlock() == -1)
					return -1;
			}

			if (isDebug())
				log("RX ", new String(block, readPos, 1, StandardCharsets.UTF_8), true);

			return block[readPos++] & 0xFF;
		}

		@Override
		public int read(final byte[] b) throws IOException {
			return read(b, 0, b.length);
		}

		@Override
		public int read(final byte[] b, int off, int len) throws IOException {
			int t;
			int size = 0;
			while (size < len) {
				t = available();
				if (t == 0) {
					if (size != 0)
						break;
					if (readBlock() == -1) {
						if (size == 0)
							size = -1;
						break;
					}
					t = available();
				}
				if (len > t) {
					System.arraycopy(block, readPos, b, off, t);
					off += t;
					len -= t;
					readPos += t;
					size += t;
				} else {
					System.arraycopy(block, readPos, b, off, len);
					readPos += len;
					size += len;
					break;
				}
			}
			return size;
		}

		@Override
		public long skip(final long n) throws IOException {
			long skip = n;
			while (skip > 0) {
				int t = available();
				if (skip > t) {
					skip -= t;
					readPos += t;
					readBlock();
				} else {
					readPos += (int)skip;
					break;
				}
			}
			return n;
		}

		/**
		 * For internal use
		 * @return new Raw object
		 */
		Raw getRaw() {
			return new Raw();
		}

		/** An alternative I/O interface that exposes the block based nature of the MAPI protocol */
		final class Raw {
			byte[] getBytes() {
				return block;
			}

			int getLength() {
				return blockLen;
			}

			int getPosition() {
				return readPos;
			}

			int consume(int delta) {
				int pos = readPos;
				readPos += delta;
				return pos;
			}

			int readBlock() throws IOException {
				boolean wasFaking = setInsertFakePrompts(false);
				try {
					return BlockInputStream.this.readBlock();
				} finally {
					setInsertFakePrompts(wasFaking);
				}
			}

			boolean wasEndBlock() {
				return wasEndBlock;
			}
		}
	}

	/**
	 * Closes the streams and socket connected to the server if possible.
	 * If an error occurs at closing a resource, it is ignored so as many
	 * resources as possible are closed.
	 */
	public synchronized void close() {
		if (writer != null) {
			try {
				writer.close();
				writer = null;
			} catch (IOException e) { /* ignore it */ }
		}
		if (reader != null) {
			try {
				reader.close();
				reader = null;
			} catch (IOException e) { /* ignore it */ }
		}
		if (toMonet != null) {
			try {
				toMonet.close();
				toMonet = null;
			} catch (IOException e) { /* ignore it */ }
		}
		if (fromMonet != null) {
			try {
				fromMonet.close();
				fromMonet = null;
			} catch (IOException e) { /* ignore it */ }
		}
		if (con != null) {
			try {
				con.close();	// close the socket
				con = null;
			} catch (IOException e) { /* ignore it */ }
		}
		if (isDebug() && log != null && log instanceof FileWriter) {
			try {
				log.close();
				log = null;
			} catch (IOException e) { /* ignore it */ }
		}
	}

	/**
	 * Return an UploadStream for use with for example COPY FROM filename ON CLIENT.
	 *
	 * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}.
	 *
	 * @param chunkSize chunk size for the upload stream
	 * @return UploadStream new upload stream with the given chunk size
	 */
	public UploadStream uploadStream(int chunkSize) {
		return new UploadStream(chunkSize);
	}

	/**
	 * Return an UploadStream for use with for example COPY FROM filename ON CLIENT.
	 *
	 * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}.
	 *
	 * @return UploadStream new upload stream
	 */
	public UploadStream uploadStream() {
		return new UploadStream();
	}

	/**
	 * Return a DownloadStream for use with for example COPY INTO filename ON CLIENT
	 *
	 * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}.
	 *
	 * @param prependCr convert \n to \r\n
	 * @return DownloadStream new download stream
	 */
	public DownloadStream downloadStream(boolean prependCr) {
		return new DownloadStream(fromMonet.getRaw(), toMonet, prependCr);
	}


	/**
	 * Stream of data sent to the server
	 *
	 * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}.
	 *
	 * An UploadStream has a chunk size. Every chunk size bytes, the server gets
	 * the opportunity to abort the upload.
	 */
	public class UploadStream extends FilterOutputStream {
		public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024;
		private final int chunkSize;
		private boolean closed = false;
		private boolean serverCancelled = false;
		private int chunkLeft;
		private byte[] promptBuffer;
		private Runnable cancellationCallback = null;

		/**
		 * Create an UploadStream with the given chunk size
		 * @param chunkSize chunk size for the upload stream
		 */
		UploadStream(final int chunkSize) {
			super(toMonet);
			if (chunkSize <= 0) {
				throw new IllegalArgumentException("chunk size must be positive");
			}
			this.chunkSize = chunkSize;
			assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length;
			int promptLen = LineType.MORE.bytes().length;
			promptBuffer = new byte[promptLen + 1];
			chunkLeft = this.chunkSize;
		}

		/** Create an UploadStream with the default chunk size */
		UploadStream() {
			this(DEFAULT_CHUNK_SIZE);
		}

		/** Set a callback to be invoked if the server cancels the upload
		 *
		 * @param cancellationCallback callback to call
		 */
		public void setCancellationCallback(final Runnable cancellationCallback) {
			this.cancellationCallback = cancellationCallback;
		}

		@Override
		public void write(final int b) throws IOException {
			if (serverCancelled) {
				// We have already thrown an exception and apparently that has been ignored.
				// Probably because they're calling print methods instead of write.
				// Throw another one, maybe they'll catch this one.
				throw new IOException("Server aborted the upload");
			}
			handleChunking();
			out.write(b);
			wrote(1);
		}

		@Override
		public void write(final byte[] b) throws IOException {
			this.write(b, 0, b.length);
		}

		@Override
		public void write(final byte[] b, int off, int len) throws IOException {
			if (serverCancelled) {
				// We have already thrown an exception and apparently that has been ignored.
				// Probably because they're calling print methods instead of write.
				// Throw another one, maybe they'll catch this one.
				throw new IOException("Server aborted the upload");
			}
			while (len > 0) {
				handleChunking();
				int toWrite = Integer.min(len, chunkLeft);
				out.write(b, off, toWrite);
				off += toWrite;
				len -= toWrite;
				wrote(toWrite);
			}
		}

		@Override
		public void flush() throws IOException {
			// suppress flushes
		}

		@Override
		public void close() throws IOException {
			if (closed) {
				return;
			}
			closed = true;

			if (serverCancelled)
				closeAfterServerCancelled();
			else
				closeAfterSuccesfulUpload();
		}

		private void closeAfterSuccesfulUpload() throws IOException {
			if (chunkLeft != chunkSize) {
				// flush pending data
				flushAndReadPrompt();
			}
			// send empty block
			out.flush();
			final LineType acknowledgement = readPrompt();
			if (acknowledgement != LineType.FILETRANSFER) {
				throw new IOException("Expected server to acknowledge end of file");
			}
		}

		private void closeAfterServerCancelled() {
			// nothing to do here, we have already read the error prompt.
		}

		private void wrote(final int i) {
			chunkLeft -= i;
		}

		private void handleChunking() throws IOException {
			if (chunkLeft > 0) {
				return;
			}
			flushAndReadPrompt();
		}

		private void flushAndReadPrompt() throws IOException {
			out.flush();
			chunkLeft = chunkSize;
			final LineType lineType = readPrompt();
			switch (lineType) {
				case MORE:
					return;
				case FILETRANSFER:
					// Note, if the caller is calling print methods instead of write, the IO exception gets hidden.
					// This is unfortunate but there's nothing we can do about it.
					serverCancelled = true;
					if (cancellationCallback != null) {
						cancellationCallback.run();
					}
					throw new IOException("Server aborted the upload");
				default:
					throw new IOException("Expected MORE/DONE from server, got " + lineType);
			}
		}

		private LineType readPrompt() throws IOException {
			final int nread = fromMonet.read(promptBuffer);
			if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') {
				throw new IOException("server return incomplete prompt");
			}
			return LineType.classify(promptBuffer);
		}
	}


	/**
	 * Stream of data received from the server
	 *
	 * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}.
	 */
	public static class DownloadStream extends InputStream {
		private final BlockInputStream.Raw rawIn;
		private final OutputStream out;
		private final boolean prependCr;
		private boolean endBlockSeen = false;
		private boolean closed = false;
		private boolean newlinePending = false; // used for crlf conversion

		DownloadStream(BlockInputStream.Raw rawIn, OutputStream out, boolean prependCr) {
			this.rawIn = rawIn;
			this.out = out;
			this.prependCr = prependCr;
		}

		void nextBlock() throws IOException {
			if (endBlockSeen || closed)
				return;
			final int ret = rawIn.readBlock();
			if (ret < 0 || rawIn.wasEndBlock()) {
				endBlockSeen = true;
			}
		}

		@Override
		public void close() throws IOException {
			if (closed)
				return;
			closed = true;
			while (!endBlockSeen) {
				nextBlock();
			}
			// Send acknowledgement to server
			out.write('\n');
			out.flush();
			// Do whatever super has to do
			super.close();
		}

		@Override
		public int read() throws IOException {
			final byte[] buf = { 0 };
			final int nread = read(buf, 0, 1);
			if (nread == 1)
				return buf[0] & 0xFF;
			else
				return -1;
		}

		@Override
		public int read(final byte[] dest, int off, int len) throws IOException {
			final int origOff = off;
			int end = off + len;

			while (off < end) {
				// minimum of what's requested and what we have in stock
				int chunk = Integer.min(end - off, rawIn.getLength() - rawIn.getPosition());
				assert chunk >= 0;
				if (chunk == 0) {
					// make progress by fetching more data
					if (endBlockSeen)
						break;
					nextBlock();
					continue;
				}
				// make progress copying some bytes
				if (!prependCr) {
					// no conversion needed, use arraycopy
					System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), dest, off, chunk);
					off += chunk;
					rawIn.consume(chunk);
				} else {
					int chunkEnd = off + chunk;
					if (newlinePending && off < chunkEnd) {
						// we were in the middle of a line ending conversion
						dest[off++] = '\n';
						newlinePending = false;
					}
					while (off < chunkEnd) {
						byte b = rawIn.getBytes()[rawIn.consume(1)];
						if (b != '\n') {
							dest[off++] = b;
						} else if (chunkEnd - off >= 2) {
							dest[off++] = '\r';
							dest[off++] = '\n';
						} else {
							dest[off++] = '\r';
							newlinePending = true;
							break;
						}
					}
				}
			}

			if (off < end && newlinePending) {
				dest[off++] = '\n';
				newlinePending = false;
			}

			if (off == origOff && endBlockSeen)
				return -1;
			else
				return off - origOff;
		}
	}

	/**
	 * Callback used during the initial MAPI handshake.
	 *
	 * Newer MonetDB versions allow setting some options during the handshake.
	 * The options are language-specific and each has a 'level'. The server
	 * advertises up to which level options are supported for a given language.
	 * For each language/option combination, {@link #addOptions} will be invoked
	 * during the handshake. This method should call {@link #contribute} for each
	 * option it wants to set.
	 *
	 * At the time of writing, only the 'sql' language supports options,
	 * they are listed in enum mapi_handshake_options_levels in mapi.h.
	 */
	public static abstract class OptionsCallback {
		private StringBuilder buffer;

		/**
		 * Callback called for each language/level combination supported by the
		 * server. May call {@link #contribute} for options with a level STRICTLY
		 * LOWER than the level passed as a parameter.
		 * @param lang language advertised by the server
		 * @param level one higher than the maximum supported option
		 */
		public abstract void addOptions(String lang, int level);

		/**
		 * Pass option=value during the handshake
		 * @param field name
		 * @param value int value
		 */
		protected void contribute(String field, int value) {
			if (buffer.length() > 0)
				buffer.append(',');
			buffer.append(field);
			buffer.append('=');
			buffer.append(value);
		}

		/**
		 * Set the buffer
		 * @param buf a non null StringBuilder object
		 */
		void setBuffer(StringBuilder buf) {
			buffer = buf;
		}
	}
}