Mercurial > hg > monetdb-java
changeset 61:f1de7262d8d9 embedded
First changes to the JDBC driver.
line wrap: on
line diff
--- a/example/SQLcopyinto.java +++ b/example/SQLcopyinto.java @@ -9,8 +9,10 @@ import java.sql.*; import java.io.*; import java.util.*; + +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; import nl.cwi.monetdb.mcl.net.*; -import nl.cwi.monetdb.mcl.io.*; /** * This example demonstrates how the MonetDB JDBC driver can facilitate @@ -45,22 +47,22 @@ public class SQLcopyinto { // of course also be done simultaneously with the JDBC // connection being kept connected - MapiSocket server = new MapiSocket(); + MapiSocket server = new MapiSocket("localhost", 50000, "monetdb", "monetdb", false, "sql", "SHA256"); server.setDatabase("database"); server.setLanguage("sql"); try { List warning = - server.connect("localhost", 50000, "monetdb", "monetdb"); + server.connect( "monetdb", "monetdb"); if (warning != null) { - for (Iterator it = warning.iterator(); it.hasNext(); ) { - System.out.println(it.next().toString()); + for (Object aWarning : warning) { + System.out.println(aWarning.toString()); } } - BufferedMCLReader in = server.getReader(); - BufferedMCLWriter out = server.getWriter(); + AbstractBufferedReader in = server.getReader(); + AbstractBufferedWriter out = server.getWriter(); String error = in.waitForPrompt(); if (error != null)
--- a/src/main/java/nl/cwi/monetdb/client/JdbcClient.java +++ b/src/main/java/nl/cwi/monetdb/client/JdbcClient.java @@ -329,7 +329,7 @@ public final class JdbcClient { // the most optimal way, but it works by just scanning // every table for loops in a recursive manor for (Table t : tables) { - Table.checkForLoop(t, new ArrayList<>()); + Table.checkForLoop(t, new ArrayList<Table>()); } // find the graph, at this point we know there are no
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/embedded/env/IEmbeddedConnection.java @@ -0,0 +1,11 @@ +package nl.cwi.monetdb.embedded.env; + +/** + * Created by ferreira on 11/24/16. + */ +public interface IEmbeddedConnection { + + long getConnectionPointer(); + + void closeConnectionImplementation(); +}
--- a/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedConnection.java +++ b/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedConnection.java @@ -22,15 +22,15 @@ import java.util.concurrent.ConcurrentHa * * @author <a href="mailto:pedro.ferreira@monetdbsolutions.com">Pedro Ferreira</a> */ -public class MonetDBEmbeddedConnection { +public class MonetDBEmbeddedConnection implements IEmbeddedConnection { - protected final long connectionPointer; + private final long connectionPointer; private final ConcurrentHashMap<Long, AbstractConnectionResult> results = new ConcurrentHashMap<>(); protected MonetDBEmbeddedConnection(long connectionPointer) { this.connectionPointer = connectionPointer; } - protected long getConnectionPointer() { return connectionPointer; } + public long getConnectionPointer() { return connectionPointer; } /** * Gets the current schema set on the connection. @@ -215,7 +215,7 @@ public class MonetDBEmbeddedConnection { /** * When the database is shuts down, this method is called instead */ - protected void closeConnectionImplementation() { + public void closeConnectionImplementation() { for(AbstractConnectionResult res : this.results.values()) { res.closeImplementation(); }
--- a/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedDatabase.java +++ b/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedDatabase.java @@ -8,6 +8,8 @@ package nl.cwi.monetdb.embedded.env; +import nl.cwi.monetdb.mcl.embedded.EmbeddedConnection; + import java.util.concurrent.ConcurrentHashMap; /** @@ -112,7 +114,7 @@ public class MonetDBEmbeddedDatabase { if(MonetDBEmbeddedDatabase == null) { throw new MonetDBEmbeddedException("The database is not running!"); } else { - for(MonetDBEmbeddedConnection mdbec : MonetDBEmbeddedDatabase.connections.values()) { + for(IEmbeddedConnection mdbec : MonetDBEmbeddedDatabase.connections.values()) { mdbec.closeConnectionImplementation(); } MonetDBEmbeddedDatabase.connections.clear(); @@ -136,7 +138,7 @@ public class MonetDBEmbeddedDatabase { private final boolean sequentialFlag; - private final ConcurrentHashMap<Long, MonetDBEmbeddedConnection> connections = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, IEmbeddedConnection> connections = new ConcurrentHashMap<>(); private MonetDBEmbeddedDatabase(String dbDirectory, boolean silentFlag, boolean sequentialFlag) { this.databaseDirectory = dbDirectory; @@ -170,11 +172,20 @@ public class MonetDBEmbeddedDatabase { return CompletableFuture.supplyAsync(() -> this.createConnectionInternal()); }*/ + public static void AddJDBCEmbeddedConnection(EmbeddedConnection con) throws MonetDBEmbeddedException { + if(MonetDBEmbeddedDatabase == null) { + throw new MonetDBEmbeddedException("The database is not running!"); + } else { + MonetDBEmbeddedDatabase.createJDBCConnectionInternal(con); + MonetDBEmbeddedDatabase.connections.put(con.getConnectionPointer(), con); + } + } + /** * Removes a connection from this database. */ protected static void RemoveConnection(MonetDBEmbeddedConnection con) { - MonetDBEmbeddedDatabase.connections.remove(con.connectionPointer); + MonetDBEmbeddedDatabase.connections.remove(con.getConnectionPointer()); } /** @@ -193,4 +204,9 @@ public class MonetDBEmbeddedDatabase { * Internal implementation to create a connection on this database. */ private native MonetDBEmbeddedConnection createConnectionInternal() throws MonetDBEmbeddedException; + + /** + * Internal implementation to create a JDBC embeddded connection on this database. + */ + private native void createJDBCConnectionInternal(EmbeddedConnection emc) throws MonetDBEmbeddedException; }
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java @@ -44,9 +44,11 @@ import java.util.concurrent.locks.Reentr import nl.cwi.monetdb.jdbc.types.INET; import nl.cwi.monetdb.jdbc.types.URL; import nl.cwi.monetdb.mcl.MCLException; -import nl.cwi.monetdb.mcl.io.BufferedMCLReader; -import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; +import nl.cwi.monetdb.mcl.embedded.EmbeddedConnection; import nl.cwi.monetdb.mcl.net.MapiSocket; +import nl.cwi.monetdb.mcl.connection.AbstractMonetDBConnection; import nl.cwi.monetdb.mcl.parser.HeaderLineParser; import nl.cwi.monetdb.mcl.parser.MCLParseException; import nl.cwi.monetdb.mcl.parser.StartOfHeaderParser; @@ -77,22 +79,13 @@ import nl.cwi.monetdb.mcl.parser.StartOf * @version 1.2 */ public class MonetConnection extends MonetWrapper implements Connection { - /** The hostname to connect to */ - private final String hostname; - /** The port to connect on the host to */ - private final int port; - /** The database to use (currently not used) */ - private final String database; - /** The username to use when authenticating */ - private final String username; - /** The password to use when authenticating */ - private final String password; - /** A connection to mserver5 using a TCP socket */ - private final MapiSocket server; + + /** A connection to mserver5 either through MAPI with TCP or embedded */ + private final AbstractMonetDBConnection server; /** The Reader from the server */ - private final BufferedMCLReader in; + private final AbstractBufferedReader in; /** The Writer to the server */ - private final BufferedMCLWriter out; + private final AbstractBufferedWriter out; /** A StartOfHeaderParser declared for reuse. */ private StartOfHeaderParser sohp = new StartOfHeaderParser(); @@ -125,20 +118,6 @@ public class MonetConnection extends Mon /** The number of results we receive from the server at once */ private int curReplySize = -1; // the server by default uses -1 (all) - /** A template to apply to each query (like pre and post fixes) */ - String[] queryTempl; - /** A template to apply to each command (like pre and post fixes) */ - String[] commandTempl; - - /** the SQL language */ - final static int LANG_SQL = 0; - /** the MAL language (officially *NOT* supported) */ - final static int LANG_MAL = 3; - /** an unknown language */ - final static int LANG_UNKNOWN = -1; - /** The language which is used */ - final int lang; - /** Whether or not BLOB is mapped to BINARY within the driver */ private final boolean blobIsBinary; @@ -157,50 +136,58 @@ public class MonetConnection extends Mon MonetConnection(Properties props) throws SQLException, IllegalArgumentException { - this.hostname = props.getProperty("host"); - int port; - try { - port = Integer.parseInt(props.getProperty("port")); - } catch (NumberFormatException e) { - port = 0; - } - this.port = port; - this.database = props.getProperty("database"); - this.username = props.getProperty("user"); - this.password = props.getProperty("password"); - String language = props.getProperty("language"); + String database = props.getProperty("database"); + if (database == null || database.trim().isEmpty()) + throw new IllegalArgumentException("database should not be null or empty"); + boolean isEmbedded = Boolean.parseBoolean(props.getProperty("embedded")); + String username = props.getProperty("user"); + String password = props.getProperty("password"); boolean debug = Boolean.valueOf(props.getProperty("debug")); - String hash = props.getProperty("hash"); blobIsBinary = Boolean.valueOf(props.getProperty("treat_blob_as_binary")); - int sockTimeout; - try { - sockTimeout = Integer.parseInt(props.getProperty("so_timeout")); - } catch (NumberFormatException e) { - sockTimeout = 0; + + if(isEmbedded) { + String directory = props.getProperty("directory"); + if (directory == null || directory.trim().isEmpty()) + throw new IllegalArgumentException("directory should not be null or empty"); + + server = new EmbeddedConnection("localhost", -1, database, username, debug, "sql", null, directory); + } else { + String hostname = props.getProperty("host"); + String hash = props.getProperty("hash"); + String language = props.getProperty("language"); + int port = 0; + int sockTimeout = 0; + + try { + port = Integer.parseInt(props.getProperty("port")); + } catch (NumberFormatException e) { + } + try { + sockTimeout = Integer.parseInt(props.getProperty("so_timeout")); + } catch (NumberFormatException e) { + } + + // check input arguments + if (hostname == null || hostname.trim().isEmpty()) + throw new IllegalArgumentException("hostname should not be null or empty"); + if (port == 0) + throw new IllegalArgumentException("port should not be 0"); + if (username == null || username.trim().isEmpty()) + throw new IllegalArgumentException("user should not be null or empty"); + if (password == null || password.trim().isEmpty()) + throw new IllegalArgumentException("password should not be null or empty"); + if (language == null || language.trim().isEmpty()) { + language = "sql"; + addWarning("No language given, defaulting to 'sql'", "M1M05"); + } + server = new MapiSocket(hostname, port, database, username, debug, language, hash); + try { + server.setSoTimeout(sockTimeout); + } catch (SocketException e) { + addWarning("The socket timeout could not be set", "M1M05"); + } + server.setLanguage(language); } - // check input arguments - if (hostname == null || hostname.trim().isEmpty()) - throw new IllegalArgumentException("hostname should not be null or empty"); - if (port == 0) - throw new IllegalArgumentException("port should not be 0"); - if (username == null || username.trim().isEmpty()) - throw new IllegalArgumentException("user should not be null or empty"); - if (password == null || password.trim().isEmpty()) - throw new IllegalArgumentException("password should not be null or empty"); - if (language == null || language.trim().isEmpty()) { - language = "sql"; - addWarning("No language given, defaulting to 'sql'", "M1M05"); - } - - // initialise query templates (filled later, but needed below) - queryTempl = new String[3]; // pre, post, sep - commandTempl = new String[3]; // pre, post, sep - - server = new MapiSocket(); - - if (hash != null) server.setHash(hash); - if (database != null) server.setDatabase(database); - server.setLanguage(language); // we're debugging here... uhm, should be off in real life if (debug) { @@ -224,24 +211,20 @@ public class MonetConnection extends Mon } try { - List<String> warnings = - server.connect(hostname, port, username, password); - for (String warning : warnings) { - addWarning(warning, "01M02"); + List<String> warnings = server.connect(username, password); + if(warnings != null) { + for (String warning : warnings) { + addWarning(warning, "01M02"); + } } - - // apply NetworkTimeout value from legacy (pre 4.1) driver - // so_timeout calls - server.setSoTimeout(sockTimeout); in = server.getReader(); out = server.getWriter(); - - String error = in.waitForPrompt(); + String error = in.waitForPrompt(); //TODO CHECK THIS if (error != null) throw new SQLException(error.substring(6), "08001"); } catch (IOException e) { - throw new SQLException("Unable to connect (" + hostname + ":" + port + "): " + e.getMessage(), "08006"); + throw new SQLException("Unable to connect (" + server.getHostname() + ":" + server.getPort() + "): " + e.getMessage(), "08006"); } catch (MCLParseException e) { throw new SQLException(e.getMessage(), "08001"); } catch (MCLException e) { @@ -253,38 +236,9 @@ public class MonetConnection extends Mon throw sqle; } - // we seem to have managed to log in, let's store the - // language used - if ("sql".equals(language)) { - lang = LANG_SQL; - } else if ("mal".equals(language)) { - lang = LANG_MAL; - } else { - lang = LANG_UNKNOWN; - } - - // fill the query templates - if (lang == LANG_SQL) { - queryTempl[0] = "s"; // pre - queryTempl[1] = "\n;"; // post - queryTempl[2] = "\n;\n"; // separator - - commandTempl[0] = "X"; // pre - commandTempl[1] = null; // post - commandTempl[2] = "\nX"; // separator - } else if (lang == LANG_MAL) { - queryTempl[0] = null; - queryTempl[1] = ";\n"; - queryTempl[2] = ";\n"; - - commandTempl[0] = null; // pre - commandTempl[1] = null; // post - commandTempl[2] = null; // separator - } - // the following initialisers are only valid when the language // is SQL... - if (lang == LANG_SQL) { + if (server.getLang() == AbstractMonetDBConnection.LANG_SQL) { // enable auto commit setAutoCommit(true); // set our time zone on the server @@ -297,9 +251,10 @@ public class MonetConnection extends Mon tz += (offset < 10 ? "0" : "") + offset; sendIndependentCommand("SET TIME ZONE INTERVAL '" + tz + "' HOUR TO MINUTE"); } + } - // we're absolutely not closed, since we're brand new - closed = false; + protected AbstractMonetDBConnection getServer() { + return server; } //== methods of interface Connection @@ -671,7 +626,7 @@ public class MonetConnection extends Mon */ @Override public DatabaseMetaData getMetaData() throws SQLException { - if (lang != LANG_SQL) + if (server.getLang() != AbstractMonetDBConnection.LANG_SQL) throw new SQLException("This method is only supported in SQL mode", "M0M04"); return new MonetDatabaseMetaData(this); @@ -1437,11 +1392,7 @@ public class MonetConnection extends Mon //== end methods of interface Connection public String getJDBCURL() { - String language = ""; - if (lang == LANG_MAL) - language = "?language=mal"; - return "jdbc:monetdb://" + hostname + ":" + port + "/" + - database + language; + return server.getJDBCURL(); } /** @@ -1463,10 +1414,7 @@ public class MonetConnection extends Mon void sendIndependentCommand(String command) throws SQLException { synchronized (server) { try { - out.writeLine( - (queryTempl[0] == null ? "" : queryTempl[0]) + - command + - (queryTempl[1] == null ? "" : queryTempl[1])); + out.writeLine(server.getQueryTemplateHeader(0) + command + server.getQueryTemplateHeader(1)); String error = in.waitForPrompt(); if (error != null) throw new SQLException(error.substring(6), @@ -1493,10 +1441,7 @@ public class MonetConnection extends Mon // send X command synchronized (server) { try { - out.writeLine( - (commandTempl[0] == null ? "" : commandTempl[0]) + - command + - (commandTempl[1] == null ? "" : commandTempl[1])); + out.writeLine(server.getCommandTemplateHeader(0) + command + server.getCommandTemplateHeader(1)); String error = in.waitForPrompt(); if (error != null) throw new SQLException(error.substring(6), @@ -1713,7 +1658,7 @@ public class MonetConnection extends Mon return resultBlocks[0].addLine(tmpLine, linetype); } - if (linetype != BufferedMCLReader.HEADER) + if (linetype != AbstractBufferedReader.HEADER) return "header expected, got: " + tmpLine; // depending on the name of the header, we continue @@ -1949,7 +1894,7 @@ public class MonetConnection extends Mon // ok, need to fetch cache block first parent.executeQuery( - commandTempl, + server.getCommandHeaderTemplates(), "export " + id + " " + ((block * cacheSize) + blockOffset) + " " + cacheSize ); rawr = resultBlocks[block]; @@ -2047,7 +1992,7 @@ public class MonetConnection extends Mon */ @Override public String addLine(String line, int linetype) { - if (linetype != BufferedMCLReader.RESULT) + if (linetype != AbstractBufferedReader.RESULT) return "protocol violation: unexpected line in data block: " + line; // add to the backing array data[++pos] = line; @@ -2341,7 +2286,7 @@ public class MonetConnection extends Mon * @throws SQLException if a database error occurs */ void processQuery(String query) throws SQLException { - executeQuery(queryTempl, query); + executeQuery(server.getQueryHeaderTemplates(), query); } /** @@ -2376,7 +2321,7 @@ public class MonetConnection extends Mon int size = cachesize == 0 ? DEF_FETCHSIZE : cachesize; size = maxrows != 0 ? Math.min(maxrows, size) : size; // don't do work if it's not needed - if (lang == LANG_SQL && size != curReplySize && templ != commandTempl) { + if (server.getLang() == AbstractMonetDBConnection.LANG_SQL && size != curReplySize && templ != server.getCommandHeaderTemplates()) { sendControlCommand("reply_size " + size); // store the reply size after a successful change @@ -2392,7 +2337,7 @@ public class MonetConnection extends Mon // as we are blocking an not consuming from it. The result // is a state where both client and server want to write, // but block. - if (query.length() > MapiSocket.BLOCK) { + if (query.length() > server.getBlockSize()) { // get a reference to the send thread if (sendThread == null) sendThread = new SendThread(out); @@ -2402,21 +2347,18 @@ public class MonetConnection extends Mon } else { // this is a simple call, which is a lot cheaper and will // always succeed for small queries. - out.writeLine( - (templ[0] == null ? "" : templ[0]) + - query + - (templ[1] == null ? "" : templ[1])); + out.writeLine((templ[0] == null ? "" : templ[0] + query + templ[1] == null ? "" : templ[1])); } // go for new results String tmpLine = in.readLine(); int linetype = in.getLineType(); Response res = null; - while (linetype != BufferedMCLReader.PROMPT) { + while (linetype != AbstractBufferedReader.PROMPT) { // each response should start with a start of header // (or error) switch (linetype) { - case BufferedMCLReader.SOHEADER: + case AbstractBufferedReader.SOHEADER: // make the response object, and fill it try { switch (sohp.parse(tmpLine)) { @@ -2545,7 +2487,7 @@ public class MonetConnection extends Mon tmpLine = in.readLine(); linetype = in.getLineType(); break; - case BufferedMCLReader.INFO: + case AbstractBufferedReader.INFO: addWarning(tmpLine.substring(1), "01000"); // read the next line (can be prompt, new @@ -2560,7 +2502,7 @@ public class MonetConnection extends Mon // message tmpLine = "!M0M10!protocol violation, unexpected line: " + tmpLine; // don't break; fall through... - case BufferedMCLReader.ERROR: + case AbstractBufferedReader.ERROR: // read everything till the prompt (should be // error) we don't know if we ignore some // garbage here... but the log should reveal @@ -2637,7 +2579,7 @@ public class MonetConnection extends Mon private String[] templ; private String query; - private BufferedMCLWriter out; + private AbstractBufferedWriter out; private String error; private int state = WAIT; @@ -2651,7 +2593,7 @@ public class MonetConnection extends Mon * * @param out the socket to write to */ - public SendThread(BufferedMCLWriter out) { + public SendThread(AbstractBufferedWriter out) { super("SendThread"); setDaemon(true); this.out = out; @@ -2675,10 +2617,7 @@ public class MonetConnection extends Mon // state is QUERY here try { - out.writeLine( - (templ[0] == null ? "" : templ[0]) + - query + - (templ[1] == null ? "" : templ[1])); + out.writeLine((templ[0] == null ? "" : templ[0]) + query + (templ[1] == null ? "" : templ[1])); } catch (IOException e) { error = e.getMessage(); }
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetStatement.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetStatement.java @@ -8,7 +8,7 @@ package nl.cwi.monetdb.jdbc; -import nl.cwi.monetdb.mcl.net.MapiSocket; +import nl.cwi.monetdb.mcl.connection.AbstractMonetDBConnection; import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.Statement; @@ -76,7 +76,6 @@ public class MonetStatement extends Mone /** A List to hold all queries of a batch */ private List<String> batch = new ArrayList<>(); - /** * MonetStatement constructor which checks the arguments for validity, tries * to set up a socket to MonetDB and attempts to login. @@ -146,7 +145,7 @@ public class MonetStatement extends Mone batch.clear(); } - Lock batchLock = new ReentrantLock(); + private Lock batchLock = new ReentrantLock(); /** * Submits a batch of commands to the database for execution and if @@ -189,7 +188,7 @@ public class MonetStatement extends Mone */ @Override public int[] executeBatch() throws SQLException { - // this method is synchronized to make sure noone gets inbetween the + // this method is synchronized to make sure none gets in between the // operations we execute below batchLock.lock(); @@ -203,12 +202,14 @@ public class MonetStatement extends Mone boolean first = true; boolean error = false; + AbstractMonetDBConnection server = connection.getServer(); + BatchUpdateException e = new BatchUpdateException("Error(s) occurred while executing the batch, see next SQLExceptions for details", "22000", counts); - StringBuilder tmpBatch = new StringBuilder(MapiSocket.BLOCK); - String sep = connection.queryTempl[2]; + StringBuilder tmpBatch = new StringBuilder(server.getBlockSize()); + String sep = server.getQueryTemplateHeader(2); for (int i = 0; i < batch.size(); i++) { String tmp = batch.get(i); - if (sep.length() + tmp.length() > MapiSocket.BLOCK) { + if (sep.length() + tmp.length() > server.getBlockSize()) { // The thing is too big. Way too big. Since it won't // be optimal anyway, just add it to whatever we have // and continue. @@ -222,7 +223,7 @@ public class MonetStatement extends Mone first = true; continue; } - if (tmpBatch.length() + sep.length() + tmp.length() >= MapiSocket.BLOCK) { + if (tmpBatch.length() + sep.length() + tmp.length() >= server.getBlockSize()) { // send and receive error |= internalBatch(tmpBatch.toString(), counts, offset, i + 1, e); offset = i;
--- a/src/main/java/nl/cwi/monetdb/mcl/MCLException.java +++ b/src/main/java/nl/cwi/monetdb/mcl/MCLException.java @@ -13,12 +13,14 @@ package nl.cwi.monetdb.mcl; * class should be used if no more precise Exception class exists. */ public class MCLException extends Exception { - /** - * - */ + private static final long serialVersionUID = 1L; public MCLException(String e) { super(e); } + + public MCLException(Throwable t) { + super(t); + } }
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/AbstractBufferedReader.java @@ -0,0 +1,92 @@ +package nl.cwi.monetdb.mcl.connection; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Reader; + +/** + * Created by ferreira on 11/24/16. + */ +public abstract class AbstractBufferedReader extends BufferedReader { + + /** The type of the last line read */ + protected int lineType; + + /** "there is currently no line", or the the type is unknown is + represented by UNKNOWN */ + public final static int UNKNOWN = 0; + /** a line starting with ! indicates ERROR */ + public final static int ERROR = '!'; + /** a line starting with % indicates HEADER */ + public final static int HEADER = '%'; + /** a line starting with [ indicates RESULT */ + public final static int RESULT = '['; + /** a line which matches the pattern of prompt1 is a PROMPT */ + public final static int PROMPT = '.'; + /** a line which matches the pattern of prompt2 is a MORE */ + public final static int MORE = ','; + /** a line starting with & indicates the start of a header block */ + public final static int SOHEADER = '&'; + /** a line starting with ^ indicates REDIRECT */ + public final static int REDIRECT = '^'; + /** a line starting with # indicates INFO */ + public final static int INFO = '#'; + + public AbstractBufferedReader(Reader in) { + super(in); + } + + /** + * getLineType returns the type of the last line read. + * + * @return an integer representing the kind of line this is, one of the + * following constants: UNKNOWN, HEADER, ERROR, PROMPT, + * RESULT, REDIRECT, INFO + */ + public int getLineType() { + return lineType; + } + + /** + * Sets the linetype to the type of the string given. If the string + * is null, lineType is set to UNKNOWN. + * + * @param line the string to examine + */ + public void setLineType(String line) { + lineType = UNKNOWN; + if (line == null || line.length() == 0) + return; + switch (line.charAt(0)) { + case '!': + lineType = ERROR; + break; + case '&': + lineType = SOHEADER; + break; + case '%': + lineType = HEADER; + break; + case '[': + lineType = RESULT; + break; + case '=': + lineType = RESULT; + break; + case '^': + lineType = REDIRECT; + break; + case '#': + lineType = INFO; + break; + case '.': + lineType = PROMPT; + break; + case ',': + lineType = MORE; + break; + } + } + + public abstract String waitForPrompt() throws IOException; +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/AbstractBufferedWriter.java @@ -0,0 +1,30 @@ +package nl.cwi.monetdb.mcl.connection; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.Writer; + +/** + * Created by ferreira on 11/24/16. + */ +public abstract class AbstractBufferedWriter extends BufferedWriter { + + protected AbstractBufferedReader reader; + + public AbstractBufferedWriter(Writer out) { + super(out); + } + + /** + * Registers the given reader in this writer. A registered reader + * receives a linetype reset when a line is written from this + * writer. + * + * @param r an AbstractBufferedReader + */ + public void registerReader(AbstractBufferedReader r) { + this.reader = r; + } + + public abstract void writeLine(String line) throws IOException; +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/AbstractMonetDBConnection.java @@ -0,0 +1,310 @@ +package nl.cwi.monetdb.mcl.connection; + +import nl.cwi.monetdb.mcl.MCLException; +import nl.cwi.monetdb.mcl.parser.MCLParseException; + +import java.io.*; +import java.net.SocketException; +import java.util.List; + +/** + * Created by ferreira on 11/23/16. + */ +public abstract class AbstractMonetDBConnection { + + /** the SQL language */ + public final static int LANG_SQL = 0; + /** the MAL language (officially *NOT* supported) */ + public final static int LANG_MAL = 3; + /** an unknown language */ + public final static int LANG_UNKNOWN = -1; + + /** The hostname to connect to */ + protected String hostname; + /** The port to connect on the host to */ + protected int port = -1; + /** The database to connect to */ + protected String database; + /** The username to use when authenticating */ + protected String username; + /** Whether we are debugging or not */ + protected boolean debug; + /** The language to connect with */ + protected String language; + /** The hash methods to use (null = default) */ + protected String hash; + /** The Writer for the debug log-file */ + protected Writer log; + /** The language which is used */ + protected int lang; + + /** A template to apply to each query (like pre and post fixes) */ + protected String[] queryTempl = new String[3]; // pre, post, sep + /** A template to apply to each command (like pre and post fixes) */ + protected String[] commandTempl = new String[3]; // pre, post, sep + + public AbstractMonetDBConnection(String hostname, int port, String database, String username, boolean debug, String language, String hash, String[] queryTempl, String[] commandTempl) { + this.hostname = hostname; + this.port = port; + this.database = database; + this.username = username; + this.debug = debug; + this.hash = hash; + this.setLanguage(language); + this.queryTempl = queryTempl; + this.commandTempl = commandTempl; + } + + public String getHostname() { + return hostname; + } + + public abstract void setHostname(String hostname); + + public int getPort() { + return port; + } + + public abstract void setPort(int port); + + public String getDatabase() { + return database; + } + + /** + * Sets the database to connect to. If database is null, a + * connection is made to the default database of the server. This + * is also the default. + * + * @param db the database + */ + public abstract void setDatabase(String db); + + public String getUsername() { + return username; + } + + protected void setUsername(String username) { + this.username = username; + } + + public boolean isDebug() { + return debug; + } + + /** + * Enables/disables debug + * + * @param debug Value to set + */ + public void setDebug(boolean debug) { + this.debug = debug; + } + + public String getLanguage() { + return language; + } + + /** + * Sets the language to use for this connection. + * + * @param language the language + */ + public void setLanguage(String language) { + this.language = language; + if ("sql".equals(language)) { + lang = LANG_SQL; + } else if ("mal".equals(language)) { + lang = LANG_MAL; + } else { + lang = LANG_UNKNOWN; + } + if (lang == LANG_SQL) { + queryTempl[0] = "s"; // pre + queryTempl[1] = "\n;"; // post + queryTempl[2] = "\n;\n"; // separator + + commandTempl[0] = "X"; // pre + commandTempl[1] = null; // post + commandTempl[2] = "\nX"; // separator + } else if (lang == LANG_MAL) { + queryTempl[0] = null; + queryTempl[1] = ";\n"; + queryTempl[2] = ";\n"; + + commandTempl[0] = null; // pre + commandTempl[1] = null; // post + commandTempl[2] = null; // separator + } + } + + public int getLang() { + return lang; + } + + public String getHash() { + return hash; + } + + /** + * Sets the hash method to use. Note that this method is intended + * for debugging purposes. Setting a hash method can yield in + * connection failures. Multiple hash methods can be given by + * separating the hashes by commas. + * DON'T USE THIS METHOD if you don't know what you're doing. + * + * @param hash the hash method to use + */ + public abstract void setHash(String hash); + + /** + * Gets the SO_TIMEOUT from the underlying Socket. + * + * @return the currently in use timeout in milliseconds + * @throws SocketException Issue with the socket + */ + public abstract int getSoTimeout() throws SocketException; + + /** + * Set the SO_TIMEOUT on the underlying Socket. When for some + * reason the connection to the database hangs, this setting can be + * useful to break out of this indefinite wait. + * This option must be enabled prior to entering the blocking + * operation to have effect. + * + * @param s The specified timeout, in milliseconds. A timeout + * of zero is interpreted as an infinite timeout. + * @throws SocketException Issue with the socket + */ + public abstract void setSoTimeout(int s) throws SocketException; + + /** + * Connects to the given host and port, logging in as the given + * user. If followRedirect is false, a RedirectionException is + * thrown when a redirect is encountered. + * + * @return A List with informational (warning) messages. If this + * list is empty; then there are no warnings. + * @throws IOException if an I/O error occurs when creating the + * socket + * @throws MCLParseException if bogus data is received + * @throws MCLException if an MCL related error occurs + */ + public abstract List<String> connect(String user, String pass) throws IOException, MCLParseException, MCLException; + + /** + * Returns an InputStream that reads from this open connection on + * the MapiSocket. + * + * @return an input stream that reads from this open connection + */ + public abstract InputStream getInputStream(); + + /** + * Returns an output stream for this MapiSocket. + * + * @return an output stream for writing bytes to this MapiSocket + */ + public abstract OutputStream getOutputStream(); + + /** + * Returns a Reader for this MapiSocket. The Reader is a + * BufferedMCLReader which does protocol interpretation of the + * BlockInputStream produced by this MapiSocket. + * + * @return a BufferedMCLReader connected to this MapiSocket + */ + public abstract AbstractBufferedReader getReader(); + + /** + * Returns a Writer for this MapiSocket. The Writer is a + * BufferedMCLWriter which produces protocol compatible data blocks + * that the BlockOutputStream can properly translate into blocks. + * + * @return a BufferedMCLWriter connected to this MapiSocket + */ + public abstract AbstractBufferedWriter getWriter(); + + /** + * Enables logging to a file what is read and written from and to + * the server. Logging can be enabled at any time. However, it is + * encouraged to start debugging before actually connecting the + * socket. + * + * @param filename the name of the file to write to + * @throws IOException if the file could not be opened for writing + */ + public void debug(String filename) throws IOException { + debug(new FileWriter(filename)); + } + + /** + * Enables logging to a stream what is read and written from and to + * the server. Logging can be enabled at any time. However, it is + * encouraged to start debugging before actually connecting the + * socket. + * + * @param out to write the log to + * @throws IOException if the file could not be opened for writing + */ + public void debug(PrintStream out) throws IOException { + debug(new PrintWriter(out)); + } + + /** + * Enables logging to a stream what is read and written from and to + * the server. Logging can be enabled at any time. However, it is + * encouraged to start debugging before actually connecting the + * socket. + * + * @param out to write the log to + * @throws IOException if the file could not be opened for writing + */ + public void debug(Writer out) throws IOException { + log = out; + debug = true; + } + + public String getQueryTemplateHeader(int index) { + return queryTempl[index] == null ? "" : queryTempl[index]; + } + + public String getCommandTemplateHeader(int index) { + return commandTempl[index] == null ? "" : commandTempl[index]; + } + + public String[] getCommandHeaderTemplates() { + return commandTempl; + } + + public String[] getQueryHeaderTemplates() { + return queryTempl; + } + + public synchronized void close() { + try { + if (this.getReader() != null) this.getReader().close(); + if (this.getWriter() != null) this.getWriter().close(); + if (this.getInputStream() != null) this.getInputStream().close(); + if (this.getOutputStream() != null) this.getOutputStream().close(); + if (debug && log instanceof FileWriter) log.close(); + } catch (IOException e) { + // ignore it + } + } + + /** + * Destructor called by garbage collector before destroying this + * object tries to disconnect the MonetDB connection if it has not + * been disconnected already. + */ + @Override + protected void finalize() throws Throwable { + this.close(); + super.finalize(); + } + + public abstract String getJDBCURL(); + + public abstract int getBlockSize(); +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/embedded/EmbeddedConnection.java @@ -0,0 +1,146 @@ +package nl.cwi.monetdb.mcl.embedded; + +import nl.cwi.monetdb.embedded.env.IEmbeddedConnection; +import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedDatabase; +import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedException; +import nl.cwi.monetdb.mcl.MCLException; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; +import nl.cwi.monetdb.mcl.connection.AbstractMonetDBConnection; +import nl.cwi.monetdb.mcl.parser.MCLParseException; + +import java.io.*; +import java.net.SocketException; +import java.util.List; + +/** + * Created by ferreira on 11/23/16. + */ +public class EmbeddedConnection extends AbstractMonetDBConnection implements IEmbeddedConnection { + + private long connectionPointer; + + protected static final int BUFFER_SIZE = 102400; //100 kb to start + + private final String directory; + + private EmbeddedReader reader; + + private EmbeddedWriter writer; + + public EmbeddedConnection(String hostname, int port, String database, String username, boolean debug, String language, String hash, String directory) { + super(hostname, port, database, username, debug, language, hash, new String[]{"s", "\n;", "\n;\n"}, new String[]{"X", null, "\nX"}); + this.directory = directory; + } + + public String getDirectory() { + return directory; + } + + @Override + public void setHostname(String hostname) { + throw new IllegalArgumentException("Cannot set a hostname on a embedded connection!"); + } + + @Override + public void setPort(int port) { + throw new IllegalArgumentException("Cannot set a port on a embedded connection!"); + } + + @Override + public void setDatabase(String db) { + throw new IllegalArgumentException("Not yet planned!"); + } + + @Override + public void setLanguage(String language) { + if(this.lang != LANG_SQL) { + throw new IllegalArgumentException("The embedded connection only supports the SQL language!"); + } + super.setLanguage(language); + } + + @Override + public void setHash(String hash) { + throw new IllegalArgumentException("The embedded connection does not support user authentication yet!"); + } + + @Override + public int getSoTimeout() throws SocketException { + throw new IllegalArgumentException("Cannot get socket timeout on an embedded connection!"); + } + + @Override + public void setSoTimeout(int s) throws SocketException { + throw new IllegalArgumentException("Cannot set socket timeout on an embedded connection!"); + } + + @Override + public List<String> connect(String user, String pass) throws IOException, MCLParseException, MCLException { + try { + if(MonetDBEmbeddedDatabase.IsDatabaseRunning() && !MonetDBEmbeddedDatabase.GetDatabaseDirectory().equals(this.directory)) { + throw new MCLException("The embedded database is already running on a different directory!"); + } else { + MonetDBEmbeddedDatabase.StartDatabase(this.directory, true, false); + } + this.reader = new EmbeddedReader(); + this.writer = new EmbeddedWriter(); + MonetDBEmbeddedDatabase.AddJDBCEmbeddedConnection(this); + } catch (MonetDBEmbeddedException ex) { + throw new MCLException(ex); + } + return null; + } + + @Override + public InputStream getInputStream() { + throw new IllegalArgumentException("Not available!"); + } + + @Override + public OutputStream getOutputStream() { + throw new IllegalArgumentException("Not available!"); + } + + @Override + public AbstractBufferedReader getReader() { + return this.reader; + } + + @Override + public AbstractBufferedWriter getWriter() { + return this.writer; + } + + @Override + public synchronized void close() { + super.close(); + try { + MonetDBEmbeddedDatabase.StopDatabase(); + } catch (MonetDBEmbeddedException e) { + // ignore it + } + } + + @Override + public String getJDBCURL() { + return "jdbc:monetdb://" + this.getHostname() + "@" + this.getDirectory() + "/" + this.getDatabase(); + } + + @Override + public int getBlockSize() { + return BUFFER_SIZE; + } + + @Override + public long getConnectionPointer() { + return connectionPointer; + } + + @Override + public void closeConnectionImplementation() { + this.closeConnectionInternal(this.connectionPointer); + } + + private native void closeConnectionInternal(long connectionPointer); +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/embedded/EmbeddedReader.java @@ -0,0 +1,43 @@ +package nl.cwi.monetdb.mcl.embedded; + +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; + +import java.io.*; + +/** + * Created by ferreira on 11/24/16. + */ +public class EmbeddedReader extends AbstractBufferedReader { + + protected EmbeddedReader() { + super(null); + } + + @Override + public String readLine() throws IOException { + String res = this.readLineInternal(); + setLineType(res); + if (lineType == ERROR && !res.matches("^![0-9A-Z]{5}!.+")) + res = "!22000!" + res.substring(1); + return res; + } + + @Override + public synchronized String waitForPrompt() throws IOException { + try { + this.wait(); + } catch (InterruptedException e) { + throw new IOException(e); + } + String res = this.readLine(); + if (res == null) { + throw new IOException("Connection to server lost!"); + } + if (lineType == ERROR) { + return "\n" + res.substring(1); + } + return res.trim(); + } + + private native String readLineInternal(); +}
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/embedded/EmbeddedWriter.java @@ -0,0 +1,23 @@ +package nl.cwi.monetdb.mcl.embedded; + +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; + +import java.io.*; + +/** + * Created by ferreira on 11/24/16. + */ +public class EmbeddedWriter extends AbstractBufferedWriter { + + public EmbeddedWriter() { + super(null); + } + + @Override + public void writeLine(String line) throws IOException { + this.writeInternal(line); + this.reader.notify(); + } + + private native void writeInternal(String str); +}
rename from src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLReader.java rename to src/main/java/nl/cwi/monetdb/mcl/net/BufferedMCLReader.java --- a/src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLReader.java +++ b/src/main/java/nl/cwi/monetdb/mcl/net/BufferedMCLReader.java @@ -6,9 +6,10 @@ * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V. */ -package nl.cwi.monetdb.mcl.io; +package nl.cwi.monetdb.mcl.net; -import java.io.BufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; + import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -21,7 +22,7 @@ import java.io.UnsupportedEncodingExcept * lines. This class is based on the BufferedReader class, and provides * extra functionality useful for MCL. * - * The BufferedMCLReader is typically used as layer inbetween an + * The BufferedMCLReader is typically used as layer in between an * InputStream and a specific interpreter of the data. * <pre> * / Response @@ -29,7 +30,7 @@ import java.io.UnsupportedEncodingExcept * \ DataBlock * </pre> * Because the BufferedMCLReader provides an efficient way to access the - * data from the stream in a linewise fashion, whereby each line is + * data from the stream in a line-wise fashion, whereby each line is * identified as a certain type, consumers can easily decide how to * parse each retrieved line. The line parsers from * nl.cwi.monetdb.mcl.parser are well suited to work with the lines @@ -39,31 +40,9 @@ import java.io.UnsupportedEncodingExcept * * @author Fabian Groffen <Fabian.Groffen> * @see nl.cwi.monetdb.mcl.net.MapiSocket - * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter + * @see BufferedMCLWriter */ -public class BufferedMCLReader extends BufferedReader { - /** The type of the last line read */ - private int lineType; - - /** "there is currently no line", or the the type is unknown is - represented by UNKNOWN */ - public final static int UNKNOWN = 0; - /** a line starting with ! indicates ERROR */ - public final static int ERROR = '!'; - /** a line starting with % indicates HEADER */ - public final static int HEADER = '%'; - /** a line starting with [ indicates RESULT */ - public final static int RESULT = '['; - /** a line which matches the pattern of prompt1 is a PROMPT */ - public final static int PROMPT = '.'; - /** a line which matches the pattern of prompt2 is a MORE */ - public final static int MORE = ','; - /** a line starting with & indicates the start of a header block */ - public final static int SOHEADER = '&'; - /** a line starting with ^ indicates REDIRECT */ - public final static int REDIRECT = '^'; - /** a line starting with # indicates INFO */ - public final static int INFO = '#'; +public class BufferedMCLReader extends AbstractBufferedReader { /** * Create a buffering character-input stream that uses a @@ -94,7 +73,7 @@ public class BufferedMCLReader extends B * carriage return followed immediately by a linefeed. Before this * method returns, it sets the linetype to any of the in MCL * recognised line types. - * + * * Warning: until the server properly prefixes all of its error * messages with SQLSTATE codes, this method prefixes all errors it * sees without sqlstate with the generic data exception code @@ -113,83 +92,30 @@ public class BufferedMCLReader extends B r = "!22000!" + r.substring(1); return r; } - - /** - * Sets the linetype to the type of the string given. If the string - * is null, lineType is set to UNKNOWN. - * - * @param line the string to examine - */ - void setLineType(String line) { - lineType = UNKNOWN; - if (line == null || line.length() == 0) - return; - switch (line.charAt(0)) { - case '!': - lineType = ERROR; - break; - case '&': - lineType = SOHEADER; - break; - case '%': - lineType = HEADER; - break; - case '[': - lineType = RESULT; - break; - case '=': - lineType = RESULT; - break; - case '^': - lineType = REDIRECT; - break; - case '#': - lineType = INFO; - break; - case '.': - lineType = PROMPT; - break; - case ',': - lineType = MORE; - break; - } - } - - /** - * getLineType returns the type of the last line read. - * - * @return an integer representing the kind of line this is, one of the - * following constants: UNKNOWN, HEADER, ERROR, PROMPT, - * RESULT, REDIRECT, INFO - */ - public int getLineType() { - return lineType; - } /** * Reads up till the MonetDB prompt, indicating the server is ready * for a new command. All read data is discarded. If the last line * read by readLine() was a prompt, this method will immediately * return. - * + * * If there are errors present in the lines that are read, then they * are put in one string and returned <b>after</b> the prompt has * been found. If no errors are present, null will be returned. * * @return a string containing error messages, or null if there aren't any * @throws IOException if an IO exception occurs while talking to the server - * - * TODO(Wouter): should probably not have to be synchronized. + StringBuilder... */ - final public synchronized String waitForPrompt() throws IOException { - String ret = "", tmp; + @Override + public synchronized String waitForPrompt() throws IOException { + StringBuilder res = new StringBuilder(); + String tmp; while (lineType != PROMPT) { if ((tmp = readLine()) == null) throw new IOException("Connection to server lost!"); if (lineType == ERROR) - ret += "\n" + tmp.substring(1); + res.append("\n").append(tmp.substring(1)); } - return ret == "" ? null : ret.trim(); + return res.toString().trim(); } - }
rename from src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLWriter.java rename to src/main/java/nl/cwi/monetdb/mcl/net/BufferedMCLWriter.java --- a/src/main/java/nl/cwi/monetdb/mcl/io/BufferedMCLWriter.java +++ b/src/main/java/nl/cwi/monetdb/mcl/net/BufferedMCLWriter.java @@ -6,7 +6,9 @@ * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V. */ -package nl.cwi.monetdb.mcl.io; +package nl.cwi.monetdb.mcl.net; + +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; import java.io.*; @@ -29,10 +31,9 @@ import java.io.*; * * @author Fabian Groffen <Fabian.Groffen> * @see nl.cwi.monetdb.mcl.net.MapiSocket - * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter + * @see BufferedMCLWriter */ -public class BufferedMCLWriter extends BufferedWriter { - private BufferedMCLReader reader; +public class BufferedMCLWriter extends AbstractBufferedWriter { /** * Create a buffered character-output stream that uses a @@ -58,17 +59,6 @@ public class BufferedMCLWriter extends B } /** - * Registers the given reader in this writer. A registered reader - * receives a linetype reset when a line is written from this - * writer. - * - * @param r an BufferedMCLReader - */ - public void registerReader(BufferedMCLReader r) { - reader = r; - } - - /** * Write a line separator. The line separator string is in this * class always the single newline character '\n'. * @@ -76,20 +66,12 @@ public class BufferedMCLWriter extends B */ @Override public void newLine() throws IOException { - write('\n'); + this.write('\n'); } - /** - * Write a single line, terminated with a line separator, and flush - * the stream. This is a shorthand method for a call to write() - * and flush(). - * - * @param line The line to write - * @throws IOException If an I/O error occurs - */ public void writeLine(String line) throws IOException { - write(line); - flush(); + this.write(line); + this.flush(); // reset reader state, last line isn't valid any more now if (reader != null) reader.setLineType(null); }
--- a/src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @@ -10,16 +10,12 @@ package nl.cwi.monetdb.mcl.net; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.FileWriter; import java.io.FilterInputStream; import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PrintStream; -import java.io.PrintWriter; import java.io.UnsupportedEncodingException; -import java.io.Writer; import java.net.Socket; import java.net.SocketException; import java.net.URI; @@ -33,8 +29,9 @@ import java.util.List; import java.util.Set; import nl.cwi.monetdb.mcl.MCLException; -import nl.cwi.monetdb.mcl.io.BufferedMCLReader; -import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; +import nl.cwi.monetdb.mcl.connection.AbstractMonetDBConnection; import nl.cwi.monetdb.mcl.parser.MCLParseException; /** @@ -84,12 +81,35 @@ import nl.cwi.monetdb.mcl.parser.MCLPars * * @author Fabian Groffen * @version 4.1 - * @see nl.cwi.monetdb.mcl.io.BufferedMCLReader - * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter + * @see BufferedMCLReader + * @see BufferedMCLWriter */ -public final class MapiSocket { +public final class MapiSocket extends AbstractMonetDBConnection { + + /** The blocksize (hardcoded in compliance with stream.mx) */ + private static final int BLOCK = 8 * 1024 - 2; + + private static char hexChar(int n) { return (n > 9) ? (char) ('a' + (n - 10)) : (char) ('0' + n); } + + /** + * Small helper method to convert a byte string to a hexadecimal + * string representation. + * + * @param digest the byte array to convert + * @return the byte array as hexadecimal string + */ + private static String toHex(byte[] digest) { + char[] result = new char[digest.length * 2]; + int pos = 0; + for (byte aDigest : digest) { + result[pos++] = hexChar((aDigest & 0xf0) >> 4); + result[pos++] = hexChar(aDigest & 0x0f); + } + return new String(result); + } + /** The TCP Socket to mserver */ - private Socket con; + private Socket con = null; /** Stream from the Socket for reading */ private InputStream fromMonet; /** Stream from the Socket for writing */ @@ -101,65 +121,47 @@ public final class MapiSocket { /** protocol version of the connection */ private int version; - /** The database to connect to */ - private String database = null; - /** The language to connect with */ - private String language = "sql"; - /** The hash methods to use (null = default) */ - private String hash = null; /** Whether we should follow redirects */ private boolean followRedirects = true; /** How many redirections do we follow until we're fed up with it? */ private int ttl = 10; - /** Whether we are debugging or not */ - private boolean debug = false; - /** The Writer for the debug log-file */ - private Writer log; - - /** The blocksize (hardcoded in compliance with stream.mx) */ - public final static int BLOCK = 8 * 1024 - 2; /** A short in two bytes for holding the block size in bytes */ private byte[] blklen = new byte[2]; - /** - * Constructs a new MapiSocket. - */ - public MapiSocket() { - con = null; + public MapiSocket(String hostname, int port, String database, String username, boolean debug, String language, String hash) { + super(hostname, port, database, username, debug, language, hash, new String[]{"s", "\n;", "\n;\n"}, new String[]{"s", "\n;", "\n;\n"}); } - /** - * Sets the database to connect to. If database is null, a - * connection is made to the default database of the server. This - * is also the default. - * - * @param db the database - */ + @Override + public void setHostname(String hostname) { + this.hostname = hostname; + } + + @Override + public void setPort(int port) { + this.port = port; + } + + @Override public void setDatabase(String db) { this.database = db; } - /** - * Sets the language to use for this connection. - * - * @param lang the language - */ - public void setLanguage(String lang) { - this.language = lang; + @Override + public void setHash(String hash) { + this.hash = hash; } - /** - * Sets the hash method to use. Note that this method is intended - * for debugging purposes. Setting a hash method can yield in - * connection failures. Multiple hash methods can be given by - * separating the hashes by commas. - * DON'T USE THIS METHOD if you don't know what you're doing. - * - * @param hash the hash method to use - */ - public void setHash(String hash) { - this.hash = hash; + @Override + public void setSoTimeout(int s) throws SocketException { + // limit time to wait on blocking operations (0 = indefinite) + con.setSoTimeout(s); + } + + @Override + public int getSoTimeout() throws SocketException { + return con.getSoTimeout(); } /** @@ -189,62 +191,16 @@ public final class MapiSocket { this.ttl = t; } - /** - * Set the SO_TIMEOUT on the underlying Socket. When for some - * reason the connection to the database hangs, this setting can be - * useful to break out of this indefinite wait. - * This option must be enabled prior to entering the blocking - * operation to have effect. - * - * @param s The specified timeout, in milliseconds. A timeout - * of zero is interpreted as an infinite timeout. - * @throws SocketException Issue with the socket - */ - public void setSoTimeout(int s) throws SocketException { - // limit time to wait on blocking operations (0 = indefinite) - con.setSoTimeout(s); - } - - /** - * Gets the SO_TIMEOUT from the underlying Socket. - * - * @return the currently in use timeout in milliseconds - * @throws SocketException Issue with the socket - */ - public int getSoTimeout() throws SocketException { - return con.getSoTimeout(); - } - - /** - * Enables/disables debug - * - * @param debug Value to set - */ - public void setDebug(boolean debug) { - this.debug = debug; - } - - /** - * Connects to the given host and port, logging in as the given - * user. If followRedirect is false, a RedirectionException is - * thrown when a redirect is encountered. - * - * @param host the hostname, or null for the loopback address - * @param port the port number - * @param user the username - * @param pass the password - * @return A List with informational (warning) messages. If this - * list is empty; then there are no warnings. - * @throws IOException if an I/O error occurs when creating the - * socket - * @throws MCLParseException if bogus data is received - * @throws MCLException if an MCL related error occurs - */ - public List<String> connect(String host, int port, String user, String pass) - throws IOException, MCLParseException, MCLException { + @Override + public List<String> connect(String user, String pass) + throws IOException, MCLParseException, MCLException { // Wrap around the internal connect that needs to know if it // should really make a TCP connection or not. - return connect(host, port, user, pass, true); + List<String> res = connect(this.hostname, this.port, user, pass, true); + // apply NetworkTimeout value from legacy (pre 4.1) driver + // so_timeout calls + this.setSoTimeout(this.getSoTimeout()); + return res; } private List<String> connect(String host, int port, String user, String pass, @@ -559,68 +515,24 @@ public final class MapiSocket { return response; } } - - private static char hexChar(int n) { - return (n > 9) - ? (char) ('a' + (n - 10)) - : (char) ('0' + n); - } - /** - * Small helper method to convert a byte string to a hexadecimal - * string representation. - * - * @param digest the byte array to convert - * @return the byte array as hexadecimal string - */ - private static String toHex(byte[] digest) { - char[] result = new char[digest.length * 2]; - int pos = 0; - for (byte aDigest : digest) { - result[pos++] = hexChar((aDigest & 0xf0) >> 4); - result[pos++] = hexChar(aDigest & 0x0f); - } - return new String(result); - } - - /** - * Returns an InputStream that reads from this open connection on - * the MapiSocket. - * - * @return an input stream that reads from this open connection - */ + @Override public InputStream getInputStream() { return fromMonet; } - /** - * Returns an output stream for this MapiSocket. - * - * @return an output stream for writing bytes to this MapiSocket - */ + @Override public OutputStream getOutputStream() { return toMonet; } - /** - * Returns a Reader for this MapiSocket. The Reader is a - * BufferedMCLReader which does protocol interpretation of the - * BlockInputStream produced by this MapiSocket. - * - * @return a BufferedMCLReader connected to this MapiSocket - */ - public BufferedMCLReader getReader() { + @Override + public AbstractBufferedReader getReader() { return reader; } - /** - * Returns a Writer for this MapiSocket. The Writer is a - * BufferedMCLWriter which produces protocol compatible data blocks - * that the BlockOutputStream can properly translate into blocks. - * - * @return a BufferedMCLWriter connected to this MapiSocket - */ - public BufferedMCLWriter getWriter() { + @Override + public AbstractBufferedWriter getWriter() { return writer; } @@ -636,46 +548,6 @@ public final class MapiSocket { } /** - * Enables logging to a file what is read and written from and to - * the server. Logging can be enabled at any time. However, it is - * encouraged to start debugging before actually connecting the - * socket. - * - * @param filename the name of the file to write to - * @throws IOException if the file could not be opened for writing - */ - public void debug(String filename) throws IOException { - debug(new FileWriter(filename)); - } - - /** - * Enables logging to a stream what is read and written from and to - * the server. Logging can be enabled at any time. However, it is - * encouraged to start debugging before actually connecting the - * socket. - * - * @param out to write the log to - * @throws IOException if the file could not be opened for writing - */ - public void debug(PrintStream out) throws IOException { - debug(new PrintWriter(out)); - } - - /** - * Enables logging to a stream what is read and written from and to - * the server. Logging can be enabled at any time. However, it is - * encouraged to start debugging before actually connecting the - * socket. - * - * @param out to write the log to - * @throws IOException if the file could not be opened for writing - */ - public void debug(Writer out) throws IOException { - log = out; - debug = true; - } - - /** * Inner class that is used to write data on a normal stream as a * blocked stream. A call to the flush() method will write a * "final" block to the underlying stream. Non-final blocks are @@ -799,7 +671,6 @@ public final class MapiSocket { } } - /** * Inner class that is used to make the data on the blocked stream * available as a normal stream. @@ -1025,31 +896,15 @@ public final class MapiSocket { * possible. If an error occurs during disconnecting it is ignored. */ public synchronized void close() { + super.close(); try { - if (reader != null) reader.close(); - if (writer != null) writer.close(); - if (fromMonet != null) fromMonet.close(); - if (toMonet != null) toMonet.close(); if (con != null) con.close(); - if (debug && log instanceof FileWriter) log.close(); } catch (IOException e) { // ignore it } } /** - * Destructor called by garbage collector before destroying this - * object tries to disconnect the MonetDB connection if it has not - * been disconnected already. - */ - @Override - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - - - /** * Writes a logline tagged with a timestamp using the given string. * Used for debugging purposes only and represents a message that is * connected to writing to the socket. A logline might look like: @@ -1107,4 +962,17 @@ public final class MapiSocket { ": " + message + "\n"); log.flush(); } + + @Override + public String getJDBCURL() { + String language = ""; + if (this.getLang() == AbstractMonetDBConnection.LANG_MAL) + language = "?language=mal"; + return "jdbc:monetdb://" + this.getHostname() + ":" + this.getPort() + "/" + this.getDatabase() + language; + } + + @Override + public int getBlockSize() { + return BLOCK; + } }
--- a/src/main/java/nl/cwi/monetdb/mcl/parser/MCLParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/parser/MCLParser.java @@ -31,7 +31,7 @@ public abstract class MCLParser { private int colnr; /** - * Creates an MCLParser targetted at a given number of field values. + * Creates an MCLParser targeted at a given number of field values. * The lines parsed by an instance of this MCLParser should have * exactly capacity field values. *
--- a/src/main/java/nl/cwi/monetdb/merovingian/Control.java +++ b/src/main/java/nl/cwi/monetdb/merovingian/Control.java @@ -8,10 +8,12 @@ package nl.cwi.monetdb.merovingian; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; import nl.cwi.monetdb.mcl.net.MapiSocket; -import nl.cwi.monetdb.mcl.io.*; import nl.cwi.monetdb.mcl.MCLException; import nl.cwi.monetdb.mcl.parser.MCLParseException; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -113,15 +115,15 @@ public class Control { String database, String command, boolean hasOutput) throws MerovingianException, IOException { - BufferedMCLReader min; - BufferedMCLWriter mout; - MapiSocket ms = new MapiSocket(); + AbstractBufferedReader min; + AbstractBufferedWriter mout; + MapiSocket ms = new MapiSocket(host, port, "monetdb", "monetdb", false, "sql", "SHA256"); ms.setDatabase("merovingian"); ms.setLanguage("control"); if (debug != null) ms.debug(debug); try { - ms.connect(host, port, "monetdb", passphrase); + ms.connect("monetdb", passphrase); min = ms.getReader(); mout = ms.getWriter(); } catch (MCLParseException | MCLException e) { @@ -203,16 +205,16 @@ public class Control { ArrayList<String> l = new ArrayList<>(); String tmpLine = min.readLine(); int linetype = min.getLineType(); - if (linetype == BufferedMCLReader.ERROR) + if (linetype == AbstractBufferedReader.ERROR) throw new MerovingianException(tmpLine.substring(6)); - if (linetype != BufferedMCLReader.RESULT) + if (linetype != AbstractBufferedReader.RESULT) throw new MerovingianException("unexpected line: " + tmpLine); if (!tmpLine.substring(1).equals(RESPONSE_OK)) throw new MerovingianException(tmpLine.substring(1)); tmpLine = min.readLine(); linetype = min.getLineType(); - while (linetype != BufferedMCLReader.PROMPT) { - if (linetype != BufferedMCLReader.RESULT) + while (linetype != AbstractBufferedReader.PROMPT) { + if (linetype != AbstractBufferedReader.RESULT) throw new MerovingianException("unexpected line: " + tmpLine);
--- a/src/main/java/nl/cwi/monetdb/util/SQLRestore.java +++ b/src/main/java/nl/cwi/monetdb/util/SQLRestore.java @@ -15,8 +15,8 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import nl.cwi.monetdb.mcl.MCLException; -import nl.cwi.monetdb.mcl.io.BufferedMCLReader; -import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedReader; +import nl.cwi.monetdb.mcl.connection.AbstractBufferedWriter; import nl.cwi.monetdb.mcl.net.MapiSocket; import nl.cwi.monetdb.mcl.parser.MCLParseException; @@ -42,11 +42,11 @@ public class SQLRestore { } private static class ServerResponseReader implements Runnable { - private final BufferedMCLReader _is; + private final AbstractBufferedReader _is; private final AtomicBoolean _errorState = new AtomicBoolean(false); private String _errorMessage = null; - ServerResponseReader(BufferedMCLReader is) { + ServerResponseReader(AbstractBufferedReader is) { _is = is; } @@ -58,7 +58,7 @@ public class SQLRestore { break; int result = _is.getLineType(); switch (result) { - case BufferedMCLReader.ERROR: + case AbstractBufferedReader.ERROR: _errorMessage = line; _errorState.set(true); return; @@ -104,14 +104,12 @@ public class SQLRestore { * @throws IOException */ public void restore(File source) throws IOException { - MapiSocket ms = new MapiSocket(); + MapiSocket ms = new MapiSocket(_host, _port, _dbName, _user, false, "sql", "SHA256"); try { - ms.setLanguage("sql"); - ms.setDatabase(_dbName); - ms.connect(_host, _port, _user, _password); + ms.connect(_user, _password); - BufferedMCLWriter os = ms.getWriter(); - BufferedMCLReader reader = ms.getReader(); + AbstractBufferedWriter os = ms.getWriter(); + AbstractBufferedReader reader = ms.getReader(); ServerResponseReader srr = new ServerResponseReader(reader);