Mercurial > hg > monetdb-java
view src/main/java/org/monetdb/mcl/net/MapiSocket.java @ 616:65641a7cea31
Implement line ending conversion for downloads
MonetConnection.Download#getStream returns an InputStream which
converts line endings when in text mode.
The default line ending is the platform line ending but that can be
changed. Setting it to \n can be a useful optimization if you don't
need the \r's anyway.
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Wed, 19 Jan 2022 14:58:01 +0100 (2022-01-19) |
parents | 1d44b8a577ca |
children | 06d69b82d409 |
line wrap: on
line source
/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright 1997 - July 2008 CWI, August 2008 - 2022 MonetDB B.V. */ package org.monetdb.mcl.net; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.FileWriter; import java.io.FilterInputStream; import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.io.Writer; import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import org.monetdb.mcl.MCLException; import org.monetdb.mcl.io.BufferedMCLReader; import org.monetdb.mcl.io.BufferedMCLWriter; import org.monetdb.mcl.io.LineType; import org.monetdb.mcl.parser.MCLParseException; /** * A Socket for communicating with the MonetDB database in MAPI block * mode. * * The MapiSocket implements the protocol specifics of the MAPI block * mode protocol, and interfaces it as a socket that delivers a * BufferedReader and a BufferedWriter. Because logging in is an * integral part of the MAPI protocol, the MapiSocket performs the login * procedure. Like the Socket class, various options can be set before * calling the connect() method to influence the login process. Only * after a successful call to connect() the BufferedReader and * BufferedWriter can be retrieved. * * For each line read, it is determined what type of line it is * according to the MonetDB MAPI protocol. This results in a line to be * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() * method on the BufferedMCLReader to retrieve the type of the last * line read. * * For debugging purposes a socket level debugging is implemented where * each and every interaction to and from the MonetDB server is logged * to a file on disk. * Incoming messages are prefixed by "RX" (received by the driver), * outgoing messages by "TX" (transmitted by the driver). Special * decoded non-human readable messages are prefixed with "RD" and "TD" * instead. Following this two char prefix, a timestamp follows as the * number of milliseconds since the UNIX epoch. The rest of the line is * a String representation of the data sent or received. * * The general use of this Socket must be seen only in the full context * of a MAPI connection to a server. It has the same ingredients as a * normal Socket, allowing for seamless plugging. * <pre> * Socket \ / InputStream ----> (BufferedMCL)Reader * > o < * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer * </pre> * The MapiSocket allows to retrieve Streams for communicating. They * are interfaced, so they can be chained in any way. While the Socket * transparently deals with how data is sent over the wire, the actual * data read needs to be interpreted, for which a Reader/Writer * interface is most sufficient. In particular the BufferedMCL* * implementations of those interfaces supply some extra functionality * geared towards the format of the data. * * @author Fabian Groffen * @version 4.2 * @see org.monetdb.mcl.io.BufferedMCLReader * @see org.monetdb.mcl.io.BufferedMCLWriter */ public class MapiSocket { /* cannot (yet) be final as nl.cwi.monetdb.mcl.net.MapiSocket extends this class */ /** The TCP Socket to mserver */ private Socket con; /** The TCP Socket timeout in milliseconds. Default is 0 meaning the timeout is disabled (i.e., timeout of infinity) */ private int soTimeout = 0; /** Stream from the Socket for reading */ private BlockInputStream fromMonet; /** Stream from the Socket for writing */ private OutputStream toMonet; /** MCLReader on the InputStream */ private BufferedMCLReader reader; /** MCLWriter on the OutputStream */ private BufferedMCLWriter writer; /** protocol version of the connection */ private int version; /** The database to connect to */ private String database = null; /** The language to connect with */ private String language = "sql"; /** The hash methods to use (null = default) */ private String hash = null; /** Whether we should follow redirects */ private boolean followRedirects = true; /** How many redirections do we follow until we're fed up with it? */ private int ttl = 10; /** Whether we are debugging or not */ private boolean debug = false; /** The Writer for the debug log-file */ private Writer log; /** The blocksize (hardcoded in compliance with MonetDB common/stream/stream.h) */ public final static int BLOCK = 8 * 1024 - 2; /** A short in two bytes for holding the block size in bytes */ private final byte[] blklen = new byte[2]; /** Options that can be sent during the auth handshake if the server supports it */ private HandshakeOptions handshakeOptions; /** * Constructs a new MapiSocket. */ public MapiSocket() { con = null; } /** * Sets the database to connect to. If database is null, a * connection is made to the default database of the server. This * is also the default. * * @param db the database */ public void setDatabase(final String db) { this.database = db; } /** * Sets the language to use for this connection. * * @param lang the language */ public void setLanguage(final String lang) { this.language = lang; } /** * Sets the hash method to use. Note that this method is intended * for debugging purposes. Setting a hash method can yield in * connection failures. Multiple hash methods can be given by * separating the hashes by commas. * DON'T USE THIS METHOD if you don't know what you're doing. * * @param hash the hash method to use */ public void setHash(final String hash) { this.hash = hash; } /** * Sets whether MCL redirections should be followed or not. If set * to false, an MCLException will be thrown when a redirect is * encountered during connect. The default bahaviour is to * automatically follow redirects. * * @param r whether to follow redirects (true) or not (false) */ public void setFollowRedirects(final boolean r) { this.followRedirects = r; } /** * Sets the number of redirects that are followed when * followRedirects is true. In order to avoid going into an endless * loop due to some evil server, or another error, a maximum number * of redirects that may be followed can be set here. Note that to * disable the following of redirects you should use * setFollowRedirects. * * @see #setFollowRedirects(boolean r) * @param t the number of redirects before an exception is thrown */ public void setTTL(final int t) { this.ttl = t; } /** * Set the SO_TIMEOUT on the underlying Socket. When for some * reason the connection to the database hangs, this setting can be * useful to break out of this indefinite wait. * This option must be enabled prior to entering the blocking * operation to have effect. * * @param s The specified timeout, in milliseconds. A timeout * of zero will disable timeout (i.e., timeout of infinity). * @throws SocketException Issue with the socket */ public void setSoTimeout(final int s) throws SocketException { if (s < 0) { throw new IllegalArgumentException("timeout can't be negative"); } this.soTimeout = s; // limit time to wait on blocking operations if (con != null) { con.setSoTimeout(s); } } /** * Gets the SO_TIMEOUT from the underlying Socket. * * @return the currently in use timeout in milliseconds * @throws SocketException Issue with the socket */ public int getSoTimeout() throws SocketException { if (con != null) { this.soTimeout = con.getSoTimeout(); } return this.soTimeout; } /** * Enables/disables debug mode with logging to file * * @param debug Value to set */ public void setDebug(final boolean debug) { this.debug = debug; } /** * Connects to the given host and port, logging in as the given * user. If followRedirect is false, a RedirectionException is * thrown when a redirect is encountered. * * @param host the hostname, or null for the loopback address * @param port the port number (must be between 0 and 65535, inclusive) * @param user the username * @param pass the password * @return A List with informational (warning) messages. If this * list is empty; then there are no warnings. * @throws IOException if an I/O error occurs when creating the socket * @throws SocketException - if there is an error in the underlying protocol, such as a TCP error. * @throws UnknownHostException if the IP address of the host could not be determined * @throws MCLParseException if bogus data is received * @throws MCLException if an MCL related error occurs */ public List<String> connect(final String host, final int port, final String user, final String pass) throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException { // Wrap around the internal connect that needs to know if it // should really make a TCP connection or not. return connect(host, port, user, pass, true); } private List<String> connect(final String host, final int port, final String user, final String pass, final boolean makeConnection) throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException { if (ttl-- <= 0) throw new MCLException("Maximum number of redirects reached, aborting connection attempt."); if (makeConnection) { con = new Socket(host, port); con.setSoTimeout(this.soTimeout); // set nodelay, as it greatly speeds up small messages (like we often do) con.setTcpNoDelay(true); con.setKeepAlive(true); fromMonet = new BlockInputStream(con.getInputStream()); toMonet = new BlockOutputStream(con.getOutputStream()); try { reader = new BufferedMCLReader(fromMonet, "UTF-8"); writer = new BufferedMCLWriter(toMonet, "UTF-8"); writer.registerReader(reader); } catch (UnsupportedEncodingException e) { throw new MCLException(e.toString()); } } final String c = reader.readLine(); reader.waitForPrompt(); writer.writeLine(getChallengeResponse(c, user, pass, language, database, hash)); // read monetdb mserver response till prompt final ArrayList<String> redirects = new ArrayList<String>(); final List<String> warns = new ArrayList<String>(); String err = "", tmp; LineType lineType; do { tmp = reader.readLine(); if (tmp == null) throw new IOException("Read from " + con.getInetAddress().getHostName() + ":" + con.getPort() + ": End of stream reached"); lineType = reader.getLineType(); if (lineType == LineType.ERROR) { err += "\n" + tmp.substring(7); } else if (lineType == LineType.INFO) { warns.add(tmp.substring(1)); } else if (lineType == LineType.REDIRECT) { redirects.add(tmp.substring(1)); } } while (lineType != LineType.PROMPT); if (err.length() > 0) { close(); throw new MCLException(err); } if (!redirects.isEmpty()) { if (followRedirects) { // Ok, server wants us to go somewhere else. The list // might have multiple clues on where to go. For now we // don't support anything intelligent but trying the // first one. URI should be in form of: // "mapi:monetdb://host:port/database?arg=value&..." // or // "mapi:merovingian://proxy?arg=value&..." // note that the extra arguments must be obeyed in both // cases final String suri = redirects.get(0).toString(); if (!suri.startsWith("mapi:")) throw new MCLException("unsupported redirect: " + suri); final URI u; try { u = new URI(suri.substring(5)); } catch (java.net.URISyntaxException e) { throw new MCLParseException(e.toString()); } tmp = u.getQuery(); if (tmp != null) { final String args[] = tmp.split("&"); for (int i = 0; i < args.length; i++) { int pos = args[i].indexOf("="); if (pos > 0) { tmp = args[i].substring(0, pos); switch (tmp) { case "database": tmp = args[i].substring(pos + 1); if (!tmp.equals(database)) { warns.add("redirect points to different database: " + tmp); setDatabase(tmp); } break; case "language": tmp = args[i].substring(pos + 1); warns.add("redirect specifies use of different language: " + tmp); setLanguage(tmp); break; case "user": tmp = args[i].substring(pos + 1); if (!tmp.equals(user)) warns.add("ignoring different username '" + tmp + "' set by " + "redirect, what are the security implications?"); break; case "password": warns.add("ignoring different password set by redirect, " + "what are the security implications?"); break; default: warns.add("ignoring unknown argument '" + tmp + "' from redirect"); break; } } else { warns.add("ignoring illegal argument from redirect: " + args[i]); } } } if (u.getScheme().equals("monetdb")) { // this is a redirect to another (monetdb) server, // which means a full reconnect // avoid the debug log being closed if (debug) { debug = false; close(); debug = true; } else { close(); } tmp = u.getPath(); if (tmp != null && tmp.length() > 0) { tmp = tmp.substring(1).trim(); if (!tmp.isEmpty() && !tmp.equals(database)) { warns.add("redirect points to different database: " + tmp); setDatabase(tmp); } } final int p = u.getPort(); warns.addAll(connect(u.getHost(), p == -1 ? port : p, user, pass, true)); warns.add("Redirect by " + host + ":" + port + " to " + suri); } else if (u.getScheme().equals("merovingian")) { // reuse this connection to inline connect to the // right database that Merovingian proxies for us warns.addAll(connect(host, port, user, pass, false)); } else { throw new MCLException("unsupported scheme in redirect: " + suri); } } else { final StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:"); for (String it : redirects) { msg.append(" [" + it + "]"); } throw new MCLException(msg.toString()); } } return warns; } /** * A little helper function that processes a challenge string, and * returns a response string for the server. If the challenge * string is null, a challengeless response is returned. * * @param chalstr the challenge string * for example: H8sRMhtevGd:mserver:9:PROT10,RIPEMD160,SHA256,SHA1,COMPRESSION_SNAPPY,COMPRESSION_LZ4:LIT:SHA512: * @param username the username to use * @param password the password to use * @param language the language to use * @param database the database to connect to * @param hash the hash method(s) to use, or NULL for all supported hashes * @return the response string for the server */ private String getChallengeResponse( final String chalstr, String username, String password, final String language, final String database, final String hash ) throws MCLParseException, MCLException, IOException { // parse the challenge string, split it on ':' final String[] chaltok = chalstr.split(":"); if (chaltok.length <= 5) throw new MCLParseException("Server challenge string unusable! It contains too few (" + chaltok.length + ") tokens: " + chalstr); try { version = Integer.parseInt(chaltok[2]); // protocol version } catch (NumberFormatException e) { throw new MCLParseException("Protocol version (" + chaltok[2] + ") unparseable as integer."); } // handle the challenge according to the version it is switch (version) { case 9: // proto 9 is like 8, but uses a hash instead of the plain password // the server tells us (in 6th token) which hash in the // challenge after the byte-order token String algo; String pwhash = chaltok[5]; /* NOTE: Java doesn't support RIPEMD160 :( */ /* see: https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#MessageDigest */ switch (pwhash) { case "SHA512": algo = "SHA-512"; break; case "SHA384": algo = "SHA-384"; break; case "SHA256": algo = "SHA-256"; /* NOTE: Java 7 doesn't support SHA-224. Java 8 does but we have not tested it. It is also not requested yet. */ break; case "SHA1": algo = "SHA-1"; break; default: /* Note: MD5 has been deprecated by security experts and support is removed from Oct 2020 release */ throw new MCLException("Unsupported password hash: " + pwhash); } try { final MessageDigest md = MessageDigest.getInstance(algo); md.update(password.getBytes(StandardCharsets.UTF_8)); password = toHex(md.digest()); } catch (NoSuchAlgorithmException e) { throw new MCLException("This JVM does not support password hash: " + pwhash + "\n" + e); } // proto 7 (finally) used the challenge and works with a // password hash. The supported implementations come // from the server challenge. We chose the best hash // we can find, in the order SHA512, SHA1, MD5, plain. // Also the byte-order is reported in the challenge string, // which makes sense, since only blockmode is supported. // proto 8 made this obsolete, but retained the // byte-order report for future "binary" transports. // In proto 8, the byte-order of the blocks is always little // endian because most machines today are. final String hashes = (hash == null || hash.isEmpty()) ? chaltok[3] : hash; final HashSet<String> hashesSet = new HashSet<String>(java.util.Arrays.asList(hashes.toUpperCase().split("[, ]"))); // split on comma or space // if we deal with merovingian, mask our credentials if (chaltok[1].equals("merovingian") && !language.equals("control")) { username = "merovingian"; password = "merovingian"; } // reuse variables algo and pwhash algo = null; pwhash = null; if (hashesSet.contains("SHA512")) { algo = "SHA-512"; pwhash = "{SHA512}"; } else if (hashesSet.contains("SHA384")) { algo = "SHA-384"; pwhash = "{SHA384}"; } else if (hashesSet.contains("SHA256")) { algo = "SHA-256"; pwhash = "{SHA256}"; } else if (hashesSet.contains("SHA1")) { algo = "SHA-1"; pwhash = "{SHA1}"; } else { /* Note: MD5 has been deprecated by security experts and support is removed from Oct 2020 release */ throw new MCLException("no supported hash algorithms found in " + hashes); } try { final MessageDigest md = MessageDigest.getInstance(algo); md.update(password.getBytes(StandardCharsets.UTF_8)); md.update(chaltok[0].getBytes(StandardCharsets.UTF_8)); // salt/key pwhash += toHex(md.digest()); } catch (NoSuchAlgorithmException e) { throw new MCLException("This JVM does not support password hash: " + pwhash + "\n" + e); } // TODO: some day when we need this, we should store this if (chaltok[4].equals("BIG")) { // byte-order of server is big-endian } else if (chaltok[4].equals("LIT")) { // byte-order of server is little-endian } else { throw new MCLParseException("Invalid byte-order: " + chaltok[4]); } // compose and return response String response = "BIG:" // JVM byte-order is big-endian + username + ":" + pwhash + ":" + language + ":" + (database == null ? "" : database) + ":" + "FILETRANS:"; // this capability is added in monetdb-jdbc-3.2.jre8.jar if (chaltok.length > 6) { // if supported, send handshake options for (String part : chaltok[6].split(",")) { if (part.startsWith("sql=") && handshakeOptions != null) { int level; try { level = Integer.parseInt(chaltok[6].substring(4)); } catch (NumberFormatException e) { throw new MCLParseException("Invalid handshake level: " + chaltok[6]); } response += handshakeOptions.formatResponse(level); break; } } // this ':' delimits the handshake options field. response += ":"; } return response; default: throw new MCLException("Unsupported protocol version: " + version); } } /** * Small helper method to convert a byte string to a hexadecimal * string representation. * * @param digest the byte array to convert * @return the byte array as hexadecimal string */ private final static String toHex(final byte[] digest) { final char[] result = new char[digest.length * 2]; int pos = 0; for (int i = 0; i < digest.length; i++) { result[pos++] = hexChar((digest[i] & 0xf0) >> 4); result[pos++] = hexChar(digest[i] & 0x0f); } return new String(result); } private final static char hexChar(final int n) { return (n > 9) ? (char) ('a' + (n - 10)) : (char) ('0' + n); } /** * Returns an InputStream that reads from this open connection on * the MapiSocket. * * @return an input stream that reads from this open connection */ public InputStream getInputStream() { return fromMonet; } /** * Returns an output stream for this MapiSocket. * * @return an output stream for writing bytes to this MapiSocket */ public OutputStream getOutputStream() { return toMonet; } /** * Returns a Reader for this MapiSocket. The Reader is a * BufferedMCLReader which does protocol interpretation of the * BlockInputStream produced by this MapiSocket. * * @return a BufferedMCLReader connected to this MapiSocket */ public BufferedMCLReader getReader() { return reader; } /** * Returns a Writer for this MapiSocket. The Writer is a * BufferedMCLWriter which produces protocol compatible data blocks * that the BlockOutputStream can properly translate into blocks. * * @return a BufferedMCLWriter connected to this MapiSocket */ public BufferedMCLWriter getWriter() { return writer; } /** * Returns the mapi protocol version used by this socket. The * protocol version depends on the server being used. Users of the * MapiSocket should check this version to act appropriately. * * @return the mapi protocol version */ public int getProtocolVersion() { return version; } /** * Enables logging to a file what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the * socket. * * @param filename the name of the file to write to * @throws IOException if the file could not be opened for writing */ public void debug(final String filename) throws IOException { debug(new FileWriter(filename)); } /** * Enables logging to a stream what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the * socket. * * @param out to write the log to a print stream * @throws IOException if the file could not be opened for writing */ // disabled as it is not used by JDBC driver code // public void debug(PrintStream out) throws IOException { // debug(new PrintWriter(out)); // } /** * Enables logging to a stream what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the * socket. * * @param out to write the log to */ public void debug(final Writer out) { log = out; debug = true; } /** * Get the log Writer. * * @return the log writer */ public Writer getLogWriter() { return log; } /** * Writes a logline tagged with a timestamp using the given type and message * and optionally flushes afterwards. * * Used for debugging purposes only and represents a message data that is * connected to reading (RD or RX) or writing (TD or TX) to the socket. * R=Receive, T=Transmit, D=Data, X=?? * * @param type message type: either RD, RX, TD or TX * @param message the message to log * @param flush whether we need to flush buffered data to the logfile. * @throws IOException if an IO error occurs while writing to the logfile */ private final void log(final String type, final String message, final boolean flush) throws IOException { log.write(type + System.currentTimeMillis() + ": " + message + "\n"); if (flush) log.flush(); } public void setHandshakeOptions(HandshakeOptions handshakeOptions) { this.handshakeOptions = handshakeOptions; } public HandshakeOptions getHandshakeOptions() { return handshakeOptions; } /** * For internal use */ public boolean setInsertFakePrompts(boolean b) { return fromMonet.setInsertFakePrompts(b); } /** * Inner class that is used to write data on a normal stream as a * blocked stream. A call to the flush() method will write a * "final" block to the underlying stream. Non-final blocks are * written as soon as one or more bytes would not fit in the * current block any more. This allows to write to a block to it's * full size, and then flush it explicitly to have a final block * being written to the stream. */ final class BlockOutputStream extends FilterOutputStream { private int writePos = 0; private int blocksize = 0; private final byte[] block = new byte[BLOCK]; /** * Constructs this BlockOutputStream, backed by the given * OutputStream. A BufferedOutputStream is internally used. */ public BlockOutputStream(final OutputStream out) { // always use a buffered stream, even though we know how // much bytes to write/read, since this is just faster for // some reason super(new BufferedOutputStream(out)); } @Override public void flush() throws IOException { // write the block (as final) then flush. writeBlock(true); out.flush(); // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong // it's a bit nasty if an exception is thrown from the log, // but ignoring it can be nasty as well, so it is decided to // let it go so there is feedback about something going wrong if (debug) { log.flush(); } } /** * writeBlock puts the data in the block on the stream. The * boolean last controls whether the block is sent with an * indicator to note it is the last block of a sequence or not. * * @param last whether this is the last block * @throws IOException if writing to the stream failed */ public void writeBlock(final boolean last) throws IOException { if (last) { // always fits, because of BLOCK's size blocksize = (short)writePos; // this is the last block, so encode least // significant bit in the first byte (little-endian) blklen[0] = (byte)(blocksize << 1 & 0xFF | 1); blklen[1] = (byte)(blocksize >> 7); } else { // always fits, because of BLOCK's size blocksize = (short)BLOCK; // another block will follow, encode least // significant bit in the first byte (little-endian) blklen[0] = (byte)(blocksize << 1 & 0xFF); blklen[1] = (byte)(blocksize >> 7); } out.write(blklen); // write the actual block out.write(block, 0, writePos); if (debug) { if (last) { log("TD ", "write final block: " + writePos + " bytes", false); } else { log("TD ", "write block: " + writePos + " bytes", false); } log("TX ", new String(block, 0, writePos, StandardCharsets.UTF_8), true); } writePos = 0; } @Override public void write(final int b) throws IOException { if (writePos == BLOCK) { writeBlock(false); } block[writePos++] = (byte)b; } @Override public void write(final byte[] b) throws IOException { write(b, 0, b.length); } @Override public void write(final byte[] b, int off, int len) throws IOException { while (len > 0) { int t = BLOCK - writePos; if (len > t) { System.arraycopy(b, off, block, writePos, t); off += t; len -= t; writePos += t; writeBlock(false); } else { System.arraycopy(b, off, block, writePos, len); writePos += len; break; } } } @Override public void close() throws IOException { // we don't want the flush() method to be called (default of // the FilterOutputStream), so we close manually here out.close(); } } /** * Inner class that is used to make the data on the blocked stream * available as a normal stream. */ final class BlockInputStream extends FilterInputStream { private int readPos = 0; private int blockLen = 0; private boolean wasEndBlock = false; private final byte[] block = new byte[BLOCK + 3]; // \n.\n private boolean insertFakePrompts = true; /** * Constructs this BlockInputStream, backed by the given * InputStream. A BufferedInputStream is internally used. */ public BlockInputStream(final InputStream in) { // always use a buffered stream, even though we know how // much bytes to write/read, since this is just faster for // some reason super(new BufferedInputStream(in)); } public boolean setInsertFakePrompts(boolean doFake) { boolean old = insertFakePrompts; insertFakePrompts = doFake; return old; } @Override public int available() { return blockLen - readPos; } @Override public boolean markSupported() { return false; } @Override public void mark(final int readlimit) { throw new AssertionError("Not implemented!"); } @Override public void reset() { throw new AssertionError("Not implemented!"); } /** * Small wrapper to get a blocking variant of the read() method * on the BufferedInputStream. We want to benefit from the * Buffered pre-fetching, but not dealing with half blocks. * Changing this class to be able to use the partially received * data will greatly complicate matters, while a performance * improvement is debatable given the relatively small size of * our blocks. Maybe it does speed up on slower links, then * consider this method a quick bug fix/workaround. * * @return false if reading the block failed due to EOF */ private boolean _read(final byte[] b, int len) throws IOException { int s; int off = 0; while (len > 0) { s = in.read(b, off, len); if (s == -1) { // if we have read something before, we should have been // able to read the whole, so make this fatal if (off > 0) { if (debug) { log("RD ", "the following incomplete block was received:", false); log("RX ", new String(b, 0, off, StandardCharsets.UTF_8), true); } throw new IOException("Read from " + con.getInetAddress().getHostName() + ":" + con.getPort() + ": Incomplete block read from stream"); } if (debug) log("RD ", "server closed the connection (EOF)", true); return false; } len -= s; off += s; } return true; } /** * Reads the next block on the stream into the internal buffer, * or writes the prompt in the buffer. * * The blocked stream protocol consists of first a two byte * integer indicating the length of the block, then the * block, followed by another length + block. The end of * such sequence is put in the last bit of the length, and * hence this length should be shifted to the right to * obtain the real length value first. We simply fetch * blocks here as soon as they are needed for the stream's * read methods. * * The user-flush, which is an implicit effect of the end of * a block sequence, is communicated beyond the stream by * inserting a prompt sequence on the stream after the last * block. This method makes sure that a final block ends with a * newline, if it doesn't already, in order to facilitate a * Reader that is possibly chained to this InputStream. * * If the stream is not positioned correctly, hell will break * loose. */ private int readBlock() throws IOException { // read next two bytes (short) if (!_read(blklen, 2)) return(-1); // Get the short-value and store its value in blockLen. blockLen = (short)( (blklen[0] & 0xFF) >> 1 | (blklen[1] & 0xFF) << 7 ); wasEndBlock = (blklen[0] & 0x1) == 1; readPos = 0; if (debug) { if (wasEndBlock) { log("RD ", "read final block: " + blockLen + " bytes", false); } else { log("RD ", "read new block: " + blockLen + " bytes", false); } } // sanity check to avoid bad servers make us do an ugly // stack trace if (blockLen > block.length) throw new IOException("Server sent a block larger than BLOCKsize: " + blockLen + " > " + block.length); if (!_read(block, blockLen)) return -1; if (debug) log("RX ", new String(block, 0, blockLen, StandardCharsets.UTF_8), true); // if this is the last block, make it end with a newline and prompt if (wasEndBlock) { // insert 'fake' newline and prompt if (insertFakePrompts) { if (blockLen > 0 && block[blockLen - 1] != '\n') { // to terminate the block in a Reader block[blockLen++] = '\n'; } for (byte b : LineType.PROMPT.bytes()) { block[blockLen++] = b; } block[blockLen++] = '\n'; if (debug) { log("RD ", "inserting prompt", true); } } } return blockLen; } @Override public int read() throws IOException { if (available() == 0) { if (readBlock() == -1) return -1; } if (debug) log("RX ", new String(block, readPos, 1, StandardCharsets.UTF_8), true); return (int)block[readPos++]; } @Override public int read(final byte[] b) throws IOException { return read(b, 0, b.length); } @Override public int read(final byte[] b, int off, int len) throws IOException { int t; int size = 0; while (size < len) { t = available(); if (t == 0) { if (size != 0) break; if (readBlock() == -1) { if (size == 0) size = -1; break; } t = available(); } if (len > t) { System.arraycopy(block, readPos, b, off, t); off += t; len -= t; readPos += t; size += t; } else { System.arraycopy(block, readPos, b, off, len); readPos += len; size += len; break; } } return size; } @Override public long skip(final long n) throws IOException { long skip = n; while (skip > 0) { int t = available(); if (skip > t) { skip -= t; readPos += t; readBlock(); } else { readPos += skip; break; } } return n; } /** * For internal use */ Raw getRaw() { return new Raw(); } /** An alternative I/O interface that exposes the block based nature of the MAPI protocol */ final class Raw { byte[] getBytes() { return block; } int getLength() { return blockLen; } int getPosition() { return readPos; } int consume(int delta) { int pos = readPos; readPos += delta; return pos; } int readBlock() throws IOException { boolean wasFaking = setInsertFakePrompts(false); try { return BlockInputStream.this.readBlock(); } finally { setInsertFakePrompts(wasFaking); } } boolean wasEndBlock() { return wasEndBlock; } } } /** * Closes the streams and socket connected to the server if possible. * If an error occurs at closing a resource, it is ignored so as many * resources as possible are closed. */ public synchronized void close() { if (writer != null) { try { writer.close(); writer = null; } catch (IOException e) { /* ignore it */ } } if (reader != null) { try { reader.close(); reader = null; } catch (IOException e) { /* ignore it */ } } if (toMonet != null) { try { toMonet.close(); toMonet = null; } catch (IOException e) { /* ignore it */ } } if (fromMonet != null) { try { fromMonet.close(); fromMonet = null; } catch (IOException e) { /* ignore it */ } } if (con != null) { try { con.close(); // close the socket con = null; } catch (IOException e) { /* ignore it */ } } if (debug && log != null && log instanceof FileWriter) { try { log.close(); log = null; } catch (IOException e) { /* ignore it */ } } } /** * Return an UploadStream for use with for example COPY FROM filename ON CLIENT. * * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}. * @param chunkSize chunk size for the upload stream */ public UploadStream uploadStream(int chunkSize) { return new UploadStream(chunkSize); } /** * Return an UploadStream for use with for example COPY FROM filename ON CLIENT. * * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}. */ public UploadStream uploadStream() { return new UploadStream(); } /** * Return a DownloadStream for use with for example COPY INTO filename ON CLIENT * * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}. * @param prependCr convert \n to \r\n */ public DownloadStream downloadStream(boolean prependCr) { return new DownloadStream(fromMonet.getRaw(), toMonet, prependCr); } /** * Destructor called by garbage collector before destroying this * object tries to disconnect the MonetDB connection if it has not * been disconnected already. * * @deprecated (since="9") */ @Override @Deprecated protected void finalize() throws Throwable { close(); super.finalize(); } /** * Stream of data sent to the server * * Building block for {@link org.monetdb.jdbc.MonetConnection.UploadHandler}. * * An UploadStream has a chunk size. Every chunk size bytes, the server gets * the opportunity to abort the upload. */ public class UploadStream extends FilterOutputStream { public final static int DEFAULT_CHUNK_SIZE = 1024 * 1024; private final int chunkSize; private boolean closed = false; private boolean serverCancelled = false; private int chunkLeft; private byte[] promptBuffer; private Runnable cancellationCallback = null; /** Create an UploadStream with the given chunk size */ UploadStream(final int chunkSize) { super(toMonet); if (chunkSize <= 0) { throw new IllegalArgumentException("chunk size must be positive"); } this.chunkSize = chunkSize; assert LineType.MORE.bytes().length == LineType.FILETRANSFER.bytes().length; int promptLen = LineType.MORE.bytes().length; promptBuffer = new byte[promptLen + 1]; chunkLeft = this.chunkSize; } /** Create an UploadStream with the default chunk size */ UploadStream() { this(DEFAULT_CHUNK_SIZE); } /** Set a callback to be invoked if the server cancels the upload */ public void setCancellationCallback(final Runnable cancellationCallback) { this.cancellationCallback = cancellationCallback; } @Override public void write(final int b) throws IOException { if (serverCancelled) { // We have already thrown an exception and apparently that has been ignored. // Probably because they're calling print methods instead of write. // Throw another one, maybe they'll catch this one. throw new IOException("Server aborted the upload"); } handleChunking(); out.write(b); wrote(1); } @Override public void write(final byte[] b) throws IOException { this.write(b, 0, b.length); } @Override public void write(final byte[] b, int off, int len) throws IOException { if (serverCancelled) { // We have already thrown an exception and apparently that has been ignored. // Probably because they're calling print methods instead of write. // Throw another one, maybe they'll catch this one. throw new IOException("Server aborted the upload"); } while (len > 0) { handleChunking(); int toWrite = Integer.min(len, chunkLeft); out.write(b, off, toWrite); off += toWrite; len -= toWrite; wrote(toWrite); } } @Override public void flush() throws IOException { // suppress flushes } @Override public void close() throws IOException { if (closed) { return; } closed = true; if (serverCancelled) closeAfterServerCancelled(); else closeAfterSuccesfulUpload(); } private void closeAfterSuccesfulUpload() throws IOException { if (chunkLeft != chunkSize) { // flush pending data flushAndReadPrompt(); } // send empty block out.flush(); final LineType acknowledgement = readPrompt(); if (acknowledgement != LineType.FILETRANSFER) { throw new IOException("Expected server to acknowledge end of file"); } } private void closeAfterServerCancelled() { // nothing to do here, we have already read the error prompt. } private void wrote(final int i) { chunkLeft -= i; } private void handleChunking() throws IOException { if (chunkLeft > 0) { return; } flushAndReadPrompt(); } private void flushAndReadPrompt() throws IOException { out.flush(); chunkLeft = chunkSize; final LineType lineType = readPrompt(); switch (lineType) { case MORE: return; case FILETRANSFER: // Note, if the caller is calling print methods instead of write, the IO exception gets hidden. // This is unfortunate but there's nothing we can do about it. serverCancelled = true; if (cancellationCallback != null) { cancellationCallback.run(); } throw new IOException("Server aborted the upload"); default: throw new IOException("Expected MORE/DONE from server, got " + lineType); } } private LineType readPrompt() throws IOException { final int nread = fromMonet.read(promptBuffer); if (nread != promptBuffer.length || promptBuffer[promptBuffer.length - 1] != '\n') { throw new IOException("server return incomplete prompt"); } return LineType.classify(promptBuffer); } } /** * Stream of data received from the server * * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}. */ public static class DownloadStream extends InputStream { private final BlockInputStream.Raw rawIn; private final OutputStream out; private final boolean prependCr; private boolean endBlockSeen = false; private boolean closed = false; private boolean newlinePending = false; // used for crlf conversion DownloadStream(BlockInputStream.Raw rawIn, OutputStream out, boolean prependCr) { this.rawIn = rawIn; this.out = out; this.prependCr = prependCr; } void nextBlock() throws IOException { if (endBlockSeen || closed) return; final int ret = rawIn.readBlock(); if (ret < 0 || rawIn.wasEndBlock()) { endBlockSeen = true; } } @Override public void close() throws IOException { if (closed) return; closed = true; while (!endBlockSeen) { nextBlock(); } // Send acknowledgement to server out.write('\n'); out.flush(); // Do whatever super has to do super.close(); } @Override public int read() throws IOException { final byte[] buf = { 0 }; final int nread = read(buf, 0, 1); if (nread == 1) return buf[0]; else return -1; } @Override public int read(final byte[] dest, int off, int len) throws IOException { final int origOff = off; int end = off + len; while (off < end) { // minimum of what's requested and what we have in stock int chunk = Integer.min(end - off, rawIn.getLength() - rawIn.getPosition()); assert chunk >= 0; if (chunk == 0) { // make progress by fetching more data if (endBlockSeen) break; nextBlock(); continue; } // make progress copying some bytes if (!prependCr) { // no conversion needed, use arraycopy System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), dest, off, chunk); off += chunk; rawIn.consume(chunk); } else { int chunkEnd = off + chunk; if (newlinePending && off < chunkEnd) { // we were in the middle of a line ending conversion dest[off++] = '\n'; newlinePending = false; } while (off < chunkEnd) { byte b = rawIn.getBytes()[rawIn.consume(1)]; if (b != '\n') { dest[off++] = b; } else if (chunkEnd - off >= 2) { dest[off++] = '\r'; dest[off++] = '\n'; } else { dest[off++] = '\r'; newlinePending = true; break; } } } } if (off < end && newlinePending) { dest[off++] = '\n'; newlinePending = false; } if (off == origOff && endBlockSeen) return -1; else return off - origOff; } } }