Mercurial > hg > monetdb-java
changeset 63:6325594f01af embedded
Lots of cleaning, but still a long way to go.
line wrap: on
line diff
--- a/example/SQLcopyinto.java +++ b/example/SQLcopyinto.java @@ -10,9 +10,9 @@ import java.sql.*; import java.io.*; import java.util.*; +import nl.cwi.monetdb.mcl.connection.DeleteMe; import nl.cwi.monetdb.mcl.io.AbstractMCLReader; import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.net.*; /** * This example demonstrates how the MonetDB JDBC driver can facilitate @@ -47,7 +47,7 @@ public class SQLcopyinto { // of course also be done simultaneously with the JDBC // connection being kept connected - MapiSocket server = new MapiSocket("localhost", 50000, "monetdb", "monetdb", false, "sql", "SHA256"); + DeleteMe server = new DeleteMe("localhost", 50000, "monetdb", "monetdb", false, "sql", "SHA256"); server.setDatabase("database"); server.setLanguage("sql");
--- a/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedConnection.java +++ b/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedConnection.java @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHa * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class MonetDBEmbeddedConnection implements IEmbeddedConnection { +public final class MonetDBEmbeddedConnection implements IEmbeddedConnection { private final long connectionPointer;
--- a/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedDatabase.java +++ b/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedDatabase.java @@ -8,7 +8,8 @@ package nl.cwi.monetdb.embedded.env; -import nl.cwi.monetdb.mcl.net.EmbeddedMonetDB; +import nl.cwi.monetdb.mcl.connection.EmbeddedMonetDB; +import nl.cwi.monetdb.mcl.io.InternalConnection; import java.util.concurrent.ConcurrentHashMap; @@ -21,7 +22,7 @@ import java.util.concurrent.ConcurrentHa * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class MonetDBEmbeddedDatabase { +public final class MonetDBEmbeddedDatabase { private static MonetDBEmbeddedDatabase MonetDBEmbeddedDatabase = null; @@ -123,7 +124,7 @@ public class MonetDBEmbeddedDatabase { } } - /** + /* * Stops the database asynchronously. All the pending connections will be shut down as well. * * @throws MonetDBEmbeddedException If the database is not running or an error in the database occurred @@ -162,7 +163,7 @@ public class MonetDBEmbeddedDatabase { } } - /** + /* * Creates a connection on the database, set on the default schema asynchronously. * * @return A MonetDBEmbeddedConnection instance @@ -172,19 +173,20 @@ public class MonetDBEmbeddedDatabase { return CompletableFuture.supplyAsync(() -> this.createConnectionInternal()); }*/ - public static void AddJDBCEmbeddedConnection(EmbeddedMonetDB con) throws MonetDBEmbeddedException { + public static InternalConnection AddJDBCEmbeddedConnection() throws MonetDBEmbeddedException { if(MonetDBEmbeddedDatabase == null) { throw new MonetDBEmbeddedException("The database is not running!"); } else { - MonetDBEmbeddedDatabase.createJDBCConnectionInternal(con); - MonetDBEmbeddedDatabase.connections.put(con.getConnectionPointer(), con); + InternalConnection res = MonetDBEmbeddedDatabase.createJDBCConnectionInternal(); + MonetDBEmbeddedDatabase.connections.put(res.getConnectionPointer(), res); + return res; } } /** * Removes a connection from this database. */ - protected static void RemoveConnection(MonetDBEmbeddedConnection con) { + static void RemoveConnection(MonetDBEmbeddedConnection con) { MonetDBEmbeddedDatabase.connections.remove(con.getConnectionPointer()); } @@ -208,5 +210,5 @@ public class MonetDBEmbeddedDatabase { /** * Internal implementation to create a JDBC embeddded connection on this database. */ - private native void createJDBCConnectionInternal(EmbeddedMonetDB emc) throws MonetDBEmbeddedException; + private native InternalConnection createJDBCConnectionInternal() throws MonetDBEmbeddedException; }
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSet.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSet.java @@ -25,7 +25,7 @@ import java.util.ListIterator; * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSet extends AbstractResultTable implements Iterable { +public final class QueryResultSet extends AbstractResultTable implements Iterable { /** * The table C pointer.
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetBooleanColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetBooleanColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetBooleanColumn extends AbstractQueryResultSetColumn<boolean[]> { +public final class QueryResultSetBooleanColumn extends AbstractQueryResultSetColumn<boolean[]> { /** * Gets MonetDB's boolean null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetByteColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetByteColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetByteColumn extends AbstractQueryResultSetColumn<byte[]> { +public final class QueryResultSetByteColumn extends AbstractQueryResultSetColumn<byte[]> { /** * Gets MonetDB's byte null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetDoubleColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetDoubleColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetDoubleColumn extends AbstractQueryResultSetColumn<double[]> { +public final class QueryResultSetDoubleColumn extends AbstractQueryResultSetColumn<double[]> { /** * Gets MonetDB's double null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetFloatColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetFloatColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetFloatColumn extends AbstractQueryResultSetColumn<float[]> { +public final class QueryResultSetFloatColumn extends AbstractQueryResultSetColumn<float[]> { /** * Gets MonetDB's float null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetIntColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetIntColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetIntColumn extends AbstractQueryResultSetColumn<int[]> { +public final class QueryResultSetIntColumn extends AbstractQueryResultSetColumn<int[]> { /** * Gets MonetDB's int null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetLongColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetLongColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetLongColumn extends AbstractQueryResultSetColumn<long[]> { +public final class QueryResultSetLongColumn extends AbstractQueryResultSetColumn<long[]> { /** * Gets MonetDB's long null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetObjectColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetObjectColumn.java @@ -20,7 +20,7 @@ import java.util.ListIterator; * @param <T> The Java class of the mapped MonetDB column * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetObjectColumn<T> extends AbstractQueryResultSetColumn<T[]> implements Iterable<T> { +public final class QueryResultSetObjectColumn<T> extends AbstractQueryResultSetColumn<T[]> implements Iterable<T> { /** * A null pointer returning method.
--- a/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetShortColumn.java +++ b/src/main/java/nl/cwi/monetdb/embedded/resultset/QueryResultSetShortColumn.java @@ -15,7 +15,7 @@ import nl.cwi.monetdb.embedded.env.Monet * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class QueryResultSetShortColumn extends AbstractQueryResultSetColumn<short[]> { +public final class QueryResultSetShortColumn extends AbstractQueryResultSetColumn<short[]> { /** * Gets MonetDB's short null constant
--- a/src/main/java/nl/cwi/monetdb/embedded/tables/MonetDBTable.java +++ b/src/main/java/nl/cwi/monetdb/embedded/tables/MonetDBTable.java @@ -22,7 +22,7 @@ import nl.cwi.monetdb.embedded.resultset * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class MonetDBTable extends AbstractResultTable { +public final class MonetDBTable extends AbstractResultTable { private final String tableSchema;
--- a/src/main/java/nl/cwi/monetdb/embedded/tables/RowIterator.java +++ b/src/main/java/nl/cwi/monetdb/embedded/tables/RowIterator.java @@ -154,5 +154,5 @@ public class RowIterator extends Abstrac /** * Sets the next value to iterate. */ - protected void setNextIteration() { this.currentIterationNumber++; } + void setNextIteration() { this.currentIterationNumber++; } }
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetBlob.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetBlob.java @@ -268,8 +268,7 @@ public class MonetBlob implements Blob { throw new SQLException("This Blob object has been freed", "M1M20"); try { /* transactions? what are you talking about? */ - for (int i = (int)pos; i < len; i++) - buf[i] = bytes[offset - 1 + i]; + System.arraycopy(bytes, offset - 1 + (int) pos, buf, (int) pos, len - (int) pos); } catch (IndexOutOfBoundsException e) { throw new SQLException(e.getMessage(), "M0M10"); } @@ -291,8 +290,7 @@ public class MonetBlob implements Blob { throw new SQLException("This Blob object has been freed", "M1M20"); if (buf.length > len) { byte[] newbuf = new byte[(int)len]; - for (int i = 0; i < len; i++) - newbuf[i] = buf[i]; + System.arraycopy(buf, 0, newbuf, 0, (int) len); buf = newbuf; } }
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java @@ -46,9 +46,9 @@ import nl.cwi.monetdb.jdbc.types.URL; import nl.cwi.monetdb.mcl.MCLException; import nl.cwi.monetdb.mcl.io.AbstractMCLReader; import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.net.EmbeddedMonetDB; -import nl.cwi.monetdb.mcl.net.MapiSocket; -import nl.cwi.monetdb.mcl.net.AbstractMCLConnection; +import nl.cwi.monetdb.mcl.connection.EmbeddedMonetDB; +import nl.cwi.monetdb.mcl.connection.DeleteMe; +import nl.cwi.monetdb.mcl.connection.AbstractMonetDBConnection; import nl.cwi.monetdb.mcl.parser.HeaderLineParser; import nl.cwi.monetdb.mcl.parser.StartOfHeaderParser; import nl.cwi.monetdb.mcl.parser.MCLParseException; @@ -81,7 +81,7 @@ import nl.cwi.monetdb.mcl.parser.MCLPars public class MonetConnection extends MonetWrapper implements Connection { /** A connection to mserver5 either through MAPI with TCP or embedded */ - private final AbstractMCLConnection server; + private final AbstractMonetDBConnection server; /** The Reader from the server */ private final AbstractMCLReader in; /** The Writer to the server */ @@ -179,7 +179,7 @@ public class MonetConnection extends Mon language = "sql"; addWarning("No language given, defaulting to 'sql'", "M1M05"); } - server = new MapiSocket(hostname, port, database, username, debug, language, hash); + server = new DeleteMe(hostname, port, database, username, debug, language, hash); try { server.setSoTimeout(sockTimeout); } catch (SocketException e) { @@ -239,7 +239,7 @@ public class MonetConnection extends Mon // the following initialisers are only valid when the language // is SQL... - if (server.getLang() == AbstractMCLConnection.LANG_SQL) { + if (server.getLang() == AbstractMonetDBConnection.LANG_SQL) { // enable auto commit setAutoCommit(true); // set our time zone on the server @@ -254,7 +254,7 @@ public class MonetConnection extends Mon } } - protected AbstractMCLConnection getServer() { + protected AbstractMonetDBConnection getServer() { return server; } @@ -627,7 +627,7 @@ public class MonetConnection extends Mon */ @Override public DatabaseMetaData getMetaData() throws SQLException { - if (server.getLang() != AbstractMCLConnection.LANG_SQL) + if (server.getLang() != AbstractMonetDBConnection.LANG_SQL) throw new SQLException("This method is only supported in SQL mode", "M0M04"); return new MonetDatabaseMetaData(this); @@ -1854,7 +1854,7 @@ public class MonetConnection extends Mon /// in memory when dealing with random access to /// reduce memory blow-up - // if we're running forward only, we can discard the old + // if we're running forward only, we can discard the oldmapi // block loaded if (parent.rstype == ResultSet.TYPE_FORWARD_ONLY) { for (int i = 0; i < block; i++) @@ -2322,7 +2322,7 @@ public class MonetConnection extends Mon int size = cachesize == 0 ? DEF_FETCHSIZE : cachesize; size = maxrows != 0 ? Math.min(maxrows, size) : size; // don't do work if it's not needed - if (server.getLang() == AbstractMCLConnection.LANG_SQL && size != curReplySize && templ != server.getCommandHeaderTemplates()) { + if (server.getLang() == AbstractMonetDBConnection.LANG_SQL && size != curReplySize && templ != server.getCommandHeaderTemplates()) { sendControlCommand("reply_size " + size); // store the reply size after a successful change @@ -2387,7 +2387,7 @@ public class MonetConnection extends Mon // have an additional datablock if (rowcount < tuplecount) { if (rsresponses == null) - rsresponses = new HashMap<Integer, ResultSetResponse>(); + rsresponses = new HashMap<>(); rsresponses.put( id, (ResultSetResponse) res
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetStatement.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetStatement.java @@ -8,7 +8,7 @@ package nl.cwi.monetdb.jdbc; -import nl.cwi.monetdb.mcl.net.AbstractMCLConnection; +import nl.cwi.monetdb.mcl.connection.AbstractMonetDBConnection; import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.Statement; @@ -202,7 +202,7 @@ public class MonetStatement extends Mone boolean first = true; boolean error = false; - AbstractMCLConnection server = connection.getServer(); + AbstractMonetDBConnection server = connection.getServer(); BatchUpdateException e = new BatchUpdateException("Error(s) occurred while executing the batch, see next SQLExceptions for details", "22000", counts); StringBuilder tmpBatch = new StringBuilder(server.getBlockSize());
rename from src/main/java/nl/cwi/monetdb/mcl/net/AbstractMCLConnection.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/AbstractMonetDBConnection.java --- a/src/main/java/nl/cwi/monetdb/mcl/net/AbstractMCLConnection.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/AbstractMonetDBConnection.java @@ -1,12 +1,8 @@ -package nl.cwi.monetdb.mcl.net; +package nl.cwi.monetdb.mcl.connection; import nl.cwi.monetdb.mcl.MCLException; -import nl.cwi.monetdb.mcl.io.AbstractMCLReader; -import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.parser.HeaderLineParser; import nl.cwi.monetdb.mcl.parser.MCLParseException; -import nl.cwi.monetdb.mcl.parser.StartOfHeaderParser; -import nl.cwi.monetdb.mcl.parser.TupleLineParser; +import nl.cwi.monetdb.mcl.protocol.AbstractProtocolParser; import java.io.*; import java.net.SocketException; @@ -15,82 +11,32 @@ import java.util.List; /** * Created by ferreira on 11/23/16. */ -public abstract class AbstractMCLConnection { - - /** the SQL language */ - public final static int LANG_SQL = 0; - /** the MAL language (officially *NOT* supported) */ - public final static int LANG_MAL = 3; - /** an unknown language */ - public final static int LANG_UNKNOWN = -1; +public abstract class AbstractMonetDBConnection { - /** The hostname to connect to */ - protected String hostname; - /** The port to connect on the host to */ - protected int port = -1; + /** The language to connect with */ + protected MonetDBLanguage currentMonetDBLanguage = MonetDBLanguage.LANG_SQL; /** The database to connect to */ - protected String database; - /** The username to use when authenticating */ - protected String username; + protected final String database; + /** Authentication hash method */ + protected final String hash; /** Whether we are debugging or not */ protected boolean debug; - /** The language to connect with */ - protected String language; - /** The hash methods to use (null = default) */ - protected String hash; /** The Writer for the debug log-file */ protected Writer log; - /** The language which is used */ - protected int lang; - /** A template to apply to each query (like pre and post fixes) */ - protected String[] queryTempl = new String[3]; // pre, post, sep - /** A template to apply to each command (like pre and post fixes) */ - protected String[] commandTempl = new String[3]; // pre, post, sep - - public AbstractMCLConnection(String hostname, int port, String database, String username, boolean debug, String language, String hash, String[] queryTempl, String[] commandTempl) { - this.hostname = hostname; - this.port = port; + public AbstractMonetDBConnection(String database, String hash, boolean debug, MonetDBLanguage lang) throws IOException { this.database = database; - this.username = username; - this.debug = debug; this.hash = hash; - this.queryTempl = queryTempl; - this.commandTempl = commandTempl; - this.setLanguage(language); - } - - public String getHostname() { - return hostname; + this.debug = debug; + this.currentMonetDBLanguage = lang; } - public abstract void setHostname(String hostname); - - public int getPort() { - return port; - } - - public abstract void setPort(int port); - public String getDatabase() { return database; } - /** - * Sets the database to connect to. If database is null, a - * connection is made to the default database of the server. This - * is also the default. - * - * @param db the database - */ - public abstract void setDatabase(String db); - - public String getUsername() { - return username; - } - - protected void setUsername(String username) { - this.username = username; + public String getHash() { + return hash; } public boolean isDebug() { @@ -106,56 +52,13 @@ public abstract class AbstractMCLConnect this.debug = debug; } - public String getLanguage() { - return language; - } - - /** - * Sets the language to use for this connection. - * - * @param language the language - */ - public abstract void setLanguage(String language); - - public int getLang() { - return lang; - } - - public String getHash() { - return hash; + public MonetDBLanguage getCurrentMonetDBLanguage() { + return currentMonetDBLanguage; } - /** - * Sets the hash method to use. Note that this method is intended - * for debugging purposes. Setting a hash method can yield in - * connection failures. Multiple hash methods can be given by - * separating the hashes by commas. - * DON'T USE THIS METHOD if you don't know what you're doing. - * - * @param hash the hash method to use - */ - public abstract void setHash(String hash); - - /** - * Gets the SO_TIMEOUT from the underlying Socket. - * - * @return the currently in use timeout in milliseconds - * @throws SocketException Issue with the socket - */ - public abstract int getSoTimeout() throws SocketException; - - /** - * Set the SO_TIMEOUT on the underlying Socket. When for some - * reason the connection to the database hangs, this setting can be - * useful to break out of this indefinite wait. - * This option must be enabled prior to entering the blocking - * operation to have effect. - * - * @param s The specified timeout, in milliseconds. A timeout - * of zero is interpreted as an infinite timeout. - * @throws SocketException Issue with the socket - */ - public abstract void setSoTimeout(int s) throws SocketException; + public void setCurrentMonetDBLanguage(MonetDBLanguage currentMonetDBLanguage) { + this.currentMonetDBLanguage = currentMonetDBLanguage; + } /** * Connects to the given host and port, logging in as the given @@ -172,39 +75,6 @@ public abstract class AbstractMCLConnect public abstract List<String> connect(String user, String pass) throws IOException, MCLParseException, MCLException; /** - * Returns an InputStream that reads from this open connection on - * the MapiSocket. - * - * @return an input stream that reads from this open connection - */ - public abstract InputStream getInputStream(); - - /** - * Returns an output stream for this MapiSocket. - * - * @return an output stream for writing bytes to this MapiSocket - */ - public abstract OutputStream getOutputStream(); - - /** - * Returns a Reader for this MapiSocket. The Reader is a - * BufferedMCLReader which does protocol interpretation of the - * BlockInputStream produced by this MapiSocket. - * - * @return a BufferedMCLReader connected to this MapiSocket - */ - public abstract AbstractMCLReader getReader(); - - /** - * Returns a Writer for this MapiSocket. The Writer is a - * BufferedMCLWriter which produces protocol compatible data blocks - * that the BlockOutputStream can properly translate into blocks. - * - * @return a BufferedMCLWriter connected to this MapiSocket - */ - public abstract AbstractMCLWriter getWriter(); - - /** * Enables logging to a file what is read and written from and to * the server. Logging can be enabled at any time. However, it is * encouraged to start debugging before actually connecting the @@ -244,28 +114,67 @@ public abstract class AbstractMCLConnect debug = true; } - public String getQueryTemplateHeader(int index) { - return queryTempl[index] == null ? "" : queryTempl[index]; + /** + * Writes a logline tagged with a timestamp using the given string. + * Used for debugging purposes only and represents a message that is + * connected to writing to the socket. A logline might look like: + * TX 152545124: Hello MonetDB! + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logTx(String message) throws IOException { + log.write("TX " + System.currentTimeMillis() + + ": " + message + "\n"); + } + + /** + * Writes a logline tagged with a timestamp using the given string. + * Lines written using this log method are tagged as "added + * metadata" which is not strictly part of the data sent. + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logTd(String message) throws IOException { + log.write("TD " + System.currentTimeMillis() + + ": " + message + "\n"); } - public String getCommandTemplateHeader(int index) { - return commandTempl[index] == null ? "" : commandTempl[index]; + /** + * Writes a logline tagged with a timestamp using the given string, + * and flushes afterwards. Used for debugging purposes only and + * represents a message that is connected to reading from the + * socket. The log is flushed after writing the line. A logline + * might look like: + * RX 152545124: Hi JDBC! + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logRx(String message) throws IOException { + log.write("RX " + System.currentTimeMillis() + + ": " + message + "\n"); + log.flush(); } - public String[] getCommandHeaderTemplates() { - return commandTempl; - } - - public String[] getQueryHeaderTemplates() { - return queryTempl; + /** + * Writes a logline tagged with a timestamp using the given string, + * and flushes afterwards. Lines written using this log method are + * tagged as "added metadata" which is not strictly part of the data + * received. + * + * @param message the message to log + * @throws IOException if an IO error occurs while writing to the logfile + */ + private void logRd(String message) throws IOException { + log.write("RD " + System.currentTimeMillis() + + ": " + message + "\n"); + log.flush(); } public synchronized void close() { try { - if (this.getReader() != null) this.getReader().close(); - if (this.getWriter() != null) this.getWriter().close(); - if (this.getInputStream() != null) this.getInputStream().close(); - if (this.getOutputStream() != null) this.getOutputStream().close(); if (debug && log instanceof FileWriter) log.close(); } catch (IOException e) { // ignore it @@ -287,9 +196,9 @@ public abstract class AbstractMCLConnect public abstract int getBlockSize(); - public abstract StartOfHeaderParser getStartOfHeaderParser(); + public abstract int getSoTimeout() throws SocketException; - public abstract HeaderLineParser getHeaderLineParser(int capacity); + public abstract void setSoTimeout(int s) throws SocketException; - public abstract TupleLineParser getTupleLineParser(int capacity); + public abstract AbstractProtocolParser getUnderlyingProtocol(); }
rename from src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/DeleteMe.java --- a/src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/DeleteMe.java @@ -6,1028 +6,16 @@ * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V. */ -package nl.cwi.monetdb.mcl.net; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.FilterInputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.net.Socket; -import java.net.SocketException; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import nl.cwi.monetdb.mcl.MCLException; -import nl.cwi.monetdb.mcl.io.AbstractMCLReader; -import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.io.BufferedMCLReader; -import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; -import nl.cwi.monetdb.mcl.parser.HeaderLineParser; -import nl.cwi.monetdb.mcl.parser.MCLParseException; -import nl.cwi.monetdb.mcl.parser.StartOfHeaderParser; -import nl.cwi.monetdb.mcl.parser.TupleLineParser; -import nl.cwi.monetdb.mcl.parser.socket.SocketHeaderLineParser; -import nl.cwi.monetdb.mcl.parser.socket.SocketStartOfHeaderParser; -import nl.cwi.monetdb.mcl.parser.socket.SocketTupleLineParser; - -/** - * A Socket for communicating with the MonetDB database in MAPI block - * mode. - * - * The MapiSocket implements the protocol specifics of the MAPI block - * mode protocol, and interfaces it as a socket that delivers a - * BufferedReader and a BufferedWriter. Because logging in is an - * integral part of the MAPI protocol, the MapiSocket performs the login - * procedure. Like the Socket class, various options can be set before - * calling the connect() method to influence the login process. Only - * after a successful call to connect() the BufferedReader and - * BufferedWriter can be retrieved. - * <br /> - * For each line read, it is determined what type of line it is - * according to the MonetDB MAPI protocol. This results in a line to be - * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() - * method on the BufferedMCLReader to retrieve the type of the last - * line read. - * - * For debugging purposes a socket level debugging is implemented where - * each and every interaction to and from the MonetDB server is logged - * to a file on disk.<br /> - * Incoming messages are prefixed by "RX" (received by the driver), - * outgoing messages by "TX" (transmitted by the driver). Special - * decoded non-human readable messages are prefixed with "RD" and "TD" - * instead. Following this two char prefix, a timestamp follows as the - * number of milliseconds since the UNIX epoch. The rest of the line is - * a String representation of the data sent or received. - * - * The general use of this Socket must be seen only in the full context - * of a MAPI connection to a server. It has the same ingredients as a - * normal Socket, allowing for seamless plugging. - * <pre> - * Socket \ / InputStream ----> (BufferedMCL)Reader - * > o < - * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer - * </pre> - * The MapiSocket allows to retrieve Streams for communicating. They - * are interfaced, so they can be chained in any way. While the Socket - * transparently deals with how data is sent over the wire, the actual - * data read needs to be interpreted, for which a Reader/Writer - * interface is most sufficient. In particular the BufferedMCL* - * implementations of those interfaces supply some extra functionality - * geared towards the format of the data. - * - * @author Fabian Groffen - * @version 4.1 - * @see BufferedMCLReader - * @see BufferedMCLWriter - */ -public final class MapiSocket extends AbstractMCLConnection { - - /** The blocksize (hardcoded in compliance with stream.mx) */ - private static final int BLOCK = 8 * 1024 - 2; - - private static char hexChar(int n) { return (n > 9) ? (char) ('a' + (n - 10)) : (char) ('0' + n); } +package nl.cwi.monetdb.mcl.connection; - /** - * Small helper method to convert a byte string to a hexadecimal - * string representation. - * - * @param digest the byte array to convert - * @return the byte array as hexadecimal string - */ - private static String toHex(byte[] digest) { - char[] result = new char[digest.length * 2]; - int pos = 0; - for (byte aDigest : digest) { - result[pos++] = hexChar((aDigest & 0xf0) >> 4); - result[pos++] = hexChar(aDigest & 0x0f); - } - return new String(result); - } - - /** The TCP Socket to mserver */ - private Socket con = null; - /** Stream from the Socket for reading */ - private InputStream fromMonet; - /** Stream from the Socket for writing */ - private OutputStream toMonet; - /** MCLReader on the InputStream */ - private BufferedMCLReader reader; - /** MCLWriter on the OutputStream */ - private BufferedMCLWriter writer; - /** protocol version of the connection */ - private int version; - - /** Whether we should follow redirects */ - private boolean followRedirects = true; - /** How many redirections do we follow until we're fed up with it? */ - private int ttl = 10; - - /** A short in two bytes for holding the block size in bytes */ - private byte[] blklen = new byte[2]; - - public MapiSocket(String hostname, int port, String database, String username, boolean debug, String language, String hash) { - super(hostname, port, database, username, debug, language, hash, new String[]{"", "\n;", "\n;\n"}, new String[]{"", "\n;", "\n;\n"}); - } - - @Override - public void setHostname(String hostname) { - this.hostname = hostname; - } - - @Override - public void setPort(int port) { - this.port = port; - } +import java.io.IOException; - @Override - public void setDatabase(String db) { - this.database = db; - } - - @Override - public void setHash(String hash) { - this.hash = hash; - } - - @Override - public void setSoTimeout(int s) throws SocketException { - // limit time to wait on blocking operations (0 = indefinite) - con.setSoTimeout(s); - } - - @Override - public int getSoTimeout() throws SocketException { - return con.getSoTimeout(); - } - - /** - * Sets the language to use for this connection. - * - * @param language the language - */ - public void setLanguage(String language) { - this.language = language; - if ("sql".equals(language)) { - lang = LANG_SQL; - } else if ("mal".equals(language)) { - lang = LANG_MAL; - } else { - lang = LANG_UNKNOWN; - } - if (lang == LANG_SQL) { - queryTempl[0] = "s"; // pre - queryTempl[1] = "\n;"; // post - queryTempl[2] = "\n;\n"; // separator - - commandTempl[0] = "X"; // pre - commandTempl[1] = null; // post - commandTempl[2] = "\nX"; // separator - } else if (lang == LANG_MAL) { - queryTempl[0] = null; - queryTempl[1] = ";\n"; - queryTempl[2] = ";\n"; - - commandTempl[0] = null; // pre - commandTempl[1] = null; // post - commandTempl[2] = null; // separator - } - } - /** - * Sets whether MCL redirections should be followed or not. If set - * to false, an MCLException will be thrown when a redirect is - * encountered during connect. The default bahaviour is to - * automatically follow redirects. - * - * @param r whether to follow redirects (true) or not (false) - */ - public void setFollowRedirects(boolean r) { - this.followRedirects = r; - } - - /** - * Sets the number of redirects that are followed when - * followRedirects is true. In order to avoid going into an endless - * loop due to some evil server, or another error, a maximum number - * of redirects that may be followed can be set here. Note that to - * disable the following of redirects you should use - * setFollowRedirects. - * - * @see #setFollowRedirects(boolean r) - * @param t the number of redirects before an exception is thrown - */ - public void setTTL(int t) { - this.ttl = t; - } +public final class DeleteMe extends MapiConnection { - @Override - public List<String> connect(String user, String pass) - throws IOException, MCLParseException, MCLException { - // Wrap around the internal connect that needs to know if it - // should really make a TCP connection or not. - List<String> res = connect(this.hostname, this.port, user, pass, true); - // apply NetworkTimeout value from legacy (pre 4.1) driver - // so_timeout calls - this.setSoTimeout(this.getSoTimeout()); - return res; - } - - private List<String> connect(String host, int port, String user, String pass, - boolean makeConnection) - throws IOException, MCLParseException, MCLException { - if (ttl-- <= 0) - throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry."); - - if (makeConnection) { - con = new Socket(host, port); - // set nodelay, as it greatly speeds up small messages (like we - // often do) - con.setTcpNoDelay(true); - - fromMonet = new BlockInputStream(con.getInputStream()); - toMonet = new BlockOutputStream(con.getOutputStream()); - try { - reader = new BufferedMCLReader(fromMonet, "UTF-8"); - writer = new BufferedMCLWriter(toMonet, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new AssertionError(e.toString()); - } - writer.registerReader(reader); - } - - String c = reader.readLine(); - reader.waitForPrompt(); - writer.writeLine( - getChallengeResponse( - c, - user, - pass, - language, - database, - hash - ) - ); - // read monet response till prompt - List<String> redirects = new ArrayList<>(); - List<String> warns = new ArrayList<>(); - String err = "", tmp; - int lineType; - do { - if ((tmp = reader.readLine()) == null) - throw new IOException("Read from " + - con.getInetAddress().getHostName() + ":" + - con.getPort() + ": End of stream reached"); - if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) { - err += "\n" + tmp.substring(7); - } else if (lineType == BufferedMCLReader.INFO) { - warns.add(tmp.substring(1)); - } else if (lineType == BufferedMCLReader.REDIRECT) { - redirects.add(tmp.substring(1)); - } - } while (lineType != BufferedMCLReader.PROMPT); - if (!err.equals("")) { - close(); - throw new MCLException(err.trim()); - } - if (!redirects.isEmpty()) { - if (followRedirects) { - // Ok, server wants us to go somewhere else. The list - // might have multiple clues on where to go. For now we - // don't support anything intelligent but trying the - // first one. URI should be in form of: - // "mapi:monetdb://host:port/database?arg=value&..." - // or - // "mapi:merovingian://proxy?arg=value&..." - // note that the extra arguments must be obeyed in both - // cases - String suri = redirects.get(0); - if (!suri.startsWith("mapi:")) - throw new MCLException("unsupported redirect: " + suri); - - URI u; - try { - u = new URI(suri.substring(5)); - } catch (URISyntaxException e) { - throw new MCLParseException(e.toString()); - } - - tmp = u.getQuery(); - if (tmp != null) { - String args[] = tmp.split("&"); - for (String arg : args) { - int pos = arg.indexOf("="); - if (pos > 0) { - tmp = arg.substring(0, pos); - switch (tmp) { - case "database": - tmp = arg.substring(pos + 1); - if (!tmp.equals(database)) { - warns.add("redirect points to different " + - "database: " + tmp); - setDatabase(tmp); - } - break; - case "language": - tmp = arg.substring(pos + 1); - warns.add("redirect specifies use of different language: " + tmp); - setLanguage(tmp); - break; - case "user": - tmp = arg.substring(pos + 1); - if (!tmp.equals(user)) - warns.add("ignoring different username '" + tmp + "' set by " + - "redirect, what are the security implications?"); - break; - case "password": - warns.add("ignoring different password set by redirect, " + - "what are the security implications?"); - break; - default: - warns.add("ignoring unknown argument '" + tmp + "' from redirect"); - break; - } - } else { - warns.add("ignoring illegal argument from redirect: " + arg); - } - } - } - - switch (u.getScheme()) { - case "monetdb": - // this is a redirect to another (monetdb) server, - // which means a full reconnect - // avoid the debug log being closed - if (debug) { - debug = false; - close(); - debug = true; - } else { - close(); - } - tmp = u.getPath(); - if (tmp != null && tmp.length() != 0) { - tmp = tmp.substring(1).trim(); - if (!tmp.isEmpty() && !tmp.equals(database)) { - warns.add("redirect points to different " + - "database: " + tmp); - setDatabase(tmp); - } - } - int p = u.getPort(); - warns.addAll(connect(u.getHost(), p == -1 ? port : p, - user, pass, true)); - warns.add("Redirect by " + host + ":" + port + " to " + suri); - break; - case "merovingian": - // reuse this connection to inline connect to the - // right database that Merovingian proxies for us - warns.addAll(connect(host, port, user, pass, false)); - break; - default: - throw new MCLException("unsupported scheme in redirect: " + suri); - } - } else { - StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:"); - for (String it : redirects) { - msg.append(" [").append(it).append("]"); - } - throw new MCLException(msg.toString()); - } - } - return warns; + public DeleteMe(String database, boolean debug, MonetDBLanguage lang, String hostname, int port) throws IOException { + super(database, debug, lang, hostname, port, 9); } - /** - * A little helper function that processes a challenge string, and - * returns a response string for the server. If the challenge - * string is null, a challengeless response is returned. - * - * @param chalstr the challenge string - * @param username the username to use - * @param password the password to use - * @param language the language to use - * @param database the database to connect to - * @param hash the hash method(s) to use, or NULL for all supported - * hashes - */ - private String getChallengeResponse( - String chalstr, - String username, - String password, - String language, - String database, - String hash - ) throws MCLParseException, MCLException, IOException { - String response; - String algo; - - // parse the challenge string, split it on ':' - String[] chaltok = chalstr.split(":"); - if (chaltok.length <= 4) throw - new MCLParseException("Server challenge string unusable! Challenge contains too few tokens: " + chalstr); - - // challenge string to use as salt/key - String challenge = chaltok[0]; - String servert = chaltok[1]; - try { - version = Integer.parseInt(chaltok[2].trim()); // protocol version - } catch (NumberFormatException e) { - throw new MCLParseException("Protocol version unparseable: " + chaltok[3]); - } - - // handle the challenge according to the version it is - switch (version) { - default: - throw new MCLException("Unsupported protocol version: " + version); - case 9: - // proto 9 is like 8, but uses a hash instead of the - // plain password, the server tells us which hash in the - // challenge after the byte-order - - /* NOTE: Java doesn't support RIPEMD160 :( */ - switch (chaltok[5]) { - case "SHA512": - algo = "SHA-512"; - break; - case "SHA384": - algo = "SHA-384"; - break; - case "SHA256": - algo = "SHA-256"; - /* NOTE: Java doesn't support SHA-224 */ - break; - case "SHA1": - algo = "SHA-1"; - break; - case "MD5": - algo = "MD5"; - break; - default: - throw new MCLException("Unsupported password hash: " + chaltok[5]); - } - - try { - MessageDigest md = MessageDigest.getInstance(algo); - md.update(password.getBytes("UTF-8")); - byte[] digest = md.digest(); - password = toHex(digest); - } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { - throw new AssertionError("internal error: " + e.toString()); - } - - // proto 7 (finally) used the challenge and works with a - // password hash. The supported implementations come - // from the server challenge. We chose the best hash - // we can find, in the order SHA1, MD5, plain. Also, - // the byte-order is reported in the challenge string, - // which makes sense, since only blockmode is supported. - // proto 8 made this obsolete, but retained the - // byte-order report for future "binary" transports. In - // proto 8, the byte-order of the blocks is always little - // endian because most machines today are. - String hashes = (hash == null ? chaltok[3] : hash); - Set<String> hashesSet = new HashSet<>(Arrays.asList(hashes.toUpperCase().split("[, ]"))); - - // if we deal with merovingian, mask our credentials - if (servert.equals("merovingian") && !language.equals("control")) { - username = "merovingian"; - password = "merovingian"; - } - String pwhash; - - if (hashesSet.contains("SHA512")) { - algo = "SHA-512"; - pwhash = "{SHA512}"; - } else if (hashesSet.contains("SHA384")) { - algo = "SHA-384"; - pwhash = "{SHA384}"; - } else if (hashesSet.contains("SHA256")) { - algo = "SHA-256"; - pwhash = "{SHA256}"; - } else if (hashesSet.contains("SHA1")) { - algo = "SHA-1"; - pwhash = "{SHA1}"; - } else if (hashesSet.contains("MD5")) { - algo = "MD5"; - pwhash = "{MD5}"; - } else { - throw new MCLException("no supported password hashes in " + hashes); - } - try { - MessageDigest md = MessageDigest.getInstance(algo); - md.update(password.getBytes("UTF-8")); - md.update(challenge.getBytes("UTF-8")); - byte[] digest = md.digest(); - pwhash += toHex(digest); - } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { - throw new AssertionError("internal error: " + e.toString()); - } - // TODO: some day when we need this, we should store - // this - switch (chaltok[4]) { - case "BIG": - // byte-order of server is big-endian - break; - case "LIT": - // byte-order of server is little-endian - break; - default: - throw new MCLParseException("Invalid byte-order: " + chaltok[5]); - } - - // generate response - response = "BIG:"; // JVM byte-order is big-endian - response += username + ":" + pwhash + ":" + language; - response += ":" + (database == null ? "" : database) + ":"; - - return response; - } - } - - @Override - public InputStream getInputStream() { - return fromMonet; - } - - @Override - public OutputStream getOutputStream() { - return toMonet; - } - - @Override - public AbstractMCLReader getReader() { - return reader; - } - - @Override - public AbstractMCLWriter getWriter() { - return writer; - } - - /** - * Returns the mapi protocol version used by this socket. The - * protocol version depends on the server being used. Users of the - * MapiSocket should check this version to act appropriately. - * - * @return the mapi protocol version - */ - public int getProtocolVersion() { - return version; - } - - /** - * Inner class that is used to write data on a normal stream as a - * blocked stream. A call to the flush() method will write a - * "final" block to the underlying stream. Non-final blocks are - * written as soon as one or more bytes would not fit in the - * current block any more. This allows to write to a block to it's - * full size, and then flush it explicitly to have a final block - * being written to the stream. - */ - class BlockOutputStream extends FilterOutputStream { - private int writePos = 0; - private byte[] block = new byte[BLOCK]; - private int blocksize = 0; - - /** - * Constructs this BlockOutputStream, backed by the given - * OutputStream. A BufferedOutputStream is internally used. - */ - public BlockOutputStream(OutputStream out) { - // always use a buffered stream, even though we know how - // much bytes to write/read, since this is just faster for - // some reason - super(new BufferedOutputStream(out)); - } - - @Override - public void flush() throws IOException { - // write the block (as final) then flush. - writeBlock(true); - out.flush(); - - // it's a bit nasty if an exception is thrown from the log, - // but ignoring it can be nasty as well, so it is decided to - // let it go so there is feedback about something going wrong - // it's a bit nasty if an exception is thrown from the log, - // but ignoring it can be nasty as well, so it is decided to - // let it go so there is feedback about something going wrong - if (debug) { - log.flush(); - } - } - - /** - * writeBlock puts the data in the block on the stream. The - * boolean last controls whether the block is sent with an - * indicator to note it is the last block of a sequence or not. - * - * @param last whether this is the last block - * @throws IOException if writing to the stream failed - */ - public void writeBlock(boolean last) throws IOException { - if (last) { - // always fits, because of BLOCK's size - blocksize = (short)writePos; - // this is the last block, so encode least - // significant bit in the first byte (little-endian) - blklen[0] = (byte)(blocksize << 1 & 0xFF | 1); - blklen[1] = (byte)(blocksize >> 7); - } else { - // always fits, because of BLOCK's size - blocksize = (short)BLOCK; - // another block will follow, encode least - // significant bit in the first byte (little-endian) - blklen[0] = (byte)(blocksize << 1 & 0xFF); - blklen[1] = (byte)(blocksize >> 7); - } - - out.write(blklen); - - // write the actual block - out.write(block, 0, writePos); - - if (debug) { - if (last) { - logTd("write final block: " + writePos + " bytes"); - } else { - logTd("write block: " + writePos + " bytes"); - } - logTx(new String(block, 0, writePos, "UTF-8")); - } - - writePos = 0; - } - - @Override - public void write(int b) throws IOException { - if (writePos == BLOCK) { - writeBlock(false); - } - block[writePos++] = (byte)b; - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - int t; - while (len > 0) { - t = BLOCK - writePos; - if (len > t) { - System.arraycopy(b, off, block, writePos, t); - off += t; - len -= t; - writePos += t; - writeBlock(false); - } else { - System.arraycopy(b, off, block, writePos, len); - writePos += len; - break; - } - } - } - - @Override - public void close() throws IOException { - // we don't want the flush() method to be called (default of - // the FilterOutputStream), so we close manually here - out.close(); - } - } - - /** - * Inner class that is used to make the data on the blocked stream - * available as a normal stream. - */ - class BlockInputStream extends FilterInputStream { - private int readPos = 0; - private int blockLen = 0; - private byte[] block = new byte[BLOCK + 3]; // \n.\n - - /** - * Constructs this BlockInputStream, backed by the given - * InputStream. A BufferedInputStream is internally used. - */ - public BlockInputStream(InputStream in) { - // always use a buffered stream, even though we know how - // much bytes to write/read, since this is just faster for - // some reason - super(new BufferedInputStream(in)); - } - - @Override - public int available() { - return blockLen - readPos; - } - - @Override - public boolean markSupported() { - return false; - } - - @Override - public void mark(int readlimit) { - throw new AssertionError("Not implemented!"); - } - - @Override - public void reset() { - throw new AssertionError("Not implemented!"); - } - - /** - * Small wrapper to get a blocking variant of the read() method - * on the BufferedInputStream. We want to benefit from the - * Buffered pre-fetching, but not dealing with half blocks. - * Changing this class to be able to use the partially received - * data will greatly complicate matters, while a performance - * improvement is debatable given the relatively small size of - * our blocks. Maybe it does speed up on slower links, then - * consider this method a quick bug fix/workaround. - * - * @return false if reading the block failed due to EOF - */ - private boolean _read(byte[] b, int len) throws IOException { - int s; - int off = 0; - - while (len > 0) { - s = in.read(b, off, len); - if (s == -1) { - // if we have read something before, we should have been - // able to read the whole, so make this fatal - if (off > 0) { - if (debug) { - logRd("the following incomplete block was received:"); - logRx(new String(b, 0, off, "UTF-8")); - } - throw new IOException("Read from " + - con.getInetAddress().getHostName() + ":" + - con.getPort() + ": Incomplete block read from stream"); - } - if (debug) - logRd("server closed the connection (EOF)"); - return false; - } - len -= s; - off += s; - } - - return true; - } - - /** - * Reads the next block on the stream into the internal buffer, - * or writes the prompt in the buffer. - * - * The blocked stream protocol consists of first a two byte - * integer indicating the length of the block, then the - * block, followed by another length + block. The end of - * such sequence is put in the last bit of the length, and - * hence this length should be shifted to the right to - * obtain the real length value first. We simply fetch - * blocks here as soon as they are needed for the stream's - * read methods. - * - * The user-flush, which is an implicit effect of the end of - * a block sequence, is communicated beyond the stream by - * inserting a prompt sequence on the stream after the last - * block. This method makes sure that a final block ends with a - * newline, if it doesn't already, in order to facilitate a - * Reader that is possibly chained to this InputStream. - * - * If the stream is not positioned correctly, hell will break - * loose. - */ - private int readBlock() throws IOException { - // read next two bytes (short) - if (!_read(blklen, 2)) - return(-1); - - // Get the short-value and store its value in blockLen. - blockLen = (short)( - (blklen[0] & 0xFF) >> 1 | - (blklen[1] & 0xFF) << 7 - ); - readPos = 0; - - if (debug) { - if ((blklen[0] & 0x1) == 1) { - logRd("read final block: " + blockLen + " bytes"); - } else { - logRd("read new block: " + blockLen + " bytes"); - } - } - - // sanity check to avoid bad servers make us do an ugly - // stack trace - if (blockLen > block.length) - throw new AssertionError("Server sent a block " + - "larger than BLOCKsize: " + - blockLen + " > " + block.length); - if (!_read(block, blockLen)) - return(-1); - - if (debug) - logRx(new String(block, 0, blockLen, "UTF-8")); - - // if this is the last block, make it end with a newline and - // prompt - if ((blklen[0] & 0x1) == 1) { - if (blockLen > 0 && block[blockLen - 1] != '\n') { - // to terminate the block in a Reader - block[blockLen++] = '\n'; - } - // insert 'fake' flush - block[blockLen++] = BufferedMCLReader.PROMPT; - block[blockLen++] = '\n'; - if (debug) - logRd("inserting prompt"); - } - - return(blockLen); - } - - @Override - public int read() throws IOException { - if (available() == 0) { - if (readBlock() == -1) - return(-1); - } - - if (debug) - logRx(new String(block, readPos, 1, "UTF-8")); - return (int)block[readPos++]; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int t; - int size = 0; - while (size < len) { - t = available(); - if (t == 0) { - if (size != 0) - break; - if (readBlock() == -1) { - size = -1; - break; - } - t = available(); - } - if (len > t) { - System.arraycopy(block, readPos, b, off, t); - off += t; - len -= t; - readPos += t; - size += t; - } else { - System.arraycopy(block, readPos, b, off, len); - readPos += len; - size += len; - break; - } - } - return size; - } - - @Override - public long skip(long n) throws IOException { - long skip = n; - int t; - while (skip > 0) { - t = available(); - if (skip > t) { - skip -= t; - readPos += t; - readBlock(); - } else { - readPos += skip; - break; - } - } - return n; - } - } - - /** - * Closes the streams and socket connected to the server if - * possible. If an error occurs during disconnecting it is ignored. - */ - public synchronized void close() { - super.close(); - try { - if (con != null) con.close(); - } catch (IOException e) { - // ignore it - } - } - - /** - * Writes a logline tagged with a timestamp using the given string. - * Used for debugging purposes only and represents a message that is - * connected to writing to the socket. A logline might look like: - * TX 152545124: Hello MonetDB! - * - * @param message the message to log - * @throws IOException if an IO error occurs while writing to the logfile - */ - private void logTx(String message) throws IOException { - log.write("TX " + System.currentTimeMillis() + - ": " + message + "\n"); - } - - /** - * Writes a logline tagged with a timestamp using the given string. - * Lines written using this log method are tagged as "added - * metadata" which is not strictly part of the data sent. - * - * @param message the message to log - * @throws IOException if an IO error occurs while writing to the logfile - */ - private void logTd(String message) throws IOException { - log.write("TD " + System.currentTimeMillis() + - ": " + message + "\n"); - } - - /** - * Writes a logline tagged with a timestamp using the given string, - * and flushes afterwards. Used for debugging purposes only and - * represents a message that is connected to reading from the - * socket. The log is flushed after writing the line. A logline - * might look like: - * RX 152545124: Hi JDBC! - * - * @param message the message to log - * @throws IOException if an IO error occurs while writing to the logfile - */ - private void logRx(String message) throws IOException { - log.write("RX " + System.currentTimeMillis() + - ": " + message + "\n"); - log.flush(); - } - - /** - * Writes a logline tagged with a timestamp using the given string, - * and flushes afterwards. Lines written using this log method are - * tagged as "added metadata" which is not strictly part of the data - * received. - * - * @param message the message to log - * @throws IOException if an IO error occurs while writing to the logfile - */ - private void logRd(String message) throws IOException { - log.write("RD " + System.currentTimeMillis() + - ": " + message + "\n"); - log.flush(); - } - - @Override - public String getJDBCURL() { - String language = ""; - if (this.getLang() == AbstractMCLConnection.LANG_MAL) - language = "?language=mal"; - return "jdbc:monetdb://" + this.getHostname() + ":" + this.getPort() + "/" + this.getDatabase() + language; - } - - @Override - public int getBlockSize() { - return BLOCK; - } - - @Override - public StartOfHeaderParser getStartOfHeaderParser() { - return new SocketStartOfHeaderParser(); - } - - @Override - public HeaderLineParser getHeaderLineParser(int capacity) { - return new SocketHeaderLineParser(capacity); - } - - @Override - public TupleLineParser getTupleLineParser(int capacity) { - return new SocketTupleLineParser(capacity); - } }
rename from src/main/java/nl/cwi/monetdb/mcl/net/EmbeddedMonetDB.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/EmbeddedMonetDB.java --- a/src/main/java/nl/cwi/monetdb/mcl/net/EmbeddedMonetDB.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/EmbeddedMonetDB.java @@ -1,20 +1,10 @@ -package nl.cwi.monetdb.mcl.net; +package nl.cwi.monetdb.mcl.connection; -import nl.cwi.monetdb.embedded.env.IEmbeddedConnection; import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedDatabase; import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedException; import nl.cwi.monetdb.mcl.MCLException; -import nl.cwi.monetdb.mcl.io.AbstractMCLReader; -import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.io.EmbeddedMCLReader; -import nl.cwi.monetdb.mcl.io.EmbeddedMCLWriter; -import nl.cwi.monetdb.mcl.parser.HeaderLineParser; +import nl.cwi.monetdb.mcl.io.*; import nl.cwi.monetdb.mcl.parser.MCLParseException; -import nl.cwi.monetdb.mcl.parser.StartOfHeaderParser; -import nl.cwi.monetdb.mcl.parser.TupleLineParser; -import nl.cwi.monetdb.mcl.parser.embedded.EmbeddedHeaderLineParser; -import nl.cwi.monetdb.mcl.parser.embedded.EmbeddedStartOfHeaderParser; -import nl.cwi.monetdb.mcl.parser.embedded.EmbeddedTupleLineParser; import java.io.*; import java.net.SocketException; @@ -23,20 +13,14 @@ import java.util.List; /** * Created by ferreira on 11/23/16. */ -public class EmbeddedMonetDB extends AbstractMCLConnection implements IEmbeddedConnection { - - private long connectionPointer; - - protected static final int BUFFER_SIZE = 102400; //100 kb to start +public final class EmbeddedMonetDB extends AbstractMonetDBConnection { private final String directory; - private EmbeddedMCLReader reader; + private InternalConnection connection; - private EmbeddedMCLWriter writer; - - public EmbeddedMonetDB(String hostname, int port, String database, String username, boolean debug, String language, String hash, String directory) { - super(hostname, port, database, username, debug, language, hash, new String[]{"", "\n;", "\n;\n"}, new String[]{"X", null, "\nX"}); + public EmbeddedMonetDB(String database, String hash, boolean debug, MonetDBLanguage lang, String directory) throws IOException { + super(database, hash, debug, lang); this.directory = directory; } @@ -45,43 +29,6 @@ public class EmbeddedMonetDB extends Abs } @Override - public void setHostname(String hostname) { - throw new IllegalArgumentException("Cannot set a hostname on a embedded connection!"); - } - - @Override - public void setPort(int port) { - throw new IllegalArgumentException("Cannot set a port on a embedded connection!"); - } - - @Override - public void setDatabase(String db) { - throw new IllegalArgumentException("Not yet planned!"); - } - - @Override - public void setLanguage(String language) { - if(this.lang != LANG_SQL) { - throw new IllegalArgumentException("The embedded connection only supports the SQL language!"); - } - } - - @Override - public void setHash(String hash) { - throw new IllegalArgumentException("The embedded connection does not support user authentication yet!"); - } - - @Override - public int getSoTimeout() throws SocketException { - throw new IllegalArgumentException("Cannot get socket timeout on an embedded connection!"); - } - - @Override - public void setSoTimeout(int s) throws SocketException { - throw new IllegalArgumentException("Cannot set socket timeout on an embedded connection!"); - } - - @Override public List<String> connect(String user, String pass) throws IOException, MCLParseException, MCLException { try { if(MonetDBEmbeddedDatabase.IsDatabaseRunning() && !MonetDBEmbeddedDatabase.GetDatabaseDirectory().equals(this.directory)) { @@ -89,9 +36,7 @@ public class EmbeddedMonetDB extends Abs } else { MonetDBEmbeddedDatabase.StartDatabase(this.directory, true, false); } - this.reader = new EmbeddedMCLReader(); - this.writer = new EmbeddedMCLWriter(this.reader); - MonetDBEmbeddedDatabase.AddJDBCEmbeddedConnection(this); + this.connection = MonetDBEmbeddedDatabase.AddJDBCEmbeddedConnection(); } catch (MonetDBEmbeddedException ex) { throw new MCLException(ex); } @@ -99,26 +44,6 @@ public class EmbeddedMonetDB extends Abs } @Override - public InputStream getInputStream() { - throw new IllegalArgumentException("Not available!"); - } - - @Override - public OutputStream getOutputStream() { - throw new IllegalArgumentException("Not available!"); - } - - @Override - public AbstractMCLReader getReader() { - return this.reader; - } - - @Override - public AbstractMCLWriter getWriter() { - return this.writer; - } - - @Override public synchronized void close() { super.close(); try { @@ -130,38 +55,21 @@ public class EmbeddedMonetDB extends Abs @Override public String getJDBCURL() { - return "jdbc:monetdb://" + this.getHostname() + "@" + this.getDirectory() + "/" + this.getDatabase(); + return "jdbc:monetdb://localhost@" + this.getDirectory() + "/" + this.getDatabase(); } @Override public int getBlockSize() { - return BUFFER_SIZE; - } - - @Override - public StartOfHeaderParser getStartOfHeaderParser() { - return new EmbeddedStartOfHeaderParser(); - } - - @Override - public HeaderLineParser getHeaderLineParser(int capacity) { - return new EmbeddedHeaderLineParser(capacity); + return Integer.MAX_VALUE; } @Override - public TupleLineParser getTupleLineParser(int capacity) { - return new EmbeddedTupleLineParser(capacity); + public int getSoTimeout() throws SocketException { + throw new IllegalArgumentException("Cannot get a timeout on a embedded connection!"); } @Override - public long getConnectionPointer() { - return connectionPointer; + public void setSoTimeout(int s) throws SocketException { + throw new IllegalArgumentException("Cannot set a timeout on a embedded connection!"); } - - @Override - public void closeConnectionImplementation() { - this.closeConnectionInternal(this.connectionPointer); - } - - private native void closeConnectionInternal(long connectionPointer); }
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/MapiConnection.java @@ -0,0 +1,490 @@ +package nl.cwi.monetdb.mcl.connection; + +import nl.cwi.monetdb.mcl.MCLException; +import nl.cwi.monetdb.mcl.io.BufferedMCLReader; +import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; +import nl.cwi.monetdb.mcl.io.SocketConnection; +import nl.cwi.monetdb.mcl.io.SocketIOHandler; +import nl.cwi.monetdb.mcl.parser.MCLParseException; +import nl.cwi.monetdb.mcl.protocol.AbstractProtocolParser; +import nl.cwi.monetdb.mcl.protocol.ServerResponses; +import nl.cwi.monetdb.mcl.protocol.oldmapi.OldMapiProtocol; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.SocketException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +/** + * A Socket for communicating with the MonetDB database in MAPI block + * mode. + * + * The MapiSocket implements the protocol specifics of the MAPI block + * mode protocol, and interfaces it as a socket that delivers a + * BufferedReader and a BufferedWriter. Because logging in is an + * integral part of the MAPI protocol, the MapiSocket performs the login + * procedure. Like the Socket class, various options can be set before + * calling the connect() method to influence the login process. Only + * after a successful call to connect() the BufferedReader and + * BufferedWriter can be retrieved. + * <br /> + * For each line read, it is determined what type of line it is + * according to the MonetDB MAPI protocol. This results in a line to be + * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() + * method on the BufferedMCLReader to retrieve the type of the last + * line read. + * + * For debugging purposes a socket level debugging is implemented where + * each and every interaction to and from the MonetDB server is logged + * to a file on disk.<br /> + * Incoming messages are prefixed by "RX" (received by the driver), + * outgoing messages by "TX" (transmitted by the driver). Special + * decoded non-human readable messages are prefixed with "RD" and "TD" + * instead. Following this two char prefix, a timestamp follows as the + * number of milliseconds since the UNIX epoch. The rest of the line is + * a String representation of the data sent or received. + * + * The general use of this Socket must be seen only in the full context + * of a MAPI connection to a server. It has the same ingredients as a + * normal Socket, allowing for seamless plugging. + * <pre> + * Socket \ / InputStream ----> (BufferedMCL)Reader + * > o < + * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer + * </pre> + * The MapiSocket allows to retrieve Streams for communicating. They + * are interfaced, so they can be chained in any way. While the Socket + * transparently deals with how data is sent over the wire, the actual + * data read needs to be interpreted, for which a Reader/Writer + * interface is most sufficient. In particular the BufferedMCL* + * implementations of those interfaces supply some extra functionality + * geared towards the format of the data. + * + * @author Fabian Groffen + * @version 4.1 + * @see BufferedMCLReader + * @see BufferedMCLWriter + */ +public class MapiConnection extends AbstractMonetDBConnection { + + private static char hexChar(int n) { return (n > 9) ? (char) ('a' + (n - 10)) : (char) ('0' + n); } + + /** + * Small helper method to convert a byte string to a hexadecimal + * string representation. + * + * @param digest the byte array to convert + * @return the byte array as hexadecimal string + */ + private static String toHex(byte[] digest) { + char[] result = new char[digest.length * 2]; + int pos = 0; + for (byte aDigest : digest) { + result[pos++] = hexChar((aDigest & 0xf0) >> 4); + result[pos++] = hexChar(aDigest & 0x0f); + } + return new String(result); + } + + /** The hostname to connect to */ + protected final String hostname; + /** The port to connect on the host to */ + protected final int port; + /** Whether we should follow redirects */ + protected boolean followRedirects = true; + /** How many redirections do we follow until we're fed up with it? */ + protected int ttl = 10; + /** protocol version of the connection */ + protected int version; + + protected OldMapiProtocol protocol; + + public MapiConnection(String database, String hash, boolean debug, MonetDBLanguage lang, String hostname, int port) throws IOException { + super(database, hash, debug, lang); + this.hostname = hostname; + this.port = port; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + /** + * Sets whether MCL redirections should be followed or not. If set + * to false, an MCLException will be thrown when a redirect is + * encountered during connect. The default behaviour is to + * automatically follow redirects. + * + * @param r whether to follow redirects (true) or not (false) + */ + public void setFollowRedirects(boolean r) { + this.followRedirects = r; + } + + /** + * Sets the number of redirects that are followed when + * followRedirects is true. In order to avoid going into an endless + * loop due to some evil server, or another error, a maximum number + * of redirects that may be followed can be set here. Note that to + * disable the following of redirects you should use + * setFollowRedirects. + * + * @see #setFollowRedirects(boolean r) + * @param t the number of redirects before an exception is thrown + */ + public void setTTL(int t) { + this.ttl = t; + } + + /** + * Returns the mapi protocol version used by this socket. The + * protocol version depends on the server being used. Users of the + * MapiSocket should check this version to act appropriately. + * + * @return the mapi protocol version + */ + public int getProtocolVersion() { + return this.version; + } + + @Override + public String getJDBCURL() { + String language = ""; + if (this.getCurrentMonetDBLanguage() == MonetDBLanguage.LANG_MAL) + language = "?language=mal"; + return "jdbc:monetdb://" + this.getHostname() + ":" + this.getPort() + "/" + this.getDatabase() + language; + } + + @Override + public void close() { + super.close(); + try { + protocol.getHandler().getConnection().close(); + } catch (IOException e) { + // ignore it + } + } + + public int getBlockSize() { + return protocol.getHandler().getConnection().getBlockSize(); + } + + public int getSoTimeout() throws SocketException { + return protocol.getHandler().getConnection().getSoTimeout(); + } + + public void setSoTimeout(int s) throws SocketException { + protocol.getHandler().getConnection().setSoTimeout(s); + } + + @Override + public AbstractProtocolParser getUnderlyingProtocol() { + return protocol; + } + + private List<String> connect(String user, String pass, boolean makeConnection) throws IOException, MCLParseException, MCLException { + if (ttl-- <= 0) + throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry."); + + if (makeConnection) { + this.protocol = new OldMapiProtocol(new SocketConnection(this.hostname, this.port)); + // set nodelay, as it greatly speeds up small messages (like we + // often do) + this.protocol.getHandler().getConnection().setTcpNoDelay(true); + //TODO writer.registerReader(reader); + } + + ServerResponses nextResponse; + + String test = getChallengeResponse(user, pass, language, database, hash); + + writer.writeLine(); + + // read monet response till prompt + List<String> redirects = new ArrayList<>(); + List<String> warns = new ArrayList<>(); + String err = "", tmp; + int lineType; + do { + if ((tmp = reader.readLine()) == null) + throw new IOException("Read from " + this.getHostname() + ":" + this.getPort() + ": End of stream reached"); + if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) { + err += "\n" + tmp.substring(7); + } else if (lineType == BufferedMCLReader.INFO) { + warns.add(tmp.substring(1)); + } else if (lineType == BufferedMCLReader.REDIRECT) { + redirects.add(tmp.substring(1)); + } + } while (lineType != BufferedMCLReader.PROMPT); + if (!err.equals("")) { + close(); + throw new MCLException(err.trim()); + } + if (!redirects.isEmpty()) { + if (followRedirects) { + // Ok, server wants us to go somewhere else. The list + // might have multiple clues on where to go. For now we + // don't support anything intelligent but trying the + // first one. URI should be in form of: + // "mapi:monetdb://host:port/database?arg=value&..." + // or + // "mapi:merovingian://proxy?arg=value&..." + // note that the extra arguments must be obeyed in both + // cases + String suri = redirects.get(0); + if (!suri.startsWith("mapi:")) + throw new MCLException("unsupported redirect: " + suri); + + URI u; + try { + u = new URI(suri.substring(5)); + } catch (URISyntaxException e) { + throw new MCLParseException(e.toString()); + } + + tmp = u.getQuery(); + if (tmp != null) { + String args[] = tmp.split("&"); + for (String arg : args) { + int pos = arg.indexOf("="); + if (pos > 0) { + tmp = arg.substring(0, pos); + switch (tmp) { + case "database": + tmp = arg.substring(pos + 1); + if (!tmp.equals(database)) { + warns.add("redirect points to different " + + "database: " + tmp); + setDatabase(tmp); + } + break; + case "language": + tmp = arg.substring(pos + 1); + warns.add("redirect specifies use of different language: " + tmp); + setLanguage(tmp); + break; + case "user": + tmp = arg.substring(pos + 1); + if (!tmp.equals(user)) + warns.add("ignoring different username '" + tmp + "' set by " + + "redirect, what are the security implications?"); + break; + case "password": + warns.add("ignoring different password set by redirect, " + + "what are the security implications?"); + break; + default: + warns.add("ignoring unknown argument '" + tmp + "' from redirect"); + break; + } + } else { + warns.add("ignoring illegal argument from redirect: " + arg); + } + } + } + + switch (u.getScheme()) { + case "monetdb": + // this is a redirect to another (monetdb) server, + // which means a full reconnect + // avoid the debug log being closed + if (debug) { + debug = false; + close(); + debug = true; + } else { + close(); + } + tmp = u.getPath(); + if (tmp != null && tmp.length() != 0) { + tmp = tmp.substring(1).trim(); + if (!tmp.isEmpty() && !tmp.equals(database)) { + warns.add("redirect points to different " + + "database: " + tmp); + setDatabase(tmp); + } + } + int p = u.getPort(); + warns.addAll(connect(u.getHost(), p == -1 ? port : p, user, pass, true)); + warns.add("Redirect by " + host + ":" + port + " to " + suri); + break; + case "merovingian": + // reuse this connection to inline connect to the + // right database that Merovingian proxies for us + warns.addAll(connect(host, port, user, pass, false)); + break; + default: + throw new MCLException("unsupported scheme in redirect: " + suri); + } + } else { + StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:"); + for (String it : redirects) { + msg.append(" [").append(it).append("]"); + } + throw new MCLException(msg.toString()); + } + } + return warns; + } + + @Override + public List<String> connect(String user, String pass) throws IOException, MCLParseException, MCLException { + // Wrap around the internal connect that needs to know if it + // should really make a TCP connection or not. + List<String> res = connect(user, pass, true); + // apply NetworkTimeout value from legacy (pre 4.1) driver + // so_timeout calls + this.setSoTimeout(this.getSoTimeout()); + return res; + } + + /** + * A little helper function that processes a challenge string, and + * returns a response string for the server. If the challenge + * string is null, a challengeless response is returned. + * + * @param chalstr the challenge string + * @param username the username to use + * @param password the password to use + * @param language the language to use + * @param database the database to connect to + * @param hash the hash method(s) to use, or NULL for all supported + * hashes + */ + private String getChallengeResponse(String username, String password, String language, String database, String hash) + throws MCLParseException, MCLException, IOException { + String response; + String algo; + + // parse the challenge string, split it on ':' + String[] chaltok = chalstr.split(":"); + if (chaltok.length <= 4) throw + new MCLParseException("Server challenge string unusable! Challenge contains too few tokens: " + chalstr); + + // challenge string to use as salt/key + String challenge = chaltok[0]; + String servert = chaltok[1]; + try { + version = Integer.parseInt(chaltok[2].trim()); // protocol version + } catch (NumberFormatException e) { + throw new MCLParseException("Protocol version unparseable: " + chaltok[3]); + } + + // handle the challenge according to the version it is + switch (version) { + default: + throw new MCLException("Unsupported protocol version: " + version); + case 9: + // proto 9 is like 8, but uses a hash instead of the + // plain password, the server tells us which hash in the + // challenge after the byte-order + + /* NOTE: Java doesn't support RIPEMD160 :( */ + switch (chaltok[5]) { + case "SHA512": + algo = "SHA-512"; + break; + case "SHA384": + algo = "SHA-384"; + break; + case "SHA256": + algo = "SHA-256"; + /* NOTE: Java doesn't support SHA-224 */ + break; + case "SHA1": + algo = "SHA-1"; + break; + case "MD5": + algo = "MD5"; + break; + default: + throw new MCLException("Unsupported password hash: " + chaltok[5]); + } + + try { + MessageDigest md = MessageDigest.getInstance(algo); + md.update(password.getBytes("UTF-8")); + byte[] digest = md.digest(); + password = toHex(digest); + } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { + throw new AssertionError("internal error: " + e.toString()); + } + + // proto 7 (finally) used the challenge and works with a + // password hash. The supported implementations come + // from the server challenge. We chose the best hash + // we can find, in the order SHA1, MD5, plain. Also, + // the byte-order is reported in the challenge string, + // which makes sense, since only blockmode is supported. + // proto 8 made this obsolete, but retained the + // byte-order report for future "binary" transports. In + // proto 8, the byte-order of the blocks is always little + // endian because most machines today are. + String hashes = (hash == null ? chaltok[3] : hash); + Set<String> hashesSet = new HashSet<>(Arrays.asList(hashes.toUpperCase().split("[, ]"))); + + // if we deal with merovingian, mask our credentials + if (servert.equals("merovingian") && !language.equals("control")) { + username = "merovingian"; + password = "merovingian"; + } + String pwhash; + + if (hashesSet.contains("SHA512")) { + algo = "SHA-512"; + pwhash = "{SHA512}"; + } else if (hashesSet.contains("SHA384")) { + algo = "SHA-384"; + pwhash = "{SHA384}"; + } else if (hashesSet.contains("SHA256")) { + algo = "SHA-256"; + pwhash = "{SHA256}"; + } else if (hashesSet.contains("SHA1")) { + algo = "SHA-1"; + pwhash = "{SHA1}"; + } else if (hashesSet.contains("MD5")) { + algo = "MD5"; + pwhash = "{MD5}"; + } else { + throw new MCLException("no supported password hashes in " + hashes); + } + try { + MessageDigest md = MessageDigest.getInstance(algo); + md.update(password.getBytes("UTF-8")); + md.update(challenge.getBytes("UTF-8")); + byte[] digest = md.digest(); + pwhash += toHex(digest); + } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { + throw new AssertionError("internal error: " + e.toString()); + } + // TODO: some day when we need this, we should store + // this + switch (chaltok[4]) { + case "BIG": + // byte-order of server is big-endian + break; + case "LIT": + // byte-order of server is little-endian + break; + default: + throw new MCLParseException("Invalid byte-order: " + chaltok[5]); + } + + // generate response + response = "BIG:"; // JVM byte-order is big-endian + response += username + ":" + pwhash + ":" + language; + response += ":" + (database == null ? "" : database) + ":"; + + return response; + } + } + +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/MonetDBLanguage.java @@ -0,0 +1,31 @@ +package nl.cwi.monetdb.mcl.connection; + +/** + * Created by ferreira on 11/30/16. + */ +public enum MonetDBLanguage { + + /** the SQL language */ + LANG_SQL(new byte[][]{"s".getBytes(), "\n;".getBytes(), "\n;\n".getBytes()}, new byte[][]{"X".getBytes(), null, "\nX".getBytes()}), + /** the MAL language (officially *NOT* supported) */ + LANG_MAL(new byte[][]{null, ";\n".getBytes(), ";\n".getBytes()}, new byte[][]{null, null, null}), + /** an unknown language */ + LANG_UNKNOWN(null, null); + + MonetDBLanguage(byte[][] queryTemplate, byte[][] commandTemplate) { + this.queryTemplate = queryTemplate; + this.commandTemplate = commandTemplate; + } + + private final byte[][] queryTemplate; + + private final byte[][] commandTemplate; + + public byte[] getQueryTemplateIndex(int index) { + return queryTemplate[index]; + } + + public byte[] getCommandTemplateIndex(int index) { + return commandTemplate[index]; + } +}
--- a/src/main/java/nl/cwi/monetdb/mcl/io/AbstractMCLReader.java +++ b/src/main/java/nl/cwi/monetdb/mcl/io/AbstractMCLReader.java @@ -9,9 +9,6 @@ import java.io.Reader; */ public abstract class AbstractMCLReader extends BufferedReader { - /** The type of the last line read */ - protected int lineType; - /** "there is currently no line", or the the type is unknown is represented by UNKNOWN */ public final static int UNKNOWN = 0; @@ -32,6 +29,9 @@ public abstract class AbstractMCLReader /** a line starting with # indicates INFO */ public final static int INFO = '#'; + /** The type of the last line read */ + protected int lineType; + public AbstractMCLReader(Reader in) { super(in); } @@ -47,46 +47,5 @@ public abstract class AbstractMCLReader return lineType; } - /** - * Sets the linetype to the type of the string given. If the string - * is null, lineType is set to UNKNOWN. - * - * @param line the string to examine - */ - public void setLineType(String line) { - lineType = UNKNOWN; - if (line == null || line.length() == 0) - return; - switch (line.charAt(0)) { - case '!': - lineType = ERROR; - break; - case '&': - lineType = SOHEADER; - break; - case '%': - lineType = HEADER; - break; - case '[': - lineType = RESULT; - break; - case '=': - lineType = RESULT; - break; - case '^': - lineType = REDIRECT; - break; - case '#': - lineType = INFO; - break; - case '.': - lineType = PROMPT; - break; - case ',': - lineType = MORE; - break; - } - } - public abstract String waitForPrompt() throws IOException; }
--- a/src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLReader.java +++ b/src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLReader.java @@ -8,6 +8,8 @@ package nl.cwi.monetdb.mcl.io; +import nl.cwi.monetdb.mcl.connection.DeleteMe; + import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -37,7 +39,7 @@ import java.io.UnsupportedEncodingExcept * messages as the server receives them. * * @author Fabian Groffen <Fabian.Groffen> - * @see nl.cwi.monetdb.mcl.net.MapiSocket + * @see DeleteMe * @see BufferedMCLWriter */ public class BufferedMCLReader extends AbstractMCLReader { @@ -66,6 +68,47 @@ public class BufferedMCLReader extends A } /** + * Sets the linetype to the type of the string given. If the string + * is null, lineType is set to UNKNOWN. + * + * @param line the string to examine + */ + void setLineType(String line) { + lineType = UNKNOWN; + if (line == null || line.length() == 0) + return; + switch (line.charAt(0)) { + case '!': + lineType = ERROR; + break; + case '&': + lineType = SOHEADER; + break; + case '%': + lineType = HEADER; + break; + case '[': + lineType = RESULT; + break; + case '=': + lineType = RESULT; + break; + case '^': + lineType = REDIRECT; + break; + case '#': + lineType = INFO; + break; + case '.': + lineType = PROMPT; + break; + case ',': + lineType = MORE; + break; + } + } + + /** * Read a line of text. A line is considered to be terminated by * any one of a line feed ('\n'), a carriage return ('\r'), or a * carriage return followed immediately by a linefeed. Before this @@ -85,7 +128,7 @@ public class BufferedMCLReader extends A @Override public String readLine() throws IOException { String r = super.readLine(); - setLineType(r); + this.setLineType(r); if (lineType == ERROR && !r.matches("^![0-9A-Z]{5}!.+")) r = "!22000!" + r.substring(1); return r; @@ -109,7 +152,7 @@ public class BufferedMCLReader extends A StringBuilder res = new StringBuilder(); String tmp; while (lineType != PROMPT) { - if ((tmp = readLine()) == null) + if ((tmp = this.readLine()) == null) throw new IOException("Connection to server lost!"); if (lineType == ERROR) res.append("\n").append(tmp.substring(1));
--- a/src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLWriter.java +++ b/src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLWriter.java @@ -8,6 +8,8 @@ package nl.cwi.monetdb.mcl.io; +import nl.cwi.monetdb.mcl.connection.DeleteMe; + import java.io.*; /** @@ -28,7 +30,7 @@ import java.io.*; * class client-oriented when a reader is registered. * * @author Fabian Groffen <Fabian.Groffen> - * @see nl.cwi.monetdb.mcl.net.MapiSocket + * @see DeleteMe * @see BufferedMCLWriter */ public class BufferedMCLWriter extends AbstractMCLWriter { @@ -72,6 +74,6 @@ public class BufferedMCLWriter extends A this.write(line); this.flush(); // reset reader state, last line isn't valid any more now - if (reader != null) reader.setLineType(null); + if (reader != null) ((BufferedMCLReader)reader).setLineType(null); } }
--- a/src/main/java/nl/cwi/monetdb/mcl/io/EmbeddedMCLReader.java +++ b/src/main/java/nl/cwi/monetdb/mcl/io/EmbeddedMCLReader.java @@ -1,21 +1,35 @@ package nl.cwi.monetdb.mcl.io; +import nl.cwi.monetdb.mcl.connection.EmbeddedMonetDB; + import java.io.*; /** * Created by ferreira on 11/24/16. */ -public class EmbeddedMCLReader extends AbstractMCLReader { +public final class EmbeddedMCLReader extends AbstractMCLReader { + + private final EmbeddedMonetDB connection; + + private int readerCurrentPos; - public EmbeddedMCLReader() { + private final int[] responseHeaderValues = new int[4]; + + private String nextLine = ""; + + public EmbeddedMCLReader(EmbeddedMonetDB connection) { super(null); + this.connection = connection; } @Override public String readLine() throws IOException { - String res = this.readLineInternal(); //this readline will never wait!! - setLineType(res); - if (lineType == ERROR && !res.matches("^![0-9A-Z]{5}!.+")) + this.lineType = this.responseHeaderValues[this.readerCurrentPos]; + this.readerCurrentPos++; + + String res = this.nextLine; //this readline will never wait!! + + if (this.lineType == ERROR && !res.matches("^![0-9A-Z]{5}!.+")) res = "!22000!" + res.substring(1); return res; } @@ -29,6 +43,4 @@ public class EmbeddedMCLReader extends A } return null; } - - private native String readLineInternal(); }
--- a/src/main/java/nl/cwi/monetdb/mcl/io/EmbeddedMCLWriter.java +++ b/src/main/java/nl/cwi/monetdb/mcl/io/EmbeddedMCLWriter.java @@ -1,22 +1,43 @@ package nl.cwi.monetdb.mcl.io; +import nl.cwi.monetdb.mcl.connection.EmbeddedMonetDB; +import nl.cwi.monetdb.mcl.parser.embedded.EmbeddedHeaderLineParser; +import nl.cwi.monetdb.mcl.parser.embedded.EmbeddedStartOfHeaderParser; +import nl.cwi.monetdb.mcl.parser.embedded.EmbeddedTupleLineParser; + import java.io.*; /** * Created by ferreira on 11/24/16. */ -public class EmbeddedMCLWriter extends AbstractMCLWriter { +public final class EmbeddedMCLWriter extends AbstractMCLWriter { + + private final EmbeddedMonetDB connection; + + private final EmbeddedStartOfHeaderParser sohp; + + private EmbeddedHeaderLineParser nexthlp; + + private EmbeddedTupleLineParser nexttlp; - public EmbeddedMCLWriter(EmbeddedMCLReader reader) { + public void setNexthlp(EmbeddedHeaderLineParser nexthlp) { + this.nexthlp = nexthlp; + } + + public void setNexttlp(EmbeddedTupleLineParser nexttlp) { + this.nexttlp = nexttlp; + } + + public EmbeddedMCLWriter(EmbeddedMonetDB con, EmbeddedMCLReader reader, EmbeddedStartOfHeaderParser sohp) { super(null); + this.connection = con; this.reader = reader; + this.sohp = sohp; } @Override public void writeLine(String line) throws IOException { - this.writeInternal(line); this.reader.notify(); //wake up the embedded reader. } - private native void writeInternal(String str); }
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/io/InternalConnection.java @@ -0,0 +1,27 @@ +package nl.cwi.monetdb.mcl.io; + +import nl.cwi.monetdb.embedded.env.IEmbeddedConnection; + +/** + * Created by ferreira on 11/30/16. + */ +public class InternalConnection implements IEmbeddedConnection { + + private long connectionPointer; + + public InternalConnection(long connectionPointer) { + this.connectionPointer = connectionPointer; + } + + @Override + public long getConnectionPointer() { + return this.connectionPointer; + } + + @Override + public void closeConnectionImplementation() { + this.closeConnectionInternal(this.connectionPointer); + } + + private native void closeConnectionInternal(long connectionPointer); +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/io/SocketConnection.java @@ -0,0 +1,54 @@ +package nl.cwi.monetdb.mcl.io; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * Created by ferreira on 11/29/16. + */ +public class SocketConnection implements Closeable { + + /** The blocksize (hardcoded in compliance with stream.mx) */ + private static final int BLOCK = 8 * 1024 - 2; + + /** The socket channel */ + private SocketChannel connection; + + public SocketConnection(String hostname, int port) throws IOException { + this.connection = SocketChannel.open(new InetSocketAddress(hostname, port)); + this.connection.configureBlocking(true); + } + + public int getSoTimeout() throws SocketException { + return connection.socket().getSoTimeout(); + } + + public void setSoTimeout(int s) throws SocketException { + connection.socket().setSoTimeout(s); + } + + public int getBlockSize() { + return BLOCK; + } + + public int readMore(ByteBuffer dst) throws IOException { + return connection.read(dst); + } + + public int writeMore(ByteBuffer src) throws IOException { + return connection.write(src); + } + + public void setTcpNoDelay(boolean on) throws SocketException { + this.connection.socket().setTcpNoDelay(on); + } + + @Override + public void close() throws IOException { + connection.close(); + } +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/io/SocketIOHandler.java @@ -0,0 +1,141 @@ +package nl.cwi.monetdb.mcl.io; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Created by ferreira on 11/29/16. + */ +public class SocketIOHandler { + + private static final int CHAR_SIZE = Character.SIZE / Byte.SIZE; + + private static final int SHORT_SIZE = Short.SIZE / Byte.SIZE; + + private static final int INTEGER_SIZE = Integer.SIZE / Byte.SIZE; + + private static final int LONG_SIZE = Long.SIZE / Byte.SIZE; + + private static final int FLOAT_SIZE = Float.SIZE / Byte.SIZE; + + private static final int DOUBLE_SIZE = Double.SIZE / Byte.SIZE; + + private static final int INTERMEDIATE_BUFFER_SIZE = 1024; + + private boolean hasFinished; + + private ByteBuffer bufferIn = ByteBuffer.allocateDirect(INTERMEDIATE_BUFFER_SIZE); + + private ByteBuffer bufferOut = ByteBuffer.allocateDirect(INTERMEDIATE_BUFFER_SIZE); + + private final SocketConnection connection; + + public SocketIOHandler(SocketConnection connection) { + this.connection = connection; + } + + public SocketConnection getConnection() { + return connection; + } + + public byte readNextByte() throws IOException { + if(this.bufferIn.remaining() < Byte.SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.get(); + } + + public char readNextChar() throws IOException { + if(this.bufferIn.remaining() < CHAR_SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.getChar(); + } + + public short readNextShort() throws IOException { + if(this.bufferIn.remaining() < SHORT_SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.getShort(); + } + + public int readNextInt() throws IOException { + if(this.bufferIn.remaining() < INTEGER_SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.getInt(); + } + + public long readNextLong() throws IOException { + if(this.bufferIn.remaining() < LONG_SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.getLong(); + } + + public float readNextFloat() throws IOException { + if(this.bufferIn.remaining() < FLOAT_SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.getFloat(); + } + + public double readNextDouble() throws IOException { + if(this.bufferIn.remaining() < DOUBLE_SIZE) { + this.refillBufferIn(); + } + return this.bufferIn.getDouble(); + } + + public void readUntilChar(StringBuilder builder, char limit) throws IOException { + builder.setLength(0); + boolean found = false; + + while(!found) { + if (this.bufferIn.remaining() < CHAR_SIZE) { + this.refillBufferIn(); + } + char next = this.bufferIn.getChar(); + builder.append(next); + if(next == limit) { + found = true; + } + } + } + + private void refillBufferIn() throws IOException { + bufferIn.compact(); + if(!hasFinished) { + try { + connection.readMore(this.bufferIn); + bufferIn.flip(); + } catch (IOException ex) { + hasFinished = true; + } + } else { + throw new IOException("Done!"); + } + } + + public void writeNextLine(byte[] prefix, String line, byte[] suffix) throws IOException { + bufferOut.clear(); + this.writeNextBlock(prefix); + this.writeNextBlock(line.getBytes()); + this.writeNextBlock(suffix); + if (bufferOut.hasRemaining()) { + bufferOut.flip(); + connection.writeMore(this.bufferOut); + } + } + + private void writeNextBlock(byte[] block) throws IOException { + for (byte aBlock : block) { + if (!bufferOut.hasRemaining()) { + bufferOut.flip(); + connection.writeMore(this.bufferOut); + bufferOut.clear(); + } + bufferOut.put(aBlock); + } + } +}
--- a/src/main/java/nl/cwi/monetdb/mcl/parser/MCLParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/parser/MCLParser.java @@ -27,7 +27,8 @@ public abstract class MCLParser { public final String values[]; /** The int values found while parsing. Public, you may touch it. */ public final int intValues[]; - private int colnr; + + protected int colnr; /** * Creates an MCLParser targeted at a given number of field values.
--- a/src/main/java/nl/cwi/monetdb/mcl/parser/StartOfHeaderParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/parser/StartOfHeaderParser.java @@ -31,8 +31,6 @@ public abstract class StartOfHeaderParse protected int pos; - public StartOfHeaderParser() {} - public abstract int parse(String in) throws MCLParseException; public abstract int getNextAsInt() throws MCLParseException;
--- a/src/main/java/nl/cwi/monetdb/mcl/parser/embedded/EmbeddedHeaderLineParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/parser/embedded/EmbeddedHeaderLineParser.java @@ -8,6 +8,8 @@ import nl.cwi.monetdb.mcl.parser.MCLPars */ public class EmbeddedHeaderLineParser extends HeaderLineParser { + private long resultSetPointer; + /** * Creates an MCLParser targeted at a given number of field values. * The lines parsed by an instance of this MCLParser should have @@ -21,6 +23,22 @@ public class EmbeddedHeaderLineParser ex @Override public int parse(String source) throws MCLParseException { - return 0; + /*switch(this.colnr) { + case HeaderLineParser.NAME: + System.arraycopy(this.columnNames, 0, this.values, 0, this.values.length); + break; + case HeaderLineParser.LENGTH: + System.arraycopy(this.columnLengths, 0, this.intValues, 0, this.intValues.length); + break; + case HeaderLineParser.TABLE: + System.arraycopy(this.columnTables, 0, this.values, 0, this.values.length); + break; + case HeaderLineParser.TYPE: + System.arraycopy(this.columnTypes, 0, this.values, 0, this.values.length); + break; + }*/ + return this.parseNextHeadLineInternal(); } + + private native int parseNextHeadLineInternal(); }
--- a/src/main/java/nl/cwi/monetdb/mcl/parser/embedded/EmbeddedStartOfHeaderParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/parser/embedded/EmbeddedStartOfHeaderParser.java @@ -8,18 +8,48 @@ import nl.cwi.monetdb.mcl.parser.StartOf */ public class EmbeddedStartOfHeaderParser extends StartOfHeaderParser { + private int nextResponseType; + + private final int[] nextIntValues = new int[4]; + + private String nextStringValue; + @Override public int parse(String in) throws MCLParseException { - return 0; + this.pos = 0; + switch (this.nextResponseType) { + /*case Q_PARSE:*/ + case Q_SCHEMA: + this.len = 0; + break; + case Q_TABLE: + case Q_PREPARE: + this.len = 4; + break; + case Q_UPDATE: + this.len = 2; + break; + case Q_TRANS: + this.len = 1; + break; + /*case Q_BLOCK: + len = 3; + break;*/ + default: + throw new MCLParseException("invalid or unknown header", 1); + } + return this.nextResponseType; } @Override public int getNextAsInt() throws MCLParseException { - return 0; + int res = this.nextIntValues[this.pos]; + this.pos++; + return res; } @Override public String getNextAsString() throws MCLParseException { - return null; + return this.nextStringValue; } }
--- a/src/main/java/nl/cwi/monetdb/mcl/parser/socket/SocketHeaderLineParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/parser/socket/SocketHeaderLineParser.java @@ -11,7 +11,6 @@ package nl.cwi.monetdb.mcl.parser.socket import nl.cwi.monetdb.mcl.parser.HeaderLineParser; import nl.cwi.monetdb.mcl.parser.MCLParseException; -import nl.cwi.monetdb.mcl.parser.MCLParser; /** * The SocketHeaderLineParser is a generic MCLParser that extracts values from
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/AbstractProtocolParser.java @@ -0,0 +1,46 @@ +package nl.cwi.monetdb.mcl.protocol; + +/** + * Created by ferreira on 11/30/16. + */ +public abstract class AbstractProtocolParser { + + private ServerResponses currentServerResponseHeader = ServerResponses.UNKNOWN; + + private StarterHeaders currentStarterHeader = StarterHeaders.Q_UNKNOWN; + + private TableResultHeaders currentTableResultSetHeader = TableResultHeaders.UNKNOWN; + + public ServerResponses getCurrentServerResponseHeader() { + return currentServerResponseHeader; + } + + public StarterHeaders getCurrentStarterHeader() { + return currentStarterHeader; + } + + public TableResultHeaders getCurrentTableResultSetHeader() { + return currentTableResultSetHeader; + } + + public ServerResponses getNextResponseHeader() { + this.currentServerResponseHeader = this.getNextResponseHeaderImplementation(); + return this.currentServerResponseHeader; + } + + public StarterHeaders getNextStarterHeader() { + this.currentStarterHeader = this.getNextStarterHeaderImplementation(); + return this.currentStarterHeader; + } + + public TableResultHeaders getNextTableHeader() { + this.currentTableResultSetHeader = this.getNextTableHeaderImplementation(); + return this.currentTableResultSetHeader; + } + + protected abstract ServerResponses getNextResponseHeaderImplementation(); + + protected abstract TableResultHeaders getNextTableHeaderImplementation(); + + protected abstract StarterHeaders getNextStarterHeaderImplementation(); +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/ServerResponses.java @@ -0,0 +1,26 @@ +package nl.cwi.monetdb.mcl.protocol; + +/** + * Created by ferreira on 11/30/16. + */ +public enum ServerResponses { + + /** "there is currently no line", or the the type is unknown is represented by UNKNOWN */ + UNKNOWN, + /** a line starting with ! indicates ERROR */ + ERROR, + /** a line starting with % indicates HEADER */ + HEADER, + /** a line starting with [ indicates RESULT */ + RESULT, + /** a line which matches the pattern of prompt1 is a PROMPT */ + PROMPT, + /** a line which matches the pattern of prompt2 is a MORE */ + MORE, + /** a line starting with & indicates the start of a header block */ + SOHEADER, + /** a line starting with ^ indicates REDIRECT */ + REDIRECT, + /** a line starting with # indicates INFO */ + INFO +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/StarterHeaders.java @@ -0,0 +1,25 @@ +package nl.cwi.monetdb.mcl.protocol; + +/** + * Created by ferreira on 11/30/16. + */ +public enum StarterHeaders { + + /** A parse response (not handled) */ + Q_PARSE, + /** A tabular response (typical ResultSet) */ + Q_TABLE, + /** A response to an update statement, contains number of affected rows and generated key-id */ + Q_UPDATE, + /** A response to a schema update */ + Q_SCHEMA, + /** A response to a transaction statement (start, rollback, abort, commit) */ + Q_TRANS, + /** A tabular response in response to a PREPARE statement containing + * information about the wildcard values that need to be supplied */ + Q_PREPARE, + /** A tabular continuation response (for a ResultSet) */ + Q_BLOCK, + /** An unknown and unsupported response */ + Q_UNKNOWN +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/TableResultHeaders.java @@ -0,0 +1,12 @@ +package nl.cwi.monetdb.mcl.protocol; + +/** + * Created by ferreira on 11/30/16. + */ +public enum TableResultHeaders { + UNKNOWN, + NAME, + LENGTH, + TABLE, + TYPE +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/embedded/EmbeddedProtocol.java @@ -0,0 +1,34 @@ +package nl.cwi.monetdb.mcl.protocol.embedded; + +import nl.cwi.monetdb.mcl.io.InternalConnection; +import nl.cwi.monetdb.mcl.protocol.AbstractProtocolParser; +import nl.cwi.monetdb.mcl.protocol.ServerResponses; +import nl.cwi.monetdb.mcl.protocol.StarterHeaders; +import nl.cwi.monetdb.mcl.protocol.TableResultHeaders; + +/** + * Created by ferreira on 11/30/16. + */ +public class EmbeddedProtocol extends AbstractProtocolParser { + + private final InternalConnection embeddedConnection; + + public EmbeddedProtocol(InternalConnection embeddedConnection) { + this.embeddedConnection = embeddedConnection; + } + + @Override + public ServerResponses getNextResponseHeaderImplementation() { + return null; + } + + @Override + public StarterHeaders getNextStarterHeaderImplementation() { + return null; + } + + @Override + public TableResultHeaders getNextTableHeaderImplementation() { + return null; + } +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/newmapi/NewMapiProtocol.java @@ -0,0 +1,35 @@ +package nl.cwi.monetdb.mcl.protocol.newmapi; + +import nl.cwi.monetdb.mcl.io.SocketConnection; +import nl.cwi.monetdb.mcl.io.SocketIOHandler; +import nl.cwi.monetdb.mcl.protocol.AbstractProtocolParser; +import nl.cwi.monetdb.mcl.protocol.ServerResponses; +import nl.cwi.monetdb.mcl.protocol.StarterHeaders; +import nl.cwi.monetdb.mcl.protocol.TableResultHeaders; + +/** + * Created by ferreira on 11/30/16. + */ +public class NewMapiProtocol extends AbstractProtocolParser { + + private final SocketIOHandler handler; + + public NewMapiProtocol(SocketConnection con) { + this.handler = new SocketIOHandler(con); + } + + @Override + public ServerResponses getNextResponseHeaderImplementation() { + return null; + } + + @Override + public StarterHeaders getNextStarterHeaderImplementation() { + return null; + } + + @Override + public TableResultHeaders getNextTableHeaderImplementation() { + return null; + } +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiConverter.java @@ -0,0 +1,56 @@ +package nl.cwi.monetdb.mcl.protocol.oldmapi; + +import nl.cwi.monetdb.mcl.protocol.ServerResponses; +import nl.cwi.monetdb.mcl.protocol.StarterHeaders; + +/** + * Created by ferreira on 11/30/16. + */ +public final class OldMapiConverter { + + static ServerResponses GetNextResponseOnOldMapi(char nextChar) { + switch (nextChar) { + case '!': + return ServerResponses.ERROR; + case '&': + return ServerResponses.SOHEADER; + case '%': + return ServerResponses.HEADER; + case '[': + return ServerResponses.RESULT; + case '=': + return ServerResponses.RESULT; + case '^': + return ServerResponses.REDIRECT; + case '#': + return ServerResponses.INFO; + case '.': + return ServerResponses.PROMPT; + case ',': + return ServerResponses.MORE; + default: + return ServerResponses.UNKNOWN; + } + } + + static StarterHeaders GetNextStartHeaderOnOldMapi(char nextChar) { + switch (nextChar) { + case '0': + return StarterHeaders.Q_PARSE; + case '1': + return StarterHeaders.Q_TABLE; + case '2': + return StarterHeaders.Q_UPDATE; + case '3': + return StarterHeaders.Q_SCHEMA; + case '4': + return StarterHeaders.Q_TRANS; + case '5': + return StarterHeaders.Q_PREPARE; + case '6': + return StarterHeaders.Q_BLOCK; + default: + return StarterHeaders.Q_UNKNOWN; + } + } +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiProtocol.java @@ -0,0 +1,51 @@ +package nl.cwi.monetdb.mcl.protocol.oldmapi; + +import nl.cwi.monetdb.mcl.io.SocketConnection; +import nl.cwi.monetdb.mcl.io.SocketIOHandler; +import nl.cwi.monetdb.mcl.protocol.AbstractProtocolParser; +import nl.cwi.monetdb.mcl.protocol.ServerResponses; +import nl.cwi.monetdb.mcl.protocol.StarterHeaders; +import nl.cwi.monetdb.mcl.protocol.TableResultHeaders; + +import java.io.IOException; + +/** + * Created by ferreira on 11/30/16. + */ +public class OldMapiProtocol extends AbstractProtocolParser { + + private final SocketIOHandler handler; + + public OldMapiProtocol(SocketConnection con) { + this.handler = new SocketIOHandler(con); + } + + public SocketIOHandler getHandler() { + return handler; + } + + @Override + public ServerResponses getNextResponseHeaderImplementation() { + try { + char nextToken = handler.readNextChar(); + return OldMapiConverter.GetNextResponseOnOldMapi(nextToken); + } catch (IOException e) { + return ServerResponses.ERROR; + } + } + + @Override + public StarterHeaders getNextStarterHeaderImplementation() { + try { + char nextToken = handler.readNextChar(); + return OldMapiConverter.GetNextStartHeaderOnOldMapi(nextToken); + } catch (IOException e) { + return StarterHeaders.Q_UNKNOWN; + } + } + + @Override + public TableResultHeaders getNextTableHeaderImplementation() { + return null; + } +}
--- a/src/main/java/nl/cwi/monetdb/merovingian/Control.java +++ b/src/main/java/nl/cwi/monetdb/merovingian/Control.java @@ -10,7 +10,7 @@ package nl.cwi.monetdb.merovingian; import nl.cwi.monetdb.mcl.io.AbstractMCLReader; import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.net.MapiSocket; +import nl.cwi.monetdb.mcl.connection.DeleteMe; import nl.cwi.monetdb.mcl.MCLException; import nl.cwi.monetdb.mcl.parser.MCLParseException; @@ -117,7 +117,7 @@ public class Control { { AbstractMCLReader min; AbstractMCLWriter mout; - MapiSocket ms = new MapiSocket(host, port, "monetdb", "monetdb", false, "sql", "SHA256"); + DeleteMe ms = new DeleteMe(host, port, "monetdb", "monetdb", false, "sql", "SHA256"); ms.setDatabase("merovingian"); ms.setLanguage("control"); if (debug != null) @@ -131,7 +131,7 @@ public class Control { } catch (AssertionError e) { // mcl panics ms.close(); - // Try old protocol instead + // Try oldmapi protocol instead Socket s; PrintStream out; BufferedReader in;
--- a/src/main/java/nl/cwi/monetdb/util/SQLRestore.java +++ b/src/main/java/nl/cwi/monetdb/util/SQLRestore.java @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.Atomi import nl.cwi.monetdb.mcl.MCLException; import nl.cwi.monetdb.mcl.io.AbstractMCLReader; import nl.cwi.monetdb.mcl.io.AbstractMCLWriter; -import nl.cwi.monetdb.mcl.net.MapiSocket; +import nl.cwi.monetdb.mcl.connection.DeleteMe; import nl.cwi.monetdb.mcl.parser.MCLParseException; /** @@ -104,7 +104,7 @@ public class SQLRestore { * @throws IOException */ public void restore(File source) throws IOException { - MapiSocket ms = new MapiSocket(_host, _port, _dbName, _user, false, "sql", "SHA256"); + DeleteMe ms = new DeleteMe(_host, _port, _dbName, _user, false, "sql", "SHA256"); try { ms.connect(_user, _password);