Mercurial > hg > monetdb-java
diff src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @ 0:a5a898f6886c
Copy of MonetDB java directory changeset e6e32756ad31.
author | Sjoerd Mullender <sjoerd@acm.org> |
---|---|
date | Wed, 21 Sep 2016 09:34:48 +0200 (2016-09-21) |
parents | |
children | 7e0d71a22677 6cc63d6cb224 |
line wrap: on
line diff
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @@ -0,0 +1,1103 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V. + */ + +package nl.cwi.monetdb.mcl.net; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.EOFException; +import java.io.FileWriter; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.net.Socket; +import java.net.SocketException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import nl.cwi.monetdb.mcl.MCLException; +import nl.cwi.monetdb.mcl.io.BufferedMCLReader; +import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; +import nl.cwi.monetdb.mcl.parser.MCLParseException; + +/** + * A Socket for communicating with the MonetDB database in MAPI block + * mode. + * + * The MapiSocket implements the protocol specifics of the MAPI block + * mode protocol, and interfaces it as a socket that delivers a + * BufferedReader and a BufferedWriter. Because logging in is an + * integral part of the MAPI protocol, the MapiSocket performs the login + * procedure. Like the Socket class, various options can be set before + * calling the connect() method to influence the login process. Only + * after a successful call to connect() the BufferedReader and + * BufferedWriter can be retrieved. + * <br /> + * For each line read, it is determined what type of line it is + * according to the MonetDB MAPI protocol. This results in a line to be + * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() + * method on the BufferedMCLReader to retrieve the type of the last + * line read. + * + * For debugging purposes a socket level debugging is implemented where + * each and every interaction to and from the MonetDB server is logged + * to a file on disk.<br /> + * Incoming messages are prefixed by "RX" (received by the driver), + * outgoing messages by "TX" (transmitted by the driver). Special + * decoded non-human readable messages are prefixed with "RD" and "TD" + * instead. Following this two char prefix, a timestamp follows as the + * number of milliseconds since the UNIX epoch. The rest of the line is + * a String representation of the data sent or received. + * + * The general use of this Socket must be seen only in the full context + * of a MAPI connection to a server. It has the same ingredients as a + * normal Socket, allowing for seamless plugging. + * <pre> + * Socket \ / InputStream ----> (BufferedMCL)Reader + * > o < + * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer + * </pre> + * The MapiSocket allows to retrieve Streams for communicating. They + * are interfaced, so they can be chained in any way. While the Socket + * transparently deals with how data is sent over the wire, the actual + * data read needs to be interpreted, for which a Reader/Writer + * interface is most sufficient. In particular the BufferedMCL* + * implementations of those interfaces supply some extra functionality + * geared towards the format of the data. + * + * @author Fabian Groffen + * @version 4.1 + * @see nl.cwi.monetdb.mcl.io.BufferedMCLReader + * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter + */ +public final class MapiSocket { + /** The TCP Socket to mserver */ + private Socket con; + /** Stream from the Socket for reading */ + private InputStream fromMonet; + /** Stream from the Socket for writing */ + private OutputStream toMonet; + /** MCLReader on the InputStream */ + private BufferedMCLReader reader; + /** MCLWriter on the OutputStream */ + private BufferedMCLWriter writer; + /** protocol version of the connection */ + private int version; + + /** The database to connect to */ + private String database = null; + /** The language to connect with */ + private String language = "sql"; + /** The hash methods to use (null = default) */ + private String hash = null; + /** Whether we should follow redirects */ + private boolean followRedirects = true; + /** How many redirections do we follow until we're fed up with it? */ + private int ttl = 10; + /** Whether we are debugging or not */ + private boolean debug = false; + /** The Writer for the debug log-file */ + private Writer log; + + /** The blocksize (hardcoded in compliance with stream.mx) */ + public final static int BLOCK = 8 * 1024 - 2; + + /** A short in two bytes for holding the block size in bytes */ + private byte[] blklen = new byte[2]; + + /** + * Constructs a new MapiSocket. + */ + public MapiSocket() { + con = null; + } + + /** + * Sets the database to connect to. If database is null, a + * connection is made to the default database of the server. This + * is also the default. + * + * @param db the database + */ + public void setDatabase(String db) { + this.database = db; + } + + /** + * Sets the language to use for this connection. + * + * @param lang the language + */ + public void setLanguage(String lang) { + this.language = lang; + } + + /** + * Sets the hash method to use. Note that this method is intended + * for debugging purposes. Setting a hash method can yield in + * connection failures. Multiple hash methods can be given by + * separating the hashes by commas. + * DON'T USE THIS METHOD if you don't know what you're doing. + * + * @param hash the hash method to use + */ + public void setHash(String hash) { + this.hash = hash; + } + + /** + * Sets whether MCL redirections should be followed or not. If set + * to false, an MCLException will be thrown when a redirect is + * encountered during connect. The default bahaviour is to + * automatically follow redirects. + * + * @param r whether to follow redirects (true) or not (false) + */ + public void setFollowRedirects(boolean r) { + this.followRedirects = r; + } + + /** + * Sets the number of redirects that are followed when + * followRedirects is true. In order to avoid going into an endless + * loop due to some evil server, or another error, a maximum number + * of redirects that may be followed can be set here. Note that to + * disable the following of redirects you should use + * setFollowRedirects. + * + * @see #setFollowRedirects(boolean r) + * @param t the number of redirects before an exception is thrown + */ + public void setTTL(int t) { + this.ttl = t; + } + + /** + * Set the SO_TIMEOUT on the underlying Socket. When for some + * reason the connection to the database hangs, this setting can be + * useful to break out of this indefinite wait. + * This option must be enabled prior to entering the blocking + * operation to have effect. + * + * @param s The specified timeout, in milliseconds. A timeout + * of zero is interpreted as an infinite timeout. + * @throws SocketException Issue with the socket + */ + public void setSoTimeout(int s) throws SocketException { + // limit time to wait on blocking operations (0 = indefinite) + con.setSoTimeout(s); + } + + /** + * Gets the SO_TIMEOUT from the underlying Socket. + * + * @return the currently in use timeout in milliseconds + * @throws SocketException Issue with the socket + */ + public int getSoTimeout() throws SocketException { + return con.getSoTimeout(); + } + + /** + * Enables/disables debug + * + * @param debug Value to set + */ + public void setDebug(boolean debug) { + this.debug = debug; + } + + /** + * Connects to the given host and port, logging in as the given + * user. If followRedirect is false, a RedirectionException is + * thrown when a redirect is encountered. + * + * @param host the hostname, or null for the loopback address + * @param port the port number + * @param user the username + * @param pass the password + * @return A List with informational (warning) messages. If this + * list is empty; then there are no warnings. + * @throws IOException if an I/O error occurs when creating the + * socket + * @throws MCLParseException if bogus data is received + * @throws MCLException if an MCL related error occurs + */ + public List<String> connect(String host, int port, String user, String pass) + throws IOException, MCLParseException, MCLException { + // Wrap around the internal connect that needs to know if it + // should really make a TCP connection or not. + return connect(host, port, user, pass, true); + } + + private List<String> connect(String host, int port, String user, String pass, + boolean makeConnection) + throws IOException, MCLParseException, MCLException { + if (ttl-- <= 0) + throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry."); + + if (makeConnection) { + con = new Socket(host, port); + // set nodelay, as it greatly speeds up small messages (like we + // often do) + con.setTcpNoDelay(true); + + fromMonet = new BlockInputStream(con.getInputStream()); + toMonet = new BlockOutputStream(con.getOutputStream()); + try { + reader = new BufferedMCLReader(fromMonet, "UTF-8"); + writer = new BufferedMCLWriter(toMonet, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new AssertionError(e.toString()); + } + writer.registerReader(reader); + } + + String c = reader.readLine(); + reader.waitForPrompt(); + writer.writeLine( + getChallengeResponse( + c, + user, + pass, + language, + database, + hash + ) + ); + + // read monet response till prompt + List<String> redirects = new ArrayList<String>(); + List<String> warns = new ArrayList<String>(); + String err = "", tmp; + int lineType; + do { + if ((tmp = reader.readLine()) == null) + throw new IOException("Read from " + + con.getInetAddress().getHostName() + ":" + + con.getPort() + ": End of stream reached"); + if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) { + err += "\n" + tmp.substring(7); + } else if (lineType == BufferedMCLReader.INFO) { + warns.add(tmp.substring(1)); + } else if (lineType == BufferedMCLReader.REDIRECT) { + redirects.add(tmp.substring(1)); + } + } while (lineType != BufferedMCLReader.PROMPT); + if (err != "") { + close(); + throw new MCLException(err.trim()); + } + if (!redirects.isEmpty()) { + if (followRedirects) { + // Ok, server wants us to go somewhere else. The list + // might have multiple clues on where to go. For now we + // don't support anything intelligent but trying the + // first one. URI should be in form of: + // "mapi:monetdb://host:port/database?arg=value&..." + // or + // "mapi:merovingian://proxy?arg=value&..." + // note that the extra arguments must be obeyed in both + // cases + String suri = redirects.get(0).toString(); + if (!suri.startsWith("mapi:")) + throw new MCLException("unsupported redirect: " + suri); + + URI u; + try { + u = new URI(suri.substring(5)); + } catch (URISyntaxException e) { + throw new MCLParseException(e.toString()); + } + + tmp = u.getQuery(); + if (tmp != null) { + String args[] = tmp.split("&"); + for (int i = 0; i < args.length; i++) { + int pos = args[i].indexOf("="); + if (pos > 0) { + tmp = args[i].substring(0, pos); + if (tmp.equals("database")) { + tmp = args[i].substring(pos + 1); + if (!tmp.equals(database)) { + warns.add("redirect points to different " + + "database: " + tmp); + setDatabase(tmp); + } + } else if (tmp.equals("language")) { + tmp = args[i].substring(pos + 1); + warns.add("redirect specifies use of different language: " + tmp); + setLanguage(tmp); + } else if (tmp.equals("user")) { + tmp = args[i].substring(pos + 1); + if (!tmp.equals(user)) + warns.add("ignoring different username '" + tmp + "' set by " + + "redirect, what are the security implications?"); + } else if (tmp.equals("password")) { + warns.add("ignoring different password set by redirect, " + + "what are the security implications?"); + } else { + warns.add("ignoring unknown argument '" + tmp + "' from redirect"); + } + } else { + warns.add("ignoring illegal argument from redirect: " + + args[i]); + } + } + } + + if (u.getScheme().equals("monetdb")) { + // this is a redirect to another (monetdb) server, + // which means a full reconnect + // avoid the debug log being closed + if (debug) { + debug = false; + close(); + debug = true; + } else { + close(); + } + tmp = u.getPath(); + if (tmp != null && tmp.length() != 0) { + tmp = tmp.substring(1).trim(); + if (!tmp.isEmpty() && !tmp.equals(database)) { + warns.add("redirect points to different " + + "database: " + tmp); + setDatabase(tmp); + } + } + int p = u.getPort(); + warns.addAll(connect(u.getHost(), p == -1 ? port : p, + user, pass, true)); + warns.add("Redirect by " + host + ":" + port + " to " + suri); + } else if (u.getScheme().equals("merovingian")) { + // reuse this connection to inline connect to the + // right database that Merovingian proxies for us + warns.addAll(connect(host, port, user, pass, false)); + } else { + throw new MCLException("unsupported scheme in redirect: " + suri); + } + } else { + StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:"); + for (String it : redirects) { + msg.append(" [" + it + "]"); + } + throw new MCLException(msg.toString()); + } + } + return warns; + } + + /** + * A little helper function that processes a challenge string, and + * returns a response string for the server. If the challenge + * string is null, a challengeless response is returned. + * + * @param chalstr the challenge string + * @param username the username to use + * @param password the password to use + * @param language the language to use + * @param database the database to connect to + * @param hash the hash method(s) to use, or NULL for all supported + * hashes + */ + private String getChallengeResponse( + String chalstr, + String username, + String password, + String language, + String database, + String hash + ) throws MCLParseException, MCLException, IOException { + String response; + String algo; + + // parse the challenge string, split it on ':' + String[] chaltok = chalstr.split(":"); + if (chaltok.length <= 4) throw + new MCLParseException("Server challenge string unusable! Challenge contains too few tokens: " + chalstr); + + // challenge string to use as salt/key + String challenge = chaltok[0]; + String servert = chaltok[1]; + try { + version = Integer.parseInt(chaltok[2].trim()); // protocol version + } catch (NumberFormatException e) { + throw new MCLParseException("Protocol version unparseable: " + chaltok[3]); + } + + // handle the challenge according to the version it is + switch (version) { + default: + throw new MCLException("Unsupported protocol version: " + version); + case 9: + // proto 9 is like 8, but uses a hash instead of the + // plain password, the server tells us which hash in the + // challenge after the byte-order + + /* NOTE: Java doesn't support RIPEMD160 :( */ + if (chaltok[5].equals("SHA512")) { + algo = "SHA-512"; + } else if (chaltok[5].equals("SHA384")) { + algo = "SHA-384"; + } else if (chaltok[5].equals("SHA256")) { + algo = "SHA-256"; + /* NOTE: Java doesn't support SHA-224 */ + } else if (chaltok[5].equals("SHA1")) { + algo = "SHA-1"; + } else if (chaltok[5].equals("MD5")) { + algo = "MD5"; + } else { + throw new MCLException("Unsupported password hash: " + + chaltok[5]); + } + + try { + MessageDigest md = MessageDigest.getInstance(algo); + md.update(password.getBytes("UTF-8")); + byte[] digest = md.digest(); + password = toHex(digest); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError("internal error: " + e.toString()); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("internal error: " + e.toString()); + } + + // proto 7 (finally) used the challenge and works with a + // password hash. The supported implementations come + // from the server challenge. We chose the best hash + // we can find, in the order SHA1, MD5, plain. Also, + // the byte-order is reported in the challenge string, + // which makes sense, since only blockmode is supported. + // proto 8 made this obsolete, but retained the + // byte-order report for future "binary" transports. In + // proto 8, the byte-order of the blocks is always little + // endian because most machines today are. + String hashes = (hash == null ? chaltok[3] : hash); + Set<String> hashesSet = new HashSet<String>(Arrays.asList(hashes.toUpperCase().split("[, ]"))); + + // if we deal with merovingian, mask our credentials + if (servert.equals("merovingian") && !language.equals("control")) { + username = "merovingian"; + password = "merovingian"; + } + String pwhash; + algo = null; + + if (hashesSet.contains("SHA512")) { + algo = "SHA-512"; + pwhash = "{SHA512}"; + } else if (hashesSet.contains("SHA384")) { + algo = "SHA-384"; + pwhash = "{SHA384}"; + } else if (hashesSet.contains("SHA256")) { + algo = "SHA-256"; + pwhash = "{SHA256}"; + } else if (hashesSet.contains("SHA1")) { + algo = "SHA-1"; + pwhash = "{SHA1}"; + } else if (hashesSet.contains("MD5")) { + algo = "MD5"; + pwhash = "{MD5}"; + } else { + throw new MCLException("no supported password hashes in " + hashes); + } + if (algo != null) { + try { + MessageDigest md = MessageDigest.getInstance(algo); + md.update(password.getBytes("UTF-8")); + md.update(challenge.getBytes("UTF-8")); + byte[] digest = md.digest(); + pwhash += toHex(digest); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError("internal error: " + e.toString()); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("internal error: " + e.toString()); + } + } + // TODO: some day when we need this, we should store + // this + if (chaltok[4].equals("BIG")) { + // byte-order of server is big-endian + } else if (chaltok[4].equals("LIT")) { + // byte-order of server is little-endian + } else { + throw new MCLParseException("Invalid byte-order: " + chaltok[5]); + } + + // generate response + response = "BIG:"; // JVM byte-order is big-endian + response += username + ":" + pwhash + ":" + language; + response += ":" + (database == null ? "" : database) + ":"; + + return response; + } + } + + private static char hexChar(int n) { + return (n > 9) + ? (char) ('a' + (n - 10)) + : (char) ('0' + n); + } + + /** + * Small helper method to convert a byte string to a hexadecimal + * string representation. + * + * @param digest the byte array to convert + * @return the byte array as hexadecimal string + */ + private static String toHex(byte[] digest) { + char[] result = new char[digest.length * 2]; + int pos = 0; + for (int i = 0; i < digest.length; i++) { + result[pos++] = hexChar((digest[i] & 0xf0) >> 4); + result[pos++] = hexChar(digest[i] & 0x0f); + } + return new String(result); + } + + /** + * Returns an InputStream that reads from this open connection on + * the MapiSocket. + * + * @return an input stream that reads from this open connection + */ + public InputStream getInputStream() { + return fromMonet; + } + + /** + * Returns an output stream for this MapiSocket. + * + * @return an output stream for writing bytes to this MapiSocket + */ + public OutputStream getOutputStream() { + return toMonet; + } + + /** + * Returns a Reader for this MapiSocket. The Reader is a + * BufferedMCLReader which does protocol interpretation of the + * BlockInputStream produced by this MapiSocket. + * + * @return a BufferedMCLReader connected to this MapiSocket + */ + public BufferedMCLReader getReader() { + return reader; + } + + /** + * Returns a Writer for this MapiSocket. The Writer is a + * BufferedMCLWriter which produces protocol compatible data blocks + * that the BlockOutputStream can properly translate into blocks. + * + * @return a BufferedMCLWriter connected to this MapiSocket + */ + public BufferedMCLWriter getWriter() { + return writer; + } + + /** + * Returns the mapi protocol version used by this socket. The + * protocol version depends on the server being used. Users of the + * MapiSocket should check this version to act appropriately. + * + * @return the mapi protocol version + */ + public int getProtocolVersion() { + return version; + } + + /** + * Enables logging to a file what is read and written from and to + * the server. Logging can be enabled at any time. However, it is + * encouraged to start debugging before actually connecting the + * socket. + * + * @param filename the name of the file to write to + * @throws IOException if the file could not be opened for writing + */ + public void debug(String filename) throws IOException { + debug(new FileWriter(filename)); + } + + /** + * Enables logging to a stream what is read and written from and to + * the server. Logging can be enabled at any time. However, it is + * encouraged to start debugging before actually connecting the + * socket. + * + * @param out to write the log to + * @throws IOException if the file could not be opened for writing + */ + public void debug(PrintStream out) throws IOException { + debug(new PrintWriter(out)); + } + + /** + * Enables logging to a stream what is read and written from and to + * the server. Logging can be enabled at any time. However, it is + * encouraged to start debugging before actually connecting the + * socket. + * + * @param out to write the log to + * @throws IOException if the file could not be opened for writing + */ + public void debug(Writer out) throws IOException { + log = out; + debug = true; + } + + /** + * Inner class that is used to write data on a normal stream as a + * blocked stream. A call to the flush() method will write a + * "final" block to the underlying stream. Non-final blocks are + * written as soon as one or more bytes would not fit in the + * current block any more. This allows to write to a block to it's + * full size, and then flush it explicitly to have a final block + * being written to the stream. + */ + class BlockOutputStream extends FilterOutputStream { + private int writePos = 0; + private byte[] block = new byte[BLOCK]; + private int blocksize = 0; + + /** + * Constructs this BlockOutputStream, backed by the given + * OutputStream. A BufferedOutputStream is internally used. + */ + public BlockOutputStream(OutputStream out) { + // always use a buffered stream, even though we know how + // much bytes to write/read, since this is just faster for + // some reason + super(new BufferedOutputStream(out)); + } + + @Override + public void flush() throws IOException { + // write the block (as final) then flush. + writeBlock(true); + out.flush(); + + // it's a bit nasty if an exception is thrown from the log, + // but ignoring it can be nasty as well, so it is decided to + // let it go so there is feedback about something going wrong + // it's a bit nasty if an exception is thrown from the log, + // but ignoring it can be nasty as well, so it is decided to + // let it go so there is feedback about something going wrong + if (debug) { + log.flush(); + } + } + + /** + * writeBlock puts the data in the block on the stream. The + * boolean last controls whether the block is sent with an + * indicator to note it is the last block of a sequence or not. + * + * @param last whether this is the last block + * @throws IOException if writing to the stream failed + */ + public void writeBlock(boolean last) throws IOException { + if (last) { + // always fits, because of BLOCK's size + blocksize = (short)writePos; + // this is the last block, so encode least + // significant bit in the first byte (little-endian) + blklen[0] = (byte)(blocksize << 1 & 0xFF | 1); + blklen[1] = (byte)(blocksize >> 7); + } else { + // always fits, because of BLOCK's size + blocksize = (short)BLOCK; + // another block will follow, encode least + // significant bit in the first byte (little-endian) + blklen[0] = (byte)(blocksize << 1 & 0xFF); + blklen[1] = (byte)(blocksize >> 7); + } + + out.write(blklen); + + // write the actual block + out.write(block, 0, writePos); + + if (debug) { + if (last) { + logTd("write final block: " + writePos + " bytes"); + } else { + logTd("write block: " + writePos + " bytes"); + } + logTx(new String(block, 0, writePos, "UTF-8")); + } + + writePos = 0; + } + + @Override + public void write(int b) throws IOException { + if (writePos == BLOCK) { + writeBlock(false); + } + block[writePos++] = (byte)b; + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + int t = 0; + while (len > 0) { + t = BLOCK - writePos; + if (len > t) { + System.arraycopy(b, off, block, writePos, t); + off += t; + len -= t; + writePos += t; + writeBlock(false); + } else { + System.arraycopy(b, off, block, writePos, len); + writePos += len; + break; + } + } + } + + @Override + public void close() throws IOException { + // we don't want the flush() method to be called (default of + // the FilterOutputStream), so we close manually here + out.close(); + } + } + + + /** + * Inner class that is used to make the data on the blocked stream + * available as a normal stream. + */ + class BlockInputStream extends FilterInputStream { + private int readPos = 0; + private int blockLen = 0; + private byte[] block = new byte[BLOCK + 3]; // \n.\n + + /** + * Constructs this BlockInputStream, backed by the given + * InputStream. A BufferedInputStream is internally used. + */ + public BlockInputStream(InputStream in) { + // always use a buffered stream, even though we know how + // much bytes to write/read, since this is just faster for + // some reason + super(new BufferedInputStream(in)); + } + + @Override + public int available() { + return blockLen - readPos; + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readlimit) { + throw new AssertionError("Not implemented!"); + } + + @Override + public void reset() { + throw new AssertionError("Not implemented!"); + } + + /** + * Small wrapper to get a blocking variant of the read() method + * on the BufferedInputStream. We want to benefit from the + * Buffered pre-fetching, but not dealing with half blocks. + * Changing this class to be able to use the partially received + * data will greatly complicate matters, while a performance + * improvement is debatable given the relatively small size of + * our blocks. Maybe it does speed up on slower links, then + * consider this method a quick bug fix/workaround. + * + * @return false if reading the block failed due to EOF + */ + private boolean _read(byte[] b, int len) throws IOException { + int s; + int off = 0; + + while (len > 0) { + s = in.read(b, off, len); + if (s == -1) { + // if we have read something before, we should have been + // able to read the whole, so make this fatal + if (off > 0) { + if (debug) { + logRd("the following incomplete block was received:"); + logRx(new String(b, 0, off, "UTF-8")); + } + throw new IOException("Read from " + + con.getInetAddress().getHostName() + ":" + + con.getPort() + ": Incomplete block read from stream"); + } + if (debug) + logRd("server closed the connection (EOF)"); + return false; + } + len -= s; + off += s; + } + + return true; + } + + /** + * Reads the next block on the stream into the internal buffer, + * or writes the prompt in the buffer. + * + * The blocked stream protocol consists of first a two byte + * integer indicating the length of the block, then the + * block, followed by another length + block. The end of + * such sequence is put in the last bit of the length, and + * hence this length should be shifted to the right to + * obtain the real length value first. We simply fetch + * blocks here as soon as they are needed for the stream's + * read methods. + * + * The user-flush, which is an implicit effect of the end of + * a block sequence, is communicated beyond the stream by + * inserting a prompt sequence on the stream after the last + * block. This method makes sure that a final block ends with a + * newline, if it doesn't already, in order to facilitate a + * Reader that is possibly chained to this InputStream. + * + * If the stream is not positioned correctly, hell will break + * loose. + */ + private int readBlock() throws IOException { + // read next two bytes (short) + if (!_read(blklen, 2)) + return(-1); + + // Get the short-value and store its value in blockLen. + blockLen = (short)( + (blklen[0] & 0xFF) >> 1 | + (blklen[1] & 0xFF) << 7 + ); + readPos = 0; + + if (debug) { + if ((blklen[0] & 0x1) == 1) { + logRd("read final block: " + blockLen + " bytes"); + } else { + logRd("read new block: " + blockLen + " bytes"); + } + } + + // sanity check to avoid bad servers make us do an ugly + // stack trace + if (blockLen > block.length) + throw new AssertionError("Server sent a block " + + "larger than BLOCKsize: " + + blockLen + " > " + block.length); + if (!_read(block, blockLen)) + return(-1); + + if (debug) + logRx(new String(block, 0, blockLen, "UTF-8")); + + // if this is the last block, make it end with a newline and + // prompt + if ((blklen[0] & 0x1) == 1) { + if (blockLen > 0 && block[blockLen - 1] != '\n') { + // to terminate the block in a Reader + block[blockLen++] = '\n'; + } + // insert 'fake' flush + block[blockLen++] = BufferedMCLReader.PROMPT; + block[blockLen++] = '\n'; + if (debug) + logRd("inserting prompt"); + } + + return(blockLen); + } + + @Override + public int read() throws IOException { + if (available() == 0) { + if (readBlock() == -1) + return(-1); + } + + if (debug) + logRx(new String(block, readPos, 1, "UTF-8")); + return (int)block[readPos++]; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int t; + int size = 0; + while (size < len) { + t = available(); + if (t == 0) { + if (size != 0) + break; + if (readBlock() == -1) { + if (size == 0) + size = -1; + break; + } + t = available(); + } + if (len > t) { + System.arraycopy(block, readPos, b, off, t); + off += t; + len -= t; + readPos += t; + size += t; + } else { + System.arraycopy(block, readPos, b, off, len); + readPos += len; + size += len; + break; + } + } + return size; + } + + @Override + public long skip(long n) throws IOException { + long skip = n; + int t = 0; + while (skip > 0) { + t = available(); + if (skip > t) { + skip -= t; + readPos += t; + readBlock(); + } else { + readPos += skip; + break; + } + } + return n; + } + } + + /** + * Closes the streams and socket connected to the server if + * possible. If an error occurs during disconnecting it is ignored. + */ + public synchronized void close() { + try { + if (reader != null) reader.close(); + if (writer != null) writer.close(); + if (fromMonet != null) fromMonet.close(); + if (toMonet != null) toMonet.close(); + if (con != null) con.close(); + if (debug && log instanceof FileWriter) log.close(); + } catch (IOException e) { + // ignore it + } + } + + /** + * Destructor called by garbage collector before destroying this + * object tries to disconnect the MonetDB connection if it has not + * been disconnected already. + */ + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + + + /** + * Writes a logline tagged with a timestamp using the given string. + * Used for debugging purposes only and represents a message that is + * connected to writing to the socket. A logline might look like: + * TX 152545124: Hello MonetDB! + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logTx(String message) throws IOException { + log.write("TX " + System.currentTimeMillis() + + ": " + message + "\n"); + } + + /** + * Writes a logline tagged with a timestamp using the given string. + * Lines written using this log method are tagged as "added + * metadata" which is not strictly part of the data sent. + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logTd(String message) throws IOException { + log.write("TD " + System.currentTimeMillis() + + ": " + message + "\n"); + } + + /** + * Writes a logline tagged with a timestamp using the given string, + * and flushes afterwards. Used for debugging purposes only and + * represents a message that is connected to reading from the + * socket. The log is flushed after writing the line. A logline + * might look like: + * RX 152545124: Hi JDBC! + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logRx(String message) throws IOException { + log.write("RX " + System.currentTimeMillis() + + ": " + message + "\n"); + log.flush(); + } + + /** + * Writes a logline tagged with a timestamp using the given string, + * and flushes afterwards. Lines written using this log method are + * tagged as "added metadata" which is not strictly part of the data + * received. + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logRd(String message) throws IOException { + log.write("RD " + System.currentTimeMillis() + + ": " + message + "\n"); + log.flush(); + } +}