Mercurial > hg > monetdb-java
view src/main/java/org/monetdb/mcl/net/MapiSocket.java @ 768:a80c21fe7bb2
Removed deprecated nl.cwi.monetdb.*.* classes and package.
Those classes were marked deprecated on 12 Nov 2020 from
release 3.0 (released on 17 Feb 2021) onwards. It includes:
nl.cwi.monetdb.client.JdbcClient.class
nl.cwi.monetdb.jdbc.MonetDriver.class
nl.cwi.monetdb.jdbc.types.INET.class
nl.cwi.monetdb.jdbc.types.URL.class
nl.cwi.monetdb.mcl.net.MapiSocket.class
These classes are now removed permanently.
Use the org.monetdb.* equivalents instead.
author | Martin van Dinther <martin.van.dinther@monetdbsolutions.com> |
---|---|
date | Thu, 06 Jul 2023 15:35:04 +0200 (21 months ago) |
parents | a39d3a45da56 |
children | 5bfe3357fb1c 9188263368cc |
line wrap: on
line source
/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright 1997 - July 2008 CWI, August 2008 - 2023 MonetDB B.V. */ package org.monetdb.mcl.net; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.FileWriter; import java.io.FilterInputStream; import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Writer; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import org.monetdb.mcl.MCLException; import org.monetdb.mcl.io.BufferedMCLReader; import org.monetdb.mcl.io.BufferedMCLWriter; import org.monetdb.mcl.io.LineType; import org.monetdb.mcl.parser.MCLParseException; /** * A Socket for communicating with the MonetDB database in MAPI block * mode. * * The MapiSocket implements the protocol specifics of the MAPI block * mode protocol, and interfaces it as a socket that delivers a * BufferedReader and a BufferedWriter. Because logging in is an * integral part of the MAPI protocol, the MapiSocket performs the login * procedure. Like the Socket class, various options can be set before * calling the connect() method to influence the login process. Only * after a successful call to connect() the BufferedReader and * BufferedWriter can be retrieved. * * For each line read, it is determined what type of line it is * according to the MonetDB MAPI protocol. This results in a line to be * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() * method on the BufferedMCLReader to retrieve the type of the last * line read. 
* * For debugging purposes a socket level debugging is implemented where * each and every interaction to and from the MonetDB server is logged * to a file on disk. * Incoming messages are prefixed by "RX" (received by the driver), * outgoing messages by "TX" (transmitted by the driver). Special * decoded non-human readable messages are prefixed with "RD" and "TD" * instead. Following this two char prefix, a timestamp follows as the * number of milliseconds since the UNIX epoch. The rest of the line is * a String representation of the data sent or received. * * The general use of this Socket must be seen only in the full context * of a MAPI connection to a server. It has the same ingredients as a * normal Socket, allowing for seamless plugging. * <pre> * Socket \ / InputStream ----> (BufferedMCL)Reader * > o < * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer * </pre> * The MapiSocket allows to retrieve Streams for communicating. They * are interfaced, so they can be chained in any way. While the Socket * transparently deals with how data is sent over the wire, the actual * data read needs to be interpreted, for which a Reader/Writer * interface is most sufficient. In particular the BufferedMCL* * implementations of those interfaces supply some extra functionality * geared towards the format of the data. * * @author Fabian Groffen * @version 4.3 * @see org.monetdb.mcl.io.BufferedMCLReader * @see org.monetdb.mcl.io.BufferedMCLWriter */ public final class MapiSocket { /** The TCP Socket to mserver */ private Socket con; /** The TCP Socket timeout in milliseconds. 
Default is 0 meaning the timeout is disabled (i.e., timeout of infinity) */ private int soTimeout = 0; /** Stream from the Socket for reading */ private BlockInputStream fromMonet; /** Stream from the Socket for writing */ private OutputStream toMonet; /** MCLReader on the InputStream */ private BufferedMCLReader reader; /** MCLWriter on the OutputStream */ private BufferedMCLWriter writer; /** protocol version of the connection */ private int version; /** The database to connect to */ private String database = null; /** The language to connect with */ private String language = "sql"; /** The hash methods to use (null = default) */ private String hash = null; /** Whether we should follow redirects */ private boolean followRedirects = true; /** How many redirections do we follow until we're fed up with it? */ private int ttl = 10; /** Whether we are debugging or not */ private boolean debug = false; /** The Writer for the debug log-file */ private Writer log; /** The blocksize (hardcoded in compliance with MonetDB common/stream/stream.h) */ public final static int BLOCK = 8 * 1024 - 2; /** A short in two bytes for holding the block size in bytes */ private final byte[] blklen = new byte[2]; /** Options that can be sent during the auth handshake if the server supports it */ private HandshakeOption<?>[] handshakeOptions; /** * Constructs a new MapiSocket. */ public MapiSocket() { con = null; } /** * Sets the database to connect to. If database is null, a * connection is made to the default database of the server. This * is also the default. * * @param db the database */ public void setDatabase(final String db) { this.database = db; } /** * Sets the language to use for this connection. * * @param lang the language */ public void setLanguage(final String lang) { this.language = lang; } /** * Sets the hash method to use. Note that this method is intended * for debugging purposes. Setting a hash method can yield in * connection failures. 
Multiple hash methods can be given by * separating the hashes by commas. * DON'T USE THIS METHOD if you don't know what you're doing. * * @param hash the hash method to use */ public void setHash(final String hash) { this.hash = hash; } /** * Sets whether MCL redirections should be followed or not. If set * to false, an MCLException will be thrown when a redirect is * encountered during connect. The default bahaviour is to * automatically follow redirects. * * @param r whether to follow redirects (true) or not (false) */ public void setFollowRedirects(final boolean r) { this.followRedirects = r; } /** * Sets the number of redirects that are followed when * followRedirects is true. In order to avoid going into an endless * loop due to some evil server, or another error, a maximum number * of redirects that may be followed can be set here. Note that to * disable the following of redirects you should use * setFollowRedirects. * * @see #setFollowRedirects(boolean r) * @param t the number of redirects before an exception is thrown */ public void setTTL(final int t) { this.ttl = t; } /** * Set the SO_TIMEOUT on the underlying Socket. When for some * reason the connection to the database hangs, this setting can be * useful to break out of this indefinite wait. * This option must be enabled prior to entering the blocking * operation to have effect. * * @param s The specified timeout, in milliseconds. A timeout * of zero will disable timeout (i.e., timeout of infinity). * @throws SocketException Issue with the socket */ public void setSoTimeout(final int s) throws SocketException { if (s < 0) { throw new IllegalArgumentException("timeout can't be negative"); } this.soTimeout = s; // limit time to wait on blocking operations if (con != null) { con.setSoTimeout(s); } } /** * Gets the SO_TIMEOUT from the underlying Socket. 
* * @return the currently in use timeout in milliseconds * @throws SocketException Issue with the socket */ public int getSoTimeout() throws SocketException { if (con != null) { this.soTimeout = con.getSoTimeout(); } return this.soTimeout; } /** * Enables/disables debug mode with logging to file * * @param debug Value to set */ public void setDebug(final boolean debug) { this.debug = debug; } /** * Connects to the given host and port, logging in as the given * user. If followRedirect is false, a RedirectionException is * thrown when a redirect is encountered. * * @param host the hostname, or null for the loopback address * @param port the port number (must be between 0 and 65535, inclusive) * @param user the username * @param pass the password * @return A List with informational (warning) messages. If this * list is empty; then there are no warnings. * @throws IOException if an I/O error occurs when creating the socket * @throws SocketException - if there is an error in the underlying protocol, such as a TCP error. * @throws UnknownHostException if the IP address of the host could not be determined * @throws MCLParseException if bogus data is received * @throws MCLException if an MCL related error occurs */ public List<String> connect(final String host, final int port, final String user, final String pass) throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException { // Wrap around the internal connect that needs to know if it // should really make a TCP connection or not. return connect(host, port, user, pass, true); } /** * Connects to the given host and port, logging in as the given * user. If followRedirect is false, a RedirectionException is * thrown when a redirect is encountered. 
* * @param host the hostname, or null for the loopback address * @param port the port number (must be between 0 and 65535, inclusive) * @param user the username * @param pass the password * @param makeConnection whether a new socket connection needs to be created * @return A List with informational (warning) messages. If this * list is empty; then there are no warnings. * @throws IOException if an I/O error occurs when creating the socket * @throws SocketException - if there is an error in the underlying protocol, such as a TCP error. * @throws UnknownHostException if the IP address of the host could not be determined * @throws MCLParseException if bogus data is received * @throws MCLException if an MCL related error occurs */ private List<String> connect(final String host, final int port, final String user, final String pass, final boolean makeConnection) throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException { if (ttl-- <= 0) throw new MCLException("Maximum number of redirects reached, aborting connection attempt."); if (makeConnection) { con = new Socket(host, port); con.setSoTimeout(this.soTimeout); // set nodelay, as it greatly speeds up small messages (like we often do) con.setTcpNoDelay(true); con.setKeepAlive(true); fromMonet = new BlockInputStream(con.getInputStream()); toMonet = new BlockOutputStream(con.getOutputStream()); reader = new BufferedMCLReader(fromMonet, StandardCharsets.UTF_8); writer = new BufferedMCLWriter(toMonet, StandardCharsets.UTF_8); writer.registerReader(reader); } reader.advance(); final String c = reader.getLine(); reader.discardRemainder(); writer.writeLine(getChallengeResponse(c, user, pass, language, database, hash)); // read monetdb mserver response till prompt final ArrayList<String> redirects = new ArrayList<String>(); final List<String> warns = new ArrayList<String>(); String err = "", tmp; do { reader.advance(); tmp = reader.getLine(); if (tmp == null) throw new IOException("Read from " + 
con.getInetAddress().getHostName() + ":" + con.getPort() + ": End of stream reached"); if (reader.getLineType() == LineType.ERROR) { err += "\n" + tmp.substring(7); } else if (reader.getLineType() == LineType.INFO) { warns.add(tmp.substring(1)); } else if (reader.getLineType() == LineType.REDIRECT) { redirects.add(tmp.substring(1)); } } while (reader.getLineType() != LineType.PROMPT); if (err.length() > 0) { close(); throw new MCLException(err); } if (!redirects.isEmpty()) { if (followRedirects) { // Ok, server wants us to go somewhere else. The list // might have multiple clues on where to go. For now we // don't support anything intelligent but trying the // first one. URI should be in form of: // "mapi:monetdb://host:port/database?arg=value&..." // or // "mapi:merovingian://proxy?arg=value&..." // note that the extra arguments must be obeyed in both // cases final String suri = redirects.get(0).toString(); if (!suri.startsWith("mapi:")) throw new MCLException("unsupported redirect: " + suri); final URI u; try { u = new URI(suri.substring(5)); } catch (java.net.URISyntaxException e) { throw new MCLParseException(e.toString()); } tmp = u.getQuery(); if (tmp != null) { final String args[] = tmp.split("&"); for (int i = 0; i < args.length; i++) { int pos = args[i].indexOf('='); if (pos > 0) { tmp = args[i].substring(0, pos); switch (tmp) { case "database": tmp = args[i].substring(pos + 1); if (!tmp.equals(database)) { warns.add("redirect points to different database: " + tmp); setDatabase(tmp); } break; case "language": tmp = args[i].substring(pos + 1); warns.add("redirect specifies use of different language: " + tmp); setLanguage(tmp); break; case "user": tmp = args[i].substring(pos + 1); if (!tmp.equals(user)) warns.add("ignoring different username '" + tmp + "' set by " + "redirect, what are the security implications?"); break; case "password": warns.add("ignoring different password set by redirect, " + "what are the security implications?"); break; default: 
warns.add("ignoring unknown argument '" + tmp + "' from redirect"); break; } } else { warns.add("ignoring illegal argument from redirect: " + args[i]); } } } if (u.getScheme().equals("monetdb")) { // this is a redirect to another (monetdb) server, // which means a full reconnect // avoid the debug log being closed if (debug) { debug = false; close(); debug = true; } else { close(); } tmp = u.getPath(); if (tmp != null && tmp.length() > 0) { tmp = tmp.substring(1).trim(); if (!tmp.isEmpty() && !tmp.equals(database)) { warns.add("redirect points to different database: " + tmp); setDatabase(tmp); } } final int p = u.getPort(); warns.addAll(connect(u.getHost(), p == -1 ? port : p, user, pass, true)); warns.add("Redirect by " + host + ":" + port + " to " + suri); } else if (u.getScheme().equals("merovingian")) { // reuse this connection to inline connect to the // right database that Merovingian proxies for us reader.resetLineType(); warns.addAll(connect(host, port, user, pass, false)); } else { throw new MCLException("unsupported scheme in redirect: " + suri); } } else { final StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:"); for (String it : redirects) { msg.append(" [" + it + "]"); } throw new MCLException(msg.toString()); } } return warns; } /** * A little helper function that processes a challenge string, and * returns a response string for the server. If the challenge * string is null, a challengeless response is returned. 
* * @param chalstr the challenge string * for example: H8sRMhtevGd:mserver:9:PROT10,RIPEMD160,SHA256,SHA1,COMPRESSION_SNAPPY,COMPRESSION_LZ4:LIT:SHA512: * @param username the username to use * @param password the password to use * @param language the language to use * @param database the database to connect to * @param hash the hash method(s) to use, or NULL for all supported hashes * @return the response string for the server * @throws MCLParseException when parsing failed * @throws MCLException if an MCL related error occurs * @throws IOException when IO exception occurred */ private String getChallengeResponse( final String chalstr, String username, String password, final String language, final String database, final String hash ) throws MCLParseException, MCLException, IOException { // parse the challenge string, split it on ':' final String[] chaltok = chalstr.split(":"); if (chaltok.length <= 5) throw new MCLParseException("Server challenge string unusable! It contains too few (" + chaltok.length + ") tokens: " + chalstr); try { version = Integer.parseInt(chaltok[2]); // protocol version } catch (NumberFormatException e) { throw new MCLParseException("Protocol version (" + chaltok[2] + ") unparseable as integer."); } // handle the challenge according to the version it is switch (version) { case 9: // proto 9 is like 8, but uses a hash instead of the plain password // the server tells us (in 6th token) which hash in the // challenge after the byte-order token String algo; String pwhash = chaltok[5]; /* NOTE: Java doesn't support RIPEMD160 :( */ /* see: https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#MessageDigest */ switch (pwhash) { case "SHA512": algo = "SHA-512"; break; case "SHA384": algo = "SHA-384"; break; case "SHA256": algo = "SHA-256"; /* NOTE: Java 7 doesn't support SHA-224. Java 8 does but we have not tested it. It is also not requested yet. 
*/ break; case "SHA1": algo = "SHA-1"; break; default: /* Note: MD5 has been deprecated by security experts and support is removed from Oct 2020 release */ throw new MCLException("Unsupported password hash: " + pwhash); } try { final MessageDigest md = MessageDigest.getInstance(algo); md.update(password.getBytes(StandardCharsets.UTF_8)); password = toHex(md.digest()); } catch (NoSuchAlgorithmException e) { throw new MCLException("This JVM does not support password hash: " + pwhash + "\n" + e); } // proto 7 (finally) used the challenge and works with a // password hash. The supported implementations come // from the server challenge. We chose the best hash // we can find, in the order SHA512, SHA1, MD5, plain. // Also the byte-order is reported in the challenge string, // which makes sense, since only blockmode is supported. // proto 8 made this obsolete, but retained the // byte-order report for future "binary" transports. // In proto 8, the byte-order of the blocks is always little // endian because most machines today are. final String hashes = (hash == null || hash.isEmpty()) ? 
chaltok[3] : hash; final HashSet<String> hashesSet = new HashSet<String>(java.util.Arrays.asList(hashes.toUpperCase().split("[, ]"))); // split on comma or space // if we deal with merovingian, mask our credentials if (chaltok[1].equals("merovingian") && !language.equals("control")) { username = "merovingian"; password = "merovingian"; } // reuse variables algo and pwhash algo = null; pwhash = null; if (hashesSet.contains("SHA512")) { algo = "SHA-512"; pwhash = "{SHA512}"; } else if (hashesSet.contains("SHA384")) { algo = "SHA-384"; pwhash = "{SHA384}"; } else if (hashesSet.contains("SHA256")) { algo = "SHA-256"; pwhash = "{SHA256}"; } else if (hashesSet.contains("SHA1")) { algo = "SHA-1"; pwhash = "{SHA1}"; } else { /* Note: MD5 has been deprecated by security experts and support is removed from Oct 2020 release */ throw new MCLException("no supported hash algorithms found in " + hashes); } try { final MessageDigest md = MessageDigest.getInstance(algo); md.update(password.getBytes(StandardCharsets.UTF_8)); md.update(chaltok[0].getBytes(StandardCharsets.UTF_8)); // salt/key pwhash += toHex(md.digest()); } catch (NoSuchAlgorithmException e) { throw new MCLException("This JVM does not support password hash: " + pwhash + "\n" + e); } // TODO: some day when we need this, we should store this if (chaltok[4].equals("BIG")) { // byte-order of server is big-endian } else if (chaltok[4].equals("LIT")) { // byte-order of server is little-endian } else { throw new MCLParseException("Invalid byte-order: " + chaltok[4]); } // compose and return response String response = "BIG:" // JVM byte-order is big-endian + username + ":" + pwhash + ":" + language + ":" + (database == null ? 
"" : database) + ":" + "FILETRANS:"; // this capability is added in monetdb-jdbc-3.2.jre8.jar if (chaltok.length > 6) { // if supported, send handshake options for (String part : chaltok[6].split(",")) { if (part.startsWith("sql=") && handshakeOptions != null) { int level; try { level = Integer.parseInt(chaltok[6].substring(4)); } catch (NumberFormatException e) { throw new MCLParseException("Invalid handshake level: " + chaltok[6]); } boolean first = true; for (HandshakeOption<?> opt: handshakeOptions) { if (opt.getLevel() < level) { // server supports it if (first) { first = false; } else { response += ","; } response += opt.getFieldName() + "=" + opt.numericValue(); opt.setSent(true); } } break; } } // this ':' delimits the handshake options field. response += ":"; } return response; default: throw new MCLException("Unsupported protocol version: " + version); } } /** * Small helper method to convert a byte string to a hexadecimal * string representation. * * @param digest the byte array to convert * @return the byte array as hexadecimal string */ private final static String toHex(final byte[] digest) { final char[] result = new char[digest.length * 2]; int pos = 0; for (int i = 0; i < digest.length; i++) { result[pos++] = hexChar((digest[i] & 0xf0) >> 4); result[pos++] = hexChar(digest[i] & 0x0f); } return new String(result); } private final static char hexChar(final int n) { return (n > 9) ? (char) ('a' + (n - 10)) : (char) ('0' + n); } /** * Returns an InputStream that reads from this open connection on * the MapiSocket. * * @return an input stream that reads from this open connection */ public InputStream getInputStream() { return fromMonet; } /** * Returns an output stream for this MapiSocket. * * @return an output stream for writing bytes to this MapiSocket */ public OutputStream getOutputStream() { return toMonet; } /** * Returns a Reader for this MapiSocket. 
The Reader is a * BufferedMCLReader which does protocol interpretation of the * BlockInputStream produced by this MapiSocket. * * @return a BufferedMCLReader connected to this MapiSocket */ public BufferedMCLReader getReader() { return reader; } /** * Returns a Writer for this MapiSocket. The Writer is a * BufferedMCLWriter which produces protocol compatible data blocks * that the BlockOutputStream can properly translate into blocks. * * @return a BufferedMCLWriter connected to this MapiSocket */ public BufferedMCLWriter getWriter() { return writer; } /** * Returns the mapi protocol version used by this socket. The * protocol version depends on the server being used. Users of the * MapiSocket should check this version to act appropriately. * * @return the mapi protocol version */ public int getProtocolVersion() { return version; } /** * Enables logging to a file what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the * socket. * * @param filename the name of the file to write to * @throws IOException if the file could not be opened for writing */ public void debug(final String filename) throws IOException { debug(new FileWriter(filename)); } /** * Enables logging to a stream what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the * socket. * * @param out to write the log to a print stream * @throws IOException if the file could not be opened for writing */ // disabled as it is not used by JDBC driver code // public void debug(PrintStream out) throws IOException { // debug(new PrintWriter(out)); // } /** * Enables logging to a stream what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the * socket. 
* * @param out to write the log to */ public void debug(final Writer out) { log = out; debug = true; } /** * Get the log Writer. * * @return the log writer */ public Writer getLogWriter() { return log; } /** * Writes a logline tagged with a timestamp using the given type and message * and optionally flushes afterwards. * * Used for debugging purposes only and represents a message data that is * connected to reading (RD or RX) or writing (TD or TX) to the socket. * R=Receive, T=Transmit, D=Data, X=?? * * @param type message type: either RD, RX, TD or TX * @param message the message to log * @param flush whether we need to flush buffered data to the logfile. * @throws IOException if an IO error occurs while writing to the logfile */ private final void log(final String type, final String message, final boolean flush) throws IOException { log.write(type + System.currentTimeMillis() + ": " + message + "\n"); if (flush) log.flush(); } /** * Set the HandshakeOptions * * @param handshakeOptions the options array */ public void setHandshakeOptions(HandshakeOption<?>[] handshakeOptions) { this.handshakeOptions = handshakeOptions; } /** * For internal use * * @param b to enable/disable insert 'fake' newline and prompt * @return previous setting */ public boolean setInsertFakePrompts(boolean b) { return fromMonet.setInsertFakePrompts(b); } /** * Inner class that is used to write data on a normal stream as a * blocked stream. A call to the flush() method will write a * "final" block to the underlying stream. Non-final blocks are * written as soon as one or more bytes would not fit in the * current block any more. This allows to write to a block to it's * full size, and then flush it explicitly to have a final block * being written to the stream. 
*/ final class BlockOutputStream extends FilterOutputStream { private int writePos = 0; private int blocksize = 0; private final byte[] block = new byte[BLOCK]; /** * Constructs this BlockOutputStream, backed by the given * OutputStream. A BufferedOutputStream is internally used. * @param out an OutputStream */ public BlockOutputStream(final OutputStream out) { // always use a buffered stream, even though we know how // much bytes to write/read, since this is just faster for // some reason super(new BufferedOutputStream(out)); } @Override public void flush() throws IOException { // write the block (as final) then flush. writeBlock(true); out.flush(); // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong if (debug) { log.flush(); } } /** * writeBlock puts the data in the block on the stream. The * boolean last controls whether the block is sent with an * indicator to note it is the last block of a sequence or not. 
* * @param last whether this is the last block * @throws IOException if writing to the stream failed */ public void writeBlock(final boolean last) throws IOException { if (last) { // always fits, because of BLOCK's size blocksize = (short)writePos; // this is the last block, so encode least // significant bit in the first byte (little-endian) blklen[0] = (byte)(blocksize << 1 & 0xFF | 1); blklen[1] = (byte)(blocksize >> 7); } else { // always fits, because of BLOCK's size blocksize = (short)BLOCK; // another block will follow, encode least // significant bit in the first byte (little-endian) blklen[0] = (byte)(blocksize << 1 & 0xFF); blklen[1] = (byte)(blocksize >> 7); } out.write(blklen); // write the actual block out.write(block, 0, writePos); if (debug) { if (last) { log("TD ", "write final block: " + writePos + " bytes", false); } else { log("TD ", "write block: " + writePos + " bytes", false); } log("TX ", new String(block, 0, writePos, StandardCharsets.UTF_8), true); } writePos = 0; } @Override public void write(final int b) throws IOException { if (writePos == BLOCK) { writeBlock(false); } block[writePos++] = (byte)b; } @Override public void write(final byte[] b) throws IOException { write(b, 0, b.length); } @Override public void write(final byte[] b, int off, int len) throws IOException { while (len > 0) { int t = BLOCK - writePos; if (len > t) { System.arraycopy(b, off, block, writePos, t); off += t; len -= t; writePos += t; writeBlock(false); } else { System.arraycopy(b, off, block, writePos, len); writePos += len; break; } } } @Override public void close() throws IOException { // we don't want the flush() method to be called (default of // the FilterOutputStream), so we close manually here out.close(); } } /** * Inner class that is used to make the data on the blocked stream * available as a normal stream. 
*/ final class BlockInputStream extends FilterInputStream { private int readPos = 0; private int blockLen = 0; private boolean wasEndBlock = false; private final byte[] block = new byte[BLOCK + 3]; // \n.\n private boolean insertFakePrompts = true; /** * Constructs this BlockInputStream, backed by the given * InputStream. A BufferedInputStream is internally used. * @param in an InputStream */ public BlockInputStream(final InputStream in) { // always use a buffered stream, even though we know how // much bytes to write/read, since this is just faster for // some reason super(new BufferedInputStream(in)); } public boolean setInsertFakePrompts(boolean doFake) { boolean old = insertFakePrompts; insertFakePrompts = doFake; return old; } @Override public int available() { return blockLen - readPos; } @Override public boolean markSupported() { return false; } @Override public void mark(final int readlimit) { throw new AssertionError("Not implemented!"); } @Override public void reset() { throw new AssertionError("Not implemented!"); } /** * Small wrapper to get a blocking variant of the read() method * on the BufferedInputStream. We want to benefit from the * Buffered pre-fetching, but not dealing with half blocks. * Changing this class to be able to use the partially received * data will greatly complicate matters, while a performance * improvement is debatable given the relatively small size of * our blocks. Maybe it does speed up on slower links, then * consider this method a quick bug fix/workaround. 
* * @param b a byte array to store read bytes * @param len number of bytes to read * @return false if reading the block failed due to EOF * @throws IOException if an IO error occurs while reading */ private boolean _read(final byte[] b, int len) throws IOException { int s; int off = 0; while (len > 0) { s = in.read(b, off, len); if (s == -1) { // if we have read something before, we should have been // able to read the whole, so make this fatal if (off > 0) { if (debug) { log("RD ", "the following incomplete block was received:", false); log("RX ", new String(b, 0, off, StandardCharsets.UTF_8), true); } throw new IOException("Read from " + con.getInetAddress().getHostName() + ":" + con.getPort() + ": Incomplete block read from stream"); } if (debug) log("RD ", "server closed the connection (EOF)", true); return false; } len -= s; off += s; } return true; } /** * Reads the next block on the stream into the internal buffer, * or writes the prompt in the buffer. * * The blocked stream protocol consists of first a two byte * integer indicating the length of the block, then the * block, followed by another length + block. The end of * such sequence is put in the last bit of the length, and * hence this length should be shifted to the right to * obtain the real length value first. We simply fetch * blocks here as soon as they are needed for the stream's * read methods. * * The user-flush, which is an implicit effect of the end of * a block sequence, is communicated beyond the stream by * inserting a prompt sequence on the stream after the last * block. This method makes sure that a final block ends with a * newline, if it doesn't already, in order to facilitate a * Reader that is possibly chained to this InputStream. * * If the stream is not positioned correctly, hell will break * loose. 
* * @return blockLen * @throws IOException if an IO error occurs while reading */ private int readBlock() throws IOException { // read next two bytes (short) if (!_read(blklen, 2)) return(-1); // Get the short-value and store its value in blockLen. blockLen = (short)( (blklen[0] & 0xFF) >> 1 | (blklen[1] & 0xFF) << 7 ); wasEndBlock = (blklen[0] & 0x1) == 1; readPos = 0; if (debug) { if (wasEndBlock) { log("RD ", "read final block: " + blockLen + " bytes", false); } else { log("RD ", "read new block: " + blockLen + " bytes", false); } } // sanity check to avoid bad servers make us do an ugly // stack trace if (blockLen > block.length) throw new IOException("Server sent a block larger than BLOCKsize: " + blockLen + " > " + block.length); if (!_read(block, blockLen)) return -1; if (debug) log("RX ", new String(block, 0, blockLen, StandardCharsets.UTF_8), true); // if this is the last block, make it end with a newline and prompt if (wasEndBlock) { // insert 'fake' newline and prompt if (insertFakePrompts) { if (blockLen > 0 && block[blockLen - 1] != '\n') { // to terminate the block in a Reader block[blockLen++] = '\n'; } for (byte b : LineType.PROMPT.bytes()) { block[blockLen++] = b; } block[blockLen++] = '\n'; if (debug) { log("RD ", "inserting prompt", true); } } } return blockLen; } @Override public int read() throws IOException { if (available() == 0) { if (readBlock() == -1) return -1; } if (debug) log("RX ", new String(block, readPos, 1, StandardCharsets.UTF_8), true); return (int)block[readPos++]; } @Override public int read(final byte[] b) throws IOException { return read(b, 0, b.length); } @Override public int read(final byte[] b, int off, int len) throws IOException { int t; int size = 0; while (size < len) { t = available(); if (t == 0) { if (size != 0) break; if (readBlock() == -1) { if (size == 0) size = -1; break; } t = available(); } if (len > t) { System.arraycopy(block, readPos, b, off, t); off += t; len -= t; readPos += t; size += t; } else { 
System.arraycopy(block, readPos, b, off, len); readPos += len; size += len; break; } } return size; } @Override public long skip(final long n) throws IOException { long skip = n; while (skip > 0) { int t = available(); if (skip > t) { skip -= t; readPos += t; readBlock(); } else { readPos += (int)skip; break; } } return n; } /** * For internal use * @return new Raw object */ Raw getRaw() { return new Raw(); } /** An alternative I/O interface that exposes the block based nature of the MAPI protocol */ final class Raw { byte[] getBytes() { return block; } int getLength() { return blockLen; } int getPosition() { return readPos; } int consume(int delta) { int pos = readPos; readPos += delta; return pos; } int readBlock() throws IOException { boolean wasFaking = setInsertFakePrompts(false); try { return BlockInputStream.this.readBlock(); } finally { setInsertFakePrompts(wasFaking); } } boolean wasEndBlock() { return wasEndBlock; } } } /** * Closes the streams and socket connected to the server if possible. * If an error occurs at closing a resource, it is ignored so as many * resources as possible are closed. */ public synchronized void close() { if (writer != null) { try { writer.close(); writer = null; } catch (IOException e) { /* ignore it */ } } if (reader != null) { try { reader.close(); reader = null; } catch (IOException e) { /* ignore it */ } } if (toMonet != null) { try { toMonet.close(); toMonet = null; } catch (IOException e) { /* ignore it */ } } if (fromMonet != null) { try { fromMonet.close(); fromMonet = null; } catch (IOException e) { /* ignore it */ } } if (con != null) { try { con.close(); // close the socket con = null; } catch (IOException e) { /* ignore it */ } } if (debug && log != null && log instanceof FileWriter) { try { log.close(); log = null; } catch (IOException e) { /* ignore it */ } } } /** * Return an UploadStream for use with for example COPY FROM filename ON CLIENT. 
* * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}. * * @param chunkSize chunk size for the upload stream * @return UploadStream new upload stream with the given chunk size */ public UploadStream uploadStream(int chunkSize) { return new UploadStream(chunkSize); } /** * Return an UploadStream for use with for example COPY FROM filename ON CLIENT. * * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}. * * @return UploadStream new upload stream */ public UploadStream uploadStream() { return new UploadStream(); } /** * Return a DownloadStream for use with for example COPY INTO filename ON CLIENT * * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}. * * @param prependCr convert \n to \r\n * @return DownloadStream new download stream */ public DownloadStream downloadStream(boolean prependCr) { return new DownloadStream(fromMonet.getRaw(), toMonet, prependCr); } /** * Stream of data sent to the server * * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}. * * An UploadStream has a chunk size. Every chunk size bytes, the server gets * the opportunity to abort the upload. 
*/
	public class UploadStream extends FilterOutputStream {
		/** chunk size used when none is given explicitly */
		public static final int DEFAULT_CHUNK_SIZE = 1024 * 1024;
		private final int chunkSize;
		private boolean closed = false;
		private boolean serverCancelled = false;
		/** number of bytes that may still be written before the next prompt exchange */
		private int chunkLeft;
		/** receives one prompt: the prompt marker bytes followed by a newline */
		private final byte[] promptBuffer;
		private Runnable cancellationCallback = null;

		/**
		 * Create an UploadStream with the given chunk size
		 * @param chunkSize chunk size for the upload stream
		 * @throws IllegalArgumentException if chunkSize is not positive
		 */
		UploadStream(final int chunkSize) {
			super(toMonet);
			if (chunkSize <= 0) {
				throw new IllegalArgumentException("chunk size must be positive");
			}
			this.chunkSize = chunkSize;
			// one buffer is used for both prompt kinds, so they must be equally long
			assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length;
			int promptLen = LineType.MORE.bytes().length;
			promptBuffer = new byte[promptLen + 1];
			chunkLeft = this.chunkSize;
		}

		/** Create an UploadStream with the default chunk size */
		UploadStream() {
			this(DEFAULT_CHUNK_SIZE);
		}

		/** Set a callback to be invoked if the server cancels the upload
		 *
		 * @param cancellationCallback callback to call
		 */
		public void setCancellationCallback(final Runnable cancellationCallback) {
			this.cancellationCallback = cancellationCallback;
		}

		/**
		 * Writes one byte, first exchanging a prompt with the server when the
		 * current chunk is full.
		 *
		 * @param b the byte to write
		 * @throws IOException if the server aborted the upload or an IO error occurs
		 */
		@Override
		public void write(final int b) throws IOException {
			if (serverCancelled) {
				// We have already thrown an exception and apparently that has been ignored.
				// Probably because they're calling print methods instead of write.
				// Throw another one, maybe they'll catch this one.
				throw new IOException("Server aborted the upload");
			}
			handleChunking();
			out.write(b);
			wrote(1);
		}

		@Override
		public void write(final byte[] b) throws IOException {
			this.write(b, 0, b.length);
		}

		/**
		 * Writes {@code len} bytes, split at chunk boundaries so a prompt is
		 * exchanged with the server after every full chunk.
		 *
		 * @param b source array
		 * @param off offset of the first byte to write
		 * @param len number of bytes to write
		 * @throws IOException if the server aborted the upload or an IO error occurs
		 */
		@Override
		public void write(final byte[] b, int off, int len) throws IOException {
			if (serverCancelled) {
				// We have already thrown an exception and apparently that has been ignored.
				// Probably because they're calling print methods instead of write.
				// Throw another one, maybe they'll catch this one.
				throw new IOException("Server aborted the upload");
			}
			while (len > 0) {
				handleChunking();
				int toWrite = Integer.min(len, chunkLeft);
				out.write(b, off, toWrite);
				off += toWrite;
				len -= toWrite;
				wrote(toWrite);
			}
		}

		@Override
		public void flush() throws IOException {
			// suppress flushes
		}

		/**
		 * Finishes the upload. Only the first call has an effect.
		 *
		 * @throws IOException if the end of the upload is not acknowledged as
		 *         expected or an IO error occurs
		 */
		@Override
		public void close() throws IOException {
			if (closed) {
				return;
			}
			closed = true;

			if (serverCancelled)
				closeAfterServerCancelled();
			else
				closeAfterSuccesfulUpload();
		}

		private void closeAfterSuccesfulUpload() throws IOException {
			if (chunkLeft != chunkSize) {
				// flush pending data
				flushAndReadPrompt();
			}
			// send empty block
			out.flush();
			final LineType acknowledgement = readPrompt();
			if (acknowledgement != LineType.FILETRANSFER) {
				throw new IOException("Expected server to acknowledge end of file");
			}
		}

		private void closeAfterServerCancelled() {
			// nothing to do here, we have already read the error prompt.
		}

		/** Accounts for {@code i} bytes written into the current chunk. */
		private void wrote(final int i) {
			chunkLeft -= i;
		}

		/** Exchanges a prompt with the server if the current chunk is full. */
		private void handleChunking() throws IOException {
			if (chunkLeft > 0) {
				return;
			}
			flushAndReadPrompt();
		}

		/**
		 * Flushes the data written so far, starts a new chunk and reads the
		 * prompt sent by the server.
		 *
		 * @throws IOException if the prompt is anything other than MORE
		 */
		private void flushAndReadPrompt() throws IOException {
			out.flush();
			chunkLeft = chunkSize;
			final LineType lineType = readPrompt();
			switch (lineType) {
				case MORE:
					return;
				case FILETRANSFER:
					// Note, if the caller is calling print methods instead of write, the IO exception gets hidden.
					// This is unfortunate but there's nothing we can do about it.
					serverCancelled = true;
					if (cancellationCallback != null) {
						cancellationCallback.run();
					}
					throw new IOException("Server aborted the upload");
				default:
					throw new IOException("Expected MORE/DONE from server, got " + lineType);
			}
		}

		/**
		 * Reads one prompt from the server and classifies it.
		 *
		 * @return the line type of the prompt
		 * @throws IOException if fewer bytes than a full prompt were read or
		 *         the prompt does not end in a newline
		 */
		private LineType readPrompt() throws IOException {
			final int nread = fromMonet.read(promptBuffer);
			if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') {
				throw new IOException("server return incomplete prompt");
			}
			return LineType.classify(promptBuffer);
		}
	}

	/**
	 * Stream of data received from the server
	 *
	 * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}.
	 */
	public static class DownloadStream extends InputStream {
		private final BlockInputStream.Raw rawIn;
		private final OutputStream out;
		private final boolean prependCr;
		private boolean endBlockSeen = false;
		private boolean closed = false;
		private boolean newlinePending = false; // used for crlf conversion

		DownloadStream(BlockInputStream.Raw rawIn, OutputStream out, boolean prependCr) {
			this.rawIn = rawIn;
			this.out = out;
			this.prependCr = prependCr;
		}

		/**
		 * Fetches the next block, unless the end block has been seen or this
		 * stream is closed. Records when the end of the block sequence (or
		 * EOF) is reached.
		 *
		 * @throws IOException if an IO error occurs while reading
		 */
		void nextBlock() throws IOException {
			if (endBlockSeen || closed)
				return;
			final int ret = rawIn.readBlock();
			if (ret < 0 || rawIn.wasEndBlock()) {
				endBlockSeen = true;
			}
		}

		/**
		 * Reads and discards any remaining blocks up to the end block, then
		 * sends the acknowledgement to the server. Only the first successful
		 * call has an effect.
		 *
		 * @throws IOException if an IO error occurs while draining or acknowledging
		 */
		@Override
		public void close() throws IOException {
			if (closed)
				return;
			// Drain BEFORE marking the stream closed: nextBlock() does nothing
			// once closed is set, so setting it first would make this loop
			// spin forever when the end block has not been seen yet.
			while (!endBlockSeen) {
				nextBlock();
			}
			closed = true;
			// Send acknowledgement to server
			out.write('\n');
			out.flush();
			// Do whatever super has to do
			super.close();
		}

		/**
		 * Reads a single byte.
		 *
		 * @return the byte as a value in the range 0..255, or -1 at end of stream
		 * @throws IOException if an IO error occurs while reading
		 */
		@Override
		public int read() throws IOException {
			final byte[] buf = { 0 };
			final int nread = read(buf, 0, 1);
			if (nread == 1)
				// mask to unsigned; returning the raw byte would turn 0xFF
				// into -1, which callers interpret as end of stream
				return buf[0] & 0xFF;
			else
				return -1;
		}

		/**
		 * Reads up to {@code len} bytes, fetching further blocks until the
		 * request is filled or the end block has been seen. When prependCr is
		 * set, every '\n' is delivered as "\r\n"; if only the '\r' fits, the
		 * '\n' is kept pending and delivered first on the next opportunity.
		 *
		 * @param dest destination array
		 * @param off offset in {@code dest} to start storing at
		 * @param len maximum number of bytes to store
		 * @return the number of bytes stored, 0 if {@code len} is 0, or -1 if
		 *         nothing was stored and the end block has been seen
		 * @throws IOException if an IO error occurs while reading
		 */
		@Override
		public int read(final byte[] dest, int off, int len) throws IOException {
			if (len == 0)
				return 0;

			final int origOff = off;
			int end = off + len;

			while (off < end) {
				// minimum of what's requested and what we have in stock
				int chunk = Integer.min(end - off, rawIn.getLength() - rawIn.getPosition());
				assert chunk >= 0;
				if (chunk == 0) {
					// make progress by fetching more data
					if (endBlockSeen)
						break;
					nextBlock();
					continue;
				}
				// make progress copying some bytes
				if (!prependCr) {
					// no conversion needed, use arraycopy
					System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), dest, off, chunk);
					off += chunk;
					rawIn.consume(chunk);
				} else {
					int chunkEnd = off + chunk;
					if (newlinePending && off < chunkEnd) {
						// we were in the middle of a line ending conversion
						dest[off++] = '\n';
						newlinePending = false;
					}
					while (off < chunkEnd) {
						byte b = rawIn.getBytes()[rawIn.consume(1)];
						if (b != '\n') {
							dest[off++] = b;
						} else if (chunkEnd - off >= 2) {
							dest[off++] = '\r';
							dest[off++] = '\n';
						} else {
							// room for the '\r' only, deliver the '\n' later
							dest[off++] = '\r';
							newlinePending = true;
							break;
						}
					}
				}
			}

			// there may be room left for a newline that is still owed
			if (off < end && newlinePending) {
				dest[off++] = '\n';
				newlinePending = false;
			}

			if (off == origOff && endBlockSeen)
				return -1;
			else
				return off - origOff;
		}
	}
}