view src/main/java/nl/cwi/monetdb/mcl/connection/mapi/MapiConnection.java @ 277:4face9f42efc embedded

Merge with default.
author Pedro Ferreira <pedro.ferreira@monetdbsolutions.com>
date Thu, 18 Jul 2019 11:22:55 +0200 (2019-07-18)
parents 5b13ccaba741
children 68401c1f10fa
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
 */

package nl.cwi.monetdb.mcl.connection.mapi;

import nl.cwi.monetdb.jdbc.MonetConnection;
import nl.cwi.monetdb.jdbc.MonetStatement;
import nl.cwi.monetdb.mcl.connection.ControlCommands;
import nl.cwi.monetdb.mcl.connection.MCLException;
import nl.cwi.monetdb.mcl.connection.helpers.ChannelSecurity;
import nl.cwi.monetdb.mcl.protocol.ProtocolException;
import nl.cwi.monetdb.mcl.protocol.ServerResponses;
import nl.cwi.monetdb.mcl.protocol.oldmapi.OldMapiProtocol;

import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteOrder;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.util.*;

/**
 * A {@link Connection} suitable for the MonetDB database using a MAPI connection.
 *
 * @author Fabian Groffen, Martin van Dinther, Pedro Ferreira
 */
public class MapiConnection extends MonetConnection {

	/** the PROMPT ASCII char sent by the server */
	static final char PROMPT_CHAR = '.';
	/** the default number of rows that are (attempted to) read at once */
	private static final int DEF_FETCHSIZE = 250;

	/** The hostname to connect to */
	private final String hostname;
	/** The port to connect on the host to */
	private final int port;
	/** The database to connect to */
	private String database;
	/** The TCP Socket timeout in milliseconds. Default is 0 meaning the timeout is disabled (i.e., timeout of infinity) */
	private int soTimeout = 0;
	/** Whether we should follow redirects */
	private boolean followRedirects = true;
	/** How many redirections do we follow until we're fed up with it? */
	private int ttl = 10;
	/** Protocol version of the connection */
	private int version;
	/** Endianness of the server */
	private ByteOrder serverEndianness;

	public MapiConnection(Properties props, String hash, String language, boolean blobIsBinary, boolean clobIsLongChar,
						  String hostname, int port, String database) {
		super(props, hash, MapiLanguage.getLanguageFromString(language), blobIsBinary, clobIsLongChar);
		this.hostname = hostname;
		this.port = port;
		this.database = database;
	}

	/**
	 * Gets the hostname of the server used on this connection.
	 *
	 * @return The hostname of the server used on this connection
	 */
	public String getHostname() {
		return hostname;
	}

	/**
	 * Gets the port of the server used on this connection.
	 *
	 * @return The port of the server used on this connection
	 */
	public int getPort() {
		return port;
	}

	/**
	 * Gets the database to connect to. If database is null, a connection is made to the default database of the server.
	 * This is also the default.
	 *
	 * @return The database name
	 */
	public String getDatabase() {
		return database;
	}

	/**
	 * Gets the SO_TIMEOUT from the underlying Socket.
	 *
	 * @return The currently in use timeout in milliseconds
	 */
	@Override
	public int getSoTimeout() throws SocketException {
		if(protocol != null) {
			this.soTimeout = ((OldMapiProtocol)protocol).getSocket().getSoTimeout();
		}
		return this.soTimeout;
	}

	/**
	 * Set the SO_TIMEOUT on the underlying Socket. When for some reason the connection to the database hangs, this
	 * setting can be useful to break out of this indefinite wait. This option must be enabled prior to entering the
	 * blocking operation to have effect.
	 *
	 * @param timeout The specified timeout, in milliseconds. A timeout of zero is interpreted as an infinite timeout
	 */
	@Override
	public void setSoTimeout(int timeout) throws SocketException {
		if (timeout < 0) {
			throw new IllegalArgumentException("Timeout can't be negative");
		}
		if(protocol != null) {
			((OldMapiProtocol)protocol).getSocket().setSoTimeout(timeout);
		}
		this.soTimeout = timeout;
	}

	/**
	 * Gets whether MCL redirections should be followed or not. If set to false, an MCLException will be thrown when a
	 * redirect is encountered during connect. The default behaviour is to automatically follow redirects.
	 *
	 * @return Whether to follow redirects (true) or not (false)
	 */
	public boolean isFollowRedirects() {
		return followRedirects;
	}

	/**
	 * Gets the number of redirects that are followed when followRedirects is true. In order to avoid going into an
	 * endless loop due to some evil server, or another error, a maximum number of redirects that may be followed can be
	 * set here. Note that to disable the following of redirects you should use setFollowRedirects.
	 *
	 * @see #isFollowRedirects()
	 * @return The number of redirects before an exception is thrown
	 */
	public int getTtl() {
		return ttl;
	}

	/**
	 * Gets the mapi protocol version used by this socket. The protocol version depends on the server being used.
	 *
	 * @return The mapi protocol version used by this socket
	 */
	public int getVersion() {
		return version;
	}

	/**
	 * Gets the connection server endianness.
	 *
	 * @return The connection server endianness
	 */
	public ByteOrder getServerEndianness() {
		return serverEndianness;
	}

	/**
	 * On a MAPI connection, the block size will be the block size of the connection.
	 *
	 * @return The block size length
	 */
	@Override
	public int getBlockSize() {
		return ((OldMapiProtocol)protocol).getSocket().getBlockSize();
	}

	/**
	 * On a MAPI connection the default fetch size per DataBlock is 250 rows.
	 *
	 * @return The default fetch size
	 */
	@Override
	public int getDefFetchsize() {
		return DEF_FETCHSIZE;
	}

	@Override
	public int initialStringBuilderSize() {
		return this.getBlockSize();
	}

	/**
	 * Closes the underlying connection implementation. On a MAPI connection, the underlying socket is closed.
	 *
	 * @throws IOException if an I/O error occurs while closing the connection
	 */
	@Override
	public synchronized void closeUnderlyingConnection() throws IOException {
		((OldMapiProtocol)protocol).getSocket().close();
	}

	/**
	 * Gets the underlying connection JDBC String URL.
	 *
	 * @return The underlying connection JDBC String URL
	 */
	@Override
	public String getJDBCURL() {
		String res = "jdbc:monetdb://" + this.hostname + ":" + this.port + "/" + this.database;
		if (this.getLanguage() == MapiLanguage.LANG_MAL)
			res += "?language=mal";
		return res;
	}

	/**
	 * Sends a control command to the server. On a MAPI connection, regular MonetDB commands are sent to the server.
	 *
	 * @param commandID the command identifier according to {@link ControlCommands} listing
	 * @param data The integer to send according to the control command
	 * @throws SQLException if an IO exception or a database error occurs
	 */
	@Override
	public void sendControlCommand(int commandID, int data) throws SQLException {
		String command = null;
		switch (commandID) {
			case ControlCommands.AUTO_COMMIT:
				command = "auto_commit " + ((data == 1) ? "1" : "0");
				break;
			case ControlCommands.REPLY_SIZE:
				command = "reply_size " + data;
				break;
			case ControlCommands.RELEASE:
				command = "release " + data;
				break;
			case ControlCommands.CLOSE:
				command = "close " + data;
		}
		try {
			protocol.writeNextQuery(language.getCommandTemplateIndex(0), command,
					language.getCommandTemplateIndex(1));
			protocol.waitUntilPrompt();
			int csrh = protocol.getCurrentServerResponse();
			if (csrh == ServerResponses.ERROR) {
				String error = protocol.getRemainingStringLine(0);
				throw new SQLException(error.substring(6), error.substring(0, 5));
			}
		} catch (SocketTimeoutException e) {
			close(); // JDBC 4.1 semantics, abort()
			throw new SQLNonTransientConnectionException("connection timed out", "08M33");
		} catch (IOException e) {
			throw new SQLNonTransientConnectionException(e.getMessage(), "08000");
		}
	}

	/**
	 * Execute a batch query in a MAPI connection.
	 *
	 * @param statement The original MonetStatement where the batch comes from
	 * @param batch The list of queries to execute
	 * @param counts The return of the update statement of each input query
	 * @param e An exception to be thrown if an error occurs
	 * @return If all queries in the batch executed successfully or not
	 * @throws SQLException if an IO exception or a database error occurs
	 */
	@Override
	protected boolean executeNextQueryBatch(MonetStatement statement, List<String> batch, int[] counts,
											BatchUpdateException e) throws SQLException {
		int offset = 0;
		boolean first = true, error = false;
		int builderSize = this.initialStringBuilderSize();
		StringBuilder tmpBatch = new StringBuilder(builderSize);
		String sep = this.getLanguage().getQueryTemplateIndex(2);
		for (int i = 0; i < batch.size(); i++) {
			String tmp = batch.get(i);
			if (sep.length() + tmp.length() > builderSize) {
				// The thing is too big. Way too big. Since it won't be optimal anyway, just add it to whatever we
				// have and continue.
				if (!first) {
					tmpBatch.append(sep);
				}
				tmpBatch.append(tmp);
				// send and receive
				error |= statement.internalBatch(tmpBatch.toString(), counts, offset, i + 1, e);
				offset = i;
				tmpBatch.delete(0, tmpBatch.length());
				first = true;
				continue;
			}
			if (tmpBatch.length() + sep.length() + tmp.length() >= builderSize) {
				// send and receive
				error |= statement.internalBatch(tmpBatch.toString(), counts, offset, i + 1, e);
				offset = i;
				tmpBatch.delete(0, tmpBatch.length());
				first = true;
			}
			if (!first) tmpBatch.append(sep);
			first = false;
			tmpBatch.append(tmp);
		}
		// send and receive
		error |= statement.internalBatch(tmpBatch.toString(), counts, offset, counts.length, e);
		return error;
	}

	/**
	 * Connects to the given host and port, logging in as the given user. If followRedirect is false, a
	 * RedirectionException is thrown when a redirect is encountered.
	 *
	 * @param user The user name to authenticate
	 * @param pass The user's password
	 * @return A List with informational (warning) messages. If this list is empty; then there are no warnings.
	 * @throws IOException if an I/O error occurs when creating the socket
	 * @throws ProtocolException if bogus data is received
	 * @throws MCLException if an MCL related error occurs
	 */
	@Override
	public List<String> connect(String user, String pass) throws IOException, ProtocolException, MCLException {
		// Wrap around the internal connect that needs to know if it should really make a TCP connection or not.
		List<String> res = connect(this.hostname, this.port, user, pass, true);
		// apply NetworkTimeout value from legacy (pre 4.1) driver so_timeout calls
		this.setSoTimeout(this.getSoTimeout());
		return res;
	}

	public List<String> connect(String host, int port, String user, String pass, boolean makeConnection)
			throws IOException, ProtocolException, MCLException {
		if (ttl-- <= 0)
			throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry.");

		if (makeConnection) {
			this.protocol = new OldMapiProtocol(new OldMapiSocket(this.hostname, this.port, this));
			//set nodelay, as it greatly speeds up small messages (like we often do)
			((OldMapiProtocol)this.protocol).getSocket().setTcpNoDelay(true);
			((OldMapiProtocol)this.protocol).getSocket().setSoTimeout(this.soTimeout);
		}

		this.protocol.fetchNextResponseData();
		String nextLine = this.protocol.getRemainingStringLine(0);
		this.protocol.waitUntilPrompt();
		String test = this.getChallengeResponse(nextLine, user, pass, this.language.getRepresentation(),
				this.database, this.hash);
		this.protocol.writeNextQuery("", test, "");

		List<String> redirects = new ArrayList<>();
		List<String> warns = new ArrayList<>();
		String err = "";
		int next;

		do {
			this.protocol.fetchNextResponseData();
			next = this.protocol.getCurrentServerResponse();
			switch (next) {
				case ServerResponses.ERROR:
					err += "\n" + this.protocol.getRemainingStringLine(7);
					break;
				case ServerResponses.INFO:
					warns.add(this.protocol.getRemainingStringLine(1));
					break;
				case ServerResponses.REDIRECT:
					redirects.add(this.protocol.getRemainingStringLine(1));
			}
		} while (next != ServerResponses.PROMPT);

		if (!err.equals("")) {
			this.close();
			throw new MCLException(err.trim());
		}
		if (!redirects.isEmpty()) {
			if (followRedirects) {
				// Ok, server wants us to go somewhere else. The list might have multiple clues on where to go. For now
				// we don't support anything intelligent but trying the first one. URI should be in form of:
				// "mapi:monetdb://host:port/database?arg=value&..." or "mapi:merovingian://proxy?arg=value&..." note
				// that the extra arguments must be obeyed in both cases
				String suri = redirects.get(0);
				if (!suri.startsWith("mapi:"))
					throw new MCLException("unsupported redirect: " + suri);

				URI u;
				try {
					u = new URI(suri.substring(5));
				} catch (URISyntaxException e) {
					throw new ProtocolException(e.toString());
				}
				String tmp = u.getQuery();
				if (tmp != null) {
					String args[] = tmp.split("&");
					for (String arg : args) {
						int pos = arg.indexOf("=");
						if (pos > 0) {
							tmp = arg.substring(0, pos);
							switch (tmp) {
								case "database":
									tmp = arg.substring(pos + 1);
									if (!tmp.equals(database)) {
										warns.add("redirect points to different " + "database: " + tmp);
										this.database = tmp;
									}
									break;
								case "language":
									tmp = arg.substring(pos + 1);
									warns.add("redirect specifies use of different language: " + tmp);
									 this.language = MapiLanguage.getLanguageFromString(tmp);
									break;
								case "user":
									tmp = arg.substring(pos + 1);
									if (!tmp.equals(user))
										warns.add("ignoring different username '" + tmp + "' set by " +
												"redirect, what are the security implications?");
									break;
								case "password":
									warns.add("ignoring different password set by redirect, " +
											"what are the security implications?");
									break;
								default:
									warns.add("ignoring unknown argument '" + tmp + "' from redirect");
									break;
							}
						} else {
							warns.add("ignoring illegal argument from redirect: " + arg);
						}
					}
				}

				switch (u.getScheme()) {
					case "monetdb":
						tmp = u.getPath();
						if (tmp != null && tmp.length() != 0) {
							tmp = tmp.substring(1).trim();
							if (!tmp.isEmpty() && !tmp.equals(database)) {
								warns.add("redirect points to different " + "database: " + tmp);
								this.database = tmp;
							}
						}
						int p = u.getPort();
						warns.addAll(connect(u.getHost(), p == -1 ? port : p, user, pass, true));
						warns.add("Redirect by " + host + ":" + port + " to " + suri);
						break;
					case "merovingian":
						// reuse this connection to inline connect to the right database that Merovingian proxies for us
						warns.addAll(connect(host, port, user, pass, false));
						break;
					default:
						throw new MCLException("unsupported scheme in redirect: " + suri);
				}
			} else {
				StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:");
				for (String it : redirects) {
					msg.append(" [").append(it).append("]");
				}
				throw new MCLException(msg.toString());
			}
		}
		return warns;
	}

	/**
	 * A little helper function that processes a challenge string, and returns a response string for the server.
	 * If the challenge string is null, a challengeless response is returned.
	 *
	 * @param chalstr the challenge string
	 * @param username the username to use
	 * @param password the password to use
	 * @param language the language to use
	 * @param database the database to connect to
	 * @param hash the hash method(s) to use, or NULL for all supported hashes
	 */
	private String getChallengeResponse(String chalstr, String username, String password, String language,
										String database, String hash) throws ProtocolException, MCLException,
			IOException {
		String response;
		String algo;

		// parse the challenge string, split it on ':'
		String[] chaltok = chalstr.split(":");
		if (chaltok.length <= 4)
			throw new ProtocolException("Server challenge string unusable! Challenge contains too few tokens: "
					+ chalstr);

		// challenge string to use as salt/key
		String challenge = chaltok[0];
		String servert = chaltok[1];
		try {
			this.version = Integer.parseInt(chaltok[2].trim()); // protocol version
		} catch (NumberFormatException e) {
			throw new ProtocolException("Protocol version unparseable: " + chaltok[2]);
		}

		switch (chaltok[4]) {
			case "BIG":
				this.serverEndianness = ByteOrder.BIG_ENDIAN;
				break;
			case "LIT":
				this.serverEndianness = ByteOrder.LITTLE_ENDIAN;
				break;
			default:
				throw new ProtocolException("Invalid byte-order: " + chaltok[4]);
		}
		((OldMapiProtocol)protocol).getSocket().setSocketChannelEndianness(this.serverEndianness);

		// handle the challenge according to the version it is
		switch (this.version) {
			case 9:
				// proto 9 is like 8, but uses a hash instead of the plain password, the server tells us which hash in
				// the challenge after the byte-order
				/* NOTE: Java doesn't support RIPEMD160 :( */
				switch (chaltok[5]) {
					case "SHA512":
						algo = "SHA-512";
						break;
					case "SHA384":
						algo = "SHA-384";
						break;
					case "SHA256":
						algo = "SHA-256";
				/* NOTE: Java supports SHA-224 only on 8 */
						break;
					case "SHA1":
						algo = "SHA-1";
						break;
					case "MD5":
						algo = "MD5";
						break;
					default:
						throw new MCLException("Unsupported password hash: " + chaltok[5]);
				}

				password = ChannelSecurity.digestStrings(algo, password.getBytes("UTF-8"));

				// proto 7 (finally) used the challenge and works with a password hash. The supported implementations
				// come from the server challenge. We chose the best hash we can find, in the order SHA1, MD5, plain.
				// Also, the byte-order is reported in the challenge string. proto 8 made this obsolete, but retained
				// the byte-order report for future "binary" transports. In proto 8, the byte-order of the blocks is
				// always little endian because most machines today are.
				String hashes = (hash == null ? chaltok[3] : hash);
				Set<String> hashesSet = new HashSet<>(Arrays.asList(hashes.toUpperCase().split("[, ]")));

				// if we deal with merovingian, mask our credentials
				if (servert.equals("merovingian") && !language.equals("control")) {
					username = "merovingian";
					password = "merovingian";
				}
				String pwhash;

				if (hashesSet.contains("SHA512")) {
					algo = "SHA-512";
					pwhash = "{SHA512}";
				} else if (hashesSet.contains("SHA384")) {
					algo = "SHA-384";
					pwhash = "{SHA384}";
				} else if (hashesSet.contains("SHA256")) {
					algo = "SHA-256";
					pwhash = "{SHA256}";
				} else if (hashesSet.contains("SHA1")) {
					algo = "SHA-1";
					pwhash = "{SHA1}";
				} else if (hashesSet.contains("MD5")) {
					algo = "MD5";
					pwhash = "{MD5}";
				} else {
					throw new MCLException("No supported password hashes in " + hashes);
				}

				pwhash += ChannelSecurity.digestStrings(algo, password.getBytes("UTF-8"),
						challenge.getBytes("UTF-8"));

				// generate response
				response = "BIG:";	// JVM byte-order is big-endian
				response += username + ":" + pwhash + ":" + language;
				response += ":" + (database == null ? "" : database) + ":";

				this.conn_props.setProperty("hash", hashes);
				this.conn_props.setProperty("language", language);
				this.conn_props.setProperty("database", database);

				return response;
			default:
				throw new MCLException("Unsupported protocol version: " + version);
		}
	}
}